Flink多流處理之Broadcast(廣播變量)

寫過Spark批處理的應該都知道,有一個廣播變量broadcast這樣的一個算子,可以優化我們計算的過程,有效的提高效率;同樣在Flink中也有broadcast,簡單來說和Spark中的類似,但是有所區別,首先Spark中的broadcast是靜態的數據,而Flink中的broadcast是動態的,也就是源源不斷的數據流.在Flink中會將廣播的數據存到state中.
在這里插入圖片描述
在Flink中主流數據可以獲取state中的所有狀態數據,使用過window的應該都清楚,當兩個streamData中的數據到達窗口的時間剛好錯過時就會發生關聯不上的情況,如window2S,sreamData1到達窗口的時間剛好卡在這個2S窗口的尾端,而streamData到達窗口時,這個窗口已經結束了,這種情況就算這兩條數據有相同id也無法進行關聯了.
但是broadcast會將到達的數據都存儲在state中,這樣主流到達的每一條數據都可以和state中的廣播流數據進行關聯比較.
在這里插入圖片描述
流程圖內容可能不夠準確,只是為了看起來方便理解.

  • 數據源
    # 主流數據
    ?  ~ nc -lk 1234
    101,瀏覽商品,2023-08-02
    102,瀏覽商品,2023-08-02
    103,查看商品價格,2023-08-04
    101,商品加入購物車,2023-08-03
    101,從購物車刪除商品,2023-08-03
    102,下單,2023-08-02
    102,申請延期發貨,2023-08-03
    103,點擊商品詳情頁,2023-08-04
    104,點擊收藏,2023-08-05
    104,下單,2023-08-05
    104,付款,2023-08-06
    105,瀏覽商品,2023-08-07
    106,瀏覽商品,2023-08-07
    106,加入購物車,2023-08-08
    107,瀏覽商品,2023-08-10
    
    # 廣播流數據
    ?  ~ nc -lk 5678
    101,小明
    102,張麗
    103,公孫飛天
    104,王二虎
    106,李四
    108,趙屋面
    
  • 代碼
    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/11* @Description: 多流操作-廣播流**/
    public class FlinkBroadcast {public static void main(String[] args) throws Exception {// 構建流環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 設置并行度env.setParallelism(3);// 數據集源1作為主流數據(用戶行為日志[id,behavior,date])DataStreamSource<String> sourceStream1 = env.socketTextStream("localhost", 1234);// 將字符串切割處理SingleOutputStreamOperator<Tuple3<String, String, String>> mainSourceStream = sourceStream1.map(str -> Tuple3.of(str.split(",")[0], str.split(",")[1], str.split(",")[2])).returns(new TypeHint<Tuple3<String, String, String>>() {});// 數據源2作為廣播流數據(用戶信息(id,name))DataStreamSource<String> sourceStream2 = env.socketTextStream("localhost", 5678);// 將字符串切割處理SingleOutputStreamOperator<Tuple2<String, String>> mapStream2 = sourceStream2.map(str -> Tuple2.of(str.split(",")[0], str.split(",")[1])).returns(new TypeHint<Tuple2<String, String>>() {});// 將廣播流數據源進行廣播/***參數說明* 這里需要我們傳入一個MapStateDescriptor,其實就是一個Map結構的數據<k,v>* <String, Tuple2<String, String>>,第一個String類型就是廣播流和主流連接的字段,在這個代碼中就是id,由實際業務決定* <String, Tuple2<String, String>>,第二個Tuple2<String, String>就是實際廣播數據流的數據,由實際業務決定* "userInfo"就是給一個名字,這個自定義無強制要求**/// 先構建一個狀態,后面也會使用MapStateDescriptor<String, Tuple2<String, String>> userInfoState = new MapStateDescriptor<>("userInfo", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));BroadcastStream<Tuple2<String, String>> userInfoBroadStream = mapStream2.broadcast(userInfoState);// 將主流數據和廣播流數據使用connect連接/*** 我們將數據轉變成廣播流之后,在Flink中也不知哪個數據流需要使用這個廣播流(userInfoBroadStream),* 這個時候就需要我們自己將主流數據和該廣播流數據進行連接**/BroadcastConnectedStream<Tuple3<String, String, String>, Tuple2<String, String>> connectedStream = mainSourceStream.connect(userInfoBroadStream);/*** 在process()中有兩類函數供我們選擇,KeyedBroadcastProcessFunction和BroadcastProcessFunction,* 這里要注意當"connectedStream"是KeyedStream時選擇KeyedBroadcastProcessFunction* 當"connectedStream"不是KeyedStream時選擇BroadcastProcessFunction就可以.* 使用keyBy算子返回的就是KeyedStream**/SingleOutputStreamOperator<String> resultStream = connectedStream.process(new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>() {// 這個方法寫主流數據處理邏輯@Overridepublic void processElement(Tuple3<String, String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {/*** 要注意,這里我們最好從ReadOnlyContext來獲取廣播狀態數據,因為獲取只讀的狀態數據可以保證數據的安全性,* 如果是通過成員變量的方式獲取可修改的狀態數據,就會存在數據不安全的問題,如在代碼邏輯中出現了對狀態數據* 修改的代碼,那么共享此狀態的并行算子可能看到的狀態數據不一致,就會導致數據錯誤或者代碼報錯.* 而使用ReadOnlyContext就可以保證processElement這個方法中我們只對狀態數據進行讀取.**/ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);if (broadcastState != null) {// 通過主流中的ID作為key獲取廣播變量中的用戶信息Tuple2<String, String> userInfo = broadcastState.get(value.f0);// 輸出數據的形式(id,behavior,date,name)if (userInfo == null) {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");} else {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + userInfo.f1);}} else {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");}}// 這個方法寫廣播流數據處理邏輯@Overridepublic void processBroadcastElement(Tuple2<String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.Context ctx, Collector<String> out) throws Exception {// 使用Context獲取狀態BroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);// 將數據存入到狀態中broadcastState.put(value.f0, value);}});// 打印結果resultStream.print();env.execute("Flink broadcast");}
    }
    
  • 結果
    3> 101,瀏覽商品,2023-08-02,小明
    3> 101,商品加入購物車,2023-08-03,小明
    3> 102,申請延期發貨,2023-08-03,張麗
    3> 104,下單,2023-08-05,王二虎
    3> 106,瀏覽商品,2023-08-07,李四
    1> 102,瀏覽商品,2023-08-02,張麗
    1> 101,從購物車刪除商品,2023-08-03,小明
    1> 103,點擊商品詳情頁,2023-08-04,公孫飛天
    1> 104,付款,2023-08-06,王二虎
    1> 106,加入購物車,2023-08-08,李四
    2> 103,查看商品價格,2023-08-04,公孫飛天
    2> 102,下單,2023-08-02,張麗
    2> 104,點擊收藏,2023-08-05,王二虎
    2> 105,瀏覽商品,2023-08-07,NULL
    2> 107,瀏覽商品,2023-08-10,NULL
    
    代碼內容就不進行詳細解釋了,注釋基本都寫清楚了,如有疑問可評論提問,共同探討.

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/36623.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/36623.shtml
英文地址,請注明出處:http://en.pswp.cn/news/36623.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

批處理自動切換ip地址與網絡的啟用、禁用

啟用禁用網絡 echo off :: BatchGotAdmin :------------------------------------- REM --> Check for permissions >nul 2>&1 "%SYSTEMROOT%\system32\cacls.exe" "%SYSTEMROOT%\system32\config\system" REM --> If error flag set,…

什么是微服務?

2.微服務的優缺點 優點 單一職責原則每個服務足夠內聚&#xff0c;足夠小&#xff0c;代碼容易理解&#xff0c;這樣能聚焦一個指定的業務功能或業務需求&#xff1b;開發簡單&#xff0c;開發效率提高&#xff0c;一個服務可能就是專一的只干一件事&#xff1b;微服務能夠被小…

命令提示符之操作基礎(Windows)

打開命令提示符 方法一 打開指定文件的文件夾&#xff0c;在路徑欄里輸入“cmd”&#xff0c;回車&#xff0c;就進入控制臺了。默認路徑就是指定文件夾的路徑。 方法二 打開指定的文件夾&#xff0c;按住shift鍵&#xff0c;在空白處右擊&#xff0c;在菜單欄中選擇“在此處打…

社區團購商城拼團秒殺接龍分銷團長小程序開源版開發

社區團購商城拼團秒殺接龍分銷團長小程序開源版開發 功能介紹&#xff1a; 商品管理&#xff1a;增加商品-商品列表-商品分類-商品單/多規格-商品標簽 訂單管理&#xff1a;訂單列表-訂單挑選-訂單導出-訂單打印-批量發貨-商品評價 會員管理&#xff1a;會員列表-會員挑選-會員…

【Git】—— 標簽管理

目錄 &#xff08;一&#xff09;理解標簽 1、作用 &#xff08;二&#xff09;創建標簽 &#xff08;三&#xff09;操作標簽 1、刪除標簽 2、推送標簽 3、刪除遠程標簽 &#xff08;一&#xff09;理解標簽 標簽 tag &#xff0c;可以簡單的理解為是對某次 commit 的…

python中的迭代器和生成器

一、迭代器 支持迭代的容器&#xff0c;如列表&#xff08;list&#xff09;、元組&#xff08;tuple&#xff09;、字典&#xff08;dict&#xff09;、集合&#xff08;set&#xff09;這些序列式容器。 自定義迭代器的類中必須實現以下2個方法&#xff1a; __next__(self)…

監控Kubernetes 控制面組件的關鍵指標

控制面組件的監控&#xff0c;包括 APIServer、Controller-manager&#xff08;簡稱 CM&#xff09;、Scheduler、etcd 四個組件。 1、APIServer APIServer 的核心職能是 Kubernetes 集群的 API 總入口&#xff0c;Kube-Proxy、Kubelet、Controller-Manager、Scheduler 等都需…

【字符串】649. Dota2 參議院

649. Dota2 參議院 解題思路 R true 表示循環結束之后 字符串仍然存在 Rflag > 0 說明R在D之前出現 R可以消滅Dflag < 0 說明D在R之前出現 D 可以消滅R一旦其中有一個為false 說明只剩下R 或者D 那么就可以決定誰獲勝遍歷字符串 如果當前字符是R 判斷flag 如果flag &l…

‘open3d.open3d.geometry.PointCloud‘ object has no attribute ‘voxel_down_sample‘

scene_cloud open3d.geometry.PointCloud() scene_cloud.points open3d.utility.Vector3dVector(scene_points) scene_cloud scene_cloud.voxel_down_sample(voxel_size) 執行上面代碼第三句報錯&#xff0c;出現了下面這個錯誤&#xff1a; AttributeError: open3d.open…

TCP 協議十大相關特性總結

目錄 一、TCP特性 二、報文格式 TCP十大核心特性 1. 確認應答 2. 超時重傳 3. 連接管理(三次握手,四次揮手) 三次握手 四次揮手 4. 滑動窗口 情況一:接收方的ACK丟失 情況二:發送方的數據包丟失 5. 流量控制 6. 擁塞控制 7. 延遲應答 8. 捎帶應答 9. 字節流粘包問題 10. TCP的…

k8s--使用cornJob定時執行sql文件

CronJob apiVersion: batch/v1beta1 kind: CronJob metadata:name: hello spec:schedule: "0 * * * *"jobTemplate:spec:template:spec:containers:- name: postgres-alpineimage: xxxximagePullPolicy: IfNotPresentcommand:- psql- -h- 數據庫服務地址- -d- 數據庫…

大語言模型:LLM的概念是個啥?

一、說明 大語言模型&#xff08;維基&#xff1a;LLM- large language model&#xff09;是以大尺寸為特征的語言模型。它們的規模是由人工智能加速器實現的&#xff0c;人工智能加速器能夠處理大量文本數據&#xff0c;這些數據大部分是從互聯網上抓取的。 [1]所構建的人工神…

02 - git 文件重命名

查看所有文章鏈接&#xff1a;&#xff08;更新中&#xff09;GIT常用場景- 目錄 文章目錄 1. 第一種方式2. 第二種方式 1. 第一種方式 mv kongfu_person.txt kongfu.txt git add .2. 第二種方式 git mv kongfu_person.txt kongfu.txt

微服務實戰項目-學成在線-項目優化(redis緩存優化)

微服務實戰項目-學成在線-項目優化(redis緩存優化) 1 優化需求 視頻播放頁面用戶未登錄也可以訪問&#xff0c;當用戶觀看試學課程時需要請求服務端查詢數據&#xff0c;接口如下&#xff1a; 1、根據課程id查詢課程信息。 2、根據文件id查詢視頻信息。 這些接口在用戶未認…

Unity 人物連招(三段連擊)

一&#xff1a; 連招思路 首先人物角色上有三個攻擊實例對象 Damage,每一個damage定義了攻擊的傷害值&#xff0c;攻擊距離&#xff0c;觸發器名稱&#xff0c;傷害的發起者&#xff0c;攻擊持續時間&#xff0c;攻擊重置時間&#xff0c;傷害的碰撞框大小等字段&#xff1a; …

【WordPress】給你一萬個不使用WP-Cron定時機制的理由

這篇文章也可以在我的博客中查看 定時任務 cron Cron是Unix/Linux系統中的任務調度工具&#xff0c;允許用戶在預定的時間和日期間隔自動運行命令或腳本 它通過Cron表達式定義任務執行的頻率&#xff0c;該表達式包含分鐘、小時、日期等信息 我們可以利用Cron來定期執行維護…

MySQL表的增刪查改

目錄 一&#xff0c;新增 二&#xff0c;查詢 2.1 全列查詢 2.2 指定列查詢 2.3 查詢字段為表達式 2.4 別名 - as 2.5 去重 - distinct 2.6 排序 - order by 2.7 條件查詢 - where 2.8 分頁查詢 - limit 三&#xff0c;修改 - update 四&#xff0c;刪除 - delete 一…

@Mapper POJO 與DTO之間的class屬性轉換映射

Mapper注解基于mapStract 框架實現對象轉換&#xff1a;MapStract java bean 屬性轉換映射 引用轉自&#xff1a;org.mapstruct:mapstruct 包&#xff08;Mapper、Mapping&#xff09;的使用 依賴包&#xff1a; <!--mapStruct依賴--> <dependency><groupId&g…

Spring-2-透徹理解Spring 注解方式創建Bean--IOC

今日目標 學習使用XML配置第三方Bean 掌握純注解開發定義Bean對象 掌握純注解開發IOC模式 1. 第三方資源配置管理 說明&#xff1a;以管理DataSource連接池對象為例講解第三方資源配置管理 1.1 XML管理Druid連接池(第三方Bean)對象【重點】 數據庫準備 -- 創建數據庫 create …

純前端 -- html轉pdf插件總結

一、html2canvasjsPDF&#xff08;文字會被截斷&#xff09;&#xff1a; 將HTML元素呈現給添加到PDF中的畫布對象&#xff0c;不能僅使用jsPDF&#xff0c;需要html2canvas或rasterizeHTML html2canvasjsPDF的具體使用鏈接 二、html2pdf&#xff08;內容顯示不全文字會被截斷…