Apache Huid에 대해 POC할 기회가 생겨 잊어버리지 않도록 블로그에 정리해보고자 한다.
1. Hudi란 무엇일까?
Apache Hudi는 DFS(hadoop hdfs 또는 Cloud Storage)를 기반으로 대량의 분석데이터를 저장하고, 관리할수 있는 스토리지이다. Hudi는 스트림을 통해서 실시간 데이터를 가져올수도 있고, 전통적인 batch 프로세싱으로 데이터를 저장할 수도 있다.
Hudi가 처음이라면 위의 글이 잘 와닿지 않을것이다. 스트리밍으로 데이터를 가져와서 저장하는게 특이한것도 아니고, 배치로 데이터를 저장하는것 또한 마찬가지이기 때문이다.
다만 인입되는 데이터의 사이즈가 아주 크고, Daily가 아닌 더 짧은 주기로 데이터를 분석하기 위해 스트리밍으로 데이터를 싱크해야한다면,그리고 스트리밍을 통해 저장되는 데이터가 Update가 일어나는 데이터라면 어떤 플랫폼을 도입해야할까?
위와 같은 workload를 위해 hudi가 개발되었다.
hudi를 다시한번 정의하자면
- update가 발생하는 대량의 데이터를 실시간으로 DFS(hdfs와 같은)에 저장하고
- 이를 Spark SQL, Hive, Trino(presto)와 같은 쿼리 엔진으로 질의 하여 데이터분석이 가능한
스토리지 플랫폼으로 정의할 수 있다.
Hudi와 비슷한 플랫폼으로는 Apache Iceberg, Delta Lake 이 있다.
2. Hudi Architecture
그림에서 보듯 아키텍쳐는 간단하다.
좌측의 RDD[Records]를 hudi포멧으로 HDFS에 저장하면 이를 통해 전체데이터를 기반으로 하는 Snapshot쿼리, 특정 시간동안 변경된 데이터를 기반으로 하는 Incremental Query를 수행할 수 있게 된다.
RDD[Records]로 표현 된것같이 Spark을 통해 데이터를 Hudi 형식으로 저장할 수 있으며, Spark core를 이용하여 배치방식으로 저장할 수도, Spark Streaming을 이용하여 실시간 스트리밍으로도 저장이 가능하다.(hudi에 저장하는 플랫폼은 Spark 이외에 Flink라고 하는 스트리밍 오픈소스를 이용하여 저장도 지원한다)
Sample 1) Spark을 이용한 Hudi 테이블 생성
val inserts = convertToStringList(dataGen.generateInserts(10)) //테스트를 위한 자동 데이터 생성
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, "hudi_sample_table").
mode(Overwrite).
save(basePath)
Sample 2) Spark SQL을 이용한 쿼리
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_sample_table")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_sample_table where fare > 20.0").show()
'Hadoop > Hudi' 카테고리의 다른 글
[Apache Hudi] 3-2. Components - Table Type & Query Type (0) | 2022.01.23 |
---|---|
[Apache Hudi] 3-1. Components - Timeline (0) | 2022.01.20 |
[Apache Hudi] 2. 파일 구조 (0) | 2022.01.20 |