XCom 이란?
두 개의 서로 다른 태스크 사이에 공유되는 메모리는, external tool을 활용할 수도 있겠지만 그렇게 하면 GB, TB의 데이터에 대해서는 효율적이겠지만, 작고 가벼운 메모리에 대해서는 비효율적임.
그래서 task 사이에 공유하는 작고 가벼운 메모리에 대해서는, Airflow에서 일반적으로 XCom을 사용.
XCom은 기본적으로 Airflow의 메타데이터 DB에 저장됨.
특정 Task에서 데이터를 push하고, 다른 task에서 이를 pull하는 식으로 진행.
XCom은 Task 간의 데이터 공유를 가능케 하지만, 파일 시스템을 직접 활용하는 방식은 아님.
파일 시스템이 아닌 메타데이터 DB에 저장됨.
XCom의 데이터는 Key-Value 형식으로 저장됨. Airflow의 메타데이터 DB에 저장되며, task_id, dag_id, key 값을 기준으로 조회 가능. 예를 들면 아래와 같이.
XCom 활용법
기본적으로는, @task를 쓴 Task의 python function에서 뭔가를 return 하면, 자동으로 XCom에 저장됨. 위의 그림에서의 key 아래에 return_value라고 쓰여 있다면, 별다른 key 주입 없이 return된 값을 저장했다는 의미다.
만약 @task를 안 썼거나, 그냥 return 하는 게 아닌 특정한 key를 포함하고 싶다면, xcom_push() 사용하면 됨. 이때는 python 함수의 parameter로 ti를 써줌.
@task
def push_data(ti):
data = {"name": "Alice", "age": 30}
ti.xcom_push(key="user_data", value=data) # 데이터 저장 (push)
이렇게 하면 key 값에는 "user_data"가 들어감.
이렇게 저장해둔 키-값을 가져오고 싶다면, xcom_pull() 해주면 됨. task_ids, key를 특정할 수 있음.
@task
def pull_data(ti):
data = ti.xcom_pull(task_ids="push_data", key="user_data") # 데이터 조회 (pull)
print(f"Received Data: {data}") # {'name': 'Alice', 'age': 30} 출력
XCom의 특징
1) XCom 데이터 크기 제한
XCom은 Airflow 메타데이터 DB에 저장되므로, 대용량 데이터를 저장하는 것은 비효율적.
예를 들어 대량의 CSV, JSON 등의 데이터는 XCom에 저장하면 안됨. 대신, S3, GCS, Local filesystem에 저장하고 path만 XCom에 저장하는 것이 좋음.
2) XCom은 기본적으로 같은 DAG 내에서만 사용 가능
기본적으로 하나의 DAG 내에서만 사용할 목적으로 만들어짐. 만약 정말 원한다면, Custom XCom Backend를 설정하면 서로 다른 DAG 간에 사용할 수도 있기는 함.
3) Push된 데이터를 여러 task에서 Pull 가능
한 task에서 xcom_push()로 저장한 데이터는 여러개의 task에서 동시에 xcom_pull() 가능.
즉, 여러 task가 동일한 데이터를 공유할 수 있음.
갑자기 생기는 궁금증
❓ 여러 개의 task가 하나의 XCom 데이터에 대해서 변경하려 들면, 이게 어떻게 되는거지? 변경이 되나? 아니면 이를 해결하는 로직을 설계해야 하나?
❗
1 일단 XCom은 기본적으로 읽기 전용임. 그래서 pull해서 수정한다고 해도 그게 메타데이터에는 반영 안됨.
2 동일한 DAG, 동일한 task, 동일한 execution_date, 동일한 key라면 "수정"이 가능함. 그게 아니라면 아예 composite key가 다르기 때문에 서로 다른 데이터로 취급, 따로 저장됨.
3 물론 필요하다면 locking을 쓸 수 있음. airflow의 기본 기능이 아니므로 reddis를 쓴다던가 하는 식으로. 그러나 그럴 일은 잘 없을 것으로 추정됨.
분기를 만들어 보자 - Branching
Condition에 따라 서로 다른 task를 실행시키기 위해 branching을 시전할 수 있음. BranchPythonOperator 쓰면 됨.
활용법은 일단 간단하다. 기존에 t1, t2, t3, t4가 이미 정의됐다고 가정하고, t1 이후에 t1의 결과에 따라 t2 혹은 t3를 실행하고, 이후 t4를 실행하도록 하는 흐름을 짜보면, 아래와 같다.
def _branch(ti):
value = ti.xcom_pull(key='my_key', task_ids='t1')
if (value == 42): return 't2'
else: return 't3'
...
branch = BranchPythonOperator(
task_id='branch',
python_callable=_branch
)
...
t1 >> branch >> [t2, t3] >> t4
매우 간단하지만, 유의해야할 점이 있음. 맨 마지막 dependency를 짜줄 때, branch의 결과에 따라 t2가 실행될지 t3가 실행될지가 결정되는데, 그걸 dependency에도 [t2, t3] 이런 식으로 명시를 해줘야 한다는 점. 언뜻 생각하면 branch 실행이 곧 t2 혹은 t3의 실행까지 포함한다고 생각할 수 있지만, branch에서는 return 까지만 하는 것이고, 그에 따라 PythonBranchOperator가 작동해서 그 결과에 맞는 task를 실행하는 형태기 때문에 저렇게 표시해주는 게 맞음.
그리고, 또 하나. 저렇게 돌려보면 나머지는 잘 되는데, t4 부분에서 문제가 생김.
이 dependency에서도 알 수 있듯, t2 혹은 t3 둘 중 하나만 실행되는데, 기본적으로는 t2, t3 모두 실행되어야만 t4가 실행되는 것. 그것은 기본적으로 Airflow에서는 trigger rule이 all_success로 지정되어 있기 때문.
그래서 이때는, t4에 trigger_rule을 명시해주면 됨. 아래와 같이.
t4 = BashOperator(
task_id='t4',
bash_command="echo ''",
trigger_rule='none_failed_and_skipped'
)
t1 >> branch >> [t2, t3] >> t4
'데이터 엔지니어링' 카테고리의 다른 글
[Spark] 빅데이터와 데이터 레이크 - Hadoop 변천사 (0) | 2025.02.05 |
---|---|
[Airflow] Airflow 내맘대로 꾸미기 (Elasticsearch, plugin) (0) | 2025.02.04 |
[Airflow] 진화론, SubDAG을 지나 TaskGroups까지! (intra-DAG) (0) | 2025.02.02 |
[Airflow] 닉값 못하는 실행자와, 그 단짝 데이터베이스 (0) | 2025.02.02 |
[Airflow] 증기기관에 필적하는 dataset에 대하여 (inter-DAG) (0) | 2025.02.01 |