一、重試機制
? ? ? 在消息傳輸過程中,可能遇到各種問題,如網絡故障,服務器不可用等,這些問題可能導致消息處理失敗,因此RabbitMQ提供了重試機制,允許消息處理失敗后重新發送,但是,如果是因為程序邏輯發生的錯誤,那么重試多次也是無用的,因此重試機制可以設置重試次數。
1.1 重試配置
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto #消息接收確認 retry:enabled: true # 開啟消費者失敗重試 initial-interval: 5000ms # 初始失敗等待時?為5秒 max-attempts: 5 # 最?重試次數(包括??消費的?次)
2.2 配置交換機、隊列
(1)配置交換機、隊列、及綁定關系:
/** 重試機制*/@Bean("retryQueue")public Queue retryQueue(){return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();}@Bean("retryBinding")public Binding retryBinding(@Qualifier("retryQueue") Queue queue,@Qualifier("retryExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("retry");}
(2)生產者
/** 重試機制*/@RequestMapping("/retry")public String retry(){rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","retry test...");return "消息發送成功";}
(3)消費者
@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s," +" deliveryTag: %S \n",new String(message.getBody(),"UTF-8"),deliveryTag);int num = 10/0;System.out.println("業務處理完成");}
}
2.3 測試
? ? ?可以看到,重試后還是未能正常消費消息,拋出異常,需要注意的是,如果手動處理異常,是不會觸發重試的,如:
@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try{System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s," +" deliveryTag: %S \n",new String(message.getBody(),"UTF-8"),deliveryTag);int num = 10/0;System.out.println("業務處理完成");}catch (Exception e){System.out.println("業務處理失敗");}}
再次測試代碼:
沒有觸發重試
2.4 重試注意事項
1. 自動確認模式 : 程序邏輯異常, 多次重試還是失敗, 消息就會被自動確認, 那么消息就丟失 了
2. 手動確認模式:程序邏輯異常, 多次重試消息依然處理失敗, 無法被確認, 就?直是 unacked的狀態, 導致消息積壓
?二、TTL 機制
? ? ? ?TTL 即 Time To Live(過期時間), RabbitMQ可以對消息或隊列設置過期時間,當消息過期后,就會被自動清除,無論是對消息設置TTL還是對隊列設置TTL,本質上都是設置消息的TTL
?
2.1 設置消息的TTL
一、準備工作(聲明隊列、交換機)
//未設置TTL的queue@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).build();}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_exchange).build();}@Bean("ttlBinding")public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue,@Qualifier("ttlExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl");}
?二、如何設置消息的TTL
設置消息的TTL是在發送消息是設置的,通過下面這個方法來發送:
public void convertAndSend(String exchange, String routingKey, final Object message,final MessagePostProcessor messagePostProcessor)
? ? ?這個方法比前面使用的方法多了一個參數,就是通過這個messagePostProcessor來設置消息的TTL,只需要在發送消息前,構造一個MessagePostProcesser對象并傳入即可:
@RequestMapping("/ttl")public String ttl(){MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setExpiration("10000");return message;};rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 10s...",messagePostProcessor);return "消息發送成功";}
三、測試代碼
10s后:
消息已近被清除
四、設置消息的TTL存在的問題
? ? ?大家都知道,隊列滿足先進先出的特性,那么如果先發送一條TTL為30s的消息,再發送一條TTL為10s的消息,那么當10s后,后進隊列的那條消息是否會被移除?
? ? 不妨測試一下:
@RequestMapping("/ttl")public String ttl(){MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setExpiration("10000");return message;};MessagePostProcessor messagePostProcessor2 = message -> {message.getMessageProperties().setExpiration("30000");return message;};rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 30s...",messagePostProcessor2);rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 10s...",messagePostProcessor);return "消息發送成功";}
? ? 運行程序:
為了解決這個問題,我們可以對隊列設置TTL
2.2 設置隊列的TTL
?一、聲明隊列,綁定交換機(綁定在前面聲明的交換機上即可)
//設置TTL的queue@Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20*1000).build();//設置隊列TTL為20s}@Bean("ttlBinding2")public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue,@Qualifier("ttlExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl2");}
二、如何對隊列設置TTL?
其實在上面聲明隊列時已經設置了,只需要在聲明隊列時通過 ttl方法 設置即可。
三、在TTL隊列中存放為設置TTL的消息,消息是否移除?
? ?生產者代碼:
@RequestMapping("/ttl")public String ttl(){rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl2" ,"ttl2 test...");return "消息發送成功";}
? ?測試:
? ?可以看到,消息同樣會過期
四、隊列TTL為20s,消息TTL為10s,消息什么時候過期?
? ?生產者代碼:
@RequestMapping("/ttl")public String ttl(){MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setExpiration("10000");return message;};rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 10s...",messagePostProcessor);rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl2" ,"ttl2 test...");return "消息發送成功";}
? ?測試:
三、死信隊列
?3.1 什么是死信
? ? ?由于各種原因,導致的無法被消費的消息,就是死信,死信隊列就是用來存儲死心的隊列,當一個消息在隊列中變成死信后,可以被重新發送到另一個交換機DLX(Dead Letter Exchange)中,這個交換機綁定的隊列就是死信隊列DLQ(Dead Letter Queue)。
? ? 消息變成死信有以下幾種原因:
1> 消息過期
2> 消息被拒絕 ,且requeue參數置為false
3> 隊列達到最大長度
3.2 死信代碼示例
一、聲明隊列、交換機及綁定關系
@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(10000).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}@Bean("normalExchange")public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalQueue") Queue queue,@Qualifier("normalExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("normal");}@Bean("dlQueue")public Queue dlQueue(){return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange(){return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlQueue") Queue queue,@Qualifier("dlExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("dl");}
? ? ?上面在聲明了普通交換機、隊列以及死信交換機、隊列,還要聲明普通隊列與死信交換機的關系(確保消息變成死信后會通過死信交換機路由到死信隊列),只需要在聲明普通隊列時通過 deadLetterExchange 和 deadLetterRoutingKey 綁定即可。
二、測試由于消息過期而導致的死信
? ? 前面在聲明normalQueue時已經通過ttl方法設置了過期時間,所以只需編寫生產者代碼即可:
@RequestMapping("/dl")public String dl(){rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","dl test...");return "消息發送成功";}
? ? 運行程序,測試:
三、測試由于消息被拒絕導致的死信
? ? 編寫消費者代碼:
@Component public class DlListener {@RabbitListener(queues = Constants.NORMAL_QUEUE)public void handlerMessage1(Message message, Channel channel) throws IOException {Long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("接收到消息:%s,deliveryTag: %d \n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag());//業務邏輯處理System.out.println("業務邏輯處理");int num = 10 / 0;System.out.println("業務處理完成");channel.basicAck(deliveryTag, false);} catch (Exception e) {//!!!注意requeue一定要置為false才能變成死信System.out.println("業務處理失敗");channel.basicNack(deliveryTag, false, false);}}@RabbitListener(queues = Constants.DL_QUEUE)public void handlerMessage2(Message message, Channel channel) throws IOException {Long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("死信隊列接收到消息:%s,deliveryTag: %d \n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag());channel.basicAck(deliveryTag,false);} }
? ? 測試:
四、測試由于隊列達到最大長度導致的死信
? ? 修改normal隊列的聲明(添加一個maxLength方法指定隊列最大長度):
@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(10000).maxLength(1).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}
? ? 重新編寫生產者代碼,連續發送10條消息:
@RequestMapping("/dl")public String dl(){for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","dl test..." + i);}return "消息發送成功";}
? ? 由于隊列的最大長度為1,因此應該有9條消息進入死信隊列,測試: