為什么需要?StateBackend
?—— 職責分離原則
我們可以用一個銀行的例子來類比:
State
?(如?ValueState
,?ListState
) 就像是你的銀行卡。AbstractKeyedStateBackend
?就像是銀行的整個后臺系統(包括總服務器、數據庫、風控系統、會計系統等)。
你不能直接用一張塑料卡片去操作你的錢,你需要把卡片插入 ATM 機或交給柜員,由他們背后的銀行系統來完成真正的存取款、轉賬等操作。
AbstractKeyedStateBackend
?的存在正是為了實現這種職責分離:
State
?接口的職責(銀行卡):
- 定義用戶交互的契約:提供一組簡單、清晰的 API 給用戶使用,比如?
value()
,?update()
,?add()
,?clear()
。它只關心“做什么”,不關心“怎么做”。
AbstractKeyedStateBackend
?的職責(銀行系統):
它是一個龐大而復雜的“狀態引擎”,負責所有底層的、與具體實現相關的臟活累活。
- 生命周期管理:負責所有狀態的創建、初始化和銷毀 (
dispose
)。 - 持久化與容錯(核心):實現快照 (
snapshot
) 和恢復邏輯。這是 Flink 實現 Exactly-Once 的基石。單個?State
?對象自身無法完成復雜的分布式快照。 - 物理存儲交互:它才是真正與 RocksDB、堆內存(Heap)等物理存儲打交道的組件。它管理著數據庫連接、Column Family、讀寫選項等。
- Key/Namespace 管理:管理?
keySerializer
,計算當前 key 屬于哪個 Key Group (KeyGroupRangeAssignment
),處理不同?namespace
?下的狀態隔離。 - 中央緩存與優化:如您所見,它內部有?
lastName
?和?lastState
?這樣的緩存機制,用于優化對同一狀態的連續訪問。 - 應用橫切關注點(AOP):它是一個中心化的工廠,可以在創建?
State
?時,統一應用 TTL、Metrics 監控等功能。
看?getOrCreateKeyedState
?這段代碼,它完美地展示了?StateBackend
?作為“工廠”和“管理者”的角色:
// ... existing code ...@Override@SuppressWarnings("unchecked")public <N, S extends State, V> S getOrCreateKeyedState(final TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)throws Exception {
// ... existing code ...InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());if (kvState == null) {if (!stateDescriptor.isSerializerInitialized()) {stateDescriptor.initializeSerializerUnlessSet(executionConfig);}// 這里是關鍵:一個裝飾器鏈條// Backend作為工廠,負責創建原始State,并用TTL、Metrics等功能進行包裝kvState =MetricsTrackingStateFactory.createStateAndWrapWithMetricsTrackingIfEnabled(TtlStateFactory.createStateAndWrapWithTtlIfEnabled(namespaceSerializer, stateDescriptor, this, ttlTimeProvider),this,stateDescriptor,latencyTrackingStateConfig,sizeTrackingStateConfig);keyValueStatesByName.put(stateDescriptor.getName(), kvState);publishQueryableStateIfEnabled(stateDescriptor, kvState);}return (S) kvState;}
// ... existing code ...
結論:如果直接使用?State
,就意味著每一個?State
?對象都需要自己實現一套完整的快照、恢復、緩存、物理存儲交互邏輯。這將導致代碼極度冗余、混亂且難以維護。AbstractKeyedStateBackend
?將這些公共的、復雜的底層邏輯全部收斂,使得?State
?對象可以保持為一個輕量級的、只關注業務邏輯的句柄。
State
?反過來引用?Backend
,這并非傳統意義上需要避免的耦合,而是一種委托(Delegation)。我們來梳理一下這個流程:
- 創建:
Backend
?創建了一個具體的?State
?實現類(比如?RocksDBValueState
)。 - 持有引用:在創建?
RocksDBValueState
?時,Backend
?會把自身的引用 (this
)?傳遞給?RocksDBValueState
?的構造函數。因此,這個?State
?實例從誕生起就知道“是誰創造了我”、“我應該向誰匯報”。
比如update
public void update(V value) throws IOException {if (value == null) {clear();return;}try {backend.db.put(columnFamily,writeOptions,serializeCurrentKeyWithGroupAndNamespace(),serializeValue(value));} catch (RocksDBException e) {throw new IOException("Error while adding data to RocksDB", e);}}
雖然直接調用了 backend.db.put(...),但我們仔細分析一下它所需要的所有參數,就會發現委托模式的本質依然存在:
- columnFamily: 這個 ColumnFamilyHandle 是從哪里來的?它是在 RocksDBValueState 被創建時,由 backend 傳入的。State 自己不管理 Column Family 的生命周期。
- writeOptions: 這個 WriteOptions 對象同樣是 backend 的成員變量,由 backend 統一配置和管理。
- serializeCurrentKeyWithGroupAndNamespace(): 這是最關鍵的一步。這個方法內部需要:
- backend.getCurrentKey(): 獲取當前正在處理的 Key。
- backend.getCurrentKeyGroupIndex(): 計算 Key Group。
- getNamespaceSerializer(): 獲取 Namespace 序列化器。
- backend.getKeySerializer(): 獲取 Key 序列化器。 這些核心的上下文信息和組件(序列化器),全部是由 backend 提供的。State 對象本身是無狀態的(stateless in terms of context),它不知道當前在為哪個 key 工作,必須向 backend 查詢。
- serializeValue(value): 這個方法內部需要 getValueSerializer(),而這個序列化器也是在創建時由 backend 提供的。
所以,即使 State 執行了最后那一下 put 操作,它也像一個“一線工人”,雖然親手把螺絲擰上去了,但這個螺絲(value)、螺絲刀(writeOptions)、圖紙(columnFamily)以及擰在哪個位置(key 和 namespace),全部是由 backend 這個“車間主任”提供的。
這是一種更細粒度的委托:State 被委托了“如何將序列化好的 key 和 value 放入指定的 Column Family”這個具體的執行邏輯,但它依然將“獲取所有執行前提條件(上下文、資源、配置)”這項更重要的職責委托給了 backend。
Subtask、RocksDB 實例、窗口和 Namespace 的關系
Operator 的一個 Subtask 實例 對應一個獨立的 RocksDB 實例。
讓我們把這個關系鏈梳理清楚:
- 一個 Flink Job:可以包含多個 Operator(
map
,?filter
,?keyBy
?等)。 - 一個 Operator:可以有多個并行的 Subtask 實例(并行度決定)。
- 一個 Subtask 實例:運行在一個 TaskManager 的一個 Slot 中。
- 一個?
RocksDBKeyedStateBackend
?實例:每個有狀態的 Subtask 實例都會創建一個自己的?RocksDBKeyedStateBackend
?對象。 - 一個 RocksDB 數據庫實例:每個?
RocksDBKeyedStateBackend
?都會在 TaskManager 的本地磁盤上創建一個獨立的 RocksDB 數據庫目錄和實例(db
?對象)。
所以,如果你的一個?window
?操作的并行度是 10,那么就會有 10 個 Subtask,對應 10 個?RocksDBKeyedStateBackend
?實例,進而在不同的 TaskManager 上創建 10 個獨立的 RocksDB 數據庫。它們之間物理隔離,互不干擾。
那么窗口和 Namespace 是什么關系?
在一個 Subtask 內部(也就是在一個 RocksDB 實例內部),Namespace 是用來在邏輯上區分不同窗口的狀態的。
mergeNamespaces
?就是最好的例子。當會話窗口需要合并時:
source
?namespaces 就是舊的、待合并的窗口的標識符。target
?namespace 就是合并后的新窗口的標識符。
這些?namespace
?和用戶的?key
?組合在一起,構成了 RocksDB 中真正的 key。
總結:
- 物理隔離:不同的 Subtask 通過擁有各自獨立的 RocksDB 實例來實現物理隔離。
- 邏輯隔離:在同一個 Subtask(同一個 RocksDB 實例)內部,不同的窗口(或其它需要隔離的場景,如?
ProcessFunction
?中的不同 Timer)通過?namespace
?來實現邏輯隔離。
所有 State 如何共享 DB 并互相區分?—— Column Family
在一個?RocksDBKeyedStateBackend
?內部,所有不同名稱的?State
(比如你在一個?ProcessFunction
?中定義了?ValueState<Integer>
、ListState<String>
?和?MapState<Long, Double>
)是共享同一個?RocksDB
?實例的。
那它們的數據是如何區分,不會混在一起的呢?答案是:列族(Column Family)。
Column Family 是 RocksDB 中用于隔離數據的邏輯命名空間,可以把它想象成關系型數據庫中的一張張獨立的表。
我們來看?RocksDBKeyedStateBackend.java
?中的關鍵實現:
當一個?State
?首次被創建時,RocksDBKeyedStateBackend
?會為它做兩件事:
- 創建一個新的 Column Family:每個?
StateDescriptor
?的唯一名稱(stateDesc.getName()
)會被用來命名一個新的 Column Family。 - 注冊元信息:將這個 State 的名稱、序列化器信息以及它對應的?
ColumnFamilyHandle
?存儲在一個?Map
?中,也就是?kvStateInformation
。
// ... existing code .../*** Information about the k/v states, maintained in the order as we create them. This is used to* retrieve the column family that is used for a state and also for sanity checks when* restoring.*/private final LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation;
// ... existing code ...
當具體的?State
?對象(如?RocksDBValueState
)執行讀寫操作時,它會從?backend
?獲取自己專屬的?ColumnFamilyHandle
,并將其作為參數傳遞給?db.get()
、db.put()
?或?db.merge()
?等方法。
// RocksDBValueState.java 中的 value() 方法
byte[] valueBytes = backend.db.get(columnFamily, serializeCurrentKeyWithGroupAndNamespace());
這樣一來,雖然所有的?State
?都在同一個?db
?對象上操作,但由于它們使用了不同的?columnFamily
,數據就被天然地隔離在了不同的“表”里,絕不會互相干擾。
這種設計的優勢是什么?
- 資源共享:所有 Column Family 共享同一個 MemTable、Write-Ahead-Log (WAL)、Block Cache 等核心 RocksDB 資源。這大大減少了內存開銷和管理成本,而不是為每個 State 都啟動一個完整的 DB 實例。
- 原子寫入:可以通過?
WriteBatch
?實現跨多個 Column Family 的原子寫入,這對于保證 Flink 復雜操作的原子性至關重要。 - 統一快照:可以對整個 RocksDB 實例(包含所有 Column Family)進行一次統一的、物理一致性的快照,極大地簡化了 Checkpoint 的實現。
ColumnFamilyDescriptor
ColumnFamilyDescriptor
?是 RocksDB Java API 的一部分,它本質上是一個列族(Column Family)的描述符,包含了創建列族所需的名稱和配置選項?(ColumnFamilyOptions
)。
在 Flink 中,ColumnFamilyDescriptor
?的構建主要通過?RocksDBOperationUtils.createColumnFamilyDescriptor
?這個靜態方法來完成。
我們來看一下這個方法的實現:
RocksDBOperationUtils.java
// ... existing code ...public static ColumnFamilyDescriptor createColumnFamilyDescriptor(RegisteredStateMetaInfoBase metaInfoBase,Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,@Nullable Long writeBufferManagerCapacity) {byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),"The chosen state name 'default' collides with the name of the default column family!");ColumnFamilyOptions options =createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());if (ttlCompactFiltersManager != null) {ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options);}if (writeBufferManagerCapacity != null) {// It'd be great to perform the check earlier, e.g. when creating write buffer manager.// Unfortunately the check needs write buffer size that was just calculated.sanityCheckArenaBlockSize(options.writeBufferSize(),options.arenaBlockSize(),writeBufferManagerCapacity);}return new ColumnFamilyDescriptor(nameBytes, options);}
// ... existing code ...
從代碼中我們可以清晰地看到構建?ColumnFamilyDescriptor
?的步驟:
獲取列族名稱:
- 從傳入的?
RegisteredStateMetaInfoBase
?對象中獲取 State 的名稱 (metaInfoBase.getName()
)。 - 將這個名稱轉換為字節數組 (
byte[] nameBytes
)。這是因為 RocksDB 的原生 API 使用字節數組來標識列族。 - 這里有一個檢查,確保 State 的名稱不是 "default",以避免與 RocksDB 的默認列族沖突。
- 從傳入的?
創建列族配置 (
ColumnFamilyOptions
):- 調用?
createColumnFamilyOptions
?方法,這個方法會使用?columnFamilyOptionsFactory
?來生成一個?ColumnFamilyOptions
?實例。 - 這個?
columnFamilyOptionsFactory
?正是我們在?EmbeddedRocksDBStateBackend
?中看到的那個函數:stateName -> resourceContainer.getColumnOptions()
。它為每個 State 提供了基礎的列族配置。
- 調用?
(可選)配置 TTL 壓縮過濾器:
- 如果?
ttlCompactFiltersManager
?不為 null,會檢查當前 State 是否配置了 TTL(Time-to-Live,生存時間)。 - 如果配置了 TTL,它會為這個列族的?
ColumnFamilyOptions
?設置一個特定的壓縮過濾器(Compaction Filter),這個過濾器會在 RocksDB 的后臺壓縮過程中自動清理過期的數據。
- 如果?
(可選)內存檢查:
- 如果傳入了?
writeBufferManagerCapacity
,會進行一個健全性檢查,確保?arenaBlockSize
?的配置是合理的。
- 如果傳入了?
實例化?
ColumnFamilyDescriptor
:- 最后,使用前面準備好的列族名稱字節數組和配置好的?
ColumnFamilyOptions
?對象,通過?new ColumnFamilyDescriptor(nameBytes, options)
?來創建一個新的?ColumnFamilyDescriptor
?實例并返回。
- 最后,使用前面準備好的列族名稱字節數組和配置好的?
綜上所述,一個?ColumnFamilyDescriptor
?對象主要包含以下兩個核心信息:
列族名稱 (Column Family Name):
- 以?
byte[]
?數組的形式存儲。 - 這個名稱直接來源于 Flink State 的?
StateDescriptor
?中定義的名字。例如,new ValueStateDescriptor<>("my-state", String.class)
?中的?"my-state"
。
- 以?
列族選項 (Column Family Options):
- 一個?
org.rocksdb.ColumnFamilyOptions
?對象。 - 這個對象包含了該列族所有詳細的配置參數,例如:
- Merge Operator: 用于處理 ListState、AggregatingState 等需要合并操作的狀態。
- Write Buffer Size: 寫緩沖區大小。
- Compression Type: 壓縮算法(如 Snappy, LZ4)。
- Compaction Filter: 壓縮過濾器,如用于實現 TTL 的過濾器。
- 以及其他大量控制 RocksDB 行為的底層參數。
- 一個?
這個?ColumnFamilyDescriptor
?對象隨后會被傳遞給?db.createColumnFamily()
?方法,RocksDB 會根據其中的名稱和配置信息,在數據庫實例中創建一個新的、隔離的列族。
Namespace 可以自己隨意指定類型和值嗎?
答案是:是的,幾乎可以。
任意的類型 (Type): 在 Flink 的狀態接口定義中,命名空間(Namespace)是一個泛型參數?
N
,例如?InternalKvState<K, N, V>
。這意味著理論上你可以使用任何 Java/Scala 類型作為?Namespace
?的類型,比如?String
,?Long
,?Integer
,甚至是自定義的 POJO 對象。唯一的硬性要求是:Flink 必須知道如何序列化和反序列化你的 Namespace 類型。你需要為該類型提供一個?
TypeSerializer<N>
。對于?Long
,?String
?等基礎類型,Flink 會自動推斷并使用內置的序列化器。對于自定義的 POJO,你需要確保它符合 Flink 的 POJO 規范,或者手動創建一個?TypeSerializer
。任意的值 (Value): 一旦確定了?
Namespace
?的類型,你就可以在代碼中通過調用?state.setCurrentNamespace(namespace)
?來傳入該類型的任意實例作為當前操作的命名空間。這正是你之前實現的?
NamespacedStateListView
?的核心思想:將用戶的?UID
(無論是?Long
?還是?String
?類型)作為?namespace
?的值,從而為每個?UID
?創建了一個邏輯上獨立的?ListState
。AbstractRocksDBState.java
?文件中的?setCurrentNamespace
?方法就證明了這一點:// ... existing code ... /** The current namespace, which the next value methods will refer to. */ private N currentNamespace; // ... existing code ... @Override public void setCurrentNamespace(N namespace) {this.currentNamespace = namespace; } // ... existing code ...
這個方法簡單地將傳入的?
namespace
?對象賦值給內部的?currentNamespace
?字段,后續的狀態操作(如?get
,?add
,?clear
)都會使用這個字段來構建最終存儲到 RocksDB 的 key。
和 KeyedStream,Window 的關系
這里需要分開來看:
與?KeyedStream
?的關系:強依賴關系
- 必須在?
KeyedStream
?上使用:所有帶?Namespace
?的狀態(InternalKvState
)都屬于Keyed State。你必須先通過?dataStream.keyBy(...)
?將數據流轉換成?KeyedStream
,然后才能在下游的算子(如?ProcessFunction
)中使用這些狀態。 - 原因:Flink 的狀態是根據?
keyBy
?指定的 Key (K
) 來進行分區和管理的。Namespace
?(N
) 只是在某個特定 Key (K
) 的狀態內部做的進一步劃分。可以理解為一種二級索引或子分區。沒有?keyBy
?提供的一級分區,Namespace
?就無從談起。
與?Window
?的關系:沒有必然關系,窗口是 Namespace 的一種應用場景
窗口是 Namespace 的使用者,而非前提:可以把窗口(Window)看作是 Flink 框架自身對?
Namespace
?機制的一種自動化應用。當你使用窗口操作時(例如?.window(TumblingEventTimeWindows.of(...))
),Flink 會自動地:- 為每一個窗口實例(比如?
[00:00:05, 00:00:10)
?這個時間窗口)創建一個?TimeWindow
?對象。 - 在處理屬于該窗口的數據時,自動調用?
setCurrentNamespace()
,并將這個?TimeWindow
?對象作為?namespace
?傳入。 這樣,窗口內的所有狀態計算就被天然地隔離在了這個?TimeWindow
?命名空間下。
- 為每一個窗口實例(比如?
可以完全脫離窗口使用 Namespace:在一個普通的?
KeyedProcessFunction
?中,完全沒有使用任何窗口操作,而是通過手動調用?setCurrentNamespace(uid)
?來實現了自定義的狀態劃分。這賦予了超越窗口框架的、更細粒度的狀態管理能力。
概念 | 與 Namespace 的關系 | 解釋 |
---|---|---|
??KeyedStream?? | ??強依賴?? | 必須先? |
??Window?? | ??無直接依賴?? | 窗口是? |
所以,可以自定義?Namespace
?的類型和值,并且這個機制可以獨立于 Flink 的窗口(Window)功能來使用,只要你的操作是建立在?KeyedStream
?之上即可。
window設置NameSpace
負責調用?setCurrentNamespace
?的是窗口算子(WindowOperator
)。WindowOperator
?的工作流程是:
- 接收到一條數據。
- 調用?
WindowAssigner
?的?assignWindows
?方法,獲取這條數據所屬的窗口列表。 - 遍歷這個窗口列表。
- 對于列表中的每一個窗口,先調用?
state.setCurrentNamespace(window)
?將當前狀態的上下文切換到這個窗口。 - 然后,再對該窗口的狀態進行更新(比如累加、添加元素等)。
我們可以從工程代碼中找到清晰的證據:
在?WindowOperator.java
?中,你可以看到這個完整的邏輯:
// ... existing code ...public void processElement(StreamRecord<RowData> record) throws Exception {
// ... existing code ...timestamp = TimeWindowUtil.toUtcTimestampMills(timestamp, shiftTimeZone);// 1. 調用 assigner 獲取窗口列表// the windows which the input row should be placed intoCollection<W> affectedWindows = windowFunction.assignStateNamespace(inputRow, timestamp);boolean isElementDropped = true;// 2. 遍歷窗口列表for (W window : affectedWindows) {isElementDropped = false;// 3. 為每個窗口設置 NamespacewindowState.setCurrentNamespace(window);// 4. 更新狀態RowData acc = windowState.value();if (acc == null) {acc = windowAggregator.createAccumulators();}windowAggregator.setAccumulators(window, acc);if (RowDataUtil.isAccumulateMsg(inputRow)) {windowAggregator.accumulate(inputRow);} else {windowAggregator.retract(inputRow);}acc = windowAggregator.getAccumulators();windowState.update(acc);}
// ... existing code ...
另一個例子在處理 Python UDAF 的算子中也可以看到同樣的設計模式:
StreamArrowPythonGroupWindowAggregateFunctionOperator.java
// ... existing code ...public void bufferInput(RowData input) throws Exception {
// ... existing code ...// 1. 調用 assigner 獲取窗口列表// Given the timestamp and element, returns the set of windows into which it// should be placed.elementWindows = windowAssigner.assignWindows(input, timestamp);// 2. 遍歷窗口列表for (W window : elementWindows) {if (RowDataUtil.isAccumulateMsg(input)) {// 3. 為每個窗口設置 NamespacewindowAccumulateData.setCurrentNamespace(window);// 4. 更新狀態windowAccumulateData.add(input);} else {windowRetractData.setCurrentNamespace(window);windowRetractData.add(input);}}}
// ... existing code ...