1. 簡介
在前面的高級特性中,我們介紹了重試機制和 TTL,那么產生下列問題:
- 在重試機制中,當消費者消費消息發生異常時,會觸發消息重發機制,由于我們配置了最大的重發次數,那么當超過這個次數后,若消息依然沒有被成功消費,就需要將消息進行保存,等待下一次消費,那么,這條消息應該保存到哪里去呢?
- 在 TTL 機制中,我們為隊列和消息設置了過期時間,當超過這個時間后消息就會被刪除,但是這條消息是需要被消費的,于是就需要將過期的消息保存下來,等待下次消費。但是,消息應該保存到哪里去呢?
在 RabbitMQ 中,將類似于結果的消息稱為死信,那么就涉及到,變成死信的消息應該存儲到哪里去?
2. 會產生死信的場景
- 消息重發后,次數到達指定重發次數依然未被消費,就會成為死信
- 消息到達過期時間依然沒有被消費,就會成為死信
- 隊列已經滿了,卻依然由消息入隊列,就會產生溢出,溢出的這部分消息就會成為死信
3. 死信隊列
在 RabbitMQ 中,可以聲明一個隊列,這個隊列專門用來存放死信,于是就成為死信隊列。
死信隊列的工作流程如下:
- 首先需要聲明一個死信交換機,與普通隊列進行綁定;
- 其次需要聲明一個死信隊列,與死信交換機進行綁定;
- 當普通隊列中的消息成為死信后,就會被發送給死信交換機,然后由死信交換機分配給與之綁定的死信隊列;
- 存儲在死信隊列中的死信會等待被別的消費者再次消費。
4. 配置死信交換機與死信隊列
聲明一個正常交換機,正常隊列,死信交換機,死信隊列,并將正常交換機與正常隊列進行綁定,將正常隊列與死信交換機進行綁定,將死信交換機與死信隊列進行綁定,代碼如下:
@Configuration
public class DLConfig {/*** 正常* @return*/@Bean("norQueue")public Queue norQueue() {return QueueBuilder.durable(Constants.NOR_QUEUE).ttl(10000) //過期時間 10s.deadLetterExchange(Constants.DL_EXCHANGE) //綁定死信交換機.deadLetterRoutingKey(Constants.DL_ROUTINGKEY).maxLength(10L) //隊列長度為 10.build();}@Bean("norExchange")public DirectExchange norExchange() {return ExchangeBuilder.directExchange(Constants.NOR_EXCHANGE).build();}@Bean("norBind")public Binding norBind(@Qualifier("norExchange") DirectExchange directExchange,@Qualifier("norQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.NOR_ROUTINGKEY);}/*** 死信*/@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBind")public Binding dlBind(@Qualifier("dlExchange") DirectExchange directExchange,@Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.DL_ROUTINGKEY);}
}
在上面的代碼中,我們指定了下面的條件:
- 隊列的過期時間為 10s
- 隊列長度為 10?
重試機制的配置如下:
spring:rabbitmq:listener:simple:acknowledge-mode: autoretry:enabled: true # 開啟消費者失敗重試initial-interval: 5000ms # 初始失敗等待時?為5秒max-attempts: 5 # 最?重試次數(包括自身消費的?次)
在上面的配置中,設置的最大的重發次數為 5 次。
5. 驗證消息重試超過規定次數是否進入死信隊列
生產者代碼如下:
@RequestMapping("/dl")public String dl() {String messageInfo = "dl... ";rabbitTemplate.convertAndSend(Constants.NOR_EXCHANGE, Constants.NOR_ROUTINGKEY, messageInfo);return "消息發送成功";}
消費者代碼如下:
@Component
@Slf4j
public class DLListener {@RabbitListener(queues = Constants.NOR_QUEUE)public void listener(Message message) throws IOException {String messageInfo = new String(message.getBody());long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收消息, message: {}, deliveryTag: {}", messageInfo, deliveryTag);int num = 1 / 0;log.info("消息消費完成");}
}
代碼運行結果如下:
?
消息重發五次后拋出異常,觀察 RabbitMQ 客戶端后,這條消息已經存放進了死信隊列中:
??
6. 驗證 TTL 與死信隊列
生產者代碼如下:
@RequestMapping("/dl")public String dl() {String messageInfo = "dl... ";rabbitTemplate.convertAndSend(Constants.NOR_EXCHANGE, Constants.NOR_ROUTINGKEY, messageInfo);return "消息發送成功";}
將正常隊列的 TTL 設置為 10s,運行代碼后,觀察 RabbitMQ 客戶端:
10s 前:
10s 后:
?
7. 驗證隊列溢出與死信隊列
將正常隊列的長度設置為 10,生產者向 RabbitMQ 發送 20 條數據,生產者代碼如下:
@RequestMapping("/dl")public String dl() {for (int i = 0; i < 20; i++) {String messageInfo = "dl... " + i;rabbitTemplate.convertAndSend(Constants.NOR_EXCHANGE, Constants.NOR_ROUTINGKEY, messageInfo);}return "消息發送成功";}
運行代碼,觀察 RabbitMQ 客戶端:
可以看到,溢出的 10 條消息進入了死信隊列。?