RabbitMQ--消息順序性

看本章之前強烈建議先去看博主的這篇博客

????????? ? ??RabbitMQ--消費端單線程與多線程-CSDN博客

一、消息順序性概念

消息順序性是指消息在生產者發送的順序消費者接收處理的順序保持一致。


二、RabbitMQ 順序性保證機制

情況順序保證情況備注
單隊列,單消費者消息嚴格按發送順序消費最簡單且唯一保證順序的場景
單隊列,多個消費者無法保證全局順序,但可以設置 QoS 保證消費者串行處理自己收到的消息通過 basicQos(1) 保證每個消費者一次只處理一條消息,但整體隊列消息按消費者分配,順序不保證
消息確認和重發機制如果未正確使用 ack,消息重發可能導致順序亂需開啟手動確認,確保消息處理完畢后才 ack
消息重試與死信機制可能導致消息順序錯亂需要設計合理的重試策略和死信隊列策略


三、順序性的保證方式

  1. 單隊列單消費者

    • 保證消息完全順序消費。適合嚴格順序場景。

  2. 消息確認機制

    • 使用手動確認 autoAck=false,處理完后再 basicAck,防止消息亂序重發。

  3. QoS(basicQos)

    • 設置 basicQos(1),保證消費者一次只處理一條消息,避免多條消息并發處理導致亂序。

  4. 業務分區設計

    • 按某個字段(比如訂單ID)分區到不同隊列,保證分區內順序。


四、原生 Java 示例


1. 依賴

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version>
</dependency>

2. 生產者代碼

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 聲明隊列,持久化channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 1; i <= 10; i++) {String message = "Order Message " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("Sent: " + message);Thread.sleep(100);  // 模擬發送間隔}}}
}

3. 消費者代碼(單個消費者,保證順序)

import com.rabbitmq.client.*;public class Consumer {private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 設置每次只處理一條消息,避免亂序channel.basicQos(1);System.out.println("Waiting for messages...");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println("Received: " + message);try {// 模擬處理消息Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {// 手動確認消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);System.out.println("Ack sent for: " + message);}};// 關閉自動確認,開啟手動確認channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
}

4. 多消費者并發消費注意事項

  • 多個消費者消費同一隊列,消息分發是輪詢,整體消息順序無法保證

  • basicQos(1) 只保證單個消費者串行處理自己拿到的消息,但多個消費者間消息順序無保證。

  • 若需要嚴格順序,需要保證單消費者消費或者分隊列處理。


五、Spring Boot 示例


1. pom.xml 依賴

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. application.yml

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:# 每個消費者預取消息數量,類似 basicQos(1)prefetch: 1

3. RabbitMQ 配置類

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {public static final String QUEUE_NAME = "order_queue";@Beanpublic Queue orderQueue() {return new Queue(QUEUE_NAME, true); // 持久化隊列}
}

4. 生產者代碼

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessages() throws InterruptedException {for (int i = 1; i <= 10; i++) {String message = "Order Message " + i;rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_NAME, message);System.out.println("Sent: " + message);Thread.sleep(100);}}
}

5. 消費者代碼

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveMessage(String message) throws InterruptedException {System.out.println("Received: " + message);// 模擬消息處理時間,確保消息順序Thread.sleep(500);System.out.println("Processed: " + message);}
}

6. 主啟動類

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitOrderApplication implements CommandLineRunner {@Autowiredprivate Producer producer;public static void main(String[] args) {SpringApplication.run(RabbitOrderApplication.class, args);}@Overridepublic void run(String... args) throws Exception {producer.sendMessages();}
}

六、總結

方面說明
單隊列單消費者保證嚴格消息順序,消息先進先出。
單隊列多消費者消息輪詢分發,整體順序無法保證;設置 basicQos(1) 保證單個消費者順序處理自己的消息。
消息確認機制手動 ack,避免消息未處理完成就確認導致順序亂。
Spring Boot 配置spring.rabbitmq.listener.simple.prefetch=1 控制每個消費者預取消息數。
業務設計建議對于嚴格順序場景,推薦單隊列單消費者或消息分區+單消費者方案。

如果要嚴格保證消息順序性:

????????1. 單隊列單消費者?

????????2.?多消費者分區順序

????????????????當你只要求 “某一類業務 ID 下的順序”一致,如訂單、用戶、設備號等,而不要求全局順序時,這種方案很好。

? ? ? ? ? ? ? ? 不能做到全局順序消費!

? ? ? ? ? ? ? ? ? ? ? ? 不同隊列之間順序是無法控制的

? ? ? ? ? ? ? ? ? ? ? ? 比如 order_1order_5 屬于不同分區,它們的處理時間會交叉,整體順序就亂了。

多消費者分區順序代碼樣例

  • 利用多個隊列(分區),每個隊列綁定一個消費者,保證隊列內消息順序;

  • 生產者根據某個分區鍵(如訂單ID哈希)選擇發送到對應隊列,保證同一個分區的消息順序。


多消費者分區順序消費示例(Spring Boot)


1. 項目結構與依賴

pom.xml 添加:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置類:定義多個隊列與交換機綁定

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {public static final int PARTITION_COUNT = 3;@Beanpublic DirectExchange directExchange() {return new DirectExchange("order_exchange");}@Beanpublic Queue queue0() {return new Queue("order_queue_0", true);}@Beanpublic Queue queue1() {return new Queue("order_queue_1", true);}@Beanpublic Queue queue2() {return new Queue("order_queue_2", true);}@Beanpublic Binding binding0(Queue queue0, DirectExchange directExchange) {return BindingBuilder.bind(queue0).to(directExchange).with("partition_0");}@Beanpublic Binding binding1(Queue queue1, DirectExchange directExchange) {return BindingBuilder.bind(queue1).to(directExchange).with("partition_1");}@Beanpublic Binding binding2(Queue queue2, DirectExchange directExchange) {return BindingBuilder.bind(queue2).to(directExchange).with("partition_2");}
}

3. 生產者:根據訂單ID哈希選擇分區,發送到對應RoutingKey

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;private static final int PARTITION_COUNT = RabbitConfig.PARTITION_COUNT;public void sendOrder(String orderId, String message) {int partition = Math.abs(orderId.hashCode()) % PARTITION_COUNT;String routingKey = "partition_" + partition;rabbitTemplate.convertAndSend("order_exchange", routingKey, message);System.out.println("Sent to " + routingKey + ": " + message);}
}

4. 消費者:為每個隊列配置單獨消費者,保證分區順序

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@RabbitListener(queues = "order_queue_0")public void receivePartition0(String message) {System.out.println("Partition 0 received: " + message);// 業務處理,保證隊列內順序}@RabbitListener(queues = "order_queue_1")public void receivePartition1(String message) {System.out.println("Partition 1 received: " + message);}@RabbitListener(queues = "order_queue_2")public void receivePartition2(String message) {System.out.println("Partition 2 received: " + message);}
}

5. 測試調用示例(主程序)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class PartitionOrderApplication implements CommandLineRunner {@Autowiredprivate Producer producer;public static void main(String[] args) {SpringApplication.run(PartitionOrderApplication.class, args);}@Overridepublic void run(String... args) throws Exception {// 發送多條訂單消息,orderId不同分區for (int i = 0; i < 20; i++) {String orderId = "order" + i;String message = "Order message for " + orderId;producer.sendOrder(orderId, message);Thread.sleep(100);}}
}

6. 說明

  • 消息根據訂單ID哈希決定發送哪個隊列

  • 每個隊列由單個消費者消費,保證該分區消息順序

  • 多個隊列+多消費者,實現并發消費和分區順序

🔁 順序保證范圍

粒度保證情況
同一個 orderId? 順序消費(始終落在同一隊列)
不同 orderId? 不保證順序(本來就不是要求)

? 結論

你這套方案:

  • 👍 是 Spring Boot 下 RabbitMQ 順序消費的推薦做法

  • 👍 保證了“每個訂單 ID 的消息順序

  • 👍 可擴展,增加分區數提升并發能力

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/916038.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/916038.shtml
英文地址,請注明出處:http://en.pswp.cn/news/916038.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

.net core接收對方傳遞的body體里的json并反序列化

1、首先我在通用程序里有一個可以接收對象型和數組型json串的反序列化方法public static async Task<Dictionary<string, string>> AllParameters(this HttpRequest request){Dictionary<string, string> parameters QueryParameters(request);request.Enab…

(10)機器學習小白入門 YOLOv:YOLOv8-cls 模型評估實操

YOLOv8-cls 模型評估實操 (1)機器學習小白入門YOLOv &#xff1a;從概念到實踐 (2)機器學習小白入門 YOLOv&#xff1a;從模塊優化到工程部署 (3)機器學習小白入門 YOLOv&#xff1a; 解鎖圖片分類新技能 (4)機器學習小白入門YOLOv &#xff1a;圖片標注實操手冊 (5)機器學習小…

Vue 腳手架基礎特性

一、ref屬性1.被用來給元素或子組件注冊引用信息&#xff08;id的替代者&#xff09;2.應用在html標簽上獲取的是真實DOM元素&#xff0c;用在組件標簽上是組件實例對象3.使用方式&#xff1a;(1).打標識&#xff1a;<h1 ref"xxx">...</h1> 或 <Schoo…

Ubuntu安裝k8s集群入門實踐-v1.31

準備3臺虛擬機 在自己電腦上使用virtualbox 開了3臺1核2G的Ubuntu虛擬機&#xff0c;你可以先安裝好一臺&#xff0c;安裝第一臺的時候配置臨時調高到2核4G&#xff0c;安裝速度會快很多&#xff0c;安裝完通過如下命令關閉桌面&#xff0c;能夠省內存占用&#xff0c;后面我們…

Word Press富文本控件的保存

新建富文本編輯器&#xff0c;并編寫save方法如下&#xff1a; edit方法&#xff1a; export default function Edit({ attributes, setAttributes }) {return (<><div { ...useBlockProps() }><RichTexttagNameponChange{ (value) > setAttributes({ noteCo…

【編程趣味游戲】:基于分支循環語句的猜數字、關機程序

&#x1f31f;菜鳥主頁&#xff1a;晨非辰的主頁 &#x1f440;學習專欄&#xff1a;《C語言學習》 &#x1f4aa;學習階段&#xff1a;C語言方向初學者 ?名言欣賞&#xff1a;"編程的核心是實踐&#xff0c;而非空談" 目錄 1. 游戲1--猜數字 1.1 rand函數 1.2 sr…

UE5 UI 控件切換器

文章目錄分類作用屬性分類 面板 作用 可以根據索引切換要顯示哪個子UI&#xff0c;可以擁有多個子物體&#xff0c;但是任何時間只能顯示一個 屬性 在這里指定要顯示的UI的索引

scikit-learn 包

文章目錄scikit-learn 包核心功能模塊案例其他用法**常用功能詳解****(1) 分類任務示例&#xff08;SVM&#xff09;****(2) 回歸任務示例&#xff08;線性回歸&#xff09;****(3) 聚類任務示例&#xff08;K-Means&#xff09;****(4) 特征工程&#xff08;PCA降維&#xff0…

Excel 將數據導入到SQLServer數據庫

一般系統上線前期都會導入期初數據&#xff0c;業務人員一般要求你提供一個Excel模板&#xff0c;業務人員根據要求整理數據。SQLServer管理工具是支持批量導入數據的&#xff0c;所以我們可以使用該工具導入期初。Excel格式 第一行為字段1、連接登入的數據庫并且選中你需要導入…

剪枝和N皇后在后端項目中的應用

剪枝算法&#xff08;Pruning Algorithm&#xff09; 生活比喻&#xff1a;就像修剪樹枝一樣&#xff0c;把那些明顯不會結果的枝條提前剪掉&#xff0c;節省養分。 在后端項目中的應用場景&#xff1a; 搜索優化&#xff1a;在商品搜索中&#xff0c;如果某個分類下沒有符合條…

cocos 2d游戲中多邊形碰撞器會觸發多次,怎么解決

子彈打到敵機 一發子彈擊中&#xff0c;碰撞回調多次執行 我碰撞組件原本是多邊形碰撞組件 PolygonCollider2D&#xff0c;我改成盒碰撞組件BoxCollider2D 就好了 用前端的節流方式。或者loading處理邏輯。我測試過了&#xff0c;是可以 本來就是多次啊,設計上貌似就是這樣的…

Kubernetes環境中GPU分配異常問題深度分析與解決方案

Kubernetes環境中GPU分配異常問題深度分析與解決方案 一、問題背景與核心矛盾 在基于Kubernetes的DeepStream應用部署中&#xff0c;GPU資源的獨占性分配是保障應用性能的關鍵。本文將圍繞一個典型的GPU分配異常問題展開分析&#xff1a;多個請求GPU的容器本應獨占各自的GPU&…

Django與模板

我叫補三補四&#xff0c;很高興見到大家&#xff0c;歡迎一起學習交流和進步今天來講一講視圖Django與模板文件工作流程模板引擎&#xff1a;主要參與模板渲染的系統。內容源&#xff1a;輸入的數據流。比較常見的有數據庫、XML文件和用戶請求這樣的網絡數據。模板&#xff1a…

日本上市IT企業|8月25日將在大連舉辦赴日it招聘會

株式會社GSD的核心戰略伙伴貝斯株式會社&#xff0c;將于2025年8月25日在大連香格里拉大酒店商務會議室隆重舉辦赴日技術人才專場招聘會。本次招聘會面向全國范圍內的優秀IT人才&#xff0c;旨在為貝斯株式會社東京本社長期發展招募優質的系統開發與管理人才。招聘計劃&#xf…

低功耗設計雙目協同畫面實現光學變焦內帶AI模型

低功耗設計延長續航&#xff0c;集成儲能模塊保障陰雨天氣下的鐵塔路線的安全一、智能感知與識別技術 多光譜融合監控結合可見光、紅外熱成像、激光補光等技術&#xff0c;實現全天候監測。例如&#xff0c;紅外熱成像可穿透雨霧監測山火隱患&#xff0c;激光補光技術則解決夜間…

datasophon下dolphinscheduler執行腳本出錯

執行hive腳本出錯&#xff1a; 錯誤消息&#xff1a; FAILED: RuntimeException Error loading hooks(hive.exec.post.hooks): java.lang.ClassNotFoundException: org.apache.atlas.hive.hook.HiveHookat java.net.URLClassLoader.findClass(URLClassLoader.java:387)at java.…

【Elasticsearch】安全地刪除快照倉庫、快照

《Elasticsearch 集群》系列&#xff0c;共包含以下文章&#xff1a; 1?? 冷熱集群架構2?? 合適的鍋炒合適的菜&#xff1a;性能與成本平衡原理公式解析3?? ILM&#xff08;Index Lifecycle Management&#xff09;策略詳解4?? Elasticsearch 跨機房部署5?? 快照與恢…

nodejs的npm

1. 什么是 npm&#xff1f; npm&#xff08;Node Package Manager&#xff09; 是 Node.js 的默認包管理工具&#xff0c;用于&#xff1a; 安裝和管理依賴&#xff08;第三方庫、框架等&#xff09;。運行項目腳本&#xff08;如啟動服務、測試、構建等&#xff09;。發布和共…

外網訪問內部私有局域網方案,解決運營商只分配內網IP不給公網IP問題

相信不少網友和我一樣&#xff0c;為了實現遠程控制、NAS訪問、組建私有云、攝像頭監控之類的需求&#xff0c;把光貓改成了橋接模式&#xff0c;并用自己的路由器撥號、進行端口了映射。本人之前一直用著沒啥問題&#xff0c;不過&#xff0c;最近突然出現了無法訪問的情況&am…

大模型——上下文工程 (Context Engineering) – 現代 AI 系統的架構基礎

上下文工程 (Context Engineering) – 現代 AI 系統的架構基礎 最近,AI大神 Andrej Karpathy 在YC的一個演講《Software in the era of AI 》帶火了一個新的概念 Context Engineering,上下文工程,LangChain也于7月2號在官網博客發表以《Context Engineering》為題目的文章(h…