StreamOperatorStateHandler
During StreamTask startup, StreamTaskStateInitializerImpl::streamOperatorStateContext creates a keyedStateBackend and an operatorStateBackend for each StreamOperator. AbstractStreamOperator has a member variable of type StreamOperatorStateHandler, which is initialized inside AbstractStreamOperator::initializeState. The StreamOperatorStateHandler object wraps the keyedStateBackend and the operatorStateBackend and provides a single place to manage the StreamOperator's state.
OperatorChain::initializeStateAndOpenOperators // calls initializeState and open on every operator
AbstractStreamOperator::initializeState(StreamTaskStateInitializer)
StreamTaskStateInitializerImpl::streamOperatorStateContext // creates the keyedStateBackend and operatorStateBackend
StreamOperatorStateHandler::new // initializes the operator's stateHandler member variable, used for state management
StreamOperatorStateHandler::initializeOperatorState
StateInitializationContextImpl::new // wraps the DefaultKeyedStateStore and the OperatorStateStore
CheckpointedStreamOperator::initializeState(StateInitializationContext) // calls initializeState of the user-defined function, where Operator State can be obtained
StreamingRuntimeContext::setKeyedStateStore
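Flink has two main StateBackend implementations:
- HashMapStateBackend // in memory
- EmbeddedRocksDBStateBackend // memory + disk
Each StreamTask holds a StateBackend member variable that is initialized in its constructor: either the backend set in user code, or one loaded from the configuration by StateBackendLoader::loadStateBackendFromConfig. The default is HashMapStateBackend. For simplicity, the rest of this article uses HashMapStateBackend to walk through how the KeyedStateBackend and OperatorStateBackend are created, and how KeyedState and OperatorState are used while the data stream is processed.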
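The selection order described above (backend set in user code, then configuration, then the HashMapStateBackend default) can be sketched without Flink on the classpath. Everything in this sketch is a hypothetical stand-in: the `BackendKind` enum, the `resolve` helper, and the plain `Map` used as configuration are not Flink APIs. The key `state.backend` and the values `hashmap` and `rocksdb` are assumptions modeled on Flink's configuration shorthand; check the documentation for the version in use.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the two backends discussed above; not a Flink type.
enum BackendKind { HASHMAP, EMBEDDED_ROCKSDB }

class BackendSelectionSketch {
    // Assumed configuration key; verify the exact name for your Flink version.
    static final String BACKEND_KEY = "state.backend";

    // Precedence: backend set in user code, then configuration, then the in-memory default.
    static BackendKind resolve(Optional<BackendKind> fromUserCode, Map<String, String> config) {
        if (fromUserCode.isPresent()) {
            return fromUserCode.get();
        }
        String name = config.get(BACKEND_KEY);
        if (name == null) {
            return BackendKind.HASHMAP;
        }
        switch (name.toLowerCase(Locale.ROOT)) {
            case "hashmap":
                return BackendKind.HASHMAP;
            case "rocksdb":
                return BackendKind.EMBEDDED_ROCKSDB;
            default:
                throw new IllegalArgumentException("Unknown state backend: " + name);
        }
    }

    public static void main(String[] args) {
        Map<String, String> empty = Collections.emptyMap();
        Map<String, String> rocks = Collections.singletonMap(BACKEND_KEY, "rocksdb");
        System.out.println(resolve(Optional.empty(), empty));
        System.out.println(resolve(Optional.empty(), rocks));
        System.out.println(resolve(Optional.of(BackendKind.HASHMAP), rocks));
    }
}
```

The three calls in `main` cover the three cases: nothing configured, a backend named in the configuration, and a backend set in user code that overrides the configuration.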
OperatorState
OperatorState creation flow:
OperatorChain::initializeStateAndOpenOperators // calls initializeState and open on every operator
AbstractStreamOperator::initializeState
StreamTaskStateInitializerImpl::streamOperatorStateContext
StreamTaskStateInitializerImpl::operatorStateBackend
HashMapStateBackend::createOperatorStateBackend // creates the DefaultOperatorStateBackend
StreamOperatorStateHandler::new // creates the StreamOperatorStateHandler
StreamOperatorStateHandler::initializeOperatorState // calls CheckpointedFunction::initializeState
StateInitializationContextImpl::new // this instance provides getOperatorStateStore
User code that uses Operator State needs to implement the CheckpointedFunction interface, which declares the following two methods:
void initializeState(FunctionInitializationContext context) throws Exception;
void snapshotState(FunctionSnapshotContext context) throws Exception;
The initializeState method is called by StreamOperatorStateHandler.initializeOperatorState. Inside initializeState, operator state can be obtained with:
FunctionInitializationContext.getOperatorStateStore().getListState(ListStateDescriptor)
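DefaultOperatorStateBackend::getListState
PartitionableListState::new // backed by an ArrayList internally
So the ListState obtained through the OperatorStateStore is essentially an ArrayList internally. User code calls add to append elements to this internal list, and the StateBackend manages each Operator State. Together this provides distributed state management, and with checkpoints the state can be persisted and recovered after a failure.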
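To make the "ListState is an ArrayList underneath" point concrete, here is a minimal model with no Flink dependency. `SketchListState` and its `snapshot`/`restore` methods are hypothetical names chosen for this illustration; they only mimic the role of the list state and of a checkpoint, and leave out serialization, state names, and redistribution.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a list-style operator state: a thin wrapper around an ArrayList.
class SketchListState<T> {
    private final List<T> elements = new ArrayList<>();

    // User code appends elements while processing records.
    void add(T value) {
        elements.add(value);
    }

    // Returns a copy so callers cannot mutate the internal list.
    List<T> get() {
        return new ArrayList<>(elements);
    }

    // Stand-in for what a checkpoint would persist: a copy of the current contents.
    List<T> snapshot() {
        return new ArrayList<>(elements);
    }

    // Stand-in for recovery: replace the contents with a previously taken snapshot.
    void restore(List<T> snapshot) {
        elements.clear();
        elements.addAll(snapshot);
    }
}

class OperatorListStateDemo {
    public static void main(String[] args) {
        SketchListState<String> buffered = new SketchListState<>();
        buffered.add("a");
        buffered.add("b");
        List<String> checkpoint = buffered.snapshot();

        buffered.add("c"); // added after the checkpoint, lost on failure

        // Simulate a restart: a fresh state object is filled from the checkpoint.
        SketchListState<String> recovered = new SketchListState<>();
        recovered.restore(checkpoint);
        System.out.println(recovered.get());
    }
}
```

After the simulated restart, only the elements that were present when the snapshot was taken are available again, which is the persistence and recovery behavior the paragraph above attributes to checkpoints.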
OperatorStateStore offers three methods for obtaining state:
<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception;
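The first two methods return the same kind of list but differ in how its entries are handed back to subtasks on restore: with getListState the entries are split among the subtasks, while with getUnionListState every subtask receives the complete list. The sketch below illustrates that difference only. `RedistributionSketch`, `splitEvenly`, and `union` are hypothetical helpers, and the plain round-robin assignment is a simplification rather than Flink's actual repartitioning logic. Broadcast state is not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RedistributionSketch {
    // Split style: every entry ends up on exactly one new subtask (plain round-robin here).
    static <T> List<List<T>> splitEvenly(List<List<T>> oldSubtaskStates, int newParallelism) {
        List<List<T>> result = emptyLists(newParallelism);
        int next = 0;
        for (List<T> oldState : oldSubtaskStates) {
            for (T entry : oldState) {
                result.get(next % newParallelism).add(entry);
                next++;
            }
        }
        return result;
    }

    // Union style: every new subtask receives all entries from all old subtasks.
    static <T> List<List<T>> union(List<List<T>> oldSubtaskStates, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> oldState : oldSubtaskStates) {
            all.addAll(oldState);
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    private static <T> List<List<T>> emptyLists(int count) {
        List<List<T>> lists = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            lists.add(new ArrayList<>());
        }
        return lists;
    }

    public static void main(String[] args) {
        // State of two old subtasks, restored with parallelism 2.
        List<List<Integer>> old = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
        System.out.println(splitEvenly(old, 2)); // [[1, 3], [2]]
        System.out.println(union(old, 2));       // [[1, 2, 3], [1, 2, 3]]
    }
}
```

With union state, each subtask is expected to pick out the entries it needs from the full list, so it suits small state such as source offsets better than large buffers.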
KeyedState
The KeyedState creation flow is as follows:
OperatorChain::initializeStateAndOpenOperators // calls initializeState and open on every operator
AbstractStreamOperator::initializeState
StreamTaskStateInitializerImpl::streamOperatorStateContext
StreamTaskStateInitializerImpl::keyedStatedBackend
HashMapStateBackend::createKeyedStateBackend // creates the HeapKeyedStateBackend
HeapKeyedStateBackendBuilder::build
InternalKeyContextImpl::new // holds the key currently being processed
StreamOperatorStateHandler::new // creates the StreamOperatorStateHandler
DefaultKeyedStateStore::new // creates the DefaultKeyedStateStore
StreamingRuntimeContext::setKeyedStateStore // sets the keyedStateStore member variable
AbstractUdfStreamOperator::open
FunctionUtils::openFunction
RichFunction::open
The KeyedStateStore is kept in the StreamingRuntimeContext. To use KeyedState, a user-defined function implements the RichFunction interface and obtains the state in its open method by calling getRuntimeContext().getState:
getRuntimeContext().getState() // obtains a ValueState
DefaultKeyedStateStore::getState
DefaultKeyedStateStore::getPartitionedState
HeapKeyedStateBackend::getPartitionedState
AbstractKeyedStateBackend::getOrCreateKeyedState
LatencyTrackingStateFactory::createStateAndWrapWithLatencyTrackingIfEnabled
TtlStateFactory::createStateAndWrapWithTtlIfEnabled // wraps the state with TTL
HeapKeyedStateBackend::createInternalState
HeapKeyedStateBackend::tryRegisterStateTable // the key step: one StateTable is created per State
CopyOnWriteStateTable::new // supports asynchronous snapshots; the current KeyedStateBackend's InternalKeyContext is passed in here
StateTable::new // creates an array of StateMaps sized by the number of KeyGroups this Task manages
CopyOnWriteStateTable::createStateMap // one StateMap per KeyGroup
CopyOnWriteStateMap::new // stores each key and its state
HeapValueState::create
HeapValueState::new // holds a member variable pointing to the CopyOnWriteStateTable that stores this state
HeapValueState::setCurrentNamespace // defaults to VoidNamespace
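The relationship between the key context, the per-state table, and the per-KeyGroup maps in the chain above can be modeled in a few lines. This is a sketch under stated assumptions, not Flink's implementation: `KeyContextSketch`, `StateTableSketch`, and `ValueStateSketch` are hypothetical names, a plain `HashMap` replaces CopyOnWriteStateMap, the key group is computed as `hashCode` modulo the number of key groups (Flink additionally applies a murmur hash), and namespaces, TTL, latency tracking, and snapshots are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plays the role of the key context: remembers the key of the record being processed.
class KeyContextSketch<K> {
    private final int numberOfKeyGroups;
    private K currentKey;
    private int currentKeyGroup;

    KeyContextSketch(int numberOfKeyGroups) {
        this.numberOfKeyGroups = numberOfKeyGroups;
    }

    void setCurrentKey(K key) {
        this.currentKey = key;
        // Simplified key-group assignment; an assumption for this sketch.
        this.currentKeyGroup = Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    K getCurrentKey() { return currentKey; }

    int getCurrentKeyGroup() { return currentKeyGroup; }
}

// One table per registered state; one map per key group owned by this task.
class StateTableSketch<K, V> {
    private final KeyContextSketch<K> keyContext;
    private final int firstKeyGroup;
    private final List<Map<K, V>> stateMaps = new ArrayList<>();

    StateTableSketch(KeyContextSketch<K> keyContext, int firstKeyGroup, int keyGroupCount) {
        this.keyContext = keyContext;
        this.firstKeyGroup = firstKeyGroup;
        for (int i = 0; i < keyGroupCount; i++) {
            stateMaps.add(new HashMap<>());
        }
    }

    V get() { return mapForCurrentKey().get(keyContext.getCurrentKey()); }

    void put(V value) { mapForCurrentKey().put(keyContext.getCurrentKey(), value); }

    private Map<K, V> mapForCurrentKey() {
        int index = keyContext.getCurrentKeyGroup() - firstKeyGroup;
        if (index < 0 || index >= stateMaps.size()) {
            throw new IllegalStateException("Key group not owned by this task");
        }
        return stateMaps.get(index);
    }
}

// The view handed to user code: no key parameter, the key comes from the context.
class ValueStateSketch<K, V> {
    private final StateTableSketch<K, V> table;

    ValueStateSketch(StateTableSketch<K, V> table) { this.table = table; }

    V value() { return table.get(); }

    void update(V value) { table.put(value); }
}

class KeyedStateDemo {
    public static void main(String[] args) {
        KeyContextSketch<String> ctx = new KeyContextSketch<>(4);
        ValueStateSketch<String, Integer> count =
                new ValueStateSketch<>(new StateTableSketch<>(ctx, 0, 4));

        ctx.setCurrentKey("a");
        count.update(1);
        ctx.setCurrentKey("b");
        System.out.println(count.value()); // null: key "b" has no value yet
        ctx.setCurrentKey("a");
        System.out.println(count.value()); // 1
    }
}
```

The point of the design is that user code never passes a key to `value()` or `update()`. The runtime sets the current key before each record is processed, and the state object resolves the right map through the shared key context.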
KeyedState comes in the following types:
<T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) // returns a HeapValueState
<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) // returns a HeapListState
<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) // returns a HeapMapState
<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) // returns a HeapAggregatingState
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) // returns a HeapReducingState
RocksDBStateBackend
EmbeddedRocksDBStateBackend manages OperatorState the same way HashMapStateBackend does, through DefaultOperatorStateBackend.
For KeyedState, EmbeddedRocksDBStateBackend uses RocksDBKeyedStateBackend, which combines disk and memory to manage large state. The corresponding state classes are:
RocksDBValueState
RocksDBListState
RocksDBMapState
RocksDBAggregatingState
RocksDBReducingState
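Summary
Flink's built-in state management is one of its biggest advantages over other distributed stream processing systems: efficient, reliable distributed state management is available without an external storage component, which greatly lowers the cost of learning and using it.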