【Flink】狀態管理

目錄

1、狀態概述

1.1 無狀態算子

1.2 有狀態算子

2、狀態分類

?編輯?2.1 算子狀態

2.1.1?列表狀態(ListState)

2.1.2?聯合列表狀態(UnionListState)

2.1.3?廣播狀態(BroadcastState)

2.2 按鍵分區狀態?

2.2.1?值狀態(ValueState)

2.2.2?列表狀態(ListState)

2.2.3?Map狀態(MapState)

2.2.4?歸約狀態(ReducingState)

2.2.5?聚合狀態(AggregatingState)

2.2.6?狀態生存時間(TTL)

3、狀態后端(State Backends)

3.1?狀態后端的分類(HashMapStateBackend/RocksDB)

3.1.1?哈希表狀態后端(HashMapStateBackend)

3.1.2?內嵌RocksDB狀態后端(EmbeddedRocksDBStateBackend)

3.2?如何選擇正確的狀態后端

3.3?狀態后端的配置


1、狀態概述

1.1 無狀態算子

根據當前的輸入可以直接轉換得到輸出結果,這種鼻子就是無狀態算子,如map,flatMap,filter

1.2 有狀態算子

除當前處理之外,還需要其他處理才能得到計算結果。如聚合算子,窗口算子等

2、狀態分類

????????Flink的狀態有兩種:托管狀態(Managed State)原始狀態(Raw State)托管狀態就是由Flink統一管理的,狀態的存儲訪問、故障恢復和重組等一系列問題都由Flink實現,我們只要調接口就可以;而原始狀態則是自定義的,相當于就是開辟了一塊內存,需要我們自己管理,實現狀態的序列化和故障恢復。

????????通常我們采用Flink托管狀態來實現需求。

?

2.1 算子狀態

??????????一個算子任務會按照并行度分為多個并行子任務執行,而不同的子任務會占據不同的任務槽(task slot)。由于不同的slot在計算資源上是物理隔離的,所以Flink能管理的狀態在并行任務間是無法共享的,每個狀態只能針對當前子任務的實例有效。

算子狀態(Operator State)就是一個算子并行實例上定義的狀態,作用范圍被限定為當前算子任務。?????????

算子狀態的實際應用場景不如Keyed State多,一般用在Source或Sink等與外部系統連接的算子上,或者完全沒有key定義的場景。比如Flink的Kafka連接器中,就用到了算子狀態。

算子狀態也支持不同的結構類型,主要有三種:ListState、UnionListState和BroadcastState。

2.1.1?列表狀態(ListState)

與Keyed State中的列表狀態的區別是:在算子狀態的上下文中,不會按鍵(key)分別處理狀態,所以每一個并行子任務上只會保留一個“列表”(list),也就是當前并行子任務上所有狀態項的集合。列表中的狀態項就是可以重新分配的最細粒度,彼此之間完全獨立。

案例實操:在map算子中計算數據的個數。

public class OperatorListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.socketTextStream("hadoop102", 7777).map(new MyCountMapFunction()).print();env.execute();}// TODO 1.實現 CheckpointedFunction 接口public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {private Long count = 0L;private ListState<Long> state;@Overridepublic Long map(String value) throws Exception {return ++count;}/*** TODO 2.本地變量持久化:將 本地變量 拷貝到 算子狀態中,開啟checkpoint時才會調用** @param context* @throws Exception*/@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println("snapshotState...");// 2.1 清空算子狀態state.clear();// 2.2 將 本地變量 添加到 算子狀態 中state.add(count);}/*** TODO 3.初始化本地變量:程序啟動和恢復時, 從狀態中 把數據添加到 本地變量,每個子任務調用一次** @param context* @throws Exception*/@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("initializeState...");// 3.1 從 上下文 初始化 算子狀態state = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("state", Types.LONG));// 3.2 從 算子狀態中 把數據 拷貝到 本地變量if (context.isRestored()) {for (Long c : state.get()) {count += c;}}}}
}

2.1.2?聯合列表狀態(UnionListState)

與ListState類似,聯合列表狀態也會將狀態表示為一個列表。它與常規列表狀態的區別在于,算子并行度進行縮放調整時對于狀態的分配方式不同。

UnionListState的重點就在于“聯合”(union)。在并行度調整時,常規列表狀態是輪詢分配狀態項,而聯合列表狀態的算子則會直接廣播狀態的完整列表。

如果列表中狀態項數量太多,為資源和效率考慮一般不建議使用聯合重組的方式。

使用方式同ListState,區別在:getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));

state = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));

2.1.3?廣播狀態(BroadcastState)

有時我們希望算子并行子任務都保持同一份“全局”狀態,用來做統一的配置和規則設定。

案例實操:水位超過指定的閾值發送告警,閾值可以動態修改

public class OperatorBroadcastStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 數據流SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());// 配置流(用來廣播配置)DataStreamSource<String> configDS = env.socketTextStream("hadoop102", 8888);// TODO 1. 將 配置流 廣播MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState);// TODO 2.把 數據流 和 廣播后的配置流 connectBroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDS.connect(configBS);// TODO 3.調用 processsensorBCS.process(new BroadcastProcessFunction<WaterSensor, String, String>() {/*** 數據流的處理方法: 數據流 只能 讀取 廣播狀態,不能修改* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {// TODO 5.通過上下文獲取廣播狀態,取出里面的值(只讀,不能修改)ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);Integer threshold = broadcastState.get("threshold");// 判斷廣播狀態里是否有數據,因為剛啟動時,可能是數據流的第一條數據先來threshold = (threshold == null ? 0 : threshold);if (value.getVc() > threshold) {out.collect(value + ",水位超過指定的閾值:" + threshold + "!!!");}}/*** 廣播后的配置流的處理方法:  只有廣播流才能修改 廣播狀態* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {// TODO 4. 通過上下文獲取廣播狀態,往里面寫數據BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);broadcastState.put("threshold", Integer.valueOf(value));}}).print();env.execute();}
}

2.2 按鍵分區狀態?

而很多有狀態的操作(比如聚合、窗口)都是要先做keyBy進行按鍵分區的。按鍵分區之后,任務所進行的所有計算都應該只針對當前key有效,所以狀態也應該按照key彼此隔離。

它的特點非常鮮明,就是以key為作用范圍進行隔離。

需要注意,使用Keyed State必須基于KeyedStream。沒有進行keyBy分區的DataStream,即使轉換算子實現了對應的富函數類,也不能通過運行時上下文訪問Keyed State。

2.2.1?值狀態(ValueState)

public interface ValueState<T> extends State {T value() throws IOException;void update(T value) throws IOException;
}
  • T value():獲取當前狀態的值;
  • update(T?value):對狀態進行更新,傳入的參數value就是要覆寫的狀態值。

????????在具體使用時,為了讓運行時上下文清楚到底是哪個狀態,我們還需要創建一個“狀態描述器”(StateDescriptor)來提供狀態的基本信息。例如源碼中,ValueState的狀態描述器構造方法如下:

public ValueStateDescriptor(String name, Class<T> typeClass) {super(name, typeClass, null);
}

案例需求:檢測每種傳感器的水位值,如果連續的兩個水位值超過10,就輸出報警。

public class KeyedValueStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {// TODO 1.定義狀態ValueState<Integer> lastVcState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 2.在open方法中,初始化狀態// 狀態描述器兩個參數:第一個參數,起個名字,不重復;第二個參數,存儲的類型lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
//                                lastVcState.value();  // 取出 本組 值狀態 的數據
//                                lastVcState.update(); // 更新 本組 值狀態 的數據
//                                lastVcState.clear();  // 清除 本組 值狀態 的數據// 1. 取出上一條數據的水位值(Integer默認值是null,判斷)int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();// 2. 求差值的絕對值,判斷是否超過10Integer vc = value.getVc();if (Math.abs(vc - lastVc) > 10) {out.collect("傳感器=" + value.getId() + "==>當前水位值=" + vc + ",與上一條水位值=" + lastVc + ",相差超過10!!!!");}// 3. 更新狀態里的水位值lastVcState.update(vc);}}).print();env.execute();}

2.2.2?列表狀態(ListState)

將需要保存的數據,以列表(List)的形式組織起來。在ListState<T>接口中同樣有一個類型參數T,表示列表中數據的類型。ListState也提供了一系列的方法來操作狀態,使用方式與一般的List非常相似。

  • Iterable<T> get():獲取當前的列表狀態,返回的是一個可迭代類型Iterable<T>;
  • update(List<T>?values):傳入一個列表values,直接對狀態進行覆蓋;
  • add(T?value):在狀態列表中添加一個元素value;
  • addAll(List<T>?values):向列表中添加多個元素,以列表values形式傳入。

類似地,ListState的狀態描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致。

案例:針對每種傳感器輸出最高的3個水位值

public class KeyedListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {ListState<Integer> vcListState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Types.INT));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 1.來一條,存到list狀態里vcListState.add(value.getVc());// 2.從list狀態拿出來(Iterable), 拷貝到一個List中,排序, 只留3個最大的Iterable<Integer> vcListIt = vcListState.get();// 2.1 拷貝到List中List<Integer> vcList = new ArrayList<>();for (Integer vc : vcListIt) {vcList.add(vc);}// 2.2 對List進行降序排序vcList.sort((o1, o2) -> o2 - o1);// 2.3 只保留最大的3個(list中的個數一定是連續變大,一超過3就立即清理即可)if (vcList.size() > 3) {// 將最后一個元素清除(第4個)vcList.remove(3);}out.collect("傳感器id為" + value.getId() + ",最大的3個水位值=" + vcList.toString());// 3.更新list狀態vcListState.update(vcList);//                                vcListState.get();            //取出 list狀態 本組的數據,是一個Iterable
//                                vcListState.add();            // 向 list狀態 本組 添加一個元素
//                                vcListState.addAll();         // 向 list狀態 本組 添加多個元素
//                                vcListState.update();         // 更新 list狀態 本組數據(覆蓋)
//                                vcListState.clear();          // 清空List狀態 本組數據}}).print();env.execute();}
}

2.2.3?Map狀態(MapState)

把一些鍵值對(key-value)作為狀態整體保存起來,可以認為就是一組key-value映射的列表。

MapState提供了操作映射狀態的方法,與Map的使用非常類似。

  • UV?get(UK?key):傳入一個key作為參數,查詢對應的value值;
  • put(UK?key, UV?value):傳入一個鍵值對,更新key對應的value值;
  • putAll(Map<UK, UV>?map):將傳入的映射map中所有的鍵值對,全部添加到映射狀態中;
  • remove(UK key):將指定key對應的鍵值對刪除;
  • boolean contains(UK key):判斷是否存在指定的key,返回一個boolean值。

另外,MapState也提供了獲取整個映射相關信息的方法;

  • Iterable<Map.Entry<UK, UV>>?entries():獲取映射狀態中所有的鍵值對;
  • Iterable<UK>?keys():獲取映射狀態中所有的鍵(key),返回一個可迭代Iterable類型;
  • Iterable<UV>?values():獲取映射狀態中所有的值(value),返回一個可迭代Iterable類型;
  • boolean?isEmpty():判斷映射是否為空,返回一個boolean值。

案例需求:統計每種傳感器每種水位值出現的次數。

public class KeyedMapStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {MapState<Integer, Integer> vcCountMapState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Types.INT, Types.INT));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 1.判斷是否存在vc對應的keyInteger vc = value.getVc();if (vcCountMapState.contains(vc)) {// 1.1 如果包含這個vc的key,直接對value+1Integer count = vcCountMapState.get(vc);vcCountMapState.put(vc, ++count);} else {// 1.2 如果不包含這個vc的key,初始化put進去vcCountMapState.put(vc, 1);}// 2.遍歷Map狀態,輸出每個k-v的值StringBuilder outStr = new StringBuilder();outStr.append("======================================\n");outStr.append("傳感器id為" + value.getId() + "\n");for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {outStr.append(vcCount.toString() + "\n");}outStr.append("======================================\n");out.collect(outStr.toString());//                                vcCountMapState.get();          // 對本組的Map狀態,根據key,獲取value
//                                vcCountMapState.contains();     // 對本組的Map狀態,判斷key是否存在
//                                vcCountMapState.put(, );        // 對本組的Map狀態,添加一個 鍵值對
//                                vcCountMapState.putAll();  // 對本組的Map狀態,添加多個 鍵值對
//                                vcCountMapState.entries();      // 對本組的Map狀態,獲取所有鍵值對
//                                vcCountMapState.keys();         // 對本組的Map狀態,獲取所有鍵
//                                vcCountMapState.values();       // 對本組的Map狀態,獲取所有值
//                                vcCountMapState.remove();   // 對本組的Map狀態,根據指定key,移除鍵值對
//                                vcCountMapState.isEmpty();      // 對本組的Map狀態,判斷是否為空
//                                vcCountMapState.iterator();     // 對本組的Map狀態,獲取迭代器
//                                vcCountMapState.clear();        // 對本組的Map狀態,清空}}).print();env.execute();}
}

2.2.4?歸約狀態(ReducingState)

歸約邏輯的定義,是在歸約狀態描述器(ReducingStateDescriptor)中,通過傳入一個歸約函數(ReduceFunction)來實現的。這里的歸約函數,就是我們之前介紹reduce聚合算子時講到的ReduceFunction,所以狀態類型跟輸入的數據類型是一樣的。

public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}

案例:計算每種傳感器的水位和

.process(new KeyedProcessFunction<String, WaterSensor, Integer>() {private ReducingState<Integer> sumVcState;@Overridepublic void open(Configuration parameters) throws Exception {sumVcState = this.getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("sumVcState",Integer::sum,Integer.class));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<Integer> out) throws Exception {sumVcState.add(value.getVc());out.collect(sumVcState.get());}
})

2.2.5?聚合狀態(AggregatingState)

與歸約狀態非常類似,聚合狀態也是一個值,用來保存添加進來的所有數據的聚合結果。與ReducingState不同的是,它的聚合邏輯是由在描述器中傳入一個更加一般化的聚合函數(AggregateFunction)來定義的;這也就是之前我們講過的AggregateFunction,里面通過一個累加器(Accumulator)來表示狀態,所以聚合的狀態類型可以跟添加進來的數據類型完全不同,使用更加靈活。

案例需求:計算每種傳感器的平均水位

public class KeyedAggregatingStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {AggregatingState<Integer, Double> vcAvgAggregatingState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcAvgAggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>("vcAvgAggregatingState",new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {@Overridepublic Tuple2<Integer, Integer> createAccumulator() {return Tuple2.of(0, 0);}@Overridepublic Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);}@Overridepublic Double getResult(Tuple2<Integer, Integer> accumulator) {return accumulator.f0 * 1D / accumulator.f1;}@Overridepublic Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
//                                                                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);return null;}},Types.TUPLE(Types.INT, Types.INT)));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 將 水位值 添加到  聚合狀態中vcAvgAggregatingState.add(value.getVc());// 從 聚合狀態中 獲取結果Double vcAvg = vcAvgAggregatingState.get();out.collect("傳感器id為" + value.getId() + ",平均水位值=" + vcAvg);//                                vcAvgAggregatingState.get();    // 對 本組的聚合狀態 獲取結果
//                                vcAvgAggregatingState.add();    // 對 本組的聚合狀態 添加數據,會自動進行聚合
//                                vcAvgAggregatingState.clear();  // 對 本組的聚合狀態 清空數據}}).print();env.execute();}
}

2.2.6?狀態生存時間(TTL)

在實際應用中,很多狀態會隨著時間的推移逐漸增長,如果不加以限制,最終就會導致存儲空間的耗盡。

配置狀態的TTL時,需要創建一個StateTtlConfig配置對象,然后調用狀態描述器的.enableTimeToLive()方法啟動TTL功能。

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);stateDescriptor.enableTimeToLive(ttlConfig);

這里用到了幾個配置項:

  • .newBuilder()

狀態TTL配置的構造器方法,必須調用,返回一個Builder之后再調用.build()方法就可以得到StateTtlConfig了。方法需要傳入一個Time作為參數,這就是設定的狀態生存時間

  • .setUpdateType()

設置更新類型。更新類型指定了什么時候更新狀態失效時間,這里的OnCreateAndWrite表示只有創建狀態和更改狀態(寫操作)時更新失效時間。另一種類型OnReadAndWrite則表示無論讀寫操作都會更新失效時間,也就是只要對狀態進行了訪問,就表明它是活躍的,從而延長生存時間。這個配置默認為OnCreateAndWrite。

  • .setStateVisibility()

設置狀態的可見性。所謂的“狀態可見性”,是指因為清除操作并不是實時的,所以當狀態過期之后還有可能繼續存在,這時如果對它進行訪問,能否正常讀取到就是一個問題了。這里設置的NeverReturnExpired是默認行為,表示從不返回過期值,也就是只要過期就認為它已經被清除了,應用不能繼續讀取;這在處理會話或者隱私數據時比較重要。對應的另一種配置是ReturnExpireDefNotCleanedUp,就是如果過期狀態還存在,就返回它的值。

public class StateTTLDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {ValueState<Integer> lastVcState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 1.創建 StateTtlConfigStateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(5)) // 過期時間5s
//                                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 狀態 創建和寫入(更新) 更新 過期時間.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 狀態 讀取、創建和寫入(更新) 更新 過期時間.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回過期的狀態值.build();// TODO 2.狀態描述器 啟用 TTLValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT);stateDescriptor.enableTimeToLive(stateTtlConfig);this.lastVcState = getRuntimeContext().getState(stateDescriptor);}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 先獲取狀態值,打印 ==》 讀取狀態Integer lastVc = lastVcState.value();out.collect("key=" + value.getId() + ",狀態值=" + lastVc);// 如果水位大于10,更新狀態值 ===》 寫入狀態if (value.getVc() > 10) {lastVcState.update(value.getVc());}}}).print();env.execute();}
}

3、狀態后端(State Backends)

在Flink中,狀態的存儲、訪問以及維護,都是由一個可插拔的組件決定的,這個組件就叫作狀態后端(state backend)。狀態后端主要負責管理本地狀態的存儲方式和位置

3.1?狀態后端的分類(HashMapStateBackend/RocksDB

Flink中提供了兩類不同的狀態后端,一種是“哈希表狀態后端”(HashMapStateBackend),另一種是“內嵌RocksDB狀態后端”(EmbeddedRocksDBStateBackend)。

系統默認的狀態后端是HashMapStateBackend。

3.1.1?哈希表狀態后端(HashMapStateBackend)

HashMapStateBackend是把狀態存放在內存里。具體實現上,哈希表狀態后端在內部會直接把狀態當作對象(objects),保存在Taskmanager的JVM堆上

普通的狀態,以及窗口中收集的數據和觸發器,都會以鍵值對的形式存儲起來,所以底層是一個哈希表(HashMap),這種狀態后端也因此得名。

3.1.2?內嵌RocksDB狀態后端(EmbeddedRocksDBStateBackend)

RocksDB是一種內嵌的key-value存儲介質,可以把數據持久化到本地硬盤

配置EmbeddedRocksDBStateBackend后,會將處理中的數據全部放入RocksDB數據庫中,RocksDB默認存儲在TaskManager的本地數據目錄里

3.2?如何選擇正確的狀態后端

HashMap和RocksDB兩種狀態后端最大的區別,就在于本地狀態存放在哪里。

HashMapStateBackend是內存計算,讀寫速度非常快;但是,狀態的大小會受到集群可用內存的限制,如果應用的狀態隨著時間不停地增長,就會耗盡內存資源。

而RocksDB是硬盤存儲,所以可以根據可用的磁盤空間進行擴展,所以它非常適合于超級海量狀態的存儲。不過由于每個狀態的讀寫都需要做序列化/反序列化,而且可能需要直接從磁盤讀取數據,這就會導致性能的降低,平均讀寫性能要比HashMapStateBackend慢一個數量級。

3.3?狀態后端的配置

3.3.1?配置默認的狀態后端

#flink-conf.yaml# 默認狀態后端
state.backend: hashmap# 存放檢查點的文件路徑
# 這里的state.checkpoints.dir配置項,定義了檢查點和元數據寫入的目錄。
state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints

3.3.2?為每個作業(Per-job/Application)單獨配置狀態后端

通過執行環境設置,HashMapStateBackend

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new HashMapStateBackend());

通過執行環境設置,EmbeddedRocksDBStateBackend。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new EmbeddedRocksDBStateBackend());

需要注意,如果想在IDE中使用EmbeddedRocksDBStateBackend,需要為Flink項目添加依賴:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version>
</dependency>

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/166177.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/166177.shtml
英文地址,請注明出處:http://en.pswp.cn/news/166177.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Redis Transaction事務

Redis 事務的目的是方便用戶一次執行多個命令。執行 Redis 事務可分為三個階段&#xff1a; 開始事務命令入隊執行事務 Redis事務特性 Redis 事務具有兩個重要特性&#xff1a; 1) 單獨的隔離操作 事務中的所有命令都會被序列化&#xff0c;它們將按照順序執行&#xff0c…

圖像標記上線,描點信息盡在掌握丨三疊云

圖像標記 路徑 表單設計 >> 組件 >> 增強組件 功能簡介 「圖像標記」字段是「增強字段」類型字段。用戶通過上傳圖片的方式構建一個背景圖片&#xff0c;并在構建的圖片背景上添加描點信息。搭配「儀表盤」中的「圖像軌跡」&#xff0c;可繪制出相應的數據軌跡…

界面組件DevExpress Reporting v23.1 - Web報表設計器功能升級

DevExpress Reporting是.NET Framework下功能完善的報表平臺&#xff0c;它附帶了易于使用的Visual Studio報表設計器和豐富的報表控件集&#xff0c;包括數據透視表、圖表&#xff0c;因此您可以構建無與倫比、信息清晰的報表 界面組件DevExpress Reporting v23.1已經發布一段…

基于JavaWeb+SSM+Vue微信閱讀小程序的設計和實現

基于JavaWebSSMVue微信閱讀小程序的設計和實現 源碼獲取入口Lun文目錄前言主要技術系統設計功能截圖訂閱經典源碼專欄[Java 源碼獲取 源碼獲取入口 Lun文目錄 第1章 緒論 1 1.1 課題背景 1 1.2 課題意義 1 1.3 研究內容 1 第2章 開發環境與技術 3 2.1 MYSQL數據庫 3 2.2 JSP技…

2016年8月15日 Go生態洞察:Go 1.7版本發布

&#x1f337;&#x1f341; 博主貓頭虎&#xff08;&#x1f405;&#x1f43e;&#xff09;帶您 Go to New World?&#x1f341; &#x1f984; 博客首頁——&#x1f405;&#x1f43e;貓頭虎的博客&#x1f390; &#x1f433; 《面試題大全專欄》 &#x1f995; 文章圖文…

解決traefik/nginx-ingress-controller配置正確的情況訪問域名仍然報錯: Connection Refused的問題

最近碰到一個很奇怪的問題&#xff1a; traefik/nginx-ingress-controller配置正確&#xff0c;但是訪問ingress配置的host域名就是死活報錯&#xff1a; Connection Refused 這樣怎么也找不到原因&#xff0c;然后一咬牙直接在其中一臺節點yum安裝nginx, 通過直接反向代理的方…

微信小程序開發資源匯總

本文收集了微信小程序開發過程中會使用到的資料、問題以及第三方組件庫。本文不是一篇關于如何學習微信小程序的入門指南&#xff0c;也非參考手冊&#xff0c;只是一些資料的整理。 本倉庫中的資料整理自網絡&#xff0c;也有一些來自網友的推薦。 官方文檔 小程序設計指南…

UE5 UI教程學習筆記

參考資料&#xff1a;https://item.taobao.com/item.htm?spma21n57.1.0.0.2b4f523cAV5i43&id716635137219&ns1&abbucket15#detail 基礎工程&#xff1a;https://download.csdn.net/download/qq_17523181/88559312 1. 介紹 工程素材 2. 創建Widget UE5 UI系統的…

那些被玩爛了的設計模式

單例模式 單例模式是指一個類在一個進程中只有一個實例對象&#xff08;但也不一定&#xff0c;比如Spring中的Bean的單例是指在一個容器中是單例的&#xff09; 單例模式創建分為餓漢式和懶漢式&#xff0c;總共大概有8種寫法。但是在開源項目中使用最多的主要有兩種寫法&am…

electron實現截圖的功能

Electron是一種跨平臺的桌面應用程序開發框架&#xff0c;可以使用HTML、CSS和JavaScript等Web技術構建桌面應用程序。下面是一種使用Electron實現截圖的簡單方法&#xff1a; 安裝Electron和截圖庫 首先&#xff0c;需要安裝Electron和一個截圖庫&#xff0c;例如electron-sc…

替換jar文件中的jar文件中的class

文件格式 testjar.jar在ruoyi.jar中。 AssetServiceImpl.class在testjar.jar 查找testjar.jar路徑 jar -tvf ruoyi.jar | grep testjar.jar 解析testjar.jar jar -xvf ruoyi.jar BOOT-INF/lib/testjar.jar 查找class文件路徑 jar -tvf testjar.jar | grep AssetServiceImp…

ELK: logstash gork filter 多個模式(pattern)匹配規則語法和多行日志匹配設置

項目里用logstash分析日志&#xff0c;由于有多種模式&#xff08;pattern&#xff09;需要匹配&#xff0c;網上搜了很多示例&#xff0c;發現這些都是老的寫法&#xff0c;都會報錯&#xff0c;后來查閱了官方文檔&#xff0c;才發現&#xff0c;新版本只支持新語法。 錯誤的…

【MISRA-C 2012】濃縮版解讀

文章目錄 1、前言2、簡介2.1、如何看待MISRA-C 20122.2、準則(guidelines)里面的指示(Directive)和規則(Rule)2.3、準則(guidelines)的級別(Category) 3、若干重要的Directive和Rule3.1、指示(Directive)Dir 2.1&#xff08;必要&#xff09; 所有的源文件編譯過程不得有編譯錯…

聚類筆記/sklearn筆記:Affinity Propagation親和力傳播

1 算法原理 1.1 基本思想 將全部數據點都當作潛在的聚類中心(稱之為 exemplar )然后數據點兩兩之間連線構成一個網絡( 相似度矩陣 )再通過網絡中各條邊的消息( responsibility 和 availability )傳遞計算出各樣本的聚類中心。 1.2 主要概念 Examplar聚類中心similarity S(i…

Java Excel Poi 單元格內置的數據格式

位置 //在類 org.apache.poi.ss.usermodel.BuiltinFormats 中的私有成員變量_formats中 private static final String[] _formats new String[]{"General", "0", "0.00", "#,##0", "#,##0.00", "\"$\"#,##…

【ARM CoreLink 系列 3.2 -- CCI-400,CCI-500, CCI-550 差異】

文章目錄 CCI-400 和 CCI-500 差異ARM CCI-400ARM CCI-500ARM CCI-550CCI-400 和 CCI-500 差異 ARM的 CCI(Cache Coherent Interconnect)系列產品是用于多核處理器之間的高性能緩存一致性互連。CCI-400 和 CCI-500 是該系列中的兩種設計,它們旨在允許多個處理器核心和其他資…

TopNet-(CVPR2023)前背景圖像合成

文章目錄 摘要引言算法架構結構損失函數 實驗數據集評估SOTA比較模型是否過擬合到修復區域泛化到真實圖片消融實驗 討論及結論限制 參考文獻 摘要 作者調研自動放置目標到背景進行圖像合成的問題。提供背景圖、分割的目標&#xff0c;訓練模型預測合理放置信息&#xff08;位置…

JavaScript文檔加載和文檔準備的區別

你可能已經聽說過JavaScript中的“文檔加載”和“文檔準備”這兩個術語。雖然它們聽起來很相似&#xff0c;但它們實際上有一些重要的區別。在本文中&#xff0c;我們將深入探討這兩個概念的區別&#xff0c;以及它們在實際編碼中的應用。 引言 在開始討論JS文檔加載和文檔準備…

批量添加PPT備注

我一直都覺得&#xff0c;用python高效辦公&#xff0c;是件沒必要的事。。。 但直到最近寫課做PPT&#xff0c;做了80多頁PPT&#xff0c;要把每一頁PPT的備注粘貼進去時 我覺得&#xff0c;有什么關系呢&#xff0c;一頁一頁粘 但是粘到5頁&#xff0c;我感覺ctlc\v頻率有點兒…

程序員接單,寶藏好平臺抄底攻略清單!五大平臺精選。

前陣子“雙十一”購物節狂歡促銷&#xff0c;各種好貨清單席卷而來。 程序員購不購物我不知道&#xff0c;但是這個兼職、接單清單相信你一定用得著。 搜羅海量信息&#xff0c;整理大量數據與評價&#xff0c;挖出了5個寶藏平臺&#xff0c;絕對個個精選&#xff0c;保證量大…