深入理解Kafka事務

一 kafka事務介紹

1.1 Kafka事務的作用

  • Exactly-Once Semantics (EOS):在“消費 → 處理 → 生產”的流式鏈路里避免重復寫重復讀帶來的副作用,確保“處理一次且僅一次”的可見效果。

  • 跨分區 / 跨 Topic 原子性:將一次處理內寫入的多分區多主題消息,以及本次消費位點 offset 的提交,綁定在同一個事務里,要么都生效,要么都回滾。

1.2 相關術語

  • PID / Producer IDEpochSequence Number:冪等生產者元數據,避免重復寫。

  • 事務協調器(Transaction Coordinator):位于 broker 側的協調者,管理事務狀態機與兩階段提交。

  • 控制批次(Control Batch / Control Records):日志里的特殊記錄,用于標記事務,主要是?COMMIT / ABORT(注意:數據分區不寫“BEGIN”標記)。

  • LSO(Last Stable Offset)HW(High Watermark):對 read_committed 消費者只暴露到 LSO,屏蔽未決事務。

  • __transaction_state:kafka內部主題,用于持久化事務狀態機。

  • __consumer_offsets:kafka內部主題,存消費組位點;位點也可以被納入事務。

  • 僵尸實例:一個舊的 Producer 實例(帶著同樣的 transactional.id)在崩潰或網絡分區后掛掉了,但它可能在恢復后繼續嘗試往 Kafka 寫數據,但是與此同時,已經有一個新的 Producer 實例已經起來并接管了同樣的 transactional.id,我們把這個宕機后又恢復的producer叫做僵尸實例

1.3?消費者隔離級別

消費者的隔離級別有下面兩種

  • read_uncommitted(默認):可讀到未提交已提交數據。

  • read_committed:只讀取已提交事務的數據(EOS 流水線應使用)。

假設想要配置消費者隔離級別為read_committed,可通過下面配置完成

props.put("isolation.level", "read_committed");

二、使用 Kafka 事務

2.1 生產者端配置

Properties props = new Properties(); 
// broker地址
props.put("bootstrap.servers", "broker1:9092,broker2:9092"); 
// transactional.id 必須唯一且穩定(可復用)
props.put("transactional.id", "order-service-txn-1");  
// 配了 transactional.id 會自動開啟,但是最好還是顯式配置
props.put("enable.idempotence", "true"); 
/** 
然后通常由客戶端自動/隱式設置為適配冪等語義:
要求acks=all、retries>0 max.in.flight.requests.per.connection<=5 等,
不配置就會取默認的值,比如retries = Integer.MAX_VALUE
*/
KafkaProducer<String, String> producer = new KafkaProducer<>(props); 
// 找到協調器、申請 PID/epoch、登記事務狀態
producer.initTransactions(); 

2.2 事務性生產

// 開啟事務
producer.beginTransaction();// 發送消息
producer.send(new ProducerRecord<>("demo-topic", "key1", "message-1"));
producer.send(new ProducerRecord<>("demo-topic", "key2", "message-2"));
producer.send(new ProducerRecord<>("demo-topic2", "key3", "message-3"));// 提交事務
producer.commitTransaction();

這樣,對于配置了read_committed的消費者而言,要么這三個消息同時可見,要么同時不可見。

2.3?實踐建議

  • 使用穩定且可復用的 transactional.id,這樣服務重啟后就可恢復事務上下文,還能對“僵尸實例”做圍欄。

  • 事務應盡可能短小且頻繁提交,避免長時間占用導致 LSO 卡住,增加讀延遲。

  • 失敗重試要以事務回滾為界,確保回滾后可安全重放。

  • EOS 只覆蓋 Kafka 內部的原子性;涉及外部系統,則需要額外使用?Outbox/Saga 等模式。

三 kafka事務的實現

3.1?關鍵組件

  • 事務生產者:發數據、報告參與分區、發起事務結束(提交/回滾)。

  • 消費組協調器(Group Coordinator):當offset被納入事務時,消費組協調器需要把最新offset發送到專門存儲offset的內部主題__consumer_offsets中,?所以消費者協調器和__consumer_offsets里的對應分區也是事務參與者。

  • 事務協調器(Transaction Coordinator):負責給生產者分配 Producer ID/epoch(每個 transactional.id對應一個PID),維護事務狀態機,持久化事務日志,并且當事務結束時(commit 或 abort),事務協調器 會把這個事務的結果(commit/abort 標記)廣播到所有該事務涉及的分區)。

  • 數據分區所在的 Broker Leader:接受數據與控制批次寫入,維護 High Watermark/Last Stable Offset與中止事務索引。

  • 消費者:根據隔離級別獲取數據,包括read_committed?和read_uncommitted?,依賴隔離級別和?abortedTransactions 過濾。

3.2 事務實現流程

下圖是kafka事務消息的總體流程圖

3.2.1 冪等生產者

  • 協調器為每個 transactional.id 分配 PIDepoch

  • 生產者對每個分區維護單調遞增的序列號;Broker 端以 (PID, epoch, seq) 去重,避免“重復寫”。

  • 若同一 transactional.id 的新實例啟動并 initTransactions(),協調器會提升 epoch 并圍欄舊實例;舊實例寫入不會成功并且得到 INVALID_PRODUCER_EPOCH/ProducerFencedException

3.2.2 事務狀態機與內部日志

  • 事務協調器將每個事務的狀態持久化到 __transaction_state
    EMPTY/ONGOING → PREPARE_COMMIT | PREPARE_ABORT → COMPLETE_COMMIT | COMPLETE_ABORT

  • 事務涉及到的分區集合(數據分區與 __consumer_offsets 的目標分區)由生產者在首次寫入/首次提交位點時通過
    AddPartitionsToTxn / AddOffsetsToTxn 報告給協調器并持久化。

3.2.3? 兩階段提交(2PC)

與傳統數據庫不同的是,數據分區里只寫“結束標記”——COMMIT 或 ABORT 的控制批次;不寫 BEGIN。BEGIN 只體現在協調器的內部狀態機與日志。信息會包含自己所屬的事務producer。

階段 A:事務進行中(ONGOING)

  1. beginTransaction() 后,生產者向多個分區寫入消息(每條攜帶 PID/epoch/seq)。

  2. 如首次寫入某分區,生產者會先向協調器請求?AddPartitionsToTxn,協調器會記錄“本事務涉及到這個分區”。

階段 B:準備提交(PREPARE_COMMIT)/ 準備回滾(PREPARE_ABORT)

  1. 生產者調用 commitTransaction()(或 abortTransaction()),就會發送 EndTxn請求給協調器。

  2. 協調器把事務狀態改為 PREPARE_COMMIT(或 PREPARE_ABORT)并寫入kafka內內部主題?__transaction_state

  3. 扇出:協調器向所有涉及分區的 leader 發起WriteTxnMarkers請求。

階段 C:各分區落盤控制記錄 + 反饋

  1. 在收到事務協調器的WriteTxnMarkers請求后,各分區在自己的日志里追加一個“控制批次(Control Batch)”,類型為 COMMIT 或 ABORT。注意kafka沒有“BEGIN”控制批次,BEGIN 信息由協調器掌

  2. 分區 leader 追加成功后應答協調器。

  3. 當所有目標分區都落成控制批次,協調器將事務狀態置為 COMPLETE_COMMIT(或 COMPLETE_ABORT),并更新 __transaction_state

3.3?可見性控制

  • HW(High Watermark):副本多數派確認的最高位移。read_uncommitted 可讀到 HW。

  • LSO(Last Stable Offset):保證其之前沒有“未決事務”的最末位移
    read_committed,Broker 只返回 ≤ LSO 的數據,從源頭屏蔽未提交事務。

  • 為何消費者還能拿到“已中止事務”的數據片段?
    為性能考慮,Broker 可能仍返回包含已中止事務記錄的批次,但會攜帶一個
    abortedTransactions 列表(含 producerIdfirstOffset)。客戶端在解碼時跳過這些記錄

  • 事務索引(.txnindex):每個日志段都有一個中止事務索引,Broker 用它在 Fetch 時快速收集 abortedTransactions 列表。

小結:在 read_committed 下,消費者不用“暫存不確定狀態數據”去等控制標記;Broker 通過 LSO 保證不給你發“未決事務”的記錄。客戶端只需在已決事務里過濾 ABORT 記錄(根據 abortedTransactions)。

3.4?消費-處理-生產 模式中消費offset與輸出的原子綁定

sendOffsetsToTransaction(offsets, groupMetadata) 背后做了兩件事,

1?AddOffsetsToTxn告訴事務協調器:這次事務會提交哪個消費組的位點

2?TxnOffsetCommit 把位點寫入 __consumer_offsets 對應分區

在最終 COMMIT(或 ABORT)時,__consumer_offsets 分區也會收到相應的 COMMIT/ABORT 控制批次,從而與輸出數據一并原子生效(或放棄)。

3.5?常見故障的處理

3.5.1 失敗與恢復

  • 如果某些分區暫不可用,協調器會持續重試 WriteTxnMarkers最終一致的 2PC)。

  • 事務超時(由客戶端 transaction.timeout.ms 申請,受 broker 上限約束)協調器主動 ABORT 并下發 ABORT 標記。

  • 協調器宕機可通過 __transaction_state 重放恢復事務狀態并繼續扇出事務標記。

  • 在事物未提交之前,配置了read_committed的消費者不會看到未決事務。

3.5.2 應對僵尸實例

  • Kafka 引入了 Producer Epoch,通過圍欄機制來隔離僵尸實例。每個 Producer 在第一次用某個 transactional.id 初始化事務時,Kafka 的 Transaction Coordinator 會給它分配一個 producerIdproducerEpoch。當相同 transactional.id 的新實例啟動時,Coordinator 會給它分配 更高的 epoch,并更新元數據。就這樣,新實例可以用高 epoch 寫數據,而舊實例(僵尸)帶著低 epoch 再寫數據時,Broker 會直接拒絕。

四 運維和調優要點

事務大小與超時

  • 客戶端的 transaction.timeout.ms 受 Broker 端上限約束(如 transaction.max.timeout.ms)。

  • 事務過大或時間過長,會拖慢 LSO 前進,導致 read_committed 消費延遲升高

圍欄與異常

  • ProducerFencedException / INVALID_PRODUCER_EPOCH:同一 transactional.id 新實例已接管;舊實例必須停止。

  • TransactionAbortedException:本事務已被中止;需要清理/重啟事務。

副本與可靠性

  • 冪等/EOS 通常要求 acks=all 與合適的 min.insync.replicas。避免不干凈選主導致重寫。

重要監控指標

  • 生產端:transactional.commit.latency.avgtransactional.abort.raterecord-errors/retries

  • Broker:transaction-coordinator-metrics(扇出延遲、超時/中止率)、replica-fetcher-metrics

  • 消費端:records-lag-max(在 read_committed 下對 LSO 滯后敏感)。

主題壓縮與控制記錄

  • 控制批次(COMMIT/ABORT)是特殊記錄,日志清理/壓縮會保留其必要語義,確保歷史可正確回放。

邊界與限制

  • 事務只在同一 Kafka 集群內跨 Topic/分區原子;不跨外部系統

  • 超大事務(大量分區/消息)會放大標記扇出成本與恢復時間。

五 Kafka Streams 中的事務

  • processing.guarantee=exactly_once_v2/exactly_once:Streams 在內部為每個任務(Task)維護事務性生產者,把處理結果與位點綁定到同一事務中;重平衡時靠 epoch 圍欄防止舊實例寫入。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/95328.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/95328.shtml
英文地址,請注明出處:http://en.pswp.cn/web/95328.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

RabbitMinQ(模擬實現消息隊列項目)

目錄 一.消息隊列背景 二.需求分析 核心概念: BrokerServer: BrokerServer的核心API: 交換機Exchange: 持久化&#xff1a; 網絡通信&#xff1a; 消息應答&#xff1a; 三、模塊劃分 四、創建項目 五、創建核心類 Exchange: MSGQueue: Binding: Message: 六.…

如何構建StarRocks官方文檔

不知道是網絡問題還是官網問題&#xff0c;StarRocks文檔經常出現卡頓的情況&#xff0c;曾經構建過Flink文檔&#xff0c; 所以也想嘗試自己構建一個StarRocks的本地官方文檔 斷斷續續折騰了好幾天&#xff0c;就不廢話了&#xff0c;直接上實際步驟 1. 環境 1.1 Linux環境 …

堡壘機(跳板機)入門指南:構建更安全的多服務器運維架構

隨著你的業務不斷擴張&#xff0c;你云上服務器的數量&#xff0c;是不是也從一臺&#xff0c;變成了三臺、五臺、甚至一個由幾十臺機器組成的龐大集群&#xff1f;你像一個盡職的“國王”&#xff0c;為你王國的每一座“城池”&#xff08;每一臺服務器&#xff09;&#xff0…

(鏈表)Leetcode206鏈表反轉+Leetcode6刪除鏈表的倒數第N個結點+虛擬頭節點使用

虛擬頭結點的作用是&#xff1a;簡化插入/刪除邏輯方便返回頭節點減少邊界錯誤 Leetcode206鏈表反轉 206. 反轉鏈表 - 力扣&#xff08;LeetCode&#xff09; 頭插法 # Definition for singly-linked list. # class ListNode(object): # def __init__(self, val0, nextN…

自然語言處理NLP:嵌入層Embedding中input_dim的計算——Tokenizer文本分詞和編碼

1. 詞匯表大小&#xff08;input_dim&#xff09;計算方法 嵌入層Embedding中的input_dim是根據數據中所有唯一詞&#xff08;或字&#xff09;的總數來決定的。可以通過Tokenizer文本分詞和編碼得到。 簡單說&#xff0c;Tokenizer 是一個文本分詞和編碼器&#xff0c;它主要做…

python中的分代垃圾回收機制的原理【python進階二、2】

1. 分代設計思想Python 將對象按存活時間分為三代&#xff08;Generation 0, 1, 2&#xff09;&#xff1a;0代&#xff08;年輕代&#xff09;&#xff1a;新創建的對象。1代&#xff08;中年代&#xff09;&#xff1a;經歷一次GC掃描后存活的對象。2代&#xff08;老年代&am…

【后端】云服務器用nginx配置域名訪問前后端分離項目

云服務器有多個服務&#xff08;前端 3000 端口、后端 8288 端口&#xff0c;甚至還有別的服務&#xff09;。希望用戶只輸入 域名&#xff08;比如 https://example.com&#xff09;&#xff0c;而不是 example.com:3000、example.com:8288。本質上是要做 端口隱藏 域名統一入…

軟考中級數據庫系統工程師學習專篇(67、數據庫恢復)

67、數據庫恢復數據庫故障恢復中基于檢查點的事務分類與處理策略在數據庫系統發生故障后的恢復過程中&#xff0c;?檢查點&#xff08;Checkpoint&#xff09;?? 技術是關鍵機制&#xff0c;它能有效縮小恢復范圍&#xff0c;減少需要掃描的日志量&#xff0c;從而加速恢復進…

SpringBoot 分庫分表 - 實現、配置與優化

分庫分表&#xff08;Database Sharding&#xff09;是一種數據庫架構優化技術&#xff0c;通過將數據分散到多個數據庫或表中&#xff0c;以應對高并發、大數據量場景&#xff0c;提升系統性能和擴展性。 在 Spring Boot 中&#xff0c;分庫分表可以通過框架支持&#xff08;如…

爬蟲代理實操:選擇可靠的HTTP(S)代理的方法

在爬蟲工作里&#xff0c;選對代理協議&#xff08;HTTP/HTTPS&#xff09;只是第一步&#xff0c;更關鍵的是找到 “可靠” 的代理 —— 哪怕是 HTTPS 代理&#xff0c;若節點不穩定、IP 純凈度低&#xff0c;照樣會頻繁被封&#xff0c;反而耽誤采集進度。這幾年踩過不少坑&a…

數據庫常見故障類型

數據庫常見故障類型數據庫系統運行過程中可能發生的故障主要分為以下三類&#xff0c;其破壞性由小到大&#xff1a;故障類型別名根本原因影響范圍典型例子?1. 事務故障?邏輯故障事務內部的程序邏輯錯誤或輸入異常。?單個或少量事務。- 輸入數據不合法&#xff08;如除零錯誤…

【Android】Span富文本簡介

一&#xff0c;概述android.text包下span體系類&#xff0c;主要指Spanned、Spannable、ParagraphStyle、CharacterStyle實現類。Android通過Span體系&#xff0c;搭建了富文本API&#xff0c;其中Spanned、Spannable實現了CharSequence接口&#xff0c;旨在映射段落start~end之…

【HTML】draggable 屬性:解鎖網頁交互新維度

一、簡介 在Web開發中&#xff0c;用戶與內容的交互方式直接影響用戶體驗的深度。在 HTML 中&#xff0c;draggable 是一個全局屬性&#xff0c;通過簡單配置即可讓任意元素實現拖拽功能。也可通過結合 draggable 屬性和 JavaScript 事件&#xff0c;可以實現豐富的拖放交互功能…

如何在Github中創建倉庫?如何將本地項目上傳到GitHub中?

1.1 點擊New repository&#xff08;這個是創建代碼倉庫的意思&#xff09;初次完成后只有一個文件最后&#xff1a;在本地git clone 項目地址然后把項目文件復制到git的文件夾內再提交到遠程倉庫git add . git commit -m "修改https"git push origin mainmain為分支…

【前端教程】HTML 基礎界面開發

一、網站導航欄設計與實現 導航欄是網站的重要組成部分&#xff0c;負責引導用戶瀏覽網站的各個板塊。以下是一個實用的導航欄實現方案&#xff1a; 實現代碼 HTML 結構&#xff1a; <!DOCTYPE html> <html> <head><meta charset"utf-8" /&…

【學Python自動化】 6. Python 模塊系統學習筆記

一、模塊基礎 什么是模塊&#xff1f;包含 Python 定義和語句的 .py 文件解決代碼復用和組織問題每個模塊有自己的命名空間創建模塊示例# fibo.py - 斐波那契模塊 def fib(n):"""打印小于n的斐波那契數列"""a, b 0, 1while a < n:print(a, e…

機器學習-時序預測2

門控循環單元GRU 接著機器學習-時序預測1-CSDN博客這個說&#xff0c;GRU是LSTM的一個簡化而高效的變體&#xff0c;都使用“門控機制”來控制信息流&#xff0c;但它通過合并一些組件&#xff0c;使結構更簡單、參數更少、計算更快&#xff0c;同時在許多任務上性能與 LSTM 相…

數據湖與數據倉庫

大數據前沿技術詳解 目錄 數據湖技術湖倉一體架構數據網格實時流處理技術云原生數據技術數據治理與血緣AI原生數據平臺邊緣計算與大數據 核心內容包括&#xff1a; 數據湖技術 - 架構模式、技術棧、面臨的挑戰 湖倉一體架構 - Delta Lake、Iceberg、Hudi等主流實現 數據網格…

Python OpenCV圖像處理與深度學習:Python OpenCV入門-圖像處理基礎

Python OpenCV入門實踐&#xff1a;圖像處理基礎 學習目標 通過本課程&#xff0c;學員們將了解OpenCV的基本概念、安裝方法&#xff0c;掌握如何使用Python和OpenCV進行基本的圖像處理操作&#xff0c;包括圖像的讀取、顯示、保存以及簡單的圖像變換。 相關知識點 Python Open…

【lua】Lua 入門教程:從環境搭建到基礎編程

Lua 入門教程&#xff1a;從環境搭建到基礎編程 Lua 是一種輕量級、可擴展的腳本語言&#xff0c;廣泛應用于游戲開發&#xff08;如《魔獸世界》《Roblox》&#xff09;、嵌入式系統、Web 后端等領域。它語法簡潔、運行高效&#xff0c;非常適合作為編程入門語言或輔助開發工…