rabbitmq的高級特性

一.發送者的可靠性

1.生產者重試機制

修改publisher模塊的application.yaml文件

spring:rabbitmq:connection-timeout: 1s # 設置MQ的連接超時時間template:retry:enabled: true # 開啟超時重試機制initial-interval: 1000ms # 失敗后的初始等待時間multiplier: 1 # 失敗后下次的等待時長倍數,下次等待時長 = initial-interval * multipliermax-attempts: 3 # 最大重試次數

注意:

①當網絡不穩定的時候,利用重試機制可以有效提高消息發送的成功率。不過SpringAMQP提供的重試機制是 阻塞式 的重試,也就是說多次重試等待的過程中,當前線程是被阻塞的。
②如果對于業務性能有要求,建議禁用重試機制。如果一定要使用,請合理配置等待時長和重試次數,當然也可以考慮使用異步線程來執行發送消息的代碼。?

2.生產者確認機制

RabbitMQ提供了生產者消息確認機制,包括 Publisher Confirm 和 Publisher Return 兩種。在開啟確認機制的情況下,當生產者發送消息給MQ后,MQ會根據消息處理的情況返回不同的 回執

如何返回基本內容如下:

① 當消息投遞到MQ,但是路由失敗時,通過 Publisher Return 返回異常信息,同時通過 Publisher Confirm 返回ACK 的確認信息,代表投遞成功。
② 臨時消息投遞到了MQ,并且入隊成功,返回ACK,告知投遞成功。
③ 持久消息投遞到了MQ,并且入隊完成持久化,返回ACK ,告知投遞成功。
④ 其它情況都會返回NACK,告知投遞失敗。

其中 ack 和 nack 屬于 Publisher Confirm 機制,ack 是投遞成功;nack 是投遞失敗。而return 則屬于?Publisher Return?機制。默認兩種機制都是關閉狀態,需要通過配置文件來開啟。

①開啟生產者確認機制

在publisher模塊的 application.yaml 中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 開啟publisher confirm機制,并設置confirm類型  publisher-returns: true # 開啟publisher return機制

這里 publisher-confirm-type 有三種模式可選:

① none:關閉confirm機制
② simple:同步阻塞等待MQ的回執
③ correlated:MQ異步回調返回回執

②定義ReturnCallback

每個 RabbitTemplate 只能配置一個 ReturnCallback,因此我們可以在配置類中統一設置。我們在publisher模塊定義一個配置類:

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("觸發return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
③定義ConfirmCallback

由于每個消息發送時的處理邏輯不一定相同,因此ConfirmCallback需要在每次發消息時定義。具體來說,是在調用RabbitTemplate中的convertAndSend方法時,多傳遞一個參數:

@Test
void testPublisherConfirm() {// 1.創建CorrelationDataCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2.給Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future發生異常時的處理邏輯,基本不會觸發log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回執的處理邏輯,參數中的result就是回執內容if(result.isAck()){ // result.isAck(),boolean類型,true代表ack回執,false 代表 nack回執log.debug("發送消息成功,收到 ack!");}else{ // result.getReason(),String類型,返回nack時的異常描述log.error("發送消息失敗,收到 nack, reason : {}", result.getReason());}}});// 3.發送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

總結:Publisher Confirm?用來確認消息是否發送到MQ,而Publish Return 用來通知生產者哪些消息由于路由失敗沒有被接收

注意:

開啟生產者確認比較消耗MQ性能,一般不建議開啟。?

二.MQ的可靠性

1.數據持久化

交換機持久化,隊列持久化,消息持久化(先保存到內存在寫入磁盤),這三個持久化都是默認開啟的。如果消息類型是非持久化的,只有在消息隊列滿了后會被迫寫入磁盤。

總結:

持久化是持續將消息寫入磁盤,非持久化是當mq內存被使用完畢后才將消息寫入磁盤,因此性能較差。

2.LazyQueue

① 接收到(不論臨時還是持久的消息)消息后直接存入磁盤而非內存

② 消費者要消費消息時才會從磁盤中讀取并加載到內存(也就是懶加載)(如果消費者的速度很快也會把消息提前緩存到內存)
③ 支持數百萬條的消息存儲

這種模式屬于數據持久化的升級版。

從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的模式,而在3.12版本之后,LazyQueue已經成為所有隊列的默認格式。

三.消費者的可靠性

1.消費者確認機制

為了確認消費者是否成功處理消息,RabbitMQ提供了消費者確認機制(Consumer Acknowledgement)。即:當消費者處理消息結束后,應該向RabbitMQ發送一個回執,告知RabbitMQ自己消息處理狀態。回執有三種可選值:

? - ack:成功處理消息,RabbitMQ從隊列中刪除該消息
? - nack:消息處理失敗,RabbitMQ需要再次投遞消息
? - reject:消息處理失敗并拒絕該消息,RabbitMQ從隊列中刪除該消息

由于消息回執的處理代碼比較統一,因此SpringAMQP幫我們實現了消息確認。并允許我們通過配置文件設置ACK處理方式,有三種模式:

? ? ?①none:不處理。即消息投遞給消費者后立刻ack,消息會立刻從MQ刪除。非常不安全,不建議使用
? ? ?②manual:手動模式。需要自己在業務代碼中調用api,發送 ack 或 reject ,存在業務入侵,但更靈活。
? ? ?③auto:自動模式。SpringAMQP利用AOP對我們的消息處理邏輯做了環繞增強,當業務正常執行時則自動返回 ack. ?當業務出現異常時,根據異常判斷返回不同結果:

  • 如果是 業務異常,會自動返回 nack,消息處理失敗后,會回到RabbitMQ,并重新投遞到消費者。
  • 如果是 消息處理或校驗異常,自動返回 reject。

?通過下面的配置可以修改SpringAMQP的ACK處理方式:

spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做處理

2.失敗重試機制(對消費者確認機制的增強)

如果上面的代碼一直返回nack會導致無線循環。

所以我們配置消費者自己重試,如果超過了配置重試的次數,就會返回reject

修改consumer服務的application.yml文件,添加內容:

spring:rabbitmq:listener:simple:retry:enabled: true # 開啟消費者失敗重試initial-interval: 1000ms # 初識的失敗等待時長為1秒multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-intervalmax-attempts: 3 # 最大重試次數stateless: true # true無狀態;false有狀態。如果業務中包含事務,這里改為false

配置之后出現的現象:

- 消費者在失敗后消息沒有重新回到MQ無限重新投遞,而是在本地重試了3次
- 本地重試3次以后,拋出了 AmqpRejectAndDontRequeueException 異常。查看RabbitMQ控制臺,發現消息被刪除了,說明最后SpringAMQP返回的是 reject?

3.自定義失敗處理策略(對消費者重試機制的增強)

有上面失敗之后是直接返回的reject,這可能并不是我們想要返回的結果,于是我們有了失敗處理策略。

因此Spring允許我們自定義重試次數耗盡后的消息處理策略,這個策略是由MessageRecovery 接口來定義的,它有3個不同實現:

? ①RejectAndDontRequeueRecoverer:重試耗盡后,直接 reject ,丟棄消息。默認就是這種方式 (黑馬說不可選,下面兩個可選)
? ②ImmediateRequeueMessageRecoverer:重試耗盡后,返回 nack ,消息重新入隊?
? ③RepublishMessageRecoverer :重試耗盡后,將失敗消息投遞到指定的交換機,然后轉入到指定的隊列,后續由人工集中處理。

代碼實現:

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); //這里的with是routingkey}// 這個是配置消息處理失敗之后投入到那個交換機@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

4.確保業務冪等性(解決消費者重復消費的問題)

冪等性:指同一個業務,執行一次或多次對業務狀態的影響是一致的。

數據的刪除,查詢一般是冪等的,但是修改和新增不是冪等的,案例如下:

所以,我們要盡可能避免業務被重復執行。

為了解決上面圖中的問題我們有了如下的解決方案:

①使用唯一消息id

給每一個消息都設置一個唯一ID,用ID區分是否被消費過。當我們消費一個消息的時候,先在數據庫中查詢是否存在這個數據的ID,如果不存在就消費。如果存在就說明這個消息之前被消費過。

給消息設置唯一id:這就是在配置消息轉換器的時候添加了一點代碼

@Bean
public MessageConverter messageConverter(){// 1.定義消息轉換器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自動創建消息id,用于識別不同消息,也可以在業務中基于ID判斷是否是重復消息jjmc.setCreateMessageIds(true);return jjmc;
}

?獲取消息的id:

@RabbitListener(queues = "simple.queue")
public void listensimpleQueue(Message message) {log.info("監聽到simple.queue的消息:ID:【{}】", message.getMessageProperties().getMessageId());log.info("監聽到simple.queue的消息:【{}】", new String(message.getBody()));
}

缺點:

?但是這種方法也有自己的缺點,也就是對業務邏輯有侵入性,而且還有額外的數據庫操作。?

②根據業務判斷

根據上面出現非冪等性問題,我們可以把業務邏輯改成這樣就可以保證業務的冪等性了。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/84712.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/84712.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/84712.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

北京大學肖臻老師《區塊鏈技術與應用》公開課:02-BTC-密碼學原理

文章目錄 1.比特幣中用到的密碼學的功能2. hash3. 簽名 1.比特幣中用到的密碼學的功能 比特幣中用到密碼學中兩個功能&#xff1a; hash、 簽名。 2. hash hash函數的三個特性&#xff1a;抗碰撞性&#xff08;Collision Resistance&#xff09;、隱蔽性&#xff08;Hiding&…

Spring Cloud Gateway高并發限流——基于Redis實現方案解析

本文是一個基于 Spring Cloud Gateway 的分布式限流方案&#xff0c;使用Redis Lua實現高并發場景下的精準流量控制。該方案支持動態配置、多維度限流&#xff08;API路徑/IP/用戶&#xff09;&#xff0c;并包含完整的代碼實現和性能優化建議。 一、架構設計 #mermaid-svg-vg…

SpringAI--RAG知識庫

SpringAI–RAG知識庫 RAG概念 什么是RAG&#xff1f; RAG(Retrieval-Augmented Genreation&#xff0c;檢索增強生成)是一種結合信息檢索技術和AI內容生成的混合架構&#xff0c;可以解決大模型的知識時效性限制和幻覺問題。 RAG在大語言模型生成回答之前&#xff0c;會先從…

【PhysUnits】14 二進制數的標準化表示(standardization.rs)

一、源碼 這段代碼主要用于處理二進制數的標準化表示。它定義了兩個特質(trait) IfB0 和 IfB1&#xff0c;以及它們的實現&#xff0c;用于處理二進制數的前導零及前導一的簡化。 use super::basic::{B0, B1, Z0, N1, Integer, NonZero, NonNegOne};/// 處理 B0<H> 類型…

將 ubutun 的網絡模式 從NAT 改到 橋接模式后,無法上網,linux 沒有IP地址 的解決方案

首先要將 ubutun 的網絡模式設置為橋接模式 這里再從 NAT 模式改動成 橋接模式的時候&#xff0c;還出現了一個問題。改成橋接模式后&#xff0c;linux沒有ip地址了。原因是 不知道什么時候 將 虛擬網絡編輯器 中的值改動了 要選擇這個 自動 選項

多模態大語言模型arxiv論文略讀(九十)

Hybrid RAG-empowered Multi-modal LLM for Secure Data Management in Internet of Medical Things: A Diffusion-based Contract Approach ?? 論文標題&#xff1a;Hybrid RAG-empowered Multi-modal LLM for Secure Data Management in Internet of Medical Things: A Di…

電腦主板VGA長亮白燈

電腦主板VGA長亮白燈 起因解決方法注意事項&#xff1a; 起因 搬家沒有拆機整機在車上晃蕩導致顯卡松動接觸不良&#xff08;一般VGA長亮白燈都和顯卡有關&#xff0c;主要排查顯卡&#xff09; 解決方法 將顯卡拆下重新安裝即可 注意事項&#xff1a; 不可直接拔下顯卡&a…

【監控】pushgateway中間服務組件

Pushgateway 是 Prometheus 生態中的一個中間服務組件&#xff0c;以獨立工具形式存在&#xff0c;主要用于解決 Prometheus 無法直接獲取監控指標的場景&#xff0c;彌補其定時拉取&#xff08;pull&#xff09;模式的不足。 其用途如下&#xff1a; 突破網絡限制&#xff1…

打造AI智能旅行規劃器:基于LLM和Crew AI的Agent實踐

引言 今天來學習大佬開發的一個AI驅動的旅行規劃應用程序&#xff0c;它能夠自動處理旅行規劃的復雜性——尋jni找航班、預訂酒店以及優化行程。傳統上&#xff0c;這個過程需要手動搜索多個平臺&#xff0c;常常導致決策效率低下。 通過利用**代理型人工智能&#xff08;Age…

21. 自動化測試框架開發之Excel配置文件的測試用例改造

21. 自動化測試框架開發之Excel配置文件的測試用例改造 一、測試框架核心架構 1.1 組件依賴關系 # 核心庫依賴 import unittest # 單元測試框架 import paramunittest # 參數化測試擴展 from chap3.po import * # 頁面對象模型 from file_reader import E…

如何在電力系統中配置和管理SNTP時間同步?

在電力系統中配置和管理 SNTP 時間同步需結合行業標準&#xff08;如《DL/T 1100.1-2019》&#xff09;和分層架構特點&#xff0c;確保安全性、可靠性和精度適配。以下是具體操作指南&#xff0c;涵蓋架構設計、設備配置、安全管理、運維監控四大核心環節&#xff0c;并附典型…

MTK-關于HW WCN的知識講解

前言: 最近做項目過程中和硬件打交道比較多,現在關于整理下硬件的HW wcn的知識點 一 MTK常見的MT6631 Wi-Fi 2.4GHz 匹配調諧指南 ?拓撲結構選擇? 推薦采用并聯電容拓撲(?shunt cap topology?)代替并聯電感拓撲(?shunt inductor topology?),以減少潛在電路設計…

(1)課堂 1--5,這五節主要講解 mysql 的概念,定義,下載安裝與卸載

&#xff08;1&#xff09;謝謝老師&#xff1a; &#xff08;2&#xff09;安裝 mysql &#xff1a; &#xff08;3&#xff09;鏡像下載 &#xff0c;這個網址很好 &#xff1a; &#xff08;4&#xff09; 另一個虛擬機的是 zhang 123456 &#xff1a; 接著配置…

U-Boot ARMv8 平臺異常處理機制解析

入口點&#xff1a;arch/arm/cpu/armv8/start.S 1. 判斷是否定義了鉤子&#xff0c;如有則執行&#xff0c;否則往下走。執行save_boot_params&#xff0c;本質就是保存一些寄存器的值。 2. 對齊修復位置無關碼的偏移 假設U-Boot鏈接時基址為0x10000&#xff0c;但實際加載到0…

mysql安裝教程--筆記

一、Windows 系統安裝 方法1&#xff1a;使用 MySQL Installer&#xff08;推薦&#xff09; 1. 下載安裝包 訪問 MySQL 官網下載頁面&#xff0c;選擇 MySQL Installer for Windows。 2. 運行安裝程序 雙擊下載的 .msi 文件&#xff0c;選擇安裝類型&#xff1a; ? Developer…

投資策略規劃最優決策分析

目錄 一、投資策略規劃問題詳細 二、存在最優投資策略&#xff1a;每年都將所有錢投入到單一投資產品中 &#xff08;一&#xff09;狀態轉移方程 &#xff08;二&#xff09;初始條件與最優策略 &#xff08;三&#xff09;證明最優策略總是將所有錢投入到單一投資產品中…

NGINX HTTP/3 實驗指南安裝、配置與調優

一、HTTP/3 簡介 基于 QUIC&#xff1a;在 UDP 之上實現的多路復用傳輸&#xff0c;內置擁塞控制與前向糾錯&#xff0c;無需三次握手即可恢復連接。零 RTT 重連&#xff1a;借助 TLS 1.3&#xff0c;實現連接恢復時的 0-RTT 數據發送&#xff08;視底層庫支持&#xff09;。多…

編程日志5.28

string賦值操作 算法: #include<iostream> using namespace std; int main() { //1.字符串常量的賦值 string s1; s1 = "英雄哪里出來"; cout << s1 << endl; //2.字符串變量的賦值 string s2; s2 = s1; cout <…

AE的ai圖層導到Ai

AE的ai圖層導到ai 解決方法: 1、打開ai軟件&#xff0c;不用新建&#xff0c;留在那就行。 2、在AE里選中任意一個ai文件圖層&#xff0c;只需同時按住ctrl和英文字母鍵&#xff0c;圖層就會自動全部導入到ai中 英文字母鍵的詳情可以參考&#xff1a;http://www.yayihouse.co…

【Springboot+LangChain4j】Springboot項目集成LangChain4j(下)

前置條件&#xff1a;根據上篇文章完成springboot初步集成LangChain4j 【SpringbootLangChain4j】根據LangChain4j官方文檔&#xff0c;三分鐘完成Springboot項目集成LangChain4j&#xff08;上&#xff09;-CSDN博客 但是接口方法中&#xff0c;關于大模型的配置都是寫死的&a…