본문 바로가기

OpenSource/Spark

[Spark Streaming] Streaming 처리시 Offset을 저장해보자.

 

이전 포스팅 : [Spark Streaming] Kafka를 이용한 스트리밍 처리 시 메세지 중복되는 이유

오래전 포스팅이긴 하지만 과거에 스트리밍 어플리케이션 재시작시 메세지가 중복 처리되는것에 대해 알아보았다.
이번에는 실제로 어떻게 카프카 Offset을 관리하는지 정리해보고자 한다.

1. 다양한 Offset 저장소를 지원하기 위한 추상 클래스 선언

import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.OffsetRange;

import java.util.Map;

/**
 * Abstraction over a Kafka offset store. Concrete subclasses decide where
 * offsets are persisted (ZooKeeper, NoSQL, ...) by implementing the two
 * methods below.
 */
public abstract class OffsetManager {

    /**
     * Persists the given offset ranges for a consumer group.
     *
     * @param offsetRanges    offset ranges produced by a processed batch
     * @param consumerGroupId Kafka consumer group the offsets belong to
     * @throws Exception if the underlying store fails
     */
    public abstract void writeOffset(OffsetRange[] offsetRanges, String consumerGroupId) throws Exception;

    /**
     * Reads back the last stored offset per partition for a topic.
     *
     * @param consumerGroupId Kafka consumer group to look up
     * @param topic           Kafka topic to look up
     * @return map of partition to the next offset to consume from
     * @throws Exception if the underlying store fails
     */
    public abstract Map<TopicPartition, Long> readOffset(String consumerGroupId, String topic) throws Exception;
}

필자는 주키퍼에 Offset을 저장하였지만, 상황에 따라 NoSql이나 또 다른 저장소에 저장하고 싶을 수 있을것이다. 이럴때는 위의 OffsetManager 클래스를 상속받아 두개의 메소드만 구현해서 쓸수 있도록 Abstract Class를 만들어 두었다.

2. OffsetManager를 상속받은 구현체 작성

필자는 위에서 말했던 것처럼 zookeeper를 이용해 offset을 저장하기로 했다. Zookeeper Library는 Apache Curator를 이용하여 구현하였다.

import lombok.Getter;
import lombok.extern.log4j.Log4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.OffsetRange;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Log4j
/**
 * {@link OffsetManager} implementation that stores Kafka offsets in ZooKeeper
 * using the Apache Curator client.
 */
@Log4j
public class ZkOffsetManager extends OffsetManager {

    /** ZooKeeper connection string this manager was created with. */
    private String zkString;

    private static final int DEFAULT_RETRY_SLEEP_MS = 1000;
    private static final int DEFAULT_RETRY_COUNT = 3;
    private static final int DEFAULT_CONNECTION_TIMEOUT_MS = 1000;
    private static final int DEFAULT_SESSION_TIMEOUT_MS = 2000;

    // Root znode under which all offsets are stored.
    private static final String OFFSET_ROOT_PATH = "/streaming/offset";
    // Stored per consumer group: /consumerGroup/kafkaTopic/partitionNumber
    public static final String OFFSET_PATH_WRITE_PATTERN = OFFSET_ROOT_PATH + "/%s/%s/%d";
    public static final String OFFSET_PATH_READ_PATTERN = OFFSET_ROOT_PATH + "/%s/%s";

    @Getter
    private CuratorFramework zkClient;

    /**
     * Creates a manager backed by the given ZooKeeper ensemble and starts the
     * underlying Curator client immediately (connection is established lazily
     * by Curator, retried with exponential backoff).
     *
     * @param zkString ZooKeeper connection string, e.g. "host1:2181,host2:2181"
     */
    public ZkOffsetManager(String zkString) {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(DEFAULT_RETRY_SLEEP_MS, DEFAULT_RETRY_COUNT);
        this.zkClient = CuratorFrameworkFactory.newClient(zkString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
        this.zkClient.start();

        this.zkString = zkString;
    }

    /**
     * Persists the "until" (exclusive end) offset of every range as the data
     * of a per-partition znode, so a restarted job can resume from it.
     *
     * @param offsetRanges    offset ranges of the batch just processed
     * @param consumerGroupId consumer group the offsets belong to
     * @throws IllegalArgumentException if {@code offsetRanges} is null or empty
     * @throws Exception                if ZooKeeper access fails
     */
    @Override
    public void writeOffset(OffsetRange[] offsetRanges, String consumerGroupId) throws Exception {
        // BUG FIX: original condition was `offsetRanges == null && offsetRanges.length < 1`,
        // which throws NPE when the array is null and never triggers when it is empty.
        if (offsetRanges == null || offsetRanges.length < 1) {
            throw new IllegalArgumentException("Offset Range is empty...");
        }

        for (OffsetRange range : offsetRanges) {

            String zkPath = getOffsetWritePath(range.topic(), consumerGroupId, range.partition());
            log.debug("zk Path : " + zkPath);

            // untilOffset is the first offset NOT processed — i.e. where to resume.
            ZkUtils.create(zkClient, zkPath, Long.toString(range.untilOffset()));
        }
    }

    /**
     * Reads all stored per-partition offsets for a topic/consumer group.
     *
     * @param consumerGroupId consumer group to look up
     * @param topic           topic to look up
     * @return partition-to-offset map; empty if nothing has been stored yet
     * @throws Exception if ZooKeeper access fails or a znode name/value is not numeric
     */
    @Override
    public Map<TopicPartition, Long> readOffset(String consumerGroupId, String topic) throws Exception {
        Map<TopicPartition, Long> offsetMap = new HashMap<>();

        String offsetPath = getOffsetReadPath(topic, consumerGroupId);
        // Child znode names are the partition numbers.
        List<String> partitions = ZkUtils.getChildren(zkClient, offsetPath);

        if (CollectionUtils.isNotEmpty(partitions)) {
            for (String partition : partitions) {
                // Missing/empty node data falls back to offset 0 (start of partition).
                Long untilOffset = Long.parseLong(StringUtils.defaultIfEmpty(ZkUtils.getData(zkClient, offsetPath + "/" + partition), "0"));
                Integer partitionNumber = Integer.parseInt(partition);

                offsetMap.put(new TopicPartition(topic, partitionNumber), untilOffset);
            }
        }

        return offsetMap;
    }

    /**
     * Builds the znode path offsets are read from: /streaming/offset/group/topic
     */
    public String getOffsetReadPath(String topicName, String consumerGroupId) {
        return String.format(OFFSET_PATH_READ_PATTERN, consumerGroupId, topicName);
    }

    /**
     * Builds the znode path offsets are written to: /streaming/offset/group/topic/partition
     */
    public String getOffsetWritePath(String topicName, String consumerGroupId, int partition) {
        return String.format(OFFSET_PATH_WRITE_PATTERN, consumerGroupId, topicName, partition);
    }

}

ZkOffsetManager에서 사용한 ZkUtils 클래스를 보자.


import org.apache.curator.framework.CuratorFramework;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Small static helpers around Curator for reading and writing znode data.
 * All data is encoded as UTF-8 strings.
 */
public class ZkUtils {

    // Utility class — not meant to be instantiated.
    private ZkUtils() {
    }

    /**
     * Returns the names of the child nodes under {@code path}, or an empty
     * list when the path does not exist (instead of letting Curator throw).
     *
     * @param zkClient started Curator client
     * @param path     znode path to list
     * @throws Exception if ZooKeeper access fails
     */
    public static List<String> getChildren(CuratorFramework zkClient, String path) throws Exception {
        if(zkClient.checkExists().forPath(path) == null) {
            return new ArrayList<>();
        }
        return zkClient.getChildren().forPath(path);
    }

    /**
     * Reads the data of a znode as a UTF-8 string.
     *
     * @return the node's data, or null when the node holds no data
     * @throws Exception if ZooKeeper access fails (including nonexistent path)
     */
    public static String getData(CuratorFramework zkClient, String path) throws Exception {
        byte[] dataArray = zkClient.getData().forPath(path);

        if(dataArray != null && dataArray.length > 0) {
            return new String(dataArray, StandardCharsets.UTF_8);
        }
        else {
            return null;
        }
    }

    /**
     * Bulk read: returns a map from each path to its znode data
     * (value may be null when the node holds no data).
     *
     * @throws Exception if ZooKeeper access fails for any path
     */
    public static Map<String, String> getData(CuratorFramework zkClient, List<String> paths) throws Exception {
        Map<String, String> dataMap = new HashMap<>();

        for(String path : paths) {
            dataMap.put(path, getData(zkClient, path));
        }

        return dataMap;
    }

    /**
     * Creates the znode (including any missing parents) with the given value,
     * or overwrites the value when the node already exists.
     *
     * @throws Exception if ZooKeeper access fails
     */
    public static void create(CuratorFramework zkClient, String path, String value) throws Exception{
        if(zkClient.checkExists().forPath(path) == null) {
            // IMPROVED: StandardCharsets.UTF_8 (already imported) instead of the
            // string-based getBytes("UTF-8"), which does a charset lookup and can
            // throw the checked UnsupportedEncodingException.
            zkClient.create().creatingParentsIfNeeded().forPath(path, value.getBytes(StandardCharsets.UTF_8));
        }
        else {
            zkClient.setData().forPath(path, value.getBytes(StandardCharsets.UTF_8));
        }
    }
}

3. 실제 사용 예시

// Load previously-stored offsets so the stream resumes where it left off.
// NOTE(review): `ssc`, `kafkaParams`, and `topicName` are defined elsewhere in the job setup.
OffsetManager offsetManager = new ZkOffsetManager("localhost:2181");
Map<TopicPartition, Long> offsetMap = offsetManager.readOffset("consumer_group", "kafka_topic_name");


// An empty offsetMap (first run) makes Subscribe fall back to the consumer's default position.
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(Arrays.asList(topicName), kafkaParams, offsetMap)
        );

stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {
    if(!rdd.isEmpty()) {

        // The batch's offset ranges are only available from the underlying KafkaRDD.
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        // TODO: process business logic here

        // Manually persist offsets ONLY after processing succeeded,
        // so a failed batch is re-consumed on restart.
        offsetManager.writeOffset(offsetRanges, "consumer_group");
    }
});