????????案例場景:用戶下單后需要多個微服務(如營銷、會員)分別訂閱并處理訂單事件,且每個微服務可能有多個集群實例,需要保證同一個微服務的集群中,只有一個實例消費到消息。
????????不同于Kafka和rocketMQ有分組消費的功能,rabbitMQ需要通過topic Exchange實現。
? ? ? ? 1、消息設計
{"event_type": "order_created", // 事件類型(如下單、支付成功)"order_id": "123456","user_id": "user_789","items": [{"product_id": "p1001", "quantity": 2},{"product_id": "p1002", "quantity": 1}],"total_amount": 399.00,"created_at": "2025-07-01T12:00:00Z"
}
? ? ? ? 2、rabbitMQ設計
????????使用 topic 類型的 Exchange:支持靈活的路由規則,不同微服務可通過 binding key 訂閱特定事件。
Exchange 名稱示例:order_events。
微服務 隊列名 綁定鍵(binding key)
營銷系統 marketing_queue order.created.marketing
會員系統 member_queue order.created.member綁定鍵規則:<event_type>.<microservice_name>,便于后續擴展(如 order.cancel.marketing)。
? ? ? ? 3、消息發布
????????生產者(訂單服務)發布消息時,指定路由鍵(routing key) 為 order.created
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.fasterxml.jackson.databind.ObjectMapper;public class OrderProducer {private static final String EXCHANGE_NAME = "order_events";private static final String ROUTING_KEY = "order.created";public static void main(String[] args) throws Exception {// 1. 創建連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 2. topic Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "topic", true);// 3. 構造消息OrderEvent event = new OrderEvent();event.setEvent_type("order_created");event.setOrder_id("123456");event.setUser_id("user_789");event.setItems(List.of(Map.of("product_id", "p1001", "quantity", 2)));event.setTotal_amount(399.00);event.setCreated_at("2025-07-02T22:00:00Z");ObjectMapper mapper = new ObjectMapper();String messageJson = mapper.writeValueAsString(event);// 4. 發布消息channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,true, // 持久化消息new AMQP.BasicProperties.Builder().deliveryMode(2) // 持久化.build(),messageJson.getBytes());System.out.println("Sent order event: " + messageJson);}}
}
? ? ? ? 4、消息訂閱(營銷服務)
? ? ? ? 不同服務只需要修改?QUEUE_NAME 和 BINDING_KEY 就可以了。?
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;public class MarketingConsumer {private static final String QUEUE_NAME = "marketing_queue";private static final String BINDING_KEY = "order.created.marketing";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 1. 聲明隊列并綁定到 Exchangechannel.queueDeclare(QUEUE_NAME, true, false, false, null);channel.queueBind(QUEUE_NAME, "order_events", BINDING_KEY);// 2. 消費者回調DeliverCallback deliverCallback = (consumerTag, delivery) -> {String messageJson = new String(delivery.getBody(), "UTF-8");ObjectMapper mapper = new ObjectMapper();OrderEvent event = mapper.readValue(messageJson, OrderEvent.class);// 3. 業務邏輯:發送營銷短信System.out.println("Marketing: Sending SMS for order " + event.getOrder_id());// 4. 手動確認消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 5. 設置手動確認channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
}