1. 簡介
在某些場景下,當生產者發送消息后,可能不需要讓消費者立即接收到,而是讓消息延遲一段時間后再發送給消費者。
2. 實現方式
2.1 TTL + 死信隊列
給消息設置過期時間后,若消息在這段時間內沒有被消費,就會將消息發送到死信隊列中,我們可以利用這一特性,將需要延遲發送的消息設置過期時間,然后再讓消費者從死信隊列中獲取消息,這樣就實現了消息的延遲發送。
隊列與交換機配置如下:
@Configuration
public class DLConfig {/*** 正常隊列、交換機* @return*/@Bean("norQueue")public Queue norQueue() {return QueueBuilder.durable(Constants.NOR_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE) //綁定死信交換機.deadLetterRoutingKey(Constants.DL_ROUTINGKEY).build();}@Bean("norExchange")public DirectExchange norExchange() {return ExchangeBuilder.directExchange(Constants.NOR_EXCHANGE).build();}@Bean("norBind")public Binding norBind(@Qualifier("norExchange") DirectExchange directExchange,@Qualifier("norQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.NOR_ROUTINGKEY);}/*** 死信隊列、交換機*/@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBind")public Binding dlBind(@Qualifier("dlExchange") DirectExchange directExchange,@Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.DL_ROUTINGKEY);}
}
生產者代碼如下:
@RequestMapping("/dl1")public String dl1() {String messageInfo = "dl... " + new Date();MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("10000"); //10s 后過期return message;}};rabbitTemplate.convertAndSend(Constants.NOR_EXCHANGE, Constants.NOR_ROUTINGKEY, messageInfo, messagePostProcessor);return "消息發送成功";}
消費者代碼如下:
@Component
@Slf4j
public class DLListener {/*** ttl + 死信隊列 -> 延時隊列* @param message*/@RabbitListener(queues = Constants.DL_QUEUE)public void listener(Message message) {String messageInfo = new String(message.getBody());log.info("接收到消息: {}, time: {}", messageInfo, new Date());}
}
由于消息發送到了死信隊列,于是我們只需要從死信隊列中獲取消息即可。
代碼運行結果如下:
從運行結果中可以看出,消息延遲了 10s 才被消費。
這種實現方式的問題:
但是,當我們連續發送兩條消息,第一條消息的過期時間為 15s,第二條消息的過期時間為 10s,代碼運行結果如下:
這里我們看到,雖然第二條消息先過期,但卻和第一條消息一起被消費,按照正常情況下第二條消息應該率先被消費,于是這種實現方式存在一定的問題。
2.2?使用插件?
2.2.1 安裝插件
雖然 RabbitMQ 沒有提供延遲隊列的使用方式,但是提供了延遲隊列的插件,我們可以安裝插件并使用。
插件安裝地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
需要下載 .ez 的插件。
需要根據本機的 RabbitMQ 版本選擇匹配的插件版本,不然無法使用。
插件下載完成后,需要將插件放到?/usr/lib/rabbitmq/plugins 目錄下,若沒有需要進行創建。
安裝完成后,使用下面這行命令啟動插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
啟動完成后,需要重啟 RabbitMQ 服務,這樣插件就能正常運行。
2.2.2 使用插件
插件安裝完成后,交換機的類型就會多出下面一種:
即延遲隊列,于是我們在聲明交換機是,就能夠聲明這個類型的交換機。
隊列與交換機配置如下:
@Configuration
public class DelayConfig {@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}/*** 延遲交換機* @return*/@Bean("delayExchange")public DirectExchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayBind")public Binding delayBind(@Qualifier("delayExchange") DirectExchange directExchange,@Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.DELAY_ROUTINGKEY);}
}
?在聲明交換機時,使用了 delayed 來聲明該隊列是延遲隊列。
生產者代碼如下:
@RequestMapping("/delay")public String delay() {String messageInfo = "delay ...";rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, Constants.DELAY_ROUTINGKEY, messageInfo + "25000ms", message -> {message.getMessageProperties().setDelayLong(20000L); //過期時間,單位為 msreturn message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, Constants.DELAY_ROUTINGKEY, messageInfo + "10000ms", message -> {message.getMessageProperties().setDelayLong(10000L); //過期時間,單位為 msreturn message;});return "消息發送成功";}
消費者代碼如下:
@Component
@Slf4j
public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void listener(Message message) {log.info("接收到消息: {}, time: {}", new String(message.getBody()), new Date());}
}
運行結果如下:
從結果中可以看出,雖然第二條消息的過期時間是后入隊列的,但是卻會先被消費,這就解決了 TTL + 死信隊列實現方式的不足。