본문 바로가기

OpenSource/Spark

(4)
[Spark] SparkContext.addFile 과 --files 의 차이점 (Spark on YARN) Spark 에서 외부데이터를 불러와 처리할 때 코드 상에 SparkContext.addFile 메소드를 호출하거나, spark-submit시 --files 파라미터를 이용하여 외부데이터를 읽어와 처리할 수 있다. 이때 두가지 방법이 서로 동작하는 방식이 달라 YARN에 cluster 모드로 실행시 파일을 읽는 방법이 달라지는데, 이에 대해 정리해보고자 한다. SparkContext.addFile 공식 문서에 따르면 local file, HDFS 뿐 아니라 HTTP, HTTPS, FTP까지 지정하여 사용할 수 있다고 한다. //add file SparkSession spark = SparkSession.builder().getOrCreate(); spark.sparkContext().addFile("htt..
[Spark Streaming] Streaming 처리시 Offset을 저장해보자. 이전 포스팅 : [Spark Streaming] Kafka를 이용한 스트리밍 처리 시 메세지 중복되는 이유 오래전 포스팅이긴 하지만 과거에 스트리밍 어플리케이션 재시작시 메세지가 중복 처리되는것에 대해 알아보았다. 이번에는 실제로 어떻게 카프카 Offset을 관리하는지 정리해보고자 한다. 1. 다양한 Offset 저장소를 지원하기 위한 추상 클래스 선언 import org.apache.kafka.common.TopicPartition; import org.apache.spark.streaming.kafka010.OffsetRange; import java.util.Map; public abstract class OffsetManager { public abstract void writeOffset(Off..
[Spark Streaming] Kafka를 이용한 스트리밍 처리 시 메세지 중복되는 이유 Spark Streaming를 통해 Kafka에서 메세지를 읽어 HDFS에 저장하는 작업중에, 카프카 메세지가 중복으로 저장되는 이슈가 있어 원인 및 해결책을 알아보았다. 1. 상황 Kafka에서 HDFS에 파일로 저장한 후 CanCommitOffsets.commitAsync 를 이용하여 offset을 커밋중이다. 상단의 이미지는 Spark Streaming Kafka Integration의 Storing Offsets 부분이다. 2. 현상 Spark Streaming 잡을 종료 시킨뒤 다시 재시작하면 동일한 메세지가 중복되어 컨슈밍 된다. 그림에서 보는것과 같이 3번 마이크로 배치 수행시 offset:25~51의 데이터를 가져오는 것이 아니라 offset:10~51의 데이터를 가져오게 됨으로서 offs..
[Spark] HDFS 하위 경로의 모든 파일 읽기(with Java) 동일한 Level에 파일이 있는 경우 경로에 ** 나 *.parquet 을 사용하면 모든 파일을 쉽게 읽을수 있다. 예를 들면 hdfs의 경로가 아래와 같다면 /test/2020-04-25/1.parquet /2.parquet /2020-04-26/1.parquet /2.parquet test 디렉토리 하위의 parquet 파일을 읽으려면 Dataset dataSet = sparkSession.read().parquet("/test/**/*.parquet") 처럼 읽을 수 있다. 하지만 아래와 같이 파일의 depth가 다른경우 ** 로는 하위 모든 파일을 읽을 수 없다. /test/order/2020/04/25/1.parquet /2.parquet /product/2020-04-25/1.parquet /..