1. 狀態的定義
在 Apache Flink 中,狀態(State) 是指在數據流處理過程中需要持久化和追蹤的中間數據,它允許 Flink 在處理事件時保持上下文信息,從而支持復雜的流式計算任務,如聚合、窗口計算、聯接等。狀態是 Flink 處理有狀態操作(如窗口、時間戳操作、聚合等)的核心組成部分。
2. 狀態的類型
Flink 提供了強大的狀態管理機制,允許應用程序在分布式環境中處理狀態,保證高可用性和容錯性。Flink 的狀態分為 Keyed State 和 Operator State,并提供了不同的存儲和恢復機制。
2.1 Keyed State(按鍵狀態)
- Keyed State 是基于流中每個元素的鍵進行管理的狀態。每個鍵會有一個獨立的狀態,這對于需要按照每個輸入元素的唯一標識符(如用戶 ID、商品 ID 等)維護狀態的操作非常有用。
- Keyed State 主要用于需要對流中的每個“鍵”進行獨立計算的場景,如按用戶進行會話計算、按時間窗口聚合等。
常見的 Keyed State 類型包括:
- ValueState:存儲與每個鍵相關聯的單個值。
- ListState:存儲與每個鍵相關聯的多個值,通常用來表示一個元素列表。
- MapState:存儲與每個鍵相關聯的鍵值對,適用于需要維護多個關聯數據的場景。
- ReducingState:支持對每個鍵的值進行累加或其他聚合操作。
- AggregatingState:可以根據給定的聚合函數對每個鍵的狀態進行聚合。
2.2 Operator State(操作符狀態)
- Operator State 是由操作符(如 Flink 中的算子)管理的狀態,通常用于保持操作符內部的狀態信息,不與鍵相關聯。它用于管理一些需要跨整個流處理作業的全局狀態,如窗口管理、算子內部緩沖區等。
- Operator State 主要用于在分布式環境中處理 算子 級別的狀態,尤其在對狀態進行恢復時非常重要,幫助 Flink 恢復作業。
常見的 Operator State 類型包括:
- ListState:與鍵無關,存儲多個值。
-
UnionListState :?與鍵無關,存儲多個值,?UnionListState 是 ListState 的擴展,主要用于 跨多個并行實例 共享狀態。在 Flink 的 流式應用程序 中,如果多個并行實例需要訪問和修改共享的狀態,通常使用 UnionListState。
- BroadcastState:存儲和廣播信息。
3. Keyed State
Keyed 狀態可以看作是一個嵌入式的鍵值存儲。該狀態是與由有狀態操作符讀取的流一起嚴格地進行分區和分布的。因此,只有在 Keyed 流 上才能訪問鍵值狀態,也就是說,只有在進行鍵控/分區數據交換后,才能訪問與當前事件的鍵相關聯的值。將流的鍵與狀態對齊確保了所有的狀態更新都是本地操作,從而在沒有事務開銷的情況下保證一致性。這個對齊還使得 Flink 能夠透明地重新分配狀態并調整流的分區。
Keyed 狀態進一步組織為所謂的 Key Groups(鍵組)。Key Groups 是 Flink 重新分配 Keyed 狀態的最小單位;其數量與定義的最大并行度相同。在執行過程中,每個并行實例的鍵控操作符都處理一個或多個 Key Groups 中的鍵。
3.1 使用 Keyed State
keyed state 接口提供不同類型狀態的訪問接口,這些狀態都作用于當前輸入數據的 key 下。換句話說,這些狀態僅可在?KeyedStream
?上使用,在Java/Scala API上可以通過?stream.keyBy(...)
?得到?KeyedStream
,在Python API上可以通過?stream.key_by(...)
?得到?KeyedStream
。
接下來,我們會介紹不同類型的狀態,然后介紹如何使用他們。所有支持的狀態類型如下所示:
-
ValueState<T>
: 保存一個可以更新和檢索的值(如上所述,每個值都對應到當前的輸入數據的 key,因此算子接收到的每個 key 都可能對應一個值)。 這個值可以通過?update(T)
?進行更新,通過?T value()
?進行檢索。 -
ListState<T>
: 保存一個元素的列表。可以往這個列表中追加數據,并在當前的列表上進行檢索。可以通過?add(T)
?或者?addAll(List<T>)
?進行添加元素,通過?Iterable<T> get()
?獲得整個列表。還可以通過?update(List<T>)
?覆蓋當前的列表。 -
ReducingState<T>
: 保存一個單值,表示添加到狀態的所有值的聚合。接口與?ListState
?類似,但使用?add(T)
?增加元素,會使用提供的?ReduceFunction
?進行聚合。 -
AggregatingState<IN, OUT>
: 保留一個單值,表示添加到狀態的所有值的聚合。和?ReducingState
?相反的是, 聚合類型可能與 添加到狀態的元素的類型不同。 接口與?ListState
?類似,但使用?add(IN)
?添加的元素會用指定的?AggregateFunction
?進行聚合。 -
MapState<UK, UV>
: 維護了一個映射列表。 你可以添加鍵值對到狀態中,也可以獲得反映當前所有映射的迭代器。使用?put(UK,UV)
?或者?putAll(Map<UK,UV>)
?添加映射。 使用?get(UK)
?檢索特定 key。 使用?entries()
,keys()
?和?values()
?分別檢索映射、鍵和值的可迭代視圖。你還可以通過?isEmpty()
?來判斷是否包含任何鍵值對。
所有類型的狀態還有一個clear()
?方法,清除當前 key 下的狀態數據,也就是當前輸入元素的 key。
請牢記,這些狀態對象僅用于與狀態交互。狀態本身不一定存儲在內存中,還可能在磁盤或其他位置。 另外需要牢記的是從狀態中獲取的值取決于輸入元素所代表的 key。 因此,在不同 key 上調用同一個接口,可能得到不同的值。
你必須創建一個?StateDescriptor
,才能得到對應的狀態句柄。 這保存了狀態名稱(正如我們稍后將看到的,你可以創建多個狀態,并且它們必須具有唯一的名稱以便可以引用它們), 狀態所持有值的類型,并且可能包含用戶指定的函數,例如ReduceFunction
。 根據不同的狀態類型,可以創建ValueStateDescriptor
,ListStateDescriptor
,?AggregatingStateDescriptor
,?ReducingStateDescriptor
?或?MapStateDescriptor
。
狀態通過?RuntimeContext
?進行訪問,因此只能在?rich functions?中使用。RichFunction
?中?RuntimeContext
?提供如下方法:
ValueState<T> getState(ValueStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
下面是一個?FlatMapFunction
?的例子,展示了如何將這些部分組合起來:
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {/*** The ValueState handle. The first field is the count, the second field a running sum.*/private transient ValueState<Tuple2<Long, Long>> sum;@Overridepublic void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {// access the state valueTuple2<Long, Long> currentSum = sum.value();// update the countcurrentSum.f0 += 1;// add the second field of the input valuecurrentSum.f1 += input.f1;// update the statesum.update(currentSum);// if the count reaches 2, emit the average and clear the stateif (currentSum.f0 >= 2) {out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));sum.clear();}}@Overridepublic void open(OpenContext ctx) {ValueStateDescriptor<Tuple2<Long, Long>> descriptor =new ValueStateDescriptor<>("average", // the state nameTypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type informationTuple2.of(0L, 0L)); // default value of the state, if nothing was setsum = getRuntimeContext().getState(descriptor);}
}// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)).keyBy(value -> value.f0).flatMap(new CountWindowAverage()).print();// the printed output will be (1,4) and (1,5)
3.2?狀態有效期 (TTL)
任何類型的 keyed state 都可以有?有效期?(TTL)。如果配置了 TTL 且狀態值已過期,則會盡最大可能清除對應的值,所有狀態類型都支持單元素的 TTL。 這意味著列表元素和映射元素將獨立到期。在使用狀態 TTL 前,需要先構建一個StateTtlConfig
?配置對象。 然后把配置傳遞到 state descriptor 中啟用 TTL 功能:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import java.time.Duration;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
TTL 配置有以下幾個選項:?newBuilder
?的第一個參數表示數據的有效期,是必選項。TTL 的更新策略(默認是?OnCreateAndWrite
):
StateTtlConfig.UpdateType.OnCreateAndWrite
?- 僅在創建和寫入時更StateTtlConfig.UpdateType.OnReadAndWrite
?- 讀取時也更新?
數據在過期但還未被清理時的可見性配置如下(默認為?NeverReturnExpired
):
StateTtlConfig.StateVisibility.NeverReturnExpired
?- 不返回過期數據StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp
?- 會返回過期但未清理的數據
NeverReturnExpired
?情況下,過期數據就像不存在一樣,不管是否被物理刪除。這對于不能訪問過期數據的場景下非常有用,比如敏感數據。?ReturnExpiredIfNotCleanedUp
?在數據被物理刪除前都會返回。
注意:
- 狀態上次的修改時間會和數據一起保存在 state backend 中,因此開啟該特性會增加狀態數據的存儲。 Heap state backend 會額外存儲一個包括用戶狀態以及時間戳的 Java 對象,RocksDB state backend 會在每個狀態值(list 或者 map 的每個元素)序列化后增加 8 個字節。
- 暫時只支持基于?processing time?的 TTL。
- 嘗試從 checkpoint/savepoint 進行恢復時,TTL 的狀態(是否開啟)必須和之前保持一致,否則會遇到 “StateMigrationException”。
- TTL 的配置并不會保存在 checkpoint/savepoint 中,僅對當前 Job 有效。
- 不建議checkpoint恢復前后將state TTL從短調長,這可能會產生潛在的數據錯誤。
- 當前開啟 TTL 的 map state 僅在用戶值序列化器支持 null 的情況下,才支持用戶值為 null。如果用戶值序列化器不支持 null, 可以用?
NullableSerializer
?包裝一層。 - 啟用 TTL 配置后,
StateDescriptor
?中的?defaultValue
(已被標記?deprecated
)將會失效。這個設計的目的是為了確保語義更加清晰,在此基礎上,用戶需要手動管理那些實際值為 null 或已過期的狀態默認值。
3.2.1 過期數據的清理
默認情況下,過期數據會在讀取的時候被刪除,例如?ValueState#value
,同時會有后臺線程定期清理(如果 StateBackend 支持的話)。可以通過?StateTtlConfig
?配置關閉后臺清理:
import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).disableCleanupInBackground().build();
可以按照如下所示配置更細粒度的后臺清理策略。當前的實現中?HeapStateBackend
?依賴增量數據清理,RocksDBStateBackend
?利用壓縮過濾器進行后臺清理。
3.2.2 全量快照時進行清理?
另外,你可以啟用全量快照時進行清理的策略,這可以減少整個快照的大小。當前實現中不會清理本地的狀態,但從上次快照恢復時,不會恢復那些已經刪除的過期數據。 該策略可以通過?StateTtlConfig
?配置進行配置:
import org.apache.flink.api.common.state.StateTtlConfig;
import java.time.Duration;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).cleanupFullSnapshot().build();
這種策略在?RocksDBStateBackend
?的增量 checkpoint 模式下無效。
注意:這種清理方式可以在任何時候通過?StateTtlConfig
?啟用或者關閉,比如在從 savepoint 恢復時。
3.2.3 增量數據清理
另外可以選擇增量式清理狀態數據,在狀態訪問或/和處理時進行。如果某個狀態開啟了該清理策略,則會在存儲后端保留一個所有狀態的惰性全局迭代器。 每次觸發增量清理時,從迭代器中選擇已經過期的數進行清理。該特性可以通過?StateTtlConfig
?進行配置:
import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).cleanupIncrementally(10, true).build();
該策略有兩個參數。 第一個是每次清理時檢查狀態的條目數,在每個狀態訪問時觸發。第二個參數表示是否在處理每條記錄時觸發清理。 Heap backend 默認會檢查 5 條狀態,并且關閉在每條記錄時觸發清理。
注意:
- 如果沒有 state 訪問,也沒有處理數據,則不會清理過期數據。
- 增量清理會增加數據處理的耗時。
- 現在僅 Heap state backend 支持增量清除機制。在 RocksDB state backend 上啟用該特性無效。
- 如果 Heap state backend 使用同步快照方式,則會保存一份所有 key 的拷貝,從而防止并發修改問題,因此會增加內存的使用。但異步快照則沒有這個問題。
- 對已有的作業,這個清理方式可以在任何時候通過?
StateTtlConfig
?啟用或禁用該特性,比如從 savepoint 重啟后.
3.2.4 在 RocksDB 壓縮時清理
如果使用 RocksDB state backend,則會啟用 Flink 為 RocksDB 定制的壓縮過濾器。RocksDB 會周期性的對數據進行合并壓縮從而減少存儲空間。 Flink 提供的 RocksDB 壓縮過濾器會在壓縮時過濾掉已經過期的狀態數據。該特性可以通過?StateTtlConfig
?進行配置:
import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1)).build();
Flink 處理一定條數的狀態數據后,會使用當前時間戳來檢測 RocksDB 中的狀態是否已經過期, 你可以通過?
StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)
?方法指定處理狀態的條數。 時間戳更新的越頻繁,狀態的清理越及時,但由于壓縮會有調用 JNI 的開銷,因此會影響整體的壓縮性能。 RocksDB backend 的默認后臺清理策略會每處理 1000 條數據進行一次。定期壓縮可以加速過期狀態條目的清理,特別是對于很少訪問的狀態條目。 比這個值早的文件將被選取進行壓縮,并重新寫入與之前相同的 Level 中。 該功能可以確保文件定期通過壓縮過濾器壓縮。 您可以通過StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime)
?方法設定定期壓縮的時間。 定期壓縮的時間的默認值是 30 天。 您可以將其設置為 0 以關閉定期壓縮或設置一個較小的值以加速過期狀態條目的清理,但它將會觸發更多壓縮。還可以通過配置開啟 RocksDB 過濾器的 debug 日志:?log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG
注意:
- 壓縮時調用 TTL 過濾器會降低速度。TTL 過濾器需要解析上次訪問的時間戳,并對每個將參與壓縮的狀態進行是否過期檢查。 對于集合型狀態類型(比如 list 和 map),會對集合中每個元素進行檢查。
- 對于元素序列化后長度不固定的列表狀態,TTL 過濾器需要在每次 JNI 調用過程中,額外調用 Flink 的 java 序列化器, 從而確定下一個未過期數據的位置。
- 對已有的作業,這個清理方式可以在任何時候通過?
StateTtlConfig
?啟用或禁用該特性,比如從 savepoint 重啟后。 - 定期壓縮功能只在 TTL 啟用時生效。
4. Operator State
算子狀態(或者非 keyed 狀態)是綁定到一個并行算子實例的狀態。Kafka Connector?是 Flink 中使用算子狀態一個很具有啟發性的例子。Kafka consumer 每個并行實例維護了 topic partitions 和偏移量的 map 作為它的算子狀態。當并行度改變的時候,算子狀態支持將狀態重新分發給各并行算子實例。處理重分發過程有多種不同的方案。在典型的有狀態 Flink 應用中你無需使用算子狀態。它大都作為一種特殊類型的狀態使用。用于實現 source/sink,以及無法對 state 進行分區而沒有主鍵的這類場景中。
4.1?廣播狀態 (Broadcast State)
廣播狀態是一種特殊的算子狀態。引入它的目的在于支持一個流中的元素需要廣播到所有下游任務的使用情形。在這些任務中廣播狀態用于保持所有子任務狀態相同。 該狀態接下來可在第二個處理記錄的數據流中訪問。可以設想包含了一系列用于處理其他流中元素規則的低吞吐量數據流,這個例子自然而然地運用了廣播狀態。 考慮到上述這類使用情形,廣播狀態和其他算子狀態的不同之處在于:
- 它具有 map 格式,
- 它僅在一些特殊的算子中可用。這些算子的輸入為一個廣播數據流和非廣播數據流,
- 這類算子可以擁有不同命名的多個廣播狀態?。
4.2 使用 Operator State
用戶可以通過實現?CheckpointedFunction
?接口來使用 operator stateCheckpointedFunction
?接口提供了訪問 non-keyed state 的方法,需要實現如下兩個方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;void initializeState(FunctionInitializationContext context) throws Exception;
進行 checkpoint 時會調用?snapshotState()
。 用戶自定義函數初始化時會調用?initializeState()
,初始化包括第一次自定義函數初始化和從之前的 checkpoint 恢復。 因此?initializeState()
?不僅是定義不同狀態類型初始化的地方,也需要包括狀態恢復的邏輯。當前 operator state 以 list 的形式存在。這些狀態是一個?可序列化?對象的集合?List
,彼此獨立,方便在改變并發后進行狀態的重新分派。 換句話說,這些對象是重新分配 non-keyed state 的最細粒度。根據狀態的不同訪問方式,有如下幾種重新分配的模式:
-
Even-split redistribution:?每個算子都保存一個列表形式的狀態集合,整個狀態由所有的列表拼接而成。當作業恢復或重新分配的時候,整個狀態會按照算子的并發度進行均勻分配。 比如說,算子 A 的并發讀為 1,包含兩個元素?
element1
?和?element2
,當并發讀增加為 2 時,element1
?會被分到并發 0 上,element2
?則會被分到并發 1 上。 -
Union redistribution:?每個算子保存一個列表形式的狀態集合。整個狀態由所有的列表拼接而成。當作業恢復或重新分配時,每個算子都將獲得所有的狀態數據。如果你的列表可能具有高基數,請不要使用此功能。檢查點元數據將存儲指向每個列表項的偏移量,這可能導致 RPC 幀大小或內存溢出錯誤。
下面的例子中的?SinkFunction
?在?CheckpointedFunction
?中進行數據緩存,然后統一發送到下游,這個例子演示了列表狀態數據的 event-split redistribution。
public class BufferingSinkimplements SinkFunction<Tuple2<String, Integer>>,CheckpointedFunction {private final int threshold;private transient ListState<Tuple2<String, Integer>> checkpointedState;private List<Tuple2<String, Integer>> bufferedElements;public BufferingSink(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();}@Overridepublic void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {bufferedElements.add(value);if (bufferedElements.size() >= threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}bufferedElements.clear();}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.update(bufferedElements);}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<Tuple2<String, Integer>> descriptor =new ListStateDescriptor<>("buffered-elements",TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));checkpointedState = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {for (Tuple2<String, Integer> element : checkpointedState.get()) {bufferedElements.add(element);}}}
}
initializeState
?方法接收一個?FunctionInitializationContext
?參數,會用來初始化 non-keyed state 的 “容器”。這些容器是一個?ListState
?用于在 checkpoint 時保存 non-keyed state 對象。注意這些狀態是如何初始化的,和 keyed state 類似,StateDescriptor
?會包括狀態名字、以及狀態類型相關信息。
ListStateDescriptor<Tuple2<String, Integer>> descriptor =new ListStateDescriptor<>("buffered-elements",TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));checkpointedState = context.getOperatorStateStore().getListState(descriptor);
調用不同的獲取狀態對象的接口,會使用不同的狀態分配算法。比如?
getUnionListState(descriptor)
?會使用 union redistribution 算法, 而?getListState(descriptor)
?則簡單的使用 even-split redistribution 算法。
當初始化好狀態對象后,我們通過?isRestored()
?方法判斷是否從之前的故障中恢復回來,如果該方法返回?true
?則表示從故障中進行恢復,會執行接下來的恢復邏輯。
正如代碼所示,BufferingSink
?中初始化時,恢復回來的?ListState
?的所有元素會添加到一個局部變量中,供下次?snapshotState()
?時使用。 然后清空?ListState
,再把當前局部變量中的所有元素寫入到 checkpoint 中。另外,我們同樣可以在?initializeState()
?方法中使用?FunctionInitializationContext
?初始化 keyed state。
4.3 帶狀態的 Source Function?
帶狀態的數據源比其他的算子需要注意更多東西。為了保證更新狀態以及輸出的原子性(用于支持 exactly-once 語義),用戶需要在發送數據前獲取數據源的全局鎖。
public static class CounterSourceextends RichParallelSourceFunction<Long>implements CheckpointedFunction {/** current offset for exactly once semantics */private Long offset = 0L;/** flag for job cancellation */private volatile boolean isRunning = true;/** 存儲 state 的變量. */private ListState<Long> state;@Overridepublic void run(SourceContext<Long> ctx) {final Object lock = ctx.getCheckpointLock();while (isRunning) {// output and state update are atomicsynchronized (lock) {ctx.collect(offset);offset += 1;}}}@Overridepublic void cancel() {isRunning = false;}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("state",LongSerializer.INSTANCE));// 從我們已保存的狀態中恢復 offset 到內存中,在進行任務恢復的時候也會調用此初始化狀態的方法for (Long l : state.get()) {offset = l;}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {state.update(Collections.singletonList(offset));}
}
希望訂閱 checkpoint 成功消息的算子,可以參考?
org.apache.flink.api.common.state.CheckpointListener
?接口。
5. State Backends
鍵/值索引存儲的具體數據結構取決于所選擇的狀態后端。一種狀態后端將數據存儲在內存中的哈希映射中,另一種狀態后端則使用 RocksDB 作為鍵/值存儲。除了定義存儲狀態的數據結構外,狀態后端還實現了在某個時間點對鍵/值狀態進行快照并將該快照作為檢查點的一部分存儲的邏輯。狀態后端可以在不更改應用程序邏輯的情況下進行配置。
Flink 提供了多種 state backends,它用于指定狀態的存儲方式和位置。狀態可以位于 Java 的堆或堆外內存。取決于你的 state backend,Flink 也可以自己管理應用程序的狀態。 為了讓應用程序可以維護非常大的狀態,Flink 可以自己管理內存(如果有必要可以溢寫到磁盤)。 默認情況下,所有 Flink Job 會使用?Flink 配置文件?中指定的 state backend。但是,配置文件中指定的默認 state backend 會被 Job 中指定的 state backend 覆蓋,如下所示。
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
env.configure(config);