[Airflow] Airflow가 뭘까-요?
What is Airflow?
Apache Airflow는 워크플로(Workflow)와 데이터 파이프라인을 자동화하고 스케줄링하는 도구.
즉, 데이터가 이동하는 과정을 정의하고, 실행 순서를 조정하고, 모니터링하는 역할을 함.
Why Airflow?
1 Workflow Organization: 태스크들이 순서에 맞게 진행되게끔 도와줌
2 Visibility: 동시다발적으로 여러개의 워크플로우가 있을 때, 전체를 다 파악하고 문제를 해결하기 용이.
3 Flexibility and Scalability: 동시에 여러개의 data source와 툴에 연결 가능하고, 필요에 의해 점진적으로 DB 쿼리, AI 모델, 데이터 퀄리티 체크, API 갖다쓰기 등의 확장 및 개진이 가능함.
또한,
python based라 진입장벽이 낮고, dynamic tasks/ workflows, 그리고 branching (if) 등의 기술적 장점도 있음.
뿐만 아니라 UI가 잘 되어 있다는 점도 장점임.
What to not use Airflow
1 실시간 스트리밍: Airflow는 배치 데이터에 적합하고, 실시간 스트리밍과는 맞지 않음.
혹은 매초, 매분 이런 식의 자주 업데이트가 필요한 경우에도 맞지 않음.
2 데이터 프로세싱 프레임워크: 데이터 프로세서들을 관리하는 툴이지, 직접 해당 데이터들을 관리하는 게 아님.
3 데이터 저장소: 2번과 마찬가지로, 직접 데이터를 저장하지 않음.
4 복잡하지 않은 워크플로우 혹은 의존관계:
airflow는 복잡한 의존관계를 관리하고 대규모 트래픽을 처리하기 위해 병렬화하는 등의 기능에 강점이 있음.
그게 아니라면 cron jobs 같은 걸로 단순하게 쓰는 게 나음.
Core Components of Airflow
1 Web Server: UI 대시보드를 보여줌.
2 Scheduler: 특정 태스크가 정확한 순서대로, 정확한 시각에 실행되게끔 만듦.
3 Meta Database: 모든 workflows의 디테일한 정보들을 모두 소지하고 있음. 플랜 뿐 아니라 결과까지 저장.
4 Triggerer: 자동화되지 않은 작업이 나오면 알림을 알려주는 기능. 유저의 행동을 필요로 함.
5 Executor: 직렬/병렬 여부, 그리고 어떤 시스템으로 실행할지 여부 등을 매니지함.
6 Queue: 태스크를 진행할 순서대로 줄세워둔 것
7 Worker: 태스크를 수행하는 바로 그 녀석.
Core Concepts
1 DAG: 의존관계를 반영하여 태스크들을 Directed Acyclic Graph 형태로 연결시켜둔 것.
The structure of entire workflow.
2 Operator: 하나의, 이상적으로는 멱등한, 태스크를 정의함. 함수의 정의와 같음.
The individual part of the workflow. PythonOp, BashOp, SQLExecuteQueryOp 등.
3 Task/Task Instance: Operator가 DAG에 배정되면, 그걸 task라고 부름.
Operator는 그냥 존재하고, 그걸 DAG에 배정한 상태가 되면 그건 task임.
4 Workflow: DAG에서 정의된 전체 프로세스- 모든 태스크, 의존관계를 포함 - 를 말함.
Various Architectures
1 Single Node Architecture: Airflow의 모든 컴포넌트가 하나의 노드(컴퓨) 위에서 실행되고 있을 때.
이게 디폴트라고 할 수 있음. 셋업 쉽고 관리 쉬워서 작은 워크플로우에 알맞음.
2 Multi Node Architecture: Airflow의 여러 컴포넌트가 여러 개의 노드에서 실행될 때
확장성을 갖춤. 단일 지점 장애에 강함. 그러나 복잡함.
How it works @ high level
1 DAG 작성: python 파일. task가 포함됨. task는 Python, Bash, SQL 등 다양한 실행 방식이 가능함.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def my_task():
print("Hello, Airflow!")
# DAG 정의
dag = DAG(
"my_dag",
schedule_interval="@daily",
start_date=datetime(2025, 1, 1),
catchup=False,
)
# Task 정의
task1 = PythonOperator(
task_id="print_hello",
python_callable=my_task,
dag=dag,
)
2 DAG이 Airflow에 등록됨: 정확히는 dags/ 폴더에 저장.
airflow dags list
3 Scheduler가 DAG를 주기적으로 스캔 후 실행할 task를 executor에 전달함.
airflow scheduler
4 Executor는 Scheduler가 실행할 Task를 Worker에게 넘기는 역할을 함.
5 Worker가 task를 실제로 실행하는 역할.
airflow celery worker
6 실행이 끝나면 그 결과를 Meta DB에 저장. Web UI에도 결과 반영
airflow tasks list my_dag
7 DAG 끝, (1) 성공했다면 다음 실행 주기 준비 - 3번으로 회귀함. (2) 아니라면 retry 혹은 failure 처리 수행.