1 自動提交
1.1 原理:
Kafka 消費者后臺線程每隔 auto.commit.interval.ms 自動提交最近一次 poll() 的 offset
無需開發者干預
1.2 示例:
enable.auto.commit=true
auto.commit.interval.ms=5000 # 每 5 秒自動提交一次
from confluent_kafka import Consumer, KafkaError
import sys# 配置消費者
conf = {'bootstrap.servers': 'localhost:9092','group.id': 'mygroup','auto.offset.reset': 'earliest','enable.auto.commit': True, # 自動提交'auto.commit.interval.ms': 5000 # 每個5s自動提交一次
}consumer = Consumer(conf)
consumer.subscribe(['my_topic'])while True:msg = consumer.poll(1.0)if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:continueprint(f"Error: {msg.error()}", file=sys.stderr)breaktry:# consumer.commit(msg) 不需要這行,這是手動提交時需要用的# 業務處理邏輯print(f"處理消息: {msg.value().decode('utf-8')}")except KeyboardInterrupt:print("中斷信號已接收")finally:consumer.close()
2 手動提交
2.1 至多一次
2.1.1 原理
消息處理后立即提交偏移量;
即使處理失敗也不會重試;
適合對消息丟失容忍度高的場景(如日志采集)。
2.1.2 示例:
enable.auto.commit=False:禁用自動提交偏移量
手動調用consumer.commit(msg)提交偏移量
auto.offset.reset=‘earliest’:從最早消息開始消費
from confluent_kafka import Consumer, KafkaError
import sys# 配置消費者
conf = {'bootstrap.servers': 'localhost:9092','group.id': 'mygroup','auto.offset.reset': 'earliest','enable.auto.commit': False
}consumer = Consumer(conf)
consumer.subscribe(['my_topic'])while True:msg = consumer.poll(1.0)if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:continueprint(f"Error: {msg.error()}", file=sys.stderr)breaktry:# 手動提交偏移量(最多一次核心)consumer.commit(msg)print(f"已提交偏移量: {msg.offset()}")# 業務處理邏輯print(f"處理消息: {msg.value().decode('utf-8')}")except KeyboardInterrupt:print("中斷信號已接收")finally:consumer.close()
2.2 最少一次
2.2.1 原理
通過重試機制+手動提交偏移量實現:
- 消息處理失敗時自動重試(最多3次)
- 成功處理后批量提交偏移量
- 延長輪詢間隔避免重平衡
2.2.2 示例
該示例是批量處理消息,批量提交;當然也可以一次處理一條消息,并一次提交一條消息偏移量
from confluent_kafka import Consumer, KafkaError, TopicPartition
import time
import sys# 配置消費者
conf = {'bootstrap.servers': 'localhost:9092','group.id': 'my_group','auto.offset.reset': 'earliest','enable.auto.commit': False, # 手動提交控制'max.poll.interval.ms': 300000, # 延長輪詢間隔'session.timeout.ms': 10000,'heartbeat.interval.ms': 3000
}consumer = Consumer(conf)
consumer.subscribe(['my_topic'])def process_with_retry(msg):"""帶重試的消息處理"""for attempt in range(3):try:# 替換為實際業務邏輯print(f"處理消息: {msg.value().decode('utf-8')}")return Trueexcept Exception:time.sleep(1) # 指數退避可在此實現return Falsetry:while True:# 批量拉取10條消息msgs = consumer.consume(num_messages=10, timeout=1.0)processed = []for msg in msgs:if msg.error():continue# 處理消息(帶重試)if process_with_retry(msg):processed.append(TopicPartition(msg.topic(), msg.partition(), msg.offset()))# 批量提交偏移量if processed:consumer.commit(offsets=processed)print(f"已提交偏移量: {[p.offset for p in processed]}")except KeyboardInterrupt:pass
finally:consumer.close()
補充:延長輪詢間隔避免重平衡
核心概念解析:
- 輪詢間隔:指Kafka消費者兩次調用poll()方法拉取消息的時間間隔,由max.poll.interval.ms參數控制(默認5分鐘)。
- 重平衡(Rebalance):當消費者組成員變動、主題/分區變化或心跳超時時,Kafka會觸發分區重新分配,導致消費者暫停消費。
為什么延長輪詢間隔能避免重平衡?
- 防止誤判消費者宕機
- Kafka通過session.timeout.ms(默認10秒)和heartbeat.interval.ms(默認3秒)檢測消費者存活。
- 若消息處理耗時超過max.poll.interval.ms(默認5分鐘),Kafka會認為消費者已宕機,觸發重平衡。
- 延長輪詢間隔(如設為10分鐘)可允許更長的消息處理時間,避免因業務邏輯耗時過長導致的誤判重平衡。
- 避免頻繁重平衡的連鎖反應
- 重平衡期間消費者暫停消費,導致消息堆積和延遲。
- 頻繁重平衡(如每5分鐘觸發一次)會顯著降低吞吐量,延長端到端延遲。
關鍵參數配置建議:
參數 | 默認值 | 推薦值 | 作用 |
---|---|---|---|
max.poll.interval.ms | 5分鐘 | 根據業務處理時間調整(如10-30分鐘) | 控制兩次poll的最大間隔,防止處理超時觸發重平衡 |
session.timeout.ms | 10秒 | 30秒-1分鐘 | 心跳超時時間,需大于heartbeat.interval.ms |
heartbeat.interval.ms | 3秒 | 2-5秒 | 心跳發送頻率,建議設為session.timeout.ms的1/3 |
2.3 精確一次
2.3.1 冪等性消費 (Idempotent Consumption) - 推薦且最常用
思路:承認消息可能會被重復傳遞,但從業務邏輯上保證重復執行不會產生負面效果。
做法:在消費者的處理邏輯中,設計冪等性。例如:
為每條消息生成一個唯一 ID(可以是消息key,或自定義UUID)。
在處理前,先檢查這個 ID 是否已經被處理過(比如在數據庫里查一下)。
如果已處理,就直接跳過并提交位移(視為成功);如果未處理,則執行業務邏輯。
這是最有效、最通用的方法,因為它不依賴于任何特定技術,而是從業務設計上根本性地解決問題。
例如:
a) 對于流程中的消息,每條消息中包含唯一id,比如業務id,在數據庫中將業務id作為Unique key,插入重復時會報duplicate key異常,不會導致數據庫中出現臟數據
b) Redis中使用set存儲業務id,天然冪等性
c) 如果不是上面兩個場景,需要讓生產者發送每條數據的時候,里面加一個全局唯一的 id,然后你這里消費到了之后,先根據這個 id 去比如 Redis 里查一下消費過嗎?如果沒有消費過,就執行相應業務進行處理,然后這個 id 寫 Redis,最后提交偏移。如果消費過了,那如果消費過了,那就別處理了,保證不重復處理相同的消息即可
2.3.2 事務性輸出 (Transactional Output) / 兩階段提交 (2PC) - 復雜且受限
思路:將消費者的“業務處理”和“位移提交”綁定為一個分布式事務。
做法:例如,使用 Kafka 的事務性生產者,將處理結果和位移提交到外部系統(如另一個Kafka主題)的操作放在一個事務里。但這通常需要外部系統(如數據庫)也支持參與 Kafka 事務(通過 Kafka Connect),實現復雜度非常高,性能和可用性也會受影響。不推薦普通應用使用。