본문 바로가기

[Spark] HDFS 하위 경로의 모든 파일 읽기(with Java) 동일한 Level에 파일이 있는 경우 경로에 ** 나 *.parquet 을 사용하면 모든 파일을 쉽게 읽을수 있다. 예를 들면 hdfs의 경로가 아래와 같다면 /test/2020-04-25/1.parquet /2.parquet /2020-04-26/1.parquet /2.parquet test 디렉토리 하위의 parquet 파일을 읽으려면 Dataset dataSet = sparkSession.read().parquet("/test/**/*.parquet") 처럼 읽을 수 있다. 하지만 아래와 같이 파일의 depth가 다른경우 ** 로는 하위 모든 파일을 읽을 수 없다. /test/order/2020/04/25/1.parquet /2.parquet /product/2020-04-25/1.parquet /..
[Airflow] schedule_interval을 한국시간으로 설정하기 기본적으로 airflow는 UTC 시간을 기준으로 하고 있기 때문에 start_date 속성이나 schedule_interval 같은 속성을 사용할때 한국시간 - 9시간 으로 계산하여 입력하는데 무척이나 불편하다. 오늘은 timezone 설정을 통해 schedule_interval 설정을 한국시간으로 입력하도록 해보겠다. pendulum을 이용한 타임존 지정 from datetime import datetime import pendulum from airflow import DAG from airflow.operators.bash_operator import BashOperator ## 로컬 타임존 생성 local_tz = pendulum.timezone("Asia/Seoul") ## start_date..
[Airflow] template 기본값 지정 Custom Operator에서 Jinja template 값 치환 실패시 해결방법 포스팅을 통해 Jinja template을 airflow에서 사용해 보았다. dag_run에 해당 설정 값이 없을때 기본값을 지정하고 싶으면 아래와 같이 하면 된다. task = SparkBashOperator( task_id="kudu_syc", driver_cores="{{ dag_run.conf['driver_cores']|default('1', true)}}", driver_memory="{{ dag_run.conf['driver_memory']|default('1g', true)}}", executor_cores="{{ dag_run.conf['executor_cores']|default('1', true)}}"..
[Airflow] Custom Operator에서 Jinja template 값 치환 실패시 해결방법 앞서 BashOperator 확장을 통한 Spark Custom Operator 를 통해 Custom Operator를 만들어 보았고, dag 실행시 arguments를 전달하여 실행하는 방법을 통해 arguments를 dag에 전달하는 방법을 알아보았다. 필자는 SparkCustomOperator와 dag arguments 전달 방식을 통해 spark job을 수행하고자 하였는데, 어찌된 일인지 template의 값이 치환되지 않고 문자 그대로 수행되는 바람에 계속 실패를 하게 되어 이를 해결한 내용을 포스팅 하고자 한다. 1. 문제 발생 kudu_sync_task = SparkBashOperator( task_id="spark_task", driver_cores="{{ dag_run.conf[&#39..
[Airflow] dag 실행시 arguments를 전달하여 실행하는 방법 필자는 현재 Mysql의 데이터를 ETL 하여 Apache Kudu에 저장하는 업무를 진행하고 있다. 이때 Airflow는 Sqoop을 이용하여 Mysql테이블 덤프 -> Spark을 통한 Apache Kudu에 Insert 하는 Dag를 만들어 수행하고 있는데, 이 Dag는 스케줄을 걸어 실행하는 것이 아니라, 초기 적재시 혹은 데이터 싱크하다 재적재 이슈가 있을때만 단발성으로 적재하기 위함으로 만들어진 Dag이다. 그런데 Mysql에서 Kudu로 싱크할 테이블 갯수는 많고, 테이블별로 Dag를 하나씩 만들기는 너무 비효율 적이라는 생각이 들어, airflow cli 의 trigger_dag 를 이용하여 dag에 Argument를 통해 테이블 명을 전달하고, 이를 통해 하나의 dag를 통해 여러개의 테..
[Kudu]Kudu와 Presto 그리고 unix_timestamp에 대해 이해하기 Kudu 공식 문서에도 나와있지만 Kudu에서 주로 사용하는 쿼리엔진은 Impala나 Hive다. 하지만 필자는 이미 사용하고 있는 Presto 클러스터가 있어 Presto-Kudu Connector 를 이용하여 Kudu에 쿼리를 수행하고 있었는데, Kudu의 unixtime_micros 타입의 컬럼이 계속 헷갈려 여러가지 테스트를 수행해 보았다. 이 포스팅은 그 테스트 과정 및 결과에 대한 이야기이다. Kudu에 데이터를 넣을때 KST(UTC+9) 시간을 UTC로 변경해서 넣어야 할까? 사실 많은 개발자들이 알고있듯, 유닉스_시간은 1970년 1월1일 부터 현재까지의 시간을 정수형으로 나타낸 값이다. Kudu에도 시간을 나타내는 컬럼인 unixtime_micros 타입의 컬럼이 유닉스 타임을 저장하기 ..
[Airflow] Multi Server Cluster 환경에서 dag 파일은 모든 서버에 배포해야할까? 오늘은 Multi Server Cluster 환경에서 dag를 배포하는것에 대해 이야기를 해보려고한다. 얼마전에 블로그 댓글을 통해 이런 문의가 왔었다. Master 역할을 하는 서버에만 dag를 배포하고, 이를 통해 Worker에 dag 파일을 전달하는 방식으로 dag를 수행할수 있습니까? 답변으로 간단하게 불가함을 적었었는데, 오늘은 조금더 상세히 남겨보려고한다. 일단 좀더 상세한 로그를 확인하려면 airflow.cfg의 로그레벨 설정항목인 logging_level = INFO 로 설정을 한뒤, scheduler.log 파일을 훑어보면 아래와 같은 로그가 남겨진다. [2020-02-12 15:12:17,513] {base_executor.py:59} INFO - Adding to queue: ['ai..
[Spring Batch] MapJobRepositoryFactoryBean을 사용하여 메타 테이블 생성 없이 배치 작업 설정 with CommandLineRunner 환경 Java 1.8 Spring Boot 2.2.4.RELEASE Spring Batch 4.2.1.RELEASE Maven Spring Batch를 설정할때 각종 실행 결과, 파라미터 등의 메타데이터를 RDBMS에 저장하게 되는데, 간단한 배치작업 위주로 동작할때 이 메타데이터는 크게 중요하지 않을때가 있다. 이번에는 메타 데이터를 DB에 저장하지 않는 방법에 대해 남겨보려고 한다. 1. pom.xml에 Dependency 추가 org.springframework.boot spring-boot-starter-batch org.projectlombok lombok true 2. BatchConfigDefaultBatchConfigurer 상속하여 DataSource를 null로 유지 import org..