看本章之前強烈建議先去看博主的這篇博客
????????? ? ??RabbitMQ--消費端單線程與多線程-CSDN博客
一、消息順序性概念
消息順序性是指消息在生產者發送的順序和消費者接收處理的順序保持一致。
二、RabbitMQ 順序性保證機制
情況 順序保證情況 備注 單隊列,單消費者 消息嚴格按發送順序消費 最簡單且唯一保證順序的場景 單隊列,多個消費者 無法保證全局順序,但可以設置 QoS 保證消費者串行處理自己收到的消息 通過 basicQos(1)
保證每個消費者一次只處理一條消息,但整體隊列消息按消費者分配,順序不保證消息確認和重發機制 如果未正確使用 ack,消息重發可能導致順序亂 需開啟手動確認,確保消息處理完畢后才 ack 消息重試與死信機制 可能導致消息順序錯亂 需要設計合理的重試策略和死信隊列策略
三、順序性的保證方式
單隊列單消費者
保證消息完全順序消費。適合嚴格順序場景。
消息確認機制
使用手動確認
autoAck=false
,處理完后再basicAck
,防止消息亂序重發。QoS(basicQos)
設置
basicQos(1)
,保證消費者一次只處理一條消息,避免多條消息并發處理導致亂序。業務分區設計
按某個字段(比如訂單ID)分區到不同隊列,保證分區內順序。
四、原生 Java 示例
1. 依賴
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version> </dependency>
2. 生產者代碼
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Producer {private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 聲明隊列,持久化channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 1; i <= 10; i++) {String message = "Order Message " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("Sent: " + message);Thread.sleep(100); // 模擬發送間隔}}} }
3. 消費者代碼(單個消費者,保證順序)
import com.rabbitmq.client.*;public class Consumer {private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 設置每次只處理一條消息,避免亂序channel.basicQos(1);System.out.println("Waiting for messages...");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println("Received: " + message);try {// 模擬處理消息Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {// 手動確認消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);System.out.println("Ack sent for: " + message);}};// 關閉自動確認,開啟手動確認channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});} }
4. 多消費者并發消費注意事項
多個消費者消費同一隊列,消息分發是輪詢,整體消息順序無法保證。
basicQos(1)
只保證單個消費者串行處理自己拿到的消息,但多個消費者間消息順序無保證。若需要嚴格順序,需要保證單消費者消費或者分隊列處理。
五、Spring Boot 示例
1.
pom.xml
依賴<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.
application.yml
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:# 每個消費者預取消息數量,類似 basicQos(1)prefetch: 1
3. RabbitMQ 配置類
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitConfig {public static final String QUEUE_NAME = "order_queue";@Beanpublic Queue orderQueue() {return new Queue(QUEUE_NAME, true); // 持久化隊列} }
4. 生產者代碼
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessages() throws InterruptedException {for (int i = 1; i <= 10; i++) {String message = "Order Message " + i;rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_NAME, message);System.out.println("Sent: " + message);Thread.sleep(100);}} }
5. 消費者代碼
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component public class Consumer {@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveMessage(String message) throws InterruptedException {System.out.println("Received: " + message);// 模擬消息處理時間,確保消息順序Thread.sleep(500);System.out.println("Processed: " + message);} }
6. 主啟動類
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitOrderApplication implements CommandLineRunner {@Autowiredprivate Producer producer;public static void main(String[] args) {SpringApplication.run(RabbitOrderApplication.class, args);}@Overridepublic void run(String... args) throws Exception {producer.sendMessages();} }
六、總結
方面 說明 單隊列單消費者 保證嚴格消息順序,消息先進先出。 單隊列多消費者 消息輪詢分發,整體順序無法保證;設置 basicQos(1)
保證單個消費者順序處理自己的消息。消息確認機制 手動 ack,避免消息未處理完成就確認導致順序亂。 Spring Boot 配置 spring.rabbitmq.listener.simple.prefetch=1
控制每個消費者預取消息數。業務設計建議 對于嚴格順序場景,推薦單隊列單消費者或消息分區+單消費者方案。 如果要嚴格保證消息順序性:
????????1. 單隊列單消費者?
????????2.?多消費者分區順序
????????????????當你只要求 “某一類業務 ID 下的順序”一致,如訂單、用戶、設備號等,而不要求全局順序時,這種方案很好。
? ? ? ? ? ? ? ? 不能做到全局順序消費!
? ? ? ? ? ? ? ? ? ? ? ? 不同隊列之間順序是無法控制的
? ? ? ? ? ? ? ? ? ? ? ? 比如
order_1
和order_5
屬于不同分區,它們的處理時間會交叉,整體順序就亂了。多消費者分區順序代碼樣例
利用多個隊列(分區),每個隊列綁定一個消費者,保證隊列內消息順序;
生產者根據某個分區鍵(如訂單ID哈希)選擇發送到對應隊列,保證同一個分區的消息順序。
多消費者分區順序消費示例(Spring Boot)
1. 項目結構與依賴
pom.xml
添加:<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2. 配置類:定義多個隊列與交換機綁定
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitConfig {public static final int PARTITION_COUNT = 3;@Beanpublic DirectExchange directExchange() {return new DirectExchange("order_exchange");}@Beanpublic Queue queue0() {return new Queue("order_queue_0", true);}@Beanpublic Queue queue1() {return new Queue("order_queue_1", true);}@Beanpublic Queue queue2() {return new Queue("order_queue_2", true);}@Beanpublic Binding binding0(Queue queue0, DirectExchange directExchange) {return BindingBuilder.bind(queue0).to(directExchange).with("partition_0");}@Beanpublic Binding binding1(Queue queue1, DirectExchange directExchange) {return BindingBuilder.bind(queue1).to(directExchange).with("partition_1");}@Beanpublic Binding binding2(Queue queue2, DirectExchange directExchange) {return BindingBuilder.bind(queue2).to(directExchange).with("partition_2");} }
3. 生產者:根據訂單ID哈希選擇分區,發送到對應RoutingKey
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;private static final int PARTITION_COUNT = RabbitConfig.PARTITION_COUNT;public void sendOrder(String orderId, String message) {int partition = Math.abs(orderId.hashCode()) % PARTITION_COUNT;String routingKey = "partition_" + partition;rabbitTemplate.convertAndSend("order_exchange", routingKey, message);System.out.println("Sent to " + routingKey + ": " + message);} }
4. 消費者:為每個隊列配置單獨消費者,保證分區順序
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component public class Consumer {@RabbitListener(queues = "order_queue_0")public void receivePartition0(String message) {System.out.println("Partition 0 received: " + message);// 業務處理,保證隊列內順序}@RabbitListener(queues = "order_queue_1")public void receivePartition1(String message) {System.out.println("Partition 1 received: " + message);}@RabbitListener(queues = "order_queue_2")public void receivePartition2(String message) {System.out.println("Partition 2 received: " + message);} }
5. 測試調用示例(主程序)
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class PartitionOrderApplication implements CommandLineRunner {@Autowiredprivate Producer producer;public static void main(String[] args) {SpringApplication.run(PartitionOrderApplication.class, args);}@Overridepublic void run(String... args) throws Exception {// 發送多條訂單消息,orderId不同分區for (int i = 0; i < 20; i++) {String orderId = "order" + i;String message = "Order message for " + orderId;producer.sendOrder(orderId, message);Thread.sleep(100);}} }
6. 說明
消息根據訂單ID哈希決定發送哪個隊列
每個隊列由單個消費者消費,保證該分區消息順序
多個隊列+多消費者,實現并發消費和分區順序
🔁 順序保證范圍
粒度 保證情況 同一個 orderId
? 順序消費(始終落在同一隊列) 不同 orderId
? 不保證順序(本來就不是要求)
? 結論
你這套方案:
👍 是 Spring Boot 下 RabbitMQ 順序消費的推薦做法
👍 保證了“每個訂單 ID 的消息順序”
👍 可擴展,增加分區數提升并發能力