Sping Boot + RabbitMQ :如何在Spring Boot中整合RabbitMQ實現消息可靠投遞?

Spring Boot整合RabbitMQ實現消息可靠投遞全解析

在分布式系統中,消息中間件是解耦、異步、流量削峰的核心組件。RabbitMQ作為高可靠、易擴展的AMQP協議實現,被廣泛應用于企業級場景。但消息傳遞過程中可能因網絡波動、服務宕機等問題導致消息丟失,因此消息的可靠投遞是RabbitMQ使用的核心課題。本文將基于Spring Boot 3.x版本,詳細講解生產者(Producer)和消費者(Consumer)兩端的可靠投遞實現方案。


一、環境準備與基礎配置

1.1 依賴引入

pom.xml中添加Spring Boot RabbitMQ Starter依賴,自動整合AmqpTemplate和RabbitTemplate:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2 連接配置

application.yml中配置RabbitMQ連接信息及關鍵可靠投遞參數:

spring:rabbitmq:host: 127.0.0.1       # RabbitMQ服務地址port: 5672            # 默認AMQP端口username: guest       # 默認用戶名(生產環境需替換)password: guest       # 默認密碼(生產環境需替換)virtual-host: /       # 默認虛擬主機# 生產者確認與回退配置publisher-confirm-type: correlated  # 關鍵:開啟消息確認模式publisher-returns: true             # 開啟消息回退模式# 消費者確認配置listener:simple:acknowledge-mode: manual        # 手動確認(默認auto自動確認)prefetch: 10                    # 消費者單次拉取最大消息數(防雪崩)

1.3 核心組件初始化

通過配置類初始化RabbitMQ連接工廠、消息模板及隊列/交換器聲明:

@Configuration
public class RabbitMQConfig {// 聲明測試用交換器和隊列(根據業務場景調整)public static final String TEST_EXCHANGE = "test.exchange";public static final String TEST_QUEUE = "test.queue";public static final String TEST_ROUTING_KEY = "test.key";@Beanpublic DirectExchange testExchange() {// 聲明直連交換器(持久化)return new DirectExchange(TEST_EXCHANGE, true, false);}@Beanpublic Queue testQueue() {// 聲明持久化隊列(durable=true)return new Queue(TEST_QUEUE, true, false, false);}@Beanpublic Binding testBinding() {// 綁定隊列與交換器return BindingBuilder.bind(testQueue()).to(testExchange()).with(TEST_ROUTING_KEY);}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);// 必須設置為true,否則ReturnCallback不會觸發(僅當消息無法路由到隊列時回調)template.setMandatory(true);return template;}
}

二、生產者可靠投遞:確認模式與回退模式

生產者的可靠投遞需解決兩個核心問題:

  1. 消息是否成功到達交換器(Exchange)?
  2. 消息從交換器到隊列(Queue)是否失敗?

Spring Boot通過ConfirmCallback(確認模式)和ReturnCallback(回退模式)分別解決這兩個問題。

2.1 確認模式(ConfirmCallback):消息到交換器的確認

作用:當消息被交換器接收時觸發回調(無論是否路由到隊列),用于確認消息已到達交換器。

2.1.1 配置與實現

通過RabbitTemplatesetConfirmCallback方法注冊回調:

@Service
public class ProducerService {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 注冊確認回調rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {// 消息成功到達交換器log.info("消息確認成功,ID:{}", correlationData.getId());} else {// 消息未到達交換器(如交換器不存在、權限不足)log.error("消息確認失敗,ID:{},原因:{}", correlationData.getId(), cause);// 這里可觸發重試邏輯(需結合correlationData存儲原始消息)}});}public void sendMessage(String message) {// 構造CorrelationData(用于關聯消息ID,需全局唯一)CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitMQConfig.TEST_EXCHANGE,RabbitMQConfig.TEST_ROUTING_KEY,message,correlationData);}
}
2.1.2 參數與注意事項
  • publisher-confirm-type
    • none(默認):禁用確認模式;
    • correlated:啟用關聯確認(推薦),通過CorrelationData傳遞消息元數據;
    • simple:簡化模式(兼容老版本),僅支持同步確認。
  • CorrelationData:必須顯式傳遞,否則回調中無法獲取消息ID等元數據;
  • 異步特性:確認回調是異步觸發的,生產環境需結合本地消息表或Redis記錄消息狀態,避免丟失。

2.2 回退模式(ReturnCallback):交換器到隊列的失敗處理

作用:當消息成功到達交換器,但無法路由到任何隊列時觸發回調(如路由鍵錯誤、隊列未綁定)。

2.2.1 配置與實現

通過RabbitTemplatesetReturnCallback方法注冊回退回調(Spring Boot 2.1+推薦使用setReturnsCallback):

@PostConstruct
public void init() {// 回退回調(Spring Boot 2.1+推薦使用ReturnsCallback)rabbitTemplate.setReturnsCallback(returned -> {Message message = returned.getMessage();String exchange = returned.getExchange();String routingKey = returned.getRoutingKey();int replyCode = returned.getReplyCode();String replyText = returned.getReplyText();log.error("消息回退,交換器:{},路由鍵:{},錯誤碼:{},原因:{},消息內容:{}",exchange, routingKey, replyCode, replyText, new String(message.getBody()));// 這里可觸發補償邏輯(如修改路由鍵重發)});
}
2.2.2 參數與注意事項
  • mandatory:必須設置為true(通過rabbitTemplate.setMandatory(true)),否則RabbitMQ會靜默丟棄無法路由的消息;
  • 觸發條件:僅當消息無法路由到任何隊列時觸發(若交換器綁定了多個隊列,只要有一個隊列匹配就不會觸發);
  • 與確認模式的關系:確認模式(ConfirmCallback)先于回退模式觸發,因為交換器接收消息后才會嘗試路由。

三、消費者可靠投遞:自動確認與手動確認

消費者的可靠投遞核心是消息確認(ACK)機制,確保消息被成功處理后再確認,避免因處理失敗導致消息丟失。

3.1 自動確認(AUTO):簡單但高風險

原理:消息一旦被消費者接收,RabbitMQ立即標記為已確認并刪除。若消費者處理失敗(如拋出異常),消息已丟失。

3.1.1 配置與實現

application.yml中設置acknowledge-mode: auto(默認值),消費者無需手動處理ACK:

@Component
public class AutoAckConsumer {@RabbitListener(queues = RabbitMQConfig.TEST_QUEUE)public void handleMessage(String message) {try {// 模擬業務處理log.info("自動確認模式-消費消息:{}", message);// 若處理成功,RabbitMQ自動ACK} catch (Exception e) {log.error("消息處理失敗:{}", message, e);// 無補救措施,消息已丟失!}}
}
3.1.2 適用場景與風險
  • 適用場景:消息處理邏輯簡單、無失敗可能(如日志記錄);
  • 風險:消息處理失敗時無法重試,可能導致數據丟失;
  • 生產環境不推薦,除非能接受消息丟失。

3.2 手動確認(MANUAL):精準控制,生產首選

原理:消費者顯式調用channel.basicAck(確認)或channel.basicNack(拒絕),RabbitMQ根據ACK狀態決定是否重新入隊。

3.2.1 配置與實現
  1. application.yml中設置acknowledge-mode: manual
  2. 消費者方法中注入ChannelMessage對象,手動處理ACK:
@Component
public class ManualAckConsumer {@RabbitListener(queues = RabbitMQConfig.TEST_QUEUE)public void handleMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {String msgContent = new String(message.getBody(), StandardCharsets.UTF_8);log.info("手動確認模式-消費消息:{}", msgContent);// 模擬業務處理(可能失敗)businessProcess(msgContent);// 處理成功:確認消息(multiple=false表示僅確認當前消息)channel.basicAck(deliveryTag, false);} catch (Exception e) {log.error("消息處理失敗,準備重試或丟棄:{}", message, e);// 處理失敗:拒絕消息(requeue=true表示重新入隊,false表示丟棄或進入死信隊列)channel.basicNack(deliveryTag, false, true); // 或使用basicReject(僅拒絕單條消息):// channel.basicReject(deliveryTag, true);}}private void businessProcess(String message) {// 模擬可能失敗的業務邏輯if (message.contains("error")) {throw new RuntimeException("模擬業務處理失敗");}}
}
3.2.2 關鍵方法與參數解釋
  • channel.basicAck(deliveryTag, multiple)
    • deliveryTag:消息的唯一標識(由RabbitMQ生成);
    • multiple:是否批量確認(true表示確認所有小于deliveryTag的未確認消息);
  • channel.basicNack(deliveryTag, multiple, requeue)
    • requeuetrue表示消息重新入隊(可能被同一消費者重復消費),false表示丟棄或進入死信隊列;
  • channel.basicReject(deliveryTag, requeue):與basicNack類似,但僅支持單條消息拒絕。
3.2.3 生產環境注意事項
  • 冪等性處理:消息可能因requeue=true被重復消費,業務邏輯需保證冪等(如通過數據庫唯一索引、Redis分布式鎖);
  • 異常捕獲范圍:必須在try-catch中包裹完整的業務邏輯,避免未捕獲異常導致ACK未發送,消息被無限阻塞;
  • 批量確認優化:若處理大量消息,可結合multiple=true批量確認提升性能(需確保批量消息均處理成功);
  • 死信隊列(DLX):建議將requeue=false的消息路由到死信隊列,避免無限重試消耗資源(需提前聲明死信交換器和隊列)。

四、總結與最佳實踐

4.1 生產者側關鍵要點

  • 啟用correlated確認模式,結合CorrelationData記錄消息ID;
  • 啟用回退模式(mandatory=true),捕獲無法路由的消息;
  • 確認回調中實現消息重試(需避免無限重試,可結合指數退避策略);
  • 消息持久化:設置交換器、隊列、消息本身為持久化(durable=true),防止RabbitMQ重啟導致消息丟失。

4.2 消費者側關鍵要點

  • 優先選擇手動確認模式(manual),精確控制消息狀態;
  • 處理邏輯必須保證冪等性,避免重復消費問題;
  • 合理設置prefetch參數(如prefetch=10),防止消費者負載過高;
  • 失敗消息路由到死信隊列,避免阻塞正常消息處理。

4.3 完整可靠投遞鏈路

通過“生產者確認+回退模式+消費者手動確認+消息持久化+死信隊列”的組合,可構建覆蓋全鏈路的可靠消息傳遞體系,滿足絕大多數企業級場景的需求。

后續我將會對死信隊列進行詳細講解,歡迎關注。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/94071.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/94071.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/94071.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

STAR-CCM+|K-epsilon湍流模型溯源

【1】引言 三維CFD仿真經典軟件很多&#xff0c;我接觸過的有Ansys和STAR-CCM兩種。因為一些機緣&#xff0c;我使用STAR-CCM更多&#xff0c;今天就來回顧一下STAR-CCM中K-epsilon湍流模型的基本定義。 【2】學習地址介紹 點擊鏈接User Guide可以到達網頁版本的STAR-CCM 24…

osgEarth 圖像融合正片疊底

* 需求&#xff1a;* 高程渲染圖 RGB.tif、 山體陰影圖 AMP.tif** 高程渲染圖 rgb波段分別 乘以 山體陰影圖r波段&#xff0c; 然后除以255(AI說 讀取的紋理就已經歸一化到了 0~1 范圍&#xff0c;不用除以 255)。本人遙感知識匱乏。問了AI,以上 需求在許多商業軟件上已實現。…

Java接口響應速度優化

在 Java 開發中&#xff0c;接口響應速度直接影響用戶體驗和系統吞吐量。優化接口性能需要從代碼、數據庫、緩存、架構等多個維度綜合考量&#xff0c;以下是具體方案及詳細解析&#xff1a;一、代碼層面優化代碼是接口性能的基礎&#xff0c;低效的代碼會直接導致響應緩慢。1.…

A Large Scale Synthetic Graph Dataset Generation Framework的學習筆記

文章的簡介 作者提出了一個可擴展的合成圖生成框架&#xff0c;能夠從真實圖中學習結構和特征分布&#xff0c;并生成任意規模的圖數據集&#xff0c;支持&#xff1a; 節點和邊的結構生成節點和邊的特征生成特征與結構的對齊&#xff08;Aligner&#xff09; 它區別于GraphWor…

Android12 Framework讀寫prop屬性selinux報錯解決

文章目錄問題描述解決過程相關文章問題描述 Android讀prop值時&#xff0c;就算是system應用&#xff0c; 也需要selinux權限&#xff0c;否則會報錯。 java代碼如下 SystemProperties.get("ro.input.resampling", "")selinux報錯如下 2025-06-28 17:57:…

【圖文版】AIOT 小智 AI 聊天機器人 ESP32 項目源碼圖解

前言 小智 AI 聊天機器人是最近一個很火的開源項目&#xff0c;它借助LLM大模型以及TTS等AI的能力&#xff0c;通過自然語言來與其對話實現交互。它可以回答任何問題、播放音樂、背誦古詩&#xff0c;頗有未來AI機器人的雛形。 因為最近工作上的需要對其進行了研究&#xff0c;…

250821-RHEL9.4上Docker及Docker-Compose的離線安裝

在 離線環境下 在 RHEL (Red Hat Enterprise Linux) 系統上安裝 Docker 和 Docker Compose&#xff0c;需要提前在有網絡的環境中下載相關 RPM 包及依賴&#xff0c;然后在目標機器上進行安裝。以下是比較完整的步驟&#xff1a; 1. Docker及Docker-Compose離線安裝 在 RHEL 9.…

react相關知識

1.類組件和函數組件&#xff08;1&#xff09;類組件import React, { Component } from react;class UserProfile extends Component {constructor(props) {super(props);this.state {userData: null,isLoading: true,};this.timerId null;}componentDidMount() {// 模擬 API…

算法第五十五天:圖論part05(第十一章)

并查集理論基礎并查集主要有兩個功能&#xff1a;將兩個元素添加到一個集合中。判斷兩個元素在不在同一個集合class UnionFind:def __init__(self, n):"""初始化并查集"""self.n nself.father list(range(n)) # 每個節點自己是根self.rank […

雨霧天氣漏檢率驟降80%!陌訊多模態車牌識別方案實戰解析

一、行業痛點&#xff1a;車牌識別的天氣敏感性據《智慧交通系統檢測白皮書》統計&#xff0c;雨霧環境下傳統車牌識別漏檢率高達42.7%&#xff08;2024年數據&#xff09;。主要存在三大技術瓶頸&#xff1a;1.??水膜干擾??&#xff1a;擋風玻璃水漬導致車牌區域紋理模糊2…

PostgreSQL15——查詢詳解

PostgreSQL15查詢詳解一、簡單查詢1.1、單表查詢1.2、無表查詢1.3、消除重復結果1.4、使用注釋二、查詢條件2.1、WHERE子句2.2、模式匹配2.3、空值判斷2.4、復雜條件三、排序顯示3.1、單列排序3.2、多列排序3.3、空值排序四、限定結果數量4.1、Top-N查詢4.2、分頁查詢4.3、注意…

03-容器數據卷

卷就是目錄或文件&#xff0c;存在于一個或多個容器中&#xff0c;由 docker 掛載到容器&#xff0c;但不屬于聯合文件系統&#xff0c;因此能夠繞過 UnionFS&#xff0c;提供一些用于持續存儲或共享數據。 特性&#xff1a;卷設計的目的就是數據的持久化&#xff0c;完全獨立于…

Linux內核進程管理子系統有什么第三十三回 —— 進程主結構詳解(29)

接前一篇文章&#xff1a;Linux內核進程管理子系統有什么第三十二回 —— 進程主結構詳解&#xff08;28&#xff09; 本文內容參考&#xff1a; Linux內核進程管理專題報告_linux rseq-CSDN博客 《趣談Linux操作系統 核心原理篇&#xff1a;第三部分 進程管理》—— 劉超 《…

從代碼學習深度強化學習 - 目標導向的強化學習-HER算法 PyTorch版

文章目錄 1. 前言:當一個任務有多個目標 2. 目標導向的強化學習 (GoRL) 簡介 3. HER算法:化失敗為成功的智慧 4. 代碼實踐:用PyTorch實現HER+DDPG 4.1 自定義環境 (WorldEnv) 4.2 智能體與算法 (DDPG) 4.3 HER的核心:軌跡經驗回放 4.4 主流程與訓練 5. 訓練結果與分析 6. 總…

前端 H5分片上傳 vue實現大文件

用uniapp開發APP上傳視頻文件&#xff0c;大文件可以上傳成功&#xff0c;但是一旦打包為H5的代碼&#xff0c;就會一提示鏈接超時&#xff0c;我的代碼中是實現的上傳到阿里云 如果需要看全文的私信我 官方開發文檔地址 前端&#xff1a;使用分片上傳的方式上傳大文件_對象…

Linux服務器Systemctl命令詳細使用指南

目錄 1. 基本語法 2. 基礎命令速查表 3. 常用示例 3.1 部署新服務后&#xff0c;設置開機自啟并啟動 3.2 檢查系統中所有失敗的服務并嘗試修復 3.3 查看系統中所有開機自啟的服務 4. 總結 以下是 systemctl 使用指南&#xff0c;涵蓋服務管理、單元操作、運行級別控制、…

【JVM內存結構系列】二、線程私有區域詳解:程序計數器、虛擬機棧、本地方法棧——搞懂棧溢出與線程隔離

上一篇文章我們搭建了JVM內存結構的整體框架,知道程序計數器、虛擬機棧、本地方法棧屬于“線程私有區域”——每個線程啟動時會單獨分配內存,線程結束后內存直接釋放,無需GC參與。這三個區域看似“小眾”,卻是理解線程執行邏輯、排查棧溢出異常的關鍵,也是面試中高頻被問的…

紅帽認證升級華為openEuler證書活動!

如果您有紅帽證書&#xff0c;可以升級以下相應的證書&#xff1a;&#x1f447; 有RHCSA證書&#xff0c;可以99元升級openEuler HCIA 有RHCE證書&#xff0c;可以99元升級openEuler HCIP 有RHCA證書&#xff0c;可以2100元升級openEuler HCIE 現金激勵&#xff1a;&#x1f4…

迭代器模式與幾個經典的C++實現

迭代器模式詳解1. 定義與意圖迭代器模式&#xff08;Iterator Pattern&#xff09; 是一種行為設計模式&#xff0c;它提供一種方法順序訪問一個聚合對象中的各個元素&#xff0c;而又不暴露該對象的內部表示。主要意圖&#xff1a;為不同的聚合結構提供統一的遍歷接口。將遍歷…

epoll 陷阱:隧道中的高級負擔

上周提到了 tun/tap 轉發框架的數據通道結構和優化 tun/tap 轉發性能優化&#xff0c;涉及 RingBuffer&#xff0c;packetization 等核心話題。我也給出了一定的數據結構以及處理邏輯&#xff0c;但竟然沒有高尚的 epoll&#xff0c;本文說說它&#xff0c;因為它不適合。 epo…