본문 바로가기

OpenSource

(32)
[Docker] Dockerfile을 이용한 도커 이미지 선언하기 도커 이미지를 여러번 만들어 사용해보았지만, 기존에 만들어진, 혹은 github에 공유된 Dockerfile을 기준으로 복사하고 짜집기 하여 사용해왔었다. 오늘은 Dockerfile Format에 대해 정리해보려고 한다. 1. 주석 도커파일 내 주석은 아래와 같이 표현한다. # Comment 2. FROM 새로 생성하고자 하는 이미지의 베이스 이미지를 선언한다 FROM 이미지명 # 태그를 선언하지 않을 경우 latest 태그가 적용된다. FROM 이미지명:태그명 # 예시: FROM ubuntu:21.04 3. RUN 베이스 이미지에 명령을 실행하여 어플리케이션을 추가로 설치한다거나 할 때 사용한다. RUN 명령어 # 예시 ## shell 형식으로 실행 RUN apt-get install -y htop #..
[Spark Streaming] Kafka를 이용한 스트리밍 처리 시 메세지 중복되는 이유 Spark Streaming를 통해 Kafka에서 메세지를 읽어 HDFS에 저장하는 작업중에, 카프카 메세지가 중복으로 저장되는 이슈가 있어 원인 및 해결책을 알아보았다. 1. 상황 Kafka에서 HDFS에 파일로 저장한 후 CanCommitOffsets.commitAsync 를 이용하여 offset을 커밋중이다. 상단의 이미지는 Spark Streaming Kafka Integration의 Storing Offsets 부분이다. 2. 현상 Spark Streaming 잡을 종료 시킨뒤 다시 재시작하면 동일한 메세지가 중복되어 컨슈밍 된다. 그림에서 보는것과 같이 3번 마이크로 배치 수행시 offset:25~51의 데이터를 가져오는 것이 아니라 offset:10~51의 데이터를 가져오게 됨으로서 offs..
[Nifi] Flowfile 순서보장 삽질기 앞서 몇번의 포스팅에서도 소개했듯, 필자는 현재 Nifi를 통해 RDBMS 데이터를 하둡으로 실시간 복제하기 위하여 사용중이다. Nifi Processor를 설정한 뒤에 데이터를 검증하는 도중 특정 상황에 데이터(flowfile)이 역전되는 현상을 발견하였고 이를 해결하기 위한 내용을 포스팅해보고자한다. 1. Nifi 클러스터 정보 nifi version: 1.10.0 openjdk 1.8 2. 데이터 역전이 발생되는 상황 1) Processor 구성 Custom Processor를 생성하여 주기적으로 특정 테이블에 쿼리를 수행하여 각 row에 대한 flowfile을 생성하고 해당 데이터를 Kafka로 보내고 있다. 그리고 쿼리 수행이 완료되면 CheckPoint를 state에 저장하고 다음 쿼리에 해당..
[Nifi] Flowfile 중복제거를 위한 DetectDuplicate 사용 및 주의점 필자는 커스텀 프로세서를 통해 DB에서 특정 시간컬럼을 기준으로 데이터를 크롤링하고, 실시간 반영을 위하여 kafka로 전송하는 것을 Nifi를 통해 구현하였다. 이때 내부적 이슈에 의해 DB에서 데이터를 검색하는 쿼리에서 누락이 발생했고, 이를 해결하기 위하여 마지막 동작시간 - 10초 전 데이터를 더 가지고 오도록 함으로서 누락 문제를 해결하였으나, 추가적으로 데이터의 중복이라는 문제를 해결해야 하는 이슈가 있었다. 이번에는 flowfile 중복을 해결할수 있는 DetectDuplicate를 사용하는 방법과 주의할 점에 대해서 이야기 해보도록 하려고 한다. 1. DetectDuplicate의 기본컨셉 DetectDuplicate는 flowfile contents의 hash값을 통하여 기존에 캐시에 존..
[Nifi] Load Balance와 Partition by attribute 삽질기 Nifi 1.8.0 버전부터 Load Balance라는 기능이 생겼다. (자세한 설명은 여기로) 말그대로 flowfile을 적당한 기준에 맞춰 각 노드들로 분산을 시켜준다는것인데, 직관적인 이름이기 때문에 이해하기 어렵지 않다. 1. Load Balance 종류 Do Not load balance(기본값): 로드 밸런스를 사용하지 않음 Partition by attribute : Flowfile의 특정 attribute를 통해서 로드 밸런싱 함 Round robin: Nifi 클러스터를 구성하는 노드에 순차적으로 분산시킴 Single Node: 하나의 Nifi 노드로만 보냄 테스트 클러스터 : 3 Nodes GenerateFlowFile(Primary Node Only) : 테스트용 Flowfile 생성..
[Spark] HDFS 하위 경로의 모든 파일 읽기(with Java) 동일한 Level에 파일이 있는 경우 경로에 ** 나 *.parquet 을 사용하면 모든 파일을 쉽게 읽을수 있다. 예를 들면 hdfs의 경로가 아래와 같다면 /test/2020-04-25/1.parquet /2.parquet /2020-04-26/1.parquet /2.parquet test 디렉토리 하위의 parquet 파일을 읽으려면 Dataset dataSet = sparkSession.read().parquet("/test/**/*.parquet") 처럼 읽을 수 있다. 하지만 아래와 같이 파일의 depth가 다른경우 ** 로는 하위 모든 파일을 읽을 수 없다. /test/order/2020/04/25/1.parquet /2.parquet /product/2020-04-25/1.parquet /..
[Airflow] schedule_interval을 한국시간으로 설정하기 기본적으로 airflow는 UTC 시간을 기준으로 하고 있기 때문에 start_date 속성이나 schedule_interval 같은 속성을 사용할때 한국시간 - 9시간 으로 계산하여 입력하는데 무척이나 불편하다. 오늘은 timezone 설정을 통해 schedule_interval 설정을 한국시간으로 입력하도록 해보겠다. pendulum을 이용한 타임존 지정 from datetime import datetime import pendulum from airflow import DAG from airflow.operators.bash_operator import BashOperator ## 로컬 타임존 생성 local_tz = pendulum.timezone("Asia/Seoul") ## start_date..
[Airflow] template 기본값 지정 Custom Operator에서 Jinja template 값 치환 실패시 해결방법 포스팅을 통해 Jinja template을 airflow에서 사용해 보았다. dag_run에 해당 설정 값이 없을때 기본값을 지정하고 싶으면 아래와 같이 하면 된다. task = SparkBashOperator( task_id="kudu_syc", driver_cores="{{ dag_run.conf['driver_cores']|default('1', true)}}", driver_memory="{{ dag_run.conf['driver_memory']|default('1g', true)}}", executor_cores="{{ dag_run.conf['executor_cores']|default('1', true)}}"..