본문 바로가기

데이터 엔지니어링

[Airflow] 진화론, SubDAG을 지나 TaskGroups까지! (intra-DAG)

SubDAGs

Airflow에서, 하위 DAG를 생성하는 등의 계층화를 통해 복잡한 task들의 관계를 더 추상화할 수 있게 하는 개념.

DAG 내부에서 또 다른 DAG를 실행할 수 있게 하는 기능. 상위 DAG에서 하위 DAG을 하나의 task처럼 다룰 수 있음.

SubDAG은,

 

(1) DAG 내부에서 논리적으로 연관된 task들을 묶어서 재사용할 수 있게 함.

(2) 복잡한 DAG을 단순화 및 모듈화할 수 있게 함.

 

 

그림과 같이 파일을 다운로드하는 task가 3개 있고, 처리하는 task가 또 3개 있다고 하자. 코드로 나타내면 아래와 같음.

from airflow import DAG
from airflow.operators.bash import BashOperator
 
from datetime import datetime
 
with DAG('group_dag', start_date=datetime(2022, 1, 1), 
    schedule_interval='@daily', catchup=False) as dag:
 
    download_a = BashOperator(
        task_id='download_a',
        bash_command='sleep 10'
    )
 
    download_b = BashOperator(
        task_id='download_b',
        bash_command='sleep 10'
    )
 
    download_c = BashOperator(
        task_id='download_c',
        bash_command='sleep 10'
    )
 
    check_files = BashOperator(
        task_id='check_files',
        bash_command='sleep 10'
    )
 
    transform_a = BashOperator(
        task_id='transform_a',
        bash_command='sleep 10'
    )
 
    transform_b = BashOperator(
        task_id='transform_b',
        bash_command='sleep 10'
    )
 
    transform_c = BashOperator(
        task_id='transform_c',
        bash_command='sleep 10'
    )
 
    [download_a, download_b, download_c] >> check_files >> [transform_a, transform_b, transform_c]

 

이렇게 하나의 role을 여러개의 task가 나눠서 하는 경우에는, DAG을 그리기도 복잡하고, 또 유지보수하기도 어렵다.

그래서 이걸, 논리적으로 연관된 task들을 묶어서 하나의 DAG로 만들어주는 것이다.

 

이를 위해서는,

 

(1) SubDAG를 만들어준다 - 일반적으로 dag 만들듯이 dags 폴더에 만들어도 되겠지만, 일반 dag과의 구별을 위해 subdags 폴더를 만들어서 그 안에 작성해 줌.

from airflow import DAG
from airflow.operators.bash import BashOperator

def subdag_downloads(parent_dag_id, child_dag_id, args):

    with DAG(f"{parent_dag_id}.{child_dag_id}",
        start_date=args['start_date'],
        schedule_interval=args['schedule_interval'],
        catchup=args['catchup']) as dag:
        
        download_a = BashOperator(
            task_id='download_a',
            bash_command='sleep 10'
        )
        
        // download_a와 병렬인 것들 다 집어넣기
    
        return dag

 

subdag_downloads라는 함수를 만들고, parent_dag_id, child_dag_id, args 이렇게 파라미터를 받아준다.

 

그 안에 기존의 tasks들을 대체할 하나의 DAG(subDAG)을 정의한다.

이때 중요한 점은,

1 DAG의 이름은 <parentDAG_name>.<childDAG_name> 형태가 되어야 하고

2 나머지 값들을 모두 args 형태로 받아줘야 함.

3 그리고 with 절의 끝에 return으로 dag를 반환해줘야 함.

 

 

(2) 기존 DAG 파일의 task들을 SubDAG으로 대체해준다.

...
from airflow.operators.subdag import SubDagOperator
from subdags.subdag_downloads import subdag_downloads

with DAG('group_dag', ...) as dag:

    args = {'start_date': dag.start_date, ...}	# args 만들어주고   
    
    // downloads로 대체
    downloads = SubDagOperator(
        task_id='downloads',
        subdag=subdag_downloads(dag.dag_id, 'downloads', args)
    )
    
    ...
    
    // 아래의 dependencies도 subdag명으로 대체
    downloads >> check_files >> [transform_a, transform_b, transform_c]

 

1 args라는 dictionary를 정의해서 넘겨주기 쉽게 준비해둔다

2 대체 대상인 task들을 모두 지우고 subDAG을 그 자리에 집어넣는다. 그 과정에서는 SubDagOperator를 사용한다.

3 이때, 사실상 parentDAG의 입장에서 subDAG은 하나의 task와 같은 개념이므로, task_id와 subdag의 이름은 통일한다. 이렇게 하지 않으면 문제가 생길 수 있다고 함.

 

 

이렇게 subdag으로 대체하고 나면, 아래의 기존의 graph가

이렇게 바뀌고,

downloads를 zoom-in 해서 들어가면

이렇게 grouping 되어있는 것을 확인할 수 있다.

 

 

결론적으로, SubDAG이 왜 필요한지는 알았지만, 그래도 귀차니즘으로 점철된 우리 개발자들에게는 이조차도 귀찮다.

더불어서, SubDAG 내부의 task들은 기본이 sequential로 진행됨. 그래서 우리는, TaskGroup을 쓴다.

 

 

SubDAG에서 TaskGroup으로!

TaskGroup은 더 간소화된 코드로, SubDAG을 대체한다. 심지어 병렬화도 지원한다.

 

1 subdags가 아니라 groups로 폴더명을 바꿈.

2 기존의 subdag_downloads.py는 다음과 같이 바꿈.

from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

def download_tasks():
    with TaskGroup("downloads", tooltip="Download tasks") as group:
        
        download_a = BashOperator(
            task_id='download_a',
            bash_command='sleep 10'
        )
        
        // download_a와 병렬인 것들 다 집어넣기
    
        return group

 

3 기존의 parentDAG는 다음과 같이 바꿈.

...
from groups.group_downloads import download_tasks

with DAG('group_dag', ...) as dag:
    
    // 훨씬 간편해짐. 걍 만든 함수만 부르면 끝.
    downloads = download_tasks()
    
    ...
    
    // 아래의 dependencies도 subdag명으로 대체
    downloads >> check_files >> [transform_a, transform_b, transform_c]

 

일단 args부터 시작해서 parameter들 거의 안 써도 되게끔 바꿨고, 특히 parentDAG와 childDAG 이름 맞춰주고 이런 걸 모두 자동화함으로써 우리가 건드려야 하는 부분이 줄어 편의성이 확 개선되었다.

 

그리고 UI를 살펴보면,

 

이렇게 색깔이 들어가게 바뀌었고, 각각을 펼쳐보면

이렇게 됨. 훨씬 편하쥬?

 

 

그런데 든 생각이,

만약 group에 부모 DAG의 메타데이터 - 예를 들면 start_date, schedule 등 - 와 다른 값을 적용하고 싶으면 어떻게 해줘야 할까?

 

❗TaskGroup에서는 무조건 상위 DAG의 설정을 물려받아야 함. 만약 바꾸고 싶으면 SubDAG을 써야 함.

그래서 TaskGroup은 말그대로 상위 DAG의 값들을 그대로 가져와서 논리적으로 묶어둔 박스같은 느낌이라면, subDAG은 말그대로 그 자체로 메타데이터를 가지는 DAG인 것이다.

 

 

최종적으로 SubDAG vs TaskGroup

TaskGroup이 설정도 편하고, 관리 오버헤드도 없으므로 더 가볍고 빠름.

그러나 자체적인 설정변경은 어렵고 부모 DAG 설정 그대로 상속받아야 함.

그러나 부모 DAG 설정을 바꿔야 하는 경우가 잘 없어서 실무에서는 TaskGroup을 쓰는 추세.