SubDAGs
Airflow에서, 하위 DAG를 생성하는 등의 계층화를 통해 복잡한 task들의 관계를 더 추상화할 수 있게 하는 개념.
DAG 내부에서 또 다른 DAG를 실행할 수 있게 하는 기능. 상위 DAG에서 하위 DAG을 하나의 task처럼 다룰 수 있음.
SubDAG은,
(1) DAG 내부에서 논리적으로 연관된 task들을 묶어서 재사용할 수 있게 함.
(2) 복잡한 DAG을 단순화 및 모듈화할 수 있게 함.
그림과 같이 파일을 다운로드하는 task가 3개 있고, 처리하는 task가 또 3개 있다고 하자. 코드로 나타내면 아래와 같음.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG('group_dag', start_date=datetime(2022, 1, 1),
schedule_interval='@daily', catchup=False) as dag:
download_a = BashOperator(
task_id='download_a',
bash_command='sleep 10'
)
download_b = BashOperator(
task_id='download_b',
bash_command='sleep 10'
)
download_c = BashOperator(
task_id='download_c',
bash_command='sleep 10'
)
check_files = BashOperator(
task_id='check_files',
bash_command='sleep 10'
)
transform_a = BashOperator(
task_id='transform_a',
bash_command='sleep 10'
)
transform_b = BashOperator(
task_id='transform_b',
bash_command='sleep 10'
)
transform_c = BashOperator(
task_id='transform_c',
bash_command='sleep 10'
)
[download_a, download_b, download_c] >> check_files >> [transform_a, transform_b, transform_c]
이렇게 하나의 role을 여러개의 task가 나눠서 하는 경우에는, DAG을 그리기도 복잡하고, 또 유지보수하기도 어렵다.
그래서 이걸, 논리적으로 연관된 task들을 묶어서 하나의 DAG로 만들어주는 것이다.
이를 위해서는,
(1) SubDAG를 만들어준다 - 일반적으로 dag 만들듯이 dags 폴더에 만들어도 되겠지만, 일반 dag과의 구별을 위해 subdags 폴더를 만들어서 그 안에 작성해 줌.
from airflow import DAG
from airflow.operators.bash import BashOperator
def subdag_downloads(parent_dag_id, child_dag_id, args):
with DAG(f"{parent_dag_id}.{child_dag_id}",
start_date=args['start_date'],
schedule_interval=args['schedule_interval'],
catchup=args['catchup']) as dag:
download_a = BashOperator(
task_id='download_a',
bash_command='sleep 10'
)
// download_a와 병렬인 것들 다 집어넣기
return dag
subdag_downloads라는 함수를 만들고, parent_dag_id, child_dag_id, args 이렇게 파라미터를 받아준다.
그 안에 기존의 tasks들을 대체할 하나의 DAG(subDAG)을 정의한다.
이때 중요한 점은,
1 DAG의 이름은 <parentDAG_name>.<childDAG_name> 형태가 되어야 하고
2 나머지 값들을 모두 args 형태로 받아줘야 함.
3 그리고 with 절의 끝에 return으로 dag를 반환해줘야 함.
(2) 기존 DAG 파일의 task들을 SubDAG으로 대체해준다.
...
from airflow.operators.subdag import SubDagOperator
from subdags.subdag_downloads import subdag_downloads
with DAG('group_dag', ...) as dag:
args = {'start_date': dag.start_date, ...} # args 만들어주고
// downloads로 대체
downloads = SubDagOperator(
task_id='downloads',
subdag=subdag_downloads(dag.dag_id, 'downloads', args)
)
...
// 아래의 dependencies도 subdag명으로 대체
downloads >> check_files >> [transform_a, transform_b, transform_c]
1 args라는 dictionary를 정의해서 넘겨주기 쉽게 준비해둔다
2 대체 대상인 task들을 모두 지우고 subDAG을 그 자리에 집어넣는다. 그 과정에서는 SubDagOperator를 사용한다.
3 이때, 사실상 parentDAG의 입장에서 subDAG은 하나의 task와 같은 개념이므로, task_id와 subdag의 이름은 통일한다. 이렇게 하지 않으면 문제가 생길 수 있다고 함.
이렇게 subdag으로 대체하고 나면, 아래의 기존의 graph가
이렇게 바뀌고,
downloads를 zoom-in 해서 들어가면
이렇게 grouping 되어있는 것을 확인할 수 있다.
결론적으로, SubDAG이 왜 필요한지는 알았지만, 그래도 귀차니즘으로 점철된 우리 개발자들에게는 이조차도 귀찮다.
더불어서, SubDAG 내부의 task들은 기본이 sequential로 진행됨. 그래서 우리는, TaskGroup을 쓴다.
SubDAG에서 TaskGroup으로!
TaskGroup은 더 간소화된 코드로, SubDAG을 대체한다. 심지어 병렬화도 지원한다.
1 subdags가 아니라 groups로 폴더명을 바꿈.
2 기존의 subdag_downloads.py는 다음과 같이 바꿈.
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
def download_tasks():
with TaskGroup("downloads", tooltip="Download tasks") as group:
download_a = BashOperator(
task_id='download_a',
bash_command='sleep 10'
)
// download_a와 병렬인 것들 다 집어넣기
return group
3 기존의 parentDAG는 다음과 같이 바꿈.
...
from groups.group_downloads import download_tasks
with DAG('group_dag', ...) as dag:
// 훨씬 간편해짐. 걍 만든 함수만 부르면 끝.
downloads = download_tasks()
...
// 아래의 dependencies도 subdag명으로 대체
downloads >> check_files >> [transform_a, transform_b, transform_c]
일단 args부터 시작해서 parameter들 거의 안 써도 되게끔 바꿨고, 특히 parentDAG와 childDAG 이름 맞춰주고 이런 걸 모두 자동화함으로써 우리가 건드려야 하는 부분이 줄어 편의성이 확 개선되었다.
그리고 UI를 살펴보면,
이렇게 색깔이 들어가게 바뀌었고, 각각을 펼쳐보면
이렇게 됨. 훨씬 편하쥬?
그런데 든 생각이,
❓만약 group에 부모 DAG의 메타데이터 - 예를 들면 start_date, schedule 등 - 와 다른 값을 적용하고 싶으면 어떻게 해줘야 할까?
❗TaskGroup에서는 무조건 상위 DAG의 설정을 물려받아야 함. 만약 바꾸고 싶으면 SubDAG을 써야 함.
그래서 TaskGroup은 말그대로 상위 DAG의 값들을 그대로 가져와서 논리적으로 묶어둔 박스같은 느낌이라면, subDAG은 말그대로 그 자체로 메타데이터를 가지는 DAG인 것이다.
최종적으로 SubDAG vs TaskGroup
TaskGroup이 설정도 편하고, 관리 오버헤드도 없으므로 더 가볍고 빠름.
그러나 자체적인 설정변경은 어렵고 부모 DAG 설정 그대로 상속받아야 함.
그러나 부모 DAG 설정을 바꿔야 하는 경우가 잘 없어서 실무에서는 TaskGroup을 쓰는 추세.
'데이터 엔지니어링' 카테고리의 다른 글
[Airflow] Airflow 내맘대로 꾸미기 (Elasticsearch, plugin) (0) | 2025.02.04 |
---|---|
[Airflow] Fancy functions: XCom (기본 Intra) + Branching (0) | 2025.02.02 |
[Airflow] 닉값 못하는 실행자와, 그 단짝 데이터베이스 (0) | 2025.02.02 |
[Airflow] 증기기관에 필적하는 dataset에 대하여 (inter-DAG) (0) | 2025.02.01 |
[Airflow] 비로소, DAG 작성하고 테스트해보기 (0) | 2025.01.31 |