詳解Kafka通過冪等性實現分區消息不重復的機制

一、核心機制:PID與序列號

1.?Producer ID (PID)

  • 唯一標識:每個生產者實例啟動時,由Kafka Broker分配一個全局唯一的PID,用于標識消息來源。
  • 持久化存儲:PID由Broker持久化保存,確保生產者重啟后仍能追蹤歷史狀態(但跨會話時PID會變更)。

2.?序列號 (Sequence Number)

  • 分區級遞增:生產者為每個分區維護一個單調遞增的序列號,從0開始。
  • 消息附加:每條消息發送時,附帶當前分區的序列號。
  • Broker驗證:Broker為每個<PID, Partition>對記錄最后接收的序列號,新消息的序列號必須滿足:
    • 等于預期值SN_new = SN_old + 1?→ 接受并更新序列號。
    • 小于預期值SN_new < SN_old + 1?→ 視為重復消息,丟棄。
    • 大于預期值SN_new > SN_old + 1?→ 視為亂序或丟失,觸發異常。

二、分區級別冪等性實現

1.?單分區內的唯一性保證

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-cluster:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 啟用冪等性
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 默認值,確保消息可靠存儲
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 無限重試
  • 機制:通過PID和序列號,確保同一生產者實例向同一分區發送的消息不重復。
  • 限制
    • 跨分區無效:同一生產者向不同分區發送的消息可能重復。
    • 跨會話無效:生產者重啟后PID變更,跨會話消息無法保證冪等性。

2.?Broker端去重緩存

  • 緩存結構:Broker維護最近接收的<PID, SequenceNumber>映射,緩存最近5個批次的消息(固定大小,不可配置)。
  • 驗證流程
    1. 接收消息后,檢查PID和序列號是否存在于緩存。
    2. 若存在且序列號連續,接受消息并更新緩存。
    3. 若序列號不連續或重復,丟棄消息。

三、配置與啟用

1.?生產者配置

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-cluster:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 啟用冪等性
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 默認值,確保消息可靠存儲
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 無限重試

2.?默認行為

  • 啟用冪等性后,acks自動設為all,確保所有副本確認后再返回成功。
  • 重試機制默認啟用,避免因網絡問題導致消息丟失。

四、限制與擴展

1.?單會話限制

  • PID變更:生產者重啟后,PID變更,跨會話消息無法保證冪等性。
  • 解決方案:結合事務機制(transactional.id)實現跨會話的精確一次語義。

2.?事務擴展

  • 事務ID:通過配置transactional.id,將生產者ID與事務關聯,確保跨分區和跨會話的原子性。
  • 配置示例
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "pay-service-1");
    producer.initTransactions();
    try {producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction();
    } catch (Exception e) {producer.abortTransaction();
    }

3.?消費者端處理

  • 去重需求:消費者需自行處理重復消息,例如:
    • 數據庫唯一約束:在消息處理時添加業務唯一鍵(如訂單ID)。
    • 業務邏輯去重:通過狀態檢查避免重復操作。

五、性能與調優

1.?性能影響

  • Broker端開銷:維護PID和序列號緩存增加內存消耗,但通過固定緩存大小(5個批次)平衡性能與空間。
  • 客戶端優化
    • 增大batch.sizelinger.ms,減少網絡請求次數。
    • 調整max.in.flight.requests.per.connection(默認5)以控制并發請求。

2.?高并發優化

  • Broker配置
    • 增加num.io.threadsqueued.max.requests,提升處理能力。
  • 架構優化:動態均衡分區熱點,避免單分區過載。

六、總結

  • 核心原理:通過PID和序列號在分區級別實現消息唯一性,確保同一生產者會話內消息不重復。
  • 適用場景:單分區消息去重,結合事務可擴展至跨分區和跨會話。
  • 消費者責任:需額外處理重復消息,依賴業務邏輯或外部機制(如數據庫唯一約束)。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/88509.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/88509.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/88509.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

壓縮包方式在centos7版本上安裝mysql8.0

使用tar命令解壓 tar -zxvf mysql-8.0.32-el7-x86_64.tar.gz -C /usr/local/到/usr/local/修改解壓后的文件名為mysql 創建mysql用戶組和用戶&#xff0c;自己在mysql下面創建data目錄存儲信息&#xff0c;把權限交給mysql這個用戶 groupadd mysql useradd -r -g mysql mysql c…

使用ansible給被管理節點安裝docker

在跳板機上安裝ansible,再通過ansible的playbook,給被管理節點安裝docker。 跳板機配置 實驗環境 華為云上按需開兩臺2核2G的Ubuntu的ECS&#xff1b;2臺公網IP為5Mbit/s&#xff0c;按需按流量&#xff1b;2臺服務器在一個子網內;跳板機和被管理節點主機分別掛不通的安全組 在…

《Java EE與中間件》實驗三 基于Spring Boot框架的購物車

目 錄 一、實驗目的和要求 二、實驗實現思路及步驟 1、實驗思路 2、實驗步驟 3、實驗方案 三、主要開發工具 四、實驗效果及實現代碼 1、購物車數據庫構建實現 &#xff08;1&#xff09;建立javaee-project數據庫 &#xff08;2&#xff09;建立t_cart數據表 &…

DAS3D: Dual-modality Anomaly Synthesis for 3D Anomaly Detection 論文精讀

題目&#xff1a;DAS3D: Dual-modality Anomaly Synthesis for 3D Anomaly Detection 題目&#xff1a;DAS3D&#xff1a;用于三維異常檢測的雙模態異常合成 論文地址&#xff1a;ECCVW 2024 2410 Dual-modality 雙模態 Anomaly Synthesis 異常合成 for 3D Anomaly Detection…

EasyCVR視頻匯聚平臺國標接入設備TCP主動播放失敗排查指南

部分客戶現場的下級平臺通過國標級聯接入安防監控系統EasyCVR后&#xff0c;只能通過TCP主動的播放方式進行播放&#xff08;并不是所有下級平臺都支持tcp主動播放&#xff0c;模式需下級平臺支持&#xff09;&#xff0c;但是有些平臺剛接入的時候發現不能播放。核心原因分析&…

linux打包指令和移動指令

在Linux中&#xff0c;常用的文件夾打包命令是 tar&#xff0c;它可以將文件夾壓縮打包成 .tar、.tar.gz、.tar.bz2 等格式的文件。以下是具體用法&#xff1a; 1. 基礎打包&#xff08;不壓縮&#xff0c;生成 .tar 文件&#xff09; 將文件夾 folder 打包為 folder.tar&#…

神經符號AI:結合深度學習和符號邏輯的下一代AI

神經符號AI&#xff1a;結合深度學習和符號邏輯的下一代AI當AI醫生解釋診斷時&#xff0c;它不僅能指出醫學影像中的異常像素模式&#xff0c;還能引用臨床指南中的第三條第二款&#xff0c;推演病理發展的邏輯鏈條——這正是神經符號AI賦予機器的“理性之光”。2025年初&#…

SpringBoot JWT

jsonwebtoken 引依賴 <dependency><groupId>io.jsonwebtoken</groupId><artifactId>jjwt</artifactId><version>0.12.3</version></dependency> 測試一下&#xff0c;jwt是2個帶逗號的3段字符串 官網參考&#xff1a;JSON …

讀取QPS 10萬,寫入QPS 1000,如何設計系統架構?

你是否也曾深陷在臃腫的領域模型&#xff08;Domain Model&#xff09;的泥潭&#xff0c;一個 User 或 Order 實體類&#xff0c;既要處理復雜的業務邏輯和數據校驗&#xff0c;又要承載各種為前端展示而生的DTO轉換&#xff0c;導致模型越來越胖&#xff0c;讀寫性能相互掣肘…

UE5 Rotate 3 Axis In One Material

首先沒有用旋轉矩陣&#xff0c;我用過旋轉矩陣&#xff0c;傳進去的角度旋轉的角度和歐拉角傳進去角度旋轉出來的不一樣&#xff0c;就沒有用最后用的RotateAboutAxis&#xff0c;這個玩意兒研究老半天&#xff0c;只能轉一個軸&#xff0c;角度和歐拉角的一樣的最后研究出Rot…

計算機網絡實驗——訪問H3C網絡設備

一、實驗目的1. 熟悉H3C路由器的開機界面&#xff1b;2. 通過Console端口實現對上電的H3C路由器的第一次本地訪問&#xff1b;3. 掌握H3C設備命名等幾個常用指令&#xff1b;4. 掌握如何將H3C設備配置為Telnet服務器&#xff1b;5. 掌握如何將H3C設備配置為Telnet客戶端并實現訪…

【C語言】學習過程教訓與經驗雜談:思想準備、知識回顧(四)

&#x1f525;個人主頁&#xff1a;艾莉絲努力練劍 ?專欄傳送門&#xff1a;《C語言》、《數據結構與算法》、C語言刷題12天IO強訓、LeetCode代碼強化刷題 &#x1f349;學習方向&#xff1a;C/C方向 ??人生格言&#xff1a;為天地立心&#xff0c;為生民立命&#xff0c;為…

Vim 指令

Vim 是一款功能強大但學習曲線陡峭的文本編輯器&#xff0c;核心在于其模式化操作。掌握常用指令能極大提升效率。以下是指令分類整理&#xff1a;一、核心模式切換 (必須掌握&#xff01;)i&#xff1a;在光標前進入 插入模式 (Insert Mode)a&#xff1a;在光標后進入 插入模式…

vue2中使用xgplayer播放流視頻

1、官網 2、安裝后無法播放時&#xff0c;經測試&#xff0c;需要降低版本 "xgplayer-hls": "2.2.2","xgplayer": "2.31.6"改為以上版本可以正常播放 3、完整使用 &#xff08;1&#xff09;引入 import xgplayer import hlsjsPlayer…

Jmeter進階篇(35)完美解決Jmeter轉換HTML報告報錯“Begin size 0 is not equal to fixed size 5”

今天博主在使用Jmeter運行完壓測,使用生成的csv文件,運行以下命令: C:\apache-jmeter-5.2.1\bin>jmeter -g C:\res.csv -o C:\report生成HTML報告時,發現報錯“Begin size 0 is not equal to fixed size 5”。 問題原因 原因是我:本地用的是JDK17,但Jmeter5.2.1僅支…

linux中tcpdump抓包中有組播數據,應用程序收不到數據問題

問題描述服務器運行正常&#xff0c;維保需要&#xff0c;重啟服務器后應用程序無法收到組播的媒體數據。百思不得其解。原因分析最終的定位原因是 linux系統的自我保護機制導致的。rp_filter&#xff08;反向路徑過濾&#xff09;是Linux內核的一個安全特性&#xff0c;用于防…

人工智能-基礎篇-29-什么是低代碼平臺?

低代碼平臺&#xff08;Low-Code Development Platform, LCDP&#xff09;是一種通過可視化界面和少量代碼&#xff08;或無需代碼&#xff09;快速構建應用程序的開發工具。它的核心目標是通過簡化開發流程&#xff0c;降低技術門檻&#xff0c;使企業能夠更高效地響應業務需求…

PyTorch隨機擦除:提升模型抗遮擋能力

PyTorch中內置的隨機擦除&#xff08;Random Erasing&#xff09;數據增強通過torchvision.transforms.RandomErasing實現&#xff0c;以下是原理和用法的詳細說明&#xff1a;核心原理正則化作用&#xff1a; 隨機擦除在訓練圖像上隨機遮蓋一個矩形區域&#xff0c;模擬遮擋場…

微信小程序交互精髓:點擊操作與狀態管理實戰

目錄 一、點擊事件綁定&#xff1a;bindtap 與 catchtap 的正確使用 基礎語法對比 事件對象詳解 二、點擊切換選中狀態&#xff1a;數據驅動視圖的實現 1. 單元素狀態切換 2. 多元素單選狀態 3. 多元素多選狀態 三、樣式動態切換&#xff1a;數據綁定與 CSS 的完美結合 …

Language Models are Few-Shot Learners: 開箱即用的GPT-3(二)

接上一篇 Approach 前面的摘要和Introduction做了一些概要性的介紹,論文在第二章,也就是approach中,介紹了模型的設計,zero,one,few-shot的設計等等。 這一章一開頭就說,GPT-3的結構和GPT-2的結構一樣,只是在相應的把模型尺寸,數據規模,訓練時間等增加了。Our bas…