文章目錄
- 基于Spring Boot的RabbitMQ延時隊列技術實現
- 延時隊列應用場景
- 基本概念
- 實現延時隊列
- 添加依賴
- 基礎配置
- 配置類設計
- 消息生產者
- 消息消費者
- 兩種TTL設置方式
- 訂單超時關閉實例
- 訂單服務
- 消息處理
- 延遲消息插件
- 安裝插件
- 配置延遲交換機
基于Spring Boot的RabbitMQ延時隊列技術實現
延時隊列應用場景
- 訂單系統:30分鐘未支付訂單自動取消
1. 用戶下單 → 發送延時消息(30分鐘TTL)
2. 消息進入普通隊列等待
3. 30分鐘后消息過期 → 轉入死信隊列
4. 消費者檢查訂單狀態:- 未支付 → 執行關閉操作- 已支付 → 忽略
- 定時通知:預約提醒服務
場景:會議開始前15分鐘提醒
1. 創建會議時發送延時消息
2. 消息存活直到會議開始前15分鐘
3. 觸發通知服務發送提醒
- 異步重試:失敗任務延時重試機制
消息處理失敗時:
1. 首次失敗 → 延時5秒重試
2. 二次失敗 → 延時30秒重試
3. 三次失敗 → 進入死信隊列人工處理
- 物流跟蹤:預計送達時間狀態更新
基本概念
延遲消息:發送者發送消息時指定一個時間,消費者不會立刻收到消息,而是在指定時間之后才收到消息。
延遲任務:設置在一定時間之后才執行的任務
當一個隊列中的消息滿足下列情況之一時,就會成為死信(dead letter):
- 消息被拒絕且不重新入隊:消費者使用 basic.reject 或 basic.nack 聲明消費失敗,并且消息的 requeue 參數設置為 false
- 消息過期:消息是一個過期消息(達到了隊列或消息本身設置的過期時間),超時無人消費
- 隊列達到最大長度:要投遞的隊列消息堆積滿了,最早的消息可能成為死信
如果隊列通過 dead-letter-exchange
屬性指定了一個交換機,那么該隊列中的死信就會投遞到這個交換機中。這個交換機稱為死信交換機(Dead Letter Exchange,簡稱 DLX)。
RabbitMQ 本身沒有直接的延時隊列功能,通常是通過死信隊列和**TTL(Time-To-Live)**來實現的。
[生產者] → [普通隊列(設置TTL)] → (消息過期)→ [死信隊列] → [消費者]
實現延時隊列
添加依賴
<!-- amqp 依賴 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Mybatis-Plus包 -->
<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.1</version>
</dependency>
<!-- MySQL驅動包 -->
<dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope>
</dependency>
<!-- lombok包 -->
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
基礎配置
server:port: 8080
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/smbms?useUnicode=true&characterEncoding=UTF-8&useSSL=falseusername: rootpassword: rootrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest
mybatis-plus:type-aliases-package: com.hz.pojo #類型別名所在的包#控制臺打印sql語句configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImplmap-underscore-to-camel-case: false # 駝峰映射
死信隊列三要素
- DLX (Dead-Letter-Exchange):死信轉發交換機
- DLK (Dead-Letter-Routing-Key):死信路由鍵
- TTL (Time-To-Live):消息存活時間
配置類設計
@Configuration
public class RabbitMQConfig {// 業務交換機public static final String BUSINESS_EXCHANGE = "business.exchange";// 業務隊列public static final String BUSINESS_QUEUE = "business.queue";// 死信交換機public static final String DLX_EXCHANGE = "dlx.exchange";// 死信隊列public static final String DLX_QUEUE = "dlx.queue";// 業務隊列路由鍵private static final String BUSINESS_ROUTING_KEY = "business.key";// 死信路由鍵private static final String DLX_ROUTING_KEY = "dlx.key";// 聲明業務交換機(直連型)@Beanpublic DirectExchange businessExchange() {return new DirectExchange(BUSINESS_EXCHANGE);}// 聲明死信交換機@Beanpublic DirectExchange dlxExchange() {return new DirectExchange(DLX_EXCHANGE);}// 聲明業務隊列(綁定死信屬性)@Beanpublic Queue businessQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DLX_EXCHANGE); // 設置死信交換機args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); // 設置死信路由鍵args.put("x-message-ttl", 10000); // 隊列統一TTL(單位:毫秒)return new Queue(BUSINESS_QUEUE, true, false, false, args);}// 聲明死信隊列@Beanpublic Queue dlxQueue() {return new Queue(DLX_QUEUE);}// 綁定業務隊列到交換機@Beanpublic Binding businessBinding() {return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(BUSINESS_ROUTING_KEY);}// 綁定死信隊列到交換機@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);}
}
消息生產者
@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 發送延時消息* @param message 消息內容* @param ttl 單位:秒*/public void sendDelayMessage(String message, int ttl) {// 消息屬性設置MessagePostProcessor processor = message -> {message.getMessageProperties().setExpiration(String.valueOf(ttl * 1000)); // 消息級別TTLreturn message;};rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE,RabbitMQConfig.BUSINESS_ROUTING_KEY,message,processor);}
}
消息消費者
@Component
public class MessageConsumer {@Autowiredprivate BillService billService;@RabbitListener(queues = RabbitMQConfig.DLX_QUEUE)public void processDelayMessage(String billCode) {System.out.println("收到延時消息:" + billCode);billService.closeBill(billCode);System.out.println("超時未支付,訂單已關閉--------------");}
}
兩種TTL設置方式
隊列級別TTL
args.put("x-message-ttl", 10000);
隊列中所有消息統一過期時間;消息實際存活時間 = 隊列TTL;性能更優(RabbitMQ統一處理)
消息級別TTL
message.getMessageProperties().setExpiration("5000");
每個消息可以設置不同TTL;實際存活時間取最小值(隊列TTL vs 消息TTL);需要逐個處理消息,性能開銷較大
訂單超時關閉實例
訂單服務
@Service
public class BillService {@Autowiredprivate MessageProducer messageProducer;@Resourceprivate BillMapper billMapper;public void createBill(Bill bill) {// 保存訂單到數據庫bill.setIsPayment(1); // 設置初始狀態 1:未支付 2:已支付 3:已關閉billMapper.insert(bill);// 發送延時消息(10s)messageProducer.sendDelayMessage(bill.getBillCode(), 10);}public void closeBill(String billCode) {Bill bill = billMapper.selectOne(new QueryWrapper<Bill>().eq("billCode", billCode));if (bill != null && bill.getIsPayment() == 1) {bill.setIsPayment(3);billMapper.updateById(bill);}}
}
消息處理
@RestController
@RequestMapping("/bill")
public class BillController {@Autowiredprivate BillService billService;@GetMapping("/send")public String send(){// 創建測試訂單Bill bill = new Bill();bill.setBillCode("BILL2025_999");bill.setProductName("可口可樂");// 創建賬單并發送延時消息billService.createBill(bill);return "訂單創建成功,10秒后未支付將自動關閉。訂單號:" + bill.getBillCode();}
}
流程:
-
訪問
localhost:8080/bill/send
創建測試訂單 -
訂單初始狀態為待支付(1)
-
消息經過10秒延遲進入死信隊列
-
消費者處理消息時檢查訂單狀態
-
若仍為未支付狀態,更新為已關閉(3)
延遲消息插件
RabbitMQ 提供了官方插件 rabbitmq_delayed_message_exchange
,它允許你發送延遲消息而無需設置消息的 TTL 和死信隊列。這個插件提供了一個新的交換機類型 x-delayed-message
,可以用來實現消息的延遲投遞。
安裝插件
可以從 RabbitMQ 的插件頁面下載,或者直接使用以下命令進行安裝(假設 RabbitMQ 安裝在默認位置):
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
安裝完成后,重啟 RabbitMQ 服務。
配置延遲交換機
@Bean
public CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
}// 發送消息時設置延遲頭
rabbitTemplate.convertAndSend("delayed.exchange", "routing.key", message, msg -> {msg.getMessageProperties().setHeader("x-delay", 5000);return msg;
});