1 事務簡介
Kafka事務是Apache Kafka在流處理場景中實現Exactly-Once語義的核心機制。它允許生產者在跨多個分區和主題的操作中,以原子性(Atomicity)的方式提交或回滾消息,確保數據處理的最終一致性。例如,在流處理中,消費者讀取消息后處理并生成新消息,若處理失敗,事務可確保原始消息的消費偏移與新消息的發送同時回滾,避免數據不一致。
事務的核心作用:
原子性: 跨分區的寫操作要么全部成功,要么全部失敗。
隔離性: 事務未提交時,消息對消費者不可見(通過isolation.level=read_committed配置實現)。
持久性: 事務狀態持久化至內部Topic __transaction_state,支持故障恢復。
2 事務原理詳解
了解即可
kafka學習筆記(四、生產者、消費者(客戶端)深入研究(三)——事務詳解及代碼實例)
3 示例
from confluent_kafka import Producer, KafkaException
import time# 配置生產者
conf = {'bootstrap.servers': 'localhost:9092','transactional.id': 'my-transactional-id', # 唯一事務ID'enable.idempotence': True, # 啟用冪等性'acks': 'all', # 必須為all'retries': 5, # 必須啟用重試'debug': 'txn' # 開啟事務調試日志(可選)
}# 創建事務生產者
producer = Producer(conf)try:# 初始化事務(必須調用!)producer.init_transactions()# 開始事務producer.begin_transaction()try:# 發送消息(事務內)producer.produce(topic='my_topic',value=b'Message 1',key=b'key1')producer.produce(topic='my_topic',value=b'Message 2',key=b'key2')# 模擬業務邏輯(如數據庫操作)# ...# 提交事務(消息對消費者可見)producer.commit_transaction()print("Transaction committed.")except Exception as e:# 回滾事務(丟棄未提交的消息)producer.abort_transaction()print(f"Transaction aborted: {e}")except KafkaException as e:print(f"Failed to initialize transactions: {e}")finally:# 關閉生產者producer.flush(timeout=10)