본문 바로가기

OpenSource

(32)
[Airflow] Custom Operator에서 Jinja template 값 치환 실패시 해결방법 앞서 BashOperator 확장을 통한 Spark Custom Operator 를 통해 Custom Operator를 만들어 보았고, dag 실행시 arguments를 전달하여 실행하는 방법을 통해 arguments를 dag에 전달하는 방법을 알아보았다. 필자는 SparkCustomOperator와 dag arguments 전달 방식을 통해 spark job을 수행하고자 하였는데, 어찌된 일인지 template의 값이 치환되지 않고 문자 그대로 수행되는 바람에 계속 실패를 하게 되어 이를 해결한 내용을 포스팅 하고자 한다. 1. 문제 발생 kudu_sync_task = SparkBashOperator( task_id="spark_task", driver_cores="{{ dag_run.conf[&#39..
[Airflow] dag 실행시 arguments를 전달하여 실행하는 방법 필자는 현재 Mysql의 데이터를 ETL 하여 Apache Kudu에 저장하는 업무를 진행하고 있다. 이때 Airflow는 Sqoop을 이용하여 Mysql테이블 덤프 -> Spark을 통한 Apache Kudu에 Insert 하는 Dag를 만들어 수행하고 있는데, 이 Dag는 스케줄을 걸어 실행하는 것이 아니라, 초기 적재시 혹은 데이터 싱크하다 재적재 이슈가 있을때만 단발성으로 적재하기 위함으로 만들어진 Dag이다. 그런데 Mysql에서 Kudu로 싱크할 테이블 갯수는 많고, 테이블별로 Dag를 하나씩 만들기는 너무 비효율 적이라는 생각이 들어, airflow cli 의 trigger_dag 를 이용하여 dag에 Argument를 통해 테이블 명을 전달하고, 이를 통해 하나의 dag를 통해 여러개의 테..
[Airflow] Multi Server Cluster 환경에서 dag 파일은 모든 서버에 배포해야할까? 오늘은 Multi Server Cluster 환경에서 dag를 배포하는것에 대해 이야기를 해보려고한다. 얼마전에 블로그 댓글을 통해 이런 문의가 왔었다. Master 역할을 하는 서버에만 dag를 배포하고, 이를 통해 Worker에 dag 파일을 전달하는 방식으로 dag를 수행할수 있습니까? 답변으로 간단하게 불가함을 적었었는데, 오늘은 조금더 상세히 남겨보려고한다. 일단 좀더 상세한 로그를 확인하려면 airflow.cfg의 로그레벨 설정항목인 logging_level = INFO 로 설정을 한뒤, scheduler.log 파일을 훑어보면 아래와 같은 로그가 남겨진다. [2020-02-12 15:12:17,513] {base_executor.py:59} INFO - Adding to queue: ['ai..
[Spring Batch] MapJobRepositoryFactoryBean을 사용하여 메타 테이블 생성 없이 배치 작업 설정 with CommandLineRunner 환경 Java 1.8 Spring Boot 2.2.4.RELEASE Spring Batch 4.2.1.RELEASE Maven Spring Batch를 설정할때 각종 실행 결과, 파라미터 등의 메타데이터를 RDBMS에 저장하게 되는데, 간단한 배치작업 위주로 동작할때 이 메타데이터는 크게 중요하지 않을때가 있다. 이번에는 메타 데이터를 DB에 저장하지 않는 방법에 대해 남겨보려고 한다. 1. pom.xml에 Dependency 추가 org.springframework.boot spring-boot-starter-batch org.projectlombok lombok true 2. BatchConfigDefaultBatchConfigurer 상속하여 DataSource를 null로 유지 import org..
[Airflow] Local 개발환경 설정(2)_Dag 개발 앞서 Local 개발환경 설정(1)_설치를 통해 로컬에 Airflow를 설치를 해보았다. 이번에는 로컬에 설치된 Airflow를 이용하여 dag를 개발할 수 있는 환경을 만들어 보고자 한다. 개발 환경 Mac OS(Catalina) Intellij 이전 포스팅에서도 이야기했듯, 2가지 git repository를 사용하는데 airflow-devel repository : 로컬에 airflow 모듈을 설정 airflow-dags repository : dag를 생성하고, 이를 통해 Production Level의 서버에 배포 하는 역할을 한다. 이 airflow-dags repository는 git submodule을 이용하여 로컬에 설치한 airflow-devel/dags로 추가한다. 1. git sub..
[Airflow] Local 개발환경 설정(1)_설치 로컬에서 airflow dag를 생성하거나, 커스텀 오퍼레이터를 생성하기 위해서 로컬에 airflow를 세팅하고, dag를 개발하는 환경을 구축해 보고자 한다. 요구사항 os: Mac OS(Catalina) github Homebrew direnv를 설치하기 위함 direnv direnv는 해당 디렉토리에 .envrc 파일을 읽어 자동으로 python virtualenv 환경을 활성화 시켜주는 역할을 한다. 구조 github에 2개의 repository를 생성한다.(airflow-devel, airflow-dags) airflow-devel: 로컬에서 테스트 하기 위한 root 프로젝트, 이 프로젝트는 로컬 테스트 환경에서만 사용한다. airflow-dags: 로컬 및 실제 Airflow 클러스터에서 실..
[Airflow] BashOperator 확장을 통한 Spark Custom Operator 이전 포스팅을 통해 SparkSubmitOperator을 사용해보았다. 하지만 포스팅 말미에도 적어놓았지만 SparkSubmitOperator의 이슈때문에(yarn queue를 지정하기 어려운점) BashOperator를 상속하여 SparkBashOperator를 만들어 보았다. BashOperator를 이용하여 spark_submit을 직접 호출하는 형태임으로, 상황에 따라 Spark Binary, Hadoop Binary, Hadoop Config 등 설정이 필요하다. 전체 코드 from airflow.operators.bash_operator import BashOperator from airflow.utils.decorators import apply_defaults import re class ..
[Nifi]Could Not Generate Extensions Documentation 에러 해결 방법 커스텀 프로세서 생성 후 mvn clean package를 통해 .nar파일을 생성하는데, 아래와 같은 에러가 발생했다. [ERROR] Could not generate extensions' documentation org.apache.maven.plugin.MojoExecutionException: Failed to create Extension Documentation at org.apache.nifi.NarMojo.generateDocumentation (NarMojo.java:596) at org.apache.nifi.NarMojo.execute (NarMojo.java:499) at org.apache.maven.plugin.DefaultBuildPluginManager.executeMo..