Flink-Source算子狀態恢復分析

背景
修改 source 算子

kafka_old_topic?消費任務運行一段時間后,暫停狀態并保留。然后將 uid 和 topic 都改了,消費者 offset 會從 earliest 開始。

// before
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_old_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// after
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_new_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");
新增 source 算子

但是只新增一個同樣的 kafka-source-new 算子(old 保留,消費者 offset 卻會從最近開始。

FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_old_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// 新增
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_new_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");
算子(鏈)子任務狀態列表(operatorSubtaskStates)

針對第一種情況,job 的算子狀態(localStates)有三個,分別對應xxx,

當給 Task【Source: kafka-source-new -> map-heart (org.apache.flink.streaming.runtime.tasks.SourceStreamTask) 】(被修改的 source)分配狀態時,該 Task 的每個算子都會綁定一個狀態(OperatorState):“kafka-source-new”、“map-heart”,只不過這兩個 OperatorState 有點差異:

這兩個算子狀態的 operatorSubtaskStates (存儲算子子任務的狀態信息)集合一個為空,一個不為空。原因就是在分配 “kafka-source-new” 算子狀態時,由于其不在 localState,于是走了默認的構造函數創建 OperatorState 對象:

其實關鍵點就在 operatorSubtaskStates 的封裝。

TaskStateAssignment 任務狀態分配

TaskStateAssignment 的構造方法有個核心參數 hasNonFinishedState。

如果當前 Task 的子任務狀態列表(operatorSubtaskStates全集)不為空,該值就為 true。

一旦該值為 true,就會執行 assignTaskStateToExecutionJobVertices:

給當前 Task 的每個 subTask 賦值狀態:

那么每個 subTask 都會有一份狀態(JobManagerTaskRestore,綁定 checkpointId):

JobManagerTaskRestore(JM與TM狀態交互中間站)

一個 Execution 就是一個 subTask:

Task 部署階段(JM 向 TM 提交 Task 任務),TM 會根據 TaskDeploymentDescriptor 來恢復狀態和創建算子(其中 taskRestore 就是 JobManagerTaskRestore,在 setInitialState 中賦值)。

TM 接收到提交任務請求時,解析出 taskRestore 創建任務狀態管理器(TaskStateManager)

TaskStateManager(TM 的任務管理器)
算子子任務狀態獲取

prioritizedOperatorState:傳入算子 ID,即可從 JobMangerTaskRestore 獲取子任務狀態。

  1. 如果 JobMangerTaskRestore 為 null,那么返回一個空的 PrioritizedOperatorSubtaskState(checkpoint設置為null

  1. 如果不為 null,則會從 JobManagerTaskRestore 中根據算子ID封裝 PrioritizedOperatorSubtaskState。

StateInitializationContext(UDF-算子狀態初始化上下文)

KafkaConsumerBase 在初始化狀態階段,會調用context.isRestored()判斷是否從狀態恢復:

算子狀態句柄(StreamOperatorStateHandler)處理算子狀態的初始化,該階段會調用 UDFKafkaConsumerBase.initializeState初始化算子的本地狀態并且 checkpointId 就是在這里被寫入狀態上下文 StateInitializationContext(該上下文是可以被用戶訪問的)。

StreamOperatorStateContext(initializeState全局上下文)

以上得知 checkpointId 來自context.getRestoredCheckpointId,那么 context (該上下文是不可以被用戶訪問的)從何而來?

算子狀態初始化AbstractStreamOperator.initialState,利用 StreamTaskStateInitializer

封裝 StreamOperatorStateContext:

那么 checkpointId 的封裝肯定在streamTaskStateManger.streamOperatorStateContext中:

方法中通過 taskStateManger 封裝算子狀態,如果 prioritizedOperatorSubtaskState 為空對象,那么這里的 checkpointId 就為 null


針對第二種情況,task 的算子狀態(有多個算子,算子鏈)不存在 localOperators 中,則默認使用構造方法封裝 OperatorState,每個OperatorState 的 operatorSubtaskStates 集合都為空。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/913777.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/913777.shtml
英文地址,請注明出處:http://en.pswp.cn/news/913777.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

IDEA中application.yml配置文件不自動提示解決辦法

今天在自己的電腦上使用IDEA的時候&#xff0c;發現在application配置文件里面輸入配置項的時候沒有提示&#xff0c;網上找了一圈也沒解決&#xff0c;最后自己試出來了。 解決辦法&#xff1a; 鼠標移動到配置文件上&#xff0c;單擊右鍵-重寫文件類型、選擇YAML(捆綁)&#…

Vite 完整功能詳解與 Vue 項目實戰指南

Vite 完整功能詳解與 Vue 項目實戰指南 Vite 是下一代前端開發工具&#xff0c;由 Vue 作者尤雨溪開發&#xff0c;提供極速的開發體驗和高效的生產構建。以下是完整功能解析和實戰示例&#xff1a;一、Vite 核心功能亮點閃電般冷啟動 基于原生 ES 模塊&#xff08;ESM&#xf…

Vue 3 中使用路由參數跳轉時 watch 觸發重復請求問題詳解

&#x1f4d8;Vue 3 中使用路由參數跳轉時 watch 觸發重復請求問題詳解&#x1f516; 收藏 點贊 關注&#xff0c;掌握 Vue 3 路由參數監聽中的隱藏陷阱&#xff0c;避免詳情頁、嵌套路由頁誤觸發重復請求&#xff01;&#x1f9e9; 一、問題背景 在 Vue 3 項目中&#xff0c…

前端 項目更新通知 (plugin-web-update-notification)

項目版本更新迭代時&#xff0c;需提示用戶更新系統&#xff0c;不然早時間不更新對用戶體驗很不好&#xff0c;所以在每次部署后需要提示用戶&#xff0c;刷新靜態資源。推薦插件 plugin-web-update-notification .具體配置 vite.config.js文件中 import { webUpdateNotice …

【力扣(LeetCode)】數據挖掘面試題0002:當面對實時數據流時您如何設計和實現機器學習模型?

文章大綱一、實時數據處理&#xff1a;構建低延遲的數據管道1. 數據接入與緩沖2. 實時清洗與校驗3. 特征標準化與對齊二、模型設計&#xff1a;選擇適配實時場景的模型架構1. 模型選擇原則三、訓練與更新策略&#xff1a;離線與在線協同&#xff0c;應對概念漂移1. 離線-在線協…

TongWeb8.0.9.0.3部署后端應用,前端訪問后端報405(by sy+lqw)

問題描述&#xff1a; 客戶前端部署在nginx上&#xff0c;后端部署在tongweb8上&#xff08;相當于前后端分離&#xff09;&#xff0c;登錄的時候報錯&#xff0c;f12看network&#xff0c;狀態碼405&#xff0c;如下所示&#xff1a;看console&#xff0c;如下所示&#xff1…

mysql互為主從失效,重新同步

一、分別登錄服務器A和服務器B的mysqlmysql -u root -p 123456789二、分別查看數據庫狀態信息,下邊兩項參數有一項為NO就表示同步異常Slave_IO_Running:從服務器&#xff08;Slave&#xff09;中的 I/O 線程的運行狀態Slave_SQL_Running:從服務器上的 SQL 線程是否正在運行mysq…

板凳-------Mysql cookbook學習 (十一--------6)

https://blog.csdn.net/weixin_43236925/article/details/146382981 清晰易懂的 PHP 安裝與配置教程 12.6 查找每組行中含有最大或最小值的行 mysql> set max_price (select max(price) from painting); Query OK, 0 rows affected (0.01 sec)mysql> select artist.name…

ECS由淺入深第四節:ECS 與 Unity 傳統開發模式的結合?混合架構的藝術

ECS由淺入深第一節 ECS由淺入深第二節 ECS由淺入深第三節 ECS由淺入深第四節 ECS由淺入深第五節 盡管 ECS 帶來了顯著的性能和架構優勢&#xff0c;但在實際的 Unity 項目中&#xff0c;完全摒棄 GameObject 和 MonoBehaviour 往往是不現實的。Unity 引擎本身的大部分功能&…

Mac關閉觸控板

打開 “有鼠標或無線觸控板時忽略內建觸控板”選項即可 參考&#xff1a;Mac如何關閉觸控板防止誤觸&#xff1f;內置的設置就可以達成 - Mac天空

Python:Rich 終端富文本與界面樣式工具庫

??? 1、簡述 Rich 是一個強大的 Python 庫,用于在終端中呈現富文本和精美的格式,讓命令行界面(CLI)應用擁有現代、美觀的輸出效果。本文將深入介紹 Rich 的核心功能,并通過一系列實際示例展示其強大能力。 Rich 由 Will McGugan 開發,主要特點包括: 豐富的文本樣式:支…

深入解析享元模式:通過共享技術高效支持大量細粒度對象

深入解析享元模式&#xff1a;通過共享技術高效支持大量細粒度對象 &#x1f31f; 嗨&#xff0c;我是IRpickstars&#xff01; &#x1f30c; 總有一行代碼&#xff0c;能點亮萬千星辰。 &#x1f50d; 在技術的宇宙中&#xff0c;我愿做永不停歇的探索者。 ? 用代碼丈量世…

Docker高級管理

一、Docker 容器的網絡模式 當項目大規模使用 Docker 時&#xff0c;容器通信的問題也就產生了。要解決容器通信問題&#xff0c;必須先了解很多關于網絡的知識。Docker 的網絡模式非常豐富&#xff0c;可以滿足不同容器的通信要求&#xff0c;下表列出了這些網絡模式的主要信息…

ABP VNext + Tye:本地微服務編排與調試

ABP VNext Tye&#xff1a;本地微服務編排與調試 &#x1f680; &#x1f4da; 目錄ABP VNext Tye&#xff1a;本地微服務編排與調試 &#x1f680;TL;DR ?一、環境與依賴 &#x1f6e0;?二、核心配置詳解 &#x1f680;1. 主配置 tye.yaml三、多環境文件 &#x1f331;&am…

Vue響應式原理一:認識響應式邏輯

核心思想&#xff1a;當數據發生變化時&#xff0c;依賴該數據的代碼能夠自動重新執行Vue中的應用&#xff1a;在data或ref/reactive中定義的數據&#xff0c;當數據變化時template會自動更新template的本質&#xff1a; 是render()函數, 用變化之后的數據重新執行render()函數…

Redis:分組與設備在 Redis 中緩存存儲設計

一、緩存存儲結構設計 分組與設備的映射關系&#xff08;使用 Set 結構&#xff09;&#xff1a; 鍵格式&#xff1a;采用 group:{groupId}:devices 的格式作為 Redis 中 Set 的鍵&#xff0c;例如 group:1:devices 就代表了分組 ID 為 1 的分組所關聯的設備集合。值內容&#…

Leetcode 3605. Minimum Stability Factor of Array

Leetcode 3605. Minimum Stability Factor of Array 1. 解題思路2. 代碼實現 題目鏈接&#xff1a;3605. Minimum Stability Factor of Array 1. 解題思路 這一題的核心思路是二分法&#xff0c;本質上就是我們給定一個常數kkk&#xff0c;然后考察是否存在一個構造使得能夠…

編譯安裝的Mysql5.7報“Couldn‘t find MySQL server (mysqld_safe)“的原因 筆記250709

編譯安裝的Mysql5.7報"Couldn’t find MySQL server (mysqld_safe)"的原因 筆記250709 MySQL 的安裝路徑與配置文件&#xff08;如 my.cnf 或 mysql.server&#xff09;中指定的 basedir 不一致。 mysqld_safe 文件實際位置與系統查找路徑不匹配&#xff08;常見于自…

在 Ubuntu 下配置 oh-my-posh —— 普通用戶 + root 各自使用獨立主題(共享可執行)

&#x1f9e9; 在 Ubuntu 下配置 oh-my-posh —— 普通用戶 root 各自使用獨立主題&#xff08;共享可執行&#xff09;? 目標說明普通用戶 使用 tokyonight_storm 主題 root 用戶 使用 1_shell 主題 共用全局路徑下的 oh-my-posh 可執行文件 正確加載 Homebrew 到環境變量中…

Spring Boot 項目中的多數據源配置

關鍵詞&#xff1a;Spring Boot、多數據源配置、MySQL、SQL Server、Oracle、動態切換 ? 摘要 在實際企業級開發中&#xff0c;一個 Spring Boot 項目可能需要連接多個數據庫&#xff0c;比如 MySQL、SQL Server 和 Oracle。不同的業務模塊可能依賴不同的數據源&#xff0c;這…