RocketMQ順序消費機制

RocketMQ的順序消費機制通過生產端和消費端的協同設計實現,其核心在于局部順序性,即保證同一隊列(MessageQueue)內的消息嚴格按發送順序消費。以下是詳細機制解析及關鍵源碼實現:
在這里插入圖片描述


一、順序消費的核心機制

1. 生產端路由策略
  • Sharding Key路由:生產者通過MessageQueueSelector接口將同一業務標識(如訂單ID)的消息路由到同一隊列。例如,根據訂單ID對隊列數取模,確保同一訂單的消息進入同一隊列。
    // 示例:生產者選擇隊列
    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}
    }, orderId);
    

路由方法:
在這里插入圖片描述
SelectMessageQueueByHash:按哈希選擇消息隊列。
SelectMessageQueueByRandom:隨機選擇消息隊列。
SelectMessageQueueByMachineRoom:按照機房選擇消息隊列。

  • 同步發送:必須使用同步發送(send()方法),異步發送無法保證消息順序。
2. 消費端鎖機制
  • Broker端隊列鎖:消費者集群模式下,通過定時任務(默認每20秒)向Broker申請隊列鎖,只有獲得鎖的消費者實例才能拉取并消費該隊列消息。鎖的有效期默認60秒,避免宕機導致死鎖。
  • 本地隊列快照鎖:消費者在消費時對ProcessQueue(隊列快照)加內存鎖(synchronized塊),確保同一隊列的消息僅由一個線程順序處理。
3. 消費流程控制
  • 單線程順序消費:每個隊列對應一個消費線程,從ProcessQueue的紅黑樹(msgTreeMap)中按消息偏移量順序取出消息,保證消費順序與存儲順序一致。
  • 失敗重試機制:消費失敗時,若未達最大重試次數,消息會重新放回ProcessQueue等待下次消費;若超過次數則進入死信隊列。

二、關鍵源碼解析

1. 消費者啟動與鎖管理
  • 服務初始化:消費者啟動時,若監聽器為MessageListenerOrderly,則創建ConsumeMessageOrderlyService,并啟動定時加鎖任務。

    // DefaultMQPushConsumerImpl#start
    if (getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeMessageService = new ConsumeMessageOrderlyService(this, listener);consumeMessageService.start();
    }
    
  • 定時加鎖ConsumeMessageOrderlyService啟動后,定時調用RebalanceImpl.lockAll()向Broker申請鎖,更新ProcessQueue的鎖定狀態。

      public synchronized void lockMQPeriodically() {if (!this.stopped) {this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();}}
    
      				for (MessageQueue mq : mqs) {ProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {if (lockOKMQSet.contains(mq)) {if (!processQueue.isLocked()) {log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);}// 更新`ProcessQueue`的鎖定狀態 trueprocessQueue.setLocked(true);processQueue.setLastLockTimestamp(System.currentTimeMillis());} else {// 更新`ProcessQueue`的鎖定狀態 falseprocessQueue.setLocked(false);log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);}}}
    
2. 消息拉取與消費
  • 鎖檢查:拉取消息前檢查ProcessQueue是否已鎖定,未鎖定則延遲拉取。
    // DefaultMQPushConsumerImpl#pullMessage
    if (processQueue.isLocked()) {// 計算消費偏移量并拉取消息
    } else {executePullRequestLater(pullRequest, 3000); // 延遲3秒重試
    }
    
  • 消費線程加鎖:消費線程運行時獲取隊列內存鎖,確保單線程處理。
    synchronized (messageQueueLock.fetchLockObject(messageQueue)) {List<MessageExt> msgs = processQueue.takeMessags(batchSize);// 執行消費邏輯
    }
    
3. Broker端鎖管理
  • 鎖存儲:Broker通過RebalanceLockManager維護鎖信息,記錄消費者ClientID和最后更新時間,超時(默認60秒)則自動釋放。
    class LockEntry {String clientId;long lastUpdateTimestamp;boolean isExpired() { /* 檢查是否超時 */ }
    }
    
  • 鎖競爭:消費者通過lockBatchMQ請求批量加鎖,Broker返回成功鎖定的隊列列表。

三、適用場景與注意事項

  1. 適用場景

    • 分區順序:如訂單流程(創建、支付、完成),同一訂單ID的消息需順序處理。
    • 全局順序Topic僅一個隊列,性能較低,適用于強一致性場景(如證券交易)。
  2. 注意事項

    • 冪等性:因網絡抖動或消費者重啟可能導致短暫亂序,業務邏輯需支持冪等處理。
    • 隊列數選擇:分區數越多并發度越高,但需確保同一業務ID的路由一致性。

總結

RocketMQ的順序消費通過生產端路由策略消費端鎖機制Broker協同管理實現。其設計在保證局部順序的同時兼顧性能,適用于多數業務場景。源碼層面,ConsumeMessageOrderlyServiceRebalanceImpl是核心模塊,通過定時加鎖單線程消費隊列快照管理確保順序性。實際使用時需結合業務特點設計Sharding Key,并處理可能的異常情況。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/896962.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/896962.shtml
英文地址,請注明出處:http://en.pswp.cn/news/896962.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【JavaEE】-- 多線程(初階)4

文章目錄 8.多線程案例8.1 單例模式8.1.1 餓漢模式8.1.2 懶漢模式 8.2 阻塞隊列8.2.1 什么是阻塞隊列8.2.2 生產者消費者模型8.2.3 標準庫中的阻塞隊列8.2.4 阻塞隊列的應用場景8.2.4.1 消息隊列 8.2.5 異步操作8.2.5 自定義實現阻塞隊列8.2.6 阻塞隊列--生產者消費者模型 8.3 …

【C++設計模式】第四篇:建造者模式(Builder)

注意&#xff1a;復現代碼時&#xff0c;確保 VS2022 使用 C17/20 標準以支持現代特性。 分步驟構造復雜對象&#xff0c;實現靈活裝配 1. 模式定義與用途 核心目標&#xff1a;將復雜對象的構建過程分離&#xff0c;使得同樣的構建步驟可以創建不同的表示形式。 常見場景&am…

vuex中的state是響應式的嗎?

在 Vue.js 中&#xff0c;Vuex 的 state 是響應式的。這意味著當你更改 state 中的數據時&#xff0c;依賴于這些數據的 Vue 組件會自動更新。這是通過 Vue 的響應式系統實現的&#xff0c;該系統使用了 ES6 的 Proxy 對象來監聽數據的變化。 當你在 Vuex 中定義了一個 state …

若依框架中的崗位與角色詳解

若依框架中的崗位與角色詳解 一、核心概念與定位 崗位&#xff08;Post&#xff09; 業務職能導向&#xff1a;崗位是用戶在組織架構中的職務標識&#xff08;如“開發人員”“項目經理”&#xff09;&#xff0c;用于描述工作職責而非直接控制權限。崗位與部門關聯&#xff…

SQL經典常用查詢語句

1. 基礎查詢語句 1.1 查詢表中所有數據 在SQL中&#xff0c;查詢表中所有數據是最基本的操作之一。通過使用SELECT * FROM table_name;語句&#xff0c;可以獲取指定表中的所有記錄和列。例如&#xff0c;假設有一個名為employees的表&#xff0c;包含員工的基本信息&#xf…

EP 架構:未來主流方向還是特定場景最優解?

DeepSeek MoE架構采用跨節點專家并行&#xff08;EP&#xff09;架構&#xff0c;在提升推理系統性能方面展現出巨大潛力。這一架構在發展進程中也面臨諸多挑戰&#xff0c;其未來究竟是會成為行業的主流方向&#xff0c;還是僅適用于特定場景&#xff0c;成為特定領域的最優解…

[密碼學實戰]Java實現國密(SM2)密鑰協商詳解:原理、代碼與實踐

一、代碼運行結果 二、國密算法與密鑰協商背景 2.1 什么是國密算法&#xff1f; 國密算法是由中國國家密碼管理局制定的商用密碼標準&#xff0c;包括&#xff1a; SM2&#xff1a;橢圓曲線公鑰密碼算法&#xff08;非對稱加密/簽名/密鑰協商&#xff09;SM3&#xff1a;密碼…

動漫短劇開發公司,短劇小程序搭建快速上線

在當今快節奏的生活里&#xff0c;人們的娛樂方式愈發多元&#xff0c;而動漫短劇作為新興娛樂形式&#xff0c;正以獨特魅力迅速崛起&#xff0c;成為娛樂市場的耀眼新星。近年來&#xff0c;動漫短劇市場呈爆發式增長&#xff0c;吸引眾多創作者與觀眾目光。 從市場規模來看…

第四十五:創建一個vue 的程序

html <div id"app">{{ msg }}<h2>{{ web.title }}</h2><h3>{{ web.url }}</h3> </div> js /*<div id"app"></div> 指定一個 id 為 app 的 div 元素{{ }} 插值表達式, 可以將 Vue 實例中定義的數據在視圖…

docer swarm集群部署springboot項目

1.準備兩臺服務器&#xff0c;安裝好docker、docker-compose 因為用到了docker倉庫&#xff0c;安裝harbor,可以從github下載離線安裝包 2. 我這邊用到了gitlab-ci,整體流程也都差不多 1&#xff09;打包mvn clean install 2&#xff09;打鏡像 docker-compose -f docker-compo…

Python測試框架Pytest的參數化

上篇博文介紹過&#xff0c;Pytest是目前比較成熟功能齊全的測試框架&#xff0c;使用率肯定也不斷攀升。 在實際工作中&#xff0c;許多測試用例都是類似的重復&#xff0c;一個個寫最后代碼會顯得很冗余。這里&#xff0c;我們來了解一下pytest.mark.parametrize裝飾器&…

開發博客系統

前言 準備工作 數據庫表分為實體表和關系表 第一&#xff0c;建數據庫表 然后導入前端頁面 創建公共模塊 就是統一返回值&#xff0c;異常那些東西 自己造一個自定義異常 普通類 mapper 獲取全部博客 我們只需要返回id&#xff0c;title&#xff0c;content&#xff0c;us…

【Spring Boot 應用開發】-05 命令行參數

Spring Boot 常用命令行參數 Spring Boot 支持多種命令行參數&#xff0c;這些參數可以在啟動應用時通過命令行直接傳遞。以下是一些常用的命令行參數及其詳細說明&#xff1a; 1. 基本配置參數 --server.port端口號 指定應用程序運行的HTTP端口&#xff0c;默認為8080。 jav…

20250304學習記錄

第一部分&#xff0c;先來了解一下各種論文期刊吧&#xff0c;畢竟也是這把歲數了&#xff0c;還什么都不懂呢 國際期刊&#xff1a; EI收集的主要有兩種&#xff0c; JA&#xff1a;EI源刊 CA&#xff1a;EI會議 CPCI也叫 ISTP 常說的SCI分區是指&#xff0c;JCR的一區、…

2024 年 MySQL 8.0.40 安裝配置、Workbench漢化教程最簡易(保姆級)

首先到官網上下載安裝包&#xff1a;http://www.mysql.com 點擊下載&#xff0c;拉到最下面&#xff0c;點擊社區版下載 windows用戶點擊下面適用于windows的安裝程序 點擊下載&#xff0c;網絡條件好可以點第一個&#xff0c;怕下著下著斷了點第二個離線下載 雙擊下載好的安裝…

網絡安全檢查漏洞內容回復 網絡安全的漏洞

網絡安全的核心目標是保障業務系統的可持續性和數據的安全性&#xff0c;而這兩點的主要威脅來自于蠕蟲的暴發、黑客的攻擊、拒絕服務攻擊、木馬。蠕蟲、黑客攻擊問題都和漏洞緊密聯系在一起&#xff0c;一旦有重大安全漏洞出現&#xff0c;整個互聯網就會面臨一次重大挑戰。雖…

汽車智能鑰匙中PKE低頻天線的作用

PKE&#xff08;Passive Keyless Entry&#xff09;即被動式無鑰匙進入系統&#xff0c;汽車智能鑰匙中PKE低頻天線在現代汽車的智能功能和安全保障方面發揮著關鍵作用&#xff0c;以下是其具體作用&#xff1a; 信號交互與身份認證 低頻信號接收&#xff1a;當車主靠近車輛時…

uiautomatorviewer定位元素報Unexpected ... UI hierarchy

發現問題 借鑒博客 Unexpected error while obtaining UI hierarchy android app UI自動化-元素定位輔助工具 Unexpected error while obtaining UI hierarchy&#xff1a;使用uiautomatorviewer定位元素報錯 最近在做安卓自動化,安卓自動化主要工作之一就是獲取UI樹 app端獲…

通俗的方式解釋“零錢兌換”問題

“零錢兌換”是一道經典的算法題目&#xff0c;其主要問題是&#xff1a;給定不同面額的硬幣和一個總金額&#xff0c;求出湊成總金額所需的最少硬幣個數。如果沒有任何一種硬幣組合能組成總金額&#xff0c;返回-1。 解題思路 動態規劃&#xff1a;使用動態規劃是解決零錢兌…

GBT32960 協議編解碼器的設計與實現

GBT32960 協議編解碼器的設計與實現 引言 在車聯網領域&#xff0c;GBT32960 是一個重要的國家標準協議&#xff0c;用于新能源汽車與監控平臺之間的數據交互。本文將詳細介紹如何使用 Rust 實現一個高效可靠的 GBT32960 協議編解碼器。 整體架構 編解碼器的核心由三個主要組…