
??謝謝大家捧場,祝屏幕前的小伙伴們每天都有好運相伴左右,一定要天天開心哦!???
🎈🎈作者主頁: 喔的嘛呀🎈🎈
?? 帥哥美女們,我們共同加油!一起進步!???
目錄
引言
一. 選擇合適的消息中間件
二. 定義消息格式和通信協議
1. 定義消息格式
消息頭
消息體
2. 定義通信協議
發送消息
接收消息
消息處理
3. 示例代碼
定義消息格式
發送消息
接收消息
三、發布-訂閱模式
1. 定義發布-訂閱模式
2. 示例代碼
發布消息
訂閱消息
3. 運行示例
4. 異步處理消息
5. 解耦系統
6. 實現步驟
7. 實例場景
實例場景:電商系統訂單處理
場景描述
實現步驟
示例代碼
訂單服務發送消息
庫存服務接收消息
物流服務接收消息
引言
在現代分布式系統中,異步通信和解耦是非常重要的設計原則。通過使用消息中間件,可以實現系統間的異步通信和解耦,提高系統的可擴展性和可靠性。本文將介紹如何使用消息中間件來實現系統間的異步通信和解耦,并通過一個實際場景來演示。
一. 選擇合適的消息中間件
選擇合適的消息中間件需要考慮多個因素,包括項目需求、性能要求、可靠性、社區支持等。常見的消息中間件包括 RabbitMQ、Kafka、ActiveMQ、Redis 等,下面針對不同的需求給出一些選擇建議:
-
消息傳遞模式:
- 點對點:適合使用 RabbitMQ、ActiveMQ 等傳統消息中間件。
- 發布-訂閱:適合使用 RabbitMQ、Kafka 等支持廣播消息的中間件。
-
可靠性:
- 如果對消息的可靠性要求較高,需要確保消息不會丟失,可以考慮使用 RabbitMQ、Kafka 等提供消息持久化和高可靠性的中間件。
-
性能:
- 如果需要處理大量的消息并且需要低延遲,可以考慮使用 Kafka,它是一個高吞吐量的消息中間件,適合大數據場景。
- 如果對延遲要求較低,可以選擇 RabbitMQ、ActiveMQ 等傳統消息中間件。
-
社區支持和生態系統:
- 考慮選擇一個有活躍社區支持和完善生態系統的消息中間件,這樣可以更容易地解決問題和擴展功能。
-
技術棧兼容性:
- 考慮選擇一個與你的技術棧兼容的消息中間件,避免出現集成上的問題。
綜合考慮以上因素,可以選擇最適合項目需求的消息中間件。
二. 定義消息格式和通信協議
定義消息格式和通信協議是使用消息中間件的關鍵步驟之一,它涉及到消息的結構、內容和交互方式。下面以 RabbitMQ 為例,演示如何定義消息格式和通信協議。
1. 定義消息格式
在 RabbitMQ 中,消息通常由兩部分組成:消息頭和消息體。消息頭包含一些元數據信息,如消息的類型、路由鍵等;消息體包含實際的業務數據。
消息頭
Content-Type
:消息體的類型,如application/json
、text/plain
等。DeliveryMode
:消息持久性標志,標識消息是否需要持久化存儲,可選值為1
(持久化)和2
(非持久化)。CorrelationId
:消息關聯標識,用于關聯一組相關消息。- 其他自定義的消息頭字段,根據業務需求定義。
消息體
- 消息體可以是任意格式的數據,如 JSON、XML、文本等,根據業務需求定義。
2. 定義通信協議
通信協議定義了消息的交互方式,包括消息的發送、接收和處理流程。通信協議可以包括以下幾個方面:
發送消息
- 客戶端向消息隊列發送消息,包括指定交換機(Exchange)、路由鍵(Routing Key)和消息體。
接收消息
- 服務端從消息隊列接收消息,根據消息的交換機和路由鍵接收對應的消息。
消息處理
- 客戶端接收到消息后,根據消息的內容執行相應的業務邏輯。
3. 示例代碼
定義消息格式
public class Message {private String content;private String contentType;private int deliveryMode;private String correlationId;// 省略getter和setter方法
}
發送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class SendMessage {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);Message message = new Message();message.setContent("Hello, RabbitMQ!");message.setContentType("text/plain");message.setDeliveryMode(1); // 持久化message.setCorrelationId("123456");String messageJson = toJson(message);channel.basicPublish("", QUEUE_NAME, null, messageJson.getBytes());System.out.println(" [x] Sent '" + messageJson + "'");}}private static String toJson(Message message) {// 將 message 對象轉換成 JSON 格式的字符串return "{ \"content\": \"" + message.getContent() + "\", \"contentType\": \"" + message.getContentType() + "\", \"deliveryMode\": " + message.getDeliveryMode() + ", \"correlationId\": \"" + message.getCorrelationId() + "\" }";}
}
接收消息
import com.rabbitmq.client.*;public class ReceiveMessage {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String messageJson = new String(delivery.getBody(), "UTF-8");Message message = fromJson(messageJson, Message.class);System.out.println(" [x] Received '" + messageJson + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}}private static <T> T fromJson(String json, Class<T> clazz) {// 將 JSON 格式的字符串轉換成指定類型的對象// 這里可以使用 JSON 框架(如 Jackson、Gson)來實現return null;}
}
通過以上步驟,可以定義消息格式和通信協議,并使用 RabbitMQ 實現消息的發送和接收。
三、發布-訂閱模式
發布-訂閱模式是一種常見的消息傳遞模式,用于實現消息的廣播和訂閱。在發布-訂閱模式中,消息發布者將消息發布到一個主題(Topic),而消息訂閱者可以訂閱感興趣的主題,從而接收到相關消息。下面以 RabbitMQ 為例,演示如何使用發布-訂閱模式。
1. 定義發布-訂閱模式
在發布-訂閱模式中,有一個交換機(Exchange)用來接收發布者發布的消息,并根據訂閱者的綁定關系將消息路由到對應的隊列。訂閱者可以創建自己的隊列,并將隊列綁定到交換機上,從而接收到發布者發布的消息。
2. 示例代碼
發布消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Publisher {private final static String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "Hello, subscribers!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
訂閱消息
import com.rabbitmq.client.*;public class Subscriber {private final static String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}}
}
3. 運行示例
- 先運行訂閱者
Subscriber
,它會創建一個隊列并綁定到交換機上,開始監聽消息。 - 然后運行發布者
Publisher
,它會向交換機發布一條消息。 - 訂閱者會接收到發布者發布的消息,并輸出到控制臺。
通過以上步驟,可以實現基于 RabbitMQ 的發布-訂閱模式。
4. 異步處理消息
通過消息中間件實現異步處理消息,即發送消息后不需要立即等待結果,而是繼續執行其他任務。這樣可以提高系統的響應速度和吞吐量。
5. 解耦系統
通過消息中間件,系統之間的通信變成了基于消息的方式,系統不再直接依賴于對方的接口和實現細節,從而實現了系統之間的解耦。
6. 實現步驟
- 定義消息格式和通信協議:確定消息的格式和通信協議,包括消息的內容結構、消息的生命周期等。
- 配置消息中間件:在系統中配置和啟動消息中間件,確保消息中間件正常運行。
- 消息的發布和訂閱:編寫代碼實現消息的發布和訂閱邏輯,將消息發布到指定的主題,并訂閱感興趣的主題。
- 處理接收到的消息:編寫代碼處理接收到的消息,根據消息的內容執行相應的業務邏輯。
- 測試和驗證:對系統進行測試和驗證,確保消息的發布、訂閱和處理功能正常運行。
7. 實例場景
實例場景:電商系統訂單處理
場景描述
假設有一個電商系統,包含訂單服務、庫存服務和物流服務。當用戶下單時,訂單服務需要通知庫存服務減少庫存,通知物流服務發貨。為了提高系統的可擴展性和可靠性,我們可以使用消息中間件來實現訂單處理的異步通信和解耦。
實現步驟
-
定義消息格式和通信協議:定義訂單消息的格式,包括訂單號、商品信息等,并確定消息的交換機和隊列名稱。
-
配置消息中間件:在消息中間件中配置交換機和隊列,并確保消息的持久化。
-
訂單服務發送消息:訂單服務在用戶下單后,將訂單消息發送到消息隊列中。
-
庫存服務訂閱消息:庫存服務訂閱訂單消息隊列,接收并處理訂單消息,減少庫存。
-
物流服務訂閱消息:物流服務也訂閱訂單消息隊列,接收并處理訂單消息,進行發貨。
示例代碼
訂單服務發送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class OrderService {private static final String EXCHANGE_NAME = "orders";private static final String QUEUE_NAME = "order_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "New order placed";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
庫存服務接收消息
import com.rabbitmq.client.*;public class InventoryService {private static final String EXCHANGE_NAME = "orders";private static final String QUEUE_NAME = "order_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for orders. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 處理訂單消息,減少庫存};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}}
}
物流服務接收消息
import com.rabbitmq.client.*;public class LogisticsService {private static final String EXCHANGE_NAME = "orders";private static final String QUEUE_NAME = "order_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for orders. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 處理訂單消息,發貨};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}}
}
通過以上步驟的簡單演示,訂單服務可以異步發送訂單消息,庫存服務和物流服務可以訂閱訂單消息并處理,實現了訂單處理的異步通信和解耦。
通過以上步驟,可以使用消息中間件實現系統間的異步通信和解耦,提高系統的可擴展性和可維護性。