Apache RocketMQ:消息可靠性、順序性與冪等處理的全面實踐

Apache RocketMQ 是一個高性能、高可靠的分布式消息中間件,廣泛應用于異步通信、事件驅動架構和分布式系統中。本文深入探討 RocketMQ 的消息可靠性、順序性和冪等處理機制,結合 Redisson 分布式鎖實現冪等消費,提供詳細的代碼示例和實踐建議,幫助開發者構建健壯的消息系統。

一、RocketMQ 概述

Apache RocketMQ 由阿里巴巴開源,現為 Apache 頂級項目,支持發布/訂閱和點對點消息模型,提供普通消息、定時消息、事務消息等多種類型。其核心組件包括:

  • NameServer:管理 Broker 元數據,提供服務發現和路由。
  • Broker:負責消息存儲、轉發和持久化。
  • Producer:消息生產者,發送消息到 Broker。
  • Consumer:消息消費者,從 Broker 訂閱消息。

RocketMQ 的高性能和靈活性使其成為企業級應用的理想選擇,尤其在需要保證消息可靠性、順序性和冪等性的場景中。以下逐一分析這三方面的實現機制。


二、消息可靠性

消息可靠性確保消息從生產者到消費者的整個流程中不丟失、不重復且正確傳遞。RocketMQ 從生產者、Broker 和消費者三個層面提供保障。

1. 生產者端可靠性

RocketMQ 支持三種發送模式:

  • 同步發送:等待 Broker 確認,確保消息成功存儲。
  • 異步發送:通過回調確認結果,適合高吞吐場景。
  • 單向發送:無確認機制,適用于低可靠性場景(如日志收集)。

生產者內置重試機制(默認重試 2 次),可通過 setRetryTimesWhenSendFailed 配置。

代碼示例(同步發送)

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {System.out.println("Message sent successfully: " + sendResult.getMsgId());
}
producer.shutdown();

2. Broker 端可靠性

Broker 通過持久化存儲消息到磁盤(commitlog),支持兩種刷盤模式:

  • 同步刷盤flushDiskType = SYNC_FLUSH):消息寫入磁盤后返回,適合高可靠性場景。
  • 異步刷盤flushDiskType = ASYNC_FLUSH):消息先寫入內存,定期刷盤,性能更高但有少量丟失風險。

配置示例

flushDiskType=SYNC_FLUSH

3. 消費者端可靠性

消費者通過 Push 或 Pull 模式消費消息,RocketMQ 提供以下機制:

  • 消息確認:Push 模式下,消費者需顯式確認消息處理狀態。
  • 消費重試:消費失敗時,消息進入重試隊列(%RETRY%ConsumerGroup),按時間間隔重試(默認 16 次)。
  • 死信隊列:重試失敗后,消息進入死信隊列(%DLQ%ConsumerGroup),便于人工處理。

代碼示例(消費者)

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("Received message: " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

4. 事務消息

事務消息用于分布式事務場景,確保消息發送與本地事務一致。例如,在電商訂單系統中,只有數據庫更新成功后,消息才會被提交。

事務消息流程

  1. 發送半消息(Half Message)到 Broker。
  2. 執行本地事務。
  3. 根據事務結果提交或回滾消息。

代碼示例

TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 執行本地事務return LocalTransactionState.COMMIT_MESSAGE;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 檢查事務狀態return LocalTransactionState.COMMIT_MESSAGE;}
});
producer.start();
Message msg = new Message("TopicTest", "TagA", "Transaction Message".getBytes());
producer.sendMessageInTransaction(msg, null);

三、消息順序性

順序消息確保消息按照發送順序被消費,適用于訂單狀態流轉、日志處理等場景。RocketMQ 通過分區順序和單線程消費實現。

1. 順序消息機制

  • 全局順序:所有消息發送到一個隊列,消費者單線程消費,性能較低。
  • 分區順序:按業務分區(如訂單 ID)將消息發送到不同隊列,同一分區的消息保持順序,性能較高。

RocketMQ 使用 MessageQueueSelector 確保同一業務的消息發送到同一隊列,消費者通過 MessageListenerOrderly 實現單線程消費。

2. MessageListenerOrderly 的工作原理

MessageListenerOrderly 通過以下機制保障順序消費:

  • 隊列鎖:Broker 為每個消息隊列分配鎖,確保同一隊列只被一個消費者線程處理。
  • 單線程消費:每個隊列由單一線程按序處理消息,未完成當前消息前不會拉取下一條。
  • 消費進度管理:只有消息消費成功后,Offset 才會更新。
  • 負載均衡:隊列重新分配時,消費者從上次 Offset 繼續消費,避免亂序。

代碼示例(生產者)

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {String orderId = "order" + (i % 3);Message msg = new Message("OrderTopic", "TagA", orderId, ("Order Step " + i).getBytes());SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {String id = (String) arg;int index = Math.abs(id.hashCode() % mqs.size());return mqs.get(index);}, orderId);
}
producer.shutdown();

代碼示例(順序消費者)

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderlyConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("OrderTopic", "*");
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.printf("Thread: %s, QueueId: %d, Message: %s%n", Thread.currentThread().getName(), msg.getQueueId(), new String(msg.getBody()));}try {Thread.sleep(100); // 模擬處理耗時return ConsumeOrderlyStatus.SUCCESS;} catch (Exception e) {return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}
});
consumer.start();

四、消息冪等處理(基于 Redisson)

冪等性確保重復消費同一消息不會導致狀態不一致,例如避免重復扣款。RocketMQ 本身不提供內置冪等機制,但可以通過 Redisson 的分布式鎖實現。

1. 冪等處理原理

  • 唯一標識:使用消息的 MessageId 或業務 ID 作為去重依據。
  • 分布式鎖:通過 Redisson 獲取基于消息 ID 的鎖,鎖獲取成功則處理消息,失敗則跳過。
  • 狀態記錄:可選地將消費狀態存入 Redis 或數據庫,進一步防止重復消費。
  • 鎖的 TTL:設置鎖過期時間,避免異常導致鎖無法釋放。

2. Redisson 配置

配置 Redisson 客戶端連接 Redis:

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;public class RedissonConfig {public static RedissonClient getRedissonClient() {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);return Redisson.create(config);}
}

3. 冪等消費者實現

以下是使用 Redisson 分布式鎖的消費者代碼:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;import java.util.List;
import java.util.concurrent.TimeUnit;public class IdempotentConsumer {public static void main(String[] args) throws Exception {RedissonClient redissonClient = RedissonConfig.getRedissonClient();DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("IdempotentConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {String msgId = msg.getMsgId();String lockKey = "rocketmq:msg:" + msgId;RLock lock = redissonClient.getLock(lockKey);boolean acquired = false;try {acquired = lock.tryLock(1, 10, TimeUnit.SECONDS);if (acquired) {System.out.println("Processing message: " + new String(msg.getBody()) + ", MsgId: " + msgId);Thread.sleep(100); // 模擬業務處理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} else {System.out.println("Duplicate message skipped: " + msgId);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}} catch (Exception e) {System.err.println("Error processing message: " + msgId + ", error: " + e.getMessage());return ConsumeConcurrentlyStatus.RECONSUME_LATER;} finally {if (acquired) {lock.unlock();}}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}
}

4. 結合順序消費的冪等處理

對于順序消費場景,使用 MessageListenerOrderly 實現冪等處理:

consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {String msgId = msg.getMsgId();String lockKey = "rocketmq:msg:" + msgId;RLock lock = redissonClient.getLock(lockKey);boolean acquired = false;try {acquired = lock.tryLock(1, 10, TimeUnit.SECONDS);if (acquired) {System.out.println("Processing message: " + new String(msg.getBody()) + ", MsgId: " + msgId);Thread.sleep(100);return ConsumeOrderlyStatus.SUCCESS;} else {System.out.println("Duplicate message skipped: " + msgId);return ConsumeOrderlyStatus.SUCCESS;}} catch (Exception e) {System.err.println("Error processing message: " + msgId + ", error: " + e.getMessage());return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;} finally {if (acquired) {lock.unlock();}}}return ConsumeOrderlyStatus.SUCCESS;}
});

五、應用場景與注意事項

1. 應用場景

  • 消息可靠性:電商訂單、支付通知,確保消息不丟失。
  • 消息順序性:訂單狀態流轉(創建 -> 支付 -> 發貨),保證處理順序。
  • 消息冪等性:支付扣款、庫存更新,防止重復處理。

2. 注意事項

  • 可靠性
    • 使用同步刷盤和事務消息確保高可靠性場景。
    • 配置合理的重試次數和死信隊列處理失敗消息。
  • 順序性
    • 生產者需確保同一業務消息發送到同一隊列。
    • MessageListenerOrderly 犧牲部分性能,適合低吞吐場景。
  • 冪等性
    • 確保 Redis 高可用,避免單點故障。
    • 鎖的 TTL 需大于業務處理時間,但不宜過長。
    • 可結合數據庫唯一約束作為兜底去重機制。
  • 性能優化
    • 調整隊列數量以平衡吞吐量和順序性。
    • 批量消費時,優化鎖粒度或使用 Redisson 的 MultiLock

六、總結

Apache RocketMQ 通過同步發送、刷盤機制和事務消息保證消息可靠性;通過分區順序和 MessageListenerOrderly 實現消息順序性;通過 Redisson 分布式鎖實現高效的冪等處理。開發者可根據業務需求選擇合適的機制:

  • 高可靠性場景:啟用同步刷盤和事務消息。
  • 順序消費場景:使用 MessageQueueSelectorMessageListenerOrderly
  • 冪等性場景:結合 Redisson 分布式鎖和狀態記錄。

通過合理配置和代碼實現,RocketMQ 可以滿足復雜分布式系統中的消息處理需求。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/95141.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/95141.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/95141.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

無服務器日志分析由 Elasticsearch 提供支持,推出新的低價層

作者&#xff1a;來自 Elastic Log Analytics Elastic Observability Logs Essentials 在 Elastic Cloud Serverless 上提供成本效益高、無麻煩的日志分析。 SREs 可以攝取、搜索、豐富、分析、存儲和處理日志&#xff0c;而無需管理部署的運營開銷。[](https://www.elastic.co…

(Arxiv-2025)Phantom-Data:邁向通用的主體一致性視頻生成數據集

Phantom-Data&#xff1a;邁向通用的主體一致性視頻生成數據集 paper是字節發布在Arxiv2025的工作 paper title&#xff1a;Phantom-Data: Towards a General Subject-Consistent Video Generation Dataset Code&#xff1a;鏈接 Abstract 近年來&#xff0c;主體到視頻&#…

如何解決pip安裝報錯ModuleNotFoundError: No module named ‘mlflow’問題

【Python系列Bug修復PyCharm控制臺pip install報錯】如何解決pip安裝報錯ModuleNotFoundError: No module named ‘mlflow’問題 摘要 在Python開發中&#xff0c;pip install 報錯是一種常見問題&#xff0c;尤其是在使用集成開發環境&#xff08;IDE&#xff09;如PyCharm時…

2020/12 JLPT聽力原文 問題一 3番

3番&#xff1a;會社で女の人と男の人が話しています。女の人は倉庫に入るとき、どの順番で入口のボタンを押さなければなりませんか。 女&#xff1a;すみません。地下の倉庫に行って、資料を取ってきたいんですが、入口の開け方がわからなくて… 男&#xff1a;ああ、最近、管…

C#/.NET/.NET Core技術前沿周刊 | 第 49 期(2025年8.1-8.10)

前言 C#/.NET/.NET Core技術前沿周刊&#xff0c;你的每周技術指南針&#xff01;記錄、追蹤C#/.NET/.NET Core領域、生態的每周最新、最實用、最有價值的技術文章、社區動態、優質項目和學習資源等。讓你時刻站在技術前沿&#xff0c;助力技術成長與視野拓寬。 歡迎投稿、推薦…

基于強化學習的目標跟蹤 研究初探

強化學習 目標跟蹤Visual tracking by means of deep reinforcement learning and an expert demonstratorYOLO 檢測下基于 ETC-DDPG 算法的無人機視覺跟蹤基于特征與深度強化學習方法的機器人視覺伺服技術研究高性能可拓展視頻目標跟蹤算法研究基于目標運動與外觀特征的多目標…

排序與查找,簡略版

數組的排序 排序的基本介紹 排序是將一組數據&#xff0c;按照一定順序進行排列的過程 排序的分類&#xff1a; 內部排序&#xff1a; 一次性適用數據量小的情況 將需要處理的數據都加載到內部存儲器中進行排序。包括交換式排序&#xff0c;選擇式排序&#xff0c;插入式排序 外…

打靶日常-XSS(反射型和存儲型)

目錄 小皮: 1. 2.這里需要登錄,我們之前爆破出賬號密碼在這里就可以用?編輯 登錄之后:?編輯 使用工具: 先輸入正確字符進行測試:aaa 進行測試: 3.換種控制臺顯示 結果:(使用f12大法) DVWA: 反射型XSS: 低: ?編輯 中:大小寫繞過: ?編輯 也可以雙寫繞過: ?編…

二叉搜索樹深度解析:從原理實現到算法應用----《Hello C++ Wrold!》(18)--(C/C++)

文章目錄前言二叉搜索樹&#xff08;二叉排序樹或二叉查找樹&#xff09;二叉搜索樹的模擬實現二叉搜索樹和有序數組二分查找的比較兩個搜索模型作業部分前言 二叉搜索樹&#xff08;Binary Search Tree&#xff0c;簡稱 BST&#xff09;作為一種重要的樹形數據結構&#xff0…

牛客.空調遙控二分查找牛客.kotori和氣球(數學問題)力扣.二叉樹的最大路徑和牛客.主持人調度(二)

目錄 牛客.空調遙控 二分查找 牛客.kotori和氣球&#xff08;數學問題) 力扣.二叉樹的最大路徑和 牛客.主持人調度(二) 牛客.空調遙控 枚舉n個空調之后&#xff0c;使數組有序&#xff0c;左右下標&#xff0c;用二分查找&#xff0c;然后一個求 長度就好 二分查找 /二分理…

《嵌入式Linux應用編程(二):標準IO高級操作與文件流定位實戰》

今日學習內容1. 行輸入函數安全實踐(1) fgets vs gets函數安全特性換行符處理緩沖區保護fgets指定讀取長度&#xff08;size-1&#xff09;保留\n并添加\0安全&#xff08;防溢出&#xff09;gets無長度限制將\n替換為\0危險2. Linux標準文件流文件流符號設備 標準輸入stdin鍵盤…

Springboot2+vue2+uniapp 小程序端實現搜索聯想自動補全功能

目錄 一、實現目標 1.1 需求 1.2 實現示例圖: 二、實現步驟 2.1 實現方法簡述 2.2 簡單科普 2.3 實現步驟及代碼 一、實現目標 1.1 需求 搜索聯想——自動補全 &#xff08;1&#xff09;實現搜索輸入框&#xff0c;用戶輸入時能顯示模糊匹配結果 &am…

極簡 5 步:Ubuntu+RTX4090 源碼編譯 vLLM

極簡 5 步&#xff1a;UbuntuRTX4090 源碼編譯 vLLM1. 系統依賴&#xff08;一次性&#xff09;2. 進入源碼目錄 & 激活環境3. 啟用 ccache 自動并行度4. 拉代碼 編譯&#xff08;2 行搞定&#xff09;5. 更新 flash-attn&#xff08;與 vLLM 配套&#xff09;6. 啟動 4 …

生產工具革命:定制開發開源AI智能名片S2B2C商城小程序重構商業生態的范式研究

摘要互聯網作為信息工具已深刻改變商業生態&#xff0c;但其本質仍停留在效率優化層面。本文提出&#xff0c;基于定制開發開源AI智能名片與S2B2C商城小程序的深度融合&#xff0c;正在引發生產工具層面的革命性變革。該技術架構通過重構"人-貨-場"關系&#xff0c;實…

Transformer前傳:Seq2Seq與注意力機制Attention

前言 參考了以下大佬的博客 https://blog.csdn.net/v_july_v/article/details/127411638 https://blog.csdn.net/andy_shenzl/article/details/140146699 https://blog.csdn.net/weixin_42475060/article/details/121101749 https://blog.csdn.net/weixin_43334693/article/det…

企業架構工具篇之ArchiMate的HelloWorld(2)

本文通過ArchiMate做一個員工報銷流程設計的小demo,按照步驟都可以做出來,在做這個demo之前先簡單認識下Archimate的開發界面: 模型樹(Models)窗口:通常位于左上方,以樹形結構展示一個或多個 ArchiMate 模型。用戶可在此瀏覽模型的整體結構,快速定位到特定的模型元素,…

Docker 詳解(保姆級安裝+配置+使用教程)

文章目錄一、初識 Docker二、Docker 命令1、安裝2、配置鏡像加速器檢查配置是否生效3、服務相關命令4、鏡像相關命令5、容器相關命令三、Docker 容器數據卷1、數據卷概念2、數據卷作用3、配置數據卷4、配置數據卷容器四、Docker 應用部署五、備份與遷移六、Dockerfile七、Docke…

做調度作業提交過程簡單介紹一下

?作業提交與執行流程前文提到在 Linux 的 HPC 或超算環境中&#xff0c;可以只在共享存儲上安裝一次應用程序&#xff0c;然后所有計算節點通過掛載共享目錄來訪問和執行這些程序&#xff0c;那么作業提交及執行過程是怎么樣的流程呢&#xff1f;結構說明&#xff1a;第一行是…

【Altium designer】解決報錯“Access violation at address...“

問題現象如上AD9原理圖工程所示報錯&#xff0c;當我關閉這個“CMM-WEIER-VA”原理圖工程以及其他不相關的原理圖工程出現報錯&#xff1a;Access violation at address 0832A5EC in module WorkspaceManager.DLL. Read of address 00000061 at 0832A5EC&#xff0c;任務管理器…

小杰python高級(three day)——numpy庫

1.numpy數組的操作&#xff08;1&#xff09;數組的連接stack該函數可以實現多個數組的堆疊(連接)&#xff0c;會創建新的軸&#xff0c;用于沿著新的軸連接一系列數組&#xff0c;所有數組必須具有相同的形狀。可以增加數組的維度。假設輸入的每個數組都是 n 維數組&#xff0…