RabbitMq--消息可靠性

12.消息可靠性

1.消息丟失的情況

  1. 生產者向消息代理傳遞消息的過程中,消息丟失了
  2. 消息代理( RabbitMQ )把消息弄丟了
  3. 消費者把消息弄丟了

image-20250310192901333

那怎么保證消息的可靠性呢,我們可以從消息丟失的情況入手——從生產者、消息代理( RabbitMQ )、消費者三個方面來保證消息的可靠性

2.生產者的可靠性

1.生產者重連

由于網絡問題,可能會出現客戶端連接 RabbitMQ 失敗的情況,我們可以通過配置開啟連接 RabbitMQ 失敗后的重連機制

application.yml(將 host 更改為部署 RabbitMQ 的服務器的地址)

spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /blogusername: CaiXuKunpassword: T1rhFXMGXIOYCoyiconnection-timeout: 1s # 連接超時時間template:retry:enabled: true # 開啟連接超時重試機制initial-interval: 1000ms # 連接失敗后的初始等待時間multiplier: 1 # 連接失敗后的等待時長倍數,下次等待時長 = (initial-interval) * multipliermax-attempts: 3 # 最大重試次數

注意事項: 當網絡不穩定的時候,利用重試機制可以有效提高消息發送的成功率,但 SpringAMOP 提供的重試機制是阻塞式的重試,也就是說多次重試等待的過程中,線程會被阻塞,影響業務性能
如果對于業務性能有要求,建議禁用重試機制。如果一定要使用,請合理配置等待時長(比如 200 ms)和重試次數,也可以考慮使用異步線程來執行發送消息的代碼

2.生產者確認

RabbitMQ 提供了 Publisher ConfirmPublisher Return 兩種確認機制。開啟確機制認后,如果 MQ 成功收到消息后,會返回確認消息給生產者,返回的結果有以下幾種情況

  • 消息投遞到了 MQ,但是路由失敗(業務原因),此時會通過 PublisherReturn 機制返回路由異常的原因,然后返回 ACK,告知生產者消息投遞成功

  • 臨時消息投遞到了 MQ,并且入隊成功,返回 ACK,告知生產者消息投遞成功

  • 持久消息投遞到了MQ,并且入隊完成持久化,返回 ACK,告知生產者消息投遞成功

  • 其它情況都會返回 NACK,告知生產者消息投遞失敗

image-20250310194409045

生產者確認機制有關的配置信息( application.yml 文件)

spring:rabbitmq:publisher-returns: truepublisher-confirm-type: correlated

publisher-confirm-type 有三種模式:

  1. none:關閉 confirm 機制
  2. simple:以同步阻塞等待的方式返回 MQ 的回執消息
  3. correlated:以異步回調方式的方式返回 MQ 的回執消息

每個 RabbitTemplate 只能配置一個 ReturnCallback

新增一個名為 RabbitMQConfig 的配置類,并讓該類實現 ApplicationContextAware 接口

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置回調rabbitTemplate.setReturnsCallback((returnedMessage) -> {System.out.println("收到消息的return callback, " +"exchange = " + returnedMessage.getExchange() + ", " +"routingKey = " + returnedMessage.getRoutingKey() + ", " +"replyCode = " + returnedMessage.getReplyCode() + ", " +"replyText = " + returnedMessage.getReplyText() + ", " +"message = " + returnedMessage.getMessage());});}}

添加一個測試類,測試 ReturnCallback 的效果

@Test
void testConfirmCallback() throws InterruptedException {CorrelationData correlationData = new CorrelationData();correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> {if (confirm.isAck()) {// 消息發送成功System.out.println("消息發送成功,收到ack");} else {// 消息發送失敗System.err.println("消息發送失敗,收到nack,原因是" + confirm.getReason());}if (throwable != null) {// 消息回調失敗System.err.println("消息回調失敗");}});rabbitTemplate.convertAndSend("blog.direct", "red", "Hello, confirm callback", correlationData);// 測試方法執行結束后程序就結束了,所以這里需要阻塞線程,否則程序看不到回調結果Thread.sleep(2000);
}

如何看待和處理生產者的確認信息

  • 生產者確認需要額外的網絡開銷和系統資源開銷,盡量不要使用
  • 如果一定要使用,無需開啟 Publisher-Return 機制,因為路由失敗一般是業務出了問題
  • 對于返回 nack 的消息,可以嘗試重新投遞,如果依然失敗,則記錄異常消息

3.消息代理(RabbitMQ)的可靠性

在默認情況下,RabbitMQ 會將接收到的信息保存在內存中以降低消息收發的延遲,這樣會導致兩個問題:

  • 一旦 RabbitMQ 宕機,內存中的消息會丟失
  • 內存空間是有限的,當消費者處理過慢或者消費者出現故障或時,會導致消息積壓,引發 MQ 阻塞( Paged Out 現象

**MQ 阻塞:**當隊列的空間被消息占滿了之后,RabbitMQ 會先把老舊的信息存到磁盤,為新消息騰出空間,在這個過程中,整個 MQ 是被阻塞的,也就是說,在 MQ 完成這一系列工作之前,無法處理已有的消息和接收新的消息

1.數據持久化

RabbitMQ 實現數據持久化包括 3 個方面:

  1. 交換機持久化
  2. 隊列持久化
  3. 消息持久化

注意事項:利用 SpringAMQP 創建的交換機、隊列、消息,默認都是持久化的
在 RabbitMQ 控制臺創建的交換機、隊列默認是持久化的,而消息默認是存在內存中( 3.12 版本之前默認存放在內存,3.12 版本及之后默認先存放在磁盤,消費者處理消息時才會將消息取出來放到內存中)

2. LazyQueue( 3.12 版本后所有隊列都是 Lazy Queue 模式)

從 RabbitMQ 的 3.6.0 版本開始,增加了 Lazy Queue 的概念,也就是惰性隊列,惰性隊列的特征如下:

  1. 接收到消息后直接存入磁盤而非內存(內存中只保留最近的消息,默認 2048條 )
  2. 消費者要處理消息時才會從磁盤中讀取并加載到內存
  3. 支持數百萬條的消息存儲,在 3.12 版本后,所有隊列都是 Lazy Queue 模式,無法更改

開啟持久化和生產者確認時,RabbitMQ 只有在消息持久化完成后才會給生產者返回 ACK 回執

  • 在 RabbitMQ 控制臺中,要創建一個惰性隊列,只需要在聲明隊列時,指定 x-queue-mode 屬性為 lazy 即可

    image-20250310200323362

  • 在 Java 代碼中,要創建一個惰性隊列,只需要在聲明隊列時,指定 x-queue-mode 屬性為 lazy 即可

    //注解式創建
    @RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue(name = "lazy.queue2",durable = "true",arguments = @Argument(name = "x-queue-mode",value = "lazy")
    ))
    public void listenLazeQueue(String message) {System.out.println("消費者收到了 laze.queue2的消息: " + message);
    }
    //編程式創建@Bean
    public org.springframework.amqp.core.Queue lazeQueue() {return QueueBuilder.durable("lazy.queue1").lazy().build();
    }
    

4.消費者的可靠性

1. 消費者確認機制

為了確認消費者是否成功處理消息,RabbitMQ 提供了消費者確認機制(Consumer Acknowledgement)。處理消息后,消費者應該向 RabbitMQ 發送一個回執,告知 RabbitMQ 消息的處理狀態,回執有三種可選值:

  1. ack:成功處理消息,RabbitMQ 從隊列中刪除該消息
  2. nack:消息處理失敗,RabbitMQ 需要再次投遞消息
  3. reject:消息處理失敗并拒絕該消息,RabbitMQ 從隊列中刪除該消息

SpringAMQP 已經實現了消息確認功能,并允許我們通過配置文件選擇 ACK 的處理方式,有三種方式:

  • none:不處理,即消息投遞給消費者后立刻 ack,消息會會立刻從 MQ 中刪除,非常不安全,不建議使用

  • manual:手動模式。需要自己在業務代碼中調用 api,發送 ack 或 reject ,存在業務入侵,但更靈活

  • auto:自動模式,SpringAMQP 利用 AOP 對我們的消息處理邏輯做了環繞增強,當業務正常執行時則自動返回 ack,當業務出現異常時,會根據異常的類型返回不同結果:

    • 如果是業務異常,會自動返回 nack
    • 如果是消息處理或校驗異常,自動返回 reject

開啟消息確認機制,需要在 application.yml 文件中編寫相關的配置

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: none
2.失敗重試機制

當消費者出現異常后,消息會不斷重新入隊,重新發送給消費者,然后再次發生異常,再次 requeue(重新入隊),陷入 無限循環,給 RabbitMQ 帶來不必要的壓力

我們可以利用 Spring 提供的 retry 機制,在消費者出現異常時利用本地重試,而不是無限制地重新入隊

在 application.yml 配置文件中開啟失敗重試機制

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: autoretry:enabled: true # 開啟消息消費失敗重試機制initial-interval: 1000ms # 消息消費失敗后的初始等待時間multiplier: 1 # 消息消費失敗后的等待時長倍數,下次等待時長 = (initial-interval) * multipliermax-attempts: 3 # 最大重試次數stateless: true # true表示無狀態,false表示有狀態,如果業務中包含事務,需要設置為false

在達到最大重試次數后,消息會丟失!!!怎樣解決?

3. 失敗消息的處理策略

開啟重試模式后,如果重試次數耗盡后消息依然處理失敗,則需要由 MessageRecoverer 接口來處理, MessageRecoverer 有三個實現類:

  • RejectAndDontRequeueRecoverer:重試次數耗盡后,直接 reject,丟棄消息,默認就是這種方式

  • ImmediateRequeueMessageRecoverer:重試次數耗盡后,返回 nack,消息重新入隊

  • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機

image-20250310203318152

我們來演示一下使用 RepublishMessageRecoverer 類的情況

  • 第一步:定義一個名為 blog.error 的交換機、一個名為 error.queue 的隊列,并將隊列和交換機進行綁定

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.retry.MessageRecoverer;
    import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration
    @ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
    public class ErrorConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct", true, false);}@Beanpublic Queue errorQueue() {return new Queue("error.queue", true, false, false);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}}
  • 第二步:將失敗處理策略改為 RepublishMessageRecoverer (開起了消費者重試機制才會生效)

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
  • 在控制臺中可以看到,消息的重試次數耗盡后,消息被放入了 error.queue 隊列

    image-20250310203613628

  • 在 RabbitMQ 的控制塔也可以看到, error.direct 交換機 和 error.queue 隊列成功創建,消息也成功放入了 error.queue 隊列

    image-20250310203633853

總結:消費者如何保證消息一定被消費?

  • 開啟消費者確認機制為 auto ,由 Spring 幫我們確認,消息處理成功后返回 ack,異常時返回 nack
  • 開啟消費者失敗重試機制,并設置 MessageRecoverer ,多次重試失敗后將消息投遞到異常交換機,交由人工處理
4. 業務冪等性

在程序開發中,冪等是指同一個業務,執行一次或多次對業務狀態的影響是一致的

image-20250310203936288

那么有什么方法能夠確保業務的冪等性呢

方案一:為每條消息設置一個唯一的 id

給每個消息都設置一個唯一的 id,利用 id 區分是否是重復消息:

  1. 為每條消息都生成一個唯一的 id,與消息一起投遞給消費者
  2. 消費者接收到消息后處理自己的業務,業務處理成功后將消息 id 保存到數據庫
  3. 如果消費者下次又收到相同消息,先去數據庫查詢該消息對應的 id 是否存在,如果存在則為重復消息,放棄處理

可以在指定 MessageConverter 的具體類型時,同時為 MessageConverter 設置自動創建一個 messageId

@Bean
public MessageConverter jacksonMessageConvertor() {Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

發送消息后,在 RabbitMQ 的控制臺可以看到,消息的 properties 屬性附帶了 messageId 信息

image-20250310204104246

但這種方式對業務有一定的侵入性

方案二:結合業務判斷 – 結合實例
兜底的解決方案

我們可以在交易服務設置定時任務,定期查詢訂單支付狀態,這樣即便 MQ 通知失敗,還可以利用定時任務作為兜底方案,確保訂單支付狀態的最終一致性

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/71886.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/71886.shtml
英文地址,請注明出處:http://en.pswp.cn/web/71886.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Windows中在VSCode/Cursor上通過CMake或launch文件配置CUDA編程環境

前置步驟 安裝符合GPU型號的CUDA Toolkit 配置好 nvcc 環境變量 安裝 Visual Studio 參考https://blog.csdn.net/Cony_14/article/details/137510909 VSCode 安裝插件 Nsight Visual Studio Code Edition 注意:不是vscode-cudacpp。若兩個插件同時安裝,…

Spark(8)配置Hadoop集群環境-使用腳本命令實現集群文件同步

一.hadoop的運行模式 二.scp命令————基本使用 三.scp命令———拓展使用 四.rsync遠程同步 五.xsync腳本集群之間的同步 一.hadoop的運行模式 hadoop一共有如下三種運行方式: 1. 本地運行。數據存儲在linux本地,測試偶爾用一下。我們上一節課使用…

聚焦兩會:科技與發展并進,賽逸展2025成創新新舞臺

在十四屆全國人大三次會議和全國政協十四屆三次會議期間,代表委員們圍繞多個關鍵議題展開深入討論,為國家未來發展謀篇布局。其中,技術競爭加劇與經濟轉型需求成為兩會焦點,將在首都北京舉辦的2025第七屆亞洲消費電子技術貿易展&a…

【音視頻】ffmpeg命令提取像素格式

1、提取YUV數據 提取yuv數據,并保持分辨率與原視頻一致 使用-pix_fmt或-pixel_format指定yuv格式提取數據,并保持原來的分辨率 ffmpeg -i music.mp4 -t "01:00" -pixel_format yuv420p music.yuv提取成功后,可以使用ffplay指定y…

【從零開始學習計算機科學】計算機體系結構(二)指令級并行(ILP)

【從零開始學習計算機科學】【從零開始學習計算機科學】計算機體系結構(二)指令級并行(ILP) ILP流水線(pipeline)流水線調度循環展開和循環流水循環展開。循環展開的具體步驟可以描述為,軟件流水(循環流水)。我們可以通過流水線的思想處理循環的執行,即不需要這一次的…

android edittext 防止輸入多個小數點或負號

有些英文系統的輸入法,或者定制輸入法。使用xml限制不了輸入多個小數點和多個負號。所以代碼來控制。 一、通過XML設置限制 <EditTextandroid:id="@+id/editTextNumber"android:layout_width="wrap_content"android:layout_height="wrap_conten…

2019年藍橋杯第十屆CC++大學B組真題及代碼

目錄 1A&#xff1a;組隊&#xff08;填空5分_手算&#xff09; 2B&#xff1a;年號字符&#xff08;填空5分_進制&#xff09; 3C&#xff1a;數列求值&#xff08;填空10分_枚舉&#xff09; 4D&#xff1a;數的分解&#xff08;填空10分&#xff09; 5E&#xff1a;迷宮…

從C#中的MemberwiseClone()淺拷貝說起

MemberwiseClone() 是 C# 中的一個方法&#xff0c;用于創建當前對象的淺拷貝&#xff08;shallow copy&#xff09;。它屬于 System.Object 類&#xff0c;因此所有 C# 對象都可以調用該方法。 1. MemberwiseClone() 的含義 淺拷貝&#xff1a;MemberwiseClone() 會創建一個新…

筆記六:單鏈表鏈表介紹與模擬實現

在他一生中&#xff0c;從來沒有人能夠像你們這樣&#xff0c;以他的視角看待這個世界。 ---------《尋找天堂》 目錄 文章目錄 一、什么是鏈表&#xff1f; 二、為什么要使用鏈表&#xff1f; 三、 單鏈表介紹與使用 3.1 單鏈表 3.1.1 創建單鏈表節點 3.1.2 單鏈表的頭插、…

尚硅谷爬蟲note15n

1. 多條管道 多條管道開啟&#xff08;2步&#xff09;&#xff1a; (1)定義管道類 &#xff08;2&#xff09;在settings中開啟管道 在pipelines中&#xff1a; import urllib.request # 多條管道開啟 #(1)定義管道類 #&#xff08;2&#xff09;在setti…

oracle檢查字段為空

在Oracle數據庫中&#xff0c;檢查字段是否為空通常涉及到使用IS NULL條件。如果你想查詢某個表中的字段是否為空&#xff0c;你可以使用SELECT語句結合WHERE子句來實現。這里有一些基本示例來展示如何進行這樣的查詢。 示例1: 檢查單個字段是否為空 假設你有一個表employees…

虛幻基礎:動畫層接口

文章目錄 動畫層&#xff1a;動畫圖表中的函數接口&#xff1a;名字&#xff0c;沒有實現。動畫層接口&#xff1a;由動畫藍圖實現1.動畫層可直接調用實現功能2.動畫層接口必須安裝3.動畫層默認使用本身實現4.動畫層也可使用其他動畫藍圖實現&#xff0c;但必須在角色藍圖中關聯…

HarmonyOS學習第18天:多媒體功能全解析

一、開篇引入 在當今數字化時代&#xff0c;多媒體已經深度融入我們的日常生活。無論是在工作中通過視頻會議進行溝通協作&#xff0c;還是在學習時借助在線課程的音頻講解加深理解&#xff0c;亦或是在休閑時光用手機播放音樂放松身心、觀看視頻打發時間&#xff0c;多媒體功…

緒論數據結構基本概念(刷題筆記)

&#xff08;一&#xff09;單選題 1.與數據元素本身的形式、相對位置和個數無關的是&#xff08;B&#xff09;【廣東工業大學2019年829數據結構】 A.數據存儲結構 B.數據邏輯結構 C.算法 D.操作 2.在數據結構的討論中把數據結構從邏輯上分為&#xff08;C&#xff09;【中國…

GPTQ - 生成式預訓練 Transformer 的精確訓練后壓縮

GPTQ - 生成式預訓練 Transformer 的精確訓練后壓縮 flyfish 曾經是 https://github.com/AutoGPTQ/AutoGPTQ 現在是https://github.com/ModelCloud/GPTQModel 對應論文是 《Accurate Post-Training Quantization for Generative Pre-trained Transformers》 生成式預訓練Tr…

git的使用方法

文章目錄 前言git簡介GIT的基本操作克隆倉庫 (Clone)獲取最新代碼 (Pull)提交代碼到遠程倉庫查看當前分支查看提交代碼的日志git config 配置用戶信息 GIT的實操 前言 git是一種軟件版本管理工具&#xff0c;在多人團隊軟件開發中地方非常重要。 類似與SVN&#xff0c;git工具…

php虛擬站點提示No input file specified時的問題及權限處理方法

訪問站點&#xff0c;提示如下 No input file specified. 可能是文件權限有問題&#xff0c;也可能是“.user.ini”文件路徑沒有配置對&#xff0c;最簡單的辦法就是直接將它刪除掉&#xff0c;還有就是將它設置正確 #配置成自己服務器上正確的路徑 open_basedir/mnt/qiy/te…

使用Langflow和AstraDB構建AI助手:從架構設計到與NocoBase的集成

本文由 Leandro Martins 編寫&#xff0c;最初發布于 Building an AI Assistant with Langflow and AstraDB: From Architecture to Integration with NocoBase。 引言 本文的目標是演示如何創建一個集成了 NocoBase、LangFlow 和 VectorDB 工具的 AI 助手。作為基礎&#xf…

6.聊天室環境安裝 - Ubuntu22.04 - elasticsearch(es)的安裝和使用

目錄 介紹安裝安裝kibana安裝ES客戶端使用 介紹 Elasticsearch&#xff0c; 簡稱 ES&#xff0c;它是個開源分布式搜索引擎&#xff0c;它的特點有&#xff1a;分布式&#xff0c;零配置&#xff0c;自動發現&#xff0c;索引自動分片&#xff0c;索引副本機制&#xff0c;res…

SSL VXN

SSL VPN是采用SSL&#xff08;Security Socket Layer&#xff09;/TLS&#xff08;Transport Layer Security&#xff09;協議來實現遠程接入的一種輕量級VPN技術,其基于B/S架構&#xff0c;免于安裝客戶端&#xff0c;相較與IPSEC有更高的靈活度和管理性&#xff0c;當隧道建立…