一 kafka事務介紹
1.1 Kafka事務的作用
Exactly-Once Semantics (EOS):在“消費 → 處理 → 生產”的流式鏈路里避免重復寫與重復讀帶來的副作用,確保“處理一次且僅一次”的可見效果。
跨分區 / 跨 Topic 原子性:將一次處理內寫入的多分區多主題消息,以及本次消費位點 offset 的提交,綁定在同一個事務里,要么都生效,要么都回滾。
1.2 相關術語
PID / Producer ID、Epoch、Sequence Number:冪等生產者元數據,避免重復寫。
事務協調器(Transaction Coordinator):位于 broker 側的協調者,管理事務狀態機與兩階段提交。
控制批次(Control Batch / Control Records):日志里的特殊記錄,用于標記事務,主要是?COMMIT / ABORT(注意:數據分區不寫“BEGIN”標記)。
LSO(Last Stable Offset) 與 HW(High Watermark):對
read_committed
消費者只暴露到 LSO,屏蔽未決事務。__transaction_state
:kafka內部主題,用于持久化事務狀態機。__consumer_offsets
:kafka內部主題,存消費組位點;位點也可以被納入事務。僵尸實例:一個舊的 Producer 實例(帶著同樣的
transactional.id
)在崩潰或網絡分區后掛掉了,但它可能在恢復后繼續嘗試往 Kafka 寫數據,但是與此同時,已經有一個新的 Producer 實例已經起來并接管了同樣的transactional.id,我們把這個宕機后又恢復的producer叫做僵尸實例
1.3?消費者隔離級別
消費者的隔離級別有下面兩種
read_uncommitted
(默認):可讀到未提交和已提交數據。read_committed
:只讀取已提交事務的數據(EOS 流水線應使用)。
假設想要配置消費者隔離級別為read_committed,可通過下面配置完成
props.put("isolation.level", "read_committed");
二、使用 Kafka 事務
2.1 生產者端配置
Properties props = new Properties();
// broker地址
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
// transactional.id 必須唯一且穩定(可復用)
props.put("transactional.id", "order-service-txn-1");
// 配了 transactional.id 會自動開啟,但是最好還是顯式配置
props.put("enable.idempotence", "true");
/**
然后通常由客戶端自動/隱式設置為適配冪等語義:
要求acks=all、retries>0 max.in.flight.requests.per.connection<=5 等,
不配置就會取默認的值,比如retries = Integer.MAX_VALUE
*/
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 找到協調器、申請 PID/epoch、登記事務狀態
producer.initTransactions();
2.2 事務性生產
// 開啟事務
producer.beginTransaction();// 發送消息
producer.send(new ProducerRecord<>("demo-topic", "key1", "message-1"));
producer.send(new ProducerRecord<>("demo-topic", "key2", "message-2"));
producer.send(new ProducerRecord<>("demo-topic2", "key3", "message-3"));// 提交事務
producer.commitTransaction();
這樣,對于配置了read_committed的消費者而言,要么這三個消息同時可見,要么同時不可見。
2.3?實踐建議
使用穩定且可復用的
transactional.id,這樣
服務重啟后就可恢復事務上下文,還能對“僵尸實例”做圍欄。事務應盡可能短小且頻繁提交,避免長時間占用導致 LSO 卡住,增加讀延遲。
失敗重試要以事務回滾為界,確保回滾后可安全重放。
EOS 只覆蓋 Kafka 內部的原子性;涉及外部系統,則需要額外使用?Outbox/Saga 等模式。
三 kafka事務的實現
3.1?關鍵組件
事務生產者:發數據、報告參與分區、發起事務結束(提交/回滾)。
消費組協調器(Group Coordinator):當offset被納入事務時,消費組協調器需要把最新offset發送到專門存儲offset的內部主題
__consumer_offsets中,
?所以消費者協調器和__consumer_offsets里的對應分區
也是事務參與者。事務協調器(Transaction Coordinator):負責給生產者分配 Producer ID/epoch(每個
transactional.id
對應一個PID),維護事務狀態機,持久化事務日志,并且當事務結束時(commit 或 abort),事務協調器 會把這個事務的結果(commit/abort 標記)廣播到所有該事務涉及的分區)。數據分區所在的 Broker Leader:接受數據與控制批次寫入,維護 High Watermark/Last Stable Offset與中止事務索引。
消費者:根據隔離級別獲取數據,包括
read_committed
?和read_uncommitted
?,依賴隔離級別和?abortedTransactions
過濾。
3.2 事務實現流程
下圖是kafka事務消息的總體流程圖
3.2.1 冪等生產者
協調器為每個
transactional.id
分配 PID 與 epoch。生產者對每個分區維護單調遞增的序列號;Broker 端以
(PID, epoch, seq)
去重,避免“重復寫”。若同一
transactional.id
的新實例啟動并initTransactions()
,協調器會提升 epoch 并圍欄舊實例;舊實例寫入不會成功并且得到INVALID_PRODUCER_EPOCH
/ProducerFencedException
。
3.2.2 事務狀態機與內部日志
事務協調器將每個事務的狀態持久化到
__transaction_state
:EMPTY/ONGOING → PREPARE_COMMIT | PREPARE_ABORT → COMPLETE_COMMIT | COMPLETE_ABORT
事務涉及到的分區集合(數據分區與
__consumer_offsets
的目標分區)由生產者在首次寫入/首次提交位點時通過AddPartitionsToTxn
/AddOffsetsToTxn
報告給協調器并持久化。
3.2.3? 兩階段提交(2PC)
與傳統數據庫不同的是,數據分區里只寫“結束標記”——COMMIT 或 ABORT 的控制批次;不寫 BEGIN。BEGIN 只體現在協調器的內部狀態機與日志。信息會包含自己所屬的事務producer。
階段 A:事務進行中(ONGOING)
beginTransaction()
后,生產者向多個分區寫入消息(每條攜帶 PID/epoch/seq)。如首次寫入某分區,生產者會先向協調器請求?
AddPartitionsToTxn
,協調器會記錄“本事務涉及到這個分區”。
階段 B:準備提交(PREPARE_COMMIT)/ 準備回滾(PREPARE_ABORT)
生產者調用
commitTransaction()
(或abortTransaction()
),就會發送EndTxn請求
給協調器。協調器把事務狀態改為
PREPARE_COMMIT
(或PREPARE_ABORT
)并寫入kafka內內部主題?__transaction_state
。扇出:協調器向所有涉及分區的 leader 發起WriteTxnMarkers請求。
階段 C:各分區落盤控制記錄 + 反饋
在收到事務協調器的WriteTxnMarkers
請求后,
各分區在自己的日志里追加一個“控制批次(Control Batch)”,類型為 COMMIT 或 ABORT。注意kafka沒有“BEGIN”控制批次,BEGIN 信息由協調器掌分區 leader 追加成功后應答協調器。
當所有目標分區都落成控制批次,協調器將事務狀態置為
COMPLETE_COMMIT
(或COMPLETE_ABORT
),并更新__transaction_state
。
3.3?可見性控制
HW(High Watermark):副本多數派確認的最高位移。
read_uncommitted
可讀到 HW。LSO(Last Stable Offset):保證其之前沒有“未決事務”的最末位移。
對read_committed
,Broker 只返回 ≤ LSO 的數據,從源頭屏蔽未提交事務。為何消費者還能拿到“已中止事務”的數據片段?
為性能考慮,Broker 可能仍返回包含已中止事務記錄的批次,但會攜帶一個
abortedTransactions 列表(含producerId
與firstOffset
)。客戶端在解碼時跳過這些記錄。事務索引(.txnindex):每個日志段都有一個中止事務索引,Broker 用它在 Fetch 時快速收集
abortedTransactions
列表。
小結:在
read_committed
下,消費者不用“暫存不確定狀態數據”去等控制標記;Broker 通過 LSO 保證不給你發“未決事務”的記錄。客戶端只需在已決事務里過濾 ABORT 記錄(根據abortedTransactions
)。
3.4?消費-處理-生產 模式中消費offset與輸出的原子綁定
sendOffsetsToTransaction(offsets, groupMetadata)
背后做了兩件事,
1?AddOffsetsToTxn
告訴事務協調器:這次事務會提交哪個消費組的位點
2?TxnOffsetCommit
把位點寫入 __consumer_offsets
對應分區
在最終 COMMIT(或 ABORT)時,__consumer_offsets
分區也會收到相應的 COMMIT/ABORT 控制批次,從而與輸出數據一并原子生效(或放棄)。
3.5?常見故障的處理
3.5.1 失敗與恢復
如果某些分區暫不可用,協調器會持續重試
WriteTxnMarkers
(最終一致的 2PC)。事務超時(由客戶端
transaction.timeout.ms
申請,受 broker 上限約束)協調器主動 ABORT 并下發 ABORT 標記。協調器宕機可通過
__transaction_state
重放恢復事務狀態并繼續扇出事務標記。在事物未提交之前,配置了
read_committed的消費者
不會看到未決事務。
3.5.2 應對僵尸實例
Kafka 引入了 Producer Epoch,通過圍欄機制來隔離僵尸實例。每個 Producer 在第一次用某個
transactional.id
初始化事務時,Kafka 的 Transaction Coordinator 會給它分配一個 producerId 和 producerEpoch。當相同transactional.id
的新實例啟動時,Coordinator 會給它分配 更高的 epoch,并更新元數據。就這樣,新實例可以用高 epoch 寫數據,而舊實例(僵尸)帶著低 epoch 再寫數據時,Broker 會直接拒絕。
四 運維和調優要點
事務大小與超時
客戶端的
transaction.timeout.ms
受 Broker 端上限約束(如transaction.max.timeout.ms
)。事務過大或時間過長,會拖慢 LSO 前進,導致
read_committed
消費延遲升高
圍欄與異常
ProducerFencedException
/INVALID_PRODUCER_EPOCH
:同一transactional.id
新實例已接管;舊實例必須停止。TransactionAbortedException
:本事務已被中止;需要清理/重啟事務。
副本與可靠性
冪等/EOS 通常要求
acks=all
與合適的min.insync.replicas
。避免不干凈選主導致重寫。
重要監控指標
生產端:
transactional.commit.latency.avg
、transactional.abort.rate
、record-errors
/retries
。Broker:
transaction-coordinator-metrics
(扇出延遲、超時/中止率)、replica-fetcher-metrics
。消費端:
records-lag-max
(在read_committed
下對 LSO 滯后敏感)。
主題壓縮與控制記錄
控制批次(COMMIT/ABORT)是特殊記錄,日志清理/壓縮會保留其必要語義,確保歷史可正確回放。
邊界與限制
事務只在同一 Kafka 集群內跨 Topic/分區原子;不跨外部系統。
超大事務(大量分區/消息)會放大標記扇出成本與恢復時間。
五 Kafka Streams 中的事務
processing.guarantee=exactly_once_v2
/exactly_once
:Streams 在內部為每個任務(Task)維護事務性生產者,把處理結果與位點綁定到同一事務中;重平衡時靠 epoch 圍欄防止舊實例寫入。