文章目錄
- 一、Broadcast
- 二、代碼示例
- 三.或者第二種(只讀取一個csv文件到廣播內存中)
提示:以下是本篇文章正文內容,下面案例可供參考
一、Broadcast
為了關聯一個非廣播流(keyed 或者 non-keyed)與一個廣播流(BroadcastStream),我們可以調用非廣播流的方法 connect(),并將 BroadcastStream 當做參數傳入。 這個方法的返回參數是 BroadcastConnectedStream,具有類型方法 process(),傳入一個特殊的 CoProcessFunction 來書寫我們的模式識別邏輯。 具體傳入 process() 的是哪個類型取決于非廣播流的類型:
- 如果流是一個 keyed 流,那就是 KeyedBroadcastProcessFunction 類型;
- 如果流是一個 non-keyed 流,那就是 BroadcastProcessFunction 類型。
1).例如非keyby的要實現兩個方法
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {//主流 public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;//廣播操作public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
2).keyby的
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {//主流public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;//廣播public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;//只有keyby的可以onTimer。此方法可以不重寫public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
在處理廣播流元素這端,是具有讀寫權限的,而對于處理非廣播流元素這端是只讀的。 這樣做的原因是,Flink 中是不存在跨 task 通訊的。所以為了保證 broadcast state 在所有的并發實例中是一致的,我們在處理廣播流元素的時候給予寫權限,在所有的 task 中均可以看到這些元素,并且要求對這些元素處理是一致的, 那么最終所有 task 得到的 broadcast state 是一致的。
廣播算子是不使用 RocksDB state backend: broadcast state 在運行時保存在內存中,需要保證內存充足。這一特性同樣適用于所有其他 Operator State。
二、代碼示例
此處將本地csv文件加載到內存廣播中
CSV文件的內容是:
1.user_details.csv
1,Alice,30
2,Bob,25
2.user_details03.csv
3,Charlie,35
5,name,5
下面是代碼(下面是將兩個本地CSV文件放到廣播內存中案例)
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;import java.util.HashMap;
import java.util.Map;
public <