概念
延遲隊列顧名思義就是消息不立即發送給消費者消費,而是延遲一段時間再交給消費者。
RabbitMQ本身沒有直接支持延遲隊列的的功能,但是可以通過前面所介紹的TTL+死信隊列的方式組合
模擬出延遲隊列的功能.
RabbitMQ 有些版本還支持延遲隊列的插件安裝,我們也可以通過安裝這個插件實現延遲隊列的功能。
TTL + 死信隊列
實現思路:
假設一個應用中需要將每條消息都設置為10秒的延遲,生產者通過normal_exchange這個交換器將發送的消息存儲在normal_queue這個隊列中.消費者訂閱的并非是normal_queue這個隊列,而是dlx_queue這個隊列.當消息從normal_queue這個隊列中過期之后被存入dlx_queue這個隊列中,消費者就恰巧消費到了延遲10秒的這條消息。
代碼演示:
常量設置:
//死信隊列public static final String DL_QUEUE = "DL_QUEUE";public static final String DL_EXCHANGE = "DL_EXCHANGE";public static final String DL_KEY = "DL_KEY";//普通隊列public static final String NORMAL_QUEUE = "NORMAL_QUEUE";public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";public static final String NORMAL_KEY = "NORMAL_KEY";
聲明隊列、交換機、綁定關系:
//普通隊列@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(MQConstants.NORMAL_QUEUE).deadLetterExchange(MQConstants.DL_EXCHANGE).deadLetterRoutingKey(MQConstants.DL_KEY).build();}@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.directExchange(MQConstants.NORMAL_EXCHANGE).durable(true).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.NORMAL_KEY).noargs();}//死信隊列@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(MQConstants.DL_QUEUE).build();}@Bean("dlExchange")public Exchange dlExchange() {return ExchangeBuilder.directExchange(MQConstants.DL_EXCHANGE).durable(true).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlExchange") Exchange exchange, @Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.DL_KEY).noargs();}
生產者:將消息過期時間設置為 10 s
@RequestMapping("/dl")public String dl() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "dl" + i, message -> {message.getMessageProperties().setExpiration("10000");return message;});}return "消息發送成功";}
消費者需要消費的隊列是死信隊列:
@Component
@RabbitListener(queues = MQConstants.DL_QUEUE)
public class DLListener {@RabbitHandlerpublic void handle(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {channel.basicAck(deliveryTag, false);System.out.println("消息成功消費:" + messageContent);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}
}
存在的問題
當我們先發送一條延遲時間長的消息,然后再發送一條延遲時間短的消息,我們會發現,短的消息并沒有被即使消費,而是等到長的消息時間一到,才被消費了
@RequestMapping("/dl")public String dl() {rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "30s ",message -> {message.getMessageProperties().setExpiration("30000");return message;});rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "10s ",message -> {message.getMessageProperties().setExpiration("10000");return message;});return "消息發送成功";}
原因如下:
消息過期之后,不一定會被馬上丟棄,因為RabbitMQ只會檢查隊首消息是否過期,如果過期則丟到死信隊列,此時就會造成一個問題,如果第一個消息的延時時間很長,第二個消息的延時時間很短,那第二個
消息并不會優先得到執行。
所以在考慮使用TTL+死信隊列實現延遲任務隊列的時候,需要確認業務上每個任務的延遲時間是一致的,如果遇到不同的任務類型需要不同的延遲的話,需要為每一種不同延遲時間的消息建立單獨的消息隊列。
延遲隊列的插件
安裝
官方文檔:Scheduling Messages with RabbitMQ
下載鏈接:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
下載的插件需要存放到哪個目錄:https://www.rabbitmq.com/docs/installing-plugins
根據你不同的環境去選擇不同的目錄:
Linux命令:
#查看插件列表
rabbitmq-plugins list#啟動插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange#重啟服務
service rabbitmq-server restart
我們去到 rabbitmq 管理界面查看 exchange 有沒有延遲類型 “x-delayed-messge” ,如果存在這一類型說明我們的插件安裝成功了
代碼演示
常量類:
//延遲隊列public static final String DELAY_QUEUE = "DELAY_QUEUE";public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";public static final String DELAY_KEY = "DELAY_KEY";
聲明:
//延遲隊列@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(MQConstants.DELAY_QUEUE).build();}@Bean("delayExchange")public Exchange delayExchange() {return ExchangeBuilder.directExchange(MQConstants.DL_EXCHANGE).durable(true).delayed().build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.DELAY_KEY).noargs();}
生產者:這里我們發送三條不同過期時間的消息來進行演示:
通過setDelayLong() 方法設置延遲時間
@RequestMapping("/delay")public String delay() {rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "30s ",message -> {message.getMessageProperties().setDelayLong(30000L);return message;});rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "10s ",message -> {message.getMessageProperties().setDelayLong(10000L);return message;});rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "40s ", message -> {message.getMessageProperties().setDelayLong(40000L);return message;});return "消息發送成功";}
這里我們將確認模式設置為自動模式,不進行手動確認,便于我們書寫代碼:
@Component
@RabbitListener(queues = MQConstants.DELAY_QUEUE)
public class DelayListener {@RabbitHandlerpublic void handle(String message) {System.out.printf("%tc 接收到的消息為:%s\n", new Date(), message);}
}
最終效果:
總結
1.基于死信實現的延遲隊列
a優點:1)靈活不需要額外的插件支持
b.缺點: 1) 存在消息順序問題 2)需要額外的邏輯來處理死信隊列的消息,增加了系統的復雜性
2.基于插件實現的延遲隊列
a.優點:1)通過插件可以直接創建延遲隊列,簡化延遲消息的實現. 2)避免了DLX的時序問題
b.缺點:1)需要依賴特定的插件,有運維工作2)只適用特定版本