본문 바로가기

OpenSource/Airflow

(14)
[Airflow] BashOperator 확장을 통한 Spark Custom Operator 이전 포스팅을 통해 SparkSubmitOperator을 사용해보았다. 하지만 포스팅 말미에도 적어놓았지만 SparkSubmitOperator의 이슈때문에(yarn queue를 지정하기 어려운점) BashOperator를 상속하여 SparkBashOperator를 만들어 보았다. BashOperator를 이용하여 spark_submit을 직접 호출하는 형태임으로, 상황에 따라 Spark Binary, Hadoop Binary, Hadoop Config 등 설정이 필요하다. 전체 코드 from airflow.operators.bash_operator import BashOperator from airflow.utils.decorators import apply_defaults import re class ..
[Airflow] SparkSubmitOperator를 이용한 spark Job Submit Airflow에서는 다양한 Operator를 지원하는데 그 중 Spark을 실행하기 위한 SparkSubmitOperator 라는 것이 존재한다. 이번에는 SparkSubmitOperator를 이용하여 spark application을 동작시켜보도록 하겠다. 사전작업 SparkSubmitOperator는 내부적으로 spark binary인 spark-submit 을 호출하는 방식으로 spark을 실행하고 있다. 그러므로 Apache spark 에서 각자의 환경에 알맞은 spark을 각 airflow worker에 다운로드 한후 다운로드 경로를 $SPARK_HOME을 환경변수로 등록하고 $SPARK_HOME/bin 을 path에 추가한다. 또한 필자는 yarn에 spark job을 동작시킬것임으로 HADO..
[Airflow] macOS catalina에서 hostname does not match this instance's hostname 에러 처리 얼마전 macOS Catalina버전으로 업그레이드 되면서 로컬에서 airflow가 정상적으로 동작하지 않는 문제가 생겼다. The recorded hostname [1m1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa[0m does not match this instance's hostname [1m1.0.0.127.in-addr.arpa 위의 에러 메세지를 기준으로 airflow 코드를 들여다 보기로 했다. ## airflow/jobs/local_task_job.py def heartbeat_callback(self, session=None): """Self destruct task if state has been..
[Airflow] Scheduler SPOF(Single Point Of Failure) 제거하기 위의 그림은 celery executor를 이용한 여러대의 워커로 구성한 아키텍쳐이다. 이중 scheduler는 DB에서 스케줄 정보를 가져와 redis의 pub/sub을 이용하여 worker들에게 잡을 할당하는데, 이때 scheduler는 단일고장점(Single Point Of Failure-SPOF) 이 된다. 따라서 Downtime이 없는 airflow를 구성하려면 SPOF를 제거하는것이 중요한데 airflow 자체적으로 해결할수 있는 방법은 아직 존재하지 않는다. 'master 서버에 scheduler를 두개 실행해놓으면 되지 않을까' 라는 생각을 해보지만, 이렇게 scheduler를 여러개 실행하면 동일한 DAG가 실행시킨 scheduler 갯수만큼 실행된다. 이를 해결하기 위한 ..
[Airflow] 커버로스 설정&Hive 연결 airflow를 이용하여 kerberos 인증이 적용된 데이터소스(ex - hadoop)에 접근하려면 커버로스 설정을 airflow에 적용해야 한다. 아래의 예제는 kerberos 인증이 적용된 hive에 접근하는 방법을 작성해 보고자 한다. 1. 커버로스 설정 커버로스 연동은 id/pw를 통해서 연동할 수도 있지만, 아래의 설정은 keytab 파일을 가지고 설정하는 방법을 다룬다. $ cd $AIRFLOW_HOME $ sudo yum install thrift_sasl cyrus-sasl-devel cyrus-sasl-gssapi cyrus-sasl-md5 cyrus-sasl-plain $ pip3 install apache-airflow[hive]==1.10.9 ## apache-airflow[ker..
[Airflow] Cluster Installation 설치 환경 CentOS 7.7 airflow 1.10.5 airflow는 단일 서버에서 설치하는 방법외에 python celery모듈을 이용하여 webserver와 worker서버를 분리 할수 있다. 아래는 webserver 1대와 worker서버 2대에 설치하는방법이며, 이때 Celery를 이용하여 multiple worker를 설정하려면 추가적으로 Redis 또는 RabbitMQ, Mysql 서버를 추가적으로 필요로 한다. 보통의 구성으로는 web server: airflow webserver, airflow scheduler, mysql, redis 설치 worker server : airflow worker 와 같은 식으로 설치한다. airflow 모듈 airflow는 역할에 따라 몇가지 모듈이 ..