什么是死信隊列?死信隊列是如何導致的?

死信交換機(Dead Letter Exchange,DLX)

  • 定義:死信交換機是一種特殊的交換機,專門用于**接收從其他隊列中因特定原因變成死信的消息**。它的本質還是交換機,遵循RabbitMQ中交換機的基本工作原理,如根據路由規則將消息發送到綁定的隊列。
  • 作用:為死信提供一個集中處理的入口點。通過將死信發送到死信交換機,再由其路由到相應的死信隊列,可以方便地對這些異常消息進行統一管理和處理,確保數據不丟失。

死信隊列(Dead Letter Queue,DLQ)

  • 定義:死信隊列用于存儲那些無法在正常流程中被消費的消息,即死信。這些消息進入死信隊列后,可以后續進行分析、重試或其他特殊處理。
  • 產生死信的原因
    • 消息被拒絕且不重新入隊:消費者調用basic.rejectbasic.nack方法拒絕消息,并將requeue參數設置為false,表明該消息不再重新放回原隊列等待消費,從而成為死信。
    • 消息過期:可以為消息或隊列設置生存時間(TTL,Time-To-Live)。當消息在隊列中的存活時間超過設定的TTL值時,消息就會過期成為死信。消息的TTL既可以在發送消息時針對單條消息設置,也可以在聲明隊列時對隊列中的所有消息統一設置。
    • 隊列達到最大長度:當為隊列設置了最大長度(Max-Length),并且隊列中的消息數量達到這個上限時,新進入的消息會被丟棄成為死信。

代碼舉例

下面將用代碼舉例,由于消息過期而進入死信隊列

初始化RabbitMQ的連接配置、隊列和交換機的聲明

/*** RabbitMQ配置類* 負責管理RabbitMQ的連接配置、隊列和交換機的聲明*/
@Slf4j
public class RabbitMQConfig {// 普通隊列和死信隊列的配置常量public static final String NORMAL_QUEUE = "normal.queue";      // 普通隊列名稱public static final String DLX_QUEUE = "dlx.queue";           // 死信隊列名稱public static final String NORMAL_EXCHANGE = "normal.exchange"; // 普通交換機名稱public static final String DLX_EXCHANGE = "dlx.exchange";     // 死信交換機名稱public static final String NORMAL_ROUTING_KEY = "normal.routing.key"; // 普通路由鍵public static final String DLX_ROUTING_KEY = "dlx.routing.key";      // 死信路由鍵/*** 創建RabbitMQ連接** @return Connection RabbitMQ連接對象* @throws Exception*/public static Connection createConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xxxx");    // 設置RabbitMQ服務器地址factory.setPort(5672);           // 設置RabbitMQ服務器端口factory.setUsername("xxxx");    // 設置用戶名factory.setPassword("xxxx");    // 設置密碼return factory.newConnection();  // 創建并返回新的連接}/*** 初始化RabbitMQ的隊列和交換機* 包括:* 1. 刪除已存在的隊列和交換機* 2. 聲明死信交換機和隊列* 3. 聲明普通交換機和隊列* 4. 設置隊列的死信參數* 5. 綁定隊列和交換機** @throws Exception*/public static void init() throws Exception {try (Connection connection = createConnection();Channel channel = connection.createChannel()) {// 刪除已存在的隊列和交換機try {channel.queueDelete(NORMAL_QUEUE);channel.queueDelete(DLX_QUEUE);channel.exchangeDelete(NORMAL_EXCHANGE);channel.exchangeDelete(DLX_EXCHANGE);} catch (Exception e) {// 忽略刪除不存在的隊列或交換機時的錯誤log.warn("刪除隊列或交換機時出錯(可能是首次創建): {}", e.getMessage());}// 聲明死信交換機,類型為direct,持久化channel.exchangeDeclare(DLX_EXCHANGE, "direct", true);// 聲明死信隊列,持久化channel.queueDeclare(DLX_QUEUE, true, false, false, null);// 將死信隊列綁定到死信交換機,使用死信路由鍵channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY);// 聲明普通交換機,類型為direct,持久化channel.exchangeDeclare(NORMAL_EXCHANGE, "direct", true);// 設置普通隊列的死信參數Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DLX_EXCHANGE);     // 設置死信交換機args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); // 設置死信路由鍵// 聲明普通隊列,并應用死信參數channel.queueDeclare(NORMAL_QUEUE, true, false, false, args);// 將普通隊列綁定到普通交換機,使用普通路由鍵channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);}}
} 

消息生產者

/*** 消息生產者類* 負責向RabbitMQ發送消息*/
@Slf4j
public class MessageProducer {/*** 發送消息到普通隊列* 該方法會:* 1. 創建RabbitMQ連接和通道* 2. 將消息發布到普通交換機* 3. 使用try-with-resources自動關閉連接和通道* * @param message 要發送的消息內容* @throws Exception */public void sendMessage(String message) throws Exception {// 使用try-with-resources自動管理連接和通道的關閉try (Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel()) {// 打印發送的消息內容log.info("發送消息: {}", message);// 發布消息到普通交換機// 參數說明:// 1. 交換機名稱// 2. 路由鍵// 3. 消息屬性(這里為null表示使用默認屬性)// 4. 消息內容(轉換為字節數組)channel.basicPublish(RabbitMQConfig.NORMAL_EXCHANGE,RabbitMQConfig.NORMAL_ROUTING_KEY,null,message.getBytes());}}/*** 發送帶TTL的消息到普通隊列* 該方法會:* 1. 創建RabbitMQ連接和通道* 2. 設置消息的TTL屬性* 3. 將消息發布到普通交換機* 4. 使用try-with-resources自動關閉連接和通道* * @param message 要發送的消息內容* @param ttl 消息的過期時間(毫秒)* @throws Exception 如果發送過程中出現錯誤則拋出異常*/public void sendMessageWithTTL(String message, int ttl) throws Exception {// 使用try-with-resources自動管理連接和通道的關閉try (Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel()) {// 設置消息屬性,包括TTLAMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(ttl)).build();// 打印發送的消息內容log.info("發送消息: {}, TTL: {}ms", message, ttl);// 發布消息到普通交換機// 參數說明:// 1. 交換機名稱// 2. 路由鍵// 3. 消息屬性(包含TTL)// 4. 消息內容(轉換為字節數組)channel.basicPublish(RabbitMQConfig.NORMAL_EXCHANGE,RabbitMQConfig.NORMAL_ROUTING_KEY,properties,message.getBytes());}}
} 

消息消費者

/*** 消息消費者類* 負責從普通隊列和死信隊列中消費消息*/
@Slf4j
public class MessageConsumer {/*** 消費普通隊列中的消息* 該方法會:* 1. 創建RabbitMQ連接和通道* 2. 設置預取計數為1,確保公平分發* 3. 創建消費者回調處理消息* 4. 確認消息處理完成** @throws Exception 異常*/public void consumeNormalQueue() throws Exception {// 創建RabbitMQ連接和通道Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel();// 設置預取計數為1,確保公平分發,避免某個消費者處理過多消息channel.basicQos(1);// 創建普通隊列消費者回調DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 獲取消息內容String message = new String(delivery.getBody(), StandardCharsets.UTF_8);log.info("收到普通隊列消息: {}", message);// 模擬消息處理耗時try {Thread.sleep(20000);} catch (InterruptedException e) {throw new RuntimeException(e);}// 確認消息處理完成channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 開始消費普通隊列// 參數說明:// 1. 隊列名稱// 2. 是否自動確認消息(false表示手動確認)// 3. 消息處理回調// 4. 消費者取消回調(這里為空實現)channel.basicConsume(RabbitMQConfig.NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}/*** 消費死信隊列中的消息* 該方法會:* 1. 創建RabbitMQ連接和通道* 2. 設置預取計數為1,確保公平分發* 3. 創建消費者回調處理消息* 4. 確認消息處理完成** @throws Exception 異常*/public void consumeDlxQueue() throws Exception {// 創建RabbitMQ連接和通道Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel();// 設置預取計數為1,確保公平分發channel.basicQos(1);// 創建死信隊列消費者回調DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 獲取消息內容String message = new String(delivery.getBody(), StandardCharsets.UTF_8);log.info("收到死信隊列消息: {}", message);// 確認消息處理完成channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 開始消費死信隊列// 參數說明:// 1. 隊列名稱// 2. 是否自動確認消息(false表示手動確認)// 3. 消息處理回調// 4. 消費者取消回調(這里為空實現)channel.basicConsume(RabbitMQConfig.DLX_QUEUE, false, deliverCallback, consumerTag -> {});}
} 

測試

@Slf4j
public class DLXTest {private static final int THREAD_COUNT = 1;  // 并發線程數private static final int MESSAGE_COUNT = 2; // 每個線程發送的消息數/*** 主方法,執行死信隊列測試流程* 測試流程:* 1. 初始化RabbitMQ的隊列和交換機* 2. 創建生產者和消費者實例* 3. 啟動普通隊列和死信隊列的消費者線程* 4. 使用線程池發送測試消息* 5. 等待消息處理完成** @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 初始化RabbitMQ的隊列和交換機RabbitMQConfig.init();// 創建生產者和消費者實例MessageProducer producer = new MessageProducer();MessageConsumer consumer = new MessageConsumer();// 啟動普通隊列消費者線程new Thread(() -> {try {consumer.consumeNormalQueue();} catch (Exception e) {log.error("普通隊列消費者異常", e);}}).start();// 啟動死信隊列消費者線程new Thread(() -> {try {consumer.consumeDlxQueue();} catch (Exception e) {log.error("死信隊列消費者異常", e);}}).start();// 創建線程池和計數器ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);CountDownLatch latch = new CountDownLatch(THREAD_COUNT);// 提交任務到線程池for (int i = 0; i < THREAD_COUNT; i++) {final int threadId = i;executorService.submit(() -> {try {// 每個線程發送MESSAGE_COUNT條消息for (int j = 0; j < MESSAGE_COUNT; j++) {// 隨機生成消息TTL(1-30秒)int ttl = (int) (Math.random() * 30000) + 1000;String message = String.format("消息-線程%d-第%d條 (消息TTL: %dms)", threadId + 1, j + 1, ttl);producer.sendMessageWithTTL(message, ttl);// 隨機延遲0-100ms,模擬真實場景Thread.sleep((long) (Math.random() * 100));}} catch (Exception e) {log.error("發送消息異常", e);} finally {latch.countDown();}});}// 等待所有消息發送完成latch.await();log.info("所有消息已發送完成");// 關閉線程池executorService.shutdown();executorService.awaitTermination(1, TimeUnit.MINUTES);// 保持程序運行,等待消息處理完成Thread.sleep(60000);log.info("測試完成");}
} 

從結果可以看出,第一條消息 ttl 為 28301ms,被普通消費者進行消費,而產生的第二條消息得到 ttl 為 4332ms,由于第一條消息在消費時耗時較久,在此期間 第二條消息已經過期,不得不進入死信隊列,由死信消費者進行處理,從前面的日志時間也可以看出,剛好間隔 4s 左右。

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/82256.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/82256.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/82256.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

9. 從《蜀道難》學CSS基礎:三種選擇器的實戰解析

引言&#xff1a;當古詩遇上現代網頁設計 今天我們通過李白的經典詩作《蜀道難》來學習CSS的三種核心選擇器。這種古今結合的學習方式&#xff0c;既能感受中華詩詞的魅力&#xff0c;又能掌握實用的網頁設計技能。讓我們開始這場穿越時空的技術之旅吧&#xff01; 一、HTML骨架…

三角網格減面算法及其代表的算法庫都有哪些?

以下是三角網格減面算法及其代表庫/工具的詳細分類&#xff0c;涵蓋經典算法和現代實現&#xff1a; ??1. 頂點聚類&#xff08;Vertex Clustering&#xff09;?? ??原理??&#xff1a;將網格空間劃分為體素柵格&#xff0c;合并每個柵格內的頂點。??特點??&#…

URP - 屏幕圖像(_CameraOpaqueTexture)

首先需要在unity中開啟屏幕圖像開關才可以使用該紋理 同樣只有不透明對象才能被渲染到屏幕圖像中 若想要該對象不被渲染到屏幕圖像中&#xff0c;可以將其Shader的渲染隊列改為 "Queue" "Transparent" 如何在Shader中使用_CameraOpaqueTexture&#xf…

vue 和 html 的區別

使用 Vue.js 和原生 HTML 開發 Web 應用有顯著的區別&#xff0c;主要體現在開發模式、功能擴展、性能優化和維護性等方面。以下是兩者的對比分析&#xff1a; &#x1f9f1; 原生 HTML&#xff08;HTML CSS JavaScript&#xff09; 特點&#xff1a; 靜態結構&#xff1a;H…

LeetCode[226] 翻轉二叉樹

思路&#xff1a; 使用遞歸&#xff0c;歸根結底還是左右節點互相倒&#xff0c;那么肯定需要一個temp節點在中間傳遞&#xff0c;最后就是遞歸&#xff0c;沒什么說的 代碼&#xff1a; /*** Definition for a binary tree node.* public class TreeNode {* int …

冪等的幾種解決方案以及實踐

目錄 什么是冪等&#xff1f; 解決冪等的常見解決方案&#xff1a; 唯一標識符案例 數據庫唯一約束 案例 樂觀鎖案例 分布式鎖&#xff08;Distributed Locking&#xff09; 實踐精選方案 首先 為什么不直接使用分布式鎖呢&#xff1f; 自定義實現冪等組件&#xff01…

PowerShell中的Json處理

1.定義JSON字符串變量 PS C:\WINDOWS\system32> $body {"Method": "POST","Body": {"model": "deepseek-r1","messages": [{"content": "why is the sky blue?","role"…

奧威BI:AI+BI深度融合,重塑智能AI數據分析新標桿

在數字化浪潮席卷全球的今天&#xff0c;企業正面臨著前所未有的數據挑戰與機遇。如何高效、精準地挖掘數據價值&#xff0c;已成為推動業務增長、提升競爭力的核心議題。奧威BI&#xff0c;作為智能AI數據分析領域的領軍者&#xff0c;憑借其創新的AIBI融合模式&#xff0c;正…

【Linux網絡】網絡協議基礎

網絡基礎 計算機網絡背景 獨立模式:計算機之間相互獨立 網絡互聯:多臺計算機連接在一起,完成數據共享 局域網LAN:計算機數量更多了,通過交換機和路由器連接在一起 廣域網WAN:將遠隔千里的計算機都連在一起 所謂"局域網"和"廣域網"只是一個相對的概念.比…

LabVIEW表面粗糙度測量及算法解析

在制造業和科研領域&#xff0c;表面粗糙度測量對保障產品質量、推動材料研究意義重大。表面粗糙度作為衡量工件表面加工質量的關鍵指標&#xff0c;直接影響著工件諸如磨損、密封、疲勞等機械性能。隨著技術的發展&#xff0c;LabVIEW 在表面粗糙度測量及數據處理中發揮著不可…

深入探索 JavaScript 中的模塊對象

引言 在現代 JavaScript 開發中&#xff0c;模塊化編程是一項至關重要的技術。它允許開發者將代碼拆分成多個獨立的模塊&#xff0c;每個模塊專注于單一功能&#xff0c;從而提高代碼的可維護性、可測試性和復用性。而模塊對象則是模塊化編程中的核心概念之一&#xff0c;它為…

Linux——Mysql數據庫

目錄 一&#xff0c;數據庫簡介 二&#xff0c;數據庫的基本概念 1&#xff0c;數據 2&#xff0c;數據庫和數據庫表 3&#xff0c;數據庫管理系統和數據庫系統 三&#xff0c;主流數據庫介紹 四&#xff0c;數據庫的兩大類型 1&#xff0c;關系型數據庫 主鍵 外鍵 2…

73頁最佳實踐PPT《DeepSeek自學手冊-從理論模型訓練到實踐模型應用》

這份文檔是一份關于 DeepSeek 自學手冊的詳細指南&#xff0c;涵蓋了 DeepSeek V3 和 R1 模型的架構、訓練方法、性能表現以及使用技巧等內容。它介紹了 DeepSeek V3 作為強大的 MoE 語言模型在數學、代碼等任務上的出色表現以及其訓練過程中的創新架構如多頭潛在注意力和多 To…

LabVIEW 2019 與 NI VISA 20.0 安裝及報錯處理

在使用 Windows 11 操作系統的電腦上&#xff0c;同時安裝了 LabVIEW 2019 32 位和 64 位版本的軟件。此前安裝的 NI VISA 2024 Q1 版&#xff0c;該版本與 LabVIEW 2019 32 位和 64 位不兼容&#xff0c;之后重新安裝了 NI VISA 20.0。從說明書來看&#xff0c;NI VISA 20.0 …

基于Centos7的DHCP服務器搭建

一、準備實驗環境&#xff1a; 克隆兩臺虛擬機 一臺作服務器&#xff1a;DHCP Server 一臺作客戶端&#xff1a;DHCP Clinet 二、部署服務器 在網絡模式為NAT下使用yum下載DHCP 需要管理員用戶權限才能下載&#xff0c;下載好后關閉客戶端&#xff0c;改NAT模式為僅主機模式…

最全盤點,趕緊收藏:2025 年全網最全的 Java 技術棧內容梳理(持續更新中)

大家好&#xff0c;我是栗箏i&#xff0c;是一個擁有 5 年經驗的 Java 開發工程師和技術博主&#xff0c;曾有多年在國內某大廠工作的經歷。從 2022 年 10 月份開始&#xff0c;我將持續梳理出全面的 Java 技術棧內容&#xff0c;一方面是對自己學習內容進行整合梳理&#xff0…

【項目實踐】boost 搜索引擎

1. 項目展示 boost搜索引擎具體講解視頻 2. 項目背景 對于boost庫&#xff0c;官方是沒有提供搜索功能的&#xff0c;我們這個項目就是來為它添加一個站內搜索的功能。 3. 項目環境與技術棧 ? 項目環境&#xff1a; ubuntu22.04、vscode ? 技術棧&#xff1a; C/C、C11、S…

一個簡單的MCP測試與debug

最近MCP挺火&#xff0c;我也跟著網上教程試試&#xff0c;參考如下&#xff0c;感謝原博主分享&#xff1a; https://zhuanlan.zhihu.com/p/1891227835722606201https://zhuanlan.zhihu.com/p/1891227835722606201 MCP是啥&#xff1f;技術原理是什么&#xff1f;一個視頻搞…

深度學習系統學習系列【7】之卷積神經網絡(CNN)

文章目錄 說明卷積神經網絡概述(Convolutional Neural Network,CNN)卷積神經網絡的應用圖像分類與識別圖像著色自然語言處理NLP卷積神經網絡的結構卷積神經網絡中的數據流動 CNN與ANN的關系 卷積操作Padding 操作滑動窗口卷積操作網絡卷積層操作矩陣快速卷積Im2col算法GEMM算法…

事務隔離(MySQL)

事務隔離 —— 為什么你改了我還看不見&#xff1f; 在數據庫中&#xff0c;事務&#xff08;Transaction&#xff09; 用于保證一組數據庫操作要么全部成功&#xff0c;要么全部失敗&#xff0c;這是一種原子性的操作機制。在 MySQL 中&#xff0c;事務的支持由存儲引擎層實現…