본문 바로가기

데이터 엔지니어링

[Airflow] 증기기관에 필적하는 dataset에 대하여 (inter-DAG)

 

(1) 이런식으로 T 1,2,3이 끝나고 SQL이 업데이트되면, 그 값을 T A,B,C가 받아야 하는 상황이라면, SQL과 TA 사이에 Trigger Operator, Sensor를 활용해서 이를 트리거 하는 방법이 일반적이었음. 그러나 이게 좀 여러모로 복잡하고 어려움. 이걸 쉽게 할수 있게 해주는 녀석이 있다.

 

(2) 또한 이 일련의 흐름에서 하나의 DAG에 각기 다른 팀이 2개씩 맡아야한다고 하면, 서로 여러가지 부분에서 상충될 여지가 있다. 그렇기 때문에 하나의 큰 DAG이 아닌, 세개의 micropipeline으로 세분화하면 좋은데, 이 작업을 쉽게 할 수 있게 하는 녀석이 있다.

 

 

DATASET

dataset은 2.4 버전에 도입된 개념으로, DAG 간의 의존성을 쉽게 관리할 수 있게 하는 기능임.

기존에는 DAG 간 데이터를 주고받으려면 XCom, TriggerDagRunOperator, ExternalTaskSensor 등을 사용했지만, Dataset을 이용하면 더 직관적으로 DAG 간의 연결을 설정할 수 있음.

즉,  Dataset을 이용하면 DAG 실행 결과를 데이터 형태로 저장하고, 이를 다른 DAG이 감지하여 자동 실행할 수 있음.

 

dataset은 그러니까, file, sql table처럼 데이터의 묶음을 말함. URI라는 주소와, EXTRA라는 나머지 정보들로 이뤄짐.

 

from airflow import Dataset

my_file = Dataset(
	"s3://dataset/file.csv",
	extra={'owner': 'james'}
)

 

URI는 해당 dataset에 대한 유일한 식별자로, ASCII로만 이뤄져야 하고 case sensitive함.

그리고 airflow:// 이런식으로 airflow 스키마는 쓸 수 없다고 함.

 

 

DAG을 만드는 최신 방식

우선 큰틀에서는, 두개의 DAG을 만들 거고, producer DAG가 consumer DAG 를 트리거하는 방식으로 만들 것임.

 

producer DAG을 아래와 같이 짬.

from airflow import DAG, Dataset
# Dataset이란 것 추가

from airflow.decorators import task
# task는 python operator 도입을 쉽게끔 도와주는 녀석.

from datetime import date, datetime

my_file = Dataset("/tmp/my_file.txt")

with DAG(
    dag_id="producer",
    schedule="@daily",	# schedule-interval 대신 schedule 사용.
    start_date=datetime(2025,1,1),
    catchup=False
):

    @task(outlets=[my_file])
    def update_dataset():
    	with open(my_file.uri, "a+") as f:
            f.write("producer update")
    
    update_dataset()

 

consumer DAG는

from airflow import DAG, Dataset
from airflow.decorators import task

from datetime import datetime

my_file = Dataset("/tmp/my_file.txt")

with DAG(
    dag_id="consumer",
    schedule=[my_file],
    start_date=datetime(2025,1,1),
    catchup=False
):

    @task
    def read_dataset():
        with open(my_file.uri, "r") as f:
            print(f.read())
    
    read_dataset()

 

이렇게 설정해두면, dataset을 producer가 바꾸는 그 자체만으로 consumer가 인지하고 바로 실행할 수 있게 함. 기존의 특정한 시각 혹은 시간의 단위가 아니라, 인과에 의해 실행되는 것이 가능해진 것임.

 

그리고 이 과정에서 schedule-interval이 아닌 schedule로 파라미터 이름이 바뀌고, dataset을 받을 수 있게 됨!

 

 

Dataset 활용법을 정리하면

dataset은, DAG 끼리의 의존성 문제를 수월하게 해결하기 위해 만든 개념. 원래는 producer와 consumer 사이에 sensor와 operator로 복잡한 관계를 만들어야만 하고, 그렇게 되면 이후 수정 혹은 확장하기가 안 좋았음. 그러나 airflow 2.4 버전부터는, dataset이 등장하면서, 위의 복잡한 작업 대신 dataset만으로 의존성을 정립할 수 있음.

 

(1) producer와 consumer에서 dataset으로 사용할 녀석을 사전에 uri로 맞춰두고, 코드 상단에 정의함.

(2) producer의 outlet에 [dataset 이름] 넣어주고

(3) consumer의 schedule에 [dataset 이름] 넣어주면 됨. 참 쉽쥬? 👏👏👏

 

 

Dataset의 한계

  • DAG과 같은 airflow 인스턴스에 위치한 dataset만 사용할 수 있음.
  • Consumer DAGs는 Producer에서 dataset이 수정완료된 경우 무조건 trigger 됨. dataset이 잘 수정됐는지 확인 안함.
  • Producer DAG에 task 1, 2가 모두 하나의 dataset을 가리킨다면, 둘 중 하나만 수정돼도 Consumer trigger 됨.
  • 외부 툴이 dataset을 수정해도 Consumer는 이를 포착하지 못함.

 

과정에서 그냥 궁금해진 점

dataset이 매개가 되는 그 과정이 더 자세히 알고싶다.

 

예를 들어, outlet에 dataset 이름 넣고 해당 task에서는 막상 dataset은 수정을 안 하면 어떻게 되는거지? 그래도 trigger가 되나?

YES. dataset의 내용에 대해서는 Consumer가 확인 안함. 그냥 해당 dataset을 참고하는 producer의 task가 완료되면 consumer가 실행되는 것임.

 

dataset이 적용되는 것은 DAG 중에서 일부 task라고 한다면, 해당 task가 끝나는 시점에 trigger가 발생해서 consumer DAG의 해당 task가 실행되는 건지, 아니면 producer DAG는 다 끝이 나야 consumer DAG가 실행되는건지?

YES. Producer DAG의 해당 task만 완료되면, Consumer DAG 전체가 실행됨. Producer DAG의 다른 task들은 신경 안 쓰고, 또 Consumer DAG의 해당 task만 실행되는 게 아니라 전체가 된다는 점.

 

@task 어노테이션을 붙이면 해당하는 함수를 실행하는 라인을 꼭 작성해야 작동되는 것인지?

❗YES. task 어노테이션은 이게 task다 라는 의미일 뿐, 해당 라인을 실행하는 파트를 꼭 적어줘야함.

 

/tmp 폴더가 자주 보이던데, 무엇이고 어떤 의미가 있는지?

/tmp 폴더는 임시 파일을 저장하는 디렉토리. XCom이 일반적으로 한 DAG 내부에서만 공유되고, 데이터 한도가 적다는 것을 타파하는 좋은 대안.

주의해야할 점은, 시스템 재부팅 시 파일 삭제된다는 점. 그러니까 일시적으로 쓰이는 녀석들만 활용하면 됨. XCom은 메타데이터 DB 이므로 자체적으로 휘발이 되지는 않음.

그리고 worker마다 /tmp가 지칭하는 폴더가 다를 수 있다고 함.