死信交換機(Dead Letter Exchange,DLX)
- 定義:死信交換機是一種特殊的交換機,專門用于**接收從其他隊列中因特定原因變成死信的消息**。它的本質還是交換機,遵循RabbitMQ中交換機的基本工作原理,如根據路由規則將消息發送到綁定的隊列。
- 作用:為死信提供一個集中處理的入口點。通過將死信發送到死信交換機,再由其路由到相應的死信隊列,可以方便地對這些異常消息進行統一管理和處理,確保數據不丟失。
死信隊列(Dead Letter Queue,DLQ)
- 定義:死信隊列用于存儲那些無法在正常流程中被消費的消息,即死信。這些消息進入死信隊列后,可以后續進行分析、重試或其他特殊處理。
- 產生死信的原因:
- 消息被拒絕且不重新入隊:消費者調用
basic.reject
或basic.nack
方法拒絕消息,并將requeue
參數設置為false
,表明該消息不再重新放回原隊列等待消費,從而成為死信。 - 消息過期:可以為消息或隊列設置生存時間(TTL,Time-To-Live)。當消息在隊列中的存活時間超過設定的TTL值時,消息就會過期成為死信。消息的TTL既可以在發送消息時針對單條消息設置,也可以在聲明隊列時對隊列中的所有消息統一設置。
- 隊列達到最大長度:當為隊列設置了最大長度(
Max-Length
),并且隊列中的消息數量達到這個上限時,新進入的消息會被丟棄成為死信。
- 消息被拒絕且不重新入隊:消費者調用
代碼舉例
下面將用代碼舉例,由于消息過期而進入死信隊列
初始化RabbitMQ的連接配置、隊列和交換機的聲明
/*** RabbitMQ配置類* 負責管理RabbitMQ的連接配置、隊列和交換機的聲明*/
@Slf4j
public class RabbitMQConfig {// 普通隊列和死信隊列的配置常量public static final String NORMAL_QUEUE = "normal.queue"; // 普通隊列名稱public static final String DLX_QUEUE = "dlx.queue"; // 死信隊列名稱public static final String NORMAL_EXCHANGE = "normal.exchange"; // 普通交換機名稱public static final String DLX_EXCHANGE = "dlx.exchange"; // 死信交換機名稱public static final String NORMAL_ROUTING_KEY = "normal.routing.key"; // 普通路由鍵public static final String DLX_ROUTING_KEY = "dlx.routing.key"; // 死信路由鍵/*** 創建RabbitMQ連接** @return Connection RabbitMQ連接對象* @throws Exception*/public static Connection createConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xxxx"); // 設置RabbitMQ服務器地址factory.setPort(5672); // 設置RabbitMQ服務器端口factory.setUsername("xxxx"); // 設置用戶名factory.setPassword("xxxx"); // 設置密碼return factory.newConnection(); // 創建并返回新的連接}/*** 初始化RabbitMQ的隊列和交換機* 包括:* 1. 刪除已存在的隊列和交換機* 2. 聲明死信交換機和隊列* 3. 聲明普通交換機和隊列* 4. 設置隊列的死信參數* 5. 綁定隊列和交換機** @throws Exception*/public static void init() throws Exception {try (Connection connection = createConnection();Channel channel = connection.createChannel()) {// 刪除已存在的隊列和交換機try {channel.queueDelete(NORMAL_QUEUE);channel.queueDelete(DLX_QUEUE);channel.exchangeDelete(NORMAL_EXCHANGE);channel.exchangeDelete(DLX_EXCHANGE);} catch (Exception e) {// 忽略刪除不存在的隊列或交換機時的錯誤log.warn("刪除隊列或交換機時出錯(可能是首次創建): {}", e.getMessage());}// 聲明死信交換機,類型為direct,持久化channel.exchangeDeclare(DLX_EXCHANGE, "direct", true);// 聲明死信隊列,持久化channel.queueDeclare(DLX_QUEUE, true, false, false, null);// 將死信隊列綁定到死信交換機,使用死信路由鍵channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY);// 聲明普通交換機,類型為direct,持久化channel.exchangeDeclare(NORMAL_EXCHANGE, "direct", true);// 設置普通隊列的死信參數Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DLX_EXCHANGE); // 設置死信交換機args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); // 設置死信路由鍵// 聲明普通隊列,并應用死信參數channel.queueDeclare(NORMAL_QUEUE, true, false, false, args);// 將普通隊列綁定到普通交換機,使用普通路由鍵channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);}}
}
消息生產者
/*** 消息生產者類* 負責向RabbitMQ發送消息*/
@Slf4j
public class MessageProducer {/*** 發送消息到普通隊列* 該方法會:* 1. 創建RabbitMQ連接和通道* 2. 將消息發布到普通交換機* 3. 使用try-with-resources自動關閉連接和通道* * @param message 要發送的消息內容* @throws Exception */public void sendMessage(String message) throws Exception {// 使用try-with-resources自動管理連接和通道的關閉try (Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel()) {// 打印發送的消息內容log.info("發送消息: {}", message);// 發布消息到普通交換機// 參數說明:// 1. 交換機名稱// 2. 路由鍵// 3. 消息屬性(這里為null表示使用默認屬性)// 4. 消息內容(轉換為字節數組)channel.basicPublish(RabbitMQConfig.NORMAL_EXCHANGE,RabbitMQConfig.NORMAL_ROUTING_KEY,null,message.getBytes());}}/*** 發送帶TTL的消息到普通隊列* 該方法會:* 1. 創建RabbitMQ連接和通道* 2. 設置消息的TTL屬性* 3. 將消息發布到普通交換機* 4. 使用try-with-resources自動關閉連接和通道* * @param message 要發送的消息內容* @param ttl 消息的過期時間(毫秒)* @throws Exception 如果發送過程中出現錯誤則拋出異常*/public void sendMessageWithTTL(String message, int ttl) throws Exception {// 使用try-with-resources自動管理連接和通道的關閉try (Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel()) {// 設置消息屬性,包括TTLAMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(ttl)).build();// 打印發送的消息內容log.info("發送消息: {}, TTL: {}ms", message, ttl);// 發布消息到普通交換機// 參數說明:// 1. 交換機名稱// 2. 路由鍵// 3. 消息屬性(包含TTL)// 4. 消息內容(轉換為字節數組)channel.basicPublish(RabbitMQConfig.NORMAL_EXCHANGE,RabbitMQConfig.NORMAL_ROUTING_KEY,properties,message.getBytes());}}
}
消息消費者
/*** 消息消費者類* 負責從普通隊列和死信隊列中消費消息*/
@Slf4j
public class MessageConsumer {/*** 消費普通隊列中的消息* 該方法會:* 1. 創建RabbitMQ連接和通道* 2. 設置預取計數為1,確保公平分發* 3. 創建消費者回調處理消息* 4. 確認消息處理完成** @throws Exception 異常*/public void consumeNormalQueue() throws Exception {// 創建RabbitMQ連接和通道Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel();// 設置預取計數為1,確保公平分發,避免某個消費者處理過多消息channel.basicQos(1);// 創建普通隊列消費者回調DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 獲取消息內容String message = new String(delivery.getBody(), StandardCharsets.UTF_8);log.info("收到普通隊列消息: {}", message);// 模擬消息處理耗時try {Thread.sleep(20000);} catch (InterruptedException e) {throw new RuntimeException(e);}// 確認消息處理完成channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 開始消費普通隊列// 參數說明:// 1. 隊列名稱// 2. 是否自動確認消息(false表示手動確認)// 3. 消息處理回調// 4. 消費者取消回調(這里為空實現)channel.basicConsume(RabbitMQConfig.NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}/*** 消費死信隊列中的消息* 該方法會:* 1. 創建RabbitMQ連接和通道* 2. 設置預取計數為1,確保公平分發* 3. 創建消費者回調處理消息* 4. 確認消息處理完成** @throws Exception 異常*/public void consumeDlxQueue() throws Exception {// 創建RabbitMQ連接和通道Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel();// 設置預取計數為1,確保公平分發channel.basicQos(1);// 創建死信隊列消費者回調DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 獲取消息內容String message = new String(delivery.getBody(), StandardCharsets.UTF_8);log.info("收到死信隊列消息: {}", message);// 確認消息處理完成channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 開始消費死信隊列// 參數說明:// 1. 隊列名稱// 2. 是否自動確認消息(false表示手動確認)// 3. 消息處理回調// 4. 消費者取消回調(這里為空實現)channel.basicConsume(RabbitMQConfig.DLX_QUEUE, false, deliverCallback, consumerTag -> {});}
}
測試
@Slf4j
public class DLXTest {private static final int THREAD_COUNT = 1; // 并發線程數private static final int MESSAGE_COUNT = 2; // 每個線程發送的消息數/*** 主方法,執行死信隊列測試流程* 測試流程:* 1. 初始化RabbitMQ的隊列和交換機* 2. 創建生產者和消費者實例* 3. 啟動普通隊列和死信隊列的消費者線程* 4. 使用線程池發送測試消息* 5. 等待消息處理完成** @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 初始化RabbitMQ的隊列和交換機RabbitMQConfig.init();// 創建生產者和消費者實例MessageProducer producer = new MessageProducer();MessageConsumer consumer = new MessageConsumer();// 啟動普通隊列消費者線程new Thread(() -> {try {consumer.consumeNormalQueue();} catch (Exception e) {log.error("普通隊列消費者異常", e);}}).start();// 啟動死信隊列消費者線程new Thread(() -> {try {consumer.consumeDlxQueue();} catch (Exception e) {log.error("死信隊列消費者異常", e);}}).start();// 創建線程池和計數器ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);CountDownLatch latch = new CountDownLatch(THREAD_COUNT);// 提交任務到線程池for (int i = 0; i < THREAD_COUNT; i++) {final int threadId = i;executorService.submit(() -> {try {// 每個線程發送MESSAGE_COUNT條消息for (int j = 0; j < MESSAGE_COUNT; j++) {// 隨機生成消息TTL(1-30秒)int ttl = (int) (Math.random() * 30000) + 1000;String message = String.format("消息-線程%d-第%d條 (消息TTL: %dms)", threadId + 1, j + 1, ttl);producer.sendMessageWithTTL(message, ttl);// 隨機延遲0-100ms,模擬真實場景Thread.sleep((long) (Math.random() * 100));}} catch (Exception e) {log.error("發送消息異常", e);} finally {latch.countDown();}});}// 等待所有消息發送完成latch.await();log.info("所有消息已發送完成");// 關閉線程池executorService.shutdown();executorService.awaitTermination(1, TimeUnit.MINUTES);// 保持程序運行,等待消息處理完成Thread.sleep(60000);log.info("測試完成");}
}
從結果可以看出,第一條消息 ttl 為 28301ms,被普通消費者進行消費,而產生的第二條消息得到 ttl 為 4332ms,由于第一條消息在消費時耗時較久,在此期間 第二條消息已經過期,不得不進入死信隊列,由死信消費者進行處理,從前面的日志時間也可以看出,剛好間隔 4s 左右。