目錄
- 一、死信交換機【不推薦】
- 二、延遲消息插件【推薦】
- 2.1 安裝插件【Linux】
- 2.2 安裝插件【Windows】
- 2.3 如何使用
延時消息:生產者發送消息時指定一個時間,消費者不會立刻收到消息,而是在指定時間之后才收到消息。
延時任務:設置一定時間之后才執行的任務。
一、死信交換機【不推薦】
當一個隊列的消息滿足下列情況之一時,就會成為死信(dead letter):
- 消費者使用basic.reject或basic.nack聲明消費失敗,并且消息的requeue參數設置為false。
- 消息是一個過期消息(達到了隊列或消息本身設置的過期時間),超時無人消費。
- 要投遞的隊列消息堆積滿了,最早的消息可能成為死信。
如果隊列通過dead-letter-exchange屬性指定了一個交換機,那么該隊列中的死信就會投遞到這個交換機中。這個交換機也稱之為死信交換機
。
具體實現流程如下:
- 首先創建兩個隊列
direct.queue、dlx.queue
,需要注意的是在創建direct.queue
隊列時,需要綁定死信交換機。
如何綁定死信交換機:選中Dead letter exchange
輸入交換機的名稱
2. 創建兩個交換機分別綁定兩個隊列mt.direct、mt.dlx.direct
3. 消費者監聽死信隊列,并給mt.direct
發送定時消息
@Test
public void dlxExchangeTest(){String exchangeName = "mt.direct";String message = "黃色警報 ......";rabbitTemplate.convertAndSend(exchangeName, "dlx", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("1000"); // 設置過期時間,單位ms,1000=1sreturn message;}});
}
@RabbitListener(queues = "dlx.queue")
public void listenDlxQueue(String message){System.out.println(String.format("消費者收到了dlx.queue: %s", message));
}
二、延遲消息插件【推薦】
要想使用延遲消息,需要先安裝延遲消息插件rabbitmq_delayed_message_exchange,根據自己RabbitMQ的版本去下載。
2.1 安裝插件【Linux】
去官網下載插件。
將插件放入RabbitMQ的plugins
中,具體路徑如下:
/usr/lib/rabbitmq/lib/rabbitmq_server-3.13.7/plugins
安裝插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重啟RabbitMQ服務
systemctl restart rabbitmq-server
再次登錄rabbitmq,如果exchange的類型中出現:x-delayed-message,說明該插件安裝成功!
2.2 安裝插件【Windows】
將插件放入RabbitMQ的plugins
中,具體路徑如下:
xxx\RabbitMQ Server\rabbitmq_server-3.12.10\plugins
然后進入到RabbitMQ額度sbin
目錄下,執行以下命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2.3 如何使用
rabbitmq_delayed_message_exchange
插件的實現原理是設計了一種支持延遲消息功能的交換機,當消息投遞到交換機后可以暫存一段時間,到期后在投遞到隊列。
Java注解的實現方式
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "mt.delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayQueue(String message){System.out.println(String.format("消費者收到了delay.queue: %s", message));
}
Java Bean的實現方式
@Bean
public DirectExchange delayExchange() {return ExchangeBuilder.directExchange("mt.delay.direct").durable(true).delayed().build();
}
消費者
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "mt.delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayQueue(String message){System.out.println(String.format("消費者收到了delay.queue: %s", message));
}
生產者
@Test
public void delayExchangeTest(){String exchangeName = "mt.delay.direct";String message = "延遲警報 ......";rabbitTemplate.convertAndSend(exchangeName, "delay", message, new DelayMessageProcessor(5000));
}
package com.ming.processor;import lombok.RequiredArgsConstructor;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;/*** @RequiredArgsConstructor 是Lombok庫提供的一個注解,用于自動生成包含必需參數的構造函數。必需參數是指那些被聲明為 final 或者有 @NonNull 注解的成員變量。*/
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {/*** 定義延遲時間*/private final int delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}
}