基于消息中間件的異步通信機制在系統解耦中的優化與實現


??謝謝大家捧場,祝屏幕前的小伙伴們每天都有好運相伴左右,一定要天天開心哦!???
🎈🎈作者主頁: 喔的嘛呀🎈🎈
?? 帥哥美女們,我們共同加油!一起進步!???

目錄

引言

一. 選擇合適的消息中間件

二. 定義消息格式和通信協議

1. 定義消息格式

消息頭

消息體

2. 定義通信協議

發送消息

接收消息

消息處理

3. 示例代碼

定義消息格式

發送消息

接收消息

三、發布-訂閱模式

1. 定義發布-訂閱模式

2. 示例代碼

發布消息

訂閱消息

3. 運行示例

4. 異步處理消息

5. 解耦系統

6. 實現步驟

7. 實例場景

實例場景:電商系統訂單處理

場景描述

實現步驟

示例代碼

訂單服務發送消息

庫存服務接收消息

物流服務接收消息


引言

在現代分布式系統中,異步通信和解耦是非常重要的設計原則。通過使用消息中間件,可以實現系統間的異步通信和解耦,提高系統的可擴展性和可靠性。本文將介紹如何使用消息中間件來實現系統間的異步通信和解耦,并通過一個實際場景來演示。

一. 選擇合適的消息中間件

選擇合適的消息中間件需要考慮多個因素,包括項目需求、性能要求、可靠性、社區支持等。常見的消息中間件包括 RabbitMQ、Kafka、ActiveMQ、Redis 等,下面針對不同的需求給出一些選擇建議:

  1. 消息傳遞模式

    • 點對點:適合使用 RabbitMQ、ActiveMQ 等傳統消息中間件。
    • 發布-訂閱:適合使用 RabbitMQ、Kafka 等支持廣播消息的中間件。
  2. 可靠性

    • 如果對消息的可靠性要求較高,需要確保消息不會丟失,可以考慮使用 RabbitMQ、Kafka 等提供消息持久化和高可靠性的中間件。
  3. 性能

    • 如果需要處理大量的消息并且需要低延遲,可以考慮使用 Kafka,它是一個高吞吐量的消息中間件,適合大數據場景。
    • 如果對延遲要求較低,可以選擇 RabbitMQ、ActiveMQ 等傳統消息中間件。
  4. 社區支持和生態系統

    • 考慮選擇一個有活躍社區支持和完善生態系統的消息中間件,這樣可以更容易地解決問題和擴展功能。
  5. 技術棧兼容性

    • 考慮選擇一個與你的技術棧兼容的消息中間件,避免出現集成上的問題。

綜合考慮以上因素,可以選擇最適合項目需求的消息中間件。

二. 定義消息格式和通信協議

定義消息格式和通信協議是使用消息中間件的關鍵步驟之一,它涉及到消息的結構、內容和交互方式。下面以 RabbitMQ 為例,演示如何定義消息格式和通信協議。

1. 定義消息格式

在 RabbitMQ 中,消息通常由兩部分組成:消息頭和消息體。消息頭包含一些元數據信息,如消息的類型、路由鍵等;消息體包含實際的業務數據。

消息頭
  • Content-Type:消息體的類型,如 application/jsontext/plain 等。
  • DeliveryMode:消息持久性標志,標識消息是否需要持久化存儲,可選值為 1(持久化)和 2(非持久化)。
  • CorrelationId:消息關聯標識,用于關聯一組相關消息。
  • 其他自定義的消息頭字段,根據業務需求定義。
消息體
  • 消息體可以是任意格式的數據,如 JSON、XML、文本等,根據業務需求定義。

2. 定義通信協議

通信協議定義了消息的交互方式,包括消息的發送、接收和處理流程。通信協議可以包括以下幾個方面:

發送消息
  • 客戶端向消息隊列發送消息,包括指定交換機(Exchange)、路由鍵(Routing Key)和消息體。
接收消息
  • 服務端從消息隊列接收消息,根據消息的交換機和路由鍵接收對應的消息。
消息處理
  • 客戶端接收到消息后,根據消息的內容執行相應的業務邏輯。

3. 示例代碼

定義消息格式
public class Message {private String content;private String contentType;private int deliveryMode;private String correlationId;// 省略getter和setter方法
}
發送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class SendMessage {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);Message message = new Message();message.setContent("Hello, RabbitMQ!");message.setContentType("text/plain");message.setDeliveryMode(1); // 持久化message.setCorrelationId("123456");String messageJson = toJson(message);channel.basicPublish("", QUEUE_NAME, null, messageJson.getBytes());System.out.println(" [x] Sent '" + messageJson + "'");}}private static String toJson(Message message) {// 將 message 對象轉換成 JSON 格式的字符串return "{ \"content\": \"" + message.getContent() + "\", \"contentType\": \"" + message.getContentType() + "\", \"deliveryMode\": " + message.getDeliveryMode() + ", \"correlationId\": \"" + message.getCorrelationId() + "\" }";}
}
接收消息
import com.rabbitmq.client.*;public class ReceiveMessage {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String messageJson = new String(delivery.getBody(), "UTF-8");Message message = fromJson(messageJson, Message.class);System.out.println(" [x] Received '" + messageJson + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}}private static <T> T fromJson(String json, Class<T> clazz) {// 將 JSON 格式的字符串轉換成指定類型的對象// 這里可以使用 JSON 框架(如 Jackson、Gson)來實現return null;}
}

通過以上步驟,可以定義消息格式和通信協議,并使用 RabbitMQ 實現消息的發送和接收。

三、發布-訂閱模式

發布-訂閱模式是一種常見的消息傳遞模式,用于實現消息的廣播和訂閱。在發布-訂閱模式中,消息發布者將消息發布到一個主題(Topic),而消息訂閱者可以訂閱感興趣的主題,從而接收到相關消息。下面以 RabbitMQ 為例,演示如何使用發布-訂閱模式。

1. 定義發布-訂閱模式

在發布-訂閱模式中,有一個交換機(Exchange)用來接收發布者發布的消息,并根據訂閱者的綁定關系將消息路由到對應的隊列。訂閱者可以創建自己的隊列,并將隊列綁定到交換機上,從而接收到發布者發布的消息。

2. 示例代碼

發布消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Publisher {private final static String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "Hello, subscribers!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
訂閱消息
import com.rabbitmq.client.*;public class Subscriber {private final static String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}}
}

3. 運行示例

  1. 先運行訂閱者 Subscriber,它會創建一個隊列并綁定到交換機上,開始監聽消息。
  2. 然后運行發布者 Publisher,它會向交換機發布一條消息。
  3. 訂閱者會接收到發布者發布的消息,并輸出到控制臺。

通過以上步驟,可以實現基于 RabbitMQ 的發布-訂閱模式。

4. 異步處理消息

通過消息中間件實現異步處理消息,即發送消息后不需要立即等待結果,而是繼續執行其他任務。這樣可以提高系統的響應速度和吞吐量。

5. 解耦系統

通過消息中間件,系統之間的通信變成了基于消息的方式,系統不再直接依賴于對方的接口和實現細節,從而實現了系統之間的解耦。

6. 實現步驟

  • 定義消息格式和通信協議:確定消息的格式和通信協議,包括消息的內容結構、消息的生命周期等。
  • 配置消息中間件:在系統中配置和啟動消息中間件,確保消息中間件正常運行。
  • 消息的發布和訂閱:編寫代碼實現消息的發布和訂閱邏輯,將消息發布到指定的主題,并訂閱感興趣的主題。
  • 處理接收到的消息:編寫代碼處理接收到的消息,根據消息的內容執行相應的業務邏輯。
  • 測試和驗證:對系統進行測試和驗證,確保消息的發布、訂閱和處理功能正常運行。

7. 實例場景

實例場景:電商系統訂單處理
場景描述

假設有一個電商系統,包含訂單服務、庫存服務和物流服務。當用戶下單時,訂單服務需要通知庫存服務減少庫存,通知物流服務發貨。為了提高系統的可擴展性和可靠性,我們可以使用消息中間件來實現訂單處理的異步通信和解耦。

實現步驟
  1. 定義消息格式和通信協議:定義訂單消息的格式,包括訂單號、商品信息等,并確定消息的交換機和隊列名稱。

  2. 配置消息中間件:在消息中間件中配置交換機和隊列,并確保消息的持久化。

  3. 訂單服務發送消息:訂單服務在用戶下單后,將訂單消息發送到消息隊列中。

  4. 庫存服務訂閱消息:庫存服務訂閱訂單消息隊列,接收并處理訂單消息,減少庫存。

  5. 物流服務訂閱消息:物流服務也訂閱訂單消息隊列,接收并處理訂單消息,進行發貨。

示例代碼
訂單服務發送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class OrderService {private static final String EXCHANGE_NAME = "orders";private static final String QUEUE_NAME = "order_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "New order placed";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
庫存服務接收消息
import com.rabbitmq.client.*;public class InventoryService {private static final String EXCHANGE_NAME = "orders";private static final String QUEUE_NAME = "order_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for orders. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 處理訂單消息,減少庫存};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}}
}
物流服務接收消息
import com.rabbitmq.client.*;public class LogisticsService {private static final String EXCHANGE_NAME = "orders";private static final String QUEUE_NAME = "order_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for orders. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 處理訂單消息,發貨};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}}
}

通過以上步驟的簡單演示,訂單服務可以異步發送訂單消息,庫存服務和物流服務可以訂閱訂單消息并處理,實現了訂單處理的異步通信和解耦。

通過以上步驟,可以使用消息中間件實現系統間的異步通信和解耦,提高系統的可擴展性和可維護性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/13438.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/13438.shtml
英文地址,請注明出處:http://en.pswp.cn/web/13438.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【只會for循環? 來看下, Nodejs中典型的5種循環方式】

Nodejs中的&#xff0c;除了經典的for循環 , 其實還有幾種好用的循環方式&#xff0c; 并有典型的使用場景。下面來一起看下&#x1f447;&#x1f3fb; 5種循環用法 For Loop&#xff1a;這是最常見的循環方式&#xff0c;適用于你知道循環次數的情況。 for (let i 0; i &…

GPT-SoVITS語音克隆部署與使用

GPT-SoVITS是一款強大的少量樣本語音轉換與語音合成開源工具。當前&#xff0c;GPT-SoVITS實現了如下幾個方面的功能&#xff1a; 由參考音頻的情感、音色、語速控制合成音頻的情感、音色、語速可以少量語音微調訓練&#xff0c;也可不訓練直接推理可以跨語種生成&#xff0c;…

【AI】試用 ai 提取文章內容嘗試

電梯產業面臨這樣一個問題&#xff0c;因為太多的品牌&#xff0c;將近 400 多個&#xff0c;甚至有寶馬&#xff0c;奧迪&#xff0c;你敢相信&#xff0c;一家造汽車的造過電梯?不過好像想想也是&#xff0c;電梯是第二大交通工具&#xff0c;電梯從某種意義上來說&#xff…

無網環境禁止 WPS 提示登錄,且基本功能按鈕可用

目前 WPS 升級后&#xff0c;每次打開都會提示你登錄 WPS&#xff0c;并且在未登錄之前所有基本功能按鈕是置灰狀態&#xff0c;無法使用。 如此一來&#xff0c;在內網或無網環境&#xff0c;我們無法登陸 WPS &#xff0c;就給我們的使用帶來了極大的不便&#xff0c;那么有沒…

全網最全面的由淺到深的Kotlin基礎教程(七)

前言 本篇文章接著上一篇文章全網最全面的由淺到深的Kotlin基礎教程&#xff08;六&#xff09;繼續進階學習kotlin&#xff0c;建議看完上一篇文章&#xff0c;再來看本篇文章。本篇主要分析一些常用的kotlin函數源碼&#xff0c;以及用kotlin簡單實現Rxjava的幾個操作符。堅…

卡梅德生物噬菌體展示多肽庫

噬菌體展示多肽文庫在新藥發現領域展現出巨大的潛力和應用價值。卡梅德生物的噬菌體展示肽庫通過將大量肽序列插入到噬菌體基因組中&#xff0c;并在噬菌體表面展示這些肽&#xff0c;構建了一個多樣性的肽庫。 在新藥發現過程中&#xff0c;噬菌體展示多肽文庫可以用于篩選具有…

全志A133 android10 調試vibrator震動馬達

一&#xff0c;前提條件 全志使用的馬達配置為上電震動&#xff0c;下電停止&#xff0c;需測試硬件是否正常。馬達供電最好為獨立供電&#xff0c;避免干擾。 二&#xff0c;適配步驟 1. dts中增加馬達配置 motor_para {compatible "allwinner,sunxi-vibrator";…

java工廠模式示例代碼

工廠模式是一種創建型設計模式&#xff0c;它提供了一種將對象的創建與使用分離的方法。在工廠模式中&#xff0c;我們通過工廠類來創建對象&#xff0c;而不是直接在客戶端代碼中使用 new 關鍵字來創建對象。以下是一個簡單的Java工廠模式示例代碼&#xff1a; 產品接口 首先…

BGP實驗:聯邦和發射器實驗

BGP實驗&#xff1a;聯邦和發射器實驗 一、實驗拓撲 二、實驗要求及分析 實驗要求&#xff1a; 1、AS1存在兩個環回&#xff0c;一個地址為192.168.1.0/24&#xff0c;該地址不能再任何協議中宣告&#xff1b; ? AS3存在兩個環回&#xff0c;一個地址為192.168.2.0/24&…

解決ModuleNotFoundError: No module named ‘open_clip‘問題

在使用stable diffusion大模型時&#xff0c;添加一些模型后啟動df頁面報錯&#xff1a;ModuleNotFoundError: No module named open_clip 使用 pip install open_clip命令下載失敗&#xff0c;報錯&#xff1a; Looking in indexes: https://mirrors.aliyun.com/pypi/simple…

Redis【B站面試題】

前言 2023-07-27 22:44:59 出自B站 灰灰的Java面試 Redis Redis為什么快&#xff1f; 1.純內存KV操作 Redis的操作都是基于內存的&#xff0c;CPU不是 Redis性能瓶頸,&#xff0c;Redis的瓶頸是機器內存和網絡帶寬。 在計算機的世界中&#xff0c;CPU的速度是遠大于內存的速…

Java技術深度解析:高級面試問題與精粹答案(二)

Java 面試問題及答案 1. 什么是Java的垃圾回收機制&#xff1f;它是如何工作的&#xff1f; 答案&#xff1a; Java的垃圾回收機制&#xff08;Garbage Collection&#xff0c;GC&#xff09;是Java運行時環境&#xff08;JRE&#xff09;中的一個功能&#xff0c;用于自動管…

[CocosCreator]Android的增加AndroidX的動態權限

歡迎喜歡或者從事CocosCreator開發的小伙伴請加入我的大家庭CocosCreator游戲開發Q群:26855530 1.首先增加你需要申請的權限,修改:AndroidManifest.xml: <?xml version"1.0" encoding"utf-8"?> <manifest xmlns:android"http://schemas…

深度學習之基于TensorFlow人臉表情識別

歡迎大家點贊、收藏、關注、評論啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代碼。 文章目錄 一項目簡介 二、功能三、系統四. 總結 一項目簡介 一、項目背景 人臉表情識別是計算機視覺領域的重要研究方向之一&#xff0c;它在人機交互、情感分析、安全監控等領…

代碼隨想錄35期Day48-Java

Day48題目 LeetCode121買股票的最佳時機1 核心思想:可以使用貪心,選擇左邊最小的和右邊最大的,也可以動態規劃,需要保存是否持有股票的狀態,dp[i][0]表示第i天,不持有股票,dp[i][1]表示第i天持有 class Solution {public int maxProfit(int[] prices) {int[][] dp new int[…

python中異步io簡單樣例

目錄 一、異步IO簡單說明 二、代碼樣例 一、異步IO簡單說明 當進行異步 IO 操作時&#xff0c;程序不會因為等待 IO 操作完成而阻塞&#xff0c;而是可以在等待過程中繼續執行其他任務&#xff0c;從而提高了程序的并發性能和響應速度。這是因為異步 IO 操作利用了操作系統底層…

Java 變量和作用域:理解變量的聲明、初始化及其作用域

在Java編程語言中&#xff0c;變量和作用域是兩個核心概念。理解變量的聲明、初始化以及它們的作用域對于編寫健壯且高效的代碼至關重要。 變量的聲明與初始化 變量的聲明 在Java中&#xff0c;變量的聲明指的是定義變量的名稱和類型。在Java中&#xff0c;變量聲明的一般語…

ESP32開發——關于ESP32TimerInterrupt庫的例程分析

最近在學習嵌入式開發的內容&#xff0c;正好有一個開發任務涉及到對于定時器中斷的使用&#xff0c;今天正好找到了這個相關的庫&#xff1a;ESP32TimerInterrupt ESP32TimerInterrupt庫的下載鏈接&#xff08;適用于Arduino IDE&#xff09; 進入到這個地址后直接下載該庫的…

ubuntu使用記錄——如何使用wireshark網絡抓包工具進行檢測速騰激光雷達的ip和端口號

提示&#xff1a;文章寫完后&#xff0c;目錄可以自動生成&#xff0c;如何生成可參考右邊的幫助文檔 文章目錄 前言wireshark網絡抓包工具1.wireshark的安裝2.wireshark的使用3.更改雷達ip 總結 前言 Wireshark是一款備受贊譽的開源網絡協議分析軟件&#xff0c;其功能之強大…