본문 바로가기

OpenSource/Nifi

[Nifi] Flowfile 순서보장 삽질기

앞서 몇번의 포스팅에서도 소개했듯, 필자는 현재 Nifi를 통해 RDBMS 데이터를 하둡으로 실시간 복제하기 위하여 사용중이다. Nifi Processor를 설정한 뒤에 데이터를 검증하는 도중 특정 상황에 데이터(flowfile)이 역전되는 현상을 발견하였고 이를 해결하기 위한 내용을 포스팅해보고자한다.

1. Nifi 클러스터 정보

  • nifi version: 1.10.0
  • openjdk 1.8

2. 데이터 역전이 발생되는 상황

1) Processor 구성

Custom Processor를 생성하여 주기적으로 특정 테이블에 쿼리를 수행하여 각 row에 대한 flowfile을 생성하고 해당 데이터를 Kafka로 보내고 있다.
그리고 쿼리 수행이 완료되면 CheckPoint를 state에 저장하고 다음 쿼리에 해당 CheckPoint부터 쿼리를 수행하는 방식으로 데이터를 읽는다.
Nifi의 QueryDatabaseTable 과 동일하다고 보면 될것이다.

2) 발생 조건

  1. 오랫동안 프로세서를 멈추었다가 동작시킬때 발생
  2. Nifi Queue가 가득 찼을때 발생

여러번의 테스트를 통해 위의 두가지 조건일때 데이터가 역전이 되는 현상이 발생됨을 확인하였다.

필자가 만든 CustomProcessor나 QueryDatabaseTable 같은 Processor들은 마지막 수행시점을 state에 저장하고, 이를 다음번 쿼리의 where조건으로 사용한다.
즉 오랫동안 Processor를 중지 시켜두었다가 동작시키게 되면 대량의 flowfile이 순간적으로 생성이 된다. 순간적으로 대량으로 발생한 flowfile은 큐를 가득차게 만들었다.

Queue가 가득 찬 상황

3.원인

1) Nifi Queue의 구조

Nifi는 SwappablePriorityQueue 로 프로세서간 큐를 구현하고 있다.

Nifi의 큐는 java.util.PriorityQueue로 선언되어있다. 즉 Nifi의 큐는 JVM heap 메모리에 저장되는 구조이다. Heap에 저장할 경우 쉽게 구현할수 있다는 장점이 있는 반면, 데이터가 한번에 몰리게 되어 큐에 가득 쌓이게 된다면 Out Of Memory를 발생시켜 데이터를 유실할 수도 있는 단점을 가지고 있다.

이를 해결하기 위해 Nifi에서는 프로세서에서 처리를 진행하는 큐인 Active Queue와 후순위의 데이터를 저장하는 SwapQueue로 큐를 정의하고 있다.

Active Queue가

가득찼을때

nifi.properties 설정파일의 nifi.queue.swap.threshold(기본값은 10,000) 값보다 더 많이 쌓여있을때의 시나리오를 보면 아래와 같다.

1. 대량의 flowfile이 Active Queue에 가득차면
2. 우선순위가 낮은 flowfile들을 SwapQueue로 이동시키고
3. SwapQueue로 이동된 데이터를 파일형태로 저장하여 Active Queue의 크기를 줄여 Heap을 확보한다.
4. Active Queue의 데이터를 처리 완료하면 파일 형태로 저장된 Swap 데이터를 다시 Active Queue로 읽어 처리를 진행한다.

2) 원인

시나리오상으로는 전혀 문제가 될것이 없어보여 찾다보니 우연히 SwappablePriorityQueue의 커밋로그를 보게 되었고 하나의 Jira를 찾게되었다.

https://issues.apache.org/jira/browse/NIFI-7011

위의 이슈에 따르자면, SwapQueue로 이동된 데이터가 파일로 플러시 된 후, Active Queue가 소모되면 Swap In 되어 처리가 되어야 하는데, 이때 새로운 데이터가 Swap In 데이터 보다 빠르게 Active Queue로 들어온다면 Swap In 되어야할 데이터 보다 Active Queue로 들어온 데이터가 먼저 처리가 되어 Swap out된 데이터가 후순위로 밀린다는 이야기 이다.

즉 Flowfile의 역전이 발생하는 원인은 위와 같으며 프로세서간 Queue가 가득 찰때 발생할 수 있는 버그 인것으로 판명이 되었다.

4. 해결

위의 지라 이슈 링크를 보면 알겠지만 해당이슈는 처리완료되어 Nifi version 1.11.0에 포함되었다.
즉 Nifi의 버전 업그레이드를 진행하면 해결 가능하다. (고생하며 원인을찾은 것이 비하면 비교적 쉽게 해결 가능하다.)