본문 바로가기

카테고리 없음

[Debezium] Mysql Binlog를 이용하여 특정 시점부터 다시 메세지 전송하는 방법

debezium with kafka connect는 디비지움의 offset (혹은 체크포인트)를 카프카 토픽(`offset.storage.topic으로 설정된 값`)에 저장한다.
이를 통해 디비지움이 재시작되면 offset 토픽에서 마지막으로 저장된 체크포인트부터 다시 카프카로 프로듀싱을 하게 되는데, 이때 몇가지 이슈가 발생해서 과거 시점부터 다시 빈로그를 프로듀싱 해야할 일이 발생했다.

공식 가이드 문서에 따르면 offset 카프카 토픽에 아래와 같은 데이터를 프로듀싱하면 된다고 나와있다.

다만 offset 토픽에 프로듀싱 해야하는 값들(ts_sec, file, pos, row, server_id, event)는 어디서 가지고 와야 하는지 나와있지 않아 여러가지 방법으로 처리한 결과를 정리해보고자 한다.

0. 요구사항

특정 시간대의 빈로그부터 다시 프로듀싱을 할수 있어야 한다.

1. 수동 처리 방법

mysql의 binlog를 직접 조회하여 디비지움이 프로듀싱을 하기 offset 정보를 찾아내어 세팅한다.

1) binlog 파일 리스트 조회
만약 mysql 서버에 접속이 가능한 환경이라면 mysql 서버의 binlog가 저장되어있는 디렉토리로 이동하여 파일리스트를 출력해 본다.

만약 찾고자 하는 binlog의 위치가 4월 30일 경이라면 4월 29일에 생성된 mysql-bin.000003 파일을 분석하면 된다.

그러나 mysql 서버에 직접 접근을 할수가 없다면 아래와 같은 쿼리로 빈로그 파일 목록을 추출해내야 한다.

보통 mysql-bin.xxx 형태로 존재하며 숫자가 클수록 최신의 데이터이다. 위의 binlog file list를 토대로 빈로그 파일을 읽으면서 카프카로 프로듀싱 하고자 하는 위치를 찾아내야 한다.

2) mysqlbinlog를 이용하여 binlog 읽기

mysql에는 mysqlbinlog라고 하는 유틸리티를 제공해준다. 이 유틸리티를 통해 빈로그를 읽을수 있는데, 이를 통해 빈로그를 읽고 debezium offset에 저장할 데이터를 찾아내면 된다. mysqlbinlog를 직접 설치해줘도 되지만, 필자는 도커를 이용하여 설치없이 mysqlbinlog를 바로 실행하도록 하였다.

$docker run -it --rm --platform linux/amd64 mysql:5.7.37-debian \
mysqlbinlog -h localhost \
--port 3306 \
-u {userid} \
--password={password} \
--base64-output=DECODE-ROWS \
--read-from-remote-server \
--start-datetime="2023-04-30 03:15:00" \
mysql-bin.000012 | head -30
  • read-from-remote-server: mysql 외부에서 mysqlbinlog를 수행한다면 해당 옵션을 줘서 수행해야 한다.
  • start-datetime: 특정 위치의 시간 빈로그 부터 읽도록 한다. 이때 시간대는 mysqlbinlog를 수행하는 머신의 local timezone을 기준으로 수행해야 한다. 필자 같은 경우는 docker 컨테이너에서 mysqlbinlog를 수행했기 때문에 local timezone은 UTC이다. 따라서 KST 기준 "2023-04-30 12:15:00" 데이터를 조회하고자 한다면 -9 시간을 해서 "2023-04-30 03:15:00" 으로 설정하도록 한다.
  • mysql-bin.000012: 찾고자 하는 시간대의 빈로그가 존재하지 않는다면 "1) binlog 파일 리스트 조회"의 방법으로 찾아낸 빈로그 파일명을 하나씩 대입하여 원하는 시간대를 찾아낸다.
  • head -30 : 위와 같이 mysqlbinlog를 수행하면 --start-datetime 이후 모든 binlog 데이터를 불러오게 되는데, 우리는 모든 데이터는 필요 없음으로 검색된 데이터 중 최 상단 30 라인만 출력하도록 설정한다.

mysqlbinlog를 수행하면 아래와 같은 결과를 얻을 수 있다.

  • server id: mysql이 가지는 각각의 유니크한 아이디
  • end_log_pos: 로그 위치
  • GTID_NEXT : 다음번에 사용할 GTID
  • TIMESTAMP: 빈로그 timestamp

위 데이터들을 통해 필요한 정보를 얻을수 있는데 GTID값만 약간의 계산을 해야한다.
GTID는 Global Transaction ID란 뜻을 가지고 있는데, 말 그대로 트랜잭션마다 발급되는 id라고 보면 된다.
이 GTID는 {host UUID}:{Auto Increment ID} 형태로 생성이 되는데 위의 빈로그에서는 "c04910ee-xxx-xxx-xxx:3909"로 되어있는것을 볼수 있다.

이때 해당 GTID가 현재의 GTID가 아니라 GTID_NEXT인것을 볼수 있는데, offset을 설정하기 위한 gtid는 위의 gtid를 그대로 가져다 쓰는 것이 아니라 GTID_NEXT - 1의 값으로 설정해 줘야 정상적으로 offset 설정이 된다.

이러한 방식으로 "2023-04-30 03:15:00" 시간대의 binlog 정보는 server_id: 1, pos: 811003, gtid: c04910ee-xxx-xxx-xxx:3908(3909아님 주의), timestamp: 1682824506 이라는것을 찾아냈다.

3) Mysql Gtid Set 구하기

debezium 에 offset을 저장하려면 gtid가 아니라 gtid set을 구해야 한다. gtid set은 gtid의 범위를 나타내는 값인데 이 값을 구해야 offset 설정을 할수 있다.

`SHOW MASTER STATUS` 쿼리를 수행하면 Executed_Gtid_Set을 구할수 있다. 이 값과 "2) mysqlbinlog를 이용하여 binlog 읽기" 에서 구한 GTID 값을 조합하여 Gtid Set을 만들어 내면 된다.

예를 들어 "2) mysqlbinlog를 이용하여 binlog 읽기" 단계에서 구한 gtid가 c04910ee-xxx-xxx-xxx:3908 이고, `SHOW MASTER STATUS`를 통해 구한 GTID Set이 c04910ee-xxx-xxx-xxx:1-3909 이라면 Executed_Gtid_Set에서 3909를 를 3908 로 변경하면 된다. 즉 c04910ee-xxx-xxx-xxx:1-3908 라는 GTID Set을 만들어 낼수 있다. 이 값을 어딘가에 잘 저장해 두자.

싱글서버로 mysql이 구성이 되어있다면 `SHOW MASTER STATUS`를 통해 검색되는 gtid set은 1개의 server uuid를 가진 형태로 될것이다. 만약 2대 이상으로 mysql 클러스터가 구성되어있다면 아래와 같은 형태로 저장되어있을것이다.

Z2174B383-5441-11E8-B90A-C80AA9429562:1-3,c04910ee-xxx-xxx-xxx:1-3909

이때 2의 단계에서 검색한 gtid와 uuid가 동일한 gtid의 auto increment 부분만 수정하고 나머지 gtidset은 그대로 둔다.

최종적으로 gitd set은 Z2174B383-5441-11E8-B90A-C80AA9429562:1-3, c04910ee-xxx-xxx-xxx:1-3908 이 된다.

4) kafka Offset Topic에 offset 정보 저장

binlog 파일을 통해 조회된 정보는 아래와 같은 json 형태로 만들수 있다.

{
  "transaction_id":null, //기본값 null로 설정
  "ts_sec":1682824506,   //binlog의 timestamp 값
  "file":"mysql-bin.000012",  //binlog 파일명
  "pos":811003,                  //binlog의 position
  "gtids":"c04910ee-607d-11eb-b473-fa163ebb265f:1-3908", //gtid set
  "row":1,                        // 기본값 1로 설정
  "server_id":1,                // binlog의 server id
  "event":2                        // 기본값 2로 설정 
}
  • ts_sec: "2) mysqlbinlog를 이용하여 binlog 읽기"에서 구한 timestamp
  • file: binlog 파일명
  • pos: "2) mysqlbinlog를 이용하여 binlog 읽기"에서 구한 position
  • gtids: "3) Mysql Gtid Set 구하기"에서 생성한 gtid set
  • server_id: "2) mysqlbinlog를 이용하여 binlog 읽기"에서 구한 server id
  • row, event, transaction_id: 기본값으로 설정

이 값을 kafka connect offset topic으로 프로듀싱 하면 된다.

$ echo '["inventory-connector",{"server":"dbserver1"}]|{"transaction_id":null,"ts_sec":1682824506,"file":"mysql-bin.000012","pos":811003,"gtids":"c04910ee-607d-11eb-b473-fa163ebb265f:1-3908","row":1,"server_id":1,"event":2}' | \
kafkacat -P -b localhost -t my_connect_offsets -K \| -p 11

 

2. 어플리케이션을  통한 offset 설정

mysql의 binlog를 JVM 언어에서 읽으려면 mysql-binlog-connector-java라이브러리를 사용하면 된다. 이 라이브러리는 debezium에서도 사용하는 라이브러리로 안정적으로 binlog를 읽을 수 있다.

아래의 예제는 mysql-binlog-connector-java라이브러리를 이용하여  kotlin으로 작성한 예제이다.