寫過Spark批處理的應該都知道,有一個廣播變量broadcast
這樣的一個算子,可以優化我們計算的過程,有效的提高效率;同樣在Flink中也有broadcast
,簡單來說和Spark中的類似,但是有所區別,首先Spark中的broadcast
是靜態的數據,而Flink中的broadcast
是動態的,也就是源源不斷的數據流.在Flink中會將廣播的數據存到state
中.
在Flink中主流數據可以獲取state
中的所有狀態數據,使用過window
的應該都清楚,當兩個streamData
中的數據到達窗口的時間剛好錯過時就會發生關聯不上的情況,如window
是2S
,sreamData1
到達窗口的時間剛好卡在這個2S
窗口的尾端,而streamData
到達窗口時,這個窗口已經結束了,這種情況就算這兩條數據有相同id
也無法進行關聯了.
但是broadcast
會將到達的數據都存儲在state
中,這樣主流到達的每一條數據都可以和state
中的廣播流數據進行關聯比較.
流程圖內容可能不夠準確,只是為了看起來方便理解.
- 數據源
# 主流數據 ? ~ nc -lk 1234 101,瀏覽商品,2023-08-02 102,瀏覽商品,2023-08-02 103,查看商品價格,2023-08-04 101,商品加入購物車,2023-08-03 101,從購物車刪除商品,2023-08-03 102,下單,2023-08-02 102,申請延期發貨,2023-08-03 103,點擊商品詳情頁,2023-08-04 104,點擊收藏,2023-08-05 104,下單,2023-08-05 104,付款,2023-08-06 105,瀏覽商品,2023-08-07 106,瀏覽商品,2023-08-07 106,加入購物車,2023-08-08 107,瀏覽商品,2023-08-10
# 廣播流數據 ? ~ nc -lk 5678 101,小明 102,張麗 103,公孫飛天 104,王二虎 106,李四 108,趙屋面
- 代碼
import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/11* @Description: 多流操作-廣播流**/ public class FlinkBroadcast {public static void main(String[] args) throws Exception {// 構建流環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 設置并行度env.setParallelism(3);// 數據集源1作為主流數據(用戶行為日志[id,behavior,date])DataStreamSource<String> sourceStream1 = env.socketTextStream("localhost", 1234);// 將字符串切割處理SingleOutputStreamOperator<Tuple3<String, String, String>> mainSourceStream = sourceStream1.map(str -> Tuple3.of(str.split(",")[0], str.split(",")[1], str.split(",")[2])).returns(new TypeHint<Tuple3<String, String, String>>() {});// 數據源2作為廣播流數據(用戶信息(id,name))DataStreamSource<String> sourceStream2 = env.socketTextStream("localhost", 5678);// 將字符串切割處理SingleOutputStreamOperator<Tuple2<String, String>> mapStream2 = sourceStream2.map(str -> Tuple2.of(str.split(",")[0], str.split(",")[1])).returns(new TypeHint<Tuple2<String, String>>() {});// 將廣播流數據源進行廣播/***參數說明* 這里需要我們傳入一個MapStateDescriptor,其實就是一個Map結構的數據<k,v>* <String, Tuple2<String, String>>,第一個String類型就是廣播流和主流連接的字段,在這個代碼中就是id,由實際業務決定* <String, Tuple2<String, String>>,第二個Tuple2<String, String>就是實際廣播數據流的數據,由實際業務決定* "userInfo"就是給一個名字,這個自定義無強制要求**/// 先構建一個狀態,后面也會使用MapStateDescriptor<String, Tuple2<String, String>> userInfoState = new MapStateDescriptor<>("userInfo", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));BroadcastStream<Tuple2<String, String>> userInfoBroadStream = mapStream2.broadcast(userInfoState);// 將主流數據和廣播流數據使用connect連接/*** 我們將數據轉變成廣播流之后,在Flink中也不知哪個數據流需要使用這個廣播流(userInfoBroadStream),* 這個時候就需要我們自己將主流數據和該廣播流數據進行連接**/BroadcastConnectedStream<Tuple3<String, String, String>, Tuple2<String, String>> connectedStream = mainSourceStream.connect(userInfoBroadStream);/*** 在process()中有兩類函數供我們選擇,KeyedBroadcastProcessFunction和BroadcastProcessFunction,* 這里要注意當"connectedStream"是KeyedStream時選擇KeyedBroadcastProcessFunction* 當"connectedStream"不是KeyedStream時選擇BroadcastProcessFunction就可以.* 使用keyBy算子返回的就是KeyedStream**/SingleOutputStreamOperator<String> resultStream = connectedStream.process(new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>() {// 這個方法寫主流數據處理邏輯@Overridepublic void processElement(Tuple3<String, String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {/*** 要注意,這里我們最好從ReadOnlyContext來獲取廣播狀態數據,因為獲取只讀的狀態數據可以保證數據的安全性,* 如果是通過成員變量的方式獲取可修改的狀態數據,就會存在數據不安全的問題,如在代碼邏輯中出現了對狀態數據* 修改的代碼,那么共享此狀態的并行算子可能看到的狀態數據不一致,就會導致數據錯誤或者代碼報錯.* 而使用ReadOnlyContext就可以保證processElement這個方法中我們只對狀態數據進行讀取.**/ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);if (broadcastState != null) {// 通過主流中的ID作為key獲取廣播變量中的用戶信息Tuple2<String, String> userInfo = broadcastState.get(value.f0);// 輸出數據的形式(id,behavior,date,name)if (userInfo == null) {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");} else {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + userInfo.f1);}} else {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");}}// 這個方法寫廣播流數據處理邏輯@Overridepublic void processBroadcastElement(Tuple2<String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.Context ctx, Collector<String> out) throws Exception {// 使用Context獲取狀態BroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);// 將數據存入到狀態中broadcastState.put(value.f0, value);}});// 打印結果resultStream.print();env.execute("Flink broadcast");} }
- 結果
代碼內容就不進行詳細解釋了,注釋基本都寫清楚了,如有疑問可評論提問,共同探討.3> 101,瀏覽商品,2023-08-02,小明 3> 101,商品加入購物車,2023-08-03,小明 3> 102,申請延期發貨,2023-08-03,張麗 3> 104,下單,2023-08-05,王二虎 3> 106,瀏覽商品,2023-08-07,李四 1> 102,瀏覽商品,2023-08-02,張麗 1> 101,從購物車刪除商品,2023-08-03,小明 1> 103,點擊商品詳情頁,2023-08-04,公孫飛天 1> 104,付款,2023-08-06,王二虎 1> 106,加入購物車,2023-08-08,李四 2> 103,查看商品價格,2023-08-04,公孫飛天 2> 102,下單,2023-08-02,張麗 2> 104,點擊收藏,2023-08-05,王二虎 2> 105,瀏覽商品,2023-08-07,NULL 2> 107,瀏覽商品,2023-08-10,NULL