使用消息隊列怎樣防止消息重復?

大家好,我是君哥。

使用消息隊列時,我們經常會遇到一個可能對業務產生影響的問題,消息重復。在訂單、扣款、對賬等對冪等有要求的場景,消息重復的問題必須解決。

那怎樣應對重復消息呢?今天來聊一聊這個話題。

1.三個語義

正確使用消息隊列,我們會考慮到消息防丟失、防重復,我們介紹 3 個語義:

  • At Least Once:在消息隊列中,指消息不丟失,一條消息最少被消費一次,但是可能會有重復消費。

  • Exactly Once:在消息隊列中,消息被精準消費一次,不丟失,也不會重復;

  • At Most Once:在消息隊列中,消息不會被重復消費,但是可能會有消息丟失

不同的消息場景,需要的語義不同。比如?Exactly Once?最難實現,一般需要引入事務消息。

不同使用場景,對語義的要求也不一樣。比如日志收集類的場景,At Most Once?就可以滿足,而支付類的場景則要求?Exactly Once。

2.消息重復

什么情況下會導致消息重復呢?

生產者發送消息后,Broker?保存成功,但是沒有成功給生產者返回?ACK,生產者以為消息發送失敗,重試,再次給?Broker?發送。Broker?保存了重復消息,導致?Consumer?多次消費。

圖片

消費者消費消息后,給?Broker?返回?ACK?失敗,導致?Broker?沒有修改偏移量,同一條消息再次發送給消費者,或者被消費者拉取到。

圖片

3.生產者防重

有的消息中間件是支持生產者冪等的。比如 Kafka 從 0.11.0?版本開始引入了冪等?Producer,可以使用下面代碼開啟冪等?Producer:

Properties props =?new?Properties();
//省略其他代碼
//配置冪等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,?true);?
//創建生產者實例
KafkaProducer<String, String> producer =?new?KafkaProducer<>(props);

Kafka?實現生產者冪等的原理是在生產者引入了?Producer ID(PID)和 Sequence Number?這兩個參數。

  • PID:Producer?擁有的?ID,唯一標識一個?Producer。

  • Sequence Number:自增的數值,唯一標識同一個?Producer?發送到指定分區的消息?ID。

有了這兩個參數,Broker?單分區就可以唯一標識一個生產者發送的唯一一條消息<PID,SequenceNumber>。Broker 收到消息時,如果檢查到消息的<PID,SequenceNumber>已經存在,就不會再保留這條消息。

但冪等?Producer?只能在單分區下生效,多分區情況下是不生效的。因為多個分區之間并不能相互訪問對方的<PID,SequenceNumber>。

圖片

4.Broker 防重

Broker?如果可以防重,那對于生產者和消費者來說,節省了大量的工作。下面我們看下?Pulsar?是怎樣防重的。

Broker?通過參數?BrokerDeduplicationEnabled?開啟防重功能。對于?Producer?發送的重復消息,Broker?返回響應?-1:-1。

Producer?發送消息時,會帶一個?sequenceId?字段,Broker?會按照?ProducerName?維度記錄當前生產者最大的?sequenceId(highestSequenceId)。Broker?收到消息時,首先會判斷消息中的?sequenceId?是否大于自己保存的當前生產者的?highestSequenceId,如果是則保存消息并更新?highestSequenceId,否則丟棄消息,并且給?Producer?返回?-1:-1。

下面是三個極端情況:

  1. Producer?斷開連接:這種情況下,跟?Broker?重新建立連接后,本地保存的?sequenceId?還在,只要使用?sequenceId?遞增后發送消息即可;

  2. Producer?宕機:Producer?重啟后,緩存的?sequenceId?肯定不存在了,這時跟?Broker?重新建立連接后,Broker?會根據?ProducerName?找出?highestSequenceId?發給?Producer,Producer?使用這個?sequenceId?來發送消息;

  3. Producer 和 Broker?都宕機:Broker?重啟后,可以從宕機前保存的快照中恢復各?Producer?對應的?highestSequenceId?發送給各?Producer。但這個?highestSequenceId?不一定準確,因為?Broker?宕機瞬間很有可能最新的?sequenceId?沒有來得及保存快照。

需要注意的是,跟?Kafka?的冪等?Producer?類似,Pulsar 的 Broker?冪等也只能保證?Topic/Partition?級別。

5.消費者防重

從上面的分析可以看出,靠生產者防重和?Broker?防重,只能在?Topic/Partition?級別生效,這通常并不能滿足我們的需求。而為了避免消費者重復消費對業務造成影響,消息防重還是必要的。這就要求我們做最后一道防線,在消費端進行防重或冪等處理。

消費端做防重,就不再考慮消息中間件層面的配置(比如?sequenceId),而是從消息體進行下手。

生產者發送消息時,給消息體賦值一個全局唯一的?ID,消費者處理消息時,根據全局唯一?ID?做防重。

比如消費端的邏輯是保存一條訂單消息,那把唯一?ID?保存到數據庫并且加一個唯一索引,這樣根據唯一索引就可以做消息去重。

不過使用唯一索引也有缺點:

  • 如果使用?MySQL?數據庫,不能使用?Change Buffer;

  • 非插入的場景(比如更新庫存)不能去重。

對于唯一索引的缺點,我們可以引入?Redis?對唯一?ID?做保存,利用?setNx?判斷消息是否已經處理過。如下圖:

圖片

if?(jedis.setnx(ID,?"1") ==?1) {//處理業務,返回 ACK
}else?{//直接返回 返回 ACK
}

6.總結

使用消息隊列,在一些場景下是需要防重的。主流消息隊列提供了一些防重的能力,但并不是完全可靠的。在對重復消息敏感的場景下,最好是在消費端處理消息時,從業務層面進行消息防重。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/72374.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/72374.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/72374.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

005 公網訪問 docker rocketmq

文章目錄 創建自定義網絡創建NameServer容器創建Broker容器正式開始啟動 Nameserver 容器啟動 Broker 容器并關聯 Nameserverdocker exec -it rmqbroker vi /etc/rocketmq/broker.conf檢查 namesrv 解析檢查 Broker 注冊狀態Nameserver 日志Broker 日志檢查容器日志手動指定 Br…

解決Docker Desktop啟動后Docker Engine stopped問題

一、問題描述 當我們更新了Docker Desktop后,在重新打開就顯示【Docker Engine stopped(Docker引擎已經停止)】,無法正常使用Docker,如下圖所示: 二、問題分析 1、檢查電腦主板的CPU是否開啟虛擬化; 2、需檢查Docker所需的功能是否開啟; 3、檢查WSL是否匹配; Docker的…

MongoDB—(一主、一從、一仲裁)副本集搭建

MongoDB集群介紹&#xff1a; MongoDB 副本集是由多個MongoDB實例組成的集群&#xff0c;其中包含一個主節點&#xff08;Primary&#xff09;和多個從節點&#xff08;Secondary&#xff09;&#xff0c;用于提供數據冗余和高可用性。以下是搭建 MongoDB 副本集的詳細步驟&am…

C++STL---<limits>

C <limits> 頭文件&#xff1a; <limits> 頭文件是 C 標準庫中用于獲取各種數據類型的數值范圍、精度等信息的工具。它通過模板類 std::numeric_limits 提供了對基本數據類型&#xff08;如 int、float、double 等&#xff09;的詳細屬性查詢功能。通過 std::nume…

藍橋杯自我復習打卡

總復習&#xff0c;打卡1. 一。排序 1。選段排序 太可惡了&#xff0c;直接全排輸出&#xff0c;一個測試點都沒過。 AC 首先&#xff0c;這個【l,r】區間一定要包含p,或者q&#xff0c;pq一個都不包含的&#xff0c;[l,r]區間無論怎么變&#xff0c;都對ans沒有影響。 其次&…

Flutter_學習記錄_實現列表上拉加載更多的功能

可以用ScrollController組件來實現這樣列表上拉加載更多的功能: 1. 定義變量 在StatefulWidget 的組件內&#xff0c;添加三個屬性&#xff1a; // 滾動視圖的控制器final ScrollController _scrollController ScrollController();// 是否已顯示了上拉加載中bool _isShowM…

【Linux】【網絡】不同子網下的客戶端和服務器通信其它方式

【Linux】【網絡】不同子網下的客戶端和服務器通信其它方式 那么&#xff0c;在 NAT 環境下&#xff0c;應該如何讓內網設備做為服務器&#xff0c;使內網設備被外部連接&#xff1f; 1 多撥 部分運營商&#xff0c;支持在多個設備上&#xff0c;通過 PPPoE 登錄同一個寬帶賬…

《Python百練成仙》31-40章(不定時更新)

第卅一章 函數結丹def開紫府 羅酆山的鬼門關吞吐著猩紅的變量陰風&#xff0c;每個風眼都涌動著作用域混亂的靈力亂流。葉軍手握薛香遺留的丹田玉簡&#xff0c;玉簡表面浮現出殘缺的函數符文&#xff1a; def 凝聚金丹(靈氣):道基 靈氣 * 0.618print(金丹品質) # 作用域外變…

六十天前端強化訓練之第一天到第七天——綜合案例:響應式個人博客項目

歡迎來到編程星辰海的博客講解 目錄 前言回顧 HTML5與CSS3基礎 一、知識講解 1. 項目架構設計&#xff08;語義化HTML&#xff09; 2. 響應式布局系統&#xff08;Flex Grid&#xff09; 3. 樣式優先級與組件化設計 4. 完整響應式工作流 二、核心代碼示例 完整HTML結…

測試的BUG分析

在了解BUG之前,我們要先了解軟件測試的生命周期,因為大多數BUG都是在軟件測試的過程中被發現的 軟件測試的生命周期 在了解 軟件測試的生命周期 之前,我們要先了解 軟件的生命周期 ,雖然他們之間只差了兩個字,但是差距還是很大的 首先是 軟件生命周期 ,這個是站在 軟件 的角…

【洛谷貪心算法題】P1094紀念品分組

該題運用貪心算法&#xff0c;核心思想是在每次分組時&#xff0c;盡可能讓價格較小和較大的紀念品組合在一起&#xff0c;以達到最少分組的目的。 【算法思路】 輸入處理&#xff1a;首先讀取紀念品的數量n和價格上限w&#xff0c;然后依次讀取每件紀念品的價格&#xff0c;…

[STM32]從零開始的STM32 BSRR、BRR、ODR寄存器講解

一、前言 學習STM32一陣子以后&#xff0c;相信大家對STM32 GPIO的控制也有一定的了解了。之前在STM32 LED的教程中也教了大家如何使用寄存器以及庫函數控制STM32的引腳從而點亮一個LED&#xff0c;之前的寄存器只是作為一個引入&#xff0c;并沒有深層次的講解&#xff0c;在教…

SQL分組問題

下列為電商公司用戶訪問時間數據 統計某個用戶連續的訪問記錄&#xff0c;如果時間間隔小于60s&#xff0c;就分為一組 id ts 1001 17523641234 1001 17523641256 1002 17523641278 1001 17523641334 1002 17523641434 1001 17523641534 1001 17523641544 1002 17523…

3月2日 C++日常習題測試一答案

C測試題答案與講解 一、填空題答案及講解 答案&#xff1a;const 講解&#xff1a;在 C 中&#xff0c;const關鍵字用于定義常量&#xff0c;一旦定義&#xff0c;其值不能被修改。例如const int num 10;&#xff0c;這里的num就是一個常量。 答案&#xff1a;3 講解&…

2W8000字 LLM架構文章閱讀指北

? 大模型架構專欄已經更新了30多篇文章。完整的專欄內容歡迎訂閱&#xff1a; LLM 架構專欄 1、LLM大模型架構專欄|| 從NLP基礎談起 2、 LLM大模型架構專欄|| 自然語言處理&#xff08;NLP&#xff09;之建模 3、 LLM大模型架構之詞嵌入&#xff08;Part1&#xff09; 3、 LLM…

SP導入智能材質球

智能材質球路徑 ...\Adobe Substance 3D Painter\resources\starter_assets\smart-materials 放入之后就會自動刷新

網絡原理----TCP/IP(3)

核心機制七----延時應答 默認情況下&#xff0c;接收方都是在收到數據報的第一時間&#xff0c;就返回ack&#xff0c;但是可以通過延時返回ack的方式來提高效率&#xff0c;理論上不是100%提高效率&#xff0c;但還是有一定幫助的。 因為如果接收數據的主機?刻返回ACK應答,…

MacBook Pro使用FFmpeg捕獲攝像頭與麥克風推流音視頻

FFmpeg查看macos系統音視頻設備列表 ffmpeg -f avfoundation -list_devices true -i "" 使用攝像頭及麥克風同時推送音頻及視頻流: ffmpeg -f avfoundation -pixel_format yuyv422 -framerate 30 -i "0:1" -c:v libx264 -preset ultrafast -b:v 1000k -…

部署Joplin私有云服務器postgres版-docker compose

我曾經使用過一段時間 Joplin&#xff0c;官方版本是收費的&#xff0c;而我更傾向于將數據掌握在自己手中。因此&#xff0c;在多次權衡后&#xff0c;我決定自己搭建 Joplin 服務器并進行嘗試。 個人搭建的版本與數據庫直連&#xff0c;下面是使用 Docker Compose 配置數據庫…

SQL的select語句完整的執行順序

SQL的SELECT語句的執行順序可以用"做菜流程"來類比理解。雖然我們寫SQL時按SELECT…FROM…WHERE…順序寫&#xff0c;但數據庫執行順序完全不同。以下是通俗易懂的講解&#xff08;附流程圖和示例&#xff09;&#xff1a; &#x1f527; 執行順序流程圖&#xff1a…