目錄
1. 重試機制
1.1 簡介
1.2 配置文件?
1.3 消費者確認機制為 auto 時
1.4 消費者確認機制為 manual 時
2. TTL
2.1 設置消息的過期時間
2.2 設置隊列的過期時間
2.3 給過期隊列中消息設置過期時間
1. 重試機制
1.1 簡介
在消息傳遞過程中, 可能會遇到各種問題, 如網絡故障, 服務不可用, 資源不足... 這些問題可能導致消息處理失敗. 為了解決這些問題, RabbitMQ 提供了重試機制, 允許消息在處理失敗后重新發送. 但如果是程序邏輯引起的錯誤, 那么多次重試也是沒有用的, 可以設置重試次數.
1.2 配置文件?
spring:application:name: rabbitmq-extensions-demorabbitmq:addresses: amqp://study:study@192.168.100.10:5672/extensionlistener:simple:
# acknowledge-mode: noneacknowledge-mode: auto
# acknowledge-mode: manualretry:enabled: true # 開啟消費者失敗重試initial-interval: 5000ms # 初始失敗等待時長為5秒max-attempts: 5 # 最大重試次數
//重試機制@Bean("retryQueue")public Queue retryQueue(){return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();}@Bean("retryBinding")public Binding retryBinding(@Qualifier("retryQueue") Queue queue, @Qualifier("retryExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("retry").noargs();}
1.3 消費者確認機制為 auto 時
如果程序邏輯錯誤, 那么就會不斷重試, 造成消息積壓. 因此我們就需要設置重試次數, 當多次重試還是失敗, 消息就會被自動確認, 自然消息就會丟失 (這里可以弄個死信隊列, 來存儲這些丟失的消息)
生產者 :?
@RequestMapping("/retry")public String retry() {System.out.println("retry...");rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "retry test...");return "消息發送成功";}
消費者 :?
@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s, deliveryTag: %s \n",new String(message.getBody(), StandardCharsets.UTF_8), deliveryTag);int num = 3/0;System.out.println("業務處理完成");}
}
由于消費者代碼邏輯錯誤, 消息重發機制觸發了 4 次, 一共發了 5 次, 日志中, 每次的 deliverTag 都一樣, 表示發送的是同一個消息.??
查看隊列中消息也已經被刪除.
1.4 消費者確認機制為 manual 時
更改消費者 :?
@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s, deliveryTag: %s \n",new String(message.getBody(), StandardCharsets.UTF_8), deliveryTag);try {int num = 3/0;System.out.println("業務處理完成");channel.basicAck(deliveryTag, false);}catch (Exception e){// 不成功消息重新入隊列channel.basicNack(deliveryTag, false, true);}}
這里我們發現, 雖然我們設置了重試機制, 但是 deliveryTag 還是在不斷更新... 隊列中的消息也不會消失. 原因是手動模式下, 消費者需要顯示的對消息進行確認, 如果消費者在消息處理過程中遇到異常, 可以選擇確認, 不確認消息,也可以選擇重新入隊. 所以重試的控制權不在應用程序本身, 而在于代碼邏輯本身.
- 消費者確認機制為 AUTO 時, 如果程序邏輯異常, 多次重試還是失敗. 那么消息就會自動確認, 進而消息就會丟失.
- 消費者確認機制為 MANAUL 時, 如果程序邏輯異常, 多次重試依然處理失敗, 無法被確認, 消息就會積壓.
- 消費者確認機制為 NONE 時, 不管發生什么情況, 當消息從 Broker 內部發出時, 就會自動確認, 因此它不存在任何內容
2. TTL
TTL -> time to live, 指的是過期時間.
- 給消息設置過期時間 : 當消息到達過期時間, 還沒有被消費, 就會被自動清除.
- 給隊列設置過期時間 :? 就相當于給隊列的所有消息設置了一個過期時間, 這消息的過期時間與隊列的過期時間是相同的. 從消息入隊列開始算起, 經過這個時間, 就會被刪除
2.1 設置消息的過期時間
@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).build();}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlBinding")public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}
@RequestMapping("/ttl")public String ttl() {System.out.println("ttl...");MessagePostProcessor messageInfo = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("30000"); //單位: 毫秒, 過期時間為30sreturn message;}};rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 30s", messageInfo);rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 10s...", message -> {message.getMessageProperties().setExpiration("10000"); //單位: 毫秒, 過期時間為10sreturn message;});return "消息發送成功";}
不需要消費者, 不然看不到消息在隊列中的自動刪除.
2.2 設置隊列的過期時間
//設置ttl@Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build(); //設置隊列的ttl為20s}@Bean("ttlBinding2")public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue, @Qualifier("ttlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}
@RequestMapping("/ttl2")public String ttl2() {System.out.println("ttl2...");//發送普通消息rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...");return "消息發送成功";}
我們發現了消息經過 20 秒后被刪除了, 但是隊列沒有被刪除, 說明過期隊列是給消息上過期時間的. 這個隊列也有 TTL 的標志了.
2.3 給過期隊列中消息設置過期時間
如果兩種方法同時使用, 那么就以過期時間較小的值為準.
設置隊列的過期時間, 一旦消息過期, 就會從隊列中刪除.
設置消息的過期時間, 即使消息過期, 如果消息不在隊首, 還得等到消息到達隊首之后才會進行判定是否過期. 如果過期, 那就刪除, 反之就投遞到相應的消費者中.
為什么這兩種方法處理的方式不一樣?
因為設置隊列的過期時間, 那么隊列中過期的消息一定在隊首, RabbitMQ 只需要定期從隊首掃描消息是否有過期的消息即可, 而設置消息的過期時間, 每條消息的過期時間都不一致, 如果要刪除隊列的所有過期消息那么就要掃描整個隊列, 所以不如等到消息要進行投遞時再判斷消息是否過期, 這樣可以減少一定的資源消耗.