본문 바로가기

Hadoop/Kudu

[Kudu] FlushMode의 종류 및 주의점

오늘은 이 FlushMode의 사용법과 주의점에 대해서 써보려고 한다.
Kudu에 데이터를 쓸 때 KuduSession 인스턴스를 확보하고, flush 하게 되는데, 이때 Kudu에서는 3가지의 FlushMode를 지원한다.

1. Java Library를 이용한 기본 사용법

import org.apache.kudu.client.*;

public class KuduTest {
    public static void main(String[] args) throws KuduException {
        String tableName = "sample";

        try(KuduClient client = new KuduClient.KuduClientBuilder(masterAddr).build()) {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            //아래의 메소드를 이용하여 FlushMode를 선택할 수 있다.
            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);

            Insert insertOperation = table.newInsert();
            PartialRow row = insertOperation.getRow();
            row.addString("column1", "col-value");
            row.addLong("column2", 1L);

            session.apply(insertOperation);
            session.flush();
        }
    }
}

2. FlushMode 설정

kuduSession.setFlushMode를 통해 어떠한 방식으로 테블릿 서버에 전달할지 설정할수 있다.
Kudu는 AUTO_FLUSH_SYNC, AUTO_FLUSH_BACKGROUND, MANUAL_FLUSH 를 제공하고 있으며, 세가지 모드는 아래와 같은 특징이 있다.

1) AUTO_FLUSH_SYNC

기본 옵션이며, kuduSession.apply() 메소드 호출시 바로 태블릿 서버로 플러시 처리되고 리턴된다. apply() 메소드 호출 즉시 플러시 되므로, flush() 호출시에는 아무런 동작도 하지 않는다.

2) AUTO_FLUSH_BACKGROUND

kuduSession.apply() 메소드 호출시 즉시 리턴되지만, 태블릿 서버에 바로 플러시되지 않는다.

apply 메소드는 백그라운드의 버퍼에 쌓는 역할만 하며, 버퍼에 플러시할 데이터가 가득 차게 되면 그때서야 태블릿 서버로 플러시 된다.
플러시 해야할 데이터를 모아 Batch 형태로 처리되어 처리 속도에 이점이 있는 반면, 데이터 적용시 순서 보장이 되지 않음으로 주의해서 사용해야한다.

kuduSession.flush()메소드가 호출되면 로컬 버퍼가 비워지며, 비워지는동안 처리가 완료될때까지 중지(Blocking) 될수 있다.

3) MANUAL_FLUSH

kuduSession.apply()가 호출되는 즉시 리턴되지만, 실제로 바로 태블릿 서버에 쓰여지는것은 아니고, kuduSession.flush()를 호출할때까지 기다린다.

주의할 점은 설정된 버퍼 크기보다 더 많이 버퍼에 쓰여지면 익셉션이 발생함으로 로컬 버퍼가 다 차기 전에 flush()를 호출해야한다.

3. 태블릿 서버 쓰기 실패 체크 방법

AsyncKuduSession.apply()를 통해 태블릿 서버에 쓰여지게 되는데, 이때 public Deferred<OperationResponse> apply(final Operation operation) throws KuduException 와 같이 KuduException이 throw 되고 있다.

메소드 형태만 보면 KuduExcetpion만 try~catch 하여 처리하면 될듯하지만 실제로 이런식으로 처리하면 안된다.

// 잘못된 익셉션 처리방법
try(KuduClient client = new KuduClient.KuduClientBuilder(masterAddr).build()) {
    KuduTable table = client.openTable(tableName);
    KuduSession session = client.newSession();
    //아래의 메소드를 이용하여 FlushMode를 선택할 수 있다.
    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);

    Insert insertOperation = table.newInsert();
    PartialRow row = insertOperation.getRow();
    row.addString("column1", "col-value");
    row.addLong("column2", 1L);

    session.apply(insertOperation);
    session.flush();
}
catch(KuduException e) {
    //TODO 에러처리
}

문제가 없어보이긴 하지만 실제로 동작해보면 Kudu에 쓰기 오퍼레이션이 실패하더라도 catch에 걸리지 않는다. 왜 그럴까?

FlushMode.AUTO_FLUSH_SYNC 모드일때의 코드를 따라가 보자.

// AsyncKuduSession.doAutoFlushSync
private Deferred<OperationResponse> doAutoFlushSync(final Operation operation) {
    if (timeoutMillis != 0) {
      operation.resetTimeoutMillis(client.getTimer(), timeoutMillis);
    }
    operation.setExternalConsistencyMode(consistencyMode);
    operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
    operation.setIgnoreAllNotFoundRows(ignoreAllNotFoundRows);

    return client.sendRpcToTablet(operation)
        .addCallbackDeferring(resp -> {
          client.updateLastPropagatedTimestamp(resp.getWriteTimestampRaw());
          return Deferred.fromResult(resp);
        })
        .addErrback(new SingleOperationErrCallback(operation));
}

// SingleOperationErrCallback
// 쓰기 실패시 해당 callback이 호출됨
private static final class SingleOperationErrCallback implements Callback<Object, Exception> {

    private final Operation operation;

    private SingleOperationErrCallback(Operation operation) {
      this.operation = operation;
    }

    @Override
    public Object call(Exception e) throws Exception {
      //KuduException일때(즉 쓰기 실패시) 익셉션이 throw 되지 않고, OperationResponse객체를 생성하여 리턴한다.
      if (e instanceof KuduException) {
        Status status = ((KuduException) e).getStatus();
        RowError rowError = new RowError(status, operation);
        return new OperationResponse(0, null, 0, operation, rowError);
      }
      return e;
    }
  }

코드에서 보면 sendRpcToTablet을 통해 실제 태블릿에 쓰기 시도를 하며, 에러 발생시 addErrback에 등록된 callback을 수행하도록 해뒀다.
SingleOperationErrCallback.call() 의 구현을 보면 KuduException일 경우 바로 익셉션을 throw 하지 않으며, OperationResponse 인스턴스에 에러 정보를 담아 리턴하게 된다.

결과적으로 태블릿에 쓰기를 실패해도 KuduException이 throw 되지 않으며, 그렇기 때문에 try~catch로 쓰기 실패를 찾아낼수가 없는것이다.

정상적으로 쓰기 실패를 체크하려면 FlushMode에 따라 다르게 코드를 작성해야한다.

1) FlushMode.AUTO_FLUSH_SYNC 일때

//... 생략....
OperationResponse response = session.apply(insertOperation);
if(response.hasRowError()) {
    //TODO 에러처리
}

위에서 작성했듯, kuduSession.apply 호출시 바로 flush 됨으로 이때 리턴되는 OperationResponse를 받아 에러 유무를 판단하면된다.

2) FlushMode.AUTO_FLUSH_BACKGROUND, MANUAL_FLUSH 일때

//... 생략....
session.apply(insertOperation);
session.flush();
if(session.countPendingErrors() > 0) {
    //TODO 에러처리
}

AUTO_FLUSH_BACKGROUND, MANUAL_FLUSH 모드일 때는 kuduSession.flush()를 호출하면 비동기적으로 플러시 되기 때문에 따로 리턴값을 받지 않고 kuduSesison.countPendingErrors()를 통해 에러 유무를 판단할 수 있다.