目錄
一、TTL
1.1設置消息的TTL
1.2設置隊列的TTL
1.3兩者之間的區別
二、死信隊列
2.1死信的概念
2.2死信產生的條件:
2.3死信隊列的實現
死信隊列的工作原理
2.4常??試題
三、延遲隊列
3.1概念
3.2應用場景
3.3RabbitMQ 實現延遲隊列的核心原理
1.?基于 TTL(Time-To-Live)+ 死信隊列(Dead Letter Queue)
2.使用RabbitMQ延遲插件?
1. 安裝插件
2. 配置隊列和交換機(Spring Boot)
3. 發送延遲消息
4. 消費延遲消息
5. 測試
注意事項
四、事務
五、消息分發
5.1 介紹
5.2 限流
5.3負載均衡
一、TTL
TTL(Time to Live,過期時間)即過期時間,RabbitMQ可以對消息和隊列設置TTL過期時間
當消息到達存活的時間之后,還沒有被消費就自動清除
類似于 購物訂單超時了沒有付款,訂單被自動取消
1.1設置消息的TTL
目前有兩種方法可以設置消息的TTL
- 設置隊列的TTL,該隊列中的所有消息均為相同的過期時間
- 針對消息本身設置的TTL,每條消息TTL可以不同
但是如果兩種方法一起使用,則會根據兩種方法的較小的數據為準
針對消息本身設置的TTL方法,是在發送消息的方針中加入expiration的屬性參數(單位為毫秒)
完整代碼:
//TTLpublic static final String TTL_QUEUE = "ttl.queue";public static final String TTL_QUEUE2 = "ttl2.queue";public static final String TTL_EXCHANGE = "ttl.exchange";@Configuration
public class TTLRabbitMQ {@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).build();}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlBinding")public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}
}@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("/ttl")public String ttl() {System.out.println("ttl...");rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 30s...", message -> {message.getMessageProperties().setExpiration("30000"); //單位: 毫秒, 過期時間為30sreturn message;});rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 10s...", message -> {message.getMessageProperties().setExpiration("10000"); //單位: 毫秒, 過期時間為10sreturn message;});return "消息發送成功";}
}
運行程序,觀察結果
1.可以發現當發送四條消息,兩個隊列的Ready消息均為4,
2.10秒鐘之后, 刷新??, 發現ttl2隊列的消息已被刪除(再過20秒ttl隊列消息也會被刪除)
- 如果不設置TTL,則表?此消息不會過期;如果將TTL設置為0,則表?除?此時可以直接將消息投遞到 消費者,否則該消息會被?即丟棄
1.2設置隊列的TTL
設置隊列TTL的方法是在創建隊列時,加入x-messahe-ttl參數實現(單位是毫米)
完整代碼:
//TTLpublic static final String TTL_QUEUE = "ttl.queue";public static final String TTL_QUEUE2 = "ttl2.queue";public static final String TTL_EXCHANGE = "ttl.exchange";//設置ttl@Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build(); //設置隊列的ttl為20s}@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("/ttl2")public String ttl2() {System.out.println("ttl2...");//發送普通消息rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...");return "消息發送成功";}
}
啟動程序,驗證結果:
運?之后發現,新增了?個隊列, 隊列有?個TTL標識
1.發送消息后, 可以看到, Ready消息為1

- 采?發布訂閱模式, 所有與該交換機綁定的隊列(ttl_queue和ttl_queue2)都會收到消息
2.20秒鐘之后, 刷新??, 發現消息已被刪除
- 由于ttl_queue隊列, 未設置過期時間, 所以ttl_queue的消息未刪除
1.3兩者之間的區別
- 設置隊列TTL屬性的?法, ?旦消息過期, 就會從隊列中刪除
- 設置消息TTL的?法, 即使消息過期, 也不會?上從隊列中刪除, ?是在即將投遞到消費者之前進?判定的.
- 因為設置隊列過期時間, 隊列中已過期的消息肯定在隊列頭部, RabbitMQ只要定期從隊頭開始掃描是否 有過期的消息即可.
- ?設置消息TTL的?式, 每條消息的過期時間不同, 如果要刪除所有過期消息需要掃描整個隊列, 所以不如等到此消息即將被消費時再判定是否過期, 如果過期再進?刪除即可
二、死信隊列
2.1死信的概念
- 死信簡單理解就是因為種種原因,無法被消費的消息,就是死信
- 有死信,自然就有死信隊列.當消息在一個隊列中變成死信之后,它能被重新被發送到另一個交換器中,這個交換器就是DLX(Dead Letter Exchange),綁定DLX的隊列,就稱為死信隊列(Dead LetterQueue,簡稱DLQ).
- RabbitMQ 的死信隊列(Dead Letter Queue,DLQ)是一種特殊的隊列,用于存儲無法被正常消費的消息(即 "死信")。當消息滿足特定條件時,會被從原隊列轉發到死信隊列,這有助于消息的可靠性處理和問題排查。
2.2死信產生的條件:
消息變成死信一般是由一下幾種情況:
- 消息被拒絕(Basic.Reject / Basic.Nack),并且設置了requeue 參數為false
- 消息過期
- 隊列達到最大長度
2.3死信隊列的實現
死信隊列的工作原理
- 為普通隊列設置死信交換機(Dead Letter Exchange,DLX)
- 當消息成為死信時,RabbitMQ 會自動將其發送到設置的死信交換機
- 死信交換機通過綁定關系將消息路由到死信隊列
- 可以專門創建消費者處理死信隊列中的消息
注意:死信隊列和死信交換機 與 普通隊列和普通交換機沒有區別。
實現代碼:
//死信public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String DL_QUEUE = "dl.queue";public static final String DL_EXCHANGE= "dl.exchange";//死信相關配置@Configuration
public class DLConfig {//演示TTL+死信隊列模擬的延遲隊列存在問題//制造死信產生的條件@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE) //綁定普通隊列.deadLetterExchange(Constants.DL_EXCHANGE) //綁定死信交換機.deadLetterRoutingKey("dlx") //綁定死信路由鍵//制造死信條件如下兩種.ttl(10000) //設置TTL 10秒.maxLength(10L) //設置隊列最大長度.build();}/*** 創建正常隊列* 設置死信交換機和死信路由鍵* @return Queue*/
// @Bean("normalQueue")
// public Queue normalQueue(){
// return QueueBuilder.durable(Constants.NORMAL_QUEUE)
// .deadLetterExchange(Constants.DL_EXCHANGE)
// .deadLetterRoutingKey("dlx")
// .build();
// }/*** 創建正常交換機* @return DirectExchange*/@Bean("normalExchange")public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}/*** 綁定正常隊列到正常交換機* @param queue 正常隊列* @param exchange 正常交換機* @return Binding*/@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}//死信交換機和隊列/*** 創建死信隊列* @return Queue*/@Bean("dlQueue")public Queue dlQueue(){return QueueBuilder.durable(Constants.DL_QUEUE).build();}/*** 創建死信交換機* @return DirectExchange*/@Bean("dlExchange")public DirectExchange dlExchange(){return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}/*** 綁定死信隊列到死信交換機* @param queue 死信隊列* @param exchange 死信交換機* @return Binding*/@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();}
}
發布消息:
@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;/*** 測試死信* @return*/@RequestMapping("/dl")public String dl() {System.out.println("dl...");//發送普通消息
// rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test...");System.out.printf("%tc 消息發送成功 \n", new Date());
// //測試隊列長度for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."+i);}return "消息發送成功";}
}
測試結果:
- 已知普通隊列長度為10和TTL為10秒,當發送20條消息后,觀察隊列的情況
- 當啟動生產者時,可以看到兩個隊列均有十條消息,但因為超出普通隊列長度,導致后十條消息會直接到達死信隊列,等待10后再次觀察,發現原有的消息也會給死信隊列
2.4常??試題
- 死信(Dead Letter)是消息隊列中的?種特殊消息, 它指的是那些?法被正常消費或處理的消息,在消息隊列系統中, 如RabbitMQ, 死信隊列?于存儲這些死信消息
- 消息過期: 消息在隊列中存活的時間超過了設定的TTL
- 消息被拒絕: 消費者在處理消息時, 可能因為消息內容錯誤, 處理邏輯異常等原因拒絕處理該消息. 如果拒絕時指定不重新?隊(requeue=false), 消息也會成為死信.
- 隊列滿了: 當隊列達到最??度, ?法再容納新的消息時, 新來的消息會被處理為死信.
- 消息重試機制:對于處理失敗的消息,可以在死信隊列中進行重試
- 延遲任務:利用消息過期時間,實現延遲任務(如訂單超時取消)
- 異常監控:通過監控死信隊列,及時發現系統問題
- 數據恢復:死信隊列可作為消息的備份,便于數據恢復
三、延遲隊列
3.1概念
在 RabbitMQ 中,延遲隊列(Delay Queue)?是一種特殊的消息隊列,用于存儲需要在指定時間后才被消費的消息。它的核心作用是實現消息的 “延時投遞”,即消息發送后不會立即被消費者處理,而是等待預設的延遲時間后才進入消費流程。
3.2應用場景
- 訂單超時取消:用戶下單后,若 30 分鐘內未支付,自動取消訂單并釋放庫存。
- 定時任務觸發:例如每天凌晨 2 點執行數據備份、定時發送提醒消息等。
- 失敗重試機制:當某個操作失敗時,延遲一段時間后重試(如接口調用失敗后,5 分鐘后再次嘗試)。
- 消息通知延遲:如用戶注冊成功后,1 小時后發送新手引導郵件。
3.3RabbitMQ 實現延遲隊列的核心原理
RabbitMQ 本身沒有直接提供 “延遲隊列” 的功能,但可以通過以下兩種方式間接實現:
1.?基于 TTL(Time-To-Live)+ 死信隊列(Dead Letter Queue)
這是最常用的實現方式,利用了 RabbitMQ 的兩個特性:
- TTL(消息存活時間):設置消息的過期時間,當消息超過該時間未被消費時,會變成 “死信”(Dead Letter)。
- 死信隊列(DLQ):為隊列配置 “死信交換機(Dead Letter Exchange)”,當消息成為死信后,會被自動路由到死信交換機綁定的隊列(即死信隊列),消費者從死信隊列中獲取消息,實現延遲效果。
@Configuration
public class DelayConfig {//延遲隊列public static final String DELAY_QUEUE = "delay.queue";public static final String DELAY_EXCHANGE = "delay.exchange";@Bean("delayQueue")public Queue delayQueue(){return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")public Exchange delayExchange(){return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();}
}@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;* 延遲隊列測試 - 使用 TTL 實現延遲效果* 通過設置消息的過期時間來模擬延遲隊列的效果* @return 發送結果提示信息*/@RequestMapping("/delay")public String delay() {System.out.println("delay...");rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 10s...", message -> {message.getMessageProperties().setExpiration("10000"); //單位: 毫秒, 過期時間為10sreturn message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 30s...", message -> {message.getMessageProperties().setExpiration("30000"); //單位: 毫秒, 過期時間為30sreturn message;});System.out.printf("%tc 消息發送成功 \n", new Date());return "消息發送成功";}/*** 延遲隊列測試 - 使用 RabbitMQ 延遲插件實現延遲效果* 利用 RabbitMQ 的延遲插件來精確控制消息的延遲時間* @return 發送結果提示信息*/@RequestMapping("/delay2")public String delay2() {System.out.println("delay2...");rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test 30s...", message -> {message.getMessageProperties().setDelayLong(30000L); //單位: 毫秒, 延遲時間為30sreturn message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test 10s...", message -> {message.getMessageProperties().setDelayLong(10000L); //單位: 毫秒, 延遲時間為10sreturn message;});System.out.printf("%tc 消息發送成功 \n", new Date());return "消息發送成功";}}
2.使用RabbitMQ延遲插件?
RabbitMQ 官方提供了一個插件?rabbitmq_delayed_message_exchange
,專門用于實現延遲隊列,功能更強大且靈活。
特點:
- 支持為每個消息單獨設置延遲時間(無需依賴死信隊列)。
- 延遲精度更高,避免了 TTL + 死信隊列方式中 “消息堆積導致的延遲偏差” 問題。
實現步驟:
- 安裝插件:在 RabbitMQ 服務器上安裝?
rabbitmq_delayed_message_exchange
?插件(需重啟 RabbitMQ 生效)。- 聲明一個類型為?
x-delayed-message
?的交換機,并指定延遲類型(如?x-delayed-type: direct
)。- 發送消息時,通過?
x-delay
?頭部字段設置延遲時間(單位:毫秒)。- 交換機在延遲時間到達后,會自動將消息路由到綁定的隊列,消費者直接從隊列中獲取消息。
兩種實現方式的對比
實現方式 | 優點 | 缺點 | 適用場景 |
---|---|---|---|
TTL + 死信隊列 | 無需額外插件,依賴 RabbitMQ 原生特性 | 1. 隊列級 TTL 不支持單消息單獨設置延遲; 2. 消息堆積可能導致延遲時間不準 | 延遲時間固定、精度要求不高的場景 |
延遲插件(推薦) | 支持單消息單獨設置延遲,精度高 | 需要額外安裝插件 | 延遲時間靈活、精度要求高的場景 |
代碼示例(基于 Spring Boot + 延遲插件)
以下是使用?rabbitmq_delayed_message_exchange
?插件實現延遲隊列的簡單示例:
1. 安裝插件
# 下載插件(版本需與RabbitMQ匹配)
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez# 復制到RabbitMQ插件目錄
cp rabbitmq_delayed_message_exchange-3.12.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.12.0/plugins/# 啟用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 重啟RabbitMQ
systemctl restart rabbitmq-server
2. 配置隊列和交換機(Spring Boot)
@Configuration
public class DelayQueueConfig {// 延遲交換機名稱public static final String DELAY_EXCHANGE_NAME = "delay.exchange";// 延遲隊列名稱public static final String DELAY_QUEUE_NAME = "delay.queue";// 路由鍵public static final String DELAY_ROUTING_KEY = "delay.routing.key";// 聲明延遲交換機(類型為x-delayed-message)@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 指定延遲交換機的底層類型(如direct、topic等)return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 聲明延遲隊列@Beanpublic Queue delayQueue() {return QueueBuilder.durable(DELAY_QUEUE_NAME).build();}// 綁定交換機和隊列@Beanpublic Binding delayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY).noargs();}
}
3. 發送延遲消息
@Service
public class DelayMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendDelayMessage(String message, long delayTime) {// 發送消息時,通過x-delay頭設置延遲時間(毫秒)rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_EXCHANGE_NAME,DelayQueueConfig.DELAY_ROUTING_KEY,message,correlationData -> {correlationData.getMessageProperties().setHeader("x-delay", delayTime);return correlationData;});System.out.println("發送延遲消息:" + message + ",延遲時間:" + delayTime + "ms");}
}
4. 消費延遲消息
@Service
public class DelayMessageReceiver {@RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE_NAME)public void receiveDelayMessage(String message) {System.out.println("收到延遲消息:" + message + ",時間:" + LocalDateTime.now());}
}
5. 測試
@SpringBootTest
public class DelayQueueTest {@Autowiredprivate DelayMessageSender sender;@Testpublic void testDelayMessage() {// 發送一條延遲5秒的消息sender.sendDelayMessage("訂單超時取消提醒", 5000);}
}
輸出結果:
發送延遲消息:訂單超時取消提醒,延遲時間:5000ms
(5秒后)
收到延遲消息:訂單超時取消提醒,時間:2025-08-18T15:30:05
注意事項
- 延遲精度:延遲插件的精度較高,但受 RabbitMQ 服務器負載影響,可能存在毫秒級偏差。
- 消息持久化:若需保證消息不丟失,需將隊列、交換機和消息都設置為持久化(durable)。
- 插件版本兼容:延遲插件版本需與 RabbitMQ 版本匹配,否則可能無法正常工作。
- 避免消息堆積:延遲隊列中的消息在延遲時間到達前會暫存在內存或磁盤中,需合理設置隊列容量,避免堆積過多消息影響性能。
四、事務
RabbitMQ是基于AMQP協議實現的, 該協議實現了事務機制, 因此RabbitMQ也?持事務機制. Spring AMQP也提供了對事務相關的操作. RabbitMQ事務允許開發者確保消息的發送和接收是原?性的, 要么全部成功, 要么全部失敗
rabbitTemplate.setChannelTransacted(true);
?是 Spring AMQP 中用于開啟 RabbitMQ 事務模式的配置,它會為?RabbitTemplate
?操作的信道(Channel)啟用事務支持。
相關配置
@Configuration
public class TransactionConfig {@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}@Bean("transQueue")public Queue transQueue() {return QueueBuilder.durable("trans_queue").build();}}
生產者代碼
@RequestMapping("/trans")
@RestController
public class TransactionProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Transactional@RequestMapping("/send")public String send(){rabbitTemplate.convertAndSend("","transQueue", "trans teat 1......");int a = 5/0;rabbitTemplate.convertAndSend("","transQueue", "trans teat 2......");return "發送成功";}
}
- 不加 @Transactional , 會發現消息1發送成功
- 添加 @Transactional , 消息1和消息2全部發送失敗
五、消息分發
?消息分發機制主要有兩種應用場景:
- 限流
- 負載均衡
5.1 介紹
當隊列擁有多個消費者時,RabbitMQ默認會通過輪詢的方式將消息平均的分發給每個消費者,但是沒有可能其中一部分消費者消費消息的速度很快,另一部分消費者消費很慢呢?其實是有可能的,那么這就有可能導致這個系統的吞吐量下降,那如何分發消息才是合理的?在前面學習RabbitMQ JDK Client 時,我們可以通過 channel.basicQos(int prefetchCount) 來設置當前信道的消費者所能擁有的最大未確認消息數量,在Spring AMQP中我們可以通過配置 prefetch 來達到同樣的效果,使用消息分發機制時消息確認機制必須為手動確認。
?
5.2 限流
? ? ? ? 在秒殺場景下,假設訂單系統每秒能處理的訂單數是10000,但是秒殺場景下可能某一瞬間會有50000訂單數,這就會導致訂單系統處理不過來而壓垮。可以利用basicQos()來進行限流:
- ? SpringBoot配置文件用prefetch控制限流數,對應channel.basicQos(int prefetchCount)的prefetchCount。
- ? ?開啟消息確認機制的手動確認模式manual。未手動確認的消息都視為未消費完的消費,prefetchCount并不會-1。
配置信息:
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@8.140.60.17:5672/xibbeilistener:simple:acknowledge-mode: manual #消息接收確認(MQ-消費者):none(自動確認)、auto(正常自動確認,異常不確認)、manual(手動確認)prefetch: 5 #控制消費者從隊列中預取(prefetch)消息的數量retry:enabled: true # 啟用重試機制initial-interval: 5000ms # 初始重試間隔5秒max-attempts: 5 # 最多重試5次
?聲明隊列和交換機:
public class RabbitMQConnection {public static final String QOS_QUEUE = "qos.queue";public static final String QOS_EXCHANGE = "qos.exchange";}@Configuration
public class QosConfig {@Bean("qosQueue")public Queue qosQueue(){return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange(){return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();}@Bean("qosBinding")public Binding qosBinding(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();}
}
生產者代碼:
@RestController@RequestMapping("/producer")public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("qos")public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(RabbitMQConnection.QOS_EXCHANGE, "qos", "Hello SpringBoot RabbitMQ");}return "發送成功";}}
消費者代碼:
@Component
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費者邏輯System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());Thread.sleep(2000);
// System.out.println("業務處理完成");//肯定確認channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定確認channel.basicNack(deliveryTag, false, true);}}}
5.3負載均衡
?負載均衡主要是根據不同消費者消費消息的速度來協調它們的壓力,比如一個消費者處理消息快,另一個消費者處理消息滿,那么就可以配置 prefetch(如配置prefetch為1),就可以使這些消費者還未處理完當前消息,不允許處理下一條,這樣就可以使處理消息滿的消費者可以慢慢處理一條消息,而處理消息快的消費者,可以在處理完一條消息后,繼續處理下一條
?
代碼示例:
一、修改 prefetch 配置
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@8.140.60.17:5672/xibbeilistener:simple:acknowledge-mode: manual # 手動確認模式prefetch: 1 # 每次只預取1條消息
二、修改消費者代碼(取消手動確認的注釋并新增一個消費者)
@Component
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費者邏輯System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());Thread.sleep(2000);
// System.out.println("業務處理完成");//肯定確認channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定確認channel.basicNack(deliveryTag, false, true);}}@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage2(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費者邏輯System.out.printf("222接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());Thread.sleep(1000);
// System.out.println("業務處理完成");//肯定確認channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定確認channel.basicNack(deliveryTag, false, true);}}
}
三、測試
接收到消息: qos test...1, deliveryTag: 1
消費者2接收到消息: qos test...0, deliveryTag: 1
接收到消息: qos test...2, deliveryTag: 2
接收到消息: qos test...3, deliveryTag: 3
接收到消息: qos test...4, deliveryTag: 4
接收到消息: qos test...5, deliveryTag: 5
消費者2接收到消息: qos test...6, deliveryTag: 2
接收到消息: qos test...7, deliveryTag: 6
接收到消息: qos test...8, deliveryTag: 7
接收到消息: qos test...9, deliveryTag: 8
接收到消息: qos test...10, deliveryTag: 9
消費者2接收到消息: qos test...11, deliveryTag: 3
接收到消息: qos test...12, deliveryTag: 10
接收到消息: qos test...13, deliveryTag: 11
接收到消息: qos test...14, deliveryTag: 12
接收到消息: qos test...15, deliveryTag: 13
消費者2接收到消息: qos test...16, deliveryTag: 4
接收到消息: qos test...17, deliveryTag: 14
接收到消息: qos test...18, deliveryTag: 15
接收到消息: qos test...19, deliveryTag: 16