【面試題】如何保證MQ的消息不丟失、不重復

文章目錄

  • 一、消息丟失問題的解決方案
    • (一)發送端丟失
    • (二)存儲端丟失
      • 1. 同步刷盤
      • 2. Broker 集群
    • (三)消費端丟失
  • 二、消息重復問題的解決方案
    • (一)唯一鍵約束
    • (二)保存消費記錄
  • 三、總結

在消息隊列的使用過程中, 消息丟失和消息重復是兩個常見且令開發人員困擾的問題。

因為從生產者發送消息,到 Broker 保存消息,再到消費者消費消息,每個環節都暗藏著消息丟失的風險;而消息重復的產生,往往源于生產者的重復發送或消費者的重復接收。

那么接下來,我們深入剖析一下這兩個問題及其對應的策略。

一、消息丟失問題的解決方案

(一)發送端丟失

生產者發送消息時,處理不當極易造成消息丟失。目前,主流消息隊列普遍支持同步發送和異步發送兩種模式。

同步發送時,生產者發送消息后會同步等待 Broker 返回的 ACK 確認消息,只有收到 ACK 才認定消息發送成功;若長時間未收到,則判定發送失敗并進行重試。這種方式雖能確保消息不丟失,但會帶來性能瓶頸,因此在實際應用中,異步發送更為常用。

以 Kafka 為例,主流消息隊列(如 Kafka 和 RocketMQ)通常采用回調函數來保障異步發送時消息不丟失,具體代碼如下:

// 配置Kafka生產者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 確保所有副本都收到消息才確認
props.put("retries", 3); // 重試次數Producer<String, String> producer = new KafkaProducer<>(props);// 創建消息記錄
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key", "message");// 異步發送消息并添加回調處理
producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {// 處理發送失敗的情況logger.error("消息發送失敗,topic: {}, partition: {}, 異常信息: {}", metadata.topic(), metadata.partition(), exception.getMessage());// 可在此處添加重試邏輯或告警機制} else {// 處理發送成功的情況logger.info("消息發送成功,topic: {}, partition: {}, offset: {}",metadata.topic(), metadata.partition(), metadata.offset());}}
});// 關閉生產者
producer.close();

(二)存儲端丟失

即便生產者成功發送消息,也無法保證消息絕對不丟失。因為若消息發送到 Broker 后,在消費者拉取之前,Broker 突然宕機且消息尚未落盤,同樣會導致消息丟失。為避免存儲階段的消息丟失,可從以下方面著手:

1. 同步刷盤

異步刷盤存在消息未落盤 Broker 就宕機的風險,而同步刷盤則是在消息成功落盤后,才向 Sender 返回發送成功的確認,從而從消息發送環節保障消息不丟失。在 RocketMQ 中,只需將flushDiskType參數配置為SYNC_FLUSH,即可開啟同步刷盤功能。

以下是兩種刷盤機制的對比示意圖:
在這里插入圖片描述
在這里插入圖片描述

2. Broker 集群

若 Broker 集群僅有一個節點,即便消息成功落盤,一旦 Broker 發生故障,在恢復前消費者將無法拉取消息;若出現磁盤故障且無法恢復,消息更是會永久丟失。

采用Broker 集群可有效解決該問題。在 Broker 集群環境下,可設置等待 2 個以上節點同步完消息后,再向 Producer 返回成功確認。如此一來,即便某個 Broker 節點掛掉,也能迅速找到替代節點,確保消息的可用性。

以下是 Broker 集群架構圖:
在這里插入圖片描述

(三)消費端丟失

消費者要確保消息不丟失,需在消費完成后再向 Broker 返回 ACK 確認。主流消息隊列中,若 Broker 未收到 ACK,會重新向消費者發送消息。

有時為了解決消息積壓問題,消費者會在拉取消息后直接返回 ACK,再異步執行消息處理邏輯。此時,為保證消息不丟失,需在返回 ACK 前將消息持久化到本地,例如保存至數據庫,后續可從數據庫讀取消息進行處理。

以下是消費者消息處理流程圖:
在這里插入圖片描述

二、消息重復問題的解決方案

消息重復產生的原因主要有兩點:

  • 一是生產者發送消息后未收到 ACK,進而進行重復發送;
  • 二是消費者消費完成后,Broker 未收到 ACK,導致消息被重復推送給消費者。

消息重復會對業務產生嚴重影響,比如電商場景中的重復支付、賬務場景中的重復記賬等。

以下是消息重復產生原因的分析圖:
在這里插入圖片描述

在這里插入圖片描述

從當前主流消息隊列來看,尚無一款能夠直接解決消息重復的消費問題,所以通常需要在消費端進行冪等處理

以下是幾種常見的冪等處理思路:

(一)唯一鍵約束

若消息會存儲到本地數據庫,可將消息 ID 設為唯一鍵;若消息不存入數據庫,也可選取消息 ID 或消息中其他具有唯一性的屬性,作為唯一鍵存儲到業務數據表中,以此避免重復消費。

(二)保存消費記錄

借助 Redis 保存消息 ID 也是一種有效方式。在消費消息前,先判斷 Redis 中是否已存在該消息 ID,示例代碼如下:

@Service
public class MessageConsumerService {private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate BusinessService businessService; // 業務處理服務// 消費消息的方法public void consumeMessage(String messageId, String messageBody) {try {// 1. 檢查消息是否已消費(利用Redis的原子性操作)Boolean isConsumed = redisTemplate.opsForValue().setIfAbsent("message:consumed:" + messageId, // Redis鍵名,格式為 message:consumed:{消息ID}"1",                             // 值設為1表示已消費30, TimeUnit.DAYS);             // 設置過期時間,防止內存泄漏if (isConsumed != null && isConsumed) {// 2. 消息未被消費,執行具體業務邏輯try {businessService.processMessage(messageBody);logger.info("消息處理成功,messageId: {}", messageId);} catch (Exception e) {// 業務處理失敗,刪除Redis標記以便重新消費redisTemplate.delete("message:consumed:" + messageId);logger.error("消息處理失敗,已刪除消費標記,messageId: {}", messageId, e);throw e; // 向上拋出異常,觸發重試機制}} else {// 3. 消息已被消費,直接跳過logger.info("消息已被消費,跳過處理,messageId: {}", messageId);}} catch (Exception e) {// 處理異常情況,可根據業務需求添加告警或補償邏輯logger.error("消息消費過程中發生異常,messageId: {}", messageId, e);// 可添加額外的重試邏輯或告警通知}}
}

需要注意的是,若消費失敗,需及時刪除 Redis 中保存的消息 ID,防止后續消息無法正常消費。

在這里插入圖片描述

三、總結

最后我們用一張圖總結一下這篇文章:
在這里插入圖片描述

消息不丟失、不重復是消息隊列的核心需求,但在實際應用中,滿足這一要求并非易事。

對于消息丟失問題,主流消息隊列可通過消息重試和消息持久化等手段有效解決;然而,消息重試機制又不可避免地帶來了消息重復的風險。

目前,主流消息隊列在處理消息重復問題上缺乏現成解決方案,對于不允許重復消費的業務場景,開發人員需在 消費端實現冪等處理邏輯,以保障業務的準確性和穩定性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/86330.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/86330.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/86330.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

ArcGIS Maps SDK for JavaScript:使用圖層過濾器只顯示FeatureLayer的部分要素

文章目錄 引言1 需求場景分析2精確過濾實現方案2.1 基礎過濾語法2.2 動態過濾實現 3 模糊查詢進階技巧3.1 LIKE操作符使用3.2 特殊字段處理 4. 性能優化與注意事項4.1 服務端vs客戶端過濾4.2 最佳實踐建議 5 常見問題解答 引言 在地圖應用開發中&#xff0c;圖層過濾是常見的需…

day25-計算機網絡-3

1. DNS解析流程 windows host文件是否配置域名對應的ip查詢本地DNS緩存是否有這個域名對應的ip詢問本地DNS&#xff08;網卡配置的&#xff09;是否知曉域名對應的ip本地DNS訪問根域名解析服務器&#xff0c;但是根DNS只有頂級域名的記錄&#xff0c;根告訴我們.cn頂級域名的D…

中達瑞和SHIS高光譜相機在黑色水彩筆墨跡鑒定中的應用

在文件檢驗與物證溯源領域&#xff0c;對書寫材料&#xff08;如墨水&#xff09;進行快速、準確、無損的鑒別至關重要。由陳維娜等人撰寫的《高光譜技術結合化學計量法鑒別黑色水彩筆墨跡》&#xff08;發表于《光譜學與光譜分析》2023年第7期&#xff09;利用中達瑞和SHIS凝采…

華為OD機考 - 水仙花數 Ⅰ(2025B卷 100分)

import java.util.*; public static Integer get(int count,int c){if(count<3||count>7){return -1;}//存儲每位數的最高位……最低位int[] arr new int[count];List<Integer> res new ArrayList<>();for(int i(int) Math.pow(10,count-1);i<(int) Math…

Go 標準庫 encoding/gob 快速上手

文章目錄 1.簡介2.基礎3.類型和值4.編碼細節5.安全6.主要函數6.1 注冊1. 接口的底層類型在運行時才能確定2.類型標識的唯一性3.安全性與顯式意圖4.與結構體的自動處理對比5.示例分析為什么不能像 JSON 那樣自動處理&#xff1f;總結 6.2 編碼6.3 解碼 7.示例7.1 編解碼結構體7.…

Ubuntu ifconfig 查不到ens33網卡

BUG&#xff1a;ifconfig查看網絡配置信息&#xff1a; 終端輸入以下命令&#xff1a; sudo service network-manager stop sudo rm /var/lib/NetworkManager/NetworkManager.state sudo service network-manager start - service network - manager stop &#xff1a;停止…

算法-數論

C-小紅的數組查詢&#xff08;二&#xff09;_牛客周賽 Round 95 思路&#xff1a;不難看出a數組是有循環的 d3,p4時&#xff0c;a數組&#xff1a;1、0、3、2、1、0、3、2....... 最小循環節為4&#xff0c;即最多4種不同的數 d4,p6時&#xff0c;a數組&#xff1a;1、5、3、…

CSS中text-align: justify文本兩端對齊

text-align: justify; 是 CSS 中用于控制文本對齊方式的屬性值&#xff0c;它的核心作用是讓文本兩端對齊&#xff08;分散對齊&#xff09;&#xff0c;使段落左右邊緣整齊排列。以下是詳細解析&#xff1a; 作用效果 均勻分布間距 瀏覽器會自動調整單詞/字符之間的間距&#…

WebFuture:啟動數據庫提示: error while loading shared libraries: libaio.so.1問題處理

問題分析 當出現./mysqld: error while loading shared libraries: libaio.so.1: cannot open shared object file: No such file or directory這個錯誤時&#xff0c;這意味著 MySQL 服務器&#xff08;mysqld&#xff09;在啟動過程中無法找到libaio.so.1這個共享庫文件。li…

74常用控件_QSpacerItem的使用

目錄 代碼?例: 創建?組左右排列的按鈕. Spacer 使?布局管理器的時候, 可能需要在控件之間, 添加?段空?. 就可以使? QSpacerItem 來表?. 核?屬性 屬性說明width寬度height高度hData水平方向的 sizePolicy - QSizePolicy::Ignored&#xff1a;忽略控件的尺寸&#xf…

vmware 設置 dns

vmware 設置 dns 常用的 DNS&#xff08;Domain Name System&#xff09;服務器地址可以幫助你更快、更安全地解析域名。以下是一些國內外常用的公共 DNS 服務&#xff1a; 國內常用 DNS 阿里云 DNS IPv4: 223.5.5.5、223.6.6.6IPv6: 2400:3200::1、2400:3200:baba::1特點&am…

從一次日期格式踩坑經歷,談談接口設計中的“約定大于配置“

從一次日期格式踩坑經歷&#xff0c;談談接口設計中的"約定大于配置" 背景 最近在對接一個第三方接口時&#xff0c;遇到了一個有趣的"坑"。接口文檔中要求傳入一個符合 RFC3339 格式的日期時間字符串&#xff0c;格式示例為&#xff1a;2019-10-01T08:1…

高考數學易錯考點01 | 臨陣磨槍

文章目錄 前言集合與函數不等式數列三角函數 前言 本篇內容下載于網絡&#xff0c;網絡上的都是以 WORD 版本呈現&#xff0c;缺字缺圖很不完整&#xff0c;沒法使用&#xff0c;我只是做了補充和完善。有空準備進行第二次完善&#xff0c;添加問題解釋的鏈接。 集合與函數 …

YOLO12 改進|融入 Mamba 架構:插入視覺狀態空間模塊 VSS Block 的硬核升級

在醫學圖像分割領域&#xff0c;傳統卷積神經網絡&#xff08;CNNs&#xff09;受限于局部感受野&#xff0c;難以捕捉長距離依賴關系&#xff0c;而基于 Transformer 的模型因自注意力機制的二次計算復雜度&#xff0c;在處理高分辨率圖像時效率低下。近年來&#xff0c;狀態空…

MATLAB遍歷生成20到1000個節點的無線通信網絡拓撲推理數據

功能&#xff1a; 遍歷生成20到1000個節點的無線通信網絡拓撲推理數據&#xff0c;包括網絡拓撲和每個節點發射的電磁信號&#xff0c;采樣率1MHz/3000&#xff0c;信號時長5.7s&#xff0c;單幀數據波形為實采 數據生成效果&#xff1a; 拓撲及空間位置&#xff1a; 節點電磁…

oss:上傳圖片到阿里云403 Forbidden

訪問圖片出現403Forbidden問題&#xff0c;我們可以直接登錄oss賬號&#xff0c;查看對應權限是否開通&#xff0c;是否存在跨域問題

香橙派3B學習筆記8:snap安裝管理軟件包_打包倆個有調用的python文件

現在嘗試一下打包多個有互相調用的 py程序&#xff1a; ssh &#xff1a; orangepi本地ip 密碼 &#xff1a; orangepi 操作系統發行版&#xff1a; 基于 Ubuntu 20.04.6 LTS&#xff08;Focal Fossa&#xff09;的定制版本&#xff0c;專門為 Orange Pi 設備優化。PRETTY_NAM…

Spring Boot 中實現 HTTPS 加密通信及常見問題排查指南

Spring Boot 中實現 HTTPS 加密通信及常見問題排查指南 在金融行業安全審計中&#xff0c;未啟用HTTPS的Web應用被列為高危漏洞。通過正確配置HTTPS&#xff0c;可將中間人攻擊風險降低98%——本文將全面解析Spring Boot中HTTPS的實現方案與實戰避坑指南。 一、HTTPS 核心原理與…

前端對WebSocket進行封裝,并建立心跳監測

WebSocket的介紹&#xff1a; WebSocket 是一種在客戶端和服務器之間進行全雙工、雙向通信的協議。它是基于 HTTP 協議&#xff0c;但通過升級&#xff08;HTTP 升級請求&#xff09;將連接轉換為 WebSocket 協議&#xff0c;從而提供更高效的實時數據交換。 WebSocket 的特點…

【AI】智駕地圖在不同自動駕駛等級中的作用演變

一、功能價值動態模型&#xff1a;基于自動駕駛等級的權重遷移 功能演變四階段&#xff1a; █ 輔助階段&#xff08;L2&#xff09;&#xff1a;單功能補足 → █ 拓展階段&#xff08;L2 NOA&#xff09;&#xff1a;多模態增強 → █ 融合階段&#xff08;L3&#xff09;…