RocketMq如何保證消息的順序性

文章目錄

  • 1.順序消息的全流程
    • 1.1 發送階段:消息分區
    • 1.2.存儲階段:順序寫入
    • 1.3.消費階段:串行消費
  • 2.第三把鎖有什么用?
  • 3.順序消費存在的問題

Kafka只支持同一個Partition內消息的順序性一樣,RocketMQ中也提供了基于隊列(分區)的順序消費。即同個隊列內的消息可以做到有序,但是不同隊列內的消息是無序的!

在RocketMq中,它的順序消息通過客戶端的三把鎖以及同一隊列順序寫入協同工作,確保消息從發送、存儲到消費的全流程嚴格有序。其核心在于:

  • 分區一致性(同一業務邏輯的消息發送到同一個分區)
  • 存儲順序性(單線程順序寫入)
  • 消費串行化(單線程消費分區)

這種設計在需要嚴格順序性的場景(如金融交易、訂單處理)中非常關鍵。
RocketMQ 的順序消息需要從 發送、存儲、消費 三個環節嚴格控制順序性,因此引入了三把鎖:

鎖類型作用階段實現方式目標
發送端消息發送階段, 固定 MessageQueue,確保同一業務邏輯的消息發送到同一個 MessageQueue,為后續的存儲和消費順序性奠定基礎MessageQueueSelector消息分區一致性
Broker消息存儲階段, 確保同一 MessageQueue的消息按順序寫入磁盤CommmitLog順序寫機制+ConsumeQueue 分區索引,每個 MessageQueue 對應一個 ConsumeQueue 文件,記錄消息在 CommitLog 中的位置(偏移量),確保同一 MessageQueue 的消息在 ConsumeQueue 中的順序性消息存儲順序性
消費鎖消息消費階段,通過三把鎖確保同一 MessageQueue的消息單線程串行消費分布式鎖 + 本地鎖 + ProcessQueue 鎖
分布式鎖:消費者向 Broker 申請分布式鎖(默認每 20 秒續簽),確保同一消費組內只有一個消費者能消費該 MessageQueue
本地鎖:通過 MessageQueueLock 的 Synchronized 鎖,確保同一消費者線程池中只有一個線程處理該 MessageQueue
ProcessQueue 鎖:通過 ProcessQueue 的 ReentrantLock(consumeLock),防止消費過程中因負載均衡或重平衡導致 ProcessQueue 被刪除
通過這三個組合確保同一 MessageQueue 的消息由單線程串行消費,避免多線程并發導致順序錯亂
消息消費順序性

1.順序消息的全流程

1.1 發送階段:消息分區

生產者通過 MessageQueueSelector(如 ShardingKeySelector)將 同一業務邏輯的消息(如同一訂單 ID)發送到 同一個 MessageQueue

當我們作為MQ的生產者需要發送順序消息時,需要在Send方法中,傳入一個MessageQueueSelector
MessageQueueSelector中需要實現一個select方法,這個方法就是用來定義要把消息發送到哪個MessageQueue的,通常可以使用取模法進行路由:

    public void send() {SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;//根據參數,計算出一個要接收消息的MessageQueue的下標int index = id % mqs.size();//返回這個MessageQueuereturn mqs.get(index);}}, orderId);}

通過以上形式就可以將需要有序的消息發送到同一個隊列中。需要注意的是,這里需要使用同步發送的方式!

1.2.存儲階段:順序寫入

Broker 接收到消息后,根據 MessageQueue 的物理分區(CommitLog + ConsumeQueue)進行 順序寫入。

同一 MessageQueue 的消息在物理文件中 RocketMQ 通過單線程寫入 CommitLog 保證順序性,按順序追加寫入,保證存儲順序性。

消息按照順序發送的消息隊列中之后,那么,消費者如何按照發送順序進行消費呢?

1.3.消費階段:串行消費

RocketMQ的MessageListener回調函數提供了兩種消費模式:

  • 有序消費模式MessageListenerOrderly
  • 并發消費模式MessageListenerConcurrently。

所以,想要實現順序消費,需要使用MessageListenerOrderly模式接收消息:

        consumer.registerMessagelistener(new MessagelistenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {System.out.printf("Receive order msg:" + new String(msgs.get(0) .getBody( )));return ConsumeOrderlyStatus.SUCCESS;}}, new ConsumeOrderlyContext());

當我們用以上方式注冊一個消費之后,為了保證同一個隊列中的有序消息可以被順序消費,就要保證RocketMQ Broker只會把消息發送到同一個消費者上,這時候就需要加鎖了。

在實現中,ConsumeMessageOrderlyService 初始化的時候,會啟動一個定時任務,會嘗試向 Broker 為當前消費者客戶端申請分布式鎖(第一把鎖)。如果獲取成功,那么后續消息將會只發給這個Consumer。

接下來在消息拉取的過程中,消費者會一次性拉取多條消息的,并且會將拉取到的消息放入 ProcessQueue,同時將消息提交到消費線程池進行執行。

那么拉取之后的消費過程,怎么保證順序消費呢?這里就需要更多的鎖了

RocketMQ在消費的過程中,需要申請 MessageQueue 鎖,消費線程通過 MessageQueueLock 的 synchronized 鎖(第二把鎖)住當前 MessageQueue 的消費過程,確保在同一時間,一個隊列中只有一個線程能處理列中的消息。

獲取到 MessageQueue 的鎖后,就可以從ProcessQueue中依次拉取一批消息處理了,但是這個過程中,為了保證消息不會出現重復消費,還需要對ProcessQueue進行加鎖,通過 ProcessQueue 的 ReentrantLock。然后就可以開始處理業務邏輯了

總結下來就是三次加鎖:

  • 首先鎖定Broker上的MessageQueue,確保消息只會投遞到唯一的消費者(分布式鎖)
  • 然后對本地的MessageQueue加鎖,確保只有一個線程能處理這個消息隊列。(synchronized)
  • 最后對存儲消息的ProcessQueue加鎖,確保在重平衡的過程中不會出現消息的重復消費。(ReentrantLock)

在這里插入圖片描述

里面有幾個點需要大家注意下:

2.第三把鎖有什么用?

前面介紹客戶端加鎖過程中,一共加了三把鎖,那么,有沒有想過這樣一個問題,第三把鎖如果不加的話,是不是也沒問題?

因為我們已經對MessageQueue加鎖了,為啥還需要對ProcessQueue再次加鎖呢?

這里其實主要考慮的是重平衡(Rebalance)的問題

當我們的消費者集群,新增了一些消費者,發生重平衡的時候,某個隊列可能會原來屬于客戶端A消費的,但是現在要重新分配給客戶端B了。

這時候客戶端A就需要把自己加在Broker上的鎖解掉,而在這個解鎖的過程中,就需要確保消息不能在消費過程中就被移除了,因為如果客戶端A可能正在處理一部分消息,但是位點信息還沒有提交,如果客戶端B立馬去消費隊列中的消息,那存在一部分數據會被重復消費

那么如何判斷消息是否正在消費中呢,就需要通過這個ProcessQueue上面的鎖來判斷了,也就是說在解鎖的線程也需要嘗試對ProcessQueue進行加鎖,加鎖成功才能進行解鎖操作。以避免過程中有消息消費。

3.順序消費存在的問題

通過上面的介紹,我們知道了RocketMQ的順序消費是通過在消費者上多次加鎖實現的,這種方式帶來的問題就是會降低吞吐量,并且如果前面的消息阻寨,會導致更多消息阻塞。所以,順序消息需要慎用。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/91985.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/91985.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/91985.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

zabbix平臺無法刪除已停用主機的處理案例

在zabbix平臺上刪除已停用的主機&#xff0c;提示“SQL描述式執行已失敗: "DELETE FROM items WHERE (itemid IN &#xff08;.....)”&#xff0c;無法刪除&#xff0c;本文為處理情況。一、問題現象在zabbix平臺上刪除已停用的主機&#xff0c;提示“SQL描述式執行已失敗…

【計算機網絡】6應用層

1.網絡應用模型 特性 客戶/服務器模型(Client-Server, C/S) 對等模型(Peer-to-Peer, P2P) 中心化 是(依賴服務器) 否(去中心化) 角色特點 服務器 客戶機 無中心服務器 提供計算服務 請求計算服務 每個節點(Peer)既是客戶機也是服務器 永久在線 間歇接入網絡 節點間…

基于 Spring Boot + Vue 實現人臉采集功能全流程

一、技術棧與依賴引入 后端依賴 (pom.xml) <!-- 百度AI SDK --> <dependency><groupId>com.baidu.aip</groupId><artifactId>java-sdk</artifactId><version>4.16.19</version><exclusions><exclusion><grou…

《Python基礎》第3期:使用PyCharm編寫Hello World

我們寫文檔大多用 Word、寫表格大多用 Excel、寫幻燈片大多用 PPT。 寫代碼也需要一個軟件作為編輯器&#xff08;傳說的大神用記事本寫代碼純屬玩笑了&#xff0c;越是大神越追求效率&#xff0c;用的軟件功能越強&#xff09;。 Python 現在已經有了非常多的代碼編輯器&#…

我的第一個開源項目:排序算法的多種實現方式

以 排序算法 為例&#xff0c;展示如何在 Python 中進行不同實現方式的對比項目概述本項目旨在通過 Python 實現幾種經典的排序算法&#xff0c;并通過性能對比、代碼注釋和優化手段&#xff0c;為開源社區提供參考。選擇排序、冒泡排序、快速排序和歸并排序作為主要算法&#…

5G-LEO - 用于 5g satellite 鏈接的 OpenAirInterface? 擴展

目標&#xff1a;5G-LEO 旨在加速 OAI 作為開源工具的發展&#xff0c;允許衛星通信社區交流和比較 5G NTN 結果&#xff0c;并促進研發活動的合作。擴展的OAI軟件庫被視為開發早期原型的重要工具&#xff0c;用于驗證關鍵的5G NTN設計方面&#xff0c;并為3GPP標準化過程提供及…

基于 Mybatis 框架*的完整開發流程與順序

基于 MyBatis 框架 的完整開發流程與順序一、環境準備階段1. 新建 Maven 項目&#xff08;或普通 Java 項目&#xff09;作用&#xff1a;用 Maven 統一管理依賴&#xff0c;自動下載 MyBatis、MySQL 驅動等 Jar 包操作&#xff1a;IDE&#xff08;如 IDEA&#xff09;選 Maven…

機械學習--決策樹(實戰案例)

決策樹分兩種分類和回歸&#xff0c;這篇博客我將對兩種方法進行實戰講解一、分類決策樹代碼的核心任務是預測 “電信客戶流失狀態”&#xff0c;這是一個典型的分類任務數據集附在該博客上&#xff0c;可以直接下載代碼整體結構整理代碼主要分為以下幾個部分&#xff1a;導入必…

SQL154 插入記錄(一)

描述牛客后臺會記錄每個用戶的試卷作答記錄到exam_record表&#xff0c;現在有兩個用戶的作答記錄詳情如下&#xff1a;用戶1001在2021年9月1日晚上10點11分12秒開始作答試卷9001&#xff0c;并在50分鐘后提交&#xff0c;得了90分&#xff1b;用戶1002在2021年9月4日上午7點1分…

BeanFactory 和 ApplicationContext 的區別?

口語化答案好的&#xff0c;面試官。BeanFactory和ApplicationContext都是用于管理Bean的容器接口。BeanFactory功能相對簡單。提供了Bean的創建、獲取和管理功能。默認采用延遲初始化&#xff0c;只有在第一次訪問Bean時才會創建該Bean。因為功能較為基礎&#xff0c;BeanFact…

VNC連接VirtualBox中的Ubuntu24.04 desktop圖形化(GUI)界面

測試環境&#xff1a;VirtualBox 7,Ubuntu24.04 desktop,Ubuntu24.04 server(no desktop) 一、下載和安裝dRealVNC viewer。 二、配置 VirtualBox 網絡&#xff1a;NAT 模式 端口轉發 1、打開 VirtualBox&#xff0c;選擇您的 Ubuntu 虛擬機&#xff0c;點擊 設置。 選擇 網…

浮動路由和BFD配置

拓撲圖 前期的拓撲圖沒有交換機配置步驟 1、配置IP地址 終端IP地址的配置 路由器IP地址的配置 配置router的對應接口的IP地址 <Huawei>sys [Huawei]sysname router [router]interface Ethernet 0/0/0 [router-Ethernet0/0/0]ip address 192.168.10.254 24 [router-Ethern…

Docker 實戰 -- Nextcloud

文章目錄前言1. 創建 docker-compose.yml2. 啟動 Nextcloud3. 訪問 Nextcloud4. 配置優化&#xff08;可選&#xff09;使用 PostgreSQL使用 redis添加 Cron 后臺任務5. 常用命令6. 反向代理&#xff08;Nginx/Apache&#xff09;前言 當你迷茫的時候&#xff0c;請點擊 Docke…

【計算機網絡 | 第2篇】計算機網絡概述(下)

文章目錄七.因特網服務提供商&#x1f95d;八.接入網&#x1f95d;主流的家庭寬帶接入方式介入網工作原理&#x1f9d0;DSL技術&#xff1a;銅線上的“三通道”通信DSL的速率標準呈現出顯著的"不對稱"特征&#x1f914;電纜互聯網接入技術&#x1f34b;?&#x1f7e…

SpringMVC 6+源碼分析(四)DispatcherServlet實例化流程 3--(HandlerAdapter初始化)

一、概述 HandlerAdapter 是 Spring MVC 框架中的一個核心組件&#xff0c;它在 DispatcherServlet 和處理程序&#xff08;handler&#xff09;之間扮演適配器的角色。DispatcherServlet 接收到 HTTP 請求后&#xff0c;需要調用對應的 handler 來處理請求&#xff08;如控制器…

【lucene】FastVectorHighlighter案例

下面給出一套可直接拷貝運行的 Lucene 8.5.0 FastVectorHighlighter 完整示例&#xff08;JDK 8&#xff09;&#xff0c;演示從建索引、查詢到高亮的全過程。 > 關鍵點&#xff1a;字段必須 1. 存儲原始內容&#xff08;setStored(true)&#xff09; 2. 開啟 TermVecto…

C++返回值優化(RVO):高效返回對象的藝術

在C開發中&#xff0c;按值返回對象的場景十分常見&#xff08;如運算符重載、工廠函數等&#xff09;&#xff0c;但開發者常因擔憂“構造/析構的性能開銷”而陷入糾結&#xff1a;該不該返回對象&#xff1f;如何避免額外成本&#xff1f;本文將剖析痛點、拆解錯誤思路&#…

用 PyTorch 實現一個簡單的神經網絡:從數據到預測

PyTorch 是目前最流行的深度學習框架之一&#xff0c;以其靈活性和易用性受到開發者的喜愛。本文將帶你從零開始&#xff0c;用 PyTorch 實現一個簡單的神經網絡&#xff0c;用于解決經典的 MNIST 手寫數字分類問題。我們將涵蓋數據準備、模型構建、訓練和預測的完整流程&#…

四級頁表通俗講解與實踐(以 64 位 ARM Cortex-A 為例)

&#x1f4d6; &#x1f3a5; B 站博文精講視頻&#xff1a;點擊鏈接&#xff0c;配合視頻深度學習 四級頁表通俗講解與實踐&#xff08;以 64 位 ARM Cortex-A 為例&#xff09; 本文面向希望徹底理解現代 64 位架構下四級頁表的開發者&#xff0c;結合 ARM Cortex-A 系列處理…

AI模型整合包上線!一鍵部署ComfyUI,2.19TB模型全解析

最近體驗了AIStarter平臺上線的AI模型整合包&#xff0c;包含2.19TB ComfyUI大模型&#xff0c;整合市面主流模型&#xff0c;一鍵部署ComfyUI&#xff0c;省去重復下載煩惱&#xff01;以下是使用心得和部署步驟&#xff0c;適合AI開發者參考。工具亮點這款AI模型整合包由熊哥…