【RabbitMQ】高級特性—持久性、重試機制詳解

持久性

我們在前面說了消息端處理消息時,消息如何不丟失,但是如何保證當 RabbitMQ 服務器停掉之后,生產者發送的消息不丟失呢?

默認情況下,RabbitMQ 退出或者由于某種原因崩潰時,會忽視隊列和消息,除非告知他不要這么做

RabbitMQ 的持久化分為三個部分:

  • 交換器的持久化
  • 隊列的持久化
  • 消息的持久化

交換機持久化

交換機的持久化,是通過在聲明交換機時將 durable 參數置為 true 實現的

  • 相當于將交換機的屬性在服務器內部保存,當 MQ 的服務器發生意外或關閉之后,重啟 RabbitMQ 時不需要重新去建立交換機,交換機會自動建立,相當于一直存在
  • 如果交換器不設置持久化,那么在 RabbitMQ 服務器重啟之后,相關的交換機元數據會丟失,對一個長期使用的交換機來說,建議將其置為持久化的
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();

隊列持久化

隊列的持久化是通過在聲明隊列時將 durable 參數置為 true 實現的

  • 如果隊列不設置持久化,那么在 RabbitMQ 服務器重啟之后,該隊列就會被刪除掉,此時數據也會丟失(隊列沒有了,消息也無處可存了)
  • 隊列的持久化能保證該隊列本身的元數據不會因異常情況而丟失,但是并不能保證內部存儲的消息不會丟失
  • 要確保消息不會丟失,需要將消息設置為持久化

咱們前面用的創建隊列的方式都是持久化的

QueueBuilder.durable(COnstant.ACK_QUEUE).build();

點進去看源碼會發現,該方法默認 durabletrue

public static QueueBuilder durable() {  return durable(namingStrategy.generateName());  
}  private QueueBuilder setDurable() {  this.durable = true;  return this;  
}

通過下面代碼,可以創建非持久化的隊列

QueueBuilder.noDurable(Constant.ACK_QUEUE).build();

消息持久化

消息實現持久化,需要把消息的投遞模式 (MessageProperties 中的 deliveryMode)設置為 2,也就是 MessageDeliveryMode.PERSISTENT

public enum MessageDeliveryMode {  NON_PERSISTENT,  PERSISTENT;

設置了隊列和消息的持久化,當 RabbitMQ 服務器重啟之后,消息依舊存在。

  • 如果只設置隊列持久化,重啟之后消息會丟失
  • 如果只設置消息持久化,重啟之后隊列消息,繼而消息也丟失
    所以單單設置消息持久化而不設置隊列的持久化顯得毫無意義
// 非持久化信息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());// 持久化信息
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAN, msg.getBytes());

MessageProperties.PERSISTENT_TEXT_PLAIN 實際就是封裝了這個屬性

    public static final BasicProperties PERSISTENT_TEXT_PLAIN =  new BasicProperties("text/plain",  null,  null,  2,  0, null, null, null,  null, null, null, null,  null, null);  
}

如果使用 RabbitTemplate 發送持久化消息,代碼如下:

// 要發送的消息內容
String message = "This is a persistent message";// 創建一個 Message 對象,設置為持久化
Message messagePbject = new Message(message.getBytes(), new MessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 使用 RabbitTemplate 發送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);

  • RabbitMQ 默認情況下會將消息視為持久化,除非隊列被聲明為非持久化,或者消息在發送時被標記為非持久化
  • 我們也可以通過打印 Message 這個對象,來觀察消息是否持久化
(Body:'consumer ack test...' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=ack_exchange, receivedRoutingKey=ack, deliveryTag=2, consumerTag=amq.ctag-mtd-2Mec9zH2fXizRqVAqg, consumerQueue=ack_queue])
  • 將所有的消息都設置為持久化,會嚴重影響 RabbitMQ 的性能(隨機)
  • 寫入磁盤的速度比寫入內存的速度慢得不只一點點,對于可靠性不是那么高的消息可以不采用持久化處理以提高整體的吞吐量
  • 在選擇是否要將消息持久化時,需要在可靠性和吞吐量之間做一個權衡

數據丟失

將交換器、隊列、消息都設置了持久化之后就能百分之百保證數據不丟失了嗎?答案是否定的

  1. 從消費者來說,如果在訂閱消息隊列時將 autoAck 參數設置為 true,那么當消費者接收到相關消息之后,還沒來得及處理就宕機了,這樣也算數據丟失。這種情況很好解決,將 autoAck 參數設置為 false,并進行手動確認,詳細可以參考(消息確認章節)

  2. 在持久化的消息正確存入 RabbitMQ 之后,還需要有一段時間(雖然很短,但是不可忽視)才能存入磁盤中。RabbitMQ 并不會為每條信息都進行同步存盤(調用內核的 fsync 方法)的處理,可能僅僅保存到操作系統緩存之中而不是物理磁盤之中。如果在這段時間內 RabbitMQ 服務器節點發生了宕機、重啟等異常情況,消息保存還沒來得及落盤,那么這些消息將會丟失


這個問題如何解決呢?

  1. 引入 RabbitMQ仲裁隊列(后面會說),如果主節點(master)在此特殊時間內掛掉,可以自動切換到從節點(slave),這樣有效地保證了高可用性,除非整個集群都掛掉

    • 此方法也不能保證 100% 可靠,但是配置了仲裁隊列要比沒有配置的可靠性要高很多,實際生產環境中的關鍵業務隊列一般都會設置仲裁隊列
  2. 還可以在發送端引入事務機制或者發送方確認機制來保證消息已經正確地發送病存儲至 RabbitMQ 中(詳情參考后面發送方確認)

重試機制

在消息傳遞過程中,可能會遇到各種問題,如網絡故障,服務不可用,資源不足等,這些問題可能導致消息處理失敗

為了解決這些問題,RabbitMQ 提供了重試機制,允許消息在處理失敗后重新發送

但如果是程序邏輯引起的錯誤,那么多次重試也是沒有用的,可以設置重試次數

1. 重試配置

spring:   rabbitmq:  addresses: amqp://guest:guest@127.0.0.1:5672/coding  listener:  simple:  acknowledge-mode: manual  # 消息接收確認  retry:  enabled: true  # 開啟消費者失敗重試  initial-interval: 5000ms  # 初始失敗等待時長為 5s          max-attempts: 5  # 最大重試次數(包括自身消費的一次)

2. 配置交換機&隊列

public static final String RETRY_EXCHANGE_NAME = "retry_exchange";  
public static final String RETRY_QUEUE = "retry_queue";
// 1. 交換機  
@Bean("retryExchange")  
public FanoutExchange retryExchange() {  return ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();  
}  // 2. 隊列  
@Bean("retryQueue")  
public Queue retryQueue() {  return QueueBuilder.durable(Constant.RETRY_QUEUE).build();  
}  // 3. 隊列和交換機綁定 Binding@Bean("retryBinding")  
public Binding retryBinding(@Qualifier("retryExchange") FanoutExchange exchange, @Qualifier("retryQueue") Queue queue) {  return BindingBuilder.bind(queue).to(exchange);  
}

3. 發送消息

@RequestMapping("/retry")  
public String retry() {  rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE_NAME, "", "retry test...");  return "發送成功";  
}

4. 消費消息

@Component  
public class RetryQueueListener {  // 指定監聽隊列的名稱  @RabbitListener(queues = Constant.RETRY_QUEUE)  public void Listener(Message message) throws Exception {  System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),  message.getMessageProperties().getDeliveryTag());  // 模擬處理失敗  int num = 3 / 0;  System.out.println("處理完成");  }

5. 運行程序

運行程序,調用接口,發送消息: http://127.0.0.1:8080/producer/retry

[外鏈圖片轉存中…(img-7OSM9aK6-1752140853006)]

異常捕獲

如果對異常進行捕獲,那么就不會進行重試

  • 代碼修改如下
System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),  message.getMessageProperties().getDeliveryTag());  
// 模擬處理失敗  
try {  int num = 3 / 0;  System.out.println("處理完成");  
} catch (Exception e) {  System.out.println("處理失敗");  
}

重新運行程序,結果如下:[外鏈圖片轉存中…(img-7hevkq3m-1752140853007)]

6. 手動確認

改為手動確認


@RabbitListener(queues = Constant.RETRY_QUEUE)  
public void ListenerQueue(Message message, Channel channel) throws Exception{  long deliveryTag = message.getMessageProperties().getDeliveryTag();  try {  System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),  message.getMessageProperties().getDeliveryTag());  // 模擬處理失敗  int num = 3 / 0;  System.out.println("處理完成");  // 3. 手動簽收  channel.basicAck(deliveryTag, true);  } catch (Exception e) {  // 4. 異常了就拒絕簽收  Thread.sleep(1000);  // 第三個參數 requeue,是否重新發送。若為 true,則重發;若為 false,則直接丟棄  channel.basicNack(deliveryTag, true, true);  }  
}

運行結果:[外鏈圖片轉存中…(img-ne4pJWMZ-1752140853008)]

  • 可以看到,手動確認模式時,重試次數的限制不會像在自動確認模式下那樣直接生效,因為是否重試以及何時重試更多的取決于應用程序的邏輯和消費者的實現

自動確認模式下RabbitMQ 會在消息被投遞給消費者后自動確認消息。

  • 如果消費者處理消息時拋出異常,RabbitMQ 根據配置的重試參數自動將消息重新入隊,從而實現重試
  • 重試次數和重試間隔等參數可以直接在 RabbitMQ 的配置中設定,并且 RabbitMQ 會負責執行這些重試策略

手動確認模式下,消費者需要顯式地對消息進行確認。如果消費者在處理消息時遇到異常,可以選擇不確認消息,使消息重新入隊

  • 重試的控制權在于應用程序本身,而不是 RabbitMQ 的內部機制
  • 應用程序可以通過自己的邏輯和利用 RabbitMQ 的高級特性來實現有效的重試策略

使用重試機制時需要注意:

  1. 自動確認模式下:程序邏輯異常,多次重試還是失敗,消息就會被自動確認,那么消息就丟失了
  2. 手動確認模式下:程序邏輯異常,多次重試消息依然處理失敗,無法被確認,就一直是 unacked 的狀態,導致消息積壓

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/92173.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/92173.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/92173.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

零基礎人工智能學習規劃之路

一、引言:為什么選擇人工智能?人工智能(AI)是當前科技領域最炙手可熱的方向之一,涵蓋機器學習、深度學習、計算機視覺、自然語言處理等多個分支。無論是就業市場的高需求,還是技術改變生活的潛力&#xff0…

【科研繪圖系列】R語言繪制誤差棒圖

文章目錄 介紹 加載R包 數據下載 導入數據 數據預處理 畫圖 系統信息 參考 介紹 【科研繪圖系列】R語言繪制誤差棒圖 加載R包 library(tidyverse) library(ggplot2) library(ggsignif) library(RColorBrewer) library(waterfalls) library(reshape2

期權定價全解析:從Black-Scholes到量子革命的金融基石

在金融市場中,期權定價如同航海中的羅盤,為風險定價提供方向。本文將深入剖析期權定價的核心邏輯、應用場景及量子計算帶來的顛覆性變革,并附實戰代碼示例。 一、期權定價的本質:風險的時間價值 1. 核心公式解析 C = e^{-rT}\mathbb{E}^\mathbb{Q}[\max(S_T-K,0)] C:期權…

實現div內容的垂直居中

Flexbox 彈性盒子(推薦) div {display: flex;align-items: center; /* 垂直居中 */justify-content: center;/* 水平居中 */height: 300px; /* 需要指定高度 */ }? 現代瀏覽器首選方案,支持響應式布局 Grid 網格布局 div {displ…

Juc高級篇:可見性,有序性,cas,不可變,設計模式

目錄 一.Java內存模型 1.可見性 1.1設計模式 (1.1.1)兩階段終止 (1.1.2)Balking模式 2.有序性 3.volatile原理 3.1保證可見性與有序性 3.2單例模式DCL 3.3 happens-before規則 4.線程安全單例 4.1餓漢式 二.無鎖并發 1.原子整數 2.原子引用 2.1 AtomicReference…

JDK源碼

java.util.concurrent 以下是atomic包下的 AtomicInteger Unsafe類:提供的方法可以直接訪問內存、線程。 屬性:Unsafe、int value 通過Unsafe方法中的CAS循環,保證int類型值的原子操作 int var5; do {var5 this.getIntVolatile(var1, var2);…

Linux網絡編程【基于UDP網絡通信的字典翻譯服務】

1. 基本框架:前面我們已近完成了,基于UDP協議的網絡通信,但是我們服務器接收到來自客戶端的信息即字符串時只是進行了簡單的發送會客戶端和在日志中回顯打印,并沒有實際的業務服務。那么接下來,我們就設計一個字典翻譯…

Quality Control II: Trimming (二):BBDuk

參考:BBDuk Guide - Archive 在我們了解了如何使用trimmomatic之后,我們開始進一步了解另外一種trim工具BBDuk 首先小編要聲明:如果想要完全掌握一個工具是需要較長時間的鉆研和學習的,這里呢只是提供BBDuk處理數據的基本邏輯和…

AlmaLinux8 平替 manylinux_2_28-python 的 GPG密鑰管理、安裝 cuda sdk

0. 下載 AlmaLinux 8 docker 鏡像 https://hub.docker.com/r/almalinux/8-base/tags 下載鏡像: sudo docker pull almalinux/8-base:8.4 創建一個容器: sudo docker run --gpus all -it --name cudaq_src_py_LHL_06 -v /home/hanmeimei/big…

BM1684X平臺:Qwen-2-5-VL圖像/視頻識別應用

一、 簡介 Qwen-2-5-VL 是阿里巴巴通義千問團隊推出的多模態大語言模型(MLLM),屬于 Qwen-2 系列模型的一部分,支持視覺(Vision)與語言(Language)的多模態交互。 1、特性 動態分辨…

前端項目工程化配置webpack與vite

webpack與vite一、了解 webpack入口(entry)輸出(output)loader插件(plugin)模式(mode)二、項目中使用webpackvue項目react項目三、了解vite構建選項(build)模塊解析(Resolve)模塊處理(Module)服務器選項&am…

機器學習(3):KNN算法-分類

一、KNN算法 K-近鄰算法(K-Nearest Neighbors,簡稱KNN),根據K個鄰居樣本的類別來判斷當前樣本的類別;如果一個樣本在特征空間中的k個最相似(最鄰近)樣本中的大多數屬于某個類別,則該類本也屬于這個類別。一些距離&…

Redis Windows遷移方案與測試

我想將開源軟件Redis的主程序和附屬程序遷移到Windows平臺,目前它只能在Linux上運行,讓它可以在Windows 11和Windows Server 2025上運行,這需要考慮Linux操作系統和Windows操作系統的差異,請列舉出將Redis在Linux系統上運行的GCC的…

信息安全概述--實驗總結

數據鏈路層--ARP欺騙ARP欺騙原理XP2要與XP3通信,要發送ARP請求,詢問XP3的MAC地址kali冒充XP3持續給XP2發送ARP應答,XP2會以為收到的MAC地址是XP3的,實際是kali的之后XP2發送的數據都是發給kali的如果說XP2需要想要訪問互聯網&…

【Electron】打包后圖標不變問題,圖標問題

windows上圖標未更換。圖標已經換了,但新打出的包或是安裝后的 exe 圖標沒有更換。這個時候可以右擊你的exe或是安裝包點屬性,看看圖標是否正常,如果這里的圖標正常,那其實就是成功的了。主要原因是因為 windows 圖標緩存機制導致…

單詞拆分 II

題目&#xff1a;思考&#xff1a; 本質上和單詞拆分1沒什么區別單詞拆分1是問能不能拆單詞拆分2是問把所有拆的方案列出來要列出所有方案&#xff0c;采用字典樹回溯 實現&#xff1a; class Node { public:vector<Node*> check;bool isEnd;Node(int num){for (int i0;i…

國產三防平板電腦是什么?三防平板推薦

國產三防平板電腦&#xff0c;專為應對極端工作環境而生。這類設備集防水、防塵、防摔三大防護性能于一體&#xff0c;通過IP67/IP68防護認證及MIL-STD-810軍規標準測試&#xff0c;能在建筑工地、油田勘探、應急救援等惡劣場景中穩定運行。其核心價值在于將消費級平板的智能體…

優思學院|什么是精益生產管理?原則與方法詳述

在企業經營中&#xff0c;「利潤&#xff1d;價格&#xff0d;成本」這條公式可謂家喻戶曉。傳統的成本思維通常認為價格由公司設定&#xff0c;而成本則是難以撼動的既定事實。然而&#xff0c;隨著市場經濟與自由定價機制的成熟&#xff0c;企業逐漸意識到——價格其實是由市…

【銀行測試】銀行票據項目業務+票據測試點分析(四)

目錄&#xff1a;導讀 前言一、Python編程入門到精通二、接口自動化項目實戰三、Web自動化項目實戰四、App自動化項目實戰五、一線大廠簡歷六、測試開發DevOps體系七、常用自動化測試工具八、JMeter性能測試九、總結&#xff08;尾部小驚喜&#xff09; 前言 1、提示付款 功能…

基于華為開發者空間的Open WebUI數據分析與可視化實戰

1 概述 1.1 案例介紹 本案例演示如何在華為開發者空間云主機上搭建Open WebUI環境&#xff0c;結合DeepSeek-R1模型進行數據分析、統計建模、數據可視化和業務洞察挖掘等實際數據科學任務。 1.2 適用對象 數據分析師業務分析師數據科學工程師市場研究人員統計學專業學生 1…