RabbitMQ 高級特性解析:RabbitMQ 消息可靠性保障 (上)

RabbitMQ 核心功能

RabbitMQ 高級特性解析:RabbitMQ 消息可靠性保障 (上)-CSDN博客

RabbitMQ 高級特性:從 TTL 到消息分發的全面解析 (下)-CSDN博客


前言

最近再看?RabbitMQ,看了看自己之前寫的博客,誒,一言難盡,當時學的懵懵懂懂的。這里重新整理?RabbitMQ 的核心功能。

在分布式系統中,消息隊列是實現異步通信、解耦服務的關鍵組件。RabbitMQ 作為一款功能強大的消息隊列,其消息可靠性是確保系統穩定運行的重要因素。這里將深入探討 RabbitMQ 的消息確認機制、持久化策略、發送方確認機制以及重試機制!!


一、消息確認機制

1.1 消息確認機制概述

生產者發送消息到消費端后,可能出現消息處理成功或異常的情況。如果 RabbitMQ 在發送消息后就將其刪除,當消息處理異常時,就會造成消息丟失。為了確保消費端成功接收并正確處理消息,RabbitMQ 提供了消息確認機制(message acknowledgement)。

消費者在訂閱隊列時,可以指定autoAck參數,根據該參數設置,消息確認機制分為自動確認和手動確認兩種:

  • 自動確認(autoAck=true):RabbitMQ 會自動把發送出去的消息置為確認,然后從內存(或者磁盤)中刪除,而不管消費者是否真正消費到了這些消息。這種模式適合對消息可靠性要求不高的場景。
  • 手動確認(autoAck=false):RabbitMQ 會等待消費者顯式地調用Basic.Ack命令,回復確認信號后才從內存(或者磁盤)中移去消息。這種模式適合對消息可靠性要求比較高的場景。

以下是basicConsume方法的定義:

/*** Start a non-nolocal, non-exclusive consumer, with* a server-generated consumerTag.* @param queue the name of the queue* @param autoAck true if the server should consider messages* acknowledged once delivered; false if the server should expect* explicit acknowledgements* @param callback an interface to the consumer object* @return the consumerTag generated by the server* @throws java.io.IOException if an error is encountered* @see com.rabbitmq.client.AMQP.Basic.Consume* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

1.2 手動確認方法

消費者在收到消息之后,可以選擇確認、直接拒絕或者跳過,RabbitMQ 提供了三種不同的確認應答方式:

  • 肯定確認:Channel.basicAck(long deliveryTag, boolean multiple):RabbitMQ 已知道該消息并且成功處理消息,可以將其丟棄。
    • deliveryTag:消息的唯一標識,是一個單調遞增的 64 位長整型值,每個通道(Channel)獨立維護,所以在每個通道上都是唯一的。當消費者確認(ack)一條消息時,必須使用對應的通道進行確認。
    • multiple:是否批量確認。值為true則會一次性 ack 所有小于或等于指定deliveryTag的消息;值為false,則只確認當前指定deliveryTag的消息。
  • 否定確認:Channel.basicReject(long deliveryTag, boolean requeue):消費者客戶端可以調用channel.basicReject方法來告訴 RabbitMQ 拒絕這個消息。
    • deliveryTag:參考channel.basicAck
    • requeue:表示拒絕后,這條消息如何處理。如果requeue參數設置為true,則 RabbitMQ 會重新將這條消息存入隊列,以便可以發送給下一個訂閱的消費者;如果requeue參數設置為false,則 RabbitMQ 會把消息從隊列中移除,而不會把它發送給新的消費者。
  • 否定確認:Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):如果想要批量拒絕消息,可以使用Basic.Nack命令。
    • 參數介紹參考前面兩個方法。multiple參數設置為true則表示拒絕deliveryTag編號之前所有未被當前消費者確認的消息。

1.3 代碼示例

我們基于 Spring Boot 來演示消息的確認機制,Spring - AMQP 對消息確認機制提供了三種策略:

public enum AcknowledgeMode {NONE,MANUAL,AUTO;
}
1.3.1?AcknowledgeMode.NONE
  • 配置確認機制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: none
  • 發送消息
public class Constant {public static final String ACK_EXCHANGE_NAME = "ack_exchange";public static final String ACK_QUEUE = "ack_queue";
}@Configuration
public class RabbitmqConfig {@Bean("ackExchange")public Exchange ackExchange(){return ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();}@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constant.ACK_QUEUE).build();}@Bean("ackBinding")public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange, @Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();}
}@RestController
@RequestMapping("/producer")
public class ProductController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack(){rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", "consumer ack test...");return "發送成功!";}
}
  • 消費端邏輯
@Component
public class AckQueueListener {@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws Exception {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());//模擬處理失敗int num = 3/0;System.out.println("處理完成");}
}

運行結果:消息處理失敗,但消息已從 RabbitMQ 中移除,因為NONE模式下消息一旦投遞就會被自動確認。

1.3.2?AcknowledgeMode.AUTO
  • 配置確認機制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto

重新運行程序,當消費者出現異常時,RabbitMQ 會不斷重發消息,由于異常多次重試還是失敗,消息沒被確認,也無法 nack,就一直是unacked狀態,導致消息積壓。

1.3.3?AcknowledgeMode.MANUAL
  • 配置確認機制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual
  • 消費端手動確認邏輯
@Component
public class AckQueueListener {@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());//2. 處理業務邏輯System.out.println("處理業務邏輯");//手動設置一個異常, 來測試異常拒絕機制// int num = 3/0;//3. 手動簽收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 異常了就拒絕簽收//第三個參數requeue, 是否重新發送, 如果為true, 則會重新發送, 若為false, 則直接丟棄channel.basicNack(deliveryTag, true, true);}}
}

運行結果:消息正常處理時會被簽收;異常時會不斷重試。


二、持久性

2.1 交換機持久化

交換器的持久化是通過在聲明交換機時將durable參數置為true實現的。這樣當 MQ 的服務器發生意外或關閉之后,重啟 RabbitMQ 時不需要重新去建立交換機,交換機會自動建立。如果交換器不設置持久化,那么在 RabbitMQ 服務重啟之后,相關的交換機元數據會丟失。

ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();

2.2 隊列持久化

隊列的持久化是通過在聲明隊列時將durable參數置為true實現的。如果隊列不設置持久化,那么在 RabbitMQ 服務重啟之后,該隊列就會被刪掉,數據也會丟失。但隊列持久化不能保證內部所存儲的消息不丟失,要確保消息不丟失,需要將消息設置為持久化。

QueueBuilder.durable(Constant.ACK_QUEUE).build();

創建非持久化隊列:

QueueBuilder.nonDurable(Constant.ACK_QUEUE).build(); 

2.3 消息持久化

消息實現持久化,需要把消息的投遞模式(MessageProperties中的deliveryMode)設置為 2,也就是MessageDeliveryMode.PERSISTENT

// 要發送的消息內容
String message = "This is a persistent message";
// 創建一個Message對象,設置為持久化
Message messageObject = new Message(message.getBytes(), new MessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 使用RabbitTemplate發送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);

需要注意的是,將所有的消息都設置為持久化,會嚴重影響 RabbitMQ 的性能,因為寫入磁盤的速度比寫入內存的速度慢很多。在選擇是否要將消息持久化時,需要在可靠性和吞吐量之間做一個權衡。

即使將交換器、隊列、消息都設置了持久化,也不能百分之百保證數據不丟失。例如,消費者訂閱隊列時autoAck參數設置為true,消費者接收到消息后還沒來得及處理就宕機,會導致數據丟失;持久化的消息存入 RabbitMQ 后,還需要一段時間才能存入磁盤,如果在這段時間內 RabbitMQ 服務節點發生異常,消息可能會丟失。可以通過引入 RabbitMQ 的仲裁隊列或在發送端引入事務機制、發送方確認機制來提高可靠性。(后續都會講到)


三、發送方確認

3.1 confirm 確認模式

Producer 在發送消息時,對發送端設置一個ConfirmCallback的監聽,無論消息是否到達Exchange,這個監聽都會被執行。如果Exchange成功收到消息,ACKtrue;如果沒收到消息,ACKfalse

配置 RabbitMQ

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual #消息接收確認publisher-confirm-type: correlated #消息發送確認

設置確認回調邏輯并發送消息

@Configuration
public class RabbitTemplateConfig {@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){System.out.printf("消息接收成功, id:%s \n", correlationData.getId());}else {System.out.printf("消息接收失敗, id:%s, cause: %s", correlationData.getId(), cause);}}});return rabbitTemplate;}
}@RestController
@RequestMapping("/product")
public class ProductController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() throws InterruptedException {CorrelationData correlationData1 = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, "confirm", "confirm test...", correlationData1);return "確認成功";}
}
  • 測試
    運行程序,調用接口http://127.0.0.1:8080/product/confirm,觀察控制臺,消息確認成功。修改交換機名稱,重新運行,會觸發消息發送失敗的結果。

3.2 return 退回模式

消息到達Exchange之后,會根據路由規則匹配,把消息放入Queue中。如果一條消息無法被任何隊列消費,可以選擇把消息退還給發送者。

配置 RabbitMQ

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual #消息接收確認publisher-confirm-type: correlated #消息發送確認
  • 設置返回回調邏輯并發送消息
@Configuration
public class RabbitTemplateConfig {@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.printf("消息被退回: %s", returned);}});return rabbitTemplate;}
}@RestController
@RequestMapping("/product")
public class ProductController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/msgReturn")public String msgReturn(){CorrelationData correlationData = new CorrelationData("2");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm11", "message return test...", correlationData);return "消息發送成功";}
}

測試
運行程序,調用接口http://127.0.0.1:8080/product/msgReturn,觀察控制臺,消息被退回。


四、重試機制

在消息傳遞過程中,可能會遇到各種問題,如網絡故障、服務不可用、資源不足等,這些問題可能導致消息處理失敗。為了解決這些問題,RabbitMQ 提供了重試機制,允許消息在處理失敗后重新發送。但如果是程序邏輯引起的錯誤,那么多次重試也是沒有用的,可以設置重試次數。

4.1 重試配置

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto #消息接收確認retry:enabled: true # 開啟消費者失敗重試initial-interval: 5000ms # 初始失敗等待時長為5秒max-attempts: 5 # 最大重試次數(包括自身消費的一次)

4.2 配置交換機 & 隊列

//重試機制
public static final String RETRY_QUEUE = "retry_queue";
public static final String RETRY_EXCHANGE_NAME = "retry_exchange";//重試機制 發布訂閱模式
//1. 交換機
@Bean("retryExchange")
public Exchange retryExchange() {return ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();
}
//2. 隊列
@Bean("retryQueue")
public Queue retryQueue() {return QueueBuilder.durable(Constant.RETRY_QUEUE).build();

五:如何保證 RabbitMQ 消息的可靠傳輸?

消息可能丟失的場景以及解決方案如下:

生產者將消息發送到 RabbitMQ 失敗

  • 可能原因是網絡問題等,
  • 解決辦法是參考發送方確認 - confirm確認模式

消息在交換機中無法路由到指定隊列

  • 可能原因是代碼或者配置層面錯誤,導致消息路由失敗,
  • 解決辦法是參考發送方確認 - return模式

消息隊列自身數據丟失

  • 可能原因是消息到達 RabbitMQ 之后,RabbitMQ Server 宕機導致消息丟失,
  • 解決辦法是參考持久化開啟 RabbitMQ 持久化,也可以通過集群的方式提高可靠性。

消費者異常,導致消息丟失

  • 可能原因是消息到達消費者,還沒來得及消費,消費者宕機或消費者邏輯有問題,
  • 解決辦法是參考消息確認,開啟手動確認,配置重試機制

以上就是四個RabbitMQ保證消息可靠性的四個機制,后續有更多核心機制的更新,感謝閱覽!!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/72856.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/72856.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/72856.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

用DeepSeek-R1-Distill-data-110k蒸餾中文數據集 微調Qwen2.5-7B-Instruct!

下載模型與數據 模型下載: huggingface: Qwen/Qwen2.5-7B-Instruct HF MirrorWe’re on a journey to advance and democratize artificial intelligence through open source and open science.https://hf-mirror.com/Qwen/Qwen2.5-7B-Instruct 魔搭&a…

在IDEA中進行git回滾操作:Reset current branch to here?或Reset HEAD

問題描述 1)在本地修改好的代碼,commit到本地倉庫,突然發覺有問題不想push推到遠程倉庫了,但它一直在push的列表中存在,那該怎么去掉push列表中的內容呢? 2)合并別的分支到當前分支&#xff0…

六十天前端強化訓練之第十一天之事件機制超詳解析

歡迎來到編程星辰海的博客講解 目錄 一、事件模型演進史 1.1 原始事件模型(DOM Level 0) 1.2 DOM Level 2事件模型 1.3 DOM Level 3事件模型 二、事件流深度剖析 2.1 捕獲與冒泡對比實驗 2.2 事件終止方法對比 三、事件委托高級應用 3.1 動態元…

Qwen架構與Llama架構的核心區別

我們在討論Deepseek不同版本之間的區別時了解到,DeepSeek-R1的蒸餾模型分為Qwen和Llama兩個系列,包括Qwen系列的0.5B、1.5B、3B、7B、14B、32B、72B和Llama系列的8B、70B。Qwen系列以阿里通義千問(Qwen)為基礎模型架構(具體是Qwen-2.5),Llama系列以Meta的Llama為基礎模型…

匿名GitHub鏈接使用教程(Anonymous GitHub)2025

Anonymous GitHub 1. 引言2. 準備3. 進入Anonymous GitHub官網4. 用GitHub登錄匿名GitHub并授權5. 進入個人中心,然后點擊? Anonymize Repo實例化6. 輸入你的GitHub鏈接7. 填寫匿名鏈接的基礎信息8. 提交9. 實例化對應匿名GitHub鏈接10. 進入個人中心管理項目11. 查…

工程化與框架系列(25)--低代碼平臺開發

低代碼平臺開發 🔧 引言 低代碼開發平臺是一種通過可視化配置和少量代碼實現應用開發的技術方案。本文將深入探討低代碼平臺的設計與實現,包括可視化編輯器、組件系統、數據流管理等關鍵主題,幫助開發者構建高效的低代碼開發平臺。 低代碼…

Redis系列之慢查詢分析與調優

Redis 慢查詢分析與優化:提升性能的實戰指南 Redis 作為一款高性能的內存數據庫,因其快速的數據讀寫能力和靈活的數據結構,被廣泛應用于緩存、消息隊列、排行榜等多種業務場景。然而,隨著業務規模的擴大和數據量的增加&#xff0…

Git系列之git tag和ReleaseMilestone

以下是關于 Git Tag、Release 和 Milestone 的深度融合內容,并補充了關于 Git Tag 的所有命令、詳細解釋和指令實例,條理清晰,結合實際使用場景和案例。 1. Git Tag 1.1 定義 ? Tag 是 Git 中用于標記特定提交(commit&#xf…

開源項目介紹:Native-LLM-for-Android

項目地址:Native-LLM-for-Android 創作活動時間:2025年 支持在 Android 設備上運行大型語言模型 (LLM) ,具體支持的模型包括: DeepSeek-R1-Distill-Qwen: 1.5B Qwen2.5-Instruct: 0.5B, 1.5B Qwen2/2.5VL:…

深入理解 Java 虛擬機內存區域

Java 虛擬機(JVM)是 Java 程序運行的核心環境,它通過內存管理為程序提供高效的執行支持。JVM 在運行時將內存劃分為多個區域,每個區域都有特定的作用和生命周期。本文將詳細介紹 JVM 的運行時數據區域及其功能,并探討與…

PDF轉JPG(并去除多余的白邊)

首先,手動下載一個軟件(poppler for Windows),下載地址:https://github.com/oschwartz10612/poppler-windows/releases/tag/v24.08.0-0 否則會出現以下錯誤: PDFInfoNotInstalledError: Unable to get pag…

深入剖析MyBatis緩存機制:原理、源碼與實戰指南

引言 MyBatis作為一款優秀的ORM框架,其緩存機制能顯著提升數據庫查詢性能。但許多開發者僅停留在“知道有緩存”的層面,對其實現原理和細節知之甚少。本文將結合可運行的代碼示例和源碼分析,手把手帶您徹底掌握MyBatis緩存機制。 一、MyBatis緩存分類 MyBatis提供兩級緩存…

Vue 使用 vue-router 時,多級嵌套路由緩存問題處理

Vue 使用 vue-router 時,多級嵌套路由緩存問題處理 對于三級菜單(或多級嵌套路由),vue 都是 通過 keep-alive 組件來實現路由組件的緩存。 有時候三級或者多級路由時,會出現失效情況。以下是三級菜單緩存的例子。 最…

QSplitter保存和讀取

官方文檔提供的方案 保存 connect(ui->splitter, &QSplitter::splitterMoved, [](){settings.setValue("splitterSizes", ui->splitter->saveState()); });讀取 ui->splitter->restoreState(settings.value("splitterSizes").toByteA…

VanillaVueSvelteReactSolidAngularPreact前端框架/庫的簡要介紹及其優勢

VanillaVueSvelteReactSolidAngularPreact前端框架/庫的簡要介紹及其優勢。以下是這些前端框架/庫的簡要介紹及其優勢: 1. Vanilla 定義:Vanilla 并不是一個框架,而是指 原生 JavaScript(即不使用任何框架或庫)。優勢…

Java多線程與高并發專題——關于CopyOnWrite 容器特點

引入 在 CopyOnWriteArrayList 出現之前,我們已經有了 ArrayList 和 LinkedList 作為 List 的數組和鏈表的實現,而且也有了線程安全的 Vector 和Collections.synchronizedList() 可以使用。 首先我們來看看Vector是如何實現線程安全的 ,還是…

Jmeter接口測試詳解

今天筆者呢,想給大家聊聊Jmeter接口測試流程詳解,廢話不多說直接進入正題。 一、jmeter簡介 Jmeter是由Apache公司開發的java開源項目,所以想要使用它必須基于java環境才可以; Jmeter采用多線程,允許通過多個線程并…

DeepSeek開啟AI辦公新模式,WPS/Office集成DeepSeek-R1本地大模型!

從央視到地方媒體,已有多家媒體機構推出AI主播,最近杭州文化廣播電視集團的《杭州新聞聯播》節目,使用AI主持人進行新聞播報,且做到了0失誤率,可見AI正在逐漸取代部分行業和一些重復性的工作,這一現象引發很…

通過Golang的container/list實現LRU緩存算法

文章目錄 力扣:146. LRU 緩存主要結構 List 和 Element常用方法1. 初始化鏈表2. 插入元素3. 刪除元素4. 遍歷鏈表5. 獲取鏈表長度使用場景注意事項 源代碼閱讀 在 Go 語言中,container/list 包提供了一個雙向鏈表的實現。鏈表是一種常見的數據結構&#…

【大學生體質】智能 AI 旅游推薦平臺(Vue+SpringBoot3)-完整部署教程

智能 AI 旅游推薦平臺開源文檔 項目前端地址 ??項目介紹 智能 AI 旅游推薦平臺(Intelligent AI Travel Recommendation Platform)是一個利用 AI 模型和數據分析為用戶提供個性化旅游路線推薦、景點評分、旅游攻略分享等功能的綜合性系統。該系統融合…