전체 글 (72) 썸네일형 리스트형 [Airflow] Airflow 내맘대로 꾸미기 (Elasticsearch, plugin) Plugins기본적으로 소프트웨어 기능을 호가장하는 모듈을 plugin이라고 부름.기본으로 제공되지 않는 기능을 추가하기 위해 사용. Airflow는 확장성을 위해 plugin 시스템을 제공함. python 코드로 작성 가능.Airflow에서는, Views, Operators, Hooks, Sensors 포함 모든 컴포넌 customize 가능하다. customizing을 위해서는, 우선 class를 만들어야 함. AirflowPlugin class.한번 만들면, 마치 시중에 유통되는 python module처럼 쓸 수 있음.염두할 것은, custom plugin은 lazy loaded 되기 때문에, 만들고 나면 restart 해줘야 반영됨. 우리는 Elasticsearch를 활용할 수 있는 airflo.. [Airflow] Fancy functions: XCom (기본 Intra) + Branching XCom 이란?두 개의 서로 다른 태스크 사이에 공유되는 메모리는, external tool을 활용할 수도 있겠지만 그렇게 하면 GB, TB의 데이터에 대해서는 효율적이겠지만, 작고 가벼운 메모리에 대해서는 비효율적임. 그래서 task 사이에 공유하는 작고 가벼운 메모리에 대해서는, Airflow에서 일반적으로 XCom을 사용.XCom은 기본적으로 Airflow의 메타데이터 DB에 저장됨.특정 Task에서 데이터를 push하고, 다른 task에서 이를 pull하는 식으로 진행. XCom은 Task 간의 데이터 공유를 가능케 하지만, 파일 시스템을 직접 활용하는 방식은 아님.파일 시스템이 아닌 메타데이터 DB에 저장됨. XCom의 데이터는 Key-Value 형식으로 저장됨. Airflow의 메타데이터 DB.. [Airflow] 진화론, SubDAG을 지나 TaskGroups까지! (intra-DAG) SubDAGsAirflow에서, 하위 DAG를 생성하는 등의 계층화를 통해 복잡한 task들의 관계를 더 추상화할 수 있게 하는 개념.DAG 내부에서 또 다른 DAG를 실행할 수 있게 하는 기능. 상위 DAG에서 하위 DAG을 하나의 task처럼 다룰 수 있음.SubDAG은, (1) DAG 내부에서 논리적으로 연관된 task들을 묶어서 재사용할 수 있게 함.(2) 복잡한 DAG을 단순화 및 모듈화할 수 있게 함. 그림과 같이 파일을 다운로드하는 task가 3개 있고, 처리하는 task가 또 3개 있다고 하자. 코드로 나타내면 아래와 같음.from airflow import DAGfrom airflow.operators.bash import BashOperator from datetime import d.. [Airflow] 닉값 못하는 실행자와, 그 단짝 데이터베이스 Executors 이름이 executor지만 task를 실행하지 않음. 어떤식으로 worker를 운영할지 결정하는 컴포넌트임. 어떤 executor를 사용할지는 config 파일에 들어가서 바꿔주면 되는데, airflow의 기본템은 sequantial executor.이때 만약 celery executor를 쓰고 싶다면, airflow config 파일을 들어가서 수정하고 그럴 필요 없음. docker-compose 를 쓸 거라면 docker-compose.yaml이 어차피 기본파일을 override 하기 때문에 거기에 AIRFLOW_CORE_EXECUTOR: CeleryExecutor 해주면 그만임. 아래와 같이. Executor은 대략 다음과 같은 종류들이 있음. 각각의 executor들은 task .. [Airflow] 증기기관에 필적하는 dataset에 대하여 (inter-DAG) (1) 이런식으로 T 1,2,3이 끝나고 SQL이 업데이트되면, 그 값을 T A,B,C가 받아야 하는 상황이라면, SQL과 TA 사이에 Trigger Operator, Sensor를 활용해서 이를 트리거 하는 방법이 일반적이었음. 그러나 이게 좀 여러모로 복잡하고 어려움. 이걸 쉽게 할수 있게 해주는 녀석이 있다. (2) 또한 이 일련의 흐름에서 하나의 DAG에 각기 다른 팀이 2개씩 맡아야한다고 하면, 서로 여러가지 부분에서 상충될 여지가 있다. 그렇기 때문에 하나의 큰 DAG이 아닌, 세개의 micropipeline으로 세분화하면 좋은데, 이 작업을 쉽게 할 수 있게 하는 녀석이 있다. DATASETdataset은 2.4 버전에 도입된 개념으로, DAG 간의 의존성을 쉽게 관리할 수 있게 하는 기능.. [Airflow] 비로소, DAG 작성하고 테스트해보기 DAG 작성 기본 틀DAG를 짜보자. DAG를 정의하는 기본적인 형태는 아래와 같음.with DAG('user_processing', start_date=datetime(2023,1,1), schedule_interval='@daily', catchup=False) as dag: None with A as B: B라는 변수명에 A를 안전하게 할당하고 싶을 때 사용하는 python 문법이라고 함 - context manager.파라미터에 들어가는 건, 유일한 이름이어야 하는 이름과, 시작일자, 인터벌 주기, catchup 변수. 이제 저 None 자리에 저 DAG에 해당되는 task들을 넣어주면 된다.기본적으로 하나의 task는 하나의 operator에 할당하고, 하나의 목적만을 할당해야 함.서로 다.. [Airflow] Airflow UI에 대한 간단한 정리 이게 일반적인 airflow의 DAG 대시보드.다른 건 어려울 게 없고, runs와 recent tasks가 차이를 가지는데,runs는 DAG run이라는 의미. 특정 시각에 실행된 DAG의 인스턴스.airflow 스케줄러가 DAG을 주기적으로 스캔하여, 실행 시점에 DAG run을 생성함. recent tasks는 해당 DAG로부터 생성된 tasks들을 의미하는 것임.그니까 DAG runs는 전체에 대한 이야기, Recent Tasks는 각각의 task instance에 대한 이야기. 하나를 pause untoggle해서 돌려보고, 그 이름을 누르면 돌린 결과물이 뜬다.기본으로 뜨는 화면인데, grid 뷰.직관적이지 않은 부분은 아직까지 없다. graph 뷰는 이런식. 위에 hover 해보면 관련 정.. [Airflow] airflow 띄워보기 with/without 도커! Run Airflow in Python Environment기본적으로 윈도우 환경에서는 VSC에서 리눅스 커맨드 사용 어려움 => 그래서 WSL 활용함.globalman, globalman패키지 충돌 방지 및 프로젝트를 가상으로 독립적으로 관리하기 위해 python의 venv를 활용함.이 가상환경에 들어가려면 source py_env/bin/activate 커맨드로 입장하면 됨.이 가상환경에서 나가려면 deactivate 커맨드로 퇴장하면 됨.Apache Airflow를 로컬환경에 다운로드받음. 시간이 꽤나 걸림. 대략 5분 소요됨.pip install 'apache-airflow==2.10.4' \ --constraint "https://raw.githubusercontent.com/apache/air.. 이전 1 2 3 4 5 6 ··· 9 다음 목록 더보기