Spring-rabbit重試消費源碼分析

在集成RabbitMQ與Spring Boot 3.1.x時,RetryOperationsInterceptor 是實現消息重試機制的關鍵組件。這里將深入分析 RetryOperationsInterceptor 的工作原理,尤其是在消費者消費失敗時的行為,并結合底層源碼進行詳解。

一、配置解析

首先,需要提供 RetryOperationsInterceptor 配置如下:

// 配置重試攔截器
RetryOperationsInterceptor retryInterceptor = RetryInterceptorBuilder.stateless().maxAttempts(3) // 初次消費 + 2次重試.backOffOptions(1000, 2.0, 10000) // 初始間隔1秒,倍增因子2.0,最大間隔10秒.recoverer(new RepublishMessageRecoverer(rabbitTemplate,DEAD_LETTER_EXCHANGE,DEAD_LETTER_ROUTING_KEY)).build();

這段配置的含義如下:

  • maxAttempts(3): 設置最大嘗試次數為3次,包括初次消費和2次重試。
  • backOffOptions(1000, 2.0, 10000): 設置重試的間隔策略,初始間隔為1秒,倍增因子為2.0,最大間隔為10秒。
  • recoverer: 當所有重試嘗試失敗后,使用 RepublishMessageRecoverer 將消息轉發到指定的死信交換機和路由鍵。

二、RetryOperationsInterceptor 的工作原理

RetryOperationsInterceptor 是Spring Retry提供的攔截器,用于在方法執行失敗時自動進行重試。結合Spring AMQP(RabbitMQ)的消息監聽器容器,它能夠在消息處理失敗時執行重試邏輯。

1. 消息消費流程

當RabbitMQ消費者接收到消息時,以下步驟會依次執行:

  1. 消息接收: 消息被送到監聽方法 onMessage
  2. 消息處理: 執行 processMessage(message) 方法進行業務處理。
  3. 成功確認: 如果處理成功,消息被確認(ACK)。
  4. 處理失敗: 如果拋出異常,觸發重試機制。

2. 重試攔截器的作用

processMessage 方法拋出異常時,RetryOperationsInterceptor 會攔截這個異常,并按照配置的重試策略進行重試。具體流程如下:

  1. 攔截異常: 異常被 RetryOperationsInterceptor 捕獲。
  2. 執行重試: 根據 backOffOptions 設置的間隔和倍增因子,等待指定時間后重新執行 onMessage 方法。
  3. 重試次數限制: 如果重試次數未超過 maxAttempts,則繼續重試;否則,執行 recoverer 邏輯。

3. 重試邏輯的底層實現

從源碼角度看,RetryOperationsInterceptor 主要依賴于 RetryTemplate 來執行重試邏輯。以下是關鍵步驟:

  1. 構建 RetryTemplate:

    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(3);
    retryTemplate.setRetryPolicy(retryPolicy);ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);
    backOffPolicy.setMultiplier(2.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    
  2. 執行重試:

    retryTemplate.execute(context -> {// 調用實際的消息處理方法onMessage(message);return null;
    }, context -> {// 重試失敗后的回調recoverer.recover(message, context.getLastThrowable());return null;
    });
    
  3. 異常處理與恢復:

    • 每次重試失敗時,RetryTemplate 會根據 BackOffPolicy 計算下次重試的等待時間。
    • 如果所有重試次數都失敗,則調用 RepublishMessageRecoverer 將消息發送到死信隊列。

三、與 @RabbitListener 的集成

在您的消費者代碼中:

@RabbitListener(queues = RabbitConfig.MAIN_QUEUE, containerFactory = "rabbitListenerContainerFactory")
public void onMessage(String message) throws Exception {LOGGER.info("接收到消息: {}", message);// 處理消息processMessage(message);LOGGER.info("消息處理成功并確認: {}", message);
}

這里的關鍵在于 containerFactory 的配置,確保 RetryOperationsInterceptor 被正確應用到消息監聽器容器中。

1. rabbitListenerContainerFactory 配置示例

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,RabbitTemplate rabbitTemplate) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.AUTO);factory.setAdviceChain(retryInterceptor(rabbitTemplate));return factory;
}@Bean
public RetryOperationsInterceptor retryInterceptor(RabbitTemplate rabbitTemplate) {return RetryInterceptorBuilder.stateless().maxAttempts(3).backOffOptions(1000, 2.0, 10000).recoverer(new RepublishMessageRecoverer(rabbitTemplate,DEAD_LETTER_EXCHANGE,DEAD_LETTER_ROUTING_KEY)).build();
}

在這里,RetryOperationsInterceptor 被添加到監聽器容器的 adviceChain 中,使其能夠攔截 onMessage 方法的執行。

四、底層源碼分析

讓我們更深入地看看Spring AMQP和Spring Retry的集成是如何實現的。

1. Spring AMQP 消息監聽器容器

SimpleRabbitListenerContainerFactory 創建的 SimpleMessageListenerContainer 是實際的消息監聽器容器。該容器負責從RabbitMQ獲取消息并調用相應的監聽方法。

2. RetryOperationsInterceptor 的集成

SimpleMessageListenerContainer 中,adviceChain 被應用到消息處理邏輯中。具體來說,RabbitListenerEndpointContainer#invokeListener 方法會被 RetryOperationsInterceptor 包裹,確保在調用監聽方法時執行重試邏輯。

3. RetryTemplate 的執行流程

RetryOperationsInterceptor 內部使用 RetryTemplate 來管理重試流程。其核心邏輯如下:

public Object invoke(MethodInvocation invocation) throws Throwable {return retryTemplate.execute(context -> {try {return invocation.proceed();} catch (Exception e) {throw e;}}, context -> {// 重試失敗后的恢復邏輯recoverer.recover(message, context.getLastThrowable());return null;});
}

在每次重試中,RetryTemplate 會調用 invocation.proceed() 執行實際的消息處理。如果拋出異常,則根據 RetryPolicy 決定是否繼續重試。

4. RepublishMessageRecoverer 的作用

當所有重試嘗試失敗后,RepublishMessageRecoverer 會將消息重新發布到指定的死信交換機和路由鍵。這是通過以下方式實現的:

public void recover(Message message, Throwable cause) {MessageProperties properties = message.getMessageProperties();properties.setHeader("x-exception-stacktrace", getStackTrace(cause));rabbitTemplate.send(deadLetterExchange, deadLetterRoutingKey, message);
}

這樣,未成功處理的消息不會丟失,而是被轉發到死信隊列,便于后續分析和處理。

五、總結

RetryOperationsInterceptor 在Spring Boot與RabbitMQ集成中,通過攔截消息處理方法的異常,按照配置的重試策略自動執行重試邏輯,極大地提高了系統的可靠性和健壯性。其底層依賴于Spring Retry的 RetryTemplateBackOffPolicy,并通過 RepublishMessageRecoverer 實現失敗后的消息轉發。

通過理解上述工作原理和源碼實現,可以更靈活地配置和優化消息重試機制,確保消息處理的穩定性和可控性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/84984.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/84984.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/84984.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

如何使用JacksonTypeHandler處理mysql json字符串轉List對象的問題

在使用mysql5.7或更高版本時&#xff0c;json類型字段應用場景越來越多&#xff0c;對于普通的對象或者List<Integer>、List<String>這些基礎類型&#xff0c;jacksonTypeHandler都能很好的處理&#xff0c;如下&#xff1a; 1、定義一個person對象 import com.f…

華為云Flexus+DeepSeek征文 | 基于Dify構建股票分析助手

華為云FlexusDeepSeek征文 | 基于Dify構建AI 圖片生成應用 一、構建股票分析助手前言二、構建股票分析助手環境2.1 基于FlexusX實例的Dify平臺2.2 基于MaaS的模型API商用服務 三、構建股票分析助手實戰3.1 配置Dify環境3.2 配置Dify工具3.3 創建股票分析助手3.4 使用股票分析助…

【0.1 漫畫計算機組成原理】

??? 漫畫計算機組成原理 ?? 學習目標:深入理解計算機硬件基礎,為后續Java編程和性能優化打下堅實基礎 ?? 目錄 CPU架構與指令集內存層次結構馮諾依曼架構與哈佛架構總線系統與IO設備計算機性能分析實際應用場景?? 漫畫引言 小明: “為什么我的Java程序有時候跑得飛…

pytorch 實戰二 CNN手寫數字識別

系列文章目錄 文章目錄 系列文章目錄前言一、torchvision.datasets1. 數據下載2. 數據分批次傳入 二、網絡1. 網絡搭建2. 訓練3.測試 完整代碼三、保存模型與推理&#xff08;inference&#xff09;模型保存推理鳴謝 前言 手寫數字識別&#xff0c;就是要根據手寫的數字0~9&…

[Godot] C#讀取CSV表格創建雙層字典實現本地化

最近研究了一下本地化&#xff0c;給大家用簡單易懂的方式說明我是怎么實現的&#xff0c;使用CSV表格填寫翻譯&#xff0c;然后在Godot中讀取為字典 表格填寫 首先&#xff0c;我們表格可以按照下面這種格式填寫 idzhenjaruesdefrapple蘋果appleリンゴяблокоmanzanaA…

Spark 之 Subquery

各類 Subquery src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala /*** Evaluates to `true` if `values` are returned in `query`s result set.*/ case class InSubquery(values: Seq[Expression], query: ListQuery)extends Predicate with Une…

3.1.3_棧的鏈式存儲實現

知識總覽&#xff1a; 鏈棧定義&#xff1a; 頭插法建立單鏈表&#xff1a; 每次要插入一個元素的時候&#xff0c;總是把該元素插在頭節點之后的位置&#xff0c;如果規定只能在單鏈表的鏈頭一端進行操作即為進棧操作 每次刪除一個元素的時候&#xff0c;規定只能在單鏈表…

華為OD機試_2025 B卷_字符串重新排列(Python,100分)(附詳細解題思路)

題目描述 給定一個字符串s&#xff0c;s包括以空格分隔的若干個單詞&#xff0c;請對s進行如下處理后輸出&#xff1a; 1、單詞內部調整&#xff1a;對每個單詞字母重新按字典序排序 2、單詞間順序調整&#xff1a; 1&#xff09;統計每個單詞出現的次數&#xff0c;并按次數降…

http的緩存問題

一句話概括&#xff1a;瀏覽器請求資源的時候&#xff0c;會首先檢查本地是否有緩存&#xff0c;減少向服務器請求的次數 一、緩存類型&#xff1a; 1. 強緩存&#xff08;本地緩存&#xff09;&#xff1a;直接讀本地&#xff0c;不發請求 控制方式&#xff1a; ① Cache-C…

【網絡安全】SRC漏洞挖掘思路/手法分享

文章目錄 Tip1Tip2Tip3Tip4Tip5Tip6Tip7Tip8Tip9Tip10Tip11Tip12Tip13Tip14Tip15Tip16Tip17Tip18Tip19Tip20Tip21Tip22Tip23Tip24Tip25Tip26Tip27Tip28Tip29Tip30Tip1 “復制該主機所有 URL”:包含該主機上的所有接口等資源。 “復制此主機里的鏈接”:包括該主機加載的第三…

「Linux中Shell命令」Shell常見命令

知識點及案例解析 1. who 命令 功能:顯示當前登錄系統的用戶信息,包括用戶名、終端、登錄時間、IP等。 案例: who輸出示例: root tty1 2025-06-13 19:42 root pts/0 2025-06-13 19:45 (192.168.226.1)解析: 顯示兩個用戶登錄信息: 第一列(用…

StampedLock入門教程

文章目錄 一、理解“戳” (Stamp)二、為什么 StampedLock 能提高讀性能&#xff1f;秘密在于“樂觀讀”StampedLock性能對比性能對比結果圖 總結 StampedLock完整演示代碼對代碼的疑問之處問題一&#xff1a;為什么 demonstrateOptimisticReadFailure 中寫線程能修改成功&#…

基于云計算的振動弦分析:諧波可視化與波動方程參數理解-AI云計算數值分析和代碼驗證

振動弦方程是一個基礎的偏微分方程&#xff0c;它描述了彈性弦的橫向振動。其應用范圍廣泛&#xff0c;不僅可用于模擬樂器和一般的波動現象&#xff0c;更是數學物理以及深奧的弦理論中的重要基石。 ??AI云計算數值分析和代碼驗證 振動弦方程是描述固定兩端彈性弦橫向振動的…

Qt .pro配置gcc相關命令(三):-W1、-L、-rpath和-rpath-link

目錄 1.Linux 動態庫相關知識 1.1.動態庫查找路徑 1.2.查看程序依賴的動態庫 1.3.修改動態庫查找路徑的方法 1.4.動態鏈接器緩存管理 2.-Wl參數 3.-L選項&#xff08;編譯時路徑&#xff09; 4.-rpath參數(運行時路徑) 5.-rpath-link 參數 6.常見問題與解決方案 7.總…

Hoppscotch

官方地址 xixiaxiazxiaxix下載 ? Hoppscotch Hoppscotch 是一款輕量級、基于 Web 的 API 開發套件&#xff0c;其核心功能和特點如下&#xff1a; 核心功能3 交互式 API 測試&#xff1a;允許用戶實時發送請求并查看響應&#xff0c;方便記錄 API 行為&#xff0c;在記錄響…

RabbitMQ 知識詳解(Java版)

RabbitMQ 知識詳解&#xff08;Java版&#xff09; RabbitMQ 是一個開源的消息代理&#xff0c;實現了高級消息隊列協議&#xff08;AMQP&#xff09;。它用于在分布式系統中實現應用解耦、異步通信和流量削峰。 核心概念 生產者(Producer)&#xff1a;發送消息的應用消費者(…

Flink task、Operator 和 UDF 之間的關系

要真正駕馭 Flink 并構建出高效、穩定、可擴展的流處理應用&#xff0c;僅僅停留在 API 的表面使用是遠遠不夠的。深入理解其內部的運行機制&#xff0c;洞悉數據從代碼到分布式執行的完整生命周期&#xff0c;以及明晰各個核心組件之間錯綜復雜而又協同工作的關系&#xff0c;…

Veeam Backup Replication系統的安裝與使用

Veeam Backup & Replication系統安裝與使用 系統簡介 核心功能 備份與恢復&#xff1a;專注于虛擬化環境&#xff08;如VMware和Hyper-V&#xff09;的備份與恢復&#xff0c;支持物理服務器和云環境。快速恢復&#xff1a;提供即時恢復功能&#xff0c;可在幾分鐘內恢復…

十四、【ESP32全棧開發指南:搭建輕量級HTTP服務器】

一、HTTP協議基礎 HTTP&#xff08;Hyper Text Transfer Protocol&#xff09;作為互聯網基礎協議&#xff0c;采用請求-響應模型工作&#xff1a; 1.1 HTTP請求組成 GET /uri?query1value1 HTTP/1.1 // 請求行&#xff08;方法URI協議版本&#xff09; Host: example…

java中LinkedList和ArrayList的區別和聯系?

我們被要求解釋Java中LinkedList和ArrayList的區別和聯系。下面將分別從實現原理、性能特點、使用場景等方面進行詳細說明&#xff0c;并總結它們的相同點和不同點。 # 一、聯系&#xff08;共同點&#xff09; 1. 都實現了List接口&#xff0c;因此具有List接口的所有方法&…