Flink源碼之State創建流程

StreamOperatorStateHandler

在StreamTask啟動初始化時通過StreamTaskStateInitializerImpl::streamOperatorStateContext會為每個StreamOperator 創建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有個StreamOperatorStateHandler成員變量,調用AbstractStreamOperator::initializeState方法中會初始化StreamOperatorStateHandler類型的成員變量, StreamOperatorStateHandler對象變量封裝了keyedStatedBackend和operatorStateBackend,用于統一管理SteamOperator的狀態。

 OperatorChain::initializeStateAndOpenOperators //調用每個Operator的initializeState和Open方法AbstractStreamOperator::initializeState(StreamTaskStateInitializer) StreamTaskStateInitializerImpl::streamOperatorStateContext //此時會創建keyedStatedBackend和operatorStateBackendStreamOperatorStateHandler::new //初始化StreamOperator的stateHandler成員變量,用于狀態管理StreamOperatorStateHandler::initializeOperatorStateStateInitializationContextImpl::new //封裝DefaultKeyedStateStore和OperatorStateStoreCheckpointedStreamOperator::initializeState(StateInitializationContext)//調用用戶定義函數中的initializeState方法,可獲取Operator StateStreamingRuntimeContext::setKeyedStateStore

Flink中主要有兩種StateBackend:

  • HashMapStateBackend //內存
  • EmbeddedRocksDBStateBackend //內存+磁盤

每個StreamTask一個StateBackend成員變量,在構造函數中進行初始化,通過用戶代碼中設置或StateBackendLoader::loadStateBackendFromConfig從配置中加載,默認為HashMapStateBackend。簡單起見,以HashMapStateBackend為例剖析創建KeyedStatedBackend和OperatorStateBackend以及處理數據流時是如何使用KeyedState和OperatorState的。

OperatorState

OperatorState創建流程:

OperatorChain::initializeStateAndOpenOperators //調用每個Operator的initializeState和Open方法AbstractStreamOperator::initializeStateStreamTaskStateInitializerImpl::streamOperatorStateContextStreamTaskStateInitializerImpl::operatorStateBackendHashMapStateBackend::createOperatorStateBackend //創建DefaultOperatorStateBackendStreamOperatorStateHandler::new //創建StreamOperatorStateHandlerStreamOperatorStateHandler::initializeOperatorState //調用CheckpointedFunction::initializeStateStateInitializationContextImpl::new //該實例可getOperatorStateStore

使用Operator State的用戶業務代碼需要實現CheckpointedFunction接口,該接口中有以兩個下方法:

void initializeState(FunctionInitializationContext context) throws Exception;void snapshotState(FunctionSnapshotContext context) throws Exception;

其中initializeState方法則會被StreamOperatorStateHandler.initializeOperatorState 調用,在initializeState方法中可使用

FunctionInitializationContext.getOperatorStateStore().getListState(ListStateDescriptor)
DefaultOperatorStateBackend::getListState::newPartitionableListState::new  //內部是ArrayList

因此通過OperatorStateStore獲取的ListState內部本質上是一個ArrayList, 業務代碼中可以調用add方法向這個內部List添加元素,由StateBackend管理每個Operator State,這樣就實現了一個分布式狀態管理,借助Checkpoint可以實現狀態持久化及容災恢復。

OperatorStateStore有三個獲取狀態方法:

<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)throws Exception

KeyedState

KeyedState創建流程如下:

OperatorChain::initializeStateAndOpenOperators //調用每個Operator的initializeState和Open方法AbstractStreamOperator::initializeStateStreamTaskStateInitializerImpl::streamOperatorStateContextStreamTaskStateInitializerImpl::keyedStatedBackendHashMapStateBackend::createKeyedStateBackend //創建HeapKeyedStateBackendHeapKeyedStateBackendBuilder::buildInternalKeyContextImpl::new //用于保存當前正在處理的keyStreamOperatorStateHandler::new //創建StreamOperatorStateHandlerDefaultKeyedStateStore::new //創建DefaultKeyedStateStoreStreamingRuntimeContext::setKeyedStateStore //設置keyedStateStore成員變量AbstractStreamUdfOperator::openFunctionUtils::openFunctionRichFunction::open

KeyedStateStore保存在StreamingRuntimeContext中,使用KeyedState時,用戶自定義函數實現RichFunction接口,在open方法中調用getRuntimeContext().getState方法獲取狀態:

getRuntimeContext().getState() //獲取ValueState
DefaultKeyedStateStore::getState
DefaultKeyedStateStore::getPartitionedState
HeapKeyedStateBackend::getPartitionedState
AbstractKeyedStateBackend::getOrCreateKeyedStateLatencyTrackingStateFactory::createStateAndWrapWithLatencyTrackingIfEnabledTtlStateFactory::createStateAndWrapWithTtlIfEnabled //包裝TTLHeapKeyedStateBackend::createInternalStateHeapKeyedStateBackend::tryRegisterStateTable //這里很關鍵,對每個State創建一個StateTableCopyOnWriteStateTable::new//異步快照,這里傳遞了當前KeyedStateBackend的InternalKeyContextStateTable::new //根據當前Task管理的KeyGroups數量創建StateMap數組CopyOnWriteStateTable::createStateMap //一個KeyGroup一個StateMapCopyOnWriteStateMap::new //存儲key及其對應的狀態HeapValueState::createHeapValueState::new //有個成員變量指向存儲當前state的CopyOnWriteStateMapHeapValueState::setCurrentNamespace  //默認為VoidNamespace

KeyedState有以下幾種類型

ValueState<T> getState(ValueStateDescriptor<T> stateProperties) 獲取HeapValueStateListState<T> getListState(ListStateDescriptor<T> stateProperties)獲取HeapListStateMapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties)獲取HeapMapStategetAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties)獲取HeapAggregatingStategetReducingState(ReducingStateDescriptor<T> stateProperties)獲取HeapReducingState

RocksDBStateBackend

EmbeddedRocksDBStateBackend 管理OperatorState與HashMapStateBackend 一樣,也是通過DefaultOperatorStateBackend進行管理的。

EmbeddedRocksDBStateBackend 管理KeyedState則是使用RocksDBKeyedStateBackend實現,這樣可以借助磁盤加內存進行大狀態管理:

RocksDBValueState
RocksDBListState
RocksDBMapState
RocksDBAggregatingState
RocksDBReducingState

總結

Flink內置狀態管理是相比其他分布式流式處理系統最大的優勢之一,不用借助外部存儲組件,就可實現高效可靠的分布式狀態管理,極大降低了學習和使用成本。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/39695.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/39695.shtml
英文地址,請注明出處:http://en.pswp.cn/news/39695.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Web framework-Gin

一、Gin Go Web--Go Module 軟件框架&#xff08;software framework&#xff09;&#xff0c;通常指的是為了實現某個業界標準或完成特定基本任務的軟件組件規范&#xff0c;也指為了實現某個軟件組件規范時&#xff0c;提供規范所要求之基礎功能的軟件產品。 框架就是&#…

機器學習|Softmax 回歸的數學理解及代碼解析

機器學習&#xff5c;Softmax 回歸的數學理解及代碼解析 Softmax 回歸是一種常用的多類別分類算法&#xff0c;適用于將輸入向量映射到多個類別的概率分布。在本文中&#xff0c;我們將深入探討 Softmax 回歸的數學原理&#xff0c;并提供 Python 示例代碼幫助讀者更好地理解和…

HarmonyOS NEXT新能力,一站式高效開發HarmonyOS應用

2023年8月6日華為開發者大會2023&#xff08;HDC.Together&#xff09;圓滿收官&#xff0c;伴隨著HarmonyOS 4的發布&#xff0c;華為向開發者發布了匯聚所有最新開發能力的HarmonyOS NEXT開發者預覽版&#xff0c;并分享了圍繞“一次開發&#xff0c;多端部署” “可分可合&a…

代碼隨想錄算法訓練營第60天|動態規劃part17| 647. 回文子串、516.最長回文子序列、動態規劃總結篇

代碼隨想錄算法訓練營第60天&#xff5c;動態規劃part17&#xff5c; 647. 回文子串、516.最長回文子序列、動態規劃總結篇 647. 回文子串 647. 回文子串 思路&#xff1a; 暴力解法 兩層for循環&#xff0c;遍歷區間起始位置和終止位置&#xff0c;然后還需要一層遍歷判斷…

【mysql】—— 表的增刪改查

目錄 序言 &#xff08;一&#xff09;Create操作 1、單行數據 全列插入 2、多行數據 指定列插入 3、插入否則更新 4、直接替換 &#xff08;二&#xff09;Retrieve操作 1、SELECT 列 1??全列查詢 2??指定列查詢 3??查詢字段為表達式 4??為查詢結果指定…

數據結構——堆

數據結構——堆 堆堆簡介堆的分類 二叉堆過程插入操作 刪除操作向下調整&#xff1a; 增加某個點的權值實現參考代碼&#xff1a;建堆方法一&#xff1a;使用 decreasekey&#xff08;即&#xff0c;向上調整&#xff09;方法二&#xff1a;使用向下調整 應用對頂堆 其他&#…

Python Flask+Echarts+sklearn+MySQL(評論情感分析、用戶推薦、BI報表)項目分享

Python FlaskEchartssklearnMySQL(評論情感分析、用戶推薦、BI報表)項目分享 項目背景&#xff1a; 隨著互聯網的快速發展和智能手機的普及&#xff0c;人們越來越傾向于在網上查找餐廳、購物中心、酒店和旅游景點等商戶的點評和評分信息&#xff0c;以便做出更好的消費決策。…

YOLOv8 : 網絡結構

一. YOLOv8網絡結構 1. Backbone YOLOv8的Backbone同樣參考了CSPDarkNet-53網絡&#xff0c;我們可以稱之為CSPDarkNet結構吧&#xff0c;與YOLOv5不同的是&#xff0c;YOLOv8使用C2f(CSPLayer_2Conv)代替了C3模塊(如果你比較熟悉YOLOv5的網絡結構&#xff0c;那YOLOv8的網絡…

【GitHub】Pycharm本地項目打包上傳到Github倉庫的操作步驟

文章目錄 1、Pycharm端的設置操作2、Github端的設置操作3、Pycharm上配置Github4、Git本地項目至GitHub倉庫5、前往Github中查看確認6、常見報錯 1、Pycharm端的設置操作 通過CtrlAltS快捷組合鍵的方式&#xff0c;打開設置&#xff0c;導航到版本控制一欄中的Git&#xff0c;…

Gin安裝解決國內go 與 熱加載

get 方式安裝超時問題&#xff0c;國內直接用官網推薦的下面這個命令大概率是安裝不成功的 go get -u github.com/gin-gonic/gin 可以在你的項目目錄下執行下面幾個命令&#xff1a; 比如我的項目在E:\Oproject\zl cmd E:\Oproject\zl>就在目錄下執行 go env -w GO111…

回歸預測 | MATLAB實現GRU門控循環單元多輸入多輸出

回歸預測 | MATLAB實現GRU門控循環單元多輸入多輸出 目錄 回歸預測 | MATLAB實現GRU門控循環單元多輸入多輸出預測效果基本介紹程序設計往期精彩參考資料 預測效果 基本介紹 MATLAB實現GRU門控循環單元多輸入多輸出&#xff0c;數據為多輸入多輸出預測數據&#xff0c;輸入10個…

pytorch安裝VAE項目詳解

安裝VAE項目 一、 基本環境二、代碼來源三、搭建conda環境四、下載數據集五、啟動項目六、其他相關問題 一、 基本環境 工具版本號OSwin 11pycharm2020.1GPU3050 二、代碼來源 github地址為&#xff1a; https://github.com/AntixK/PyTorch-VAE/blob/8700d245a9735640dda458d…

ZooKeeper的應用場景(集群管理、Master選舉)

5 集群管理 隨著分布式系統規模的日益擴大&#xff0c;集群中的機器規模也隨之變大&#xff0c;因此&#xff0c;如何更好地進行集群管理也顯得越來越重要了。 所謂集群管理&#xff0c;包括集群監控與集群控制兩大塊&#xff0c;前者側重對集群運行時狀態的收集&#xff0c;后…

08 - 追加commit和修改最新的commit message

查看所有文章鏈接&#xff1a;&#xff08;更新中&#xff09;GIT常用場景- 目錄 文章目錄 1. 追加提交2. 修改最新的commit message 1. 追加提交 將改動追加到上一次的commit 現在我已經修改了Readme文件并且已經add、commit操作&#xff0c;但是還沒有push到遠程倉庫&#x…

【左神算法刷題班】第17節:在有序二維數組中查找目標值、等于目標字符串的子序列個數

第17節 題目1&#xff1a;在有序二維數組中查找目標值 給定一個每一行有序、每一列也有序&#xff0c;整體可能無序的二維數組 再給定一個數num&#xff0c; 返回二維數組中有沒有num這個數 例子 數組如下&#xff0c;找 6 是否存在。 1 3 5 7 2 4 6 13 3 9 14 …

“心理健康人工智能產學研創新聯盟”揭牌成立|深蘭科技

8月14日上午&#xff0c;“2023樹洞救援年會”在上海舉行&#xff0c;會上舉行了“心理健康人工智能產學研創新聯盟”的簽約和揭牌儀式。“樹洞行動救援團”創始人深蘭科技科學院智能科學首席科學家、荷蘭阿姆斯特丹自由大學人工智能系終身教授黃智生&#xff0c;深蘭科技集團創…

華納云:ubuntu啟動寶塔的方法是什么

要在Ubuntu上啟動寶塔面板&#xff08;BT Panel&#xff09;&#xff0c;請按照以下步驟操作&#xff1a; 下載并安裝寶塔面板&#xff1a; 在終端中執行以下命令&#xff0c;以下載并運行寶塔面板的安裝腳本&#xff1a; sudo su wget -O install.sh http://download.bt.c…

Git 常用操作

一、Git 常用操作 1、Git 切換分支 git checkout命令可以用于三種不同的實體&#xff1a;文件&#xff0c;commit&#xff0c;以及分支。checkout的意思就是對于一種實體的不同版本之間進行切換的操作。checkout一個分支&#xff0c;會更新當前的工作空間中的文件&#xff0c;…

68 # 中間層如何請求其他服務

前端 ajax 有跨域問題&#xff0c;可以先訪問中間層&#xff0c;在通過 node 去請求別的服務端口&#xff0c;可以解決跨域問題 編寫中間層調用 // 中間層的方式const http require("http");// http.get 默認發送 get 請求 // http.request 支持其他請求格式 postl…

Java基礎知識實際應用(學生信息管理系統、猜拳小游戲、打印日歷)

一、Java學生信息管理系統 這個系統包含了添加、修改、刪除、查詢和顯示所有學生信息等功能。您可以在此基礎上進行修改和完善&#xff0c;以適應您的需求。 import java.util.Scanner;public class StudentManagementSystem {private static Scanner scanner new Scanner(S…