一、核心機制:PID與序列號
1.?Producer ID (PID)
- 唯一標識:每個生產者實例啟動時,由Kafka Broker分配一個全局唯一的PID,用于標識消息來源。
- 持久化存儲:PID由Broker持久化保存,確保生產者重啟后仍能追蹤歷史狀態(但跨會話時PID會變更)。
2.?序列號 (Sequence Number)
- 分區級遞增:生產者為每個分區維護一個單調遞增的序列號,從0開始。
- 消息附加:每條消息發送時,附帶當前分區的序列號。
- Broker驗證:Broker為每個
<PID, Partition>
對記錄最后接收的序列號,新消息的序列號必須滿足:- 等于預期值:
SN_new = SN_old + 1
?→ 接受并更新序列號。 - 小于預期值:
SN_new < SN_old + 1
?→ 視為重復消息,丟棄。 - 大于預期值:
SN_new > SN_old + 1
?→ 視為亂序或丟失,觸發異常。
二、分區級別冪等性實現
1.?單分區內的唯一性保證
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-cluster:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 啟用冪等性
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 默認值,確保消息可靠存儲
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 無限重試
- 機制:通過PID和序列號,確保同一生產者實例向同一分區發送的消息不重復。
- 限制:
- 跨分區無效:同一生產者向不同分區發送的消息可能重復。
- 跨會話無效:生產者重啟后PID變更,跨會話消息無法保證冪等性。
2.?Broker端去重緩存
- 緩存結構:Broker維護最近接收的
<PID, SequenceNumber>
映射,緩存最近5個批次的消息(固定大小,不可配置)。 - 驗證流程:
- 接收消息后,檢查PID和序列號是否存在于緩存。
- 若存在且序列號連續,接受消息并更新緩存。
- 若序列號不連續或重復,丟棄消息。
三、配置與啟用
1.?生產者配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-cluster:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 啟用冪等性
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 默認值,確保消息可靠存儲
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 無限重試
2.?默認行為
- 啟用冪等性后,
acks
自動設為all
,確保所有副本確認后再返回成功。 - 重試機制默認啟用,避免因網絡問題導致消息丟失。
四、限制與擴展
1.?單會話限制
- PID變更:生產者重啟后,PID變更,跨會話消息無法保證冪等性。
- 解決方案:結合事務機制(
transactional.id
)實現跨會話的精確一次語義。
2.?事務擴展
3.?消費者端處理
- 去重需求:消費者需自行處理重復消息,例如:
- 數據庫唯一約束:在消息處理時添加業務唯一鍵(如訂單ID)。
- 業務邏輯去重:通過狀態檢查避免重復操作。
五、性能與調優
1.?性能影響
- Broker端開銷:維護PID和序列號緩存增加內存消耗,但通過固定緩存大小(5個批次)平衡性能與空間。
- 客戶端優化:
- 增大
batch.size
和linger.ms
,減少網絡請求次數。 - 調整
max.in.flight.requests.per.connection
(默認5)以控制并發請求。
2.?高并發優化
- Broker配置:
- 增加
num.io.threads
和queued.max.requests
,提升處理能力。
- 架構優化:動態均衡分區熱點,避免單分區過載。
六、總結
- 核心原理:通過PID和序列號在分區級別實現消息唯一性,確保同一生產者會話內消息不重復。
- 適用場景:單分區消息去重,結合事務可擴展至跨分區和跨會話。
- 消費者責任:需額外處理重復消息,依賴業務邏輯或外部機制(如數據庫唯一約束)。