데이터 엔지니어링

[Airflow] 닉값 못하는 실행자와, 그 단짝 데이터베이스

magnate96 2025. 2. 2. 00:03

Executors

 

이름이 executor지만 task를 실행하지 않음. 어떤식으로 worker를 운영할지 결정하는 컴포넌트임.

 

어떤 executor를 사용할지는 config 파일에 들어가서 바꿔주면 되는데, airflow의 기본템은 sequantial executor.

이때 만약 celery executor를 쓰고 싶다면, airflow config 파일을 들어가서 수정하고 그럴 필요 없음. docker-compose 를 쓸 거라면 docker-compose.yaml이 어차피 기본파일을 override 하기 때문에 거기에 AIRFLOW_CORE_EXECUTOR: CeleryExecutor 해주면 그만임. 아래와 같이.

 

Executor은 대략 다음과 같은 종류들이 있음.

 

각각의 executor들은 task 실행 방식이 다르므로, task의 실행상태를 추적하고 저장할 DB를 필요로 함.

 

정리하면, 병렬화의 여지가 있으면 SQLite는 안되고 다른 RDBMS 쓴다는 거다.

 

 

Sequential Executor

말그대로 직렬로 실행함. 그렇다고 일자형 DAG만 할 수 있다는 게 아니고,

이렇게 생겼다면 T1 -> T2 -> T3 -> T4 이런식으로 진행한다는 뜻.

설정은, 기본 sqlite 를 활용하므로 추가적인 config 필요없음.

 

 

Local Executor

Sequential과 똑같이 하나의 host에서 돌지만, 병렬실행이 가능한, 한 단계 업그레이드 버전임.

[core]
executor = LocalExecutor

[database]
sql_alchemy_conn = postgresql+psycopg2://user:password@postgres:5432/airflow

 

 

Celery Executor

여러 개의 workers 들을 활용하여, Celery queue의

Broker에서 worker들이 실행대기 중인 task를 pull 해가고, 끝난 결과물을 Result Backend에 저장하는 방식으로 진행됨.

 

Celery Queue라서 추가로 설정해줘야 하는 부분은, 사진을 봐도 알 수 있지만, Celery Queue의 Result Backend와 Broker.

Broker는 메시지 큐, 그러니까 RabbitMQ 혹은 redis 같은 메시지 브로커로 설정하고,

Result Backend에는 일반적인 Postgres 혹은 MySQL 등을 설정할 수 있음.

[core]
executor = CeleryExecutor

[database]
sql_alchemy_conn = postgresql+psycopg2://user:password@postgres:5432/airflow

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://user:password@postgres:5432/airflow

 

 

Celery를 위한 flower

Flower는 Celery만을 위한 모니터링 도구. 다른 executor에는 flower가 없음.

docker-compose down && docker-compose --profile flower up -d

 

 

Celery에서의 Queue

 

정리하면, Celery 내부에는 두개의 종류의 Queue가 있다. 중앙에서 모든 데이터를 일차적으로 모으는 Central Broker Queue (일명 broker) 가 있고, 여기서 각각의 worker에게 보낼 목적으로 만든 worker-specific queue 들이 해당 하는 worker에 연결되게끔 있다. 당연히 이건 worker의 개수만큼 있다. 그리고 그 둘과 별개로, worker에서 나온 결과물을 모두 다시 모으는 곳인 result backend가 있다. 이건 그냥 저장소임.

 

그림이 헷갈리게 그려져 있는데, 왼쪽의 Celery에서, 

 

Celery에서 queue가 특별히 더 중요한 이유는, worker들이 서로 다른 조건에 있을 때, 어떤 worker에 어떤 일을 배분하는지가 일을 효율적으로 진행할 수 있는지에 매우 중요한 조건이 되기 때문. 위 사진처럼 5 cpu, 1 gpu, 1 cpu 를 각각 갖고 있는 worker들이 있다고 하면, 각각에 대해 보낼 task들을 사전에 분류해서 queueing을 해둬야 효율적인 실행이 가능함.

 

특정한 worker에게 특정한 task만 주고 싶다면, 그걸 사전에 config 단에서 정리할 수 있음.

 

이렇게, worker-2를 추가하고, 그에 해당하는 command를 -q 옵션과 high_cpu라는 큐 이름을 넣어주면 됨.

이렇게 하면 다른 worker가 high_cpu에 포함 안된 모든 task를 떠안게 됨(default queue).

 

그럼 특정 task를 어떻게 저 queue에 넣냐고?

 

이렇게 그냥 task 내부에 queue= 옵션으로 특정해주면 됨! 저게 없으면 그냥 default queue행임.

 

 

Airflow에서의 Concurrency에 대하여

추가로, 하나의 task를 실행하는 것은 하나의 cpu (물론 그 내부의 core다) 그렇기에 celery와 같은 병렬은, local과 같은 단순 병렬이 아니라, 병렬이 두번되는, 병렬병렬 혹은 더블 병렬이라고 칭할 수 있음. 이걸 공식적으로는 "분산 병렬 처리" 라고 부른다고 함. airflow와는 크게 상관없지만 병렬을 정리해보면 이러함.

 

Airflow에서의 동시성은 한번에 실행할 수 있는 task 혹은 한번에 실행할 수 있는 DAG run 개수를 조절함.
여러개의 파라미터로 조절 가능하지만, 시스템 리소스와 실행 환경에 따라 적절한 값을 설정해야 함.

parallelism: 하나의 scheduler가 Airflow 전체에서 실행가능한 task 수. 그러니까 scheduler 수만큼 곱한 게 토탈.

max_active_tasks_per_dag은 하나의 dag에서 한번에 실행 가능한 task 수. 그니까 쉬운데,

max_active_runs_per_dag은 하나의 dag에서 한번에 실행 가능한 run 수. 이게 뭐지? 스케줄러 여러 개가 하나의 dag을 돌리는 건 지양하고, 한번에 하나의 스케줄러는 하나의 dag instance를 만들텐데, 여러개, 특히나 16개씩이나 두는 이유가 뭘까, 생각해봤는데, (1) catchup을 쓴 경우 밀린 것들을 한번에 진행하니까 그럴 수 있고 (2) 실행 시간이 주기보다 긴 경우도 간혹 있을 수 있다고 함.