引言
KeyedProcessFunction是Flink用于處理KeyedStream的數據集合,它比ProcessFunction擁有更多特性,例如狀態處理和定時器功能等。接下來就一起來了解下這個函數吧
正文
了解一個函數怎么用最權威的地方就是 官方文檔 以及注解,KeyedProcessFunction的注解如下
/*** A keyed function that processes elements of a stream.** <p>For every element in the input stream {@link #processElement(Object, Context, Collector)} is* invoked. This can produce zero or more elements as output. Implementations can also query the* time and set timers through the provided {@link Context}. For firing timers {@link #onTimer(long,* OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as* output and register further timers.** <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only* available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}.** <p><b>NOTE:</b> A {@code KeyedProcessFunction} is always a {@link* org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {@link* org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown* methods can be implemented. See {@link* org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}* and {@link org.apache.flink.api.common.functions.RichFunction#close()}.*/
上面簡單來說就是以下四點
- Flink中輸入流中的每一條數據都會觸發KeyedProcessFunction類的processElement方法調用
- 通過這個方法的Context參數可以設置定時器,在開啟定時器后會程序會定時調用onTimer方法
- 由于KeyedProcessFunction實現了RichFunction接口,因此是可以通過RuntimeContext上下文對象管理狀態state的開啟和釋放
- 需要注意的是,只有在KeyedStream里才能夠訪問state和定時器,通俗點來說就是這個函數要用在keyBy這個函數的后面
processElement方法解析
- Flink會調用processElement方法處理輸入流中的每一條數據
- KeyedProcessFunction.Context參數可以用來讀取以及更新內部狀態state
- 這個KeyedProcessFunction跟其他function一樣通過參數中的Collector對象以回寫的方式返回數據
onTimer方法解析:在啟用TimerService服務時會定時觸發此方法,一般會在processElement方法中開啟TimerService服務
以上就是這個函數的基本知識,接下來就通過實戰來熟悉下它的使用
實戰簡介
本次實戰的目標是學習KeyedProcessFunction,內容如下:
- 監聽本機7777端口讀取字符串
- 將每個字符串用空格分隔,轉成Tuple2實例,f0是分隔后的單詞,f1等于1
- 將Tuple2實例集合通過f0字段分區,得到KeyedStream
- KeyedSteam通過自定義KeyedProcessFunction處理
- 自定義KeyedProcessFunction的作用,是記錄每個單詞最新一次出現的時間,然后建一個十秒的定時器進行觸發
使用代碼例子
首先定義pojo類
public class CountWithTimestampNew {private String key;private long count;private long lastQuestTimestamp;public long getAndIncrementCount() {return ++count;}public String getKey() {return key;}public void setKey(String key) {this.key = key;}public long getCount() {return count;}public void setCount(long count) {this.count = count;}public long getLastQuestTimestamp() {return lastQuestTimestamp;}public void setLastQuestTimestamp(long lastQuestTimestamp) {this.lastQuestTimestamp = lastQuestTimestamp;}
}
接著實現KeyedProcessFunction類
public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {private ValueState<CountWithTimestampNew> state;@Overridepublic void open(Configuration parameters) throws Exception {state = getRuntimeContext().getState(new ValueStateDescriptor<CountWithTimestampNew>("sherlock-state", CountWithTimestampNew.class));}// 實現數據處理邏輯的地方@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {Tuple currentKey = ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew = state.value();if (countWithTimestampNew == null) {countWithTimestampNew = new CountWithTimestampNew();countWithTimestampNew.setKey(value.f0);}countWithTimestampNew.getAndIncrementCount();//更新這個單詞最后一次出現的時間countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());//單詞之間不會互相覆蓋嗎?推測state對象是跟key綁定,針對每一個不同的key KeyedProcessFunction會創建其對應的state對象state.update(countWithTimestampNew);//給當前單詞創建定時器,十秒后觸發long timer = countWithTimestampNew.getLastQuestTimestamp()+10000;//嘗試注釋掉看看是否還會觸發onTimer方法ctx.timerService().registerProcessingTimeTimer(timer);//打印所有信息,用于確保數據準確性System.out.println(String.format(" 觸發processElement方法,當前的key是 %s, 這個單詞累加次數是 %d, 上次請求的時間是:%s, timer的時間是: %s",currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timer)));}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {Tuple currentKey = ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew = state.value();//標記當前元素是否已經連續10s未出現boolean isTimeout = false;if (timestamp >= countWithTimestampNew.getLastQuestTimestamp()+10000 ) {//out.collect(new Tuple2<>(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));isTimeout = true;}//打印所有信息,用于確保數據準確性System.out.println(String.format(" 觸發onTimer方法,當前的key是 %s, 這個單詞累加次數是 %d, 上次請求的時間是:%s, timer的時間是: %s, 當前單詞是否已超過10秒沒有再請求: %s",currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timestamp),String.valueOf(isTimeout)));}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}
}
最后是啟動類
public class KeyedProcessFunctionDemo2 {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 并行度1env.setParallelism(1);// 處理時間env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 監聽本地9999端口,讀取字符串DataStream<String> socketDataStream = env.socketTextStream("localhost", 7777);// 所有輸入的單詞,如果超過10秒沒有再次出現,都可以通過CountWithTimeoutFunction得到DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream// 對收到的字符串用空格做分割,得到多個單詞.flatMap(new SplitterFlatMapFunction())// 設置時間戳分配器,用當前時間作為時間戳.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {@Overridepublic long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {// 使用當前系統時間作為時間戳return System.currentTimeMillis();}@Overridepublic Watermark getCurrentWatermark() {// 本例不需要watermark,返回nullreturn null;}})// 將單詞作為key分區.keyBy(0)// 按單詞分區后的數據,交給自定義KeyedProcessFunction處理.process(new CountWithTimeoutKeyProcessFunctionNew());// 所有輸入的單詞,如果超過10秒沒有再次出現,就在此打印出來timeOutWord.print();env.execute("ProcessFunction demo : KeyedProcessFunction");}
}
演示
在啟動服務前,先通過linux指令監聽端口 nc -lk 7777
-
啟動Flink服務后,往7777端口里面發送數據
-
通過IDEA的終端可以看到有日志輸出,可以看到在發送消息的時候第一條日志立馬打印出來并在10秒后輸出第二條日志
-
那么咱們嘗試連續發送兩條Hello呢,可以看到累加器會持續累加,并且會觸發兩次onTimer方法,也就是每一條消息都會觸發一次。由于連續發送兩條,因此可以看得到第三行日志的末尾是false,說明收到第一條后的10秒內又有相同的消息進來。第二條是ture說明在收到第二條消息后的10秒內沒有消息進來
-
再輸入點其他的試試
-
通過輸出可以看到這些單詞的計數器又從0開始,說明每一個Key都對應一個狀態
思考題
- open方法會在哪里進行調用,KeyedProcessFunction整個類的完整調用邏輯是怎么樣的
- registerProcessingTimeTimer和registerEventTimeTimer的差異是什么
參考資料
- https://blog.csdn.net/boling_cavalry/article/details/106299167
- https://blog.csdn.net/lujisen/article/details/105510532
- https://blog.csdn.net/qq_31866793/article/details/102831731