《消息隊列學習指南:從 MQ 基礎到 SpringAMQP 實踐》

初識MQ

同步調用

目前我們采用的是基于OpenFeign的同步調用,也就是說業務執行流程是這樣的:

  • 支付服務需要先調用用戶服務完成余額扣減

  • 然后支付服務自己要更新支付流水單的狀態

  • 然后支付服務調用交易服務,更新業務訂單狀態為已支付

三個步驟依次執行。

這其中就存在3個問題:

第一拓展性差

但是隨著業務規模擴大,產品的功能也在不斷完善,最終支付業務會越來越臃腫。

第二性能下降?

采用了同步調用,調用者需要等待服務提供者執行完返回結果后,才能繼續向下執行,也就是說每次遠程調用,調用者都是阻塞等待狀態。最終整個業務的響應時長就是每次遠程調用的執行時長之和:

第三,級聯失敗?

由于我們是基于OpenFeign調用交易服務、通知服務。當交易服務、通知服務出現故障時,整個事務都會回滾,交易失敗。

這其實就是同步調用的級聯失敗問題。

異步調用

異步調用方式其實就是基于消息通知的方式,一般包含三個角色:

  • 消息發送者:投遞消息的人,就是原來的調用方

  • 消息Broker:管理、暫存、轉發消息,你可以把它理解成微信服務器

  • 消息接收者:接收和處理消息的人,就是原來的服務提供方

在異步調用中,發送者不再直接同步調用接收者的業務接口,而是發送一條消息投遞給消息Broker。然后接收者根據自己的需求從消息Broker那里訂閱消息。每當發送方發送消息后,接受者都能獲取消息并處理。

這樣,發送消息的人和接收消息的人就完全解耦了。

異步調用的優勢包括:

  • 耦合度更低

  • 性能更好

  • 業務拓展性強

  • 故障隔離,避免級聯失敗

當然,異步通信也并非完美無缺,它存在下列缺點:

  • 完全依賴于Broker的可靠性、安全性和性能

  • 架構復雜,后期維護和調試麻煩

常見的消息隊列(MessageQueue)

目比較常見的MQ實現:

  • ActiveMQ

  • RabbitMQ

  • RocketMQ

  • Kafka

    RabbitMQActiveMQRocketMQKafka
    公司/社區RabbitApache阿里Apache
    開發語言ErlangJavaJavaScala&Java
    協議支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定義協議自定義協議
    可用性一般
    單機吞吐量一般非常高
    消息延遲微秒級毫秒級毫秒級毫秒以內
    消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延遲:RabbitMQ、Kafka

RabbitMQ

RabbitMQ是基于Erlang語言開發的開源消息通信中間件,官網地址:

RabbitMQ: One broker to queue them all | RabbitMQ

RabbitMQ對應的架構如圖:

其中包含幾個概念:

  • publisher:生產者,也就是發送消息的一方

  • consumer:消費者,也就是消費消息的一方

  • queue:隊列,存儲消息。生產者投遞的消息會暫存在消息隊列中,等待消費者處理

  • exchange:交換機,負責消息路由。生產者發送的消息由交換機決定投遞到哪個隊列。

  • virtual host:虛擬主機,起到數據隔離的作用。每個虛擬主機相互獨立,有各自的exchange、queue

交換機

首先展示交換機項下的創建交換機:

我們點擊任意交換機,即可進入交換機詳情頁面。仍然會利用控制臺中的publish message 發送一條消息:

隊列

我們打開Queues選項卡,新建一個隊列:

數據隔離

用戶管理

點擊Admin選項卡,首先會看到RabbitMQ控制臺的用戶管理界面:

virtual host

SpringAMQP

???RabbitMQ采用了AMQP協議,因此它具備跨語言的特性。任何語言只要遵循AMQP協議收發消息,都可以與RabbitMQ交互。并且RabbitMQ官方也提供了各種不同語言的客戶端。

SpringAmqp的官方地址:

Spring AMQP

SpringAMQP提供了三個功能:

  • 自動聲明隊列、交換機及其綁定關系

  • 基于注解的監聽器模式,異步接收消息

  • 封裝了RabbitTemplate工具,用于發送消息

快速入門

  • publisher直接發送消息到隊列

  • 消費者監聽并處理隊列中的消息

導入依賴
<!--AMQP依賴,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

工程中就可以直接使用SpringAMQP了。

消息發送

首先配置MQ地址,在publisher服務的application.yml中添加配置:

spring:rabbitmq:host: 192.168.100.128 # 你的虛擬機IPport: 5672 # 端口virtual-host: /hmall # 虛擬主機username: hmall # 用戶名password: 123 # 密碼

然后在publisher服務中編寫測試類SpringAmqpTest,并利用RabbitTemplate實現消息發送:

消息接收

首先配置MQ地址,在consumer服務的application.yml中添加配置:

spring:rabbitmq:host: 192.168.150.101 # 你的虛擬機IPport: 5672 # 端口virtual-host: /hmall # 虛擬主機username: hmall # 用戶名password: 123 # 密碼

然后在consumer服務的com.itheima.consumer.listener包中新建一個類SpringRabbitListener,代碼如下:

測試

啟動consumer服務,然后在publisher服務中運行測試代碼,發送MQ消息。最終consumer收到消息:

WorkQueues模型

Work queues,任務模型。簡單來說就是多個消費者綁定到一個隊列,共同消費隊列中的消息

當消息處理比較耗時的時候,可能生產消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。

此時就可以使用work 模型,

多個消費者共同處理消息處理,消息處理的速度就能大大提高了。

但消息是平均分配給每個消費者,并沒有考慮到消費者的處理能力。

導致1個消費者空閑,另一個消費者忙的不可開交。沒有充分利用每一個消費者的能力

能者多勞

在spring中有一個簡單的配置,可以解決這個問題。我們修改consumer服務的application.yml文件,添加配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息

?這樣充分利用了每一個消費者的處理能力,可以有效避免消息積壓問題。

總結

Work模型的使用:

  • 多個消費者綁定到一個隊列,同一條消息只會被一個消費者處理

  • 通過設置prefetch來控制消費者預取的消息數量

交換機類型

  • Publisher:生產者,不再發送消息到隊列中,而是發給交換機

  • Exchange:交換機,一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。

  • Queue:消息隊列也與以前一樣,接收消息、緩存消息。不過隊列一定要與交換機綁定。

  • Consumer:消費者,與以前一樣,訂閱隊列,沒有變化

Exchange(交換機只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!

交換機的類型有四種:

  • Fanout:廣播,將消息交給所有綁定到交換機的隊列。我們最早在控制臺使用的正是Fanout交換機

  • Direct:訂閱,基于RoutingKey(路由key)發送給訂閱了消息的隊列

  • Topic:通配符訂閱,與Direct類似,只不過RoutingKey可以使用通配符

  • Headers:頭匹配,基于MQ的消息頭匹配,用的較少。

Fanout交換機

Fanout,英文翻譯是扇出,我們學過的廣播,發出消息任何綁定的隊列都可以收到。

  • 1) 可以有多個隊列

  • 2) 每個隊列都要綁定到Exchange(交換機)

  • 3) 生產者發送的消息,只能發送到交換機

  • 4) 交換機把消息發送給綁定過的所有隊列

  • 5) 訂閱隊列的消費者都能拿到消息

消息發送

在有交換機參與時,發送方調用的參數時要注意參數類型

rabbitTemplate.convertAndSend( exchangeName,? "",? message );

第一個參數:交換機名稱

第二個參數:交換機與隊列綁定的RoutingKey值

第三個參數:消息對象

總結

交換機的作用:

  • 接收publisher發送的消息

  • 將消息按照規則路由到與之綁定的隊列

  • 不能緩存消息,路由失敗,消息丟失

  • FanoutExchange的會將消息路由到每個綁定的隊列

Direct交換機

在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。

在Direct模型下:

  • 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)

  • 消息的發送方在 向 Exchange發送消息時,也必須指定消息的 RoutingKey

  • Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息

總結

描述下Direct交換機與Fanout交換機的差異

  • Fanout交換機將消息路由給每一個與之綁定的隊列

  • Direct交換機根據RoutingKey判斷路由給哪個隊列

  • 如果多個隊列具有相同的RoutingKey,則與Fanout功能類似

Topic交換機

說明

Topic類型的ExchangeDirect相比,都是可以根據RoutingKey把消息路由到不同的隊列。

只不過Topic類型Exchange可以讓隊列在綁定BindingKey 的時候使用通配符!

BindingKey 一般都是有一個或多個單詞組成,多個單詞之間以.分割,例如: item.insert

通配符規則:

  • #:匹配一個或多個詞

  • *:匹配不多不少恰好1個詞

總結

描述下Direct交換機與Topic交換機的差異?

  • Topic交換機接收的消息RoutingKey必須是多個單詞,以 . 分割

  • Topic交換機與隊列綁定時的bindingKey可以指定通配符

  • #:代表0個或多個詞

  • *:代表1個詞

聲明隊列和交換機

? ? ? ? ?通過編寫代碼的方式來聲明創建交換機和隊列

? ? ? ? ?程序啟動時檢查隊列和交換機是否存在,如果不存在自動創建。

基本API

SpringAMQP提供了一個Queue類,用來創建隊列:

SpringAMQP還提供了一個Exchange接口,來表示所有不同類型的交換機:

我們可以自己創建隊列和交換機,不過SpringAMQP還提供了ExchangeBuilder來簡化這個過程:

而在綁定隊列和交換機時,則需要使用BindingBuilder來創建Binding對象:

示例:

基于注解聲明

基于@Bean的方式聲明隊列和交換機比較麻煩,Spring還提供了基于注解方式來聲明。

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消費者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消費者2接收到direct.queue2的消息:【" + msg + "】");
}

消息轉換器

Spring的消息發送代碼接收的消息體是一個Object:

而在數據傳輸時,它會把你發送的消息序列化為字節發送給MQ,接收消息的時候,還會把字節反序列化為Java對象。

只不過,默認情況下Spring采用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:

  • 數據體積過大

  • 有安全漏洞

  • 可讀性差

配置JSON轉換器

顯然,JDK序列化方式并不合適。我們希望消息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化。

publisherconsumer兩個服務中都引入依賴:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

注意,如果項目中引入了spring-boot-starter-web依賴,則無需再次引入Jackson依賴。

配置消息轉換器,在publisherconsumer兩個服務的啟動類中添加一個Bean即可:

@Bean
public MessageConverter messageConverter(){// 1.定義消息轉換器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自動創建消息id,用于識別不同消息,也可以在業務中基于ID判斷是否是重復消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

注意: publisher用什么類型的消息傳遞,接收者也要用什么類型來接收

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/916885.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/916885.shtml
英文地址,請注明出處:http://en.pswp.cn/news/916885.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

深度學習 --- 過擬合與欠擬合

深度學習 — 過擬合與欠擬合 文章目錄深度學習 --- 過擬合與欠擬合一.概念1.1 過擬合1.2 欠擬合1.3 判斷方式二&#xff0c;解決欠擬合三&#xff0c;解決過擬合3.1 L2正則化3.1.1 定義以及作用3.1.2 代碼3.2 L1正則化3.3 L1與L2對比3.4 Dropout示例3.5 數據增強3.5.1 圖片縮放…

Python 之抽象方法 @abstractmethod 的理解

如果你熟悉 Java 的話&#xff0c;Java 里有一個抽象接口的概念&#xff0c;Python 里的抽象方法基本上與其類似。在 Python 中&#xff0c;abstractmethod 是一個裝飾器&#xff0c;用于定義抽象方法。它是實現抽象基類&#xff08;Abstract Base Class, ABC&#xff09;的核心…

深度學習·pytorch

廣播機制 從末尾開始逐個維度遍歷兩個矩陣的shape&#xff0c;如果維度不相同&#xff0c;則考慮廣播&#xff1a;任一方的維度為1或者維度不存在(小矩陣廣播為大矩陣)&#xff0c;這樣的運算可以廣播 可以廣播的例子 xtorch.empty(5,3,4,1) ytorch.empty(3,1,1) (x.add_(y)).s…

SpringBoot集成deepseek

pom文件&#xff1a;<?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org…

JetBrains Annotations:從入門到落地,徹底告別 NullPointerException

本文基于三篇高質量博客&#xff08;JetBrains Annotations官方文檔、Jakarta Validation 規范、《Effective Java》第3版&#xff09;的原文內容&#xff0c;結合作者在一線研發團隊落地 JetBrains Annotations 的實戰經驗&#xff0c;系統梳理了該注解庫的核心能力、使用姿勢…

基于Rust與HDFS、YARN、Hue、ZooKeeper、MySQL

基于Rust與HDFS、YARN、Hue、ZooKeeper、MySQL集合 以下是基于Rust與HDFS、YARN、Hue、ZooKeeper、MySQL等技術棧結合的實例,涵蓋不同場景和應用方向: 數據處理與分析 使用Rust編寫MapReduce作業,通過YARN提交到HDFS處理大規模數據集。Rust的高性能特性適合處理密集型計算…

芯片上市公司正在放棄射頻業務

轉載自--鐘林談芯射頻芯片賽道本來不卷的&#xff0c;投資人多了也就卷了。本周&#xff0c;多家媒體報道某芯片上市公司終止射頻業務&#xff0c;終止射頻業務的何止一家芯片上市公司&#xff0c;從去年開始就逐漸有上市公司終止射頻業務&#xff0c;開啟清貨模式。如人飲水&a…

Jmeter 性能測試監控之ServerAgent

使用 Jmeter 對 Linux 服務器的進行壓測時&#xff0c;想要監控服務器的 CPU 、內存&#xff0c;可以通過添加插件 【ServerAgent】來觀察,可以實時監控性能指標 一、ServerAgent-2.2.3下載 下載地址&#xff1a; GitCode - 全球開發者的開源社區,開源代碼托管平臺 二、通過插…

5.蘋果ios逆向-過ssl證書檢測和安裝ssh和獲取root權限

免責聲明&#xff1a;內容僅供學習參考&#xff0c;請合法利用知識&#xff0c;禁止進行違法犯罪活動&#xff01; 內容參考于&#xff1a;圖靈Python學院 工具下載&#xff1a; 鏈接&#xff1a;https://pan.baidu.com/s/1bb8NhJc9eTuLzQr39lF55Q?pwdzy89 提取碼&#xff1…

Navicat 17 教程:Windows 和 Mac 系統適用

一、引言 對于程序員們來說&#xff0c;Navicat是一款極為實用的數據庫管理工具。Navicat 17更是帶來了諸多新特性&#xff0c;能大大提升我們的工作效率。今天就為大家帶來Navicat 17在Windows和Mac系統上的使用教程。 二、準備工作 &#xff08;一&#xff09;下載安裝包 「…

Android 中 實現柱狀圖自定義控件

一、基本思路 創建自定義控件的數據模型; 創建一個自定義 View 類,繼承自 View; 在初始化方法中獲取自定義屬性的值。 創建設置數據方法,將數據模型列表轉換成自定義繪制時的數據; 重寫 onDraw 方法,以實現自定義的繪制邏輯。 二、主要繪制方法 1、drawLine 繪制直線 p…

Netty 核心原理與實戰:從 DiscardServer 看透 Reactor 模式與組件協作

目錄 Netty 是什么&#xff1f; Netty 的目標 Netty 實戰案例 DiscardServer 服務端程序 NettyDiscardServer 業務處理器 NettyDiscardHandler 配置類 NettyDemoConfig 回顧 Reactor 模式中的 IO 事件處理流程 Netty 中的 Channel Netty 中的 Reactor Netty 中的 Han…

關于“LoggerFactory is not a Logback LoggerContext but Logback is on ......“的解決方案

? ?重磅&#xff01;盹貓的個人小站正式上線啦&#xff5e;誠邀各位技術大佬前來探秘&#xff01;? 這里有&#xff1a; 硬核技術干貨&#xff1a;編程技巧、開發經驗、踩坑指南&#xff0c;帶你解鎖技術新姿勢&#xff01;趣味開發日常&#xff1a;代碼背后的腦洞故事、工具…

2025年6月電子學會青少年軟件編程(C語言)等級考試試卷(三級)

答案和更多內容請查看網站&#xff1a;【試卷中心 -----> 電子學會 ----> C/C ---->三級】 網站鏈接 青少年軟件編程歷年真題模擬題實時更新 編程題 第 1 題 打印城門 題目描述 給定一個正整數 n&#xff0c;輸出如下的星號城門。具體格式請見樣例。 輸入格…

跨平臺直播美顏SDK開發指南:兼顧性能與美型效果的最佳實踐

面對iOS、Android乃至Web等多端應用需求&#xff0c;如何開發一款真正跨平臺、兼顧性能與美型效果的美顏SDK&#xff0c;成為眾多開發團隊和產品經理的一道必答題。 今天筆者這篇文章&#xff0c;就從架構設計、性能優化、視覺效果調校三個關鍵維度&#xff0c;帶你深入解析跨平…

2025數字藏品安全保衛戰:高防CDN如何成為NFT應用的“隱形護甲”?

副標題&#xff1a; 從DDoS防御到全球加速&#xff0c;拆解數字資產平臺的生死防線&#x1f310; 引言&#xff1a;當數字藏品成為黑客的“頭號靶場”2025年全球數字藏品市場突破$1000億&#xff0c;但安全事件同步激增230%——某頭部NFT平臺因3.2Tbps DDoS攻擊癱瘓&#xff0c…

linux 執行sh腳本,提示$‘\r‘: command not found

1、在Linux下執行某個腳本文件卻提示$\r: command not found&#xff0c;如下圖:2、錯誤原因:a、 Windows 風格的換行符&#xff1a;Windows 系統使用 \r\n 作為行結束符&#xff0c;而 Linux 和 Unix 系統使用 \n。當你從 Windows 環境中復制文本到 Linux 環境時&#xff0c;可…

使用HaiSnap做了一款取件碼App(一鍵生成)

你是否懷揣著奇思妙想&#xff0c;卻因不懂代碼而對開發應用望而卻步&#xff1f;現在&#xff0c;有一個神奇AI Agent&#xff08;響指HaiSnap&#xff09;&#xff0c;一個響指就能實現&#xff0c;你說神奇不&#xff1f;只需要一句話就可以生成你想要的應用&#xff01;讓你…

容器與虛擬機的本質差異:從資源隔離到網絡存儲機制

目錄 專欄介紹 作者與平臺 您將學到什么&#xff1f; 學習特色 容器與虛擬機的本質差異&#xff1a;從資源隔離到網絡存儲機制 一、容器與虛擬機的本質區別 1.1 資源抽象層次差異 1.2 資源消耗與性能對比 1.3 隔離性深度差異 二、容器網絡基礎架構 2.1 Docker網絡模型…

ros2 launch文件編寫詳解

一個完整的簡單的launch文件配置過程1.編寫launch文件2.配置package.xml3.配置setup.py&#xff08;python包&#xff09;4.配置CMakeList(C包)5.編譯運行# 在 ROS 2 的 Python 啟動文件中&#xff0c;這些導入語句用于引入各類啟動模塊&#xff0c;以構建和配置節點啟動流程 f…