一文弄明白KeyedProcessFunction函數

引言

KeyedProcessFunction是Flink用于處理KeyedStream的數據集合,它比ProcessFunction擁有更多特性,例如狀態處理和定時器功能等。接下來就一起來了解下這個函數吧

正文

了解一個函數怎么用最權威的地方就是 官方文檔 以及注解,KeyedProcessFunction的注解如下

/*** A keyed function that processes elements of a stream.** <p>For every element in the input stream {@link #processElement(Object, Context, Collector)} is* invoked. This can produce zero or more elements as output. Implementations can also query the* time and set timers through the provided {@link Context}. For firing timers {@link #onTimer(long,* OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as* output and register further timers.** <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only* available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}.** <p><b>NOTE:</b> A {@code KeyedProcessFunction} is always a {@link* org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {@link* org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown* methods can be implemented. See {@link* org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}* and {@link org.apache.flink.api.common.functions.RichFunction#close()}.*/

上面簡單來說就是以下四點

  1. Flink中輸入流中的每一條數據都會觸發KeyedProcessFunction類的processElement方法調用
  2. 通過這個方法的Context參數可以設置定時器,在開啟定時器后會程序會定時調用onTimer方法
  3. 由于KeyedProcessFunction實現了RichFunction接口,因此是可以通過RuntimeContext上下文對象管理狀態state的開啟和釋放
  4. 需要注意的是,只有在KeyedStream里才能夠訪問state和定時器,通俗點來說就是這個函數要用在keyBy這個函數的后面

processElement方法解析

  1. Flink會調用processElement方法處理輸入流中的每一條數據
  2. KeyedProcessFunction.Context參數可以用來讀取以及更新內部狀態state
  3. 這個KeyedProcessFunction跟其他function一樣通過參數中的Collector對象以回寫的方式返回數據

onTimer方法解析:在啟用TimerService服務時會定時觸發此方法,一般會在processElement方法中開啟TimerService服務

以上就是這個函數的基本知識,接下來就通過實戰來熟悉下它的使用

實戰簡介

本次實戰的目標是學習KeyedProcessFunction,內容如下:

  1. 監聽本機7777端口讀取字符串
  2. 將每個字符串用空格分隔,轉成Tuple2實例,f0是分隔后的單詞,f1等于1
  3. 將Tuple2實例集合通過f0字段分區,得到KeyedStream
  4. KeyedSteam通過自定義KeyedProcessFunction處理
  5. 自定義KeyedProcessFunction的作用,是記錄每個單詞最新一次出現的時間,然后建一個十秒的定時器進行觸發

使用代碼例子

首先定義pojo類

public class CountWithTimestampNew {private String key;private long count;private long lastQuestTimestamp;public long getAndIncrementCount() {return ++count;}public String getKey() {return key;}public void setKey(String key) {this.key = key;}public long getCount() {return count;}public void setCount(long count) {this.count = count;}public long getLastQuestTimestamp() {return lastQuestTimestamp;}public void setLastQuestTimestamp(long lastQuestTimestamp) {this.lastQuestTimestamp = lastQuestTimestamp;}
}

接著實現KeyedProcessFunction類

public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {private ValueState<CountWithTimestampNew> state;@Overridepublic void open(Configuration parameters) throws Exception {state = getRuntimeContext().getState(new ValueStateDescriptor<CountWithTimestampNew>("sherlock-state", CountWithTimestampNew.class));}// 實現數據處理邏輯的地方@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {Tuple currentKey = ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew = state.value();if (countWithTimestampNew == null) {countWithTimestampNew = new CountWithTimestampNew();countWithTimestampNew.setKey(value.f0);}countWithTimestampNew.getAndIncrementCount();//更新這個單詞最后一次出現的時間countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());//單詞之間不會互相覆蓋嗎?推測state對象是跟key綁定,針對每一個不同的key KeyedProcessFunction會創建其對應的state對象state.update(countWithTimestampNew);//給當前單詞創建定時器,十秒后觸發long timer = countWithTimestampNew.getLastQuestTimestamp()+10000;//嘗試注釋掉看看是否還會觸發onTimer方法ctx.timerService().registerProcessingTimeTimer(timer);//打印所有信息,用于確保數據準確性System.out.println(String.format(" 觸發processElement方法,當前的key是 %s, 這個單詞累加次數是 %d, 上次請求的時間是:%s, timer的時間是: %s",currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timer)));}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {Tuple currentKey = ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew = state.value();//標記當前元素是否已經連續10s未出現boolean isTimeout = false;if (timestamp >= countWithTimestampNew.getLastQuestTimestamp()+10000 ) {//out.collect(new Tuple2<>(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));isTimeout = true;}//打印所有信息,用于確保數據準確性System.out.println(String.format(" 觸發onTimer方法,當前的key是 %s, 這個單詞累加次數是 %d, 上次請求的時間是:%s, timer的時間是: %s, 當前單詞是否已超過10秒沒有再請求: %s",currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timestamp),String.valueOf(isTimeout)));}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}
}

最后是啟動類

public class KeyedProcessFunctionDemo2 {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 并行度1env.setParallelism(1);// 處理時間env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 監聽本地9999端口,讀取字符串DataStream<String> socketDataStream = env.socketTextStream("localhost", 7777);// 所有輸入的單詞,如果超過10秒沒有再次出現,都可以通過CountWithTimeoutFunction得到DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream// 對收到的字符串用空格做分割,得到多個單詞.flatMap(new SplitterFlatMapFunction())// 設置時間戳分配器,用當前時間作為時間戳.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {@Overridepublic long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {// 使用當前系統時間作為時間戳return System.currentTimeMillis();}@Overridepublic Watermark getCurrentWatermark() {// 本例不需要watermark,返回nullreturn null;}})// 將單詞作為key分區.keyBy(0)// 按單詞分區后的數據,交給自定義KeyedProcessFunction處理.process(new CountWithTimeoutKeyProcessFunctionNew());// 所有輸入的單詞,如果超過10秒沒有再次出現,就在此打印出來timeOutWord.print();env.execute("ProcessFunction demo : KeyedProcessFunction");}
}

演示

在啟動服務前,先通過linux指令監聽端口 nc -lk 7777

  1. 啟動Flink服務后,往7777端口里面發送數據
    在這里插入圖片描述

  2. 通過IDEA的終端可以看到有日志輸出,可以看到在發送消息的時候第一條日志立馬打印出來并在10秒后輸出第二條日志
    在這里插入圖片描述

  3. 那么咱們嘗試連續發送兩條Hello呢,可以看到累加器會持續累加,并且會觸發兩次onTimer方法,也就是每一條消息都會觸發一次。由于連續發送兩條,因此可以看得到第三行日志的末尾是false,說明收到第一條后的10秒內又有相同的消息進來。第二條是ture說明在收到第二條消息后的10秒內沒有消息進來
    在這里插入圖片描述

  4. 再輸入點其他的試試
    在這里插入圖片描述

  5. 通過輸出可以看到這些單詞的計數器又從0開始,說明每一個Key都對應一個狀態
    在這里插入圖片描述

思考題

  1. open方法會在哪里進行調用,KeyedProcessFunction整個類的完整調用邏輯是怎么樣的
  2. registerProcessingTimeTimer和registerEventTimeTimer的差異是什么

參考資料

  1. https://blog.csdn.net/boling_cavalry/article/details/106299167
  2. https://blog.csdn.net/lujisen/article/details/105510532
  3. https://blog.csdn.net/qq_31866793/article/details/102831731

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/696739.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/696739.shtml
英文地址,請注明出處:http://en.pswp.cn/news/696739.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

c++實現棧和隊列類

c實現棧和隊列類 棧(Stack)Stack示意圖Stack.cpp 隊列(queue)queue 示意圖queue.cpp 棧(Stack) Stack示意圖 Stack.cpp #pragma once #include "ListStu.cpp"template<typename T> class Stack { public: /* * void push(T& tDate)* 參數一 &#xff1a;…

【OCR專題文章】

目錄 一、數據獲取及預處理方法篇 二、兩階段算法篇(檢測識別) 三、一階段算法篇(Enc-Dec) 四、拓新篇 本欄聚焦在OCR的相關算法&#xff0c;專欄內文章的代碼均已實現。 一、數據獲取及預處理方法篇 【數據獲取】 合同數據獲取&#xff1a;【OCR】【專題系列】二、數據獲取-…

解決windows無法訪問wsl下docker服務

筆者在初學使用wsl跑docker時,遇到了windows無法訪問的問題,并且瀏覽了大部分的文章,發現并沒有起效,在反復試錯終于成功之后,總結為以下幾點: 1.升級至wsl2 2.將.wslconfig文件(用戶文件夾下)中的如下鏡像服務關閉刪除 networkingModemirrored 3.打開wsl防火墻相應的端口 …

記錄解決uniapp使用uview-plus在vue3+vite+ts項目中打包后樣式不能顯示問題

一、背景 從 vue2uview1 升級到 vue3vitetsuview-plus ,uview組件樣式打包后不顯示&#xff0c;升級前uview 組件是可以正常顯示&#xff0c;升級后本地運行是可以正常顯示&#xff0c;但是打包發布成H5后uview的組件無法正常顯示&#xff0c;其他uniapp自己的組件可以正常顯示…

Vue 中 onclick和@click區別

文章目錄 一、直接上結論二、驗證代碼&#xff0c;可直接運行三、點擊結果 一、直接上結論 onclick 只能觸發 js的原生方法&#xff0c;不能觸發vue的封裝方法click 只能觸發vue的封裝方法&#xff0c;不能觸發js的原生方法 二、驗證代碼&#xff0c;可直接運行 <!DOCTYP…

Vue3 + Ts (使用lodash)

安裝 npm i --save lodash使用 import _ from lodash??報警告&#xff1a;&#xff01;&#xff01;&#xff01; 此時還需要安裝ts聲明文件庫 npm install types/lodash -D安裝之后重啟Vscode還是會提示上面的警告&#xff0c;此時還需在tsconfig.ts里面配置 {"c…

快速將excel/word表格轉換為web頁面(html)的方法

前言 在進行開發企業信息化建設的過程&#xff0c;應該有很多這樣的場景&#xff0c;就是將現有的電子表格記錄的方式轉換為在數據系統中進行網頁上報。也就是需要根據當前一直使用的表格制作一個上傳這個表格信息的網頁&#xff0c;如果要減少系統的使用學習成本&#xff0c;…

【Day55】代碼隨想錄之動態規劃_買賣股票含冷凍期和手續費

文章目錄 動態規劃理論基礎動規五部曲&#xff1a;出現結果不正確&#xff1a; 1. 最佳買賣股票的時機含冷凍期2. 買賣股票的最佳時機含手續費 動態規劃理論基礎 動規五部曲&#xff1a; 確定dp數組 下標及dp[i] 的含義。遞推公式&#xff1a;比如斐波那契數列 dp[i] dp[i-1…

【Elasticsearch專欄 01】深入探索:Elasticsearch的正向索引和倒排索引是什么

文章目錄 什么是Elasticsearch的正向索引和倒排索引&#xff1f;1.倒排索引&#xff08;Inverted Index&#xff09;2.正向索引&#xff08;Forward Index&#xff09;3.小結 什么是Elasticsearch的正向索引和倒排索引&#xff1f; 首先&#xff0c;要明確的是&#xff0c;Ela…

leetcode:78.子集

1.樹形結構&#xff1a;往后依次取該數字往后的數字&#xff08;前面的不要取&#xff0c;否則子集會重復&#xff09;&#xff1b;每一層遞歸的結果都要放入結果集&#xff0c;而并非只放葉子節點。 代碼實現&#xff1a; #達到了葉子節點&#xff08;終止條件&#xff09; …

抖音百科詞條創建在哪里?

抖音百科就是頭條百科&#xff0c;頭條百科是一個在線百科全書平臺&#xff0c;用戶可以在上面創建、編輯和瀏覽各種百科詞條。頭條百科詞條可以被抖音抓取到&#xff0c;從而獲得更多流量和曝光&#xff0c;所以當你創建一個抖音百科詞條的時候&#xff0c;就能更加提高自身的…

logbak日志單獨打印(方法層級)

logbak日志單獨打印 問題 前幾天朋友在群里問&#xff0c;怎么針對方法打印打印日志&#xff0c;不是針對類。 解決辦法 方法層 GetMapping("getLog1")public String getLog1(){Logger specialLogger LoggerFactory.getLogger(TestController.class.getName() …

人工智能_CPU安裝運行ChatGLM大模型_ChatGlm-6B_啟動命令行對話_安裝API調用接口_005---人工智能工作筆記0100

然后我們再進入 /data/module/ChatGLM-6B-main文件夾 然后我們去啟動,命令行工具 python3 cli_demo.py 可以看到也是可以用了. 正常可以用了. 然后主要來看,如何使用api來調用呢,這樣才可以,做自己的界面 可以看到就非常簡單了只需要: 走到 /data/module/

9-1 A. 圖綜合練習--構建鄰接表

題目描述 已知一有向圖&#xff0c;構建該圖對應的鄰接表。 鄰接表包含數組和單鏈表兩種數據結構&#xff0c;其中每個數組元素也是單鏈表的頭結點&#xff0c;數組元素包含兩個屬性&#xff0c;屬性一是頂點編號info&#xff0c;屬性二是指針域next指向與它相連的頂點信息。 單…

即時設計和sketch對比

在設計領域&#xff0c;有很多易于使用的設計軟件&#xff0c;每個軟件都有自己的特點&#xff0c;但在使用中也會有一些限制。例如&#xff0c;傳統的Sketch。Sketch是一款古老的UI設計軟件。自2010年推出以來&#xff0c;已有10多年的歷史&#xff0c;但它始終僅限于MAC。到目…

深入理解Spring Boot Starter:概念、特點、場景、原理及自定義starter

這是目錄 **一、引言****二、Spring Boot Starter基本概念****三、Spring Boot Starter的主要特點****四、Spring Boot Starter的應用場景****五、Spring Boot Starter的實現原理****六、自定義spring boot starter****為什么要創建自定義Starter&#xff1f;****創建自定義Spr…

【JS逆向學習】同花順(q.10jqka)補環境

逆向目標 目標網址&#xff1a;https://q.10jqka.com.cn/ 目標接口&#xff1a; https://q.10jqka.com.cn/index/index/board/all/field/zdf/order/desc/page/3/ajax/1/ 目標參數&#xff1a;cookie 逆向過程 老規矩&#xff0c;先分析網絡請求&#xff0c;發現是 cookie 加…

matlab代碼--基于matlabLDPC-和積譯碼系統

LDPC編碼 一個碼長為n、信息位個數為k的線性分組碼&#xff08;n,k&#xff09;可以由一個生成矩陣 來定義&#xff0c;信息序列 通過G被映射到碼字XS.G。線性分組碼也可以由一個校驗矩陣 來描述。所以碼字均滿足 。校驗矩陣的每一行表示一個校驗約束 &#xff0c;其中所有的非…

一文吃透計算機網絡面試八股文

面試網站&#xff1a;topjavaer.cn 目錄&#xff1a; 網絡分層結構三次握手兩次握手可以嗎&#xff1f;四次揮手第四次揮手為什么要等待2MSL&#xff1f;為什么是四次揮手&#xff1f;TCP有哪些特點&#xff1f;說說TCP報文首部有哪些字段&#xff0c;其作用又分別是什么&…

Autochip rtos videoin enqueuedequeue

vBackcarMainTask 目錄 xBCModulesInit 初始化videoin xVideoinHwInit 初始創建mipi 初始化線程 vis_init 初始化g_vis_ctrl 配置mipi 初始化參數 xVideoinFillParam獲取sensor驅動配置的分辨率 xVideoinHwGetInfo 配置分辨率 vendor/autochips/proprietary/tinysys/vis…