RabbitMQ 訂單超時處理方案
使用 RabbitMQ 的 TTL + 死信隊列(DLX)
RabbitMQ 的 TTL(Time-To-Live) 和 死信隊列(Dead Letter Exchange) 是處理訂單超時的常見方案。核心思路是設置消息的過期時間,超時后自動轉入死信隊列進行后續處理。
步驟:
-
創建訂單時發送延遲消息
訂單創建后,向 RabbitMQ 發送一條帶有 TTL 的消息(例如 30 分鐘超時)。消息的 TTL 可以通過隊列或消息本身設置。// 設置消息的 TTL(單位:毫秒) AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("1800000") // 30分鐘 = 30 * 60 * 1000.build(); channel.basicPublish("", "order_queue", properties, message.getBytes());
-
配置死信隊列
定義一個死信交換器(DLX)和死信隊列,綁定到原始隊列。當消息超時后,RabbitMQ 會自動將其路由到死信隊列。// 定義死信交換器和隊列 channel.exchangeDeclare("order_dlx", "direct"); channel.queueDeclare("order_dead_letter_queue", true, false, false, null); channel.queueBind("order_dead_letter_queue", "order_dlx", "dead_letter");// 原始隊列綁定死信交換器 Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "order_dlx"); args.put("x-dead-letter-routing-key", "dead_letter"); channel.queueDeclare("order_queue", true, false, false, args);
-
消費死信隊列處理超時訂單
監聽死信隊列,接收到消息時檢查訂單狀態。若訂單未支付,則執行取消邏輯(如釋放庫存、標記訂單狀態)。DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);// 解析訂單ID,檢查狀態if (orderService.isOrderUnpaid(orderId)) {orderService.cancelOrder(orderId);} }; channel.basicConsume("order_dead_letter_queue", true, deliverCallback, consumerTag -> {});
使用 RabbitMQ 插件 rabbitmq_delayed_message_exchange
如果需要更靈活的延遲時間(如動態設置不同訂單的超時時間),可以使用官方插件 rabbitmq_delayed_message_exchange
,支持延遲消息投遞。
步驟:
-
安裝插件
在 RabbitMQ 服務器執行:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
-
聲明延遲交換器
在 Java 代碼中聲明一個x-delayed-message
類型的交換器。Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, args); channel.queueDeclare("delayed_queue", true, false, false, null); channel.queueBind("delayed_queue", "delayed_exchange", "delayed_routing_key");
-
發送延遲消息
通過x-delay
頭設置延遲時間(毫秒)。Map<String, Object> headers = new HashMap<>(); headers.put("x-delay", 1800000); // 30分鐘延遲 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build(); channel.basicPublish("delayed_exchange", "delayed_routing_key", props, message.getBytes());
-
消費延遲消息
監聽隊列處理超時邏輯,與死信隊列方案類似。
注意事項
-
冪等性處理
消息可能重復投遞(如消費者處理失敗),需確保取消訂單邏輯的冪等性。 -
消息持久化
若需要可靠性,將隊列和消息設置為持久化:channel.queueDeclare("order_queue", true, false, false, args); // 持久化隊列 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2) // 持久化消息.build();
-
替代方案
高精度場景可結合數據庫定時任務或 Redis 的鍵過期通知,但 RabbitMQ 方案更適用于分布式系統解耦。