深入解析 Apache Flink FLIP-511:優化 Kafka Sink 事務處理,減輕 Broker 負載

一、 背景與核心問題:Kafka Sink 事務的痛點

Flink Kafka Sink 在?Exactly-Once?模式下依賴 Kafka 事務來確保數據寫入的原子性,并與 Flink 檢查點對齊。然而,非優雅關閉(如任務失敗、非?stop-with-savepoint?的停止)會導致?“滯留事務”。這些滯留事務在 Kafka 中會:

  1. 阻塞消費者 (READ_COMMITTED):阻礙消費進度 (LSO)。
  2. 阻礙數據卸載和主題壓縮
  3. 最關鍵的是:Kafka Broker 會在內存中保留每個事務的元數據?長達 7 天

1、舊方案

探測式事務恢復 (INCREMENTING?+?PROBING) 的致命缺陷

Flink 原有的恢復機制基于“探測”:

  • 事務 ID 命名規則:?transactionalIdPrefix-subtaskId-checkpointId?(每個檢查點生成唯一 ID)。
  • 恢復邏輯:?根據檢查點狀態,嘗試初始化并提交/中止可能滯留的事務 ID(按檢查點 ID 和子任務 ID 維度遞增探測)。探測到?epoch > 0?表示事務滯留需中止。

該方案存在兩大嚴重問題:

  1. Kafka Broker 內存爆炸性增長:
    • 高檢查點頻率(如 1 分鐘)結合唯一 ID 策略,導致海量短期事務 ID
    • 計算:7天 * 24小時 * 60分鐘 * 并行度 ≈ 10080 * 并行度?個 ID 需在 Broker 內存保留 7 天。
    • 這是 Kafka 設計(預期 ID 重用)與 Flink 實現(唯一 ID)的根本沖突,給 Broker 帶來巨大且不必要的內存壓力 (FLINK-34554)。
  2. 恢復時間不可預測與“探測爆炸”:
    • 在連續重啟失敗(無法完成新檢查點)的最壞情況下,每次重啟探測的 ID 范圍會指數級擴大(每次約 3 倍)。
    • 恢復時間可能變得非常長且難以預估。
    • 雖然成功檢查點能重置此問題,但重啟循環本身已表明系統存在其他問題,此機制會雪上加霜。

二、 FLIP-511 解決方案:池化 ID 與精準清理

提案的核心是摒棄唯一 ID 策略,改為重用有限數量的事務 ID,并利用?Kafka 3.0+ 的?ListTransactions?API?實現精準的事務狀態查詢和清理。

1、新方案核心機制 (POOLING?+?LISTING)

1、事務 ID 命名與池化管理 (POOLING):

  • 格式仍為?<prefix>-<subtask id>-<counter>,但?counter?是動態遞增的整數
  • Writer (寫入器) 職責:
    • 啟動:?創建一個新事務(分配新 ID 或復用池中可用 ID),開始寫入。存儲當前使用的 ID 到狀態
    • 檢查點 (snapshotState):?將當前活躍事務?finalize?并傳遞給 Committer。立即開啟一個新事務(分配新 ID 或復用)。存儲所有已開始但未最終釋放(提交/中止/復用)的 ID 到狀態
    • 檢查點完成通知 (notifyCheckpointComplete):?收到 Committer 成功提交某事務 ID 的通知后,將該 ID 標記為可用并放入池中復用
    • 狀態合并/清理 (snapshotState/initializeState):?在后續檢查點或恢復時,清理已確認完成的事務 ID 狀態,回收其計數器或標記 ID 可用。
    • 關閉:?中止當前活躍事務。
  • Committer (提交器) 職責:
    • 接收 Writer 傳遞的需要提交的事務 ID 信息。
    • 執行?commitTransaction。成功后將 ID 釋放通知回 Writer(通過回調或狀態更新)。

2、精準恢復利用?ListTransactions?API (LISTING):

  • 恢復啟動時:
    1. 查詢:?調用 Kafka AdminClient 的?ListTransactions?API,獲取 Kafka Broker 上所有屬于該 Sink 的?未完成?(Open) 事務
    2. 對比:?從 Flink 狀態中恢復出需要重新提交的事務 ID 列表(即上次運行中已?finalize?但可能未提交的事務)。
    3. 清理:?精準中止所有在?ListTransactions?結果中但?不在?需重新提交列表中的 Open 事務。這些是真正的“滯留垃圾事務”。
  • 重新提交:?Committer 重新提交狀態中記錄的待提交事務 ID。冪等操作,已提交的事務會靜默成功。

2、新方案的優勢

  • 大幅減少 Broker 內存占用:
    • 預期 ID 數量 ≈ 3 * 并行度?(1 Writer 活躍事務 + 1-2 個等待/提交中事務)。
    • 相比舊方案(可能數萬/數十萬 ID),減少 2-3 個數量級。即使臨時峰值到 100 個 ID,影響也遠小于舊方案。
  • 穩定且快速的恢復:
    • 無需復雜探測邏輯,恢復時間確定且快速
    • 徹底消除“探測爆炸”問題。
  • 更健壯:?直接依賴 Kafka API 查詢事務狀態,邏輯更清晰可靠。
  • 資源效率提升:?減少了網絡交互(探測)和狀態管理開銷。

三、 公共接口與配置變更

提案引入了靈活的配置選項,允許用戶選擇策略:

public class KafkaSinkBuilder<IN> {...public KafkaSinkBuilder<IN> setTransactionNamingStrategy(TransactionNamingStrategy transactionNamingStrategy);// 設置命名策略}public class KafkaConnectorOptions {...public static final ConfigOption<TransactionNamingStrategy> TRANSACTION_NAMING_STRATEGY =ConfigOptions.key("sink.transaction-naming-strategy").enumType(TransactionNamingStrategy.class).defaultValue(TransactionNamingStrategy.DEFAULT);// 表/SQL 選項}@PublicEvolving
public enum TransactionNamingStrategy {
// 舊行為:遞增唯一ID + 探測恢復 (INCREMENTING + PROBING)INCREMENTING(...),
// 新行為:池化ID + ListTransactions恢復 (POOLING + LISTING)POOLING(...);public static final TransactionNamingStrategy DEFAULT = INCREMENTING;// 初始默認值}
  • sink.transaction-naming-strategy:核心配置項,可選?INCREMENTING?(舊) 或?POOLING?(新)。
  • 默認值:初始版本保持?INCREMENTING?以確保行為一致性和向后兼容性。用戶需顯式啟用?POOLING?以使用新特性。
  • 設計考量:使用?enum?為未來可能的其他策略(如靜態池?STATIC_POOL)預留了擴展空間。

四、 實現關鍵點與兼容性

  1. 狀態擴展:
    • Writer State:需要擴展以存儲?當前活躍事務 ID?和?所有已開始但尚未釋放(等待提交確認或復用)的事務 ID 列表。這是實現 ID 池化和精準恢復的基礎。
  2. 策略抽象:
    • 將事務 ID 生成 (TransactionNamingStrategyImpl) 和滯留事務中止 (TransactionAbortStrategyImpl) 邏輯解耦并抽象為策略模式。
    • 現有代碼重構為?INCREMENTING?(命名) +?PROBING?(中止)。
    • 新增?POOLING?(命名) +?LISTING?(中止)。
  3. Kafka 版本依賴:
    • LISTING?策略強依賴 Kafka Broker 3.0+?提供的?ListTransactions?API。使用前需確保集群版本滿足要求。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/91414.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/91414.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/91414.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

設計模式:組合模式 Composite

目錄前言問題解決方案結構代碼前言 組合是一種結構型設計模式&#xff0c;你可以使用它將對象組合成樹狀結構&#xff0c;并且能像使用獨立對象一樣使用它們。 問題 如果應用的核心模型能用樹狀結構表示&#xff0c; 在應用中使用組合模式才有價值。 例如&#xff0c; 你有兩…

嵌入式 C 語言入門:函數封裝與參數傳遞學習筆記 —— 從定義到內存機制

前言 大家好&#xff0c;這里是 Hello_Embed。在前一篇筆記中&#xff0c;我們用循環實現了 LED 閃爍&#xff0c;其中重復使用了兩段幾乎一樣的延時代碼&#xff1a; for(i 0; i < 100000000; i); // 延時這種重復不僅讓代碼冗余&#xff0c;還不利于后續修改&#xff08…

第一個大語言模型的微調

模型推理 現在,我們的模型應該能夠針對輸入的任何短句生成類似尤達大師風格的句子作為回應。 該模型要求其輸入格式規范。我們需要構建一個 “消息” 列表 —— 在這個案例中,就是來自用戶的消息 —— 并通過提示表明輪到模型進行輸出,以促使其做出回答。 add_generation…

Linux內核驅動開發核心問題全解

&#x1f4d6; 推薦閱讀&#xff1a;《Yocto項目實戰教程:高效定制嵌入式Linux系統》 &#x1f3a5; 更多學習視頻請關注 B 站&#xff1a;嵌入式Jerry Linux內核驅動開發核心問題全解 本文系統梳理了 Linux 驅動開發、內核同步、中斷處理、內存管理、進程通信、系統啟動等典型…

【C++篇】C++11入門:踏入C++新世界的大門

文章目錄C11簡介列表初始化1. {}初始化2. initializer_list容器initializer_list的使用場景聲明1. auto2. decltype3. nullptrSTL中的變化1. 新容器array容器forward_list容器unordered_map和unordered_set容器2. 新接口C11簡介 C98/03&#xff1a;在2003年C標準委員會曾經提交…

Java 日期時間處理:分類、用途與性能分析

Java提供了多種日期時間處理API&#xff0c;隨著版本演進不斷改進。以下是主要日期時間類的分類、用途和性能分析&#xff1a;一、Java日期時間API分類1. 傳統日期時間API (Java 1.0/1.1)java.util.Date - 表示特定的瞬間&#xff0c;精確到毫秒java.util.Calendar - 抽象類&am…

[Linux]學習筆記系列 --GCC

文章目錄屬性__cleanup__attribute_malloc__ 用于標記函數返回一個新分配的內存塊__attribute_alloc_size__ 用于指定分配的內存大小__attribute__((const)) 標記為純函數(pure function)__attribute__((__externally_visible__)) 使其在編譯器優化過程中保持對外部模塊的可見性…

【龍澤科技】汽車維護與底盤拆裝檢修仿真教學軟件【風光580】

產品簡介汽車維護與底盤拆裝檢修仿真教學軟件是依托《全國職業院校技能大賽》“汽車維修”賽項中“汽車維護與底盤拆裝檢修模塊”競賽模塊&#xff0c;自主開發的一款仿真教學軟件。軟件采用仿真仿真技術模擬實際汽車維修工的崗位技能操作流程&#xff0c;操作內容主要包括&…

Spring之【循環引用】

目錄前置知識SingletonBeanRegistryDefaultSingletonBeanRegistrySpring中處理循環引用的流程分析定義兩個具有循環引用特點的Bean執行A的實例化執行A的屬性填充(執行過程中發現A依賴B&#xff0c;就去執行B的實例化邏輯)執行B的實例化執行B的屬性填充執行B的初始化執行A的屬性…

LRU緩存淘汰算法的詳細介紹與具體實現

LRU&#xff08;Least Recently Used&#xff0c;最近最少使用&#xff09;是一種基于時間局部性原理的緩存淘汰策略。其核心思想是&#xff1a;最近被訪問的數據在未來更可能被再次使用&#xff0c;而最久未被訪問的數據應優先被淘汰&#xff0c;從而在有限的緩存空間內保留高…

JS-第十九天-事件(一)

一、事件基礎概念1.1 事件三要素事件源&#xff1a;觸發事件的元素事件類型&#xff1a;事件的種類&#xff08;如click、mouseover等&#xff09;事件處理程序&#xff1a;響應事件的函數1.2 事件流機制事件傳播分為三個階段&#xff1a;捕獲階段&#xff1a;事件從頂層開始&a…

Matplotlib(三)- 圖表輔助元素

文章目錄一、圖表輔助元素簡介二、坐標軸的標簽、刻度范圍和刻度標簽1. 坐標軸標簽1.1 x軸標簽1.2 y軸標簽1.3 示例&#xff1a;繪制天氣氣溫折線圖2. 刻度范圍和刻度標簽2.1 刻度范圍2.1.1 x軸刻度范圍2.1.2 y軸刻度范圍2.2 刻度標簽2.2.1 x軸刻度標簽2.2.2 y軸刻度標簽2.3 示…

【Linux基礎知識系列】第七十八篇 - 初識Nmap:網絡掃描工具

在網絡管理和安全領域&#xff0c;網絡掃描是一個不可或缺的工具。它可以幫助網絡管理員了解網絡中的設備、服務以及潛在的安全漏洞。Nmap&#xff08;Network Mapper&#xff09;是一個功能強大的開源網絡掃描工具&#xff0c;它能夠快速發現網絡中的主機、端口和服務&#xf…

EasyGBS的兩種錄像回看

EasyGBS 支持兩種錄像回看&#xff0c;即“平臺端”的錄像回看和“設備端”的錄像回看。本期我們來介紹兩者的區別和使用方法。一、平臺端錄像1、什么是平臺端錄像平臺端錄像是指由 EasyGBS 平臺直接錄制并存儲。2、配置平臺端錄像進入平臺&#xff0c;依次點擊【錄像回放】→【…

大模型學習思路推薦!

為進一步貫徹落實中共中央印發《關于深化人才發展體制機制改革的意見》和國務院印發《關于“十四五”數字經濟發展規劃》等有關工作的部署要求&#xff0c;深入實施人才強國戰略和創新驅動發展戰略&#xff0c;加強全國數字化人才隊伍建設&#xff0c;持續推進人工智能從業人員…

數據庫連接池性能優化實戰

背景我們公司正在處于某個項目的維護階段&#xff0c;領導對資源告警比較重視&#xff0c;服務器資源告警的就不說了&#xff0c;運維同學每隔一小時都會檢測線上環境的應用服務信息&#xff0c;例如&#xff1a;網關日志響應時間告警/nginx日志接口響應時間告警/日志關鍵字異常…

Excel常用函數大全,非常實用

一、數學與統計函數1. SUM作用&#xff1a;求和SUM(number1, [number2], ...)SUM(A1:A10) ? 計算A1到A10單元格的總和注意&#xff1a;自動忽略文本和空單元格2. AVERAGE作用&#xff1a;計算平均值AVERAGE(number1, [number2], ...)AVERAGE(B2:B20) ? 計算B列20個數據的平均…

性能優化(一):時間分片(Time Slicing):讓你的應用在高負載下“永不卡頓”的秘密

性能優化(一)&#xff1a;時間分片&#xff08;Time Slicing&#xff09;&#xff1a;讓你的應用在高負載下“永不卡頓”的秘密 引子&#xff1a;那張讓你瀏覽器崩潰的“無限列表” 想象一個場景&#xff1a;你需要渲染一個包含一萬個項目的列表。在我們的“看不見”的應用中&a…

《C++》STL--list容器詳解

在 C 標準模板庫(STL)中&#xff0c;list 是一個非常重要的序列容器&#xff0c;它實現了雙向鏈表的數據結構。與 vector 和 deque 不同&#xff0c;list 提供了高效的插入和刪除操作&#xff0c;特別是在任意位置。本文將深入探討 list 容器的特性、使用方法以及常見操作。 文…

Day 28:類的定義和方法

DAY 28 類的定義和方法 知識點學習 1. 類的定義 在Python中&#xff0c;類是創建對象的模板。使用class關鍵字來定義一個類。類名通常采用首字母大寫的命名方式&#xff08;PascalCase&#xff09;。 # 最簡單的類定義 class MyClass:pass # 使用pass占位符類的定義就像是…