RabbitMQ高級特性——TTL、死信隊列、延遲隊列、事務、消息分發

目錄

一、TTL

1.1設置消息的TTL

1.2設置隊列的TTL

1.3兩者之間的區別

二、死信隊列

2.1死信的概念

2.2死信產生的條件:

2.3死信隊列的實現

死信隊列的工作原理

2.4常??試題

三、延遲隊列

3.1概念

3.2應用場景

3.3RabbitMQ 實現延遲隊列的核心原理

1.?基于 TTL(Time-To-Live)+ 死信隊列(Dead Letter Queue)

2.使用RabbitMQ延遲插件?

1. 安裝插件

2. 配置隊列和交換機(Spring Boot)

3. 發送延遲消息

4. 消費延遲消息

5. 測試

注意事項

四、事務

五、消息分發

5.1 介紹

5.2 限流

5.3負載均衡


一、TTL

TTL(Time to Live,過期時間)即過期時間,RabbitMQ可以對消息和隊列設置TTL過期時間

當消息到達存活的時間之后,還沒有被消費就自動清除

類似于 購物訂單超時了沒有付款,訂單被自動取消

1.1設置消息的TTL

目前有兩種方法可以設置消息的TTL

  1. 設置隊列的TTL,該隊列中的所有消息均為相同的過期時間
  2. 針對消息本身設置的TTL,每條消息TTL可以不同

但是如果兩種方法一起使用,則會根據兩種方法的較小的數據為準


針對消息本身設置的TTL方法,是在發送消息的方針中加入expiration的屬性參數(單位為毫秒)

完整代碼:

    //TTLpublic static final String TTL_QUEUE = "ttl.queue";public static final String TTL_QUEUE2 = "ttl2.queue";public static final String TTL_EXCHANGE = "ttl.exchange";@Configuration
public class TTLRabbitMQ {@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).build();}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlBinding")public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}
}@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("/ttl")public String ttl() {System.out.println("ttl...");rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 30s...", message -> {message.getMessageProperties().setExpiration("30000");  //單位: 毫秒, 過期時間為30sreturn message;});rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 10s...", message -> {message.getMessageProperties().setExpiration("10000");  //單位: 毫秒, 過期時間為10sreturn message;});return "消息發送成功";}
}

運行程序,觀察結果

1.可以發現當發送四條消息,兩個隊列的Ready消息均為4,

2.10秒鐘之后, 刷新??, 發現ttl2隊列的消息已被刪除(再過20秒ttl隊列消息也會被刪除)

  • 如果不設置TTL,則表?此消息不會過期;如果將TTL設置為0,則表?除?此時可以直接將消息投遞到 消費者,否則該消息會被?即丟棄

1.2設置隊列的TTL

設置隊列TTL的方法是在創建隊列時,加入x-messahe-ttl參數實現(單位是毫米)

完整代碼:

    //TTLpublic static final String TTL_QUEUE = "ttl.queue";public static final String TTL_QUEUE2 = "ttl2.queue";public static final String TTL_EXCHANGE = "ttl.exchange";//設置ttl@Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build();  //設置隊列的ttl為20s}@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("/ttl2")public String ttl2() {System.out.println("ttl2...");//發送普通消息rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...");return "消息發送成功";}
}

啟動程序,驗證結果:

運?之后發現,新增了?個隊列, 隊列有?個TTL標識

1.發送消息后, 可以看到, Ready消息為1
  • 采?發布訂閱模式, 所有與該交換機綁定的隊列(ttl_queue和ttl_queue2)都會收到消息

2.20秒鐘之后, 刷新??, 發現消息已被刪除

  • 由于ttl_queue隊列, 未設置過期時間, 所以ttl_queue的消息未刪除

1.3兩者之間的區別

  • 設置隊列TTL屬性的?法, ?旦消息過期, 就會從隊列中刪除
  • 設置消息TTL的?法, 即使消息過期, 也不會?上從隊列中刪除, ?是在即將投遞到消費者之前進?判定的.
為什么這兩種?法處理的?式不?樣?
  1. 因為設置隊列過期時間, 隊列中已過期的消息肯定在隊列頭部, RabbitMQ只要定期從隊頭開始掃描是否 有過期的消息即可.
  2. ?設置消息TTL的?式, 每條消息的過期時間不同, 如果要刪除所有過期消息需要掃描整個隊列, 所以不如等到此消息即將被消費時再判定是否過期, 如果過期再進?刪除即可

二、死信隊列

2.1死信的概念

  • 死信簡單理解就是因為種種原因,無法被消費的消息,就是死信
  • 有死信,自然就有死信隊列.當消息在一個隊列中變成死信之后,它能被重新被發送到另一個交換器中,這個交換器就是DLX(Dead Letter Exchange),綁定DLX的隊列,就稱為死信隊列(Dead LetterQueue,簡稱DLQ).
  • RabbitMQ 的死信隊列(Dead Letter Queue,DLQ)是一種特殊的隊列,用于存儲無法被正常消費的消息(即 "死信")。當消息滿足特定條件時,會被從原隊列轉發到死信隊列,這有助于消息的可靠性處理和問題排查。

2.2死信產生的條件:

消息變成死信一般是由一下幾種情況:

  1. 消息被拒絕(Basic.Reject / Basic.Nack),并且設置了requeue 參數為false
  2. 消息過期
  3. 隊列達到最大長度

2.3死信隊列的實現

死信隊列的工作原理

  1. 為普通隊列設置死信交換機(Dead Letter Exchange,DLX)
  2. 當消息成為死信時,RabbitMQ 會自動將其發送到設置的死信交換機
  3. 死信交換機通過綁定關系將消息路由到死信隊列
  4. 可以專門創建消費者處理死信隊列中的消息

注意:死信隊列和死信交換機 與 普通隊列和普通交換機沒有區別。

實現代碼:

    //死信public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String DL_QUEUE = "dl.queue";public static final String DL_EXCHANGE= "dl.exchange";//死信相關配置@Configuration
public class DLConfig {//演示TTL+死信隊列模擬的延遲隊列存在問題//制造死信產生的條件@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE) //綁定普通隊列.deadLetterExchange(Constants.DL_EXCHANGE) //綁定死信交換機.deadLetterRoutingKey("dlx")       //綁定死信路由鍵//制造死信條件如下兩種.ttl(10000) //設置TTL 10秒.maxLength(10L) //設置隊列最大長度.build();}/*** 創建正常隊列* 設置死信交換機和死信路由鍵* @return Queue*/
//    @Bean("normalQueue")
//    public Queue normalQueue(){
//        return QueueBuilder.durable(Constants.NORMAL_QUEUE)
//                .deadLetterExchange(Constants.DL_EXCHANGE)
//                .deadLetterRoutingKey("dlx")
//                .build();
//    }/*** 創建正常交換機* @return DirectExchange*/@Bean("normalExchange")public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}/*** 綁定正常隊列到正常交換機* @param queue 正常隊列* @param exchange 正常交換機* @return Binding*/@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}//死信交換機和隊列/*** 創建死信隊列* @return Queue*/@Bean("dlQueue")public Queue dlQueue(){return QueueBuilder.durable(Constants.DL_QUEUE).build();}/*** 創建死信交換機* @return DirectExchange*/@Bean("dlExchange")public DirectExchange dlExchange(){return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}/*** 綁定死信隊列到死信交換機* @param queue 死信隊列* @param exchange 死信交換機* @return Binding*/@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();}
}

發布消息:


@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;/*** 測試死信* @return*/@RequestMapping("/dl")public String dl() {System.out.println("dl...");//發送普通消息
//        rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test...");System.out.printf("%tc 消息發送成功 \n", new Date());
//        //測試隊列長度for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."+i);}return "消息發送成功";}
}

測試結果:

  • 已知普通隊列長度為10TTL為10秒,當發送20條消息后,觀察隊列的情況
  • 當啟動生產者時,可以看到兩個隊列均有十條消息,但因為超出普通隊列長度,導致后十條消息會直接到達死信隊列,等待10后再次觀察,發現原有的消息也會給死信隊列

2.4常??試題

死信隊列作為RabbitMQ的?級特性,也是?試的??重點.
1.死信隊列的概念
  • 死信(Dead Letter)是消息隊列中的?種特殊消息, 它指的是那些?法被正常消費或處理的消息,在消息隊列系統中, 如RabbitMQ, 死信隊列?于存儲這些死信消息
2.死信的來源
  • 消息過期: 消息在隊列中存活的時間超過了設定的TTL
  • 消息被拒絕: 消費者在處理消息時, 可能因為消息內容錯誤, 處理邏輯異常等原因拒絕處理該消息. 如果拒絕時指定不重新?隊(requeue=false), 消息也會成為死信.
  • 隊列滿了: 當隊列達到最??度, ?法再容納新的消息時, 新來的消息會被處理為死信.
3.死信隊列的應?場景
  • 消息重試機制:對于處理失敗的消息,可以在死信隊列中進行重試
  • 延遲任務:利用消息過期時間,實現延遲任務(如訂單超時取消)
  • 異常監控:通過監控死信隊列,及時發現系統問題
  • 數據恢復:死信隊列可作為消息的備份,便于數據恢復

三、延遲隊列

3.1概念

在 RabbitMQ 中,延遲隊列(Delay Queue)?是一種特殊的消息隊列,用于存儲需要在指定時間后才被消費的消息。它的核心作用是實現消息的 “延時投遞”,即消息發送后不會立即被消費者處理,而是等待預設的延遲時間后才進入消費流程。

3.2應用場景

  1. 訂單超時取消:用戶下單后,若 30 分鐘內未支付,自動取消訂單并釋放庫存。
  2. 定時任務觸發:例如每天凌晨 2 點執行數據備份、定時發送提醒消息等。
  3. 失敗重試機制:當某個操作失敗時,延遲一段時間后重試(如接口調用失敗后,5 分鐘后再次嘗試)。
  4. 消息通知延遲:如用戶注冊成功后,1 小時后發送新手引導郵件。

3.3RabbitMQ 實現延遲隊列的核心原理

RabbitMQ 本身沒有直接提供 “延遲隊列” 的功能,但可以通過以下兩種方式間接實現:

1.?基于 TTL(Time-To-Live)+ 死信隊列(Dead Letter Queue)

這是最常用的實現方式,利用了 RabbitMQ 的兩個特性:

  • TTL(消息存活時間):設置消息的過期時間,當消息超過該時間未被消費時,會變成 “死信”(Dead Letter)。
  • 死信隊列(DLQ):為隊列配置 “死信交換機(Dead Letter Exchange)”,當消息成為死信后,會被自動路由到死信交換機綁定的隊列(即死信隊列),消費者從死信隊列中獲取消息,實現延遲效果。
@Configuration
public class DelayConfig {//延遲隊列public static final String DELAY_QUEUE = "delay.queue";public static final String DELAY_EXCHANGE = "delay.exchange";@Bean("delayQueue")public Queue delayQueue(){return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")public Exchange delayExchange(){return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();}
}@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;* 延遲隊列測試 - 使用 TTL 實現延遲效果* 通過設置消息的過期時間來模擬延遲隊列的效果* @return 發送結果提示信息*/@RequestMapping("/delay")public String delay() {System.out.println("delay...");rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 10s...", message -> {message.getMessageProperties().setExpiration("10000");  //單位: 毫秒, 過期時間為10sreturn message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 30s...", message -> {message.getMessageProperties().setExpiration("30000");  //單位: 毫秒, 過期時間為30sreturn message;});System.out.printf("%tc 消息發送成功 \n", new Date());return "消息發送成功";}/*** 延遲隊列測試 - 使用 RabbitMQ 延遲插件實現延遲效果* 利用 RabbitMQ 的延遲插件來精確控制消息的延遲時間* @return 發送結果提示信息*/@RequestMapping("/delay2")public String delay2() {System.out.println("delay2...");rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test 30s...", message -> {message.getMessageProperties().setDelayLong(30000L);  //單位: 毫秒, 延遲時間為30sreturn message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test 10s...", message -> {message.getMessageProperties().setDelayLong(10000L);  //單位: 毫秒, 延遲時間為10sreturn message;});System.out.printf("%tc 消息發送成功 \n", new Date());return "消息發送成功";}}

2.使用RabbitMQ延遲插件?

RabbitMQ 官方提供了一個插件?rabbitmq_delayed_message_exchange,專門用于實現延遲隊列,功能更強大且靈活。

特點

  • 支持為每個消息單獨設置延遲時間(無需依賴死信隊列)。
  • 延遲精度更高,避免了 TTL + 死信隊列方式中 “消息堆積導致的延遲偏差” 問題。

實現步驟

  • 安裝插件:在 RabbitMQ 服務器上安裝?rabbitmq_delayed_message_exchange?插件(需重啟 RabbitMQ 生效)。
  • 聲明一個類型為?x-delayed-message?的交換機,并指定延遲類型(如?x-delayed-type: direct)。
  • 發送消息時,通過?x-delay?頭部字段設置延遲時間(單位:毫秒)。
  • 交換機在延遲時間到達后,會自動將消息路由到綁定的隊列,消費者直接從隊列中獲取消息。

兩種實現方式的對比

實現方式優點缺點適用場景
TTL + 死信隊列無需額外插件,依賴 RabbitMQ 原生特性1. 隊列級 TTL 不支持單消息單獨設置延遲;
2. 消息堆積可能導致延遲時間不準
延遲時間固定、精度要求不高的場景
延遲插件(推薦)支持單消息單獨設置延遲,精度高需要額外安裝插件延遲時間靈活、精度要求高的場景

代碼示例(基于 Spring Boot + 延遲插件)

以下是使用?rabbitmq_delayed_message_exchange?插件實現延遲隊列的簡單示例:

1. 安裝插件
# 下載插件(版本需與RabbitMQ匹配)
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez# 復制到RabbitMQ插件目錄
cp rabbitmq_delayed_message_exchange-3.12.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.12.0/plugins/# 啟用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 重啟RabbitMQ
systemctl restart rabbitmq-server
2. 配置隊列和交換機(Spring Boot)
@Configuration
public class DelayQueueConfig {// 延遲交換機名稱public static final String DELAY_EXCHANGE_NAME = "delay.exchange";// 延遲隊列名稱public static final String DELAY_QUEUE_NAME = "delay.queue";// 路由鍵public static final String DELAY_ROUTING_KEY = "delay.routing.key";// 聲明延遲交換機(類型為x-delayed-message)@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 指定延遲交換機的底層類型(如direct、topic等)return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 聲明延遲隊列@Beanpublic Queue delayQueue() {return QueueBuilder.durable(DELAY_QUEUE_NAME).build();}// 綁定交換機和隊列@Beanpublic Binding delayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY).noargs();}
}
3. 發送延遲消息
@Service
public class DelayMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendDelayMessage(String message, long delayTime) {// 發送消息時,通過x-delay頭設置延遲時間(毫秒)rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_EXCHANGE_NAME,DelayQueueConfig.DELAY_ROUTING_KEY,message,correlationData -> {correlationData.getMessageProperties().setHeader("x-delay", delayTime);return correlationData;});System.out.println("發送延遲消息:" + message + ",延遲時間:" + delayTime + "ms");}
}
4. 消費延遲消息
@Service
public class DelayMessageReceiver {@RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE_NAME)public void receiveDelayMessage(String message) {System.out.println("收到延遲消息:" + message + ",時間:" + LocalDateTime.now());}
}
5. 測試
@SpringBootTest
public class DelayQueueTest {@Autowiredprivate DelayMessageSender sender;@Testpublic void testDelayMessage() {// 發送一條延遲5秒的消息sender.sendDelayMessage("訂單超時取消提醒", 5000);}
}

輸出結果

發送延遲消息:訂單超時取消提醒,延遲時間:5000ms
(5秒后)
收到延遲消息:訂單超時取消提醒,時間:2025-08-18T15:30:05

注意事項

  1. 延遲精度:延遲插件的精度較高,但受 RabbitMQ 服務器負載影響,可能存在毫秒級偏差。
  2. 消息持久化:若需保證消息不丟失,需將隊列、交換機和消息都設置為持久化(durable)。
  3. 插件版本兼容:延遲插件版本需與 RabbitMQ 版本匹配,否則可能無法正常工作。
  4. 避免消息堆積:延遲隊列中的消息在延遲時間到達前會暫存在內存或磁盤中,需合理設置隊列容量,避免堆積過多消息影響性能。

四、事務

RabbitMQ是基于AMQP協議實現的, 該協議實現了事務機制, 因此RabbitMQ也?持事務機制. Spring AMQP也提供了對事務相關的操作. RabbitMQ事務允許開發者確保消息的發送和接收是原?性的, 要么全部成功, 要么全部失敗

rabbitTemplate.setChannelTransacted(true);?是 Spring AMQP 中用于開啟 RabbitMQ 事務模式的配置,它會為?RabbitTemplate?操作的信道(Channel)啟用事務支持。

相關配置

@Configuration
public class TransactionConfig {@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}@Bean("transQueue")public Queue transQueue() {return QueueBuilder.durable("trans_queue").build();}}

生產者代碼

@RequestMapping("/trans")
@RestController
public class TransactionProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Transactional@RequestMapping("/send")public String send(){rabbitTemplate.convertAndSend("","transQueue", "trans teat 1......");int a = 5/0;rabbitTemplate.convertAndSend("","transQueue", "trans teat 2......");return "發送成功";}
}
  1. 不加 @Transactional , 會發現消息1發送成功
  2. 添加 @Transactional , 消息1和消息2全部發送失敗

五、消息分發

?消息分發機制主要有兩種應用場景:

  1. 限流
  2. 負載均衡

5.1 介紹

當隊列擁有多個消費者時,RabbitMQ默認會通過輪詢的方式將消息平均的分發給每個消費者,但是沒有可能其中一部分消費者消費消息的速度很快,另一部分消費者消費很慢呢?其實是有可能的,那么這就有可能導致這個系統的吞吐量下降,那如何分發消息才是合理的?在前面學習RabbitMQ JDK Client 時,我們可以通過 channel.basicQos(int prefetchCount) 來設置當前信道的消費者所能擁有的最大未確認消息數量,在Spring AMQP中我們可以通過配置 prefetch 來達到同樣的效果,使用消息分發機制時消息確認機制必須為手動確認。
?

5.2 限流

? ? ? ? 在秒殺場景下,假設訂單系統每秒能處理的訂單數是10000,但是秒殺場景下可能某一瞬間會有50000訂單數,這就會導致訂單系統處理不過來而壓垮。可以利用basicQos()來進行限流:

  1. ? SpringBoot配置文件用prefetch控制限流數,對應channel.basicQos(int prefetchCount)的prefetchCount。
  2. ? ?開啟消息確認機制的手動確認模式manual。未手動確認的消息都視為未消費完的消費,prefetchCount并不會-1。

配置信息:

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@8.140.60.17:5672/xibbeilistener:simple:acknowledge-mode: manual #消息接收確認(MQ-消費者):none(自動確認)、auto(正常自動確認,異常不確認)、manual(手動確認)prefetch: 5 #控制消費者從隊列中預取(prefetch)消息的數量retry:enabled: true           # 啟用重試機制initial-interval: 5000ms # 初始重試間隔5秒max-attempts: 5         # 最多重試5次

?聲明隊列和交換機:

public class RabbitMQConnection {public static final String QOS_QUEUE = "qos.queue";public static final String QOS_EXCHANGE = "qos.exchange";}@Configuration
public class QosConfig {@Bean("qosQueue")public Queue qosQueue(){return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange(){return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();}@Bean("qosBinding")public Binding qosBinding(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();}
}

生產者代碼:

@RestController@RequestMapping("/producer")public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("qos")public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(RabbitMQConnection.QOS_EXCHANGE, "qos", "Hello SpringBoot RabbitMQ");}return "發送成功";}}

消費者代碼:

@Component
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費者邏輯System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());Thread.sleep(2000);
//            System.out.println("業務處理完成");//肯定確認channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定確認channel.basicNack(deliveryTag, false, true);}}}

5.3負載均衡

?負載均衡主要是根據不同消費者消費消息的速度來協調它們的壓力,比如一個消費者處理消息快,另一個消費者處理消息滿,那么就可以配置 prefetch(如配置prefetch為1),就可以使這些消費者還未處理完當前消息,不允許處理下一條,這樣就可以使處理消息滿的消費者可以慢慢處理一條消息,而處理消息快的消費者,可以在處理完一條消息后,繼續處理下一條
?

代碼示例:

一、修改 prefetch 配置

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@8.140.60.17:5672/xibbeilistener:simple:acknowledge-mode: manual  # 手動確認模式prefetch: 1               # 每次只預取1條消息

二、修改消費者代碼(取消手動確認的注釋并新增一個消費者)

@Component
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費者邏輯System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());Thread.sleep(2000);
//            System.out.println("業務處理完成");//肯定確認channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定確認channel.basicNack(deliveryTag, false, true);}}@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage2(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費者邏輯System.out.printf("222接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());Thread.sleep(1000);
//            System.out.println("業務處理完成");//肯定確認channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定確認channel.basicNack(deliveryTag, false, true);}}
}

三、測試

接收到消息: qos test...1, deliveryTag: 1
消費者2接收到消息: qos test...0, deliveryTag: 1
接收到消息: qos test...2, deliveryTag: 2
接收到消息: qos test...3, deliveryTag: 3
接收到消息: qos test...4, deliveryTag: 4
接收到消息: qos test...5, deliveryTag: 5
消費者2接收到消息: qos test...6, deliveryTag: 2
接收到消息: qos test...7, deliveryTag: 6
接收到消息: qos test...8, deliveryTag: 7
接收到消息: qos test...9, deliveryTag: 8
接收到消息: qos test...10, deliveryTag: 9
消費者2接收到消息: qos test...11, deliveryTag: 3
接收到消息: qos test...12, deliveryTag: 10
接收到消息: qos test...13, deliveryTag: 11
接收到消息: qos test...14, deliveryTag: 12
接收到消息: qos test...15, deliveryTag: 13
消費者2接收到消息: qos test...16, deliveryTag: 4
接收到消息: qos test...17, deliveryTag: 14
接收到消息: qos test...18, deliveryTag: 15
接收到消息: qos test...19, deliveryTag: 16
deliveryTag 有重復是因為兩個消費者使?的是不同的Channel, 每個 Channel 上的
deliveryTag 是獨?計數的

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/93771.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/93771.shtml
英文地址,請注明出處:http://en.pswp.cn/web/93771.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

神經網絡設計中關于BN歸一化(Normalization)的討論

在神經網絡的結構中&#xff0c;我們常常可以看見歸一化&#xff08;Normalization&#xff09;如BN的出現&#xff0c;無論是模型的backbone或者是neck的設計都與它有著重大的關系。 因此引發了我對它的思考&#xff0c;接下來我將從 是什么&#xff08;知識領域&#xff0c;誕…

MacOS 安全機制與“文件已損壞”排查完整指南

1. 背景說明macOS 為了保護系統安全&#xff0c;內置了多個安全機制&#xff1a;機制作用是否影響第三方 AppSIP (System Integrity Protection)保護系統關鍵文件/目錄不被篡改高風險 App/驅動可能受限Gatekeeper限制未簽名/未認證 App 運行阻止“未知開發者” App文件隔離屬性…

package.json文件中的devDependencies和dependencies對象有什么區別?

前端項目的package.json文件中&#xff0c;dependencies和devDependencies對象都用于指定項目所依賴的軟件包&#xff0c;但它們在項目的開發和生產環境中的使用有所不同。1.dependencies&#xff1a;dependencies是指定項目在生產環境中運行所需要的依賴項。這些依賴項通常包括…

【最新版】CRMEB Pro版v3.4系統源碼全開源+PC端+uniapp前端+搭建教程

一.系統介紹 crmebPro版 v3.4正式發布&#xff0c;智能任務推送、動態標簽管理、商城AI生產力&#xff0c;煥然一新&#xff0c;不負期待&#xff01;頁面DIY設計功能全面升級&#xff0c;組件更豐富&#xff0c;樣式設計更全面&#xff1b;移動端商家管理&#xff0c;讓商城管…

AI 浪潮下 IT 從業者的職業展望:替代之惑與轉型之道

一、引言1.1 科技變革的浪潮&#xff1a;AI 崛起與 IT 行業震蕩在當今科技飛速發展的時代&#xff0c;人工智能&#xff08;AI&#xff09;無疑是最具影響力的變革力量之一。從實驗室的前沿研究到廣泛的商業應用&#xff0c;AI 以驚人的速度滲透到各個領域&#xff0c;徹底改變…

DSP音頻算法移植優化工程師實戰

以下以音頻FIR濾波器算法為例&#xff0c;完整演示從MATLAB原型 → Python驗證 → TI DSP C語言移植優化的全流程&#xff0c;包含關鍵代碼和優化技巧&#xff1a;關鍵優化技術解析&#xff1a; 內存訪問優化使用#pragma DATA_ALIGN確保64位對齊&#xff08;滿足LDDW指令要求&a…

Spark 運行流程核心組件(三)任務執行

一、啟動模式 1、standalone資源申請&#xff1a;Driver向Master申請Executor資源Executor啟動&#xff1a;Master調度Worker啟動Executor注冊通信&#xff1a;Executor直接向Driver注冊 2、YARNDriver向YARN ResourceManager(RM)申請AM容器RM分配NodeManager(NM)啟動AM&#x…

rabbitmq發送的延遲消息時間過長就立即消費了

RabbitMQ延遲消息在設置過長時間后被立即消費的問題&#xff0c;通常與以下原因有關&#xff1a; TTL限制問題 RabbitMQ對消息TTL(Time To Live)有32位整數限制(0-4294967295毫秒)&#xff0c;約49.7天。超過該值的延遲時間會導致消息立即被消費解決方案&#xff1a;確保設置的…

kafka的pull的依據

1. 每次 pull() 是否必須在提交上一批消息的 offset 之后&#xff1f;絕對不需要&#xff01; 提交 offset 和調用 poll() (拉取消息) 是兩個完全獨立的行為。消費者可以連續調用 poll() 多次&#xff0c;期間完全不提交任何 offset。 這是 Kafka 消費者的正常工作模式。提交 o…

學習嵌入式的第二十一天——數據結構——鏈表

單向鏈表特點&#xff1a;存儲的內存空間不連續 。為了彌補順序存儲存劣勢。優勢 插入&#xff0c;刪除 O(1) 動態存儲 &#xff0c;在程序運行期間決定大小。劣勢&#xff1a; 不能隨機訪問 O(N) 節點-> 數據域指針域 順序表(數組) 只有數據域鏈表的操作代碼&#xff1…

Rust Web 全棧開發(十三):發布

Rust Web 全棧開發&#xff08;十三&#xff09;&#xff1a;發布Rust Web 全棧開發&#xff08;十三&#xff09;&#xff1a;發布發布 teacher_service發布 svr測試 teacher_service 和 svr發布 wasm-client測試 wasm-clientRust Web 全棧開發&#xff08;十三&#xff09;&a…

Zephyr 中的 bt_le_per_adv_set_data 函數的介紹和應用方法

目錄 概述 1 函數接口介紹 1.1 函數原型 1.2 功能詳解 2 使用方法 2.1 創建流程 2.1.1 創建擴展廣播實例 2.1.2 設置周期性廣播數據 2.1.3 配置周期性廣播參數 2.1.4 啟動廣播 2.2 主流程函數 2.3 關鍵配置 (prj.conf) 3 高級用法 3.1 大數據分片傳輸 3.2 動態數…

Ansible 角色管理指南

Ansible 角色管理指南 實驗環境設置 以下命令用于準備實驗環境&#xff0c;創建一個工作目錄并配置基本的Ansible設置&#xff1a; # 創建web工作目錄并進入 [azurewhiskycontroller ~]$ mkdir web && cd web# 創建Ansible配置文件 [azurewhiskycontroller web]$ cat &…

【補充】數據庫中有關系統編碼和校驗規則的簡述

一、字符集和校驗規則&#xfeff;1.創建數據庫案例數據庫創建方法&#xff1a;使用CREATE DATABASE語句創建數據庫字符集指定方式&#xff1a;通過CHARACTER SETutf8指定數據庫編碼格式默認配置說明&#xff1a;未指定字符集時默認使用utf8和utf8_general_ci配置文件位置&…

計算機網絡 HTTP1.1、HTTP2、HTTP3 的核心對比及性能分析

以下是 HTTP/1.1、HTTP/2、HTTP/3 的核心對比及性能分析&#xff0c;重點關注 HTTP/3 的性能優勢&#xff1a;&#x1f4ca; HTTP 協議演進對比表特性HTTP/1.1 (1997)HTTP/2 (2015)HTTP/3 (2022)傳輸層協議TCPTCPQUIC (基于 UDP)連接建立TCP 三次握手 TLS 握手 (高延遲)同 HTT…

【計算機視覺與深度學習實戰】07基于Hough變換的答題卡識別技術:原理、實現與生物識別拓展(有完整代碼)

1. 引言 在人工智能和計算機視覺快速發展的今天,自動化圖像識別技術已經滲透到社會生活的各個角落。從工業質檢到醫學影像分析,從自動駕駛到教育評估,計算機視覺技術正在重塑我們與數字世界的交互方式。在這眾多應用中,答題卡識別技術作為教育信息化的重要組成部分,承載著…

《WASM驅動本地PDF與Excel預覽組件的深度實踐》

WASM為何能成為本地文件解析的核心載體,首先需要跳出“前端只能處理輕量任務”的固有認知,從“性能與兼容性平衡”的角度切入。PDF與Excel這類文件格式的解析,本質是對復雜二進制數據的解碼與重構——PDF包含嵌套的對象結構、字體渲染規則和矢量圖形描述,Excel則涉及單元格…

Oracle Free 實例重裝系統操作指南

之前申請了兩臺 x86 架構的 Oracle 機器&#xff0c;偶爾用來部署開源項目測試&#xff0c;有一臺在測試 SSH 相關功能時 “變磚”&#xff0c;網上看重裝系統發現很繁瑣就沒去打理&#xff0c;近期又想到這個機器&#xff0c;發現去年就有了官方重裝方法&#xff0c;簡單配置下…

Linux 基礎指令與權限管理

一、Linux 操作系統概述1.1 操作系統的核心價值操作系統的本質是 "使計算機更好用"。它作為用戶與硬件之間的中間層&#xff0c;負責內存管理、進程調度、文件系統管理和設備驅動管理等核心功能&#xff0c;讓用戶無需直接操作硬件即可完成復雜任務。在服務器領域&am…

深度學習-167-MCP技術之工具函數的設計及注冊到MCP服務器的兩種方式

文章目錄 1 MCP協議概述 1.1 MCP的原理 1.2 兩種主要的通信模式 2 工具函數的設計與實現 2.1 tools.py(工具函數) 2.2 工具函數的設計原則 2.3 工具函數的測試 3 MCP服務器的構建與配置 3.1 安裝mcp庫 3.2 main.py(MCP服務器) 3.2.1 方式一(add_tool方法) 3.2.2 方式二(@mcp.to…