RabbitMQ 是一款基于 AMQP(Advanced Message Queuing Protocol)協議的開源消息中間件,主要用于實現分布式系統中的消息傳遞,支持異步通信、系統解耦、流量削峰等場景。在 Java 生態中,RabbitMQ 被廣泛應用,其 Java 客戶端提供了簡潔的 API,方便開發者快速集成。
AMQP 協議
核心概念
1.?消息模型
- 生產者(Producer):發送消息的應用
- 消費者(Consumer):接收消息的應用
- 消息中間件(Broker):負責接收、存儲和轉發消息
2.?核心組件
AMQP(Advanced Message Queuing Protocol)是一種開放標準的應用層協議,專為消息隊列設計。它定義了客戶端與消息中間件之間的通信規范,確保不同廠商的實現可以互操作。
+----------+ +---------+ +----------+
| Producer | -> | Exchange| -> | Queue | -> Consumer
+----------+ +---------+ +----------+|v+---------+| Binding |+---------+
- Exchange(交換器)
接收生產者的消息
根據規則(Binding)將消息路由到隊列
類型包括:Direct、Topic、Fanout、Headers
- Queue(隊列)
存儲消息直到被消費
支持多個消費者競爭消費
消息可持久化存儲
- Binding(綁定)
定義 Exchange 與 Queue 之間的關聯
通過 Binding Key(綁定鍵)和 Routing Key(路由鍵)匹配
工作流程
1.生產者發送消息
????????指定消息的 Routing Key
????????將消息發送到特定的 Exchange
2.Exchange 路由邏輯
????????Direct Exchange:按 Routing Key 精確匹配
????????Topic Exchange:按 Routing Key 的模式匹配(支持*和#通配符)
????????Fanout Exchange:將消息廣播到所有綁定的隊列
????????Headers Exchange:按消息頭部屬性匹配
3.消費者接收消息
????????從隊列中拉取或訂閱消息
????????處理完成后發送確認(ACK)
RabbitMQ 核心概念
組件 | 作用 |
生產者(Producer) | 消息的發送方,負責創建并發送消息到 RabbitMQ 服務器。 |
消費者(Consumer) | 消息的接收方,監聽隊列并處理接收到的消息。 |
隊列(Queue) | 消息的存儲容器,位于 RabbitMQ 服務器中,消息最終會被投遞到隊列中等待消費。 |
交換機(Exchange) | 接收生產者發送的消息,并根據綁定規則(Binding)將消息路由到對應的隊列。 |
綁定(Binding) | 定義交換機與隊列之間的關聯關系,包含路由鍵(Routing Key)和匹配規則。 |
路由鍵(Routing Key) | 生產者發送消息時指定的鍵,交換機根據該鍵和綁定規則路由消息。 |
RabbitMQ 消息流轉流程
- 生產者發送消息時,需指定交換機名稱和路由鍵;
- 交換機根據自身類型(如 Direct、Topic 等)和綁定規則,將消息轉發到匹配的隊列;
- 消費者監聽隊列,獲取并處理消息。
Java 操作 RabbitMQ 基礎示例
1. 連接 RabbitMQ 服務器
所有操作的前提是建立與 RabbitMQ 的連接,需指定服務器地址、端口、賬號密碼(默認賬號guest僅允許本地連接,遠程連接需配置新用戶)。
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMQConnection {// RabbitMQ連接配置private static final String HOST = "localhost"; // 服務器地址private static final int PORT = 5672; // 默認端口private static final String USERNAME = "guest";private static final String PASSWORD = "guest";// 獲取連接public static Connection getConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST);factory.setPort(PORT);factory.setUsername(USERNAME);factory.setPassword(PASSWORD);return factory.newConnection();}
}
2. 生產者發送消息
- 創建連接和通道(Channel);
- 聲明交換機(可選,若使用默認交換機則無需聲明);
- 聲明隊列(指定隊列名稱、是否持久化等);
- 綁定交換機與隊列(若使用自定義交換機);
- 發送消息(指定交換機、路由鍵、消息內容)。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer {// 隊列名稱(需與消費者一致)private static final String QUEUE_NAME = "java_rabbitmq_queue";public static void main(String[] args) throws Exception {// 1. 獲取連接Connection connection = RabbitMQConnection.getConnection();// 2. 創建通道(RabbitMQ的操作大多通過通道完成)Channel channel = connection.createChannel();// 3. 聲明隊列(參數:隊列名、是否持久化、是否排他、是否自動刪除、附加參數)channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 4. 消息內容String message = "Hello, RabbitMQ from Java!";// 5. 發送消息(參數:交換機名、路由鍵、消息屬性、消息字節數組)// 此處使用默認交換機(""),路由鍵需與隊列名一致channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("生產者發送消息:" + message);// 6. 關閉資源channel.close();connection.close();}
}
3. 消費者接收消息
- 創建連接和通道;
- 聲明隊列(需與生產者隊列名一致);
- 定義消息處理邏輯(通過DefaultConsumer回調);
- 開啟消費(指定隊列、是否自動確認消息)。
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer {private static final String QUEUE_NAME = "java_rabbitmq_queue";public static void main(String[] args) throws Exception {// 1. 獲取連接Connection connection = RabbitMQConnection.getConnection();// 2. 創建通道Channel channel = connection.createChannel();// 3. 聲明隊列(需與生產者一致,重復聲明不會報錯)channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println("消費者已啟動,等待接收消息...");// 4. 定義消息處理回調DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("消費者接收消息:" + message);};// 5. 開啟消費(參數:隊列名、是否自動確認、消息接收回調、取消消費回調)// 自動確認(autoAck=true):消息被接收后自動從隊列刪除;false則需手動確認channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
Spring AMQP簡化 RabbitMQ
1.引入依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置application.yml:
spring:rabbitmq:host: localhostport: 5673username: guestpassword: guest
3. 生產者(使用RabbitTemplate):
@Autowired
private RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend("queue_name", message);
}
4.消費者(使用@RabbitListener注解):
@RabbitListener(queues = "queue_name")
public void receiveMessage(String message) {System.out.println("接收消息:" + message);
}
交換機類型及 Java 實現
1. Direct 交換機(精確匹配)
- 路由規則:消息的路由鍵與綁定的路由鍵完全一致時,消息被路由到對應隊列。
- 適用場景:一對一通信(如訂單通知)。
// 生產者聲明Direct交換機并綁定隊列
String EXCHANGE_NAME = "direct_exchange";
String ROUTING_KEY = "order.notify";
// 聲明Direct交換機(參數:交換機名、類型、是否持久化)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false);
// 綁定交換機與隊列(參數:隊列名、交換機名、路由鍵)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 發送消息(指定交換機和路由鍵)
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
2. Topic 交換機(模糊匹配)
- 路由規則:路由鍵支持通配符(*匹配一個單詞,#匹配多個單詞,單詞以.分隔)。
- 適用場景:多規則匹配(如日志分類:log.error、log.warn)。
// 生產者聲明Topic交換機 String EXCHANGE_NAME = "topic_exchange"; // 路由鍵為"log.error"(匹配綁定鍵"log.*"或"log.#") String ROUTING_KEY = "log.error"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false); // 綁定隊列到交換機,綁定鍵為"log.#"(匹配所有以log.開頭的路由鍵) channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#");
3. Fanout 交換機(廣播)
- 路由規則:忽略路由鍵,將消息路由到所有綁定的隊列。
- 適用場景:一對多通信(如廣播通知)。
// 生產者聲明Fanout交換機
String EXCHANGE_NAME = "fanout_exchange";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false);
// 綁定多個隊列到交換機(無需指定路由鍵)
channel.queueBind(QUEUE1, EXCHANGE_NAME, "");
channel.queueBind(QUEUE2, EXCHANGE_NAME, "");
// 發送消息(路由鍵無效,可設為空)
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
RabbitMQ 應用場景
- 異步通信:如用戶注冊后異步發送郵件 / 短信通知;
- 系統解耦:訂單系統與庫存系統通過消息通信,避免直接依賴;
- 流量削峰:秒殺場景中,通過隊列緩沖請求,避免服務器過載;
- 日志收集:多服務日志通過 Fanout 交換機廣播到日志處理服務。