【RabbitMQ】保證消息不丟失

要確保 RabbitMQ 在消費者(Python 服務)重啟或掛掉時消息不丟失,需結合 消息持久化確認機制(ACK)死信隊列(DLX) 實現高可靠性:


1. 消息持久化(Durability)

確保消息和隊列在 RabbitMQ 服務重啟后仍存在:

Java 發布者(設置持久化)
// 創建持久化隊列 + 持久化消息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 聲明持久化隊列(durable=true)channel.queueDeclare("image_queue", true, false, false, null);// 發布持久化消息(deliveryMode=2)String message = "{\"task_id\": \"123\"}";channel.basicPublish("", "image_queue", new AMQP.BasicProperties.Builder().deliveryMode(2) // 持久化消息.build(),message.getBytes());System.out.println("消息已持久化發送");
}
Python 消費者(聲明持久化隊列)
channel.queue_declare(queue='image_queue', durable=True)  # durable=True

2. 手動確認(Manual ACK)

消費者處理完消息后顯式發送 ACK,避免消息因崩潰丟失:

Python 消費者(關閉自動 ACK)
def callback(ch, method, properties, body):try:print(f"處理消息: {body.decode()}")# 業務邏輯...ch.basic_ack(delivery_tag=method.delivery_tag)  # 手動確認except Exception as e:print(f"處理失敗: {e}")ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)  # 重新入隊channel.basic_consume(queue='image_queue', on_message_callback=callback, auto_ack=False)  # 關閉自動ACK

關鍵點

  • auto_ack=False:必須關閉自動確認。
  • 處理成功后調用 basic_ack,失敗時 basic_nack 并重新入隊。

3. 死信隊列(DLX)

處理因消費者崩潰或消息超時未被確認的消息:

Java 發布者(聲明死信交換機和隊列)
// 定義死信交換機和隊列
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange");  // 死信交換機
args.put("x-message-ttl", 60000);  // 消息存活時間(可選)channel.queueDeclare("image_queue", true, false, false, args);
channel.exchangeDeclare("dlx_exchange", "direct");
channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", "");
Python 消費者(監聽死信隊列)
channel.queue_declare(queue='dlx_queue', durable=True)
channel.basic_consume(queue='dlx_queue', on_message_callback=handle_dlx_message, auto_ack=False)

作用

  • 消息超過 TTL 或消費者拒絕時,自動路由到死信隊列。
  • 可對死信消息進行補償處理(如重試或記錄日志)。

4. 消費者高可用(HA)

確保消費者服務崩潰后能自動恢復:

方案 1:進程守護(如 systemd)
# /etc/systemd/system/python_consumer.service
[Unit]
Description=Python RabbitMQ Consumer
After=network.target[Service]
User=root
WorkingDirectory=/opt/app
ExecStart=/usr/bin/python3 /opt/app/consumer.py
Restart=always  # 崩潰后自動重啟[Install]
WantedBy=multi-user.target
方案 2:容器化(Docker)
# Dockerfile
FROM python:3.9
COPY consumer.py /app/
CMD ["python", "/app/consumer.py"]
docker run -d --restart=unless-stopped my_consumer

5. 消息補償機制(可選)

極端情況下(如 RabbitMQ 崩潰),可通過數據庫記錄消息狀態:

Java 發布者(本地事務)
// 偽代碼:本地事務
try {saveToDatabase(task);  // 1. 先存數據庫sendToRabbitMQ(task); // 2. 再發消息commitTransaction();
} catch (Exception e) {rollbackTransaction();// 定時任務掃描數據庫補償發送
}

完整流程圖

Java Publisher RabbitMQ Python Consumer DLX 發布持久化消息 (deliveryMode=2) 推送消息 (auto_ack=false) basic_ack() basic_nack(requeue=true) 重新投遞 alt [處理成功] [處理失敗或崩潰] 轉入死信隊列 觸發補償邏輯 alt [消息超時或多次失敗] Java Publisher RabbitMQ Python Consumer DLX

最佳實踐總結

措施實現方式作用
消息持久化deliveryMode=2 + queueDeclare(durable=true)防止 RabbitMQ 重啟丟失消息
手動 ACKauto_ack=false + basic_ack()/basic_nack()確保消息處理完成才刪除
死信隊列(DLX)x-dead-letter-exchange + 獨立死信消費者處理超時或失敗消息
消費者高可用systemd Restart=alwaysdocker --restart=unless-stopped消費者崩潰后自動恢復
消息補償數據庫記錄 + 定時任務掃描極端情況下的兜底措施

通過以上組合方案,可確保消息在消費者崩潰、重啟或 RabbitMQ 異常時不丟失、不重復、可恢復

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/78026.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/78026.shtml
英文地址,請注明出處:http://en.pswp.cn/web/78026.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Python基本語法(控制語句)

#控制語句 Python語言的控制語句和其他編程語言類似&#xff0c;常用的有if…else、while、for語句。 案例2一7控制語句 第1組代碼&#xff0c;說明if-else語句&#xff1a; #1 print(\n1,if) x,y,z10,20,5 if x>y:print(x>y) else:print(x<y)輸出結果: 1,if x<…

并發設計模式實戰系列(10):Balking(猶豫模式)

&#x1f31f; 大家好&#xff0c;我是摘星&#xff01; &#x1f31f; 今天為大家帶來的是并發設計模式實戰系列&#xff0c;第10章Balking&#xff08;猶豫模式&#xff09;&#xff0c;廢話不多說直接開始~ 目錄 一、核心原理深度拆解 1. 狀態守護機制 2. 與狀態模式的…

【強化學習系列】貝爾曼方程

首先回顧狀態價值函數和動作價值函數的定義&#xff1a; 狀態價值函數 v π ( s ) v_\pi(s) vπ?(s)是從狀態 s s s出發&#xff0c;直至一幕結束后獲得的回報的期望值 動作價值函數 q π ( s , a ) q_\pi(s,a) qπ?(s,a)是從狀態 s s s出發&#xff0c;采取動作 a a a后&…

donet使用指定版本sdk

ps:來自微軟官方方案,實測可行,就是在項目任意目錄下在新建 global.json,并配置sdk版本 SDK 使用最新安裝的版本 SDK 命令包括 dotnet new 和 dotnet run。 .NET CLI 必須為每個 dotnet 命令選擇一個 SDK 版本。 即使在以下情況下&#xff0c;它也會默認使用計算機上安裝的最新…

x-cmd install | Orbiton:極簡至上的終端文本編輯器與輕量級 IDE

目錄 核心特點安裝適用場景優勢 厭倦了臃腫復雜的 IDE&#xff1f;渴望一個輕巧、快速、專注的編碼環境&#xff1f;Orbiton&#xff0c;一款極簡主義的終端文本編輯器與輕量級 IDE&#xff0c;將帶給你前所未有的編碼體驗。 核心特點 極簡主義&#xff0c;專注編碼&#xff1…

大腦、機器人與貝葉斯信念及AI推理

在機器不再局限于重復性任務的世界里&#xff0c;機器人技術已經大膽地邁入了感知、學習和決策的領域。這篇文章探討了智能機器人系統是如何構建的——從理解它們嘈雜的傳感器和不確定的環境&#xff0c;到使它們能夠做出明智的選擇并隨著時間的推移調整自己的行為。 AI推理 …

線上婚戀相親小程序源碼介紹

?基于ThinkPHP、FastAdmin和UniApp開發的線上婚戀相親小程序源碼&#xff0c;這款小程序源碼采用了ThinkPHP作為后端框架&#xff0c;其強大的功能與良好的擴展性為程序的穩定運行提供了保障。 ?FastAdmin作為后臺管理框架&#xff0c;使得管理員能夠便捷地對用戶信息、相親…

長短期記憶(LSTM)簡介

RNN 的主要限制在于它無法記住很長的序列&#xff0c;并且會陷入梯度消失的問題。 什么是梯度消失問題&#xff1f; 當添加更多具有某些激活函數的層時&#xff0c;神經網絡中損失函數的梯度趨近于零&#xff0c;這使得網絡難以訓練。 長短期記憶&#xff08;LSTM&#xff09;…

JESD204B 探究

JESD204B協議是高速串行接口標準,主要用于ADC/DAC與邏輯器件(如FPGA)之間的數據傳輸。以下為綜合解析: 一、協議概述 ?核心作用? 通過高速SERDES技術實現數模轉換器與邏輯器件間的高效數據傳輸,支持多通道同步和確定性延遲,適用于GB級吞吐量場景23。?版本演進? JESD2…

Flutter PIP 插件 ---- 新增PipActivity,Android 11以下支持自動進入PIP Mode

接上文 Flutter PIP 插件 ---- Android 項目地址 PIP&#xff0c; pub.dev也已經同步發布 pip 0.0.3&#xff0c;你的加星和點贊&#xff0c;將是我繼續改進最大的動力 開發文檔 Add videos using picture-in-picture (PiP)介紹PIP功能從 Android 8.0 (API level 26) 引入&…

【Java開發日記】6個Java 工具,輕松分析定位 JVM 問題 !

目錄 使用 JDK 自帶工具查看 JVM 情況 jps jinfo jvisualvm jcm 使用 JDK 自帶工具查看 JVM 情況 JDK 自帶了很多命令行甚至是圖形界面工具&#xff0c;幫助查看 JVM 的一些信息。比如&#xff0c;在機器上運行 ls 命令&#xff0c;可以看到 JDK 8 提供了非常多的工具或程…

動態規劃簡單題2

leetcode91題&#xff08;解碼方法&#xff09; 分析題目&#xff1a; 1.這是一種解碼&#xff0c;就是給多個數字組成的字符串&#xff0c;把這些數字解碼成字母&#xff0c;看看一共有多少種 2.如果一個數字前有前導0就不合法&#xff0c;比如06&#xff0c;這與6不同&…

(007)Excel 公式的使用

文章目錄 邏輯運算公式的參數常用函數引用方式引用工作表和工作簿表格的引用修改公式的計算時機區域交叉引用 邏輯運算 公式的參數 單元格引用&#xff1a;SUM(A1:A24)。字面值&#xff1a;SQRT(121)。字面文本字符串&#xff1a;PROPER(“john.f.smith”)。表達式&#xff1a…

Unity 和 Unreal Engine(UE) 兩大主流游戲引擎的核心使用方法

以下是 Unity 和 Unreal Engine&#xff08;UE&#xff09; 兩大主流游戲引擎的核心使用方法和對比分析&#xff0c;幫助開發者快速上手并根據項目需求選擇合適工具&#xff1a; 一、Unity 使用指南 1. 安裝與配置 安裝&#xff1a;從 Unity Hub 下載&#xff0c;選擇長期支持…

猜數字游戲:從數學原理到交互體驗的完整設計指南

目錄 猜數字游戲&#xff1a;從數學原理到交互體驗的完整設計指南引言第一章 游戲數學原理1.1 均勻分布與隨機生成1.2 最優猜測策略 第二章 游戲系統設計2.1 核心架構2.2 動態難度系統 第三章 交互設計細節3.1 輸入驗證系統3.2 漸進式提示機制 第四章 進階功能設計4.1 智能輔導…

2025工業大模型白皮書 | 螞蟻工廠北京航空航天大學聯合出品

由螞蟻工廠與北京航空航天大學聯合發布的《2025工業大模型白皮書》是一部針對工業領域大模型技術發展的前瞻性研究報告。該白皮書系統梳理了工業大模型的技術演進、核心應用場景、關鍵挑戰及未來發展趨勢&#xff0c;旨在為制造業數字化轉型提供理論支撐和實踐指南。作為產學研…

JavaWeb:后端web基礎(TomcatServletHTTP)

一、今日內容 二、Tomcat 介紹與使用 介紹 基本使用 小結 配置 配置 查找進程 三、Servlet 什么是Servlet 快速入門 需求 步驟 1.新建工程-模塊&#xff08;Maven&#xff09; 2.修改打包方式-war 3.編寫代碼 /*** 可以選擇繼承HttpServlet*/ WebServlet("/hello&q…

構建現代分布式云架構的三大支柱:服務化、Service Mesh 與 Serverless

目錄 前言1. 服務化架構模式&#xff1a;構建可擴展的基礎單元1.1 服務化的定義與演進1.2 在分布式云中的價值1.3 面臨的挑戰 2. Service Mesh 架構&#xff1a;服務通信的治理中樞2.1 什么是 Service Mesh&#xff1f;2.2 功能與優勢2.3 在分布式云中的角色2.4 落地難點 3. Se…

嵌入式C語言的運算符與輸入輸出

目錄 1. 運算符 1.1 位運算符 1.1.1 位運算 ~ 1.1.2 位邏輯與 & 1.1.3 位邏輯或 | 1.1.4 位邏輯異或 ^ 1.1.5 位移位運算 1.1.6 將無符號位的某位快速置 1 1.2 三目運算符 1.3 逗號運算符 1.4 運算符優先級 2. 輸出 2.1 字符輸出函數 2.2 格式輸出函數 2.3 字符…

IPD研學:76頁頁基于IPD思想-華為需求管理培訓方案【附全文閱讀】

適應人群 本方案適用于企業中參與產品研發、市場、銷售、項目管理等部門的人員,尤其是負責需求管理工作的相關從業者;致力于提升產品競爭力,對優化需求管理流程、提高產品開發質量感興趣的企業管理者;以及希望了解行業前沿需求管理方法,尋求突破和創新的相關人士。…