RabbitMQ 發送方確認的兩大工具 (With Spring Boot)

核心概念解析

發布者確認機制的核心思想是:將消息投遞的可靠性從“盡力而為”提升為“契約保證”。生產者不再是“發后不理”,而是與 Broker 建立一個雙向的溝通渠道。

在 Spring AMQP 的封裝下,這個機制主要由兩個回調接口實現:

1. ConfirmCallback: 確認消息是否到達 Exchange

這是最核心的確認機制。它關注的是消息從生產者到 Broker 內的交換機 (Exchange) 這一段路程是否成功。

  • 觸發時機:無論消息是否成功到達 Exchange,Broker 都會異步地調用生產者的這個回調函數。
  • 如何工作
    • 如果 Broker 成功接收消息并將其放入 Exchange,回調中的 ack 參數將為 true
    • 如果 Broker 因故(如內部錯誤、交換機不存在等)未能接收消息,ack 參數將為 false,同時 cause 參數會提供失敗的原因描述。
  • 作用:它回答了問題:“Broker 收到我的消息了嗎?

2. ReturnCallback: 確認消息是否路由到 Queue

這是一個補充機制,處理的是一個更細分的場景。它在 ConfirmCallback 返回成功 (ack=true) 的前提下才可能被觸發。

  • 觸發時機:當消息已成功到達 Exchange,但 Exchange 無法根據路由鍵 (Routing Key) 將消息路由到任何一個綁定的隊列時,Broker 會將這條“無法投遞”的消息退回給生產者,并調用此回調。
  • 如何工作
    • 如果消息被正常路由到一個或多個隊列,ReturnCallback 不會被觸發
    • 如果消息無法路由(例如,路由鍵寫錯,或者沒有隊列綁定這個路由鍵),回調函數將被調用,你可以從 ReturnedMessage 參數中獲取到被退回的消息內容、路由信息和退回原因。
  • 作用:它回答了問題:“我發給 Exchange 的消息,有隊列接收它嗎?
  • 回調參數:回調函數中有?個參數: ReturnedMessage 包含以下屬性
public class ReturnedMessage {//返回的消息對象,包含了消息體和消息屬性private final Message message;//由Broker提供的回復碼, 表?消息?法路由的原因. 通常是?個數字代碼,每個數字代表不同的含義.private final int replyCode;//?個?本字符串, 提供了?法路由消息的額外信息或錯誤描述.private final String replyText;//消息被發送到的交換機名稱private final String exchange;//消息的路由鍵,即發送消息時指定的鍵private final String routingKey;
}

工作流程圖

下圖清晰地展示了這兩種回調機制在消息發送過程中的作用點:
在這里插入圖片描述

demo演練

接下來,我們通過一個 Spring Boot 項目來演示如何實現發布者確認。

項目結構

一個簡單的 Spring Boot 項目結構如下:

此處的com.example.ackdemo要改成讀者自己的項目路徑,包括后面相關代碼的路徑引入也需要進行對應修改

ack-demo
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── example
│   │   │           └── publishackdemo
│   │   │               │   ├── RabbitMQConfig.java
│   │   │               │   └── RabbitTemplateConfig.java
│   │   │               │   └── MessageController.java
│   │   │               └── AckDemoApplication.java
│   │   └── resources
│   │       └── application.yml
└── pom.xml

配置發布者確認模式

src/main/resources/application.yml 文件中,進行如下配置:

spring:application:name: ackDemorabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlated # 開啟ConfirmCallback,correlated表示回調時會攜帶CorrelationDatapublisher-returns: true # 開啟ReturnCallback

聲明 Exchange 和 Queue

config/RabbitMQConfig.java 中聲明我們需要的交換機和隊列。

注意此處引入的包為org.springframework.amqp.core

package com.example.publishackdemo.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration("RabbitMQConfigWithProductACK")
public class RabbitMQConfig {// 此處的常量提取到一個單獨的靜態類中更好,此處為了方便演式不單獨提取public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String CONFIRM_ROUTING_KEY = "key.confirm";@Beanpublic TopicExchange confirmExchange() {return ExchangeBuilder.topicExchange(CONFIRM_EXCHANGE_NAME).durable(true).build();}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}@Beanpublic Binding confirmBinding(Queue confirmQueue, TopicExchange confirmExchange) {return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}
}

配置 RabbitTemplate 回調

這是核心步驟。我們創建一個配置類,專門用于定制 RabbitTemplate,并為其設置回調。

package com.example.publishackdemo.config;import lombok.extern.slf4j.Slf4j;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  @Slf4j  
@Configuration("RabbitMQConfigWithPublisherAck")  
public class RabbitTemplateConfig {  @Bean("rabbitTemplate")  public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {  return new RabbitTemplate(connectionFactory);  }  @Bean("confirmRabbitTemplate") // 給這個新的Bean起一個唯一的名字  public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {  RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);  // 關鍵:只為這個新的實例設置回調  rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {  String id = (correlationData != null) ? correlationData.getId() : "";  if (ack) {  log.info("ConfirmCallback: 消息發送成功!ID: {}", id);  } else {  log.error("ConfirmCallback: 消息發送失敗!ID: {}, 原因: {}", id, cause);  }  });  // 同樣可以設置 ReturnsCallback//rabbitTemplate.setMandatory(true);   rabbitTemplate.setReturnsCallback(returnedMessage -> {  log.warn("ReturnsCallback: 消息被退回! Message: {}, ReplyCode: {}, ReplyText: {}, Exchange: {}, RoutingKey: {}",  new String(returnedMessage.getMessage().getBody()),  returnedMessage.getReplyCode(),  returnedMessage.getReplyText(),  returnedMessage.getExchange(),  returnedMessage.getRoutingKey());  // 此處可以記錄無法路由的消息,用于后續分析或處理  });  return rabbitTemplate;  }  
}

此處配置兩個RabbitTemplate的原因?

  • setConfirmCallback只能被設置一次,如果直接在Controller層里面調用的時候聲明,那么每次請求都會調用一次設置的代碼,會導致第二次之后的請求都會報錯,所以需要提取到Config里面
  • 在設置RabbitTemplatesetReturnsCallback或者setConfirmCallback設置之后會全局生效,如果并不需要進行發送確認的生產者也使用了這個Template那么會導致性能下降等問題,所以創建兩個不同的Bean就是為了在不同情況選擇不同的Bean對象

生產者代碼 (Publisher)

修改 MessageController,增加幾個測試接口來模擬不同場景。

package com.example.ackdemo.controller;import com.example.ackdemo.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;@Slf4j
@RestController
public class MessageController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate rabbitTemplate;// 1. 測試正常發送@GetMapping("/send/ok")public String sendOkMessage() {String id = UUID.randomUUID().toString();String message = "A correct message.";log.info("Sending message with ID: {}", id);rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, RabbitMQConfig.CONFIRM_ROUTING_KEY, message, new CorrelationData(id));return "Message sent (OK). Check logs for callback.";}// 2. 測試發送到不存在的 Exchange@GetMapping("/send/bad-exchange")public String sendToBadExchange() {String id = UUID.randomUUID().toString();String message = "Message to a non-existent exchange.";log.info("Sending message with ID: {} to a bad exchange", id);rabbitTemplate.convertAndSend("non-existent-exchange", RabbitMQConfig.CONFIRM_ROUTING_KEY, message, new CorrelationData(id));return "Message sent (Bad Exchange). Check logs for callback.";}// 3. 測試發送到正確的 Exchange,但錯誤的 Routing Key@GetMapping("/send/bad-routing")public String sendWithBadRoutingKey() {String id = UUID.randomUUID().toString();String message = "Message with a bad routing key.";log.info("Sending message with ID: {} with a bad routing key", id);rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, "wrong.key.123", message, new CorrelationData(id));return "Message sent (Bad Routing). Check logs for callback.";}
}

運行與驗證

  1. 啟動應用。
  2. 驗證成功場景
    • 訪問 http://localhost:8080/send/ok
    • 日志打印:你會看到兩條日志,說明回調已經正確設置。
2025-07-11T19:59:56.682+08:00  INFO 11868 --- [mqDemo] [nio-8080-exec-1] c.d.m.p.PublisherController              : Sending message with ID: 679ddb2f-0bfe-4ec6-b77e-4dd44c1612e6
2025-07-11T19:59:56.980+08:00  INFO 11868 --- [mqDemo] [nectionFactory2] c.d.m.p.RabbitTemplateConfig             : ConfirmCallback: 消息發送成功!ID: 679ddb2f-0bfe-4ec6-b77e-4dd44c1612e6
  • 觀察 RabbitMQ 管理界面confirm.queue 中會有一條消息。
  1. 驗證交換機失敗場景
    • 訪問 http://localhost:8080/send/bad-exchange
    • 日志打印:只會觸發 ConfirmCallback 的失敗回調。
reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'demo', class-id=60, method-id=40)
2025-07-11T20:02:41.784+08:00 ERROR 40760 --- [mqDemo] [nectionFactory3] c.d.m.p.RabbitTemplateConfig             : ConfirmCallback: 消息發送失敗!ID: bf93981e-f0a7-485d-bddf-cd5aec3e299f, 原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'demo', class-id=60, method-id=40)
  1. 驗證路由失敗場景
    • 訪問 http://localhost:8080/send/bad-routing
    • 日志打印:你會看到先觸發成功ConfirmCallback(因為消息確實到達了 Exchange)
    • 但是并沒有觸發ReturnsCallback,為什么?
      • 因為需要在其方法前添加一行rabbitTemplate.setMandatory(true);來開啟此功能
2025-07-11T20:13:00.788+08:00  INFO 10852 --- [mqDemo] [nio-8080-exec-1] c.d.m.p.PublisherController              : Sending message with ID: 24753c11-7341-4836-8c74-79a0deae8f3b with a bad routing key
2025-07-11T20:13:01.023+08:00  WARN 10852 --- [mqDemo] [nectionFactory2] c.d.m.p.RabbitTemplateConfig             : ReturnsCallback: 消息被退回! Message: Message with a bad routing key., ReplyCode: 312, ReplyText: NO_ROUTE, Exchange: confirm.exchange, RoutingKey: wrong.key.123
2025-07-11T20:13:01.023+08:00  INFO 10852 --- [mqDemo] [nectionFactory3] c.d.m.p.RabbitTemplateConfig             : ConfirmCallback: 消息發送成功!ID: 24753c11-7341-4836-8c74-79a0deae8f3b

生產環境注意事項

  1. 為失敗做好準備:收到 nackreturn 回調后,必須有相應的補償機制。常見的策略包括:

    • 有限重試:對于網絡抖動等臨時性故障,可以進行幾次延時重試。
    • 記錄日志與告警:對于持續失敗或邏輯錯誤(如錯誤的Exchange/RoutingKey),應詳細記錄日志,并觸發告警通知開發人員介入。
    • 消息入庫:將發送失敗的消息存入數據庫或本地文件,通過定時任務進行重發,這是最可靠的補償方式。
  2. 善用 CorrelationData:在異步高并發場景下,CorrelationData 是你識別哪條消息得到確認的唯一憑證。它的 ID 應該具有業務唯一性(如訂單ID、業務流水號),以便于追蹤和排錯。

  3. 性能權衡:開啟發布者確認會增加網絡開銷和 Broker 的 CPU 負擔,從而降低消息發送的吞吐量。對于可以容忍少量丟失的非核心業務(如打點日志),可以關閉此功能以追求性能。

  4. 全局回調 vs. 單次發送回調:我們演示的是全局配置 RabbitTemplate 的回調。RabbitTemplate 也支持為單次 send 操作指定一個臨時的 CorrelationData,它內部可以包含更豐富的回調邏輯,適用于需要對特定消息進行特殊處理的場景。

  5. 構建完整的可靠性鏈路:切記,發布者確認只是可靠性拼圖的一部分。一個完整的可靠性方案必須是:發布者確認 + 持久化(交換機、隊列、消息)+ 消費者確認。三者結合,才能最大限度地保證消息在整個生命周期內的安全。

總結

RabbitMQ 的發布者確認機制,通過 ConfirmCallbackReturnCallback 兩個強大的工具,為我們彌補了消息從生產者到 Broker 這一段路程中的可靠性盲區。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/91175.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/91175.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/91175.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

KONG API Gateway中的核心概念

在使用Kong API Gateway&#xff08;API網關&#xff09;時&#xff0c;理解其核心概念是掌握其工作原理的基礎。這些概念既體現了Kong的設計哲學&#xff0c;也決定了它如何適配復雜的API管理場景&#xff08;如微服務、多團隊協作等&#xff09;。本文將系統梳理Kong的核心概…

如何解決pip安裝報錯ModuleNotFoundError: No module named ‘jupyterlab’問題

【Python系列Bug修復PyCharm控制臺pip install報錯】如何解決pip安裝報錯ModuleNotFoundError: No module named ‘jupyterlab’問題 摘要 在開發過程中&#xff0c;我們經常會遇到各種模塊安裝的問題&#xff0c;尤其是在使用PyCharm時&#xff0c;經常會遇到pip install時的…

3 運算符與表達式

運算符&#xff1a;對字面量或者變量進行操作的符號 表達式&#xff1a;用運算符把字面量或者變量連接起來符合java語法的式子就可以稱作表達式不同運算符連接的表達式體現的是不同類型的表達式int a 10; int b 20; int c a b;&#xff1a;運算符&#xff0c;并且是算術運算…

MySQL的單行函數:

目錄 函數的理解&#xff1a; MySQL的內置函數及分類&#xff1a; 單行函數&#xff1a; 數值函數&#xff1a; 基本函數&#xff1a; 角度與弧度互換函數&#xff1a; 三角函數&#xff1a; 指數與對數&#xff1a; 進制轉換&#xff1a; 字符串函數&#xff1a; 日…

設計模式(二十一)行為型:狀態模式詳解

設計模式&#xff08;二十一&#xff09;行為型&#xff1a;狀態模式詳解狀態模式&#xff08;State Pattern&#xff09;是 GoF 23 種設計模式中的行為型模式之一&#xff0c;其核心價值在于允許一個對象在其內部狀態改變時改變其行為&#xff0c;使得對象看起來像是修改了它的…

深入理解 Doris Compaction:提升查詢性能的幕后功臣

在 Doris 的數據存儲與查詢體系里&#xff0c;Compaction 是保障查詢效率、優化存儲結構的關鍵機制。如果你好奇 Doris 如何在高頻寫入后仍能高效響應查詢&#xff0c;或是想解決數據版本膨脹帶來的性能問題&#xff0c;這篇關于 Compaction 的深度解析值得收藏 &#x1f447; …

css 實現虛線效果的多種方式

使用邊框實現虛線 通過設置元素的邊框樣式來實現虛線效果。以下為示例代碼: .dashed {border: 1px dashed black; }使用 CSS 偽元素實現虛線 使用偽元素來模擬虛線的效果。以下為示例代碼: .dashed::before {content: "";display: block;height: 1px;border-bo…

深入剖析 RocketMQ 分布式事務:原理、流程與實踐

Apache RocketMQ 是一種分布式消息隊列系統&#xff0c;支持分布式事務消息&#xff0c;以確保在分布式系統中數據的一致性。它通過一種基于兩階段提交(2PC)的機制結合補償邏輯來實現分布式事務的最終一致性。以下是對 RocketMQ 分布式事務的詳細講解&#xff0c;包括其核心概念…

具身智能 自動駕駛相關崗位的技術棧與能力地圖

一、硬技能技術棧&#xff08;優先級排序&#xff09; 1. 核心領域技術&#xff08;★★★★★&#xff09;技術方向具體技能學習建議大模型實戰- VLA架構&#xff08;RT-2、PaLM-E&#xff09;開發/微調- 多模態對齊&#xff08;CLIP、Flamingo&#xff09;- 生成式策略&#…

實現了加載 正向 碰撞 雅可比 仿真

""" # 此示例從 URDF 文件中加載一個 UR10 機械臂的模型 # 隨后演示 Pinocchio 庫的基本功能,如正向運動學計算 # 雅可比矩陣計算、碰撞檢測以及動力學仿真 """ # 導入 meshcat 的幾何模塊,用于創建和管理可視化的幾何對象 import meshcat.geo…

【0基礎PS】PS工具詳解--畫筆工具

目錄前言一、畫筆工具的位置與快捷鍵?二、畫筆工具選項欄設置?三、畫筆工具的進階應用?四、常見問題及解決方法?總結前言 在 Photoshop 的眾多工具中&#xff0c;畫筆工具無疑是極具創造力和實用性的工具之一。無論是進行圖像繪制、照片修飾&#xff0c;還是特效制作&…

window10和ubuntu22.04雙系統之卸載ubuntu系統

window10和ubuntu22.04雙系統之卸載ubuntu系統&#xff09;1. 刪除Ubuntu系統占用的磁盤分區&#xff08;在Windows下操作&#xff09;2. 刪除ubuntu開機引導項1. winr出來終端提示框后輸入2. 然后會在命令行中顯示電腦的硬盤列表&#xff0c;輸入命令選擇安裝Windows的那個硬盤…

(C++)C++類和類的方法(基礎教程)(與Python類的區別)

前言&#xff1a; 本篇博客建議搭配&#xff1a;&#xff08;Python&#xff09;類和類的方法&#xff08;基礎教程介紹&#xff09;&#xff08;Python基礎教程&#xff09;-CSDN博客 一起學習使用&#xff1b; 源代碼&#xff1a; #include <iostream> #include &…

【NLP輿情分析】基于python微博輿情分析可視化系統(flask+pandas+echarts) 視頻教程 - 微博文章數據可視化分析-文章分類下拉框實現

大家好&#xff0c;我是java1234_小鋒老師&#xff0c;最近寫了一套【NLP輿情分析】基于python微博輿情分析可視化系統(flaskpandasecharts)視頻教程&#xff0c;持續更新中&#xff0c;計劃月底更新完&#xff0c;感謝支持。今天講解微博文章數據可視化分析-文章分類下拉框實現…

Git命令保姆級教程

Git 入門網站 https://learngitbranching.js.org/?localezh_CN Git 命令 git init // 在本地目錄內部會生成.git文件夾 git initgit clone // 從git服務器拉取代碼 // 代碼下載完成后在當前文件夾中會有一個 shop 的目錄&#xff0c;通過 cd shop 命令進入目錄。 git clone ht…

Java Ai For循環 (day07)

循環結構 for&#xff1a;循環語句的作用&#xff1a;可以將一段代碼重復的執行很多次for 循環語句格式&#xff1a;執行流程&#xff1a; 初始化語句執行條件判斷語句&#xff0c;看結果是 true&#xff0c;還是 false false結束&#xff0c;true繼續執行循環體語句執行條件控…

Directory Opus 使用優化

自定義快捷鍵 Directory Opus 移動標簽到另一欄 設置快捷鍵&#xff1a;ctrl←/→ 設置步驟&#xff1a; 打開【設置】—>選擇【自定義工具欄和快捷鍵】 選擇【新建】—>【新建窗口快捷鍵】 輸入快捷鍵命令 Go TABMOVEother此時可以點擊運行進行測試&#xff0c;…

Qt知識點2『Ubuntu24.04.2安裝Qt5.12.9各種報錯』

問題1&#xff1a;Qt安裝完畢后&#xff0c;新建一個最簡單的測試程序&#xff0c;但是QtCreator左側構建的三個按鈕呈現灰色&#xff0c;無法進行構建操作答&#xff1a;進入QtCreator的Kits界面&#xff08;工具-選項&#xff09;&#xff0c;點擊"自動檢測"下的De…

TS面試題

1.TS有哪些類型&#xff08;對比與js&#xff09;&#xff1f;關鍵字/語法用途示例any關閉類型檢查let a: any 4unknown類型安全的 anylet u: unknown 4; if (typeof u number) …never永不存在的值function err(): never { throw 0; }void無返回值function f(): void {}enu…

借助Early Hints和HarperDB改善網頁性能

對電商網站來說&#xff0c;糟糕的頁面性能可能會增加交易放棄率。一直以來&#xff0c;人們會使用CDN進行緩存從而縮短頁面加載時間&#xff0c;但即便實施了強大的緩存&#xff0c;消費者在通過移動網絡訪問這些網站時可能仍然會需要頻繁等待。最近誕生了一種名為“早期提示”…