RabbitMQ核心特性——重試、TTL、死信隊列

一、重試機制

? ? ? 在消息傳輸過程中,可能遇到各種問題,如網絡故障,服務器不可用等,這些問題可能導致消息處理失敗,因此RabbitMQ提供了重試機制,允許消息處理失敗后重新發送,但是,如果是因為程序邏輯發生的錯誤,那么重試多次也是無用的,因此重試機制可以設置重試次數。

1.1 重試配置

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto #消息接收確認 retry:enabled: true # 開啟消費者失敗重試 initial-interval: 5000ms # 初始失敗等待時?為5秒 max-attempts: 5 # 最?重試次數(包括??消費的?次) 

2.2 配置交換機、隊列

(1)配置交換機、隊列、及綁定關系:

    /** 重試機制*/@Bean("retryQueue")public Queue retryQueue(){return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();}@Bean("retryBinding")public Binding retryBinding(@Qualifier("retryQueue") Queue queue,@Qualifier("retryExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("retry");}

(2)生產者

    /** 重試機制*/@RequestMapping("/retry")public String retry(){rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","retry test...");return "消息發送成功";}

(3)消費者

@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s," +" deliveryTag: %S \n",new String(message.getBody(),"UTF-8"),deliveryTag);int num = 10/0;System.out.println("業務處理完成");} 
}

2.3 測試

? ? ?可以看到,重試后還是未能正常消費消息,拋出異常,需要注意的是,如果手動處理異常,是不會觸發重試的,如:

@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try{System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s," +" deliveryTag: %S \n",new String(message.getBody(),"UTF-8"),deliveryTag);int num = 10/0;System.out.println("業務處理完成");}catch (Exception e){System.out.println("業務處理失敗");}}

再次測試代碼:

沒有觸發重試


2.4 重試注意事項

1. 自動確認模式 : 程序邏輯異常, 多次重試還是失敗, 消息就會被自動確認, 那么消息就丟失

2. 手動確認模式:程序邏輯異常, 多次重試消息依然處理失敗, 無法被確認, 就?直是 unacked的狀態, 導致消息積壓


?二、TTL 機制

? ? ? ?TTL 即 Time To Live(過期時間), RabbitMQ可以對消息或隊列設置過期時間,當消息過期后,就會被自動清除,無論是對消息設置TTL還是對隊列設置TTL,本質上都是設置消息的TTL

?

2.1 設置消息的TTL

一、準備工作(聲明隊列、交換機)

//未設置TTL的queue@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).build();}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_exchange).build();}@Bean("ttlBinding")public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue,@Qualifier("ttlExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl");}

?二、如何設置消息的TTL

設置消息的TTL是在發送消息是設置的,通過下面這個方法來發送:

	public void convertAndSend(String exchange, String routingKey, final Object message,final MessagePostProcessor messagePostProcessor)

? ? ?這個方法比前面使用的方法多了一個參數,就是通過這個messagePostProcessor來設置消息的TTL,只需要在發送消息前,構造一個MessagePostProcesser對象并傳入即可:

    @RequestMapping("/ttl")public String ttl(){MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setExpiration("10000");return message;};rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 10s...",messagePostProcessor);return "消息發送成功";}

三、測試代碼

10s后:

消息已近被清除


四、設置消息的TTL存在的問題

? ? ?大家都知道,隊列滿足先進先出的特性,那么如果先發送一條TTL為30s的消息,再發送一條TTL為10s的消息,那么當10s后,后進隊列的那條消息是否會被移除?

? ? 不妨測試一下:

    @RequestMapping("/ttl")public String ttl(){MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setExpiration("10000");return message;};MessagePostProcessor messagePostProcessor2 = message -> {message.getMessageProperties().setExpiration("30000");return message;};rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 30s...",messagePostProcessor2);rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 10s...",messagePostProcessor);return "消息發送成功";}

? ? 運行程序:

為了解決這個問題,我們可以對隊列設置TTL


2.2 設置隊列的TTL

?一、聲明隊列,綁定交換機(綁定在前面聲明的交換機上即可)

 //設置TTL的queue@Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20*1000).build();//設置隊列TTL為20s}@Bean("ttlBinding2")public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue,@Qualifier("ttlExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl2");}

二、如何對隊列設置TTL?

其實在上面聲明隊列時已經設置了,只需要在聲明隊列時通過 ttl方法 設置即可。


三、在TTL隊列中存放為設置TTL的消息,消息是否移除?

? ?生產者代碼:

    @RequestMapping("/ttl")public String ttl(){rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl2" ,"ttl2 test...");return "消息發送成功";}

? ?測試:

? ?可以看到,消息同樣會過期


四、隊列TTL為20s,消息TTL為10s,消息什么時候過期?

? ?生產者代碼:

    @RequestMapping("/ttl")public String ttl(){MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setExpiration("10000");return message;};rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 10s...",messagePostProcessor);rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl2" ,"ttl2 test...");return "消息發送成功";}

? ?測試:


三、死信隊列

?3.1 什么是死信

? ? ?由于各種原因,導致的無法被消費的消息,就是死信,死信隊列就是用來存儲死心的隊列,當一個消息在隊列中變成死信后,可以被重新發送到另一個交換機DLX(Dead Letter Exchange)中,這個交換機綁定的隊列就是死信隊列DLQ(Dead Letter Queue)。

? ? 消息變成死信有以下幾種原因:

1> 消息過期

2> 消息被拒絕 ,且requeue參數置為false

3> 隊列達到最大長度


3.2 死信代碼示例

一、聲明隊列、交換機及綁定關系

    @Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(10000).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}@Bean("normalExchange")public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalQueue") Queue queue,@Qualifier("normalExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("normal");}@Bean("dlQueue")public Queue dlQueue(){return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange(){return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlQueue") Queue queue,@Qualifier("dlExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("dl");}

? ? ?上面在聲明了普通交換機、隊列以及死信交換機、隊列,還要聲明普通隊列與死信交換機的關系(確保消息變成死信后會通過死信交換機路由到死信隊列),只需要在聲明普通隊列時通過 deadLetterExchange 和 deadLetterRoutingKey 綁定即可。


二、測試由于消息過期而導致的死信

? ? 前面在聲明normalQueue時已經通過ttl方法設置了過期時間,所以只需編寫生產者代碼即可:

    @RequestMapping("/dl")public String dl(){rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","dl test...");return "消息發送成功";}

? ? 運行程序,測試:


三、測試由于消息被拒絕導致的死信

? ? 編寫消費者代碼:

@Component
public class DlListener {@RabbitListener(queues = Constants.NORMAL_QUEUE)public void handlerMessage1(Message message, Channel channel) throws IOException {Long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("接收到消息:%s,deliveryTag: %d \n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag());//業務邏輯處理System.out.println("業務邏輯處理");int num = 10 / 0;System.out.println("業務處理完成");channel.basicAck(deliveryTag, false);} catch (Exception e) {//!!!注意requeue一定要置為false才能變成死信System.out.println("業務處理失敗");channel.basicNack(deliveryTag, false, false);}}@RabbitListener(queues = Constants.DL_QUEUE)public void handlerMessage2(Message message, Channel channel) throws IOException {Long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("死信隊列接收到消息:%s,deliveryTag: %d \n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag());channel.basicAck(deliveryTag,false);}
}

? ? 測試:


四、測試由于隊列達到最大長度導致的死信

? ? 修改normal隊列的聲明(添加一個maxLength方法指定隊列最大長度):

    @Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(10000).maxLength(1).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}

? ? 重新編寫生產者代碼,連續發送10條消息:

 @RequestMapping("/dl")public String dl(){for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","dl test..." + i);}return "消息發送成功";}

? ? 由于隊列的最大長度為1,因此應該有9條消息進入死信隊列,測試:

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/84276.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/84276.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/84276.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MVCC實現原理

MVCC的基本概念 MVCC&#xff0c;一個數據的多個版本&#xff0c;使得讀寫操作沒有沖突。 在多個事務并發的情況下&#xff0c;確定到底要訪問哪個版本。 MVCC實現原理 MVCC實現依賴于隱式字段&#xff0c;undo log日志&#xff0c;readView 隱式字段 在mysql用戶自定義的…

湖北理元理律師事務所債務優化方案解析:如何科學規劃還款保障生活質量

在當前經濟環境下&#xff0c;債務問題已成為困擾許多家庭的重要難題。據相關統計數據顯示&#xff0c;我國個人負債率呈現逐年上升趨勢&#xff0c;如何合理規劃還款、保障基本生活質量成為亟待解決的社會問題。湖北理元理律師事務所基于多年實務經驗&#xff0c;研發出一套科…

ffmpeg 轉換視頻格式

使用FFmpeg將視頻轉換為MP4格式的常用命令&#xff1a; ffmpeg -i input.mov -c:v libx264 -crf 23 -c:a aac output.mp4 -i input.avi&#xff1a;指定輸入文件 -c:v libx264&#xff1a;使用H.264視頻編碼器 -crf 23&#xff1a;控制視頻質量&#xff08;范圍18-28&#…

LLM Tuning

Lora-Tuning 什么是Lora微調&#xff1f; LoRA&#xff08;Low-Rank Adaptation&#xff09; 是一種參數高效微調方法&#xff08;PEFT, Parameter-Efficient Fine-Tuning&#xff09;&#xff0c;它通過引入低秩矩陣到預訓練模型的權重變換中&#xff0c;實現無需大規模修改…

實現tdx-hs300-mcp

文章目錄 項目簡介功能說明使用方法配置說明項目簡介 tdx-hs300-mcp是一個Model Context Protocol (MCP)的服務 功能說明 下載數據自動保存為CSV格式文件使用方法 確保已安裝Python 3.7+和依賴庫: pip install pytdx fastapi uvicorn啟動MCP服務: mcp run MCP.py使用MCP工具…

《100天精通Python——基礎篇 2025 第20天:Thread類與線程同步機制詳解》

目錄 一、概念簡單回顧二、Python的線程開發2.1 Thread類2.1.1 線程啟動2.1.2 線程退出2.1.3 線程的傳參2.1.4 threading的屬性和方法2.1.5 Thread實例的屬性和方法2.1.6 start和run方法 2.2 多線程2.3 線程安全2.4 daemon線程2.5 threading.local類2.6 __slots__拓展 三、線程…

【web應用】前后端分離開源項目聯調運行的過程步驟ruoyi

文章目錄 ?前言?一、項目運行環境準備?二、數據庫創建&#x1f31f;1、新建數據庫&#x1f31f;2、導入數據腳本 ?三、運行后端項目&#x1f31f;1、打開后端項目&#x1f31f;2、后端項目配置項修改 ?四、運行前端項目VUE3&#x1f31f;1、在IDEA另一個窗口中打開前端項目…

【深度剖析】三一重工的數字化轉型(下篇1)

在數字經濟持續發展的背景下,企業數字化轉型方案成為實現轉型的關鍵。不同行業內的企業因轉型動機和路徑的差異,其轉型成效也各異。三一重工作為機械制造行業的領軍企業,較早地實施了數字化轉型,并積累了豐富的經驗。本研究選取三一重工作為案例,通過梳理相關文獻,對其數…

Nacos適配GaussDB超詳細部署流程

1部署openGauss 官方文檔下載 https://support.huaweicloud.com/download_gaussdb/index.html 社區地址 安裝包下載 本文主要是以部署輕量級為主要教程 1.1系統環境準備 操作系統選擇 系統AARCH64X86-64openEuler√√CentOS7√Docker√√1.2軟硬件安裝環境 版本輕量版(單…

國際前沿知識系列五:時間序列建模方法在頭部撞擊運動學測量數據降噪中的應用

目錄 國際前沿知識系列五&#xff1a;時間序列建模方法在頭部撞擊運動學測量數據降噪中的應用 一、引言 二、時間序列建模方法 &#xff08;一&#xff09;ARIMA 模型 &#xff08;二&#xff09;指數平滑法 &#xff08;三&#xff09;小波變換 三、實際案例分析 &…

線性代數中的向量與矩陣:AI大模型的數學基石

&#x1f9d1; 博主簡介&#xff1a;CSDN博客專家、CSDN平臺優質創作者&#xff0c;高級開發工程師&#xff0c;數學專業&#xff0c;10年以上C/C, C#, Java等多種編程語言開發經驗&#xff0c;擁有高級工程師證書&#xff1b;擅長C/C、C#等開發語言&#xff0c;熟悉Java常用開…

第十七次CCF-CSP算法(含C++源碼)

第十七次CCF-CSP認證 小明種蘋果AC代碼 小明種蘋果&#xff08;續&#xff09;AC代碼 后面好難哈哈 小手冰涼 小明種蘋果 輸入輸出&#xff1a; 題目鏈接 AC代碼 #include<iostream> using namespace std; int n,m; int res,res3; int sum; int res21; int main(){cin …

curl常用指令

curl使用記錄 curl常用指令安裝請求get請求post請求錯誤排查 curl常用指令 安裝 sudo apt update sudo apt install curl -y請求 get請求 curl [URL]如果能正常請求&#xff0c;則會返回正常的頁面信息 post請求 發送 JSON 數據? curl -X POST [URL] -H "Content-…

C++ 輸入輸出流示例代碼剖析

一、開篇&#xff1a;代碼核心概述 本文圍繞一段融合輸入輸出流操作、自定義類型重載、文件讀寫的C代碼展開&#xff0c;深入探究其底層原理與實踐應用。代碼通過類型轉換、操作符重載等技術&#xff0c;實現自定義類型與標準輸入輸出流的交互&#xff0c;同時借助文件流完成數…

常見嵌入式軟件架構

常見的嵌入式軟件架構 一、ASW文件夾&#xff08;Application Software&#xff0c;應用軟件&#xff09;定義與作用常見子目錄結構特點 二、BSP文件夾&#xff08;Board Support Package&#xff0c;板級支持包&#xff09;定義與作用常見子目錄結構特點 三、OS文件夾&#xf…

【PostgreSQL】數據探查工具1.0研發可行性方案

?? 點擊關注不迷路 ?? 點擊關注不迷路 ?? 點擊關注不迷路 想搶先解鎖數據自由的寶子,速速戳我!評論區蹲一波 “蹲蹲”,揪人嘮嘮你的超實用需求! 【PostgreSQL】數據探查工具1.0研發可行性方案,數據調研之秒解析數據結構,告別熬夜寫 SQL【PostgreSQL】數據探查工具…

Lambda表達式與匿名內部類的對比詳解

Lambda表達式與匿名內部類的對比詳解 1. 語法簡潔性 Lambda表達式&#xff1a; 僅適用于函數式接口&#xff08;只有一個抽象方法的接口&#xff09;&#xff0c;語法簡潔。 示例&#xff1a; Runnable r () -> System.out.println("Hello Lambda");匿名內部類&…

Seata Server 1.6.1 高可用部署終極指南:Nacos配置中心+DB存儲+多實例實戰

文章目錄 高可用 - 關鍵因素存儲模式配置中心注冊中心高可用 - 步驟第 1 步:使用 db 作為存儲模式第 2 步:使用 Nacos 配置中心自定義 seata-server 配置添加 seata-server.properties 到 Nacos第 3 步:修改 application.yml使用 Nacos 作為配置中心使用 Nacos 作為注冊中心…

JS 中判斷 null、undefined 與 NaN 的權威方法及場景實踐

在 JavaScript 中&#xff0c;null、undefined 和 NaN 是三個特殊的「非正常值」&#xff0c;正確判斷它們是保證代碼健壯性的關鍵。本文結合 ECMA 規范與 MDN 權威文檔&#xff0c;系統梳理三者的判斷方法、原理及典型場景&#xff0c;幫助開發者規避常見誤區。 一、理解三個…

基于DenseNet的醫學影像輔助診斷系統開發教程

本文源碼地址: https://download.csdn.net/download/shangjg03/90873921 1. 簡介 本教程將使用DenseNet開發一個完整的醫學影像輔助診斷系統,專注于胸部X光片的肺炎檢測。我們將從環境搭建開始,逐步介紹數據處理、模型構建、訓練、評估以及最終的系統部署。 2. 環境準備<…