目錄
1、狀態概述
1.1 無狀態算子
1.2 有狀態算子
2、狀態分類
?編輯?2.1 算子狀態
2.1.1?列表狀態(ListState)
2.1.2?聯合列表狀態(UnionListState)
2.1.3?廣播狀態(BroadcastState)
2.2 按鍵分區狀態?
2.2.1?值狀態(ValueState)
2.2.2?列表狀態(ListState)
2.2.3?Map狀態(MapState)
2.2.4?歸約狀態(ReducingState)
2.2.5?聚合狀態(AggregatingState)
2.2.6?狀態生存時間(TTL)
3、狀態后端(State Backends)
3.1?狀態后端的分類(HashMapStateBackend/RocksDB)
3.1.1?哈希表狀態后端(HashMapStateBackend)
3.1.2?內嵌RocksDB狀態后端(EmbeddedRocksDBStateBackend)
3.2?如何選擇正確的狀態后端
3.3?狀態后端的配置
1、狀態概述
1.1 無狀態算子
根據當前的輸入可以直接轉換得到輸出結果,這種鼻子就是無狀態算子,如map,flatMap,filter
1.2 有狀態算子
除當前處理之外,還需要其他處理才能得到計算結果。如聚合算子,窗口算子等
2、狀態分類
????????Flink的狀態有兩種:托管狀態(Managed State)和原始狀態(Raw State)。托管狀態就是由Flink統一管理的,狀態的存儲訪問、故障恢復和重組等一系列問題都由Flink實現,我們只要調接口就可以;而原始狀態則是自定義的,相當于就是開辟了一塊內存,需要我們自己管理,實現狀態的序列化和故障恢復。
????????通常我們采用Flink托管狀態來實現需求。
?
2.1 算子狀態
??????????一個算子任務會按照并行度分為多個并行子任務執行,而不同的子任務會占據不同的任務槽(task slot)。由于不同的slot在計算資源上是物理隔離的,所以Flink能管理的狀態在并行任務間是無法共享的,每個狀態只能針對當前子任務的實例有效。
算子狀態(Operator State)就是一個算子并行實例上定義的狀態,作用范圍被限定為當前算子任務。?????????
算子狀態的實際應用場景不如Keyed State多,一般用在Source或Sink等與外部系統連接的算子上,或者完全沒有key定義的場景。比如Flink的Kafka連接器中,就用到了算子狀態。
算子狀態也支持不同的結構類型,主要有三種:ListState、UnionListState和BroadcastState。
2.1.1?列表狀態(ListState)
與Keyed State中的列表狀態的區別是:在算子狀態的上下文中,不會按鍵(key)分別處理狀態,所以每一個并行子任務上只會保留一個“列表”(list),也就是當前并行子任務上所有狀態項的集合。列表中的狀態項就是可以重新分配的最細粒度,彼此之間完全獨立。
案例實操:在map算子中計算數據的個數。
public class OperatorListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.socketTextStream("hadoop102", 7777).map(new MyCountMapFunction()).print();env.execute();}// TODO 1.實現 CheckpointedFunction 接口public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {private Long count = 0L;private ListState<Long> state;@Overridepublic Long map(String value) throws Exception {return ++count;}/*** TODO 2.本地變量持久化:將 本地變量 拷貝到 算子狀態中,開啟checkpoint時才會調用** @param context* @throws Exception*/@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println("snapshotState...");// 2.1 清空算子狀態state.clear();// 2.2 將 本地變量 添加到 算子狀態 中state.add(count);}/*** TODO 3.初始化本地變量:程序啟動和恢復時, 從狀態中 把數據添加到 本地變量,每個子任務調用一次** @param context* @throws Exception*/@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("initializeState...");// 3.1 從 上下文 初始化 算子狀態state = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("state", Types.LONG));// 3.2 從 算子狀態中 把數據 拷貝到 本地變量if (context.isRestored()) {for (Long c : state.get()) {count += c;}}}}
}
2.1.2?聯合列表狀態(UnionListState)
與ListState類似,聯合列表狀態也會將狀態表示為一個列表。它與常規列表狀態的區別在于,算子并行度進行縮放調整時對于狀態的分配方式不同。
UnionListState的重點就在于“聯合”(union)。在并行度調整時,常規列表狀態是輪詢分配狀態項,而聯合列表狀態的算子則會直接廣播狀態的完整列表。
如果列表中狀態項數量太多,為資源和效率考慮一般不建議使用聯合重組的方式。
使用方式同ListState,區別在:getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));
state = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));
2.1.3?廣播狀態(BroadcastState)
有時我們希望算子并行子任務都保持同一份“全局”狀態,用來做統一的配置和規則設定。
案例實操:水位超過指定的閾值發送告警,閾值可以動態修改。
public class OperatorBroadcastStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 數據流SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());// 配置流(用來廣播配置)DataStreamSource<String> configDS = env.socketTextStream("hadoop102", 8888);// TODO 1. 將 配置流 廣播MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState);// TODO 2.把 數據流 和 廣播后的配置流 connectBroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDS.connect(configBS);// TODO 3.調用 processsensorBCS.process(new BroadcastProcessFunction<WaterSensor, String, String>() {/*** 數據流的處理方法: 數據流 只能 讀取 廣播狀態,不能修改* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {// TODO 5.通過上下文獲取廣播狀態,取出里面的值(只讀,不能修改)ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);Integer threshold = broadcastState.get("threshold");// 判斷廣播狀態里是否有數據,因為剛啟動時,可能是數據流的第一條數據先來threshold = (threshold == null ? 0 : threshold);if (value.getVc() > threshold) {out.collect(value + ",水位超過指定的閾值:" + threshold + "!!!");}}/*** 廣播后的配置流的處理方法: 只有廣播流才能修改 廣播狀態* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {// TODO 4. 通過上下文獲取廣播狀態,往里面寫數據BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);broadcastState.put("threshold", Integer.valueOf(value));}}).print();env.execute();}
}
2.2 按鍵分區狀態?
而很多有狀態的操作(比如聚合、窗口)都是要先做keyBy進行按鍵分區的。按鍵分區之后,任務所進行的所有計算都應該只針對當前key有效,所以狀態也應該按照key彼此隔離。
它的特點非常鮮明,就是以key為作用范圍進行隔離。
需要注意,使用Keyed State必須基于KeyedStream。沒有進行keyBy分區的DataStream,即使轉換算子實現了對應的富函數類,也不能通過運行時上下文訪問Keyed State。
2.2.1?值狀態(ValueState)
public interface ValueState<T> extends State {T value() throws IOException;void update(T value) throws IOException;
}
- T value():獲取當前狀態的值;
- update(T?value):對狀態進行更新,傳入的參數value就是要覆寫的狀態值。
????????在具體使用時,為了讓運行時上下文清楚到底是哪個狀態,我們還需要創建一個“狀態描述器”(StateDescriptor)來提供狀態的基本信息。例如源碼中,ValueState的狀態描述器構造方法如下:
public ValueStateDescriptor(String name, Class<T> typeClass) {super(name, typeClass, null);
}
案例需求:檢測每種傳感器的水位值,如果連續的兩個水位值超過10,就輸出報警。
public class KeyedValueStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {// TODO 1.定義狀態ValueState<Integer> lastVcState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 2.在open方法中,初始化狀態// 狀態描述器兩個參數:第一個參數,起個名字,不重復;第二個參數,存儲的類型lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
// lastVcState.value(); // 取出 本組 值狀態 的數據
// lastVcState.update(); // 更新 本組 值狀態 的數據
// lastVcState.clear(); // 清除 本組 值狀態 的數據// 1. 取出上一條數據的水位值(Integer默認值是null,判斷)int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();// 2. 求差值的絕對值,判斷是否超過10Integer vc = value.getVc();if (Math.abs(vc - lastVc) > 10) {out.collect("傳感器=" + value.getId() + "==>當前水位值=" + vc + ",與上一條水位值=" + lastVc + ",相差超過10!!!!");}// 3. 更新狀態里的水位值lastVcState.update(vc);}}).print();env.execute();}
2.2.2?列表狀態(ListState)
將需要保存的數據,以列表(List)的形式組織起來。在ListState<T>接口中同樣有一個類型參數T,表示列表中數據的類型。ListState也提供了一系列的方法來操作狀態,使用方式與一般的List非常相似。
- Iterable<T> get():獲取當前的列表狀態,返回的是一個可迭代類型Iterable<T>;
- update(List<T>?values):傳入一個列表values,直接對狀態進行覆蓋;
- add(T?value):在狀態列表中添加一個元素value;
- addAll(List<T>?values):向列表中添加多個元素,以列表values形式傳入。
類似地,ListState的狀態描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致。
案例:針對每種傳感器輸出最高的3個水位值
public class KeyedListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {ListState<Integer> vcListState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Types.INT));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 1.來一條,存到list狀態里vcListState.add(value.getVc());// 2.從list狀態拿出來(Iterable), 拷貝到一個List中,排序, 只留3個最大的Iterable<Integer> vcListIt = vcListState.get();// 2.1 拷貝到List中List<Integer> vcList = new ArrayList<>();for (Integer vc : vcListIt) {vcList.add(vc);}// 2.2 對List進行降序排序vcList.sort((o1, o2) -> o2 - o1);// 2.3 只保留最大的3個(list中的個數一定是連續變大,一超過3就立即清理即可)if (vcList.size() > 3) {// 將最后一個元素清除(第4個)vcList.remove(3);}out.collect("傳感器id為" + value.getId() + ",最大的3個水位值=" + vcList.toString());// 3.更新list狀態vcListState.update(vcList);// vcListState.get(); //取出 list狀態 本組的數據,是一個Iterable
// vcListState.add(); // 向 list狀態 本組 添加一個元素
// vcListState.addAll(); // 向 list狀態 本組 添加多個元素
// vcListState.update(); // 更新 list狀態 本組數據(覆蓋)
// vcListState.clear(); // 清空List狀態 本組數據}}).print();env.execute();}
}
2.2.3?Map狀態(MapState)
把一些鍵值對(key-value)作為狀態整體保存起來,可以認為就是一組key-value映射的列表。
MapState提供了操作映射狀態的方法,與Map的使用非常類似。
- UV?get(UK?key):傳入一個key作為參數,查詢對應的value值;
- put(UK?key, UV?value):傳入一個鍵值對,更新key對應的value值;
- putAll(Map<UK, UV>?map):將傳入的映射map中所有的鍵值對,全部添加到映射狀態中;
- remove(UK key):將指定key對應的鍵值對刪除;
- boolean contains(UK key):判斷是否存在指定的key,返回一個boolean值。
另外,MapState也提供了獲取整個映射相關信息的方法;
- Iterable<Map.Entry<UK, UV>>?entries():獲取映射狀態中所有的鍵值對;
- Iterable<UK>?keys():獲取映射狀態中所有的鍵(key),返回一個可迭代Iterable類型;
- Iterable<UV>?values():獲取映射狀態中所有的值(value),返回一個可迭代Iterable類型;
- boolean?isEmpty():判斷映射是否為空,返回一個boolean值。
案例需求:統計每種傳感器每種水位值出現的次數。
public class KeyedMapStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {MapState<Integer, Integer> vcCountMapState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Types.INT, Types.INT));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 1.判斷是否存在vc對應的keyInteger vc = value.getVc();if (vcCountMapState.contains(vc)) {// 1.1 如果包含這個vc的key,直接對value+1Integer count = vcCountMapState.get(vc);vcCountMapState.put(vc, ++count);} else {// 1.2 如果不包含這個vc的key,初始化put進去vcCountMapState.put(vc, 1);}// 2.遍歷Map狀態,輸出每個k-v的值StringBuilder outStr = new StringBuilder();outStr.append("======================================\n");outStr.append("傳感器id為" + value.getId() + "\n");for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {outStr.append(vcCount.toString() + "\n");}outStr.append("======================================\n");out.collect(outStr.toString());// vcCountMapState.get(); // 對本組的Map狀態,根據key,獲取value
// vcCountMapState.contains(); // 對本組的Map狀態,判斷key是否存在
// vcCountMapState.put(, ); // 對本組的Map狀態,添加一個 鍵值對
// vcCountMapState.putAll(); // 對本組的Map狀態,添加多個 鍵值對
// vcCountMapState.entries(); // 對本組的Map狀態,獲取所有鍵值對
// vcCountMapState.keys(); // 對本組的Map狀態,獲取所有鍵
// vcCountMapState.values(); // 對本組的Map狀態,獲取所有值
// vcCountMapState.remove(); // 對本組的Map狀態,根據指定key,移除鍵值對
// vcCountMapState.isEmpty(); // 對本組的Map狀態,判斷是否為空
// vcCountMapState.iterator(); // 對本組的Map狀態,獲取迭代器
// vcCountMapState.clear(); // 對本組的Map狀態,清空}}).print();env.execute();}
}
2.2.4?歸約狀態(ReducingState)
歸約邏輯的定義,是在歸約狀態描述器(ReducingStateDescriptor)中,通過傳入一個歸約函數(ReduceFunction)來實現的。這里的歸約函數,就是我們之前介紹reduce聚合算子時講到的ReduceFunction,所以狀態類型跟輸入的數據類型是一樣的。
public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}
案例:計算每種傳感器的水位和
.process(new KeyedProcessFunction<String, WaterSensor, Integer>() {private ReducingState<Integer> sumVcState;@Overridepublic void open(Configuration parameters) throws Exception {sumVcState = this.getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("sumVcState",Integer::sum,Integer.class));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<Integer> out) throws Exception {sumVcState.add(value.getVc());out.collect(sumVcState.get());}
})
2.2.5?聚合狀態(AggregatingState)
與歸約狀態非常類似,聚合狀態也是一個值,用來保存添加進來的所有數據的聚合結果。與ReducingState不同的是,它的聚合邏輯是由在描述器中傳入一個更加一般化的聚合函數(AggregateFunction)來定義的;這也就是之前我們講過的AggregateFunction,里面通過一個累加器(Accumulator)來表示狀態,所以聚合的狀態類型可以跟添加進來的數據類型完全不同,使用更加靈活。
案例需求:計算每種傳感器的平均水位
public class KeyedAggregatingStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {AggregatingState<Integer, Double> vcAvgAggregatingState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcAvgAggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>("vcAvgAggregatingState",new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {@Overridepublic Tuple2<Integer, Integer> createAccumulator() {return Tuple2.of(0, 0);}@Overridepublic Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);}@Overridepublic Double getResult(Tuple2<Integer, Integer> accumulator) {return accumulator.f0 * 1D / accumulator.f1;}@Overridepublic Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
// return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);return null;}},Types.TUPLE(Types.INT, Types.INT)));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 將 水位值 添加到 聚合狀態中vcAvgAggregatingState.add(value.getVc());// 從 聚合狀態中 獲取結果Double vcAvg = vcAvgAggregatingState.get();out.collect("傳感器id為" + value.getId() + ",平均水位值=" + vcAvg);// vcAvgAggregatingState.get(); // 對 本組的聚合狀態 獲取結果
// vcAvgAggregatingState.add(); // 對 本組的聚合狀態 添加數據,會自動進行聚合
// vcAvgAggregatingState.clear(); // 對 本組的聚合狀態 清空數據}}).print();env.execute();}
}
2.2.6?狀態生存時間(TTL)
在實際應用中,很多狀態會隨著時間的推移逐漸增長,如果不加以限制,最終就會導致存儲空間的耗盡。
配置狀態的TTL時,需要創建一個StateTtlConfig配置對象,然后調用狀態描述器的.enableTimeToLive()方法啟動TTL功能。
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);stateDescriptor.enableTimeToLive(ttlConfig);
這里用到了幾個配置項:
- .newBuilder()
狀態TTL配置的構造器方法,必須調用,返回一個Builder之后再調用.build()方法就可以得到StateTtlConfig了。方法需要傳入一個Time作為參數,這就是設定的狀態生存時間。
- .setUpdateType()
設置更新類型。更新類型指定了什么時候更新狀態失效時間,這里的OnCreateAndWrite表示只有創建狀態和更改狀態(寫操作)時更新失效時間。另一種類型OnReadAndWrite則表示無論讀寫操作都會更新失效時間,也就是只要對狀態進行了訪問,就表明它是活躍的,從而延長生存時間。這個配置默認為OnCreateAndWrite。
- .setStateVisibility()
設置狀態的可見性。所謂的“狀態可見性”,是指因為清除操作并不是實時的,所以當狀態過期之后還有可能繼續存在,這時如果對它進行訪問,能否正常讀取到就是一個問題了。這里設置的NeverReturnExpired是默認行為,表示從不返回過期值,也就是只要過期就認為它已經被清除了,應用不能繼續讀取;這在處理會話或者隱私數據時比較重要。對應的另一種配置是ReturnExpireDefNotCleanedUp,就是如果過期狀態還存在,就返回它的值。
public class StateTTLDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {ValueState<Integer> lastVcState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 1.創建 StateTtlConfigStateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(5)) // 過期時間5s
// .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 狀態 創建和寫入(更新) 更新 過期時間.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 狀態 讀取、創建和寫入(更新) 更新 過期時間.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回過期的狀態值.build();// TODO 2.狀態描述器 啟用 TTLValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT);stateDescriptor.enableTimeToLive(stateTtlConfig);this.lastVcState = getRuntimeContext().getState(stateDescriptor);}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 先獲取狀態值,打印 ==》 讀取狀態Integer lastVc = lastVcState.value();out.collect("key=" + value.getId() + ",狀態值=" + lastVc);// 如果水位大于10,更新狀態值 ===》 寫入狀態if (value.getVc() > 10) {lastVcState.update(value.getVc());}}}).print();env.execute();}
}
3、狀態后端(State Backends)
在Flink中,狀態的存儲、訪問以及維護,都是由一個可插拔的組件決定的,這個組件就叫作狀態后端(state backend)。狀態后端主要負責管理本地狀態的存儲方式和位置。
3.1?狀態后端的分類(HashMapStateBackend/RocksDB)
Flink中提供了兩類不同的狀態后端,一種是“哈希表狀態后端”(HashMapStateBackend),另一種是“內嵌RocksDB狀態后端”(EmbeddedRocksDBStateBackend)。
系統默認的狀態后端是HashMapStateBackend。
3.1.1?哈希表狀態后端(HashMapStateBackend)
HashMapStateBackend是把狀態存放在內存里。具體實現上,哈希表狀態后端在內部會直接把狀態當作對象(objects),保存在Taskmanager的JVM堆上。
普通的狀態,以及窗口中收集的數據和觸發器,都會以鍵值對的形式存儲起來,所以底層是一個哈希表(HashMap),這種狀態后端也因此得名。
3.1.2?內嵌RocksDB狀態后端(EmbeddedRocksDBStateBackend)
RocksDB是一種內嵌的key-value存儲介質,可以把數據持久化到本地硬盤。
配置EmbeddedRocksDBStateBackend后,會將處理中的數據全部放入RocksDB數據庫中,RocksDB默認存儲在TaskManager的本地數據目錄里。
3.2?如何選擇正確的狀態后端
HashMap和RocksDB兩種狀態后端最大的區別,就在于本地狀態存放在哪里。
HashMapStateBackend是內存計算,讀寫速度非常快;但是,狀態的大小會受到集群可用內存的限制,如果應用的狀態隨著時間不停地增長,就會耗盡內存資源。
而RocksDB是硬盤存儲,所以可以根據可用的磁盤空間進行擴展,所以它非常適合于超級海量狀態的存儲。不過由于每個狀態的讀寫都需要做序列化/反序列化,而且可能需要直接從磁盤讀取數據,這就會導致性能的降低,平均讀寫性能要比HashMapStateBackend慢一個數量級。
3.3?狀態后端的配置
3.3.1?配置默認的狀態后端
#flink-conf.yaml# 默認狀態后端
state.backend: hashmap# 存放檢查點的文件路徑
# 這里的state.checkpoints.dir配置項,定義了檢查點和元數據寫入的目錄。
state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints
3.3.2?為每個作業(Per-job/Application)單獨配置狀態后端
通過執行環境設置,HashMapStateBackend。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new HashMapStateBackend());
通過執行環境設置,EmbeddedRocksDBStateBackend。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new EmbeddedRocksDBStateBackend());
需要注意,如果想在IDE中使用EmbeddedRocksDBStateBackend,需要為Flink項目添加依賴:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version>
</dependency>