深入理解 RabbitMQ:從底層原理到實戰落地的全維度指南

引言:

  • 本文總字數:約 18500 字
  • 預計閱讀時間:45 分鐘

為什么我們需要 RabbitMQ?

在當今分布式系統架構中,消息隊列已成為不可或缺的核心組件。想象一下,當你在電商平臺下單時,系統需要處理庫存扣減、訂單創建、支付處理、物流通知等一系列操作。如果這些操作都同步執行,任何一個環節的延遲都會導致整個流程卡頓,用戶體驗將大打折扣。

RabbitMQ 作為一款高性能、可靠的消息中間件,正是為解決這類問題而生。它采用先進的消息傳遞機制,實現了系統間的異步通信,不僅提高了系統吞吐量,還增強了系統的容錯能力和可擴展性。

根據 RabbitMQ 官方文檔(RabbitMQ Documentation | RabbitMQ),RabbitMQ 是一個開源的消息代理軟件,實現了高級消息隊列協議(AMQP),支持多種消息傳遞模式,能夠滿足復雜分布式系統的通信需求。

一、RabbitMQ 核心概念與架構

1.1 核心組件

RabbitMQ 的核心組件包括:

  • 生產者(Producer):發送消息的應用程序
  • 消費者(Consumer):接收并處理消息的應用程序
  • 交換機(Exchange):接收生產者發送的消息,并根據路由規則將消息路由到一個或多個隊列
  • 隊列(Queue):存儲消息的緩沖區
  • 綁定(Binding):交換機和隊列之間的關聯關系,包含路由規則
  • 路由鍵(Routing Key):消息的屬性,用于交換機將消息路由到合適的隊列

1.2 工作流程

RabbitMQ 的基本工作流程如下:

  1. 生產者創建消息,并指定消息的路由鍵和交換機
  2. 生產者將消息發送到指定的交換機
  3. 交換機根據預設的綁定規則和消息的路由鍵,將消息路由到一個或多個隊列
  4. 消費者監聽隊列,當有消息到達時,接收并處理消息

1.3 為什么選擇 RabbitMQ?

與其他消息隊列相比,RabbitMQ 具有以下優勢:

  1. 可靠性高:支持消息持久化、確認機制和鏡像隊列,確保消息不丟失
  2. 靈活的路由機制:提供多種交換機類型,支持復雜的路由場景
  3. 多協議支持:除了 AMQP,還支持 STOMP、MQTT 等多種協議
  4. 易于擴展:支持集群部署,可根據需求動態擴展
  5. 豐富的客戶端:幾乎所有主流編程語言都有 RabbitMQ 客戶端
  6. 管理界面友好:提供直觀的 Web 管理界面,方便監控和管理

根據 2023 年 JetBrains 開發者生態系統調查,RabbitMQ 在消息隊列領域的使用率排名第二,僅次于 Kafka,尤其在企業級應用中廣泛采用。

二、RabbitMQ 環境搭建

2.1 安裝 RabbitMQ

Docker 安裝(推薦)
# 拉取RabbitMQ鏡像(帶管理界面)
docker pull rabbitmq:3.13-management# 啟動RabbitMQ容器
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=admin \rabbitmq:3.13-management
手動安裝(以 Ubuntu 為例)
# 更新包列表
sudo apt update# 安裝Erlang(RabbitMQ依賴)
sudo apt install -y erlang# 添加RabbitMQ倉庫
echo "deb https://dl.bintray.com/rabbitmq/debian bionic main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list# 導入簽名密鑰
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -# 安裝RabbitMQ
sudo apt update
sudo apt install -y rabbitmq-server# 啟動RabbitMQ服務
sudo systemctl start rabbitmq-server# 啟用管理插件
sudo rabbitmq-plugins enable rabbitmq_management# 設置開機自啟
sudo systemctl enable rabbitmq-server

安裝完成后,可通過訪問http://localhost:15672?打開管理界面,使用默認賬號 guest/guest 登錄(注意:默認賬號只允許本地訪問)。

2.2 配置 RabbitMQ

創建自定義用戶和虛擬主機:

# 進入容器
docker exec -it rabbitmq bash# 創建用戶
rabbitmqctl add_user jamguo password# 設置用戶角色為管理員
rabbitmqctl set_user_tags jamguo administrator# 創建虛擬主機
rabbitmqctl add_vhost my_vhost# 授權用戶訪問虛擬主機
rabbitmqctl set_permissions -p my_vhost jamguo ".*" ".*" ".*"

三、Java 客戶端開發環境搭建

3.1 Maven 依賴配置

創建一個 Spring Boot 項目,在 pom.xml 中添加以下依賴:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.0</version><relativePath/></parent><groupId>com.jamguo.rabbitmq</groupId><artifactId>rabbitmq-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>rabbitmq-demo</name><description>RabbitMQ Demo Project by JamGuo</description><properties><java.version>17</java.version></properties><dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Spring AMQP (RabbitMQ) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version><scope>provided</scope></dependency><!-- Commons Lang3 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.14.0</version></dependency><!-- Spring Boot Starter Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Spring AMQP Test --><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><!-- Swagger3 --><dependency><groupId>org.springdoc</groupId><artifactId>springdoc-openapi-starter-webmvc-ui</artifactId><version>2.1.0</version></dependency><!-- MyBatis-Plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.5</version></dependency><!-- MySQL Driver --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.3.0</version><scope>runtime</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>
</project>

3.2 配置文件

創建 application.yml 配置文件:

spring:application:name: rabbitmq-demorabbitmq:host: localhostport: 5672username: jamguopassword: passwordvirtual-host: my_vhost# 連接超時時間(毫秒)connection-timeout: 10000# 生產者配置publisher-confirm-type: correlatedpublisher-returns: true# 消費者配置listener:simple:# 手動確認模式acknowledge-mode: manual# 并發消費者數量concurrency: 5# 最大并發消費者數量max-concurrency: 10# 限制消費者在單個請求中預取的消息數量prefetch: 10# 數據庫配置datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/rabbitmq_demo?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: root# MyBatis-Plus配置
mybatis-plus:mapper-locations: classpath*:mapper/**/*.xmltype-aliases-package: com.jamguo.rabbitmq.entityconfiguration:map-underscore-to-camel-case: truelog-impl: org.apache.ibatis.logging.stdout.StdOutImpl# Swagger配置
springdoc:api-docs:path: /api-docsswagger-ui:path: /swagger-ui.htmloperationsSorter: method# 日志配置
logging:level:com.jamguo.rabbitmq: debugorg.springframework.amqp: info

3.3 啟動類

package com.jamguo.rabbitmq;import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;/*** RabbitMQ示例應用啟動類* * @author jamguo*/
@SpringBootApplication
@MapperScan("com.jamguo.rabbitmq.mapper")
@EnableScheduling
@OpenAPIDefinition(info = @Info(title = "RabbitMQ示例API",version = "1.0",description = "RabbitMQ各種用法示例接口文檔")
)
public class RabbitmqDemoApplication {public static void main(String[] args) {SpringApplication.run(RabbitmqDemoApplication.class, args);}}

四、RabbitMQ 交換機類型詳解

RabbitMQ 提供了多種交換機類型,每種類型有不同的路由策略,適用于不同的業務場景。

4.1 直接交換機(Direct Exchange)

直接交換機是最簡單的交換機類型,它根據消息的路由鍵(Routing Key)與綁定的路由鍵進行精確匹配,將消息路由到對應的隊列。

工作原理

代碼實現

1. 常量定義

package com.jamguo.rabbitmq.constant;/*** RabbitMQ常量類* * @author jamguo*/
public class RabbitMqConstant {/*** 直接交換機名稱*/public static final String DIRECT_EXCHANGE = "direct_exchange";/*** 訂單創建隊列*/public static final String ORDER_CREATE_QUEUE = "order_create_queue";/*** 訂單支付隊列*/public static final String ORDER_PAY_QUEUE = "order_pay_queue";/*** 訂單創建路由鍵*/public static final String ORDER_CREATE_ROUTING_KEY = "order.create";/*** 訂單支付路由鍵*/public static final String ORDER_PAY_ROUTING_KEY = "order.pay";}

2. 配置類

package com.jamguo.rabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.jamguo.rabbitmq.constant.RabbitMqConstant;import lombok.extern.slf4j.Slf4j;/*** 直接交換機配置類* * @author jamguo*/
@Configuration
@Slf4j
public class DirectExchangeConfig {/*** 創建直接交換機* * @return 直接交換機實例*/@Beanpublic DirectExchange directExchange() {// 參數說明:交換機名稱、是否持久化、是否自動刪除、附加參數DirectExchange exchange = new DirectExchange(RabbitMqConstant.DIRECT_EXCHANGE, true, false, null);log.info("創建直接交換機: {}", RabbitMqConstant.DIRECT_EXCHANGE);return exchange;}/*** 創建訂單創建隊列* * @return 隊列實例*/@Beanpublic Queue orderCreateQueue() {// 參數說明:隊列名稱、是否持久化、是否排他、是否自動刪除、附加參數Queue queue = new Queue(RabbitMqConstant.ORDER_CREATE_QUEUE, true, false, false, null);log.info("創建訂單創建隊列: {}", RabbitMqConstant.ORDER_CREATE_QUEUE);return queue;}/*** 創建訂單支付隊列* * @return 隊列實例*/@Beanpublic Queue orderPayQueue() {Queue queue = new Queue(RabbitMqConstant.ORDER_PAY_QUEUE, true, false, false, null);log.info("創建訂單支付隊列: {}", RabbitMqConstant.ORDER_PAY_QUEUE);return queue;}/*** 綁定訂單創建隊列到直接交換機* * @param orderCreateQueue 訂單創建隊列* @param directExchange 直接交換機* @return 綁定關系*/@Beanpublic Binding bindOrderCreateQueue(Queue orderCreateQueue, DirectExchange directExchange) {Binding binding = BindingBuilder.bind(orderCreateQueue).to(directExchange).with(RabbitMqConstant.ORDER_CREATE_ROUTING_KEY);log.info("綁定隊列 {} 到交換機 {},路由鍵: {}", RabbitMqConstant.ORDER_CREATE_QUEUE,RabbitMqConstant.DIRECT_EXCHANGE,RabbitMqConstant.ORDER_CREATE_ROUTING_KEY);return binding;}/*** 綁定訂單支付隊列到直接交換機* * @param orderPayQueue 訂單支付隊列* @param directExchange 直接交換機* @return 綁定關系*/@Beanpublic Binding bindOrderPayQueue(Queue orderPayQueue, DirectExchange directExchange) {Binding binding = BindingBuilder.bind(orderPayQueue).to(directExchange).with(RabbitMqConstant.ORDER_PAY_ROUTING_KEY);log.info("綁定隊列 {} 到交換機 {},路由鍵: {}", RabbitMqConstant.ORDER_PAY_QUEUE,RabbitMqConstant.DIRECT_EXCHANGE,RabbitMqConstant.ORDER_PAY_ROUTING_KEY);return binding;}}

3. 消息實體類

package com.jamguo.rabbitmq.entity;import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;/*** 訂單消息實體類* * @author jamguo*/
@Data
@Schema(description = "訂單消息實體")
public class OrderMessage implements Serializable {private static final long serialVersionUID = 1L;@Schema(description = "訂單ID")private Long orderId;@Schema(description = "用戶ID")private Long userId;@Schema(description = "訂單金額")private BigDecimal amount;@Schema(description = "訂單狀態")private Integer status;@Schema(description = "創建時間")@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@JsonSerialize(using = LocalDateTimeSerializer.class)@JsonDeserialize(using = LocalDateTimeDeserializer.class)private LocalDateTime createTime;}

4. 生產者

package com.jamguo.rabbitmq.producer;import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.UUID;import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import com.jamguo.rabbitmq.constant.RabbitMqConstant;
import com.jamguo.rabbitmq.entity.OrderMessage;import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;/*** 直接交換機生產者* * @author jamguo*/
@Component
@Slf4j
@Tag(name = "直接交換機生產者", description = "用于發送消息到直接交換機")
public class DirectExchangeProducer {private final RabbitTemplate rabbitTemplate;@Autowiredpublic DirectExchangeProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;// 設置確認回調this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {String messageId = correlationData != null ? correlationData.getId() : "unknown";if (ack) {log.info("消息 [{}] 成功發送到交換機", messageId);} else {log.error("消息 [{}] 發送到交換機失敗,原因: {}", messageId, cause);}});// 設置返回回調this.rabbitTemplate.setReturnsCallback(returnedMessage -> {log.error("消息路由失敗: 交換機={}, 路由鍵={}, 消息={}, 回復碼={}, 回復文本={}",returnedMessage.getExchange(),returnedMessage.getRoutingKey(),new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyCode(),returnedMessage.getReplyText());});}/*** 發送訂單創建消息* * @param orderId 訂單ID* @param userId 用戶ID* @param amount 訂單金額*/@Operation(summary = "發送訂單創建消息", description = "創建訂單后發送消息到訂單創建隊列")public void sendOrderCreateMessage(Long orderId, Long userId, BigDecimal amount) {Objects.requireNonNull(orderId, "訂單ID不能為空");Objects.requireNonNull(userId, "用戶ID不能為空");Objects.requireNonNull(amount, "訂單金額不能為空");// 創建訂單消息OrderMessage orderMessage = new OrderMessage();orderMessage.setOrderId(orderId);orderMessage.setUserId(userId);orderMessage.setAmount(amount);orderMessage.setStatus(1); // 1表示訂單創建orderMessage.setCreateTime(LocalDateTime.now());// 生成消息IDString messageId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(messageId);log.info("準備發送訂單創建消息: {}", orderMessage);// 發送消息rabbitTemplate.convertAndSend(RabbitMqConstant.DIRECT_EXCHANGE,RabbitMqConstant.ORDER_CREATE_ROUTING_KEY,orderMessage,correlationData);}/*** 發送訂單支付消息* * @param orderId 訂單ID* @param userId 用戶ID* @param amount 支付金額*/@Operation(summary = "發送訂單支付消息", description = "訂單支付后發送消息到訂單支付隊列")public void sendOrderPayMessage(Long orderId, Long userId, BigDecimal amount) {Objects.requireNonNull(orderId, "訂單ID不能為空");Objects.requireNonNull(userId, "用戶ID不能為空");Objects.requireNonNull(amount, "支付金額不能為空");// 創建訂單消息OrderMessage orderMessage = new OrderMessage();orderMessage.setOrderId(orderId);orderMessage.setUserId(userId);orderMessage.setAmount(amount);orderMessage.setStatus(2); // 2表示訂單已支付orderMessage.setCreateTime(LocalDateTime.now());// 生成消息IDString messageId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(messageId);log.info("準備發送訂單支付消息: {}", orderMessage);// 發送消息rabbitTemplate.convertAndSend(RabbitMqConstant.DIRECT_EXCHANGE,RabbitMqConstant.ORDER_PAY_ROUTING_KEY,orderMessage,correlationData);}}

5. 消費者

package com.jamguo.rabbitmq.consumer;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import com.fasterxml.jackson.databind.ObjectMapper;
import com.jamguo.rabbitmq.constant.RabbitMqConstant;
import com.jamguo.rabbitmq.entity.OrderMessage;
import com.rabbitmq.client.Channel;import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;import java.io.IOException;
import java.util.Objects;/*** 直接交換機消費者* * @author jamguo*/
@Component
@Slf4j
@Tag(name = "直接交換機消費者", description = "用于消費直接交換機路由的消息")
public class DirectExchangeConsumer {private final ObjectMapper objectMapper;public DirectExchangeConsumer(ObjectMapper objectMapper) {this.objectMapper = objectMapper;}/*** 處理訂單創建消息* * @param message 消息對象* @param channel 通道對象* @throws IOException IO異常*/@RabbitListener(queues = RabbitMqConstant.ORDER_CREATE_QUEUE)public void handleOrderCreateMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 解析消息OrderMessage orderMessage = objectMapper.readValue(message.getBody(), OrderMessage.class);log.info("收到訂單創建消息: {}", orderMessage);// 處理訂單創建業務邏輯(示例)processOrderCreation(orderMessage);// 手動確認消息channel.basicAck(deliveryTag, false);log.info("訂單創建消息處理完成,消息ID: {}", message.getMessageProperties().getMessageId());} catch (Exception e) {log.error("處理訂單創建消息失敗", e);// 判斷消息是否已經被處理過if (message.getMessageProperties().getRedelivered()) {log.error("消息已經重試過,拒絕再次處理并丟棄: {}", new String(message.getBody()));// 拒絕消息,并設置為不重新入隊channel.basicReject(deliveryTag, false);} else {log.error("消息將重新入隊,等待再次處理: {}", new String(message.getBody()));// 拒絕消息,并設置為重新入隊channel.basicNack(deliveryTag, false, true);}}}/*** 處理訂單支付消息* * @param message 消息對象* @param channel 通道對象* @throws IOException IO異常*/@RabbitListener(queues = RabbitMqConstant.ORDER_PAY_QUEUE)public void handleOrderPayMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 解析消息OrderMessage orderMessage = objectMapper.readValue(message.getBody(), OrderMessage.class);log.info("收到訂單支付消息: {}", orderMessage);// 處理訂單支付業務邏輯(示例)processOrderPayment(orderMessage);// 手動確認消息channel.basicAck(deliveryTag, false);log.info("訂單支付消息處理完成,消息ID: {}", message.getMessageProperties().getMessageId());} catch (Exception e) {log.error("處理訂單支付消息失敗", e);// 判斷消息是否已經被處理過if (message.getMessageProperties().getRedelivered()) {log.error("消息已經重試過,拒絕再次處理并丟棄: {}", new String(message.getBody()));// 拒絕消息,并設置為不重新入隊channel.basicReject(deliveryTag, false);} else {log.error("消息將重新入隊,等待再次處理: {}", new String(message.getBody()));// 拒絕消息,并設置為重新入隊channel.basicNack(deliveryTag, false, true);}}}/*** 處理訂單創建業務邏輯* * @param orderMessage 訂單消息*/private void processOrderCreation(OrderMessage orderMessage) {// 這里是訂單創建的業務邏輯處理// 例如:更新訂單狀態、通知庫存系統等log.info("處理訂單創建: 訂單ID={}, 用戶ID={}, 金額={}",orderMessage.getOrderId(),orderMessage.getUserId(),orderMessage.getAmount());}/*** 處理訂單支付業務邏輯* * @param orderMessage 訂單消息*/private void processOrderPayment(OrderMessage orderMessage) {// 這里是訂單支付的業務邏輯處理// 例如:更新支付狀態、通知物流系統等log.info("處理訂單支付: 訂單ID={}, 用戶ID={}, 支付金額={}",orderMessage.getOrderId(),orderMessage.getUserId(),orderMessage.getAmount());}}

6. 測試控制器

package com.jamguo.rabbitmq.controller;import java.math.BigDecimal;
import java.util.Random;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.jamguo.rabbitmq.producer.DirectExchangeProducer;import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;/*** 直接交換機測試控制器* * @author jamguo*/
@RestController
@RequestMapping("/api/direct")
@Slf4j
@Tag(name = "直接交換機測試接口", description = "用于測試直接交換機的消息發送")
public class DirectExchangeController {private final DirectExchangeProducer directExchangeProducer;private final Random random = new Random();@Autowiredpublic DirectExchangeController(DirectExchangeProducer directExchangeProducer) {this.directExchangeProducer = directExchangeProducer;}/*** 發送訂單創建消息* * @param userId 用戶ID* @param amount 訂單金額* @return 響應信息*/@PostMapping("/sendOrderCreate")@Operation(summary = "發送訂單創建消息", description = "創建訂單并發送消息到訂單創建隊列")public String sendOrderCreateMessage(@Parameter(description = "用戶ID", required = true) @RequestParam Long userId,@Parameter(description = "訂單金額", required = true) @RequestParam BigDecimal amount) {// 生成隨機訂單IDLong orderId = System.currentTimeMillis() + random.nextLong(1000);directExchangeProducer.sendOrderCreateMessage(orderId, userId, amount);return "訂單創建消息已發送,訂單ID: " + orderId;}/*** 發送訂單支付消息* * @param orderId 訂單ID* @param userId 用戶ID* @param amount 支付金額* @return 響應信息*/@PostMapping("/sendOrderPay")@Operation(summary = "發送訂單支付消息", description = "訂單支付后發送消息到訂單支付隊列")public String sendOrderPayMessage(@Parameter(description = "訂單ID", required = true) @RequestParam Long orderId,@Parameter(description = "用戶ID", required = true) @RequestParam Long userId,@Parameter(description = "支付金額", required = true) @RequestParam BigDecimal amount) {directExchangeProducer.sendOrderPayMessage(orderId, userId, amount);return "訂單支付消息已發送,訂單ID: " + orderId;}}
適用場景

直接交換機適用于需要精確路由的場景,例如:

  • 不同類型的事件處理(如訂單創建、支付、取消等)
  • 任務分發系統,根據任務類型路由到不同的處理隊列

4.2 主題交換機(Topic Exchange)

主題交換機是一種更靈活的交換機類型,它支持使用通配符進行路由鍵匹配,適合實現消息的多播路由。

工作原理

主題交換機使用點分隔的路由鍵(如 "order.create"、"user.login"),并支持兩種通配符:

  • *:匹配一個單詞
  • #:匹配零個或多個單詞

例如:

  • "order.*" 可以匹配 "order.create"、"order.pay",但不能匹配 "order.create.success"
  • "order.#" 可以匹配 "order.create"、"order.pay"、"order.create.success"

代碼實現

1. 常量定義(在 RabbitMqConstant 中添加)

/*** 主題交換機名稱*/
public static final String TOPIC_EXCHANGE = "topic_exchange";/*** 訂單服務隊列*/
public static final String ORDER_SERVICE_QUEUE = "order_service_queue";/*** 登錄日志隊列*/
public static final String LOGIN_LOG_QUEUE = "login_log_queue";/*** 全量日志隊列*/
public static final String FULL_LOG_QUEUE = "full_log_queue";/*** 訂單相關路由鍵前綴*/
public static final String ORDER_ROUTING_KEY_PREFIX = "order.";/*** 登錄相關路由鍵*/
public static final String USER_LOGIN_ROUTING_KEY = "user.login";

2. 配置類

package com.jamguo.rabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.jamguo.rabbitmq.constant.RabbitMqConstant;import lombok.extern.slf4j.Slf4j;/*** 主題交換機配置類* * @author jamguo*/
@Configuration
@Slf4j
public class TopicExchangeConfig {/*** 創建主題交換機* * @return 主題交換機實例*/@Beanpublic TopicExchange topicExchange() {TopicExchange exchange = new TopicExchange(RabbitMqConstant.TOPIC_EXCHANGE,true,false,null);log.info("創建主題交換機: {}", RabbitMqConstant.TOPIC_EXCHANGE);return exchange;}/*** 創建訂單服務隊列* * @return 隊列實例*/@Beanpublic Queue orderServiceQueue() {Queue queue = new Queue(RabbitMqConstant.ORDER_SERVICE_QUEUE,true,false,false,null);log.info("創建訂單服務隊列: {}", RabbitMqConstant.ORDER_SERVICE_QUEUE);return queue;}/*** 創建登錄日志隊列* * @return 隊列實例*/@Beanpublic Queue loginLogQueue() {Queue queue = new Queue(RabbitMqConstant.LOGIN_LOG_QUEUE,true,false,false,null);log.info("創建登錄日志隊列: {}", RabbitMqConstant.LOGIN_LOG_QUEUE);return queue;}/*** 創建全量日志隊列* * @return 隊列實例*/@Beanpublic Queue fullLogQueue() {Queue queue = new Queue(RabbitMqConstant.FULL_LOG_QUEUE,true,false,false,null);log.info("創建全量日志隊列: {}", RabbitMqConstant.FULL_LOG_QUEUE);return queue;}/*** 綁定訂單服務隊列到主題交換機* * @param orderServiceQueue 訂單服務隊列* @param topicExchange 主題交換機* @return 綁定關系*/@Beanpublic Binding bindOrderServiceQueue(Queue orderServiceQueue, TopicExchange topicExchange) {// 綁定鍵為 "order.*",匹配所有以 "order." 開頭的路由鍵Binding binding = BindingBuilder.bind(orderServiceQueue).to(topicExchange).with("order.*");log.info("綁定隊列 {} 到交換機 {},綁定鍵: order.*",RabbitMqConstant.ORDER_SERVICE_QUEUE,RabbitMqConstant.TOPIC_EXCHANGE);return binding;}/*** 綁定登錄日志隊列到主題交換機* * @param loginLogQueue 登錄日志隊列* @param topicExchange 主題交換機* @return 綁定關系*/@Beanpublic Binding bindLoginLogQueue(Queue loginLogQueue, TopicExchange topicExchange) {// 綁定鍵為 "*.login",匹配所有以 ".login" 結尾的路由鍵Binding binding = BindingBuilder.bind(loginLogQueue).to(topicExchange).with("*.login");log.info("綁定隊列 {} 到交換機 {},綁定鍵: *.login",RabbitMqConstant.LOGIN_LOG_QUEUE,RabbitMqConstant.TOPIC_EXCHANGE);return binding;}/*** 綁定全量日志隊列到主題交換機* * @param fullLogQueue 全量日志隊列* @param topicExchange 主題交換機* @return 綁定關系*/@Beanpublic Binding bindFullLogQueue(Queue fullLogQueue, TopicExchange topicExchange) {// 綁定鍵為 "#",匹配所有路由鍵Binding binding = BindingBuilder.bind(fullLogQueue).to(topicExchange).with("#");log.info("綁定隊列 {} 到交換機 {},綁定鍵: #",RabbitMqConstant.FULL_LOG_QUEUE,RabbitMqConstant.TOPIC_EXCHANGE);return binding;}}

3. 消息實體類

package com.jamguo.rabbitmq.entity;import java.io.Serializable;
import java.time.LocalDateTime;import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;/*** 系統日志消息實體類* * @author jamguo*/
@Data
@Schema(description = "系統日志消息實體")
public class SystemLogMessage implements Serializable {private static final long serialVersionUID = 1L;@Schema(description = "日志ID")private String logId;@Schema(description = "日志類型")private String logType;@Schema(description = "用戶ID")private Long userId;@Schema(description = "用戶名")private String username;@Schema(description = "操作內容")private String operation;@Schema(description = "操作時間")@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@JsonSerialize(using = LocalDateTimeSerializer.class)@JsonDeserialize(using = LocalDateTimeDeserializer.class)private LocalDateTime operationTime;@Schema(description = "IP地址")private String ipAddress;}

4. 生產者

package com.jamguo.rabbitmq.producer;import java.time.LocalDateTime;
import java.util.UUID;import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import com.jamguo.rabbitmq.constant.RabbitMqConstant;
import com.jamguo.rabbitmq.entity.SystemLogMessage;import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;import java.util.Objects;/*** 主題交換機生產者* * @author jamguo*/
@Component
@Slf4j
@Tag(name = "主題交換機生產者", description = "用于發送消息到主題交換機")
public class TopicExchangeProducer {private final RabbitTemplate rabbitTemplate;@Autowiredpublic TopicExchangeProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;// 設置確認回調this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {String messageId = correlationData != null ? correlationData.getId() : "unknown";if (ack) {log.info("主題消息 [{}] 成功發送到交換機", messageId);} else {log.error("主題消息 [{}] 發送到交換機失敗,原因: {}", messageId, cause);}});// 設置返回回調this.rabbitTemplate.setReturnsCallback(returnedMessage -> {log.error("主題消息路由失敗: 交換機={}, 路由鍵={}, 消息={}, 回復碼={}, 回復文本={}",returnedMessage.getExchange(),returnedMessage.getRoutingKey(),new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyCode(),returnedMessage.getReplyText());});}/*** 發送訂單相關消息* * @param logType 日志類型* @param userId 用戶ID* @param username 用戶名* @param operation 操作內容* @param ipAddress IP地址*/@Operation(summary = "發送訂單相關消息", description = "發送訂單相關的日志消息")public void sendOrderLogMessage(String logType, Long userId, String username, String operation, String ipAddress) {StringUtils.hasText(logType, "日志類型不能為空");Objects.requireNonNull(userId, "用戶ID不能為空");StringUtils.hasText(username, "用戶名不能為空");StringUtils.hasText(operation, "操作內容不能為空");// 創建系統日志消息SystemLogMessage logMessage = new SystemLogMessage();logMessage.setLogId(UUID.randomUUID().toString());logMessage.setLogType(logType);logMessage.setUserId(userId);logMessage.setUsername(username);logMessage.setOperation(operation);logMessage.setOperationTime(LocalDateTime.now());logMessage.setIpAddress(StringUtils.defaultIfBlank(ipAddress, "unknown"));// 生成路由鍵:order.{logType}String routingKey = RabbitMqConstant.ORDER_ROUTING_KEY_PREFIX + logType;// 生成消息IDString messageId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(messageId);log.info("準備發送訂單日志消息,路由鍵: {}, 消息: {}", routingKey, logMessage);// 發送消息rabbitTemplate.convertAndSend(RabbitMqConstant.TOPIC_EXCHANGE,routingKey,logMessage,correlationData);}/*** 發送用戶登錄消息* * @param userId 用戶ID* @param username 用戶名* @param ipAddress IP地址*/@Operation(summary = "發送用戶登錄消息", description = "發送用戶登錄的日志消息")public void sendUserLoginMessage(Long userId, String username, String ipAddress) {Objects.requireNonNull(userId, "用戶ID不能為空");StringUtils.hasText(username, "用戶名不能為空");// 創建系統日志消息SystemLogMessage logMessage = new SystemLogMessage();logMessage.setLogId(UUID.randomUUID().toString());logMessage.setLogType("login");logMessage.setUserId(userId);logMessage.setUsername(username);logMessage.setOperation("用戶登錄系統");logMessage.setOperationTime(LocalDateTime.now());logMessage.setIpAddress(StringUtils.defaultIfBlank(ipAddress, "unknown"));// 生成消息IDString messageId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(messageId);log.info("準備發送用戶登錄消息,路由鍵: {}, 消息: {}", RabbitMqConstant.USER_LOGIN_ROUTING_KEY, logMessage);// 發送消息rabbitTemplate.convertAndSend(RabbitMqConstant.TOPIC_EXCHANGE,RabbitMqConstant.USER_LOGIN_ROUTING_KEY,logMessage,correlationData);}}

5. 消費者

package com.jamguo.rabbitmq.consumer;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import com.fasterxml.jackson.databind.ObjectMapper;
import com.jamguo.rabbitmq.constant.RabbitMqConstant;
import com.jamguo.rabbitmq.entity.SystemLogMessage;
import com.rabbitmq.client.Channel;import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.util.Objects;/*** 主題交換機消費者* * @author jamguo*/
@Component
@Slf4j
@Tag(name = "主題交換機消費者", description = "用于消費主題交換機路由的消息")
public class TopicExchangeConsumer {private final ObjectMapper objectMapper;public TopicExchangeConsumer(ObjectMapper objectMapper) {this.objectMapper = objectMapper;}/*** 處理訂單服務消息* * @param message 消息對象* @param channel 通道對象* @throws IOException IO異常*/@RabbitListener(queues = RabbitMqConstant.ORDER_SERVICE_QUEUE)public void handleOrderServiceMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 解析消息SystemLogMessage logMessage = objectMapper.readValue(message.getBody(), SystemLogMessage.class);log.info("訂單服務收到消息: {}", logMessage);// 處理訂單相關業務邏輯processOrderMessage(logMessage);// 手動確認消息channel.basicAck(deliveryTag, false);log.info("訂單服務消息處理完成,消息ID: {}", logMessage.getLogId());} catch (Exception e) {log.error("處理訂單服務消息失敗", e);// 消息處理失敗,根據業務需求決定是否重新入隊channel.basicNack(deliveryTag, false, true);}}/*** 處理登錄日志消息* * @param message 消息對象* @param channel 通道對象* @throws IOException IO異常*/@RabbitListener(queues = RabbitMqConstant.LOGIN_LOG_QUEUE)public void handleLoginLogMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 解析消息SystemLogMessage logMessage = objectMapper.readValue(message.getBody(), SystemLogMessage.class);log.info("登錄日志服務收到消息: {}", logMessage);// 處理登錄日志業務邏輯processLoginLog(logMessage);// 手動確認消息channel.basicAck(deliveryTag, false);log.info("登錄日志消息處理完成,消息ID: {}", logMessage.getLogId());} catch (Exception e) {log.error("處理登錄日志消息失敗", e);// 消息處理失敗,根據業務需求決定是否重新入隊channel.basicNack(deliveryTag, false, true);}}/*** 處理全量日志消息* * @param message 消息對象* @param channel 通道對象* @throws IOException IO異常*/@RabbitListener(queues = RabbitMqConstant.FULL_LOG_QUEUE)public void handleFullLogMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 解析消息SystemLogMessage logMessage = objectMapper.readValue(message.getBody(), SystemLogMessage.class);log.info("全量日志服務收到消息: {}", logMessage);// 處理全量日志業務邏輯processFullLog(logMessage);// 手動確認消息channel.basicAck(deliveryTag, false);log.info("全量日志消息處理完成,消息ID: {}", logMessage.getLogId());} catch (Exception e) {log.error("處理全量日志消息失敗", e);// 消息處理失敗,根據業務需求決定是否重新入隊channel.basicNack(deliveryTag, false, true);}}/*** 處理訂單相關消息* * @param logMessage 日志消息*/private void processOrderMessage(SystemLogMessage logMessage) {// 處理訂單相關業務邏輯log.info("處理訂單消息: 用戶[{}]在[{}]進行了[{}]操作",logMessage.getUsername(),logMessage.getOperationTime(),logMessage.getOperation());}/*** 處理登錄日志* * @param logMessage 日志消息*/private void processLoginLog(SystemLogMessage logMessage) {// 處理登錄日志業務邏輯log.info("記錄登錄日志: 用戶[{}]在[{}]從IP[{}]登錄系統",logMessage.getUsername(),logMessage.getOperationTime(),logMessage.getIpAddress());}/*** 處理全量日志* * @param logMessage 日志消息*/private void processFullLog(SystemLogMessage logMessage) {// 處理全量日志業務邏輯log.info("記錄系統日志: 類型[{}], 用戶[{}], 操作[{}]",logMessage.getLogType(),logMessage.getUsername(),logMessage.getOperation());}}

6. 測試控制器

package com.jamguo.rabbitmq.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.jamguo.rabbitmq.producer.TopicExchangeProducer;import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;/*** 主題交換機測試控制器* * @author jamguo*/
@RestController
@RequestMapping("/api/topic")
@Slf4j
@Tag(name = "主題交換機測試接口", description = "用于測試主題交換機的消息發送")
public class TopicExchangeController {private final TopicExchangeProducer topicExchangeProducer;public TopicExchangeController(TopicExchangeProducer topicExchangeProducer) {this.topicExchangeProducer = topicExchangeProducer;}/*** 發送訂單創建日志消息* * @param userId 用戶ID* @param username 用戶名* @param ipAddress IP地址* @return 響應信息*/@PostMapping("/sendOrderCreateLog")@Operation(summary = "發送訂單創建日志消息", description = "發送訂單創建的日志消息")public String sendOrderCreateLog(@Parameter(description = "用戶ID", required = true) @RequestParam Long userId,@Parameter(description = "用戶名", required = true) @RequestParam String username,@Parameter(description = "IP地址") @RequestParam(required = false) String ipAddress) {topicExchangeProducer.sendOrderLogMessage("create", userId, username, "創建新訂單", ipAddress);return "訂單創建日志消息已發送";}/*** 發送訂單支付日志消息* * @param userId 用戶ID* @param username 用戶名* @param ipAddress IP地址* @return 響應信息*/@PostMapping("/sendOrderPayLog")@Operation(summary = "發送訂單支付日志消息", description = "發送訂單支付的日志消息")public String sendOrderPayLog(@Parameter(description = "用戶ID", required = true) @RequestParam Long userId,@Parameter(description = "用戶名", required = true) @RequestParam String username,@Parameter(description = "IP地址") @RequestParam(required = false) String ipAddress) {topicExchangeProducer.sendOrderLogMessage("pay", userId, username, "支付訂單", ipAddress);return "訂單支付日志消息已發送";}/*** 發送用戶登錄消息* * @param userId 用戶ID* @param username 用戶名* @param ipAddress IP地址* @return 響應信息*/@PostMapping("/sendUserLogin")@Operation(summary = "發送用戶登錄消息", description = "發送用戶登錄的日志消息")public String sendUserLogin(@Parameter(description = "用戶ID", required = true) @RequestParam Long userId,@Parameter(description = "用戶名", required = true) @RequestParam String username,@Parameter(description = "IP地址") @RequestParam(required = false) String ipAddress) {topicExchangeProducer.sendUserLoginMessage(userId, username, ipAddress);return "用戶登錄消息已發送";}}
適用場景

主題交換機適用于需要靈活路由的場景,例如:

  • 日志收集系統,根據日志類型和級別進行分類處理
  • 新聞推送系統,根據用戶興趣標簽推送相關新聞
  • 事件總線,實現不同模塊間的解耦通信

4.3 扇形交換機(Fanout Exchange)

扇形交換機是一種廣播類型的交換機,它會將消息路由到所有綁定到它的隊列,忽略路由鍵的存在。

工作原理

扇形交換機不處理路由鍵,無論消息攜帶什么路由鍵,都會被發送到所有綁定到該交換機的隊列。這是一種最簡單的廣播模式。

代碼實現

這里只展示關鍵代碼,其他輔助代碼(如常量定義、消息實體等)與前面類似,不再重復。

1. 配置類

package com.jamguo.rabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.jamguo.rabbitmq.constant.RabbitMqConstant;import lombok.extern.slf4j.Slf4j;/*** 扇形交換機配置類* * @author jamguo*/
@Configuration
@Slf4j
public class FanoutExchangeConfig {/*** 創建扇形交換機* * @return 扇形交換機實例*/@Beanpublic FanoutExchange fanoutExchange() {FanoutExchange exchange = new FanoutExchange(RabbitMqConstant.FANOUT_EXCHANGE,true,false,null);log.info("創建扇形交換機: {}", RabbitMqConstant.FANOUT_EXCHANGE);return exchange;}/*** 創建庫存服務隊列* * @return 隊列實例*/@Beanpublic Queue inventoryServiceQueue() {Queue queue = new Queue(RabbitMqConstant.INVENTORY_SERVICE_QUEUE,true,false,false,null);log.info("創建庫存服務隊列: {}", RabbitMqConstant.INVENTORY_SERVICE_QUEUE);return queue;}/*** 創建物流服務隊列* * @return 隊列實例*/@Beanpublic Queue logisticsServiceQueue() {Queue queue = new Queue(RabbitMqConstant.LOGISTICS_SERVICE_QUEUE,true,false,false,null);log.info("創建物流服務隊列: {}", RabbitMqConstant.LOGISTICS_SERVICE_QUEUE);return queue;}/*** 創建通知服務隊列* * @return 隊列實例*/@Beanpublic Queue notificationServiceQueue() {Queue queue = new Queue(RabbitMqConstant.NOTIFICATION_SERVICE_QUEUE,true,false,false,null);log.info("創建通知服務隊列: {}", RabbitMqConstant.NOTIFICATION_SERVICE_QUEUE);return queue;}/*** 綁定庫存服務隊列到扇形交換機* * @param inventoryServiceQueue 庫存服務隊列* @param fanoutExchange 扇形交換機* @return 綁定關系*/@Beanpublic Binding bindInventoryServiceQueue(Queue inventoryServiceQueue, FanoutExchange fanoutExchange) {// 扇形交換機忽略路由鍵,所以這里可以隨便填Binding binding = BindingBuilder.bind(inventoryServiceQueue).to(fanoutExchange);log.info("綁定隊列 {} 到交換機 {}",RabbitMqConstant.INVENTORY_SERVICE_QUEUE,RabbitMqConstant.FANOUT_EXCHANGE);return binding;}/*** 綁定物流服務隊列到扇形交換機* * @param logisticsServiceQueue 物流服務隊列* @param fanoutExchange 扇形交換機* @return 綁定關系*/@Beanpublic Binding bindLogisticsServiceQueue(Queue logisticsServiceQueue, FanoutExchange fanoutExchange) {Binding binding = BindingBuilder.bind(logisticsServiceQueue).to(fanoutExchange);log.info("綁定隊列 {} 到交換機 {}",RabbitMqConstant.LOGISTICS_SERVICE_QUEUE,RabbitMqConstant.FANOUT_EXCHANGE);return binding;}/*** 綁定通知服務隊列到扇形交換機* * @param notificationServiceQueue 通知服務隊列* @param fanoutExchange 扇形交換機* @return 綁定關系*/@Beanpublic Binding bindNotificationServiceQueue(Queue notificationServiceQueue, FanoutExchange fanoutExchange) {Binding binding = BindingBuilder.bind(notificationServiceQueue).to(fanoutExchange);log.info("綁定隊列 {} 到交換機 {}",RabbitMqConstant.NOTIFICATION_SERVICE_QUEUE,RabbitMqConstant.FANOUT_EXCHANGE);return binding;}}
適用場景

扇形交換機適用于需要廣播消息的場景,例如:

  • 系統通知,需要多個服務同時收到通知
  • 數據同步,多個節點需要保持數據一致
  • 分布式系統中的事件廣播

4.4 Headers 交換機(Headers Exchange)

Headers 交換機與其他類型的交換機不同,它不依賴路由鍵進行路由,而是根據消息的 headers 屬性進行匹配。

工作原理

Headers 交換機使用消息的 headers 屬性進行路由匹配,類似于 HTTP 請求的 headers。綁定隊列時可以指定一組鍵值對,消息被路由到隊列的條件是:

  • 消息的 headers 包含綁定中指定的所有鍵
  • 消息 headers 中對應的值與綁定中指定的值匹配(或者使用 x-match=any,表示只要有一個匹配即可)

Headers 交換機在實際應用中使用較少,因為主題交換機通常可以更簡潔地實現類似功能。

4.5 默認交換機(Default Exchange)

默認交換機是 RabbitMQ 自帶的一個直接交換機,它有以下特點:

  • 沒有名字(空字符串)
  • 每個隊列都會自動綁定到默認交換機,綁定鍵是隊列的名稱
  • 當發送消息時不指定交換機,消息會被發送到默認交換機

默認交換機簡化了簡單場景的使用,例如只需要將消息發送到指定隊列的情況。

五、RabbitMQ 高級特性

5.1 消息確認機制

為了確保消息的可靠傳遞,RabbitMQ 提供了完善的消息確認機制,包括生產者確認和消費者確認。

生產者確認(Publisher Confirm)

生產者確認機制確保消息成功發送到交換機。在配置文件中開啟:

spring:rabbitmq:publisher-confirm-type: correlated
  • none:禁用確認機制
  • simple:簡單確認模式,同步等待確認
  • correlated:關聯確認模式,異步回調確認

代碼實現如前面生產者示例中所示,通過設置ConfirmCallback來處理確認結果。

生產者返回(Publisher Return)

當消息成功發送到交換機,但無法路由到任何隊列時,可以通過返回機制獲取通知:

spring:rabbitmq:publisher-returns: true

代碼實現如前面生產者示例中所示,通過設置ReturnsCallback來處理返回消息。

消費者確認(Consumer Acknowledgment)

消費者確認機制確保消息被成功處理。在配置文件中設置:

spring:rabbitmq:listener:simple:acknowledge-mode: manual
  • none:自動確認,消息一旦被接收,立即確認
  • auto:根據消息處理情況自動確認
  • manual:手動確認,需要調用channel.basicAck()方法

手動確認的三種方式:

  1. channel.basicAck(deliveryTag, multiple):確認消息成功處理
  2. channel.basicNack(deliveryTag, multiple, requeue):否定確認,可以選擇是否重新入隊
  3. channel.basicReject(deliveryTag, requeue):否定單個消息,可以選擇是否重新入隊

5.2 消息持久化

為了防止 RabbitMQ 服務器重啟導致消息丟失,需要配置消息持久化:

  1. 交換機持久化:創建交換機時設置durable=true
  2. 隊列持久化:創建隊列時設置durable=true
  3. 消息持久化:發送消息時設置消息的deliveryMode=2

在 Spring AMQP 中,默認情況下交換機和隊列都是持久化的,消息持久化可以通過以下方式設置:

// 發送持久化消息
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return msg;
});

5.3 死信隊列(Dead Letter Queue)

死信隊列用于處理無法被正常消費的消息,當消息滿足以下條件之一時,會被發送到死信隊列:

  1. 消息被消費者拒絕(basicRejectbasicNack),并且requeue=false
  2. 消息過期
  3. 隊列達到最大長度,無法再添加新消息
死信隊列配置
/*** 死信交換機*/
@Bean
public DirectExchange deadLetterExchange() {return new DirectExchange(RabbitMqConstant.DEAD_LETTER_EXCHANGE, true, false);
}/*** 死信隊列*/
@Bean
public Queue deadLetterQueue() {return QueueBuilder.durable(RabbitMqConstant.DEAD_LETTER_QUEUE).build();
}/*** 綁定死信隊列到死信交換機*/
@Bean
public Binding bindDeadLetterQueue(Queue deadLetterQueue, DirectExchange deadLetterExchange) {return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(RabbitMqConstant.DEAD_LETTER_ROUTING_KEY);
}/*** 業務隊列(設置死信相關參數)*/
@Bean
public Queue businessQueue() {return QueueBuilder.durable(RabbitMqConstant.BUSINESS_QUEUE)// 設置死信交換機.withArgument("x-dead-letter-exchange", RabbitMqConstant.DEAD_LETTER_EXCHANGE)// 設置死信路由鍵.withArgument("x-dead-letter-routing-key", RabbitMqConstant.DEAD_LETTER_ROUTING_KEY)// 設置消息過期時間(毫秒).withArgument("x-message-ttl", 60000)// 設置隊列最大長度.withArgument("x-max-length", 1000).build();
}

5.4 延遲隊列

延遲隊列用于處理需要延遲執行的任務,例如訂單超時未支付自動取消、定時提醒等。RabbitMQ 本身不直接支持延遲隊列,但可以通過以下兩種方式實現:

  1. 使用消息的過期時間(TTL)結合死信隊列
  2. 使用 rabbitmq-delayed-message-exchange 插件
使用插件實現延遲隊列

首先安裝插件:

# 下載插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez# 復制到容器中
docker cp rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq:/plugins# 進入容器
docker exec -it rabbitmq bash# 啟用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

配置延遲交換機:

/*** 創建延遲交換機*/
@Bean
public CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>(1);// 設置交換機類型args.put("x-delayed-type", "direct");// 參數說明:交換機名稱、類型、是否持久化、是否自動刪除、附加參數return new CustomExchange(RabbitMqConstant.DELAYED_EXCHANGE,"x-delayed-message",true,false,args);
}

發送延遲消息:

/*** 發送延遲消息*/
public void sendDelayedMessage(OrderMessage message, long delayMillis) {Objects.requireNonNull(message, "消息不能為空");if (delayMillis <= 0) {throw new IllegalArgumentException("延遲時間必須大于0");}String messageId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(messageId);log.info("準備發送延遲消息,延遲時間: {}ms,消息: {}", delayMillis, message);// 發送延遲消息,通過x-delay header設置延遲時間(毫秒)rabbitTemplate.convertAndSend(RabbitMqConstant.DELAYED_EXCHANGE,RabbitMqConstant.DELAYED_ROUTING_KEY,message,msg -> {msg.getMessageProperties().setHeader("x-delay", delayMillis);return msg;},correlationData);
}

5.5 優先級隊列

優先級隊列允許消息按照優先級進行排序,優先級高的消息會被優先消費。適用于需要處理緊急任務的場景。

配置優先級隊列:

/*** 創建優先級隊列*/
@Bean
public Queue priorityQueue() {// 設置隊列最大優先級為10Map<String, Object> args = new HashMap<>(1);args.put("x-max-priority", 10);return QueueBuilder.durable(RabbitMqConstant.PRIORITY_QUEUE).withArguments(args).build();
}

發送帶優先級的消息:

/*** 發送帶優先級的消息*/
public void sendPriorityMessage(String content, int priority) {StringUtils.hasText(content, "消息內容不能為空");if (priority < 0 || priority > 10) {throw new IllegalArgumentException("優先級必須在0-10之間");}String messageId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(messageId);log.info("準備發送優先級消息,優先級: {},內容: {}", priority, content);// 發送帶優先級的消息rabbitTemplate.convertAndSend(RabbitMqConstant.PRIORITY_EXCHANGE,RabbitMqConstant.PRIORITY_ROUTING_KEY,content,msg -> {msg.getMessageProperties().setPriority(priority);return msg;},correlationData);
}

六、RabbitMQ 集群與高可用

對于生產環境,單節點的 RabbitMQ 存在單點故障風險,因此需要部署 RabbitMQ 集群以提高可用性。

6.1 集群架構

RabbitMQ 集群通常由多個節點組成,分為以下角色:

  • 磁盤節點(Disk Node):存儲集群元數據(交換機、隊列、綁定等)
  • 內存節點(RAM Node):元數據存儲在內存中,性能更好

集群中至少需要一個磁盤節點,建議生產環境配置 3 個節點(2 個磁盤節點 + 1 個內存節點)。

6.2 集群搭建(Docker Compose)

創建 docker-compose.yml 文件:

version: '3.8'services:rabbitmq1:image: rabbitmq:3.13-managementcontainer_name: rabbitmq1ports:- "5672:5672"- "15672:15672"environment:- RABBITMQ_DEFAULT_USER=admin- RABBITMQ_DEFAULT_PASS=admin- RABBITMQ_ERLANG_COOKIE=rabbitmq_cookievolumes:- rabbitmq1_data:/var/lib/rabbitmqnetworks:- rabbitmq_networkrabbitmq2:image: rabbitmq:3.13-managementcontainer_name: rabbitmq2ports:- "5673:5672"- "15673:15672"environment:- RABBITMQ_DEFAULT_USER=admin- RABBITMQ_DEFAULT_PASS=admin- RABBITMQ_ERLANG_COOKIE=rabbitmq_cookievolumes:- rabbitmq2_data:/var/lib/rabbitmqnetworks:- rabbitmq_networkdepends_on:- rabbitmq1rabbitmq3:image: rabbitmq:3.13-managementcontainer_name: rabbitmq3ports:- "5674:5672"- "15674:15672"environment:- RABBITMQ_DEFAULT_USER=admin- RABBITMQ_DEFAULT_PASS=admin- RABBITMQ_ERLANG_COOKIE=rabbitmq_cookievolumes:- rabbitmq3_data:/var/lib/rabbitmqnetworks:- rabbitmq_networkdepends_on:- rabbitmq1networks:rabbitmq_network:driver: bridgevolumes:rabbitmq1_data:rabbitmq2_data:rabbitmq3_data:

啟動集群:

docker-compose up -d

將節點加入集群:

# 進入rabbitmq2容器
docker exec -it rabbitmq2 bash# 停止節點
rabbitmqctl stop_app# 加入集群(以內存節點方式)
rabbitmqctl join_cluster --ram rabbit@rabbitmq1# 啟動節點
rabbitmqctl start_app# 退出容器
exit# 進入rabbitmq3容器
docker exec -it rabbitmq3 bash# 停止節點
rabbitmqctl stop_app# 加入集群(以內存節點方式)
rabbitmqctl join_cluster --ram rabbit@rabbitmq1# 啟動節點
rabbitmqctl start_app# 退出容器
exit

查看集群狀態:

docker exec -it rabbitmq1 rabbitmqctl cluster_status

6.3 鏡像隊列(Mirror Queue)

鏡像隊列是 RabbitMQ 提供的高可用方案,它會將隊列鏡像到集群中的多個節點,當主節點故障時,鏡像節點會自動接管。

設置鏡像策略:

# 為所有隊列設置鏡像策略,同步到所有節點
docker exec -it rabbitmq1 rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'# 為特定隊列設置鏡像策略,同步到2個節點
docker exec -it rabbitmq1 rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  • ha-mode: 鏡像模式
    • all: 鏡像到所有節點
    • exactly: 鏡像到指定數量的節點
    • nodes: 鏡像到指定的節點列表
  • ha-params: 配合 ha-mode 使用的參數
  • ha-sync-mode: 同步模式
    • manual: 手動同步
    • automatic: 自動同步

6.4 客戶端連接集群

Spring Boot 客戶端配置集群:

spring:rabbitmq:addresses: localhost:5672,localhost:5673,localhost:5674username: adminpassword: adminvirtual-host: /connection-timeout: 10000# 開啟自動恢復連接publisher-confirm-type: correlatedpublisher-returns: truelistener:simple:acknowledge-mode: manualconcurrency: 5max-concurrency: 10prefetch: 10

七、RabbitMQ 性能優化

為了充分發揮 RabbitMQ 的性能,需要進行合理的配置和優化。

7.1 生產者優化

  1. 使用批量發送:減少網絡交互次數
// 批量發送消息
public void batchSendMessages(List<OrderMessage> messages) {if (CollectionUtils.isEmpty(messages)) {log.warn("批量發送消息列表為空,不執行發送操作");return;}log.info("準備批量發送 {} 條消息", messages.size());rabbitTemplate.invoke(operations -> {for (OrderMessage message : messages) {operations.convertAndSend(RabbitMqConstant.DIRECT_EXCHANGE,RabbitMqConstant.ORDER_CREATE_ROUTING_KEY,message,new CorrelationData(UUID.randomUUID().toString()));}return null;});log.info("批量發送消息完成,共發送 {} 條", messages.size());
}
  1. 使用異步發送:避免阻塞主線程
  2. 合理設置消息大小:太大的消息會影響性能,建議拆分或使用外部存儲

7.2 消費者優化

  1. 合理設置并發數:根據服務器性能調整消費者并發數
spring:rabbitmq:listener:simple:concurrency: 5    # 最小并發數max-concurrency: 10 # 最大并發數
  1. 設置 prefetch count:控制消費者一次預取的消息數量
spring:rabbitmq:listener:simple:prefetch: 10  # 每次預取10條消息
  1. 消息處理異步化:消費者接收到消息后,盡快確認,然后異步處理業務邏輯

7.3 服務器優化

  1. 調整內存限制:默認情況下,當 RabbitMQ 使用的內存超過節點內存的 40% 時,會停止接收新消息
# 設置內存限制為50%
rabbitmqctl set_vm_memory_high_watermark 0.5# 設置內存限制為固定值(如2GB)
rabbitmqctl set_vm_memory_high_watermark absolute 2GB
  1. 設置磁盤空間限制:當磁盤空間低于指定閾值時,RabbitMQ 會停止接收新消息
# 設置磁盤空間限制為500MB
rabbitmqctl set_disk_free_limit 500MB
  1. 合理選擇存儲類型:根據業務需求選擇合適的存儲類型(如持久化或非持久化)

八、RabbitMQ 在實際項目中的應用

8.1 異步處理

場景:用戶下單后,需要發送短信通知、更新庫存、記錄日志等操作。

使用 RabbitMQ 將這些操作異步化,提高主流程的響應速度:

/*** 訂單服務*/
@Service
@Slf4j
public class OrderService {private final OrderMapper orderMapper;private final RabbitTemplate rabbitTemplate;@Autowiredpublic OrderService(OrderMapper orderMapper, RabbitTemplate rabbitTemplate) {this.orderMapper = orderMapper;this.rabbitTemplate = rabbitTemplate;}/*** 創建訂單*/@Transactionalpublic Order createOrder(OrderCreateDTO orderCreateDTO) {// 參數校驗Objects.requireNonNull(orderCreateDTO, "訂單創建參數不能為空");StringUtils.hasText(orderCreateDTO.getUserId(), "用戶ID不能為空");Objects.requireNonNull(orderCreateDTO.getAmount(), "訂單金額不能為空");// 創建訂單Order order = new Order();order.setOrderId(System.currentTimeMillis());order.setUserId(orderCreateDTO.getUserId());order.setAmount(orderCreateDTO.getAmount());order.setStatus(OrderStatus.CREATED.getCode());order.setCreateTime(LocalDateTime.now());// 保存訂單int rows = orderMapper.insert(order);if (rows != 1) {throw new BusinessException("創建訂單失敗");}log.info("訂單創建成功,訂單ID: {}", order.getOrderId());// 發送訂單創建消息(異步處理后續操作)sendOrderCreatedMessage(order);return order;}/*** 發送訂單創建消息*/private void sendOrderCreatedMessage(Order order) {OrderMessage message = new OrderMessage();message.setOrderId(order.getOrderId());message.setUserId(order.getUserId());message.setAmount(order.getAmount());message.setStatus(order.getStatus());message.setCreateTime(order.getCreateTime());rabbitTemplate.convertAndSend(RabbitMqConstant.ORDER_EXCHANGE,RabbitMqConstant.ORDER_CREATED_ROUTING_KEY,message,new CorrelationData(UUID.randomUUID().toString()));log.info("訂單創建消息已發送,訂單ID: {}", order.getOrderId());}
}

8.2 應用解耦

場景:電商系統中,訂單系統、庫存系統、支付系統、物流系統需要相互協作,但又要保持松耦合。

使用 RabbitMQ 實現系統間的解耦,每個系統只需要關注自己感興趣的事件:

  • 訂單系統:發送訂單創建、訂單取消事件
  • 庫存系統:監聽訂單創建事件,扣減庫存;監聽訂單取消事件,恢復庫存
  • 支付系統:監聽訂單創建事件,生成支付單
  • 物流系統:監聽支付成功事件,創建物流單

8.3 流量削峰

場景:秒殺活動中,瞬間會有大量請求涌入,可能導致系統過載。

使用 RabbitMQ 作為緩沖,控制請求處理速度:

  1. 前端將秒殺請求發送到 RabbitMQ
  2. 后端消費者以固定速率處理消息,避免系統過載
  3. 對于超出處理能力的請求,直接返回 "秒殺失敗" 或 "排隊中"
/*** 秒殺服務*/
@Service
@Slf4j
public class SeckillService {private final RabbitTemplate rabbitTemplate;private final StringRedisTemplate redisTemplate;// 秒殺商品庫存Redis鍵前綴private static final String SECKILL_STOCK_KEY_PREFIX = "seckill:stock:";@Autowiredpublic SeckillService(RabbitTemplate rabbitTemplate, StringRedisTemplate redisTemplate) {this.rabbitTemplate = rabbitTemplate;this.redisTemplate = redisTemplate;}/*** 提交秒殺請求*/public SeckillResultDTO submitSeckill(SeckillRequestDTO requestDTO) {// 參數校驗Objects.requireNonNull(requestDTO, "秒殺請求參數不能為空");StringUtils.hasText(requestDTO.getUserId(), "用戶ID不能為空");Objects.requireNonNull(requestDTO.getGoodsId(), "商品ID不能為空");log.info("用戶 {} 提交秒殺請求,商品ID: {}", requestDTO.getUserId(), requestDTO.getGoodsId());// 先檢查庫存(Redis中預減庫存)String stockKey = SECKILL_STOCK_KEY_PREFIX + requestDTO.getGoodsId();Long remainStock = redisTemplate.opsForValue().decrement(stockKey);if (remainStock == null || remainStock < 0) {// 庫存不足,恢復計數器if (remainStock != null && remainStock < 0) {redisTemplate.opsForValue().increment(stockKey);}log.warn("商品 {} 庫存不足,用戶 {} 秒殺失敗", requestDTO.getGoodsId(), requestDTO.getUserId());return new SeckillResultDTO(false, "商品已搶完", null);}// 庫存充足,發送秒殺消息到隊列SeckillMessage message = new SeckillMessage();message.setUserId(requestDTO.getUserId());message.setGoodsId(requestDTO.getGoodsId());message.setRequestTime(LocalDateTime.now());String messageId = UUID.randomUUID().toString();rabbitTemplate.convertAndSend(RabbitMqConstant.SECKILL_EXCHANGE,RabbitMqConstant.SECKILL_ROUTING_KEY,message,new CorrelationData(messageId));log.info("用戶 {} 秒殺請求已加入隊列,商品ID: {}", requestDTO.getUserId(), requestDTO.getGoodsId());// 返回排隊中狀態,前端可以輪詢查詢結果return new SeckillResultDTO(true, "正在排隊中", messageId);}
}

九、常見問題與解決方案

9.1 消息丟失問題

可能的原因及解決方案:

  1. 生產者發送消息丟失

    • 啟用生產者確認機制(publisher confirm)
    • 實現消息重發機制
  2. RabbitMQ 服務器丟失消息

    • 配置交換機、隊列、消息的持久化
    • 部署 RabbitMQ 集群,使用鏡像隊列
  3. 消費者處理消息丟失

    • 使用手動確認模式(manual ack)
    • 確保消息處理完成后再確認

9.2 消息重復消費問題

可能的原因及解決方案:

  1. 網絡問題導致確認消息丟失

    • 實現消息冪等性處理
    • 使用消息 ID 去重
  2. 消費者處理超時

    • 合理設置消息處理超時時間
    • 拆分長任務為多個短任務

實現冪等性的示例:

/*** 冪等性處理器*/
@Component
public class IdempotentHandler {private final StringRedisTemplate redisTemplate;// 消息處理記錄前綴private static final String MESSAGE_PROCESSED_PREFIX = "message:processed:";// 鎖前綴private static final String LOCK_PREFIX = "lock:message:";// 鎖過期時間(秒)private static final long LOCK_EXPIRE_SECONDS = 30;@Autowiredpublic IdempotentHandler(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}/*** 處理消息,確保冪等性* * @param messageId 消息ID* @param handler 消息處理器* @return 處理結果*/public <T> T handleWithIdempotency(String messageId, Supplier<T> handler) {StringUtils.hasText(messageId, "消息ID不能為空");Objects.requireNonNull(handler, "消息處理器不能為空");// 檢查消息是否已處理String processedKey = MESSAGE_PROCESSED_PREFIX + messageId;Boolean isProcessed = redisTemplate.hasKey(processedKey);if (Boolean.TRUE.equals(isProcessed)) {log.info("消息 {} 已處理,跳過重復處理", messageId);return null;}// 獲取分布式鎖String lockKey = LOCK_PREFIX + messageId;Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", LOCK_EXPIRE_SECONDS, TimeUnit.SECONDS);if (Boolean.FALSE.equals(locked)) {log.warn("消息 {} 獲取鎖失敗,可能有其他進程正在處理", messageId);return null;}try {// 再次檢查,防止并發問題isProcessed = redisTemplate.hasKey(processedKey);if (Boolean.TRUE.equals(isProcessed)) {log.info("消息 {} 已處理,跳過重復處理", messageId);return null;}// 處理消息T result = handler.get();// 標記消息已處理,設置過期時間(例如24小時)redisTemplate.opsForValue().set(processedKey, "1", 24, TimeUnit.HOURS);return result;} finally {// 釋放鎖redisTemplate.delete(lockKey);}}
}

9.3 消息堆積問題

可能的原因及解決方案:

  1. 生產者發送速度過快

    • 實現流量控制
    • 使用令牌桶算法限制發送速度
  2. 消費者處理速度過慢

    • 增加消費者數量
    • 優化消費邏輯,提高處理速度
    • 拆分大消息為小消息
  3. 系統資源不足

    • 增加服務器資源
    • 優化 JVM 參數

監控消息堆積情況:

/*** 消息隊列監控服務*/
@Component
@Slf4j
public class QueueMonitorService {private final RabbitAdmin rabbitAdmin;// 隊列最大消息數閾值private static final int QUEUE_MAX_MESSAGE_THRESHOLD = 10000;@Autowiredpublic QueueMonitorService(RabbitAdmin rabbitAdmin) {this.rabbitAdmin = rabbitAdmin;}/*** 檢查隊列消息堆積情況*/@Scheduled(fixedRate = 60000) // 每分鐘檢查一次public void checkQueueMessageCount() {log.info("開始檢查隊列消息堆積情況");// 獲取所有隊列List<Queue> queues = rabbitAdmin.getQueues();if (CollectionUtils.isEmpty(queues)) {log.info("沒有找到任何隊列");return;}for (Queue queue : queues) {// 獲取隊列信息QueueInformation queueInfo = rabbitAdmin.getQueueInfo(queue.getName());if (queueInfo == null) {log.warn("獲取隊列 {} 信息失敗", queue.getName());continue;}// 隊列消息數long messageCount = queueInfo.getMessageCount();log.info("隊列 {} 當前消息數: {}", queue.getName(), messageCount);// 如果消息數超過閾值,發送告警if (messageCount > QUEUE_MAX_MESSAGE_THRESHOLD) {log.error("隊列 {} 消息堆積嚴重,當前消息數: {},閾值: {}",queue.getName(), messageCount, QUEUE_MAX_MESSAGE_THRESHOLD);// 發送告警(可以是郵件、短信、企業微信等)sendQueueAlert(queue.getName(), messageCount);}}log.info("隊列消息堆積情況檢查完成");}/*** 發送隊列告警*/private void sendQueueAlert(String queueName, long messageCount) {// 實現告警邏輯// 例如:調用告警服務發送通知}
}

十、總結與展望

RabbitMQ 作為一款成熟穩定的消息中間件,憑借其靈活的路由機制、可靠的消息傳遞和豐富的高級特性,在分布式系統中得到了廣泛應用。本文從核心概念、環境搭建、交換機類型、高級特性、集群部署、性能優化等多個方面詳細介紹了 RabbitMQ,并通過實例代碼展示了其在實際項目中的應用。

隨著微服務架構的普及,消息隊列作為服務間通信的重要手段,其作用將更加凸顯。RabbitMQ 社區也在不斷發展,未來可能會推出更多新特性,如更好的流處理支持、更優的性能等。

對于開發者而言,掌握 RabbitMQ 不僅能夠解決實際項目中的問題,還能加深對分布式系統設計思想的理解。在使用 RabbitMQ 時,應根據具體業務場景選擇合適的交換機類型和高級特性,同時關注消息的可靠性、一致性和系統的性能。

    參考資料

    1. RabbitMQ 官方文檔:RabbitMQ Documentation | RabbitMQ
    2. Spring AMQP 官方文檔:https://docs.spring.io/spring-amqp/docs/current/reference/html/
    3. 《RabbitMQ 實戰指南》,朱忠華著
    4. 《Spring Cloud 微服務實戰》,翟永超
    5. RabbitMQ GitHub 倉庫:https://github.com/rabbitmq/rabbitmq-server

    本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
    如若轉載,請注明出處:http://www.pswp.cn/pingmian/94912.shtml
    繁體地址,請注明出處:http://hk.pswp.cn/pingmian/94912.shtml
    英文地址,請注明出處:http://en.pswp.cn/pingmian/94912.shtml

    如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

    相關文章

    寬帶有丟包,重傳高的情況怎么優化

    寬帶丟包和重傳率高是一個非常影響網絡體驗的常見問題。它會導致游戲卡頓、視頻通話模糊、網頁加載慢等。別擔心&#xff0c;我們可以按照從易到難的順序&#xff0c;系統地排查和優化。請遵循以下步驟&#xff1a;第一步&#xff1a;基礎排查&#xff08;自己動手&#xff0c;…

    Kotlin 協程之Channel 的高階應用

    前言 了解了 Channel 的基礎概念和基本使用 后&#xff0c;我們再來看一看 Channel 的特性以及高階應用。 Channel 是"熱流" 熱流概念 Channel 是熱流&#xff08;Hot Stream&#xff09;&#xff0c;具備以下特性&#xff1a; 數據的生產和消費是兩套獨立的流程 …

    PostgreSQL表空間(Tablespace)作用(管理數據庫對象的存儲位置)(pg_default、pg_global)

    文章目錄**1. 靈活的數據存儲管理**- **邏輯與物理分離**&#xff1a;表空間為數據庫對象&#xff08;如表、索引&#xff09;提供了一個邏輯名稱與物理存儲路徑的映射。用戶無需直接操作底層文件路徑&#xff0c;只需通過表空間名稱管理數據。- **多數據庫共享表空間**&#x…

    Ansible 核心運維場景落地:YUM 倉庫、SSH 公鑰、固定 IP 配置技巧

    1&#xff1a;如何一次性驗證所有主機能否被 Ansible 訪問&#xff1f; 答&#xff1a;使用臨時命令&#xff1a;ansible all -m ansible.builtin.ping或驗證 sudo 是否正常&#xff1a;ansible all -m ansible.builtin.ping --become -K2&#xff1a;如何用 Ansible 統一配置…

    rman導致的報錯ORA-27037: unable to obtain file status

    有套3節點的11204集群環境&#xff0c;在db2上配置了rman備份&#xff0c;今天例行檢查時發現db1和db3上不定期有報錯&#xff0c;報錯如下&#xff1a;Control file backup creation failed:failure to open backup target file /u01/app/oracle/product/11.2.0/db_1/dbs/snap…

    Kubernetes 與 GitOps 的深度融合實踐指南

    前言&#xff1a;在云原生技術飛速發展的今天&#xff0c;Kubernetes&#xff08;簡稱 K8s&#xff09;已成為容器編排領域的事實標準&#xff0c;而 GitOps 作為一種基于 Git 的云原生運維理念&#xff0c;正與 K8s 深度融合&#xff0c;為企業實現自動化、可追溯、可審計的應…

    REST-assured 接口測試編寫指南

    REST-assured 簡介 REST-assured 是一個基于 Java 的 DSL&#xff08;領域特定語言&#xff09;庫&#xff0c;專門用于簡化 RESTful API 測試的編寫。它提供了流暢的 API 接口&#xff0c;使得測試代碼更加易讀易寫&#xff0c;支持 JSON 和 XML 等多種響應格式的驗證。 基本環…

    內網應用如何實現外網訪問?外地通過公網地址訪問內網服務器的設置方法

    一、內網應用程序在外網需要連接訪問遇到的問題我們經常需要在內網中部署服務&#xff0c;比如一個 Web 服務器或者數據庫&#xff0c;但由于本地沒有公網IP&#xff0c;這些服務無法直接從外地公網訪問。如自己家里的監控系統&#xff0c;在家時能查看&#xff0c;但出門在外就…

    ubuntu24.04 QT中配置opencv4.12

    假如生成的opencv路徑是&#xff1a;/usr/local/opencv4.12QT core guigreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c17# You can make your code fail to compile if it uses deprecated APIs. # In order to do so, uncomment the following line. #DEFINE…

    客戶端是否都需要主動發送`FindService`報文來尋找服務

    <摘要> 在AUTOSAR SOME/IP-SD的服務發現流程中&#xff0c;客戶端是否需要主動發送FindService報文來尋找服務&#xff0c;是理解服務訂閱邏輯的一個關鍵。這將直接影響到事件組訂閱的觸發時機和網絡行為。下文將結合規范&#xff0c;對這一問題進行深入剖析。 <解析&…

    Go語言流式輸出實戰:構建高性能實時應用

    什么是流式輸出&#xff1f; 流式輸出&#xff08;Streaming Output&#xff09;是一種服務器推送技術&#xff0c;允許數據在生成過程中逐步發送到客戶端&#xff0c;而不是等待所有數據準備就緒后一次性發送。這種技術顯著改善了用戶體驗&#xff0c;特別是在處理大量數據或長…

    操作系統上的Docker安裝指南:解鎖容器化新世界

    摘要&#xff1a;本文詳細介紹了Docker在不同操作系統上的安裝方法。主要內容包括&#xff1a;Windows系統通過Docker Desktop安裝&#xff0c;需啟用Hyper-V和WSL2&#xff1b;Mac系統同樣使用Docker Desktop&#xff0c;根據芯片類型選擇版本&#xff1b;Linux系統以Ubuntu為…

    【微信小程序】分別解決H5的跨域代理問題 和小程序正常不需要代理問題

    ——總問&#xff1a;何為跨域和代理&#xff1f; &#x1f539;什么叫跨域&#xff1f; 前端在瀏覽器里發請求時&#xff0c;如果 域名 / 協議 / 端口 三個中有一個不一樣&#xff0c;就會觸發 跨域問題。 例子&#xff1a; 頁面跑在 http://localhost:5173你要請求接口 http:…

    數字簽名 digital signature

    文章目錄1、嚴謹的定義2、技術原理&#xff1a;如何工作&#xff1f;第一步&#xff1a;發送者 - 簽名過程第二步&#xff1a;接收者 - 簽名驗證過程3、C語言實現示例4、關鍵技術要點5、安全注意事項6、最重要的應用&#xff1a;TLS/SSL 與網站安全1、嚴謹的定義 數字簽名是一…

    對于STM32工程模板

    工程模板文件下載鏈接 https://download.csdn.net/download/qq_58631644/91809234 重命名 打開這個文件夾 重命名保持一致 雙擊打開

    使用 SmartIDE 開發長安鏈 Go 語言智能合約

    文章目錄官方文檔Chrome 插件登錄 SmartIDE合約調試合約編譯官方文檔 使用SmartIDE編寫Go智能合約 Chrome 插件 https://git.chainmaker.org.cn/chainmaker/chainmaker-smartplugin/-/releases 登錄 SmartIDE https://ide.chainmaker.org.cn/ 合約調試 合約編譯

    MEM課程之物流與供應鏈管理課程經典案例及分析-個人原創內容放在此保存

    供應鏈管理課程案例 特殊時期期間,美國出現養豬戶對數百萬頭豬實施安樂死和奶農傾倒牛奶現象。從供應鏈的角度分析該現象并提出應對思路。要求有分析框架和文獻支撐。 供應鏈管理課程案例分析 從供應鏈角度分析特殊時期美國豬安樂死和傾倒牛奶現象 本文描述了特殊時期期間,美…

    Transformer:從入門到精通

    學習一個深度學習模型&#xff0c;我們首先需要從理論的角度理解它的構架&#xff0c;進而理解代碼。 Transformer背景 首先我們知道&#xff0c;神經網絡有一個巨大的家族&#xff0c;其中的CNN&#xff08;卷積神經網絡&#xff09;源于視覺研究&#xff0c;目標是讓機器自…

    FOC開環控制代碼解讀

    這段代碼實現了一個開環速度控制系統&#xff0c;用于控制電機轉速。它通過PWM控制器輸出電壓信號&#xff0c;來驅動電機轉動。具體來說&#xff0c;它在指定目標速度下&#xff0c;持續通過電壓信號進行控制。下面是對該代碼詳細流程的逐步解析&#xff1a; 1. 宏定義與變量初…

    Ansible Playbook 調試與預演指南:從語法檢查到連通性排查

    1&#xff1a;調試 playbook 時&#xff0c;最該先看哪一段輸出&#xff1f; 答&#xff1a;先查看ansible-navigator run的 PLAY RECAP 段落&#xff0c;它能一次性給出每臺受管主機的 ok、changed、unreachable、failed、skipped、rescued、ignored 等計數&#xff0c;快速定…