Flink(十):DataStream API (七) 狀態

1. 狀態的定義

Apache Flink 中,狀態(State) 是指在數據流處理過程中需要持久化和追蹤的中間數據,它允許 Flink 在處理事件時保持上下文信息,從而支持復雜的流式計算任務,如聚合、窗口計算、聯接等。狀態是 Flink 處理有狀態操作(如窗口、時間戳操作、聚合等)的核心組成部分。

2. 狀態的類型

Flink 提供了強大的狀態管理機制,允許應用程序在分布式環境中處理狀態,保證高可用性和容錯性。Flink 的狀態分為 Keyed StateOperator State,并提供了不同的存儲和恢復機制。

2.1 Keyed State(按鍵狀態)

  • Keyed State 是基于流中每個元素的鍵進行管理的狀態。每個鍵會有一個獨立的狀態,這對于需要按照每個輸入元素的唯一標識符(如用戶 ID、商品 ID 等)維護狀態的操作非常有用。
  • Keyed State 主要用于需要對流中的每個“鍵”進行獨立計算的場景,如按用戶進行會話計算、按時間窗口聚合等。

常見的 Keyed State 類型包括:

  • ValueState:存儲與每個鍵相關聯的單個值。
  • ListState:存儲與每個鍵相關聯的多個值,通常用來表示一個元素列表。
  • MapState:存儲與每個鍵相關聯的鍵值對,適用于需要維護多個關聯數據的場景。
  • ReducingState:支持對每個鍵的值進行累加或其他聚合操作。
  • AggregatingState:可以根據給定的聚合函數對每個鍵的狀態進行聚合。

2.2 Operator State(操作符狀態)

  • Operator State 是由操作符(如 Flink 中的算子)管理的狀態,通常用于保持操作符內部的狀態信息,不與鍵相關聯。它用于管理一些需要跨整個流處理作業的全局狀態,如窗口管理、算子內部緩沖區等。
  • Operator State 主要用于在分布式環境中處理 算子 級別的狀態,尤其在對狀態進行恢復時非常重要,幫助 Flink 恢復作業。

常見的 Operator State 類型包括:

  • ListState:與鍵無關,存儲多個值。
  • UnionListState :?與鍵無關,存儲多個值,?UnionListStateListState 的擴展,主要用于 跨多個并行實例 共享狀態。在 Flink 的 流式應用程序 中,如果多個并行實例需要訪問和修改共享的狀態,通常使用 UnionListState。

  • BroadcastState:存儲和廣播信息。

3. Keyed State

Keyed 狀態可以看作是一個嵌入式的鍵值存儲。該狀態是與由有狀態操作符讀取的流一起嚴格地進行分區和分布的。因此,只有在 Keyed 流 上才能訪問鍵值狀態,也就是說,只有在進行鍵控/分區數據交換后,才能訪問與當前事件的鍵相關聯的值。將流的鍵與狀態對齊確保了所有的狀態更新都是本地操作,從而在沒有事務開銷的情況下保證一致性。這個對齊還使得 Flink 能夠透明地重新分配狀態并調整流的分區。

Keyed 狀態進一步組織為所謂的 Key Groups(鍵組)。Key Groups 是 Flink 重新分配 Keyed 狀態的最小單位;其數量與定義的最大并行度相同。在執行過程中,每個并行實例的鍵控操作符都處理一個或多個 Key Groups 中的鍵。

3.1 使用 Keyed State

keyed state 接口提供不同類型狀態的訪問接口,這些狀態都作用于當前輸入數據的 key 下。換句話說,這些狀態僅可在?KeyedStream?上使用,在Java/Scala API上可以通過?stream.keyBy(...)?得到?KeyedStream,在Python API上可以通過?stream.key_by(...)?得到?KeyedStream

接下來,我們會介紹不同類型的狀態,然后介紹如何使用他們。所有支持的狀態類型如下所示:

  • ValueState<T>: 保存一個可以更新和檢索的值(如上所述,每個值都對應到當前的輸入數據的 key,因此算子接收到的每個 key 都可能對應一個值)。 這個值可以通過?update(T)?進行更新,通過?T value()?進行檢索。

  • ListState<T>: 保存一個元素的列表。可以往這個列表中追加數據,并在當前的列表上進行檢索。可以通過?add(T)?或者?addAll(List<T>)?進行添加元素,通過?Iterable<T> get()?獲得整個列表。還可以通過?update(List<T>)?覆蓋當前的列表。

  • ReducingState<T>: 保存一個單值,表示添加到狀態的所有值的聚合。接口與?ListState?類似,但使用?add(T)?增加元素,會使用提供的?ReduceFunction?進行聚合。

  • AggregatingState<IN, OUT>: 保留一個單值,表示添加到狀態的所有值的聚合。和?ReducingState?相反的是, 聚合類型可能與 添加到狀態的元素的類型不同。 接口與?ListState?類似,但使用?add(IN)?添加的元素會用指定的?AggregateFunction?進行聚合。

  • MapState<UK, UV>: 維護了一個映射列表。 你可以添加鍵值對到狀態中,也可以獲得反映當前所有映射的迭代器。使用?put(UK,UV)?或者?putAll(Map<UK,UV>)?添加映射。 使用?get(UK)?檢索特定 key。 使用?entries()keys()?和?values()?分別檢索映射、鍵和值的可迭代視圖。你還可以通過?isEmpty()?來判斷是否包含任何鍵值對。

所有類型的狀態還有一個clear()?方法,清除當前 key 下的狀態數據,也就是當前輸入元素的 key。

請牢記,這些狀態對象僅用于與狀態交互。狀態本身不一定存儲在內存中,還可能在磁盤或其他位置。 另外需要牢記的是從狀態中獲取的值取決于輸入元素所代表的 key。 因此,在不同 key 上調用同一個接口,可能得到不同的值。

你必須創建一個?StateDescriptor,才能得到對應的狀態句柄。 這保存了狀態名稱(正如我們稍后將看到的,你可以創建多個狀態,并且它們必須具有唯一的名稱以便可以引用它們), 狀態所持有值的類型,并且可能包含用戶指定的函數,例如ReduceFunction。 根據不同的狀態類型,可以創建ValueStateDescriptorListStateDescriptor,?AggregatingStateDescriptor,?ReducingStateDescriptor?或?MapStateDescriptor

狀態通過?RuntimeContext?進行訪問,因此只能在?rich functions?中使用。RichFunction?中?RuntimeContext?提供如下方法:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

下面是一個?FlatMapFunction?的例子,展示了如何將這些部分組合起來:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {/*** The ValueState handle. The first field is the count, the second field a running sum.*/private transient ValueState<Tuple2<Long, Long>> sum;@Overridepublic void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {// access the state valueTuple2<Long, Long> currentSum = sum.value();// update the countcurrentSum.f0 += 1;// add the second field of the input valuecurrentSum.f1 += input.f1;// update the statesum.update(currentSum);// if the count reaches 2, emit the average and clear the stateif (currentSum.f0 >= 2) {out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));sum.clear();}}@Overridepublic void open(OpenContext ctx) {ValueStateDescriptor<Tuple2<Long, Long>> descriptor =new ValueStateDescriptor<>("average", // the state nameTypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type informationTuple2.of(0L, 0L)); // default value of the state, if nothing was setsum = getRuntimeContext().getState(descriptor);}
}// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)).keyBy(value -> value.f0).flatMap(new CountWindowAverage()).print();// the printed output will be (1,4) and (1,5)

3.2?狀態有效期 (TTL)

任何類型的 keyed state 都可以有?有效期?(TTL)。如果配置了 TTL 且狀態值已過期,則會盡最大可能清除對應的值,所有狀態類型都支持單元素的 TTL。 這意味著列表元素和映射元素將獨立到期。在使用狀態 TTL 前,需要先構建一個StateTtlConfig?配置對象。 然后把配置傳遞到 state descriptor 中啟用 TTL 功能:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import java.time.Duration;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

TTL 配置有以下幾個選項:?newBuilder?的第一個參數表示數據的有效期,是必選項。TTL 的更新策略(默認是?OnCreateAndWrite):

  • StateTtlConfig.UpdateType.OnCreateAndWrite?- 僅在創建和寫入時更
  • StateTtlConfig.UpdateType.OnReadAndWrite?- 讀取時也更新?

數據在過期但還未被清理時的可見性配置如下(默認為?NeverReturnExpired):

  • StateTtlConfig.StateVisibility.NeverReturnExpired?- 不返回過期數據
  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp?- 會返回過期但未清理的數據

NeverReturnExpired?情況下,過期數據就像不存在一樣,不管是否被物理刪除。這對于不能訪問過期數據的場景下非常有用,比如敏感數據。?ReturnExpiredIfNotCleanedUp?在數據被物理刪除前都會返回。

注意:

  • 狀態上次的修改時間會和數據一起保存在 state backend 中,因此開啟該特性會增加狀態數據的存儲。 Heap state backend 會額外存儲一個包括用戶狀態以及時間戳的 Java 對象,RocksDB state backend 會在每個狀態值(list 或者 map 的每個元素)序列化后增加 8 個字節。
  • 暫時只支持基于?processing time?的 TTL。
  • 嘗試從 checkpoint/savepoint 進行恢復時,TTL 的狀態(是否開啟)必須和之前保持一致,否則會遇到 “StateMigrationException”。
  • TTL 的配置并不會保存在 checkpoint/savepoint 中,僅對當前 Job 有效。
  • 不建議checkpoint恢復前后將state TTL從短調長,這可能會產生潛在的數據錯誤。
  • 當前開啟 TTL 的 map state 僅在用戶值序列化器支持 null 的情況下,才支持用戶值為 null。如果用戶值序列化器不支持 null, 可以用?NullableSerializer?包裝一層。
  • 啟用 TTL 配置后,StateDescriptor?中的?defaultValue(已被標記?deprecated)將會失效。這個設計的目的是為了確保語義更加清晰,在此基礎上,用戶需要手動管理那些實際值為 null 或已過期的狀態默認值。

3.2.1 過期數據的清理

默認情況下,過期數據會在讀取的時候被刪除,例如?ValueState#value,同時會有后臺線程定期清理(如果 StateBackend 支持的話)。可以通過?StateTtlConfig?配置關閉后臺清理:

import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).disableCleanupInBackground().build();

可以按照如下所示配置更細粒度的后臺清理策略。當前的實現中?HeapStateBackend?依賴增量數據清理,RocksDBStateBackend?利用壓縮過濾器進行后臺清理。

3.2.2 全量快照時進行清理?

另外,你可以啟用全量快照時進行清理的策略,這可以減少整個快照的大小。當前實現中不會清理本地的狀態,但從上次快照恢復時,不會恢復那些已經刪除的過期數據。 該策略可以通過?StateTtlConfig?配置進行配置:

import org.apache.flink.api.common.state.StateTtlConfig;
import java.time.Duration;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).cleanupFullSnapshot().build();

這種策略在?RocksDBStateBackend?的增量 checkpoint 模式下無效。

注意:這種清理方式可以在任何時候通過?StateTtlConfig?啟用或者關閉,比如在從 savepoint 恢復時。

3.2.3 增量數據清理

另外可以選擇增量式清理狀態數據,在狀態訪問或/和處理時進行。如果某個狀態開啟了該清理策略,則會在存儲后端保留一個所有狀態的惰性全局迭代器。 每次觸發增量清理時,從迭代器中選擇已經過期的數進行清理。該特性可以通過?StateTtlConfig?進行配置:

import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).cleanupIncrementally(10, true).build();

該策略有兩個參數。 第一個是每次清理時檢查狀態的條目數,在每個狀態訪問時觸發。第二個參數表示是否在處理每條記錄時觸發清理。 Heap backend 默認會檢查 5 條狀態,并且關閉在每條記錄時觸發清理。

注意:

  • 如果沒有 state 訪問,也沒有處理數據,則不會清理過期數據。
  • 增量清理會增加數據處理的耗時。
  • 現在僅 Heap state backend 支持增量清除機制。在 RocksDB state backend 上啟用該特性無效。
  • 如果 Heap state backend 使用同步快照方式,則會保存一份所有 key 的拷貝,從而防止并發修改問題,因此會增加內存的使用。但異步快照則沒有這個問題。
  • 對已有的作業,這個清理方式可以在任何時候通過?StateTtlConfig?啟用或禁用該特性,比如從 savepoint 重啟后.

3.2.4 在 RocksDB 壓縮時清理

如果使用 RocksDB state backend,則會啟用 Flink 為 RocksDB 定制的壓縮過濾器。RocksDB 會周期性的對數據進行合并壓縮從而減少存儲空間。 Flink 提供的 RocksDB 壓縮過濾器會在壓縮時過濾掉已經過期的狀態數據。該特性可以通過?StateTtlConfig?進行配置:

import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1)).build();

Flink 處理一定條數的狀態數據后,會使用當前時間戳來檢測 RocksDB 中的狀態是否已經過期, 你可以通過?

StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)?方法指定處理狀態的條數。 時間戳更新的越頻繁,狀態的清理越及時,但由于壓縮會有調用 JNI 的開銷,因此會影響整體的壓縮性能。 RocksDB backend 的默認后臺清理策略會每處理 1000 條數據進行一次。定期壓縮可以加速過期狀態條目的清理,特別是對于很少訪問的狀態條目。 比這個值早的文件將被選取進行壓縮,并重新寫入與之前相同的 Level 中。 該功能可以確保文件定期通過壓縮過濾器壓縮。 您可以通過StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime)?方法設定定期壓縮的時間。 定期壓縮的時間的默認值是 30 天。 您可以將其設置為 0 以關閉定期壓縮或設置一個較小的值以加速過期狀態條目的清理,但它將會觸發更多壓縮。還可以通過配置開啟 RocksDB 過濾器的 debug 日志:?log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

注意:

  • 壓縮時調用 TTL 過濾器會降低速度。TTL 過濾器需要解析上次訪問的時間戳,并對每個將參與壓縮的狀態進行是否過期檢查。 對于集合型狀態類型(比如 list 和 map),會對集合中每個元素進行檢查。
  • 對于元素序列化后長度不固定的列表狀態,TTL 過濾器需要在每次 JNI 調用過程中,額外調用 Flink 的 java 序列化器, 從而確定下一個未過期數據的位置。
  • 對已有的作業,這個清理方式可以在任何時候通過?StateTtlConfig?啟用或禁用該特性,比如從 savepoint 重啟后。
  • 定期壓縮功能只在 TTL 啟用時生效。

4. Operator State

算子狀態(或者非 keyed 狀態)是綁定到一個并行算子實例的狀態。Kafka Connector?是 Flink 中使用算子狀態一個很具有啟發性的例子。Kafka consumer 每個并行實例維護了 topic partitions 和偏移量的 map 作為它的算子狀態。當并行度改變的時候,算子狀態支持將狀態重新分發給各并行算子實例。處理重分發過程有多種不同的方案。在典型的有狀態 Flink 應用中你無需使用算子狀態。它大都作為一種特殊類型的狀態使用。用于實現 source/sink,以及無法對 state 進行分區而沒有主鍵的這類場景中。

4.1?廣播狀態 (Broadcast State)

廣播狀態是一種特殊的算子狀態。引入它的目的在于支持一個流中的元素需要廣播到所有下游任務的使用情形。在這些任務中廣播狀態用于保持所有子任務狀態相同。 該狀態接下來可在第二個處理記錄的數據流中訪問。可以設想包含了一系列用于處理其他流中元素規則的低吞吐量數據流,這個例子自然而然地運用了廣播狀態。 考慮到上述這類使用情形,廣播狀態和其他算子狀態的不同之處在于:

  1. 它具有 map 格式,
  2. 它僅在一些特殊的算子中可用。這些算子的輸入為一個廣播數據流和非廣播數據流,
  3. 這類算子可以擁有不同命名的多個廣播狀態?。

4.2 使用 Operator State

用戶可以通過實現?CheckpointedFunction?接口來使用 operator stateCheckpointedFunction?接口提供了訪問 non-keyed state 的方法,需要實現如下兩個方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;void initializeState(FunctionInitializationContext context) throws Exception;

進行 checkpoint 時會調用?snapshotState()。 用戶自定義函數初始化時會調用?initializeState(),初始化包括第一次自定義函數初始化和從之前的 checkpoint 恢復。 因此?initializeState()?不僅是定義不同狀態類型初始化的地方,也需要包括狀態恢復的邏輯。當前 operator state 以 list 的形式存在。這些狀態是一個?可序列化?對象的集合?List,彼此獨立,方便在改變并發后進行狀態的重新分派。 換句話說,這些對象是重新分配 non-keyed state 的最細粒度。根據狀態的不同訪問方式,有如下幾種重新分配的模式:

  • Even-split redistribution:?每個算子都保存一個列表形式的狀態集合,整個狀態由所有的列表拼接而成。當作業恢復或重新分配的時候,整個狀態會按照算子的并發度進行均勻分配。 比如說,算子 A 的并發讀為 1,包含兩個元素?element1?和?element2,當并發讀增加為 2 時,element1?會被分到并發 0 上,element2?則會被分到并發 1 上。

  • Union redistribution:?每個算子保存一個列表形式的狀態集合。整個狀態由所有的列表拼接而成。當作業恢復或重新分配時,每個算子都將獲得所有的狀態數據。如果你的列表可能具有高基數,請不要使用此功能。檢查點元數據將存儲指向每個列表項的偏移量,這可能導致 RPC 幀大小或內存溢出錯誤。

下面的例子中的?SinkFunction?在?CheckpointedFunction?中進行數據緩存,然后統一發送到下游,這個例子演示了列表狀態數據的 event-split redistribution。

public class BufferingSinkimplements SinkFunction<Tuple2<String, Integer>>,CheckpointedFunction {private final int threshold;private transient ListState<Tuple2<String, Integer>> checkpointedState;private List<Tuple2<String, Integer>> bufferedElements;public BufferingSink(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();}@Overridepublic void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {bufferedElements.add(value);if (bufferedElements.size() >= threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}bufferedElements.clear();}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.update(bufferedElements);}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<Tuple2<String, Integer>> descriptor =new ListStateDescriptor<>("buffered-elements",TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));checkpointedState = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {for (Tuple2<String, Integer> element : checkpointedState.get()) {bufferedElements.add(element);}}}
}

initializeState?方法接收一個?FunctionInitializationContext?參數,會用來初始化 non-keyed state 的 “容器”。這些容器是一個?ListState?用于在 checkpoint 時保存 non-keyed state 對象。注意這些狀態是如何初始化的,和 keyed state 類似,StateDescriptor?會包括狀態名字、以及狀態類型相關信息。

ListStateDescriptor<Tuple2<String, Integer>> descriptor =new ListStateDescriptor<>("buffered-elements",TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));checkpointedState = context.getOperatorStateStore().getListState(descriptor);

調用不同的獲取狀態對象的接口,會使用不同的狀態分配算法。比如?

getUnionListState(descriptor)?會使用 union redistribution 算法, 而?getListState(descriptor)?則簡單的使用 even-split redistribution 算法。

當初始化好狀態對象后,我們通過?isRestored()?方法判斷是否從之前的故障中恢復回來,如果該方法返回?true?則表示從故障中進行恢復,會執行接下來的恢復邏輯。

正如代碼所示,BufferingSink?中初始化時,恢復回來的?ListState?的所有元素會添加到一個局部變量中,供下次?snapshotState()?時使用。 然后清空?ListState,再把當前局部變量中的所有元素寫入到 checkpoint 中。另外,我們同樣可以在?initializeState()?方法中使用?FunctionInitializationContext?初始化 keyed state。

4.3 帶狀態的 Source Function?

帶狀態的數據源比其他的算子需要注意更多東西。為了保證更新狀態以及輸出的原子性(用于支持 exactly-once 語義),用戶需要在發送數據前獲取數據源的全局鎖。

public static class CounterSourceextends RichParallelSourceFunction<Long>implements CheckpointedFunction {/**  current offset for exactly once semantics */private Long offset = 0L;/** flag for job cancellation */private volatile boolean isRunning = true;/** 存儲 state 的變量. */private ListState<Long> state;@Overridepublic void run(SourceContext<Long> ctx) {final Object lock = ctx.getCheckpointLock();while (isRunning) {// output and state update are atomicsynchronized (lock) {ctx.collect(offset);offset += 1;}}}@Overridepublic void cancel() {isRunning = false;}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("state",LongSerializer.INSTANCE));// 從我們已保存的狀態中恢復 offset 到內存中,在進行任務恢復的時候也會調用此初始化狀態的方法for (Long l : state.get()) {offset = l;}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {state.update(Collections.singletonList(offset));}
}

希望訂閱 checkpoint 成功消息的算子,可以參考?

org.apache.flink.api.common.state.CheckpointListener?接口。

5. State Backends

鍵/值索引存儲的具體數據結構取決于所選擇的狀態后端。一種狀態后端將數據存儲在內存中的哈希映射中,另一種狀態后端則使用 RocksDB 作為鍵/值存儲。除了定義存儲狀態的數據結構外,狀態后端還實現了在某個時間點對鍵/值狀態進行快照并將該快照作為檢查點的一部分存儲的邏輯。狀態后端可以在不更改應用程序邏輯的情況下進行配置。

Flink 提供了多種 state backends,它用于指定狀態的存儲方式和位置。狀態可以位于 Java 的堆或堆外內存。取決于你的 state backend,Flink 也可以自己管理應用程序的狀態。 為了讓應用程序可以維護非常大的狀態,Flink 可以自己管理內存(如果有必要可以溢寫到磁盤)。 默認情況下,所有 Flink Job 會使用?Flink 配置文件?中指定的 state backend。但是,配置文件中指定的默認 state backend 會被 Job 中指定的 state backend 覆蓋,如下所示。

Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
env.configure(config);

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/66433.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/66433.shtml
英文地址,請注明出處:http://en.pswp.cn/web/66433.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C#項目生成時提示缺少引用

問題描述 剛從git或svn拉取下來的C#項目&#xff0c;在VS生成時提示缺少引用 解決方案 1、從“管理NuGet程序包”中下載并安裝缺少的引用&#xff0c;如果引用較多逐個下載安裝會比較麻煩&#xff0c;建議采用下面第2種方案處理 2、通過命令對所有缺少引用進行安裝 &#…

EAMM: 通過基于音頻的情感感知運動模型實現的一次性情感對話人臉合成

EAMM: 通過基于音頻的情感感知運動模型實現的一次性情感對話人臉合成 1所有的材料都可以在EAMM: One-Shot Emotional Talking Face via Audio-Based Emotion-Aware Motion Model網站上找到。 摘要 盡管音頻驅動的對話人臉生成技術已取得顯著進展&#xff0c;但現有方法要么忽…

BeanFactory 是什么?它與 ApplicationContext 有什么區別?

談到Spring&#xff0c;那勢必要講講容器 BeanFactory 和 ApplicationContext。 BeanFactory是什么&#xff1f; BeanFactory&#xff0c;其實就是 Spring 容器&#xff0c;用于管理和操作 Spring 容器中的 Bean。可能此時又有初學的小伙伴會問&#xff1a;Bean 是什么&#x…

【深度學習】Huber Loss詳解

文章目錄 1. Huber Loss 原理詳解2. Pytorch 代碼詳解3.與 MSELoss、MAELoss 區別及各自優缺點3.1 MSELoss 均方誤差損失3.2 MAELoss 平均絕對誤差損失3.3 Huber Loss 4. 總結4.1 優化平滑4.2 梯度較好4.3 為什么說 MSE 是平滑的 1. Huber Loss 原理詳解 Huber Loss 是一種結合…

python實現pdf轉word和excel

一、引言   在辦公中&#xff0c;我們經常遇收到pdf文件格式&#xff0c;因為pdf格式文件不易修改&#xff0c;當我們需要編輯這些pdf文件時&#xff0c;經常需要開通會員或收費功能才能使用編輯功能。今天&#xff0c;我要和大家分享的&#xff0c;是如何使用python編程實現…

【PyCharm】連接Jupyter Notebook

【PyCharm】相關鏈接 【PyCharm】連接 Git【PyCharm】連接Jupyter Notebook【PyCharm】快捷鍵使用【PyCharm】遠程連接Linux服務器【PyCharm】設置為中文界面 【PyCharm】連接Jupyter Notebook PyCharm連接Jupyter Notebook的過程可以根據不同的需求分為 本地連接 和 遠程連…

Java鎖 公平鎖和非公平鎖 ReentrantLock() 深入源碼解析

賣票問題 我們現在有五個售票員 五個線程分別賣票 賣票 ReentrantLock(); 運行后全是 a 對象獲取 非公平鎖缺點之一 容易出現鎖饑餓 默認是使用的非公平鎖 也可以傳入一個 true 參數 使其變成公平鎖 生活中排隊講求先來后到 視為公平 程序中的公平性也是符合請求鎖的絕對…

「劉一哥GIS」系列專欄《GRASS GIS零基礎入門實驗教程(配套案例數據)》專欄上線了

「劉一哥GIS」系列專欄《GRASS GIS零基礎入門實驗教程》全新上線了&#xff0c;歡迎廣大GISer朋友關注&#xff0c;一起探索GIS奧秘&#xff0c;分享GIS價值&#xff01; 本專欄以實戰案例的形式&#xff0c;深入淺出地介紹了GRASS GIS的基本使用方法&#xff0c;用一個個實例講…

企業級NoSQL數據庫Redis

1.瀏覽器緩存過期機制 1.1 最后修改時間 last-modified 瀏覽器緩存機制是優化網頁加載速度和減少服務器負載的重要手段。以下是關于瀏覽器緩存過期機制、Last-Modified 和 ETag 的詳細講解&#xff1a; 一、Last-Modified 頭部 定義&#xff1a;Last-Modified 表示服務器上資源…

使用Flask和Pydantic實現參數驗證

使用Flask和Pydantic實現參數驗證 1 簡介 Pydantic是一個用于數據驗證和解析的 Python 庫&#xff0c;版本2的性能有較大提升&#xff0c;很多框架使用Pydantic做數據校驗。 # 官方參考文檔 https://docs.pydantic.dev/latest/# Github地址 https://github.com/pydantic/pyd…

ScratchLLMStepByStep:訓練自己的Tokenizer

1. 引言 分詞器是每個大語言模型必不可少的組件&#xff0c;但每個大語言模型的分詞器幾乎都不相同。如果要訓練自己的分詞器&#xff0c;可以使用huggingface的tokenizers框架&#xff0c;tokenizers包含以下主要組件&#xff1a; Tokenizer: 分詞器的核心組件&#xff0c;定…

C# OpenCvSharp 部署3D人臉重建3DDFA-V3

目錄 說明 效果 模型信息 landmark.onnx net_recon.onnx net_recon_mbnet.onnx retinaface_resnet50.onnx 項目 代碼 下載 參考 C# OpenCvSharp 部署3D人臉重建3DDFA-V3 說明 地址&#xff1a;https://github.com/wang-zidu/3DDFA-V3 3DDFA_V3 uses the geometri…

從零開始學數據庫 day2 DML

從零開始學數據庫&#xff1a;DML操作詳解 在今天的數字化時代&#xff0c;數據庫的使用已經成為了各行各業的必備技能。無論你是想開發一個簡單的應用&#xff0c;還是想要管理復雜的數據&#xff0c;掌握數據庫的基本操作都是至關重要的。在這篇博客中&#xff0c;我們將專注…

Java 8 Stream API

文章目錄 Java 8 Stream API1. Stream2. Stream 的創建3. 常見的 Stream 操作3.1 中間操作3.2 終止操作 4. Stream 的并行操作 Java 8 Stream API Java 8 引入了 Stream API&#xff0c;使得對集合類&#xff08;如 List、Set 等&#xff09;的操作變得更加簡潔和直觀。Stream…

運行fastGPT 第五步 配置FastGPT和上傳知識庫 打造AI客服

運行fastGPT 第五步 配置FastGPT和上傳知識庫 打造AI客服 根據上一步的步驟&#xff0c;已經調試了ONE API的接口&#xff0c;下面&#xff0c;我們就登陸fastGPT吧 http://xxx.xxx.xxx.xxx:3000/ 這個就是你的fastGPT后臺地址&#xff0c;可以在configer文件中找到。 賬號是…

第4章 Kafka核心API——Kafka客戶端操作

Kafka客戶端操作 一. 客戶端操作1. AdminClient API 一. 客戶端操作 1. AdminClient API

【王樹森搜索引擎技術】相關性02:評價指標(AUC、正逆序比、DCG)

相關性的評價指標 Pointwise評價指標&#xff1a;Area Under the Curve&#xff08;AUC&#xff09;Pairwise評價指標&#xff1a;正逆序比&#xff08;Positive to Negative Ratio, PNR&#xff09;Listwise評價指標&#xff1a;Discounted Cumulative Gain(DCG)用AUC和PNR作…

人物一致性訓練測評數據集

1.Pulid 訓練:由1.5M張從互聯網收集的高質量人類圖像組成,圖像標題由blip2自動生成。 測試:從互聯網上收集了一個多樣化的肖像測試集,該數據集涵蓋了多種膚色、年齡和性別,共計120張圖像,我們稱之為DivID-120,作為補充資源,還使用了最近開源的測試集Unsplash-50,包含…

Android 項目依賴沖突問題:Duplicate class found in modules

問題描述與處理處理 1、問題描述 plugins {id com.android.application }android {compileSdk 34defaultConfig {applicationId "com.my.dialog"minSdk 21targetSdk 34versionCode 1versionName "1.0"testInstrumentationRunner "androidx.test.run…

計算機網絡 | 什么是公網、私網、NAT?

關注&#xff1a;CodingTechWork 引言 計算機網絡是現代信息社會的基石&#xff0c;而網絡通信的順暢性和安全性依賴于有效的IP地址管理和網絡轉換機制。在網絡中&#xff0c;IP地址起到了標識設備和進行數據傳輸的核心作用。本文將詳細討論公網IP、私網IP以及NAT轉換等網絡技…