필자는 현재 Mysql의 데이터를 ETL 하여 Apache Kudu에 저장하는 업무를 진행하고 있다.
이때 Airflow는 Sqoop을 이용하여 Mysql테이블 덤프 -> Spark을 통한 Apache Kudu에 Insert 하는 Dag를 만들어 수행하고 있는데, 이 Dag는 스케줄을 걸어 실행하는 것이 아니라, 초기 적재시 혹은 데이터 싱크하다 재적재 이슈가 있을때만 단발성으로 적재하기 위함으로 만들어진 Dag이다.
그런데 Mysql에서 Kudu로 싱크할 테이블 갯수는 많고, 테이블별로 Dag를 하나씩 만들기는 너무 비효율 적이라는 생각이 들어, airflow cli 의 trigger_dag
를 이용하여 dag에 Argument를 통해 테이블 명을 전달하고, 이를 통해 하나의 dag를 통해 여러개의 테이블 싱크를 진행하는 작업을 진행하게 되었다.
오늘 포스팅은 해당 작업에 대한 내용을 남겨보려고 한다.
1. trigger_dag 를 통한 Argument 전달
airflow trigger_dag -c '{"table": "my-table"}' dag_id
-c 옵션을 통해 Json 데이터를 Dag로 전달하게 된다.
2. -c 옵션을 통한 Arguments를 Dag에서 사용하기
Airflow에서는 매크로를 이용하여 미리 정의된 몇가지 정보들을 쉽게 가져올 수 있다.
1번에서 전달한 Arguments를 읽으려면 Airflow 매크로중 {{ dag_run }}
을 이용하여 전달된 Arguments를 읽을 수 있다.
## bash operator 사용시
task = BashOperator(
task_id="sample_task",
bash_command="echo '{{ dag_run.conf['table'] }}'"
dag=dag
)
## 혹은
cmd_template = """
echo '{{ dag_run.conf['table'] }}'
"""
task = BashOperator(
task_id="sample_task",
bash_command=cmd_template
dag=dag
)
## python operator 사용시
def print_arguments(**kwargs):
table_name = kwargs['dag_run'].conf.get('table')
print(table_name)
task = PythonOperator(
task_id="sample_task",
python_callable=print_arguments,
provide_context=True, ## 반드시 해당 옵션을 지정해야 함
dag=dag
)
위의 예제에서 보는 것과 같이 Bash Operator에서는 Jinja template + dag_run매크로
로, Python Operator에서는 kwargs
를 이용하여 전달된 Arguments를 읽을 수 있다.
3. dag_run 기본값 설정 방법
dag_run을 통해 파라미터가 들어오지 않을 경우 아래와 같은 방식으로 기본값을 지정할 수 있다.
## |default를 이용하여 기본값을 지정할 수 있다.
cmd_template = """
echo '{{ dag_run.conf['table']|default('my_table', true) }}'
"""
task = BashOperator(
task_id="sample_task",
bash_command=cmd_template
dag=dag
)
주의할 점
위와 같이 Arguments를 읽는 dag는 Airflow WebUI에서 실행하지 못한다.실행하려면 1번과 같이 터미널에서 수행하는 방법 밖에 없음으로, 스케줄에 맞춰 실행하는 dag에는 위와 같은 방법이 어울리지 않는다.
Airflow 1.10.10 버전 이후 부터 Web UI에서 dagrun을 이용한 파라미터 전달이 가능하다. 상세한 내용은 airflow.apache.org/blog/airflow-1.10.10/에서 확인 가능하다.
'OpenSource > Airflow' 카테고리의 다른 글
[Airflow] template 기본값 지정 (0) | 2020.03.19 |
---|---|
[Airflow] Custom Operator에서 Jinja template 값 치환 실패시 해결방법 (0) | 2020.03.19 |
[Airflow] Multi Server Cluster 환경에서 dag 파일은 모든 서버에 배포해야할까? (0) | 2020.02.24 |
[Airflow] Local 개발환경 설정(2)_Dag 개발 (0) | 2020.02.12 |
[Airflow] Local 개발환경 설정(1)_설치 (0) | 2020.02.12 |