RabbitMQ 知識詳解(Java版)
RabbitMQ 是一個開源的消息代理,實現了高級消息隊列協議(AMQP)。它用于在分布式系統中實現應用解耦、異步通信和流量削峰。
核心概念
- 生產者(Producer):發送消息的應用
- 消費者(Consumer):接收消息的應用
- 隊列(Queue):消息存儲的緩沖區
- 交換機(Exchange):接收消息并路由到隊列
- 綁定(Binding):連接交換機和隊列的規則
- 路由鍵(Routing Key):消息的路由標識
交換機類型
類型 | 路由規則 | 典型用途 |
---|---|---|
Direct | 精確匹配Routing Key | 點對點通信 |
Topic | 模式匹配(支持通配符) | 多條件路由 |
Fanout | 廣播到所有綁定隊列 | 發布/訂閱 |
Headers | 消息頭鍵值對匹配 | 復雜路由 |
Java 示例(使用官方客戶端)
依賴:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-nop</artifactId><version>1.7.30</version>
</dependency>
1. 直連交換機(Direct Exchange)
// Producer
public class DirectExchangeProducer {private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 綁定不同路由鍵String routingKey1 = "red";String message1 = "重要消息";channel.basicPublish(EXCHANGE_NAME, routingKey1, null, message1.getBytes());System.out.println("發送消息: " + message1);String routingKey2 = "blue";String message2 = "普通消息";channel.basicPublish(EXCHANGE_NAME, routingKey2, null, message2.getBytes());System.out.println("發送消息: " + message2);}}
}// Consumer (紅色隊列)
public class DirectConsumerRed {private static final String EXCHANGE_NAME = "direct_exchange";private static final String QUEUE_NAME = "red_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "red");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("紅色隊列收到消息: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
2. 扇出交換機(Fanout Exchange)
// Producer
public class FanoutExchangeProducer {private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "廣播消息";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println("廣播消息已發送");}}
}// Consumer (郵件隊列)
public class FanoutConsumerEmail {private static final String EXCHANGE_NAME = "fanout_exchange";private static final String QUEUE_NAME = "email_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("郵件服務收到: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
3. 主題交換機(Topic Exchange)
// Producer
public class TopicExchangeProducer {private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 發送不同主題的消息String routingKey = "order.create";String message = "訂單創建通知";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());System.out.println("發送訂單創建消息");routingKey = "user.login";message = "用戶登錄通知";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());System.out.println("發送用戶登錄消息");}}
}// Consumer (訂單服務)
public class TopicConsumerOrder {private static final String EXCHANGE_NAME = "topic_exchange";private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "order.*");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("訂單服務收到: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
4. 頭交換機(Headers Exchange)
// Producer
public class HeadersExchangeProducer {private static final String EXCHANGE_NAME = "headers_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "headers");// 設置消息頭Map<String, Object> headers = new HashMap<>();headers.put("type", "log");headers.put("level", "error");AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();String message = "系統錯誤日志";channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());System.out.println("發送錯誤日志消息");}}
}// Consumer (日志服務)
public class HeadersConsumerLog {private static final String EXCHANGE_NAME = "headers_exchange";private static final String QUEUE_NAME = "log_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "headers");channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 設置匹配規則 (必須包含type=log且level=error)Map<String, Object> bindingArgs = new HashMap<>();bindingArgs.put("x-match", "all"); // 全部匹配bindingArgs.put("type", "log");bindingArgs.put("level", "error");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", bindingArgs);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("日志服務收到: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
通用配置說明
-
交換機類型總結:
- 直連交換機:路由鍵精確匹配
- 扇出交換機:忽略路由鍵,廣播所有綁定隊列
- 主題交換機:使用通配符匹配路由鍵
- 頭交換機:通過消息頭屬性匹配(非路由鍵)
-
重要參數:
channel.queueDeclare()
參數說明:- durable: 是否持久化
- exclusive: 是否排他
- autoDelete: 是否自動刪除
x-match
參數在頭交換機中有兩種模式:- “all”: 需匹配所有指定頭
- “any”: 匹配任意指定頭
關鍵特性(Java實現)
1. 消息持久化
// 聲明持久化隊列
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);// 發送持久化消息
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
2. 公平分發(Prefetch)
// 每次只分發一條消息
int prefetchCount = 1;
channel.basicQos(prefetchCount);
3. 消息確認(ACK)
// 消費者關閉自動ACK
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});// 處理完成后手動ACK
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
4. 持久化消費者
// 重啟后自動恢復的消費者
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("persistent_queue", true, false, false, args);
使用場景
- 服務解耦:訂單系統與庫存系統分離
- 異步處理:耗時操作(如郵件發送)
- 流量削峰:突發請求緩沖(秒殺系統)
- 分布式事務:最終一致性實現
- 日志收集:多系統日志聚合
最佳實踐
- 連接管理:使用連接池(如Spring AMQP的CachingConnectionFactory)
- 異常處理:實現Consumer和Connection的監聽器
- 死信隊列:處理失敗消息
- 集群部署:保證高可用性
- 監控管理:使用RabbitMQ Management Plugin
提示:生產環境推薦使用Spring AMQP簡化開發,它提供了RabbitTemplate和@RabbitListener等便捷工具。
建議運行測試時:
- 先啟動所有消費者
- 再運行生產者發送消息
- 觀察各消費者接收到的消息是否符合路由規則
以上示例展示了RabbitMQ的核心路由機制,在實際生產環境中需添加異常處理、連接恢復、消息確認等機制。