Kafka 重復消費與 API 冪等消費解決方案

Kafka 是一個高性能的分布式消息系統,但消費者重啟、偏移量(offset)未正確提交或網絡問題可能導致重復消費。API 冪等性設計則用于防止重復操作帶來的副作用。本文從 Kafka 重復消費和 API 冪等性兩個方面提供解決方案,重點深入探討 事務性偏移量管理 如何實現精確一次消費(exactly-once),并結合其他方法確保消息可靠性和一致性。

1. Kafka 重復消費問題

Kafka 的重復消費問題通常由以下原因引發:消費者異常退出導致偏移量未提交、網絡抖動、消費者組再平衡(rebalance)等。以下是解決重復消費的幾種方法,重點聚焦事務性偏移量管理。

1.1 啟用消費者冪等性

  • 手動提交偏移量

    • 設置 enable.auto.commit=false,在消息處理成功后手動提交偏移量(commitSynccommitAsync),確保消費與業務處理一致,減少重復消費風險。
    • commitSync:同步提交,阻塞直到 Broker 確認,適合高一致性場景,但可能降低吞吐量。
    • commitAsync:異步提交,非阻塞,適合高吞吐場景,但需通過回調(OffsetCommitCallback)監控提交失敗并重試,以避免偏移量丟失導致重復消費。
    • 示例:
      Properties props = new Properties();
      props.put("enable.auto.commit", "false");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList("my-topic"));
      while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 處理消息processRecord(record);}consumer.commitSync(); // 同步提交偏移量
      }
      
  • 事務性消費(重點:事務性偏移量管理)

    • 核心原理:通過 Kafka 的事務機制,將消息生產、消費和偏移量提交綁定在一個原子操作中,確保消息只被處理一次(exactly-once)。這依賴于生產者事務(transactional.id)和消費者隔離級別(isolation.level=read_committed)。
    • 事務性偏移量管理的實現
      • 生產者事務:生產者配置 transactional.idenable.idempotence=true,通過 initTransactions()beginTransaction()commitTransaction() 等操作管理事務。生產者使用 sendOffsetsToTransaction() 將消費者偏移量納入事務,確保偏移量提交與消息寫入原子性一致。
      • 消費者隔離級別:消費者設置 isolation.level=read_committed,只讀取已提交的事務消息,未提交或回滾的消息對消費者不可見。
      • 偏移量存儲:消費者偏移量存儲在 Kafka 內部主題 __consumer_offsets 中,事務性提交通過生產者的事務機制記錄,確保偏移量與消息處理同步。
  • 代碼示例:

    public class TransUse {public static void main(String[] args) {Consumer<String, String> consumer = createConsumer();Producer<String, String> producer = createProduceer();// 初始化事務producer.initTransactions();while(true) {try {// 1. 開啟事務producer.beginTransaction();// 2. 定義Map結構,用于保存分區對應的offsetMap<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();// 2. 拉取消息ConsumerRecords<String, String> records = consumer.poll(2000);for (ConsumerRecord<String, String> record : records) {// 3. 保存偏移量offsetCommits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));// 4. 進行轉換處理String[] fields = record.value().split(",");fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";String message = fields[0] + "," + fields[1] + "," + fields[2];// 5. 生產消息到dwd_userproducer.send(new ProducerRecord<>("dwd_user", message));}// 6. 提交偏移量到事務producer.sendOffsetsToTransaction(offsetCommits, "ods_user");// 7. 提交事務producer.commitTransaction();} catch (Exception e) {// 8. 放棄事務producer.abortTransaction();}}}// 1. 創建消費者public static Consumer<String, String> createConsumer() {// 1. 創建Kafka消費者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "node-1:9092");props.setProperty("group.id", "ods_user");props.put("isolation.level","read_committed");props.setProperty("enable.auto.commit", "false");props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 2. 創建Kafka消費者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 3. 訂閱要消費的主題consumer.subscribe(Arrays.asList("ods_user"));return consumer;}// 2. 創建生產者public static Producer<String, String> createProduceer() {// 1. 創建生產者配置Properties props = new Properties();props.put("bootstrap.servers", "node-1:9092");props.put("transactional.id", "dwd_user");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2. 創建生產者Producer<String, String> producer = new KafkaProducer<>(props);return producer;}}
    
    • 深入理解事務性偏移量管理
      • 原子性:事務性偏移量提交將消息寫入、業務處理和偏移量提交綁定在一個事務中,確保三者要么全成功,要么全失敗。例如,若消費者處理消息后數據庫操作失敗,事務回滾,偏移量不會提交,消費者可重新消費。
      • 去重機制:Broker 根據 transactional.id 和序列號(Sequence Number)對生產者消息去重,防止重復寫入。消費者通過 read_committed 隔離級別避免讀取未提交消息。
      • 偏移量持久化:偏移量記錄在 __consumer_offsets 主題中,事務性提交通過事務協調器(Transaction Coordinator)管理,確保偏移量與消息一致。
      • 故障恢復:消費者重啟后,從 __consumer_offsets 中讀取最后提交的偏移量開始消費。由于事務性提交保證偏移量與消息處理一致,不會重復消費。
    • 適用場景
      • 金融系統:如支付、轉賬,確保每筆交易只處理一次。
      • 訂單處理:防止重復創建訂單。
      • 數據同步:確保數據從源到目標的精確一次傳遞。
    • 性能考量
      • 事務增加日志寫入和協調開銷,適合高一致性場景。
      • 建議保持事務范圍短,避免長時間占用資源。
    • 版本要求:Kafka 0.11.0+ 支持事務,推薦 2.0+ 版本以獲得更穩定的事務支持。

1.2 業務層去重

  • 方法:在消息中添加唯一標識(如消息ID、業務ID),消費者端通過數據庫(如 Redis、MySQL)或內存記錄已處理的消息ID,消費前檢查是否重復。
  • 數據庫表結構示例
    CREATE TABLE consumed_messages (message_id VARCHAR(64) PRIMARY KEY,consume_time TIMESTAMP
    );
    
    消費時查詢 message_id 是否存在,若存在則跳過。
  • Redis 實現
    if (redis.exists(messageId)) {return; // 跳過重復消息
    }
    // 處理消息
    processMessage(message);
    redis.set(messageId, "processed", EXPIRE_TIME_SECONDS);
    
  • 優勢:簡單易實現,適合無事務支持的舊版本 Kafka 或非嚴格 exactly-once 場景。
  • 局限:增加存儲和查詢開銷,需定期清理去重記錄。

1.3 偏移量管理

  • 可靠提交
    • 使用 commitSync() 確保偏移量提交成功,適合高一致性場景。
    • 使用 commitAsync() 提高吞吐量,但需通過回調監控失敗并重試:
      consumer.commitAsync((offsets, exception) -> {if (exception != null) {System.err.println("Commit failed: " + exception);// 重試或記錄日志}
      });
      
  • 外部存儲
    • 將偏移量存儲到外部系統(如 Redis、ZooKeeper),異常恢復時從外部讀取正確偏移量。
    • 示例(Redis):
      redis.set("consumer:group:offset", offset);
      
  • 注意:外部存儲需保證一致性,可能增加復雜度,事務性偏移量管理更推薦。

1.4 消費者組優化

  • 唯一消費者組ID:確保 group.id 唯一,避免多個消費者組重復消費同一分區。
  • 配置超時參數
    • session.timeout.ms:建議 10-20 秒(如 10000ms),避免消費者因網絡延遲被踢出組。
    • max.poll.interval.ms:建議 5-10 分鐘(如 300000ms),適應消息處理耗時,避免超時觸發再平衡。
    • 示例:
      props.put("session.timeout.ms", "10000");
      props.put("max.poll.interval.ms", "300000");
      
  • 監控再平衡:通過日志或 JMX 指標檢查再平衡頻率,優化參數以減少偏移量混亂。

2. API 冪等消費問題

API 冪等性確保多次調用同一 API 產生相同結果,防止重復操作的副作用。結合 Kafka,解決方法如下:

2.1 Kafka 生產者冪等性

  • 配置
    • 設置 enable.idempotence=true,Kafka 自動為消息分配序列號和分區標識,Broker 端去重。
    • 配置 retries=5acks=-1,確保消息可靠投遞:
      props.put("enable.idempotence", "true");
      props.put("retries", "5");
      props.put("acks", "all");
      
  • 作用:生產者重試不會導致消息重復寫入,Broker 根據序列號去重。

2.2 API 層冪等設計

  • 唯一請求ID
    • 為每個 API 請求生成唯一 ID(如 UUID),服務端用 Redis 或數據庫記錄已處理請求。
    • 示例(Redis):
      if (redis.exists(requestId)) {return cachedResult;
      }
      redis.set(requestId, result, EXPIRE_TIME_SECONDS);
      
  • 數據庫約束
    • 使用唯一約束(如訂單號)防止重復插入:
      CREATE TABLE orders (order_id VARCHAR(64) PRIMARY KEY,amount DECIMAL,create_time TIMESTAMP
      );
      
      插入時捕獲唯一約束異常并返回。

2.3 結合 Kafka 事務

  • 方法:使用事務性生產者(transactional.id),將 API 操作(如數據庫寫入)和消息發送綁定在同一事務中,確保原子性。
  • 示例
    producer.initTransactions();
    producer.beginTransaction();
    try {producer.send(new ProducerRecord<>("topic", message));db.save(order); // 數據庫操作producer.commitTransaction();
    } catch (Exception e) {producer.abortTransaction();throw e;
    }
    
  • 作用:事務失敗時,消息和數據庫操作均回滾,避免不一致。

3. 綜合建議

  • 短事務:盡量減少事務范圍(如僅包含必要操作),降低資源占用。
  • 分布式鎖:在分布式系統中,使用 Redis 或 ZooKeeper 實現鎖,防止并發重復處理。
  • 監控與日志:記錄消息ID、處理時間等日志,便于排查重復消費問題。
  • 超時與重試:設置合理超時(如 request.timeout.ms)和重試次數(如 retries),避免無限重試。

4. 注意事項

  • 性能與一致性權衡
    • Redis 適合高性能去重,數據庫適合強一致性場景。
    • 事務性機制增加開銷,適合高一致性需求場景(如金融、訂單)。
  • Kafka 版本:exactly-once 語義需 Kafka 0.11.0+,推薦 2.0+。
  • 清理去重記錄:設置 Redis 過期時間或定期清理數據庫記錄,避免存儲膨脹。
  • Broker 配置
    • min.insync.replicas=2:確保 acks=-1 的可靠性。
    • transaction.state.log.replication.factor=3:事務日志高可用。
    • num.partitions__consumer_offsets__transaction_state):建議 ≥50,提高并發性。

5. 深入理解事務性偏移量管理的優勢

  • 一致性:事務性偏移量提交確保消息處理、偏移量更新和外部操作(如數據庫寫入)原子性一致,消除了重復消費和消息丟失的風險。
  • 容錯性:消費者重啟后,從 __consumer_offsets 中讀取最后提交的偏移量,確保從正確位置繼續消費。
  • 可擴展性:事務機制支持分布式環境,生產者和消費者可跨節點協作,適合復雜系統。
  • Broker 支持
    • 事務協調器(Transaction Coordinator)管理事務狀態,存儲在 __transaction_state 主題。
    • Broker 去重機制(基于 transactional.id 和序列號)防止重復寫入。
  • 實現復雜度
    • 需要生產者和消費者協同配置(transactional.idisolation.level)。
    • 事務性偏移量提交通常由生產者通過 sendOffsetsToTransaction() 完成,消費者僅需確保 read_committed 和手動提交。

6. 總結

通過事務性偏移量管理,Kafka 結合生產者事務(transactional.idenable.idempotence=trueacks=-1)和消費者配置(isolation.level=read_committedenable.auto.commit=false),實現消息從生產到消費的精確一次語義。事務性偏移量提交將消息寫入、業務處理和偏移量更新綁定在一個原子事務中,確保不重復、不丟失。結合業務層去重、偏移量管理和消費者組優化,可進一步提升系統可靠性。Broker 端通過事務協調器和內部主題(__consumer_offsets__transaction_state)支持事務性機制,確保高一致性場景下的可靠投遞和消費。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/916925.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/916925.shtml
英文地址,請注明出處:http://en.pswp.cn/news/916925.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

win11推遲更新

1、按住WINR2、輸入以下命令&#xff1a;reg add "HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\WindowsUpdate\UX\Settings" /v FlightSettingsMaxPauseDays /t reg_dword /d 10000 /f3、點擊確定4、打開搜索框5、在搜索框里邊輸入更新&#xff0c;選擇檢查更新6、在暫停…

【uniapp】---- 使用 uniapp 實現視頻和圖片上傳且都可以預覽展示

1. 前言 接手得 uniapp 開發的微信小程序項目,新的開發需求是需要同時上傳圖片和視頻,但是之前的上傳都沒有進行封裝,都是每個頁面需要的時候單獨實現,現在新的需求,有多個地方都需要上傳圖片、視頻或語音等,這樣就需要封裝一個組件,然后發現部分地方使用了 uni-file-p…

(nice!!!) (LeetCode 每日一題) 2411. 按位或最大的最小子數組長度(位運算+滑動窗口)

2411. 按位或最大的最小子數組長度 思路&#xff1a;位運算滑動窗口&#xff0c;時間復雜度0(n*32)。 **遍歷每一個元素nums[i]&#xff0c;然后看能否改變它前面的元素nums[j]&#xff08; j<i &#xff09;&#xff0c; 當(nums[j]|nums[i])nums[j]時&#xff0c;說明當前…

算法競賽階段二-數據結構(36)數據結構雙向鏈表模擬實現

//#include<bits/stdc.h> #include<iostream> using namespace std; const int N1e510; //定義 int e[N],pre[N],ne[N],h,id; int mp[N]; //頭插 // 兵 y // x void push_front (int x) {id;e[id]x;mp[x]id;pre[id]h;ne[id]ne[h];//先修改新節點…

津發科技帶你了解皮膚電信號中的SCL與SCR

皮膚電&#xff08;Electrodermal Activity, EDA&#xff09;作為一種非常容易獲取的基本生理信號&#xff0c;可以很好地量化我們的情緒反應&#xff0c;被廣泛應用于情感識別研究中。它代表機體受到刺激時皮膚電傳導的變化。皮膚電反應作為交感神經系統功能的直接指標&#x…

spark的broadcast variables

在 Spark 中&#xff0c;廣播變量&#xff08;Broadcast Variables&#xff09; 是一種特殊類型的共享變量&#xff0c;用于高效地在集群中的所有節點間分發大型只讀數據集。它解決了 Spark 任務中頻繁傳輸重復數據的性能問題&#xff0c;特別適用于需要在多個任務中重用相同數…

Python爬蟲實戰:研究Haul庫相關技術構建電商數據采集與分析系統

1. 引言 1.1 研究背景與意義 隨著電子商務的迅速發展,電商平臺上的商品數據呈現爆炸式增長。這些數據蘊含著豐富的商業價值,如消費者行為分析、市場趨勢預測、競爭對手監測等。然而,如何從海量的電商數據中獲取有價值的信息,成為當前電商企業面臨的重要挑戰。 網絡爬蟲技…

Java:高頻面試知識分享1

一、Java 語言核心特性&#xff08;面向對象編程&#xff09;核心知識點梳理&#xff1a;面向對象三大特性&#xff1a;封裝&#xff1a;隱藏對象內部實現&#xff0c;通過 public 方法暴露接口&#xff08;例&#xff1a;類的 private 字段 get/set 方法&#xff09;。繼承&a…

MybatisPlus-核心功能

目錄 條件構造器 QueryWrapper UpdateWrapper LambdaQueryWrapper 自定義SQL 基本用法 多表關聯 Service接口 CRUD 基本用法 Lambda 批量新增 條件構造器 除了新增以外&#xff0c;修改、刪除、查詢的SQL語句都需要指定where條件。因此BaseMapper中提供的相關方法…

RHCE綜合項目:分布式LNMP私有博客服務部署

一、項目概述本次項目基于LNMP&#xff08;linux&#xff0c;nginx&#xff0c;mariadb&#xff0c;php&#xff09;搭建了一個私有的博客平臺&#xff0c;本篇博客詳細記錄了該博客平臺的服務部署全流程。在該項目中&#xff0c;使用了兩臺linux&#xff08;openeuler&#xf…

5種安全方法:如何刪除三星手機上的所有內容

隨著新的三星設備不斷推出&#xff0c;在出售或捐贈舊手機之前&#xff0c;徹底清除舊手機上的數據以保護隱私至關重要。許多人不知道的是&#xff0c;簡單的刪除操作并不能完全清除三星設備上的數據&#xff0c;被刪除的文件可能會處于不可見狀態。本文介紹了如何徹底刪除三星…

Vue 3 入門教程 2- Vue 組件基礎與模板語法

一、Vue 組件基礎在 Vue 中&#xff0c;組件是構建用戶界面的基本單位&#xff0c;它可以將頁面拆分成多個獨立、可復用的部分。一個 Vue 組件通常以 .vue 文件名結尾&#xff0c;包含三個核心部分&#xff1a;模板&#xff08;Template&#xff09;、腳本&#xff08;Script&a…

Linux 進程管理與計劃任務詳解

Linux 進程管理與計劃任務詳解 一、程序與進程的基本概念 程序&#xff1a;保存在外部存儲介質中的可執行機器代碼和數據的靜態集合&#xff0c;是靜態的文件實體進程&#xff1a;在 CPU 及內存中處于動態執行狀態的計算機程序&#xff0c;是程序的動態執行實例關聯關系&#x…

分層解耦(Controller,Service,Dao)

1. 三層架構核心職責層級職責說明關鍵技術 / 注解Controller&#xff08;控制器&#xff09;1. 接收前端請求&#xff08;HTTP&#xff09; 2. 封裝參數、校驗 3. 調用 Service 處理業務 4. 返回視圖 / 數據給前端Controller、GetMapping等Service&#xff08;業務層&#xff0…

鎂金屬接骨螺釘注冊檢測:骨科植入安全的科學基石

在骨科治療領域&#xff0c;鎂金屬接骨螺釘憑借其可降解性與生物相容性&#xff0c;成為傳統金屬植入物的革新替代方案。然而&#xff0c;作為Ⅲ類高風險無源植入器械&#xff08;分類編碼13-01-01&#xff09;&#xff0c;其注冊檢測需覆蓋生物相容性、化學表征、降解性能、力…

模具開發和管理系統(c#)

以前編寫的一個管理模具開發和進度的程序&#xff0c;可以跟蹤模具開發進度&#xff0c;可以查詢模具具體情況&#xff0c;也可以用水晶報表查詢。OS&#xff1a;microsoft windows IDE&#xff1a;microsoft visual studio programming language&#xff1a;C# DataBase&#…

【WRF-Chem 實例1】namelist.input 詳解- 模擬CO2

目錄 &time_control(時間控制) &physics(物理過程參數化方案) &fdda(四維數據同化) 工作機制簡述 &dynamics(WRF 動力核心的數值方法和選項) &bdy_control(邊界控制設置) &chem(WRF-Chem 主要化學設置) &namelist_quilt(并行 I/O 控制…

數據中心-時序數據庫InfluxDB

目錄 一、InfluxDB介紹 1.1 什么是InfluxDB&#xff1f; 1.2 應用場景 1.3 特點 1.4 版本差異 二、數據模型和存儲架構 2.1 相關概念 2.2 存儲架構 三、InfluxDB基礎操作 3.1 數據庫操作 3.2 數據表操作 顯示所有表 新建表 刪除表 3.3 數據保存策略 查看保存策…

webpack-高級配置

多入口文件 如何輸出多個html文件 輸入位置 需要寫兩個entryoutput位置也要改一下 加一個name避免重名 在生成html時 要根據每一個入口都寫一個插件 并且chunks要寫好 當前html引入哪些文件如何抽離壓縮css文件 安裝插件在rules里面添加插件plugins中添加css抽離代碼壓縮css抽離…

WinForm組件之Label 控件

Label 控件Label 控件是 WinForm 中最基礎、最常用的控件之一&#xff0c;主要用于在界面上顯示文本信息&#xff0c;通常作為說明、提示或標題&#xff0c;不直接接受用戶輸入。它是構建用戶界面的基礎組件&#xff0c;在引導用戶操作、展示狀態信息等方面發揮重要作用。Label…