본문 바로가기

OpenSource/Airflow

(14)
[Airflow] Trouble Shooting 1. webserver에서 "Some workers seem to have died and gunicorn did not restart them as expected" 에러 발생하며 웹페이지가 보여지지 않을때 원인: webserver에 메모리 부족으로 인하여 gunicorn worker가 kill 됨 해결책: airflow.cfg 의 [webserver] 섹션에 workers 의 갯수를 줄인다(default 4). 혹은 도커 컨테이너 생성시 environment에 AIRFLOW__WEBSERVER__WORKERS를 지정한다. webserver의 사용 메모리를 키운다. 필자는 docker-compose를 이용하여 airflow 환경을 구성중이다. 이때 mem_limit을 이용하여 메모리를 조절할 수 있다...
[Airflow] schedule_interval을 한국시간으로 설정하기 기본적으로 airflow는 UTC 시간을 기준으로 하고 있기 때문에 start_date 속성이나 schedule_interval 같은 속성을 사용할때 한국시간 - 9시간 으로 계산하여 입력하는데 무척이나 불편하다. 오늘은 timezone 설정을 통해 schedule_interval 설정을 한국시간으로 입력하도록 해보겠다. pendulum을 이용한 타임존 지정 from datetime import datetime import pendulum from airflow import DAG from airflow.operators.bash_operator import BashOperator ## 로컬 타임존 생성 local_tz = pendulum.timezone("Asia/Seoul") ## start_date..
[Airflow] template 기본값 지정 Custom Operator에서 Jinja template 값 치환 실패시 해결방법 포스팅을 통해 Jinja template을 airflow에서 사용해 보았다. dag_run에 해당 설정 값이 없을때 기본값을 지정하고 싶으면 아래와 같이 하면 된다. task = SparkBashOperator( task_id="kudu_syc", driver_cores="{{ dag_run.conf['driver_cores']|default('1', true)}}", driver_memory="{{ dag_run.conf['driver_memory']|default('1g', true)}}", executor_cores="{{ dag_run.conf['executor_cores']|default('1', true)}}"..
[Airflow] Custom Operator에서 Jinja template 값 치환 실패시 해결방법 앞서 BashOperator 확장을 통한 Spark Custom Operator 를 통해 Custom Operator를 만들어 보았고, dag 실행시 arguments를 전달하여 실행하는 방법을 통해 arguments를 dag에 전달하는 방법을 알아보았다. 필자는 SparkCustomOperator와 dag arguments 전달 방식을 통해 spark job을 수행하고자 하였는데, 어찌된 일인지 template의 값이 치환되지 않고 문자 그대로 수행되는 바람에 계속 실패를 하게 되어 이를 해결한 내용을 포스팅 하고자 한다. 1. 문제 발생 kudu_sync_task = SparkBashOperator( task_id="spark_task", driver_cores="{{ dag_run.conf[&#39..
[Airflow] dag 실행시 arguments를 전달하여 실행하는 방법 필자는 현재 Mysql의 데이터를 ETL 하여 Apache Kudu에 저장하는 업무를 진행하고 있다. 이때 Airflow는 Sqoop을 이용하여 Mysql테이블 덤프 -> Spark을 통한 Apache Kudu에 Insert 하는 Dag를 만들어 수행하고 있는데, 이 Dag는 스케줄을 걸어 실행하는 것이 아니라, 초기 적재시 혹은 데이터 싱크하다 재적재 이슈가 있을때만 단발성으로 적재하기 위함으로 만들어진 Dag이다. 그런데 Mysql에서 Kudu로 싱크할 테이블 갯수는 많고, 테이블별로 Dag를 하나씩 만들기는 너무 비효율 적이라는 생각이 들어, airflow cli 의 trigger_dag 를 이용하여 dag에 Argument를 통해 테이블 명을 전달하고, 이를 통해 하나의 dag를 통해 여러개의 테..
[Airflow] Multi Server Cluster 환경에서 dag 파일은 모든 서버에 배포해야할까? 오늘은 Multi Server Cluster 환경에서 dag를 배포하는것에 대해 이야기를 해보려고한다. 얼마전에 블로그 댓글을 통해 이런 문의가 왔었다. Master 역할을 하는 서버에만 dag를 배포하고, 이를 통해 Worker에 dag 파일을 전달하는 방식으로 dag를 수행할수 있습니까? 답변으로 간단하게 불가함을 적었었는데, 오늘은 조금더 상세히 남겨보려고한다. 일단 좀더 상세한 로그를 확인하려면 airflow.cfg의 로그레벨 설정항목인 logging_level = INFO 로 설정을 한뒤, scheduler.log 파일을 훑어보면 아래와 같은 로그가 남겨진다. [2020-02-12 15:12:17,513] {base_executor.py:59} INFO - Adding to queue: ['ai..
[Airflow] Local 개발환경 설정(2)_Dag 개발 앞서 Local 개발환경 설정(1)_설치를 통해 로컬에 Airflow를 설치를 해보았다. 이번에는 로컬에 설치된 Airflow를 이용하여 dag를 개발할 수 있는 환경을 만들어 보고자 한다. 개발 환경 Mac OS(Catalina) Intellij 이전 포스팅에서도 이야기했듯, 2가지 git repository를 사용하는데 airflow-devel repository : 로컬에 airflow 모듈을 설정 airflow-dags repository : dag를 생성하고, 이를 통해 Production Level의 서버에 배포 하는 역할을 한다. 이 airflow-dags repository는 git submodule을 이용하여 로컬에 설치한 airflow-devel/dags로 추가한다. 1. git sub..
[Airflow] Local 개발환경 설정(1)_설치 로컬에서 airflow dag를 생성하거나, 커스텀 오퍼레이터를 생성하기 위해서 로컬에 airflow를 세팅하고, dag를 개발하는 환경을 구축해 보고자 한다. 요구사항 os: Mac OS(Catalina) github Homebrew direnv를 설치하기 위함 direnv direnv는 해당 디렉토리에 .envrc 파일을 읽어 자동으로 python virtualenv 환경을 활성화 시켜주는 역할을 한다. 구조 github에 2개의 repository를 생성한다.(airflow-devel, airflow-dags) airflow-devel: 로컬에서 테스트 하기 위한 root 프로젝트, 이 프로젝트는 로컬 테스트 환경에서만 사용한다. airflow-dags: 로컬 및 실제 Airflow 클러스터에서 실..