본문 바로가기

OpenSource/Nifi

(5)
[Nifi] Flowfile 순서보장 삽질기 앞서 몇번의 포스팅에서도 소개했듯, 필자는 현재 Nifi를 통해 RDBMS 데이터를 하둡으로 실시간 복제하기 위하여 사용중이다. Nifi Processor를 설정한 뒤에 데이터를 검증하는 도중 특정 상황에 데이터(flowfile)이 역전되는 현상을 발견하였고 이를 해결하기 위한 내용을 포스팅해보고자한다. 1. Nifi 클러스터 정보 nifi version: 1.10.0 openjdk 1.8 2. 데이터 역전이 발생되는 상황 1) Processor 구성 Custom Processor를 생성하여 주기적으로 특정 테이블에 쿼리를 수행하여 각 row에 대한 flowfile을 생성하고 해당 데이터를 Kafka로 보내고 있다. 그리고 쿼리 수행이 완료되면 CheckPoint를 state에 저장하고 다음 쿼리에 해당..
[Nifi] Flowfile 중복제거를 위한 DetectDuplicate 사용 및 주의점 필자는 커스텀 프로세서를 통해 DB에서 특정 시간컬럼을 기준으로 데이터를 크롤링하고, 실시간 반영을 위하여 kafka로 전송하는 것을 Nifi를 통해 구현하였다. 이때 내부적 이슈에 의해 DB에서 데이터를 검색하는 쿼리에서 누락이 발생했고, 이를 해결하기 위하여 마지막 동작시간 - 10초 전 데이터를 더 가지고 오도록 함으로서 누락 문제를 해결하였으나, 추가적으로 데이터의 중복이라는 문제를 해결해야 하는 이슈가 있었다. 이번에는 flowfile 중복을 해결할수 있는 DetectDuplicate를 사용하는 방법과 주의할 점에 대해서 이야기 해보도록 하려고 한다. 1. DetectDuplicate의 기본컨셉 DetectDuplicate는 flowfile contents의 hash값을 통하여 기존에 캐시에 존..
[Nifi] Load Balance와 Partition by attribute 삽질기 Nifi 1.8.0 버전부터 Load Balance라는 기능이 생겼다. (자세한 설명은 여기로) 말그대로 flowfile을 적당한 기준에 맞춰 각 노드들로 분산을 시켜준다는것인데, 직관적인 이름이기 때문에 이해하기 어렵지 않다. 1. Load Balance 종류 Do Not load balance(기본값): 로드 밸런스를 사용하지 않음 Partition by attribute : Flowfile의 특정 attribute를 통해서 로드 밸런싱 함 Round robin: Nifi 클러스터를 구성하는 노드에 순차적으로 분산시킴 Single Node: 하나의 Nifi 노드로만 보냄 테스트 클러스터 : 3 Nodes GenerateFlowFile(Primary Node Only) : 테스트용 Flowfile 생성..
[Nifi]Could Not Generate Extensions Documentation 에러 해결 방법 커스텀 프로세서 생성 후 mvn clean package를 통해 .nar파일을 생성하는데, 아래와 같은 에러가 발생했다. [ERROR] Could not generate extensions' documentation org.apache.maven.plugin.MojoExecutionException: Failed to create Extension Documentation at org.apache.nifi.NarMojo.generateDocumentation (NarMojo.java:596) at org.apache.nifi.NarMojo.execute (NarMojo.java:499) at org.apache.maven.plugin.DefaultBuildPluginManager.executeMo..
[Nifi] Custom Processor 생성 및 테스트 nifi에 커스텀 프로세서를 등록할 일이 생겼는데, 커스텀 프로세서를 빌드한 후 테스트 하기가 까다로워 도커를 이용하여 로컬에 환경을 구축하고 간단하게 테스트 하는 방법을 적용해 보았다. 1. 프로젝트 생성 아래는 maven이 설치 된 환경에서 진행했으며, maven이 설치되지 않았다면 Installing Apache Maven 을 참고하여 maven을 설치하고 진행하도록 하자. $ mvn archetype:generate ## 아래와 같이 Choose a number .. 라고 나오면 nifi를 입력하여 nifi archetype을 검색하자. Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): 1..