SpringBoot使用RabbitMQ實現延遲隊列
- 需求和目標
- 名詞解釋
- 實現方式
- 引入依賴
- 添加配置文件
- 配置類
- 死信隊列消費者
- 即時隊列消費者
- 延遲消息發送
- 結果
- 注意
需求和目標
商城系統,用戶下單后若15分鐘內仍未完成支付,則自動取消訂單,若已支付,不做其他特殊操作
系統還需要支持即時消息的功能,即發即收。
名詞解釋
①即時隊列:即發即收
②延遲隊列:發了消息,沒有接收方,只有消息過期后才被處理
③死信隊列:延遲隊列上的消息過期后,會被自動轉發到死信隊列中,從而最終達到延遲的目的
實現方式
本文采用RabbitMQ自身屬性:
TTL(Time To Live存活時間) + DLX(Dead-Letter-Exchange死信交換機)
實現延遲隊列,先將消息發到指定了TTL時長的隊列A中,隊列A沒有消費者,也就是說,隊列A中的消息肯定會過期,等消息過期后,就會加入到隊列B,也就是死信隊列,B隊列是有消費者在監聽的,一旦收到消息,就進行后續的邏輯處理,從而達到延遲效果。
這種實現方式只能為隊列設置消息延遲的時長,不能為每個消息指定延遲時長,粒度比較粗,請注意使用的業務場景!
引入依賴
<!--rabbitmq-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加配置文件
分別聲明了:即時、延遲、死信的相關信息
其中,延遲和死信是相互配合形成了延遲隊列
# rabbitMQ配置
mq:rabbit:host: 127.0.0.1:5672virtualHost: /username: testUserpassword: 123456normal-exchange: wms_exchange_normalnormal-queue: wms_queue_normalnormal-routing-key: wms_routing_key_normaldelay-exchange: wms_exchange_delaydelay-queue: wms_queue_delaydelay-routing-key: wms_routing_key_delaydlx-exchange: wms_exchange_dlxdlx-queue: wms_queue_dlxdlx-routing-key: wms_routing_key_dlx
配置類
package com.nwd.common.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {// 從配置文件中讀取參數@Value("${mq.rabbit.host}")String HOST;@Value("${mq.rabbit.username}")String USERNAME;@Value("${mq.rabbit.password}")String PASSWORD;@Value("${mq.rabbit.normal-exchange}")String NORMAL_EXCHANGE;@Value("${mq.rabbit.normal-queue}")String NORMAL_QUEUE;@Value("${mq.rabbit.normal-routing-key}")String NORMAL_ROUTING_KEY;@Value("${mq.rabbit.delay-exchange}")String DELAY_EXCHANGE;@Value("${mq.rabbit.delay-queue}")String DELAY_QUEUE;@Value("${mq.rabbit.delay-routing-key}")String DELAY_ROUTING_KEY;@Value("${mq.rabbit.dlx-exchange}")String DLX_EXCHANGE;@Value("${mq.rabbit.dlx-queue}")String DLX_QUEUE;@Value("${mq.rabbit.dlx-routing-key}")String DLX_ROUTING_KEY;//創建mq連接@Bean(name = "connectionFactory")public ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setUsername(USERNAME);connectionFactory.setPassword(PASSWORD);//connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(true);//該方法配置多個host,在當前連接host down掉的時候會自動去重連后面的hostconnectionFactory.setAddresses(HOST);//connectionFactory.setPort(Integer.parseInt(port));return connectionFactory;}// 即時隊列===========================================@Beanpublic Queue normalQueue() {return new Queue(NORMAL_QUEUE);}@Beanpublic DirectExchange normalDirectExchange(){return new DirectExchange(NORMAL_EXCHANGE);}@Beanpublic Binding normalBinding(){return BindingBuilder.bind(normalQueue()).to(normalDirectExchange()).with(NORMAL_ROUTING_KEY);}// 即時隊列===========================================// 延遲隊列===========================================@Beanpublic Queue delayQueue(){Map<String,Object> map = new HashMap<>();//message在該隊列queue的存活時間最大為15分鐘map.put("x-message-ttl", 10000*6*15);//x-dead-letter-exchange參數是設置該隊列的死信交換器(DLX)map.put("x-dead-letter-exchange", DLX_EXCHANGE);//x-dead-letter-routing-key參數是給這個DLX指定路由鍵map.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);return new Queue(DELAY_QUEUE,true,false,false,map);}@Beanpublic DirectExchange delayDirectExchange(){return new DirectExchange(DELAY_EXCHANGE);}@Beanpublic Binding delayBinding(){return BindingBuilder.bind(delayQueue()).to(delayDirectExchange()).with(DELAY_ROUTING_KEY);}// 延遲隊列===========================================// 死信隊列===========================================@Beanpublic Queue dlxQueue() {return new Queue(DLX_QUEUE);}@Beanpublic DirectExchange dlxDirectExchange(){return new DirectExchange(DLX_EXCHANGE);}@Beanpublic Binding dlxBinding(){return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with(DLX_ROUTING_KEY);}// 死信隊列===========================================
}
死信隊列消費者
package com.nwd.module.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 死信隊列消息處理* 此隊列消費到的,是經過延遲之后的消息* @author niuwenda* @since 2024-06-03 09:50*/
@Slf4j
@Component
@RabbitListener(queues = "${mq.rabbit.dlx-queue}")
public class DlxMsgConsumer {@RabbitHandler(isDefault = true)public void process(String msg, Message message, Channel channel) {try {// 處理消息的業務邏輯log.info("RabbitMq:死信隊列接收到消息,{}",msg);// 此處應判斷訂單是否已完成支付,若未完成,后續繼續編寫取消訂單邏輯// .....} catch (Exception e) {// 發生異常時,打印日志并拒絕消息(不重新放入隊列)System.out.println("Error processing message: " + e.getMessage());/*try {channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception ex) {// 處理拒絕消息的異常}*/}}
}
即時隊列消費者
保證系統有即發即收的功能,此處代碼與訂單需求無關
package com.nwd.module.mq;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** mq消息接收處理器* @author niuwenda* @since 2024-06-03 09:50*/
@Slf4j
@Component
@RabbitListener(queues = "${mq.rabbit.normal-queue}")
public class MqMsgConsumer {@RabbitHandler(isDefault = true)public void process(String msg, Message message, Channel channel) {try {// 處理消息的業務邏輯log.info("RabbitMq1:接收到消息,{}",msg);JSONObject msgObj = JSONObject.parseObject(msg);// 手動確認消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 發生異常時,打印日志并拒絕消息(不重新放入隊列)System.out.println("Error processing message: " + e.getMessage());/*try {channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception ex) {// 處理拒絕消息的異常}*/}}
}
延遲消息發送
可以寫在controller中,測試時,用接口調用來發送消息
@Resource
private RabbitTemplate rabbitTemplate;@Value("${mq.rabbit.delay-exchange}")
private String exchange;rabbitTemplate.convertAndSend(exchange, routingKey, param);
log.info("RabbitMq發送消息成功:{}", param);
結果
可看到,消息延遲了10秒收到
2024-06-03 16:09:23.640 INFO RabbitMqUtil : RabbitMq發送消息成功:helloMQ
2024-06-03 16:09:33.655 INFO DlxMsgConsumer : RabbitMq:死信隊列接收到消息,helloMQ
注意
延遲消息插件內部會維護一個本地數據庫表,同時使用Elang Timers功能實現計時。如果消息的延遲時間設置較長,可能會導致堆積的延遲消息非常多,會帶來較大的CPU開銷,同時延遲消息的時間會存在誤差。
因此,不建議設置延遲時間過長的延遲消息,如果時間過長,建議使用任務調度。