본문 바로가기

[Spark Streaming] Streaming 처리시 Offset을 저장해보자. 이전 포스팅 : [Spark Streaming] Kafka를 이용한 스트리밍 처리 시 메세지 중복되는 이유 오래전 포스팅이긴 하지만 과거에 스트리밍 어플리케이션 재시작시 메세지가 중복 처리되는것에 대해 알아보았다. 이번에는 실제로 어떻게 카프카 Offset을 관리하는지 정리해보고자 한다. 1. 다양한 Offset 저장소를 지원하기 위한 추상 클래스 선언 import org.apache.kafka.common.TopicPartition; import org.apache.spark.streaming.kafka010.OffsetRange; import java.util.Map; public abstract class OffsetManager { public abstract void writeOffset(Off..
[Docker] 이미지 다운로드를 위한 Proxy 설정 $ sudo mkdir -p /etc/systemd/system/docker.service.d $ sudo vi /etc/systemd/system/docker.service.d/http-proxy.conf ## PROXY 정보 추가 [Service] Environment="HTTP_PROXY=http://proxy.example.com:80" Environment="HTTPS_PROXY=https://proxy.example.com:443" Environment="NO_PROXY=localhost,127.0.0.1,docker-registry.example.com,.corp" $sudo systemctl daemon-reload $sudo systemctl restart docker
[Docker] Dockerfile을 이용한 도커 이미지 선언하기 도커 이미지를 여러번 만들어 사용해보았지만, 기존에 만들어진, 혹은 github에 공유된 Dockerfile을 기준으로 복사하고 짜집기 하여 사용해왔었다. 오늘은 Dockerfile Format에 대해 정리해보려고 한다. 1. 주석 도커파일 내 주석은 아래와 같이 표현한다. # Comment 2. FROM 새로 생성하고자 하는 이미지의 베이스 이미지를 선언한다 FROM 이미지명 # 태그를 선언하지 않을 경우 latest 태그가 적용된다. FROM 이미지명:태그명 # 예시: FROM ubuntu:21.04 3. RUN 베이스 이미지에 명령을 실행하여 어플리케이션을 추가로 설치한다거나 할 때 사용한다. RUN 명령어 # 예시 ## shell 형식으로 실행 RUN apt-get install -y htop #..
[Kotlin] Kotlin의 Class 생성 방법들(Constructor, Data Class) Koltin에서는 클래스를 인스턴스화 할때 자바와 같이 생성자(Constructor)를 사용하기도 하고, 축약된 방법으로 클래스를 선언할 수 있다. 이번에는 다양한 방식으로 클래스를 생성하고, 클래스의 프로퍼티를 초기화 하는 방법을 정리해 보고자 한다. 1. 기본적인 클래스 생성 클래스의 속성(property)를 가지고 있지 않는 경우, 위와 같이 간단하게 class 키워드와 Class이름을 지정하여 간단하게 선언할 수 있다. 이때 중괄호({})는 생략 가능하다. class Person 2. 생성자(Constructor)를 이용한 프로퍼티 초기화 Kotlin에서는 Java와 유사하게 생성자(Constructor)를 통해 클래스 프로퍼티를 초기화 할 수 있다. Kotlin의 Constructor는 Prim..
[Kudu] FlushMode의 종류 및 주의점 오늘은 이 FlushMode의 사용법과 주의점에 대해서 써보려고 한다. Kudu에 데이터를 쓸 때 KuduSession 인스턴스를 확보하고, flush 하게 되는데, 이때 Kudu에서는 3가지의 FlushMode를 지원한다. 1. Java Library를 이용한 기본 사용법 import org.apache.kudu.client.*; public class KuduTest { public static void main(String[] args) throws KuduException { String tableName = "sample"; try(KuduClient client = new KuduClient.KuduClientBuilder(masterAddr).build()) { KuduTable table ..
[Spark Streaming] Kafka를 이용한 스트리밍 처리 시 메세지 중복되는 이유 Spark Streaming를 통해 Kafka에서 메세지를 읽어 HDFS에 저장하는 작업중에, 카프카 메세지가 중복으로 저장되는 이슈가 있어 원인 및 해결책을 알아보았다. 1. 상황 Kafka에서 HDFS에 파일로 저장한 후 CanCommitOffsets.commitAsync 를 이용하여 offset을 커밋중이다. 상단의 이미지는 Spark Streaming Kafka Integration의 Storing Offsets 부분이다. 2. 현상 Spark Streaming 잡을 종료 시킨뒤 다시 재시작하면 동일한 메세지가 중복되어 컨슈밍 된다. 그림에서 보는것과 같이 3번 마이크로 배치 수행시 offset:25~51의 데이터를 가져오는 것이 아니라 offset:10~51의 데이터를 가져오게 됨으로서 offs..
[JupyterLab] Nginx Reverse Proxy를 통한 Jupyter Lab 연결 설정 주피터를 서버에 구성할때 nginx reverse proxy를 통해 도메인 및 80포트로 접근을 하려고 설정했는데, 정상적으로 동작하지 않는 문제가 발생했다. 원인은 주피터에서 WebSocket을 통해 통신을 하는데, WebSocket 관련 설정이 Reverse proxy 설정에 빠져있어 정상 동작이 안되었던 것이었다. 이를 해결하기 위해서는 아래와 같이 nginx 설정을 변경하면 된다. http { ## ... 생략 ... server { listen 80 default_server; listen [::]:80 default_server; ## 각자 도메인에 맞게 설정 server_name louisdev.com root /usr/share/nginx/html; # Load configuration f..
[JAVA] JDBC 메타정보 추출하기 최근 프로젝트를 진행하면서 CDC(Change Data Capture)를 구현할 일이 있었는데, 이때 ResultSetMetaData 와 DatabaseMetaData 를 통해 테이블의 각종정보를 가져오는 일을 진행했었다. 오늘은 관련하여 기록을 하기위해 포스팅을 진행하고자 한다. 1. 컬럼 리스트 추출 //getConnection은 생략 try(Connection connection = getConnection(), Statement stmt = connection.createStatement()) { //데이터는 필요 없기 때문에 where 조건에 1 != 0을 입력 ResultSet rs = stmt.executeQuery("SELECT * FROM test_table WHERE 1 != 0"); ..