1 從Kafka接收消息,存儲到數據庫中。
(1) ConsumerKafka processor
(2)Execute Scripts Processor
我這里是使用JS腳本進行處理。 還有很多其他語言的腳本。
var flowFile = session.get();
if (flowFile != null) {var IOUtils = Java.type("org.apache.commons.io.IOUtils");var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");var DateFormatUtils=Java.type("org.apache.commons.lang3.time.DateFormatUtils");// var dataType=flowFile.getAttribute('data_type')// var FLAG=flowFile.getAttribute('flag')var tm = null;try {flowFile = session.write(flowFile, new StreamCallback(function (inputStream, outputStream) {var inputText = IOUtils.toString(inputStream, StandardCharsets.UTF_8);var msg = JSON.parse(inputText);var stationId = msg['stationId'];var stationName = msg['stationName'];var deviceId = msg['deviceId'];var deviceName = msg['deviceName'];var deviceNo = msg['deviceNo'];var receiveType = msg['receiveType'];var createAt = msg['createAt'];var createAtString=DateFormatUtils.format(Number(createAt),'yyyy-MM-dd HH:mm:ss');var obTime = msg['obTime'];var obDate = msg['obDate'];var obDateString=DateFormatUtils.format(Number(obDate),'yyyy-MM-dd HH:mm:ss');var order = msg['order'];var distance = msg['distance'];var channel1SignalStrength = msg['channel1SignalStrength']var powerVoltage = msg['powerVoltage']var sql = 'insert into "SJZT_ODS"."water_data_distance"('+ '"station_id", "station_name", "device_id", "device_name", "device_no", "receive_type", "create_at", "ob_time", "ob_date", "order", "distance", "channel1_signal_strength", "power_voltage")'+ 'VALUES('+ stationId + ', \'' + stationName + '\', ' + deviceId + ', \'' + deviceName + '\', \'' + deviceNo + '\', ' + receiveType + ', \'' + createAtString + '\', \'' + obTime + '\', \'' + obDateString + '\', ' + order + ', ' + distance + ', ' + channel1SignalStrength + ', ' + powerVoltage + ')';outputStream.write(sql.getBytes(StandardCharsets.UTF_8));}));// flowFile = session.putAttribute(flowFile, "tm",tableName);session.transfer(flowFile, REL_SUCCESS);} catch (e) {flowFile = session.putAttribute(flowFile, "rsvr.transfer.error", e);session.transfer(flowFile, REL_FAILURE);}
}
注意: 這里只是生成了一個sql字符串,并沒有執行sql,因此需要后面的processor來執行sql語句。
(3)PutSql processor
注意:autocommit要設置為true,否則看不到數據庫里面的數據的。
2 將一堆Processors移動到一個Group里面界面操作
貌似沒有直接的移動操作。
(1) Ctrl + A 全選要移動的processors
(2) 點擊左邊的group按鈕
(3)為新的Group命名
(4)好了。選中的所有的processors都移動到了自己新創建的group中了。
參考材料
[1] https://blog.csdn.net/guijianchouxyz/article/details/120340154