核心概念解析
發布者確認機制的核心思想是:將消息投遞的可靠性從“盡力而為”提升為“契約保證”。生產者不再是“發后不理”,而是與 Broker 建立一個雙向的溝通渠道。
在 Spring AMQP 的封裝下,這個機制主要由兩個回調接口實現:
1. ConfirmCallback: 確認消息是否到達 Exchange
這是最核心的確認機制。它關注的是消息從生產者到 Broker 內的交換機 (Exchange) 這一段路程是否成功。
- 觸發時機:無論消息是否成功到達 Exchange,Broker 都會異步地調用生產者的這個回調函數。
- 如何工作:
- 如果 Broker 成功接收消息并將其放入 Exchange,回調中的
ack
參數將為true
。 - 如果 Broker 因故(如內部錯誤、交換機不存在等)未能接收消息,
ack
參數將為false
,同時cause
參數會提供失敗的原因描述。
- 如果 Broker 成功接收消息并將其放入 Exchange,回調中的
- 作用:它回答了問題:“Broker 收到我的消息了嗎?”
2. ReturnCallback: 確認消息是否路由到 Queue
這是一個補充機制,處理的是一個更細分的場景。它在 ConfirmCallback
返回成功 (ack=true
) 的前提下才可能被觸發。
- 觸發時機:當消息已成功到達 Exchange,但 Exchange 無法根據路由鍵 (Routing Key) 將消息路由到任何一個綁定的隊列時,Broker 會將這條“無法投遞”的消息退回給生產者,并調用此回調。
- 如何工作:
- 如果消息被正常路由到一個或多個隊列,
ReturnCallback
不會被觸發。 - 如果消息無法路由(例如,路由鍵寫錯,或者沒有隊列綁定這個路由鍵),回調函數將被調用,你可以從
ReturnedMessage
參數中獲取到被退回的消息內容、路由信息和退回原因。
- 如果消息被正常路由到一個或多個隊列,
- 作用:它回答了問題:“我發給 Exchange 的消息,有隊列接收它嗎?”
- 回調參數:回調函數中有?個參數:
ReturnedMessage
包含以下屬性
public class ReturnedMessage {//返回的消息對象,包含了消息體和消息屬性private final Message message;//由Broker提供的回復碼, 表?消息?法路由的原因. 通常是?個數字代碼,每個數字代表不同的含義.private final int replyCode;//?個?本字符串, 提供了?法路由消息的額外信息或錯誤描述.private final String replyText;//消息被發送到的交換機名稱private final String exchange;//消息的路由鍵,即發送消息時指定的鍵private final String routingKey;
}
工作流程圖
下圖清晰地展示了這兩種回調機制在消息發送過程中的作用點:
demo演練
接下來,我們通過一個 Spring Boot 項目來演示如何實現發布者確認。
項目結構
一個簡單的 Spring Boot 項目結構如下:
此處的
com.example.ackdemo
要改成讀者自己的項目路徑,包括后面相關代碼的路徑引入也需要進行對應修改
ack-demo
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── example
│ │ │ └── publishackdemo
│ │ │ │ ├── RabbitMQConfig.java
│ │ │ │ └── RabbitTemplateConfig.java
│ │ │ │ └── MessageController.java
│ │ │ └── AckDemoApplication.java
│ │ └── resources
│ │ └── application.yml
└── pom.xml
配置發布者確認模式
在 src/main/resources/application.yml
文件中,進行如下配置:
spring:application:name: ackDemorabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlated # 開啟ConfirmCallback,correlated表示回調時會攜帶CorrelationDatapublisher-returns: true # 開啟ReturnCallback
聲明 Exchange 和 Queue
在 config/RabbitMQConfig.java
中聲明我們需要的交換機和隊列。
注意此處引入的包為
org.springframework.amqp.core
!
package com.example.publishackdemo.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration("RabbitMQConfigWithProductACK")
public class RabbitMQConfig {// 此處的常量提取到一個單獨的靜態類中更好,此處為了方便演式不單獨提取public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String CONFIRM_ROUTING_KEY = "key.confirm";@Beanpublic TopicExchange confirmExchange() {return ExchangeBuilder.topicExchange(CONFIRM_EXCHANGE_NAME).durable(true).build();}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}@Beanpublic Binding confirmBinding(Queue confirmQueue, TopicExchange confirmExchange) {return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}
}
配置 RabbitTemplate 回調
這是核心步驟。我們創建一個配置類,專門用于定制 RabbitTemplate
,并為其設置回調。
package com.example.publishackdemo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Slf4j
@Configuration("RabbitMQConfigWithPublisherAck")
public class RabbitTemplateConfig { @Bean("rabbitTemplate") public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } @Bean("confirmRabbitTemplate") // 給這個新的Bean起一個唯一的名字 public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 關鍵:只為這個新的實例設置回調 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { String id = (correlationData != null) ? correlationData.getId() : ""; if (ack) { log.info("ConfirmCallback: 消息發送成功!ID: {}", id); } else { log.error("ConfirmCallback: 消息發送失敗!ID: {}, 原因: {}", id, cause); } }); // 同樣可以設置 ReturnsCallback//rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnsCallback(returnedMessage -> { log.warn("ReturnsCallback: 消息被退回! Message: {}, ReplyCode: {}, ReplyText: {}, Exchange: {}, RoutingKey: {}", new String(returnedMessage.getMessage().getBody()), returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey()); // 此處可以記錄無法路由的消息,用于后續分析或處理 }); return rabbitTemplate; }
}
此處配置兩個
RabbitTemplate
的原因?
setConfirmCallback
只能被設置一次,如果直接在Controller
層里面調用的時候聲明,那么每次請求都會調用一次設置的代碼,會導致第二次之后的請求都會報錯,所以需要提取到Config
里面- 在設置
RabbitTemplate
的setReturnsCallback
或者setConfirmCallback
設置之后會全局生效,如果并不需要進行發送確認的生產者也使用了這個Template
那么會導致性能下降等問題,所以創建兩個不同的Bean
就是為了在不同情況選擇不同的Bean
對象
生產者代碼 (Publisher)
修改 MessageController
,增加幾個測試接口來模擬不同場景。
package com.example.ackdemo.controller;import com.example.ackdemo.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;@Slf4j
@RestController
public class MessageController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate rabbitTemplate;// 1. 測試正常發送@GetMapping("/send/ok")public String sendOkMessage() {String id = UUID.randomUUID().toString();String message = "A correct message.";log.info("Sending message with ID: {}", id);rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, RabbitMQConfig.CONFIRM_ROUTING_KEY, message, new CorrelationData(id));return "Message sent (OK). Check logs for callback.";}// 2. 測試發送到不存在的 Exchange@GetMapping("/send/bad-exchange")public String sendToBadExchange() {String id = UUID.randomUUID().toString();String message = "Message to a non-existent exchange.";log.info("Sending message with ID: {} to a bad exchange", id);rabbitTemplate.convertAndSend("non-existent-exchange", RabbitMQConfig.CONFIRM_ROUTING_KEY, message, new CorrelationData(id));return "Message sent (Bad Exchange). Check logs for callback.";}// 3. 測試發送到正確的 Exchange,但錯誤的 Routing Key@GetMapping("/send/bad-routing")public String sendWithBadRoutingKey() {String id = UUID.randomUUID().toString();String message = "Message with a bad routing key.";log.info("Sending message with ID: {} with a bad routing key", id);rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, "wrong.key.123", message, new CorrelationData(id));return "Message sent (Bad Routing). Check logs for callback.";}
}
運行與驗證
- 啟動應用。
- 驗證成功場景:
- 訪問
http://localhost:8080/send/ok
。 - 日志打印:你會看到兩條日志,說明回調已經正確設置。
- 訪問
2025-07-11T19:59:56.682+08:00 INFO 11868 --- [mqDemo] [nio-8080-exec-1] c.d.m.p.PublisherController : Sending message with ID: 679ddb2f-0bfe-4ec6-b77e-4dd44c1612e6
2025-07-11T19:59:56.980+08:00 INFO 11868 --- [mqDemo] [nectionFactory2] c.d.m.p.RabbitTemplateConfig : ConfirmCallback: 消息發送成功!ID: 679ddb2f-0bfe-4ec6-b77e-4dd44c1612e6
- 觀察 RabbitMQ 管理界面:
confirm.queue
中會有一條消息。
- 驗證交換機失敗場景:
- 訪問
http://localhost:8080/send/bad-exchange
。 - 日志打印:只會觸發
ConfirmCallback
的失敗回調。
- 訪問
reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'demo', class-id=60, method-id=40)
2025-07-11T20:02:41.784+08:00 ERROR 40760 --- [mqDemo] [nectionFactory3] c.d.m.p.RabbitTemplateConfig : ConfirmCallback: 消息發送失敗!ID: bf93981e-f0a7-485d-bddf-cd5aec3e299f, 原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'demo', class-id=60, method-id=40)
- 驗證路由失敗場景:
- 訪問
http://localhost:8080/send/bad-routing
。 - 日志打印:你會看到先觸發成功的
ConfirmCallback
(因為消息確實到達了 Exchange) - 但是并沒有觸發
ReturnsCallback
,為什么?- 因為需要在其方法前添加一行
rabbitTemplate.setMandatory(true);
來開啟此功能
- 因為需要在其方法前添加一行
- 訪問
2025-07-11T20:13:00.788+08:00 INFO 10852 --- [mqDemo] [nio-8080-exec-1] c.d.m.p.PublisherController : Sending message with ID: 24753c11-7341-4836-8c74-79a0deae8f3b with a bad routing key
2025-07-11T20:13:01.023+08:00 WARN 10852 --- [mqDemo] [nectionFactory2] c.d.m.p.RabbitTemplateConfig : ReturnsCallback: 消息被退回! Message: Message with a bad routing key., ReplyCode: 312, ReplyText: NO_ROUTE, Exchange: confirm.exchange, RoutingKey: wrong.key.123
2025-07-11T20:13:01.023+08:00 INFO 10852 --- [mqDemo] [nectionFactory3] c.d.m.p.RabbitTemplateConfig : ConfirmCallback: 消息發送成功!ID: 24753c11-7341-4836-8c74-79a0deae8f3b
生產環境注意事項
-
為失敗做好準備:收到
nack
或return
回調后,必須有相應的補償機制。常見的策略包括:- 有限重試:對于網絡抖動等臨時性故障,可以進行幾次延時重試。
- 記錄日志與告警:對于持續失敗或邏輯錯誤(如錯誤的Exchange/RoutingKey),應詳細記錄日志,并觸發告警通知開發人員介入。
- 消息入庫:將發送失敗的消息存入數據庫或本地文件,通過定時任務進行重發,這是最可靠的補償方式。
-
善用
CorrelationData
:在異步高并發場景下,CorrelationData
是你識別哪條消息得到確認的唯一憑證。它的 ID 應該具有業務唯一性(如訂單ID、業務流水號),以便于追蹤和排錯。 -
性能權衡:開啟發布者確認會增加網絡開銷和 Broker 的 CPU 負擔,從而降低消息發送的吞吐量。對于可以容忍少量丟失的非核心業務(如打點日志),可以關閉此功能以追求性能。
-
全局回調 vs. 單次發送回調:我們演示的是全局配置
RabbitTemplate
的回調。RabbitTemplate
也支持為單次send
操作指定一個臨時的CorrelationData
,它內部可以包含更豐富的回調邏輯,適用于需要對特定消息進行特殊處理的場景。 -
構建完整的可靠性鏈路:切記,發布者確認只是可靠性拼圖的一部分。一個完整的可靠性方案必須是:發布者確認 + 持久化(交換機、隊列、消息)+ 消費者確認。三者結合,才能最大限度地保證消息在整個生命周期內的安全。
總結
RabbitMQ 的發布者確認機制,通過 ConfirmCallback
和 ReturnCallback
兩個強大的工具,為我們彌補了消息從生產者到 Broker 這一段路程中的可靠性盲區。