Apache nifi 是個數據流系統,可以通過配置 自定義的流程來實現數據的轉換。
比如可以配置一個流程,讀取數據庫里的數據,再轉換,最后保存到本地文件。
這樣可以來實現一些數據轉換的操作,而不用特地編寫程序來導入導出。
- 相關資料
國內下載鏡像: https://mirrors.aliyun.com/apache/nifi/2.2.0/, 里面有不同的版本,這里下載標準版的nifi-2.2.0-bin.zip
官方文檔: https://nifi.apache.org/
processor類型的文檔:https://nifi.apache.org/components
本文包含以下內容:
- 啟動運行
- 一些概念
- 一個讀取csv格式數據,轉換為json格式數據,寫入到本地磁盤的demo
啟動運行:
只需要解壓zip,進入bin目錄,打開命令行,執行執行nifi start即可啟動,啟動后密碼會生成到logs目錄的nifi-app.log文件里,搜索password可以看到生成的賬號密碼
D:\nifi-2.2.0-bin\nifi-2.2.0\bin>nifi start
JAVA_HOME=C:\Program Files\Java\jdk1.8.0_161
NIFI_HOME=D:\ljhlab\nifi-2.2.0-bin\nifi-2.2.0
日志文件里搜索,可以看到:
2025-03-02 22:17:53,849 INFO [main] o.a.n.a.s.u.SingleUserLoginIdentityProvider
Generated Username [8b418056-cf0a-46a3-b08f-a8ba25563eca]
Generated Password [9Va1ZkYgiXtqnOUytqj63ympwhZOjqZi]
這個就是生成的賬號密碼
異常處理,可能會遇到jdk版本問題,請下載對于版本的jdk
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.UnsupportedClassVersionError: org/apache/nifi/bootstrap/BootstrapProcess has been compiled by a more recent version of the Java Runtime (class file version 65.0), this version of the Java Runtime only recognizes class file versions up to 52.0
啟動失敗,可以看到是jdk版本要 65 ,需要jdk21的意思。
修改nifi-env.cmd,添加,或者直接在環境變量里設置JAVA_HOME的值
set JAVA_HOME="D:\env\openjdk-22.0.1_windows-x64_bin\jdk-22.0.1"
- 進入網頁·
訪問https://localhost:8443 進入網頁ui界面
實驗demo:
下面進行一個讀取本地csv文件,然后導入到另外目錄成json格式的實驗,這里為了演示方便,csv數據寫死在一個輸入組件里。
數據準備:
title,author,score
你殺了誰,東野圭吾,7.1
黃仁勛:英偉達之芯,斯蒂芬·威特,7.3
流程簡介: 就是不斷添加不同類型的processor,比如獲取文件的,轉換數據格式的,然后用線將不同組件鏈接起來,比如讀取文件,鏈接到解析csv,鏈接的時候選擇relationship,每個組件都會有relationship
實驗流程:
- 添加一個組件作為輸出數據源
- 添加一個轉換數據格式的組件,ConvertRecord,轉換csv成json格式
- 添加一個組件,將ConvertRecord都內容輸出到本地文件
添加一個GenerateFLowFile的組件,用來生成一個FlowFile,FlowFile可以理解為一行數據,即這個組件會輸出一行數據,這里生成一個csv數據
剛添加時有問題就會有黃色標,可以點擊查看錯誤,來修正
每個組件都有properties來設置屬性,這里點開來設置,這個組件輸出的內容
- 再添加一個處理器,ConvertRecord,這個processor可以配置一個reader和writer,這里的話配置一個csv reader,用來解析第一個處理器傳進來的數據。
點開properties,看到設置值里是空的,點擊create new service創建一個配置。
然后將兩個組件鏈接起來
可以看到還是有警告,是因為還有relationship沒有設置,每個都要設置,
像failure就是失敗時的,如果不需要進一步處理可以勾選terminate。success的等會會和鏈接到一個寫入文件的組件里,這里測試也可以暫時設置為terminate先。
如果看到這種警告,就是剛剛添加的service還有有啟動,打開properties,點擊三個點,點go to service,把服務啟動就行,在這里也可以進一步配置這些服務的屬性
然后沒有警告了,運行測試下,先啟動第一個GenerateFlowFile,右鍵start
可以看到queued里有一個等待了,因為下一個組件還沒啟動,就在隊列里了,再啟動CovertRecord來消費。
啟動后可以看到in了三個,由于沒有輸出,可以點擊VIew Data Provenance來檢查,輸入的數據情況。
點擊view detail,可以看到輸入輸出情況,
輸出的已經是json格式了,然后可以,添加一個寫出的processor,來輸出到某個目錄。
繼續添加processor,
繼續鏈接到一起
然后配置下putFIle的properties,設置寫出的目錄,
右鍵start,后可以看到目錄以及有寫出的文件了。
經驗:
- 下拉框為空時,點擊三點可以打開創建新service的菜單
- properties tab里配置完后可以點擊verfication驗證