消息隊列 - RabbitMQ
- 1. 初識 MQ
- 1.1 同步調用
- 1.2 異步調用
- 1.3.技術選型
- 2. RabbitMQ
- 2.1 安裝
- 2.2 收發信息
- 2.2.1 交換機(Exchange)
- 2.2.2 隊列
- 2.2.3 綁定關系
- 2.2.4 發送消息
- 2.3 數據隔離
1. 初識 MQ
微服務一旦拆分,必然涉及到服務之間的相互調用,之前講到的基于 OpenFeign 的調用。這種調用中,調用者發起請求后需要等待服務提供者執行業務返回結果后,才能繼續執行后面的業務。也就是說調用者在調用過程中處于阻塞狀態,因此我們稱這種調用方式為同步調用,也可以叫同步通訊。但在很多場景下,我們可能需要采用異步通訊的方式,為什么呢?
兩種方式的區別:
- 同步通訊:就如同打視頻電話,雙方的交互都是實時的。因此同一時刻你只能跟一個人打視頻電話。
- 異步通訊:就如同發微信聊天,雙方的交互不是實時的,你不需要立刻給對方回應。因此你可以多線操作,同時跟多人聊天。
兩種方式各有優劣,打電話可以立即得到響應,但是你卻不能跟多個人同時通話。發微信可以同時與多個人收發微信,但是往往響應會有延遲。
所以,如果我們的業務需要實時得到服務提供方的響應,則應該選擇同步通訊(同步調用)。而如果我們追求更高的效率,并且不需要實時響應,則應該選擇異步通訊(異步調用)。
1.1 同步調用
我們以注冊成功后郵件通知為例
同步調用的業務流程是:
- 發送注冊請求
- 注冊成功將信息存入數據庫
- 發送郵件通知
- 返回成功信息
這樣的業務流程會存在下面三個問題
一、性能低
由于同步調用,調用者需要等待服務提供者執行完返回結果后,才能繼續向下執行,也就是說每次遠程調用,調用者都是阻塞等待狀態。最終整個業務的響應時長就是每次遠程調用的執行時長之和:100+100+300+50 = 550ms
二、擴展性差
比如需要添加短信通知,需要在代碼中添加短信服務的遠程調用代碼。
三、級聯失效
在這個案例中,注冊成功了,但是郵件通知服務出現了問題拋出了錯誤會讓存入數據庫的信息回滾導致整個注冊業務失敗。
需要解決這些問題就需要用異步調用的方式來代替同步調用。
1.2 異步調用
異步調用方式其實就是基于消息通知的方式,一般包含三個角色:
- 消息發送者:投遞消息的人,就是原來的調用方
- 消息Broker:管理、暫存、轉發消息,你可以把它理解成微信服務器
- 消息接收者:接收和處理消息的人,就是原來的服務提供方
在異步調用中,發送者不再直接同步調用接收者的業務接口,而是發送一條消息投遞給消息Broker。然后接收者根據自己的需求從消息Broker那里訂閱消息。每當發送方發送消息后,接受者都能獲取消息并處理。
這樣,發送消息的人和接收消息的人就完全解耦了。
還是以注冊通知為例:
將遠程調用邏輯刪除,改為發送一條消息到Broker。而相關的微服務都可以訂閱消息通知,一旦消息到達Broker,則會分發給每一個訂閱了的微服務,處理各自的業務。
后續需要添加短信通知也只需要添加一個短信服務訂閱消息即可。
而且注冊服務的耗時也和其他服務無關。
綜上,異步調用的優勢包括:
- 耦合度更低
- 性能更好
- 業務拓展性強
- 故障隔離,避免級聯失敗
當然,異步通信也并非完美無缺,它存在下列缺點:
- 完全依賴于Broker的可靠性、安全性和性能
- 架構復雜,后期維護和調試麻煩
1.3.技術選型
消息Broker,目前常見的實現方案就是消息隊列(MessageQueue),簡稱為MQ.
目比較常見的MQ實現:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
幾種常見MQ的對比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社區 | Rabbit | Apache | 阿里 | Apache |
開發語言 | Erlang | Java | Java | Scala&Java |
協議支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定義協議 | 自定義協議 |
可用性 | 高 | 一般 | 高 | 高 |
單機吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒以內 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延遲:RabbitMQ、Kafka
2. RabbitMQ
RabbitMQ 是一個開源的消息中間件(Message Broker),用于在分布式系統中傳遞消息。它實現了高級消息隊列協議(AMQP),這是一種網絡協議,用于在應用程序之間傳遞消息。
以下是 RabbitMQ 的一些主要特性和概念:
- 消息隊列: RabbitMQ 提供了一個消息隊列,允許不同的應用程序通過消息進行通信。發送方將消息放入隊列,而接收方則從隊列中獲取消息。
- 發布/訂閱模型: RabbitMQ 支持發布/訂閱模型,其中一個應用程序(發布者)發布消息,而多個應用程序(訂閱者)可以訂閱并接收這些消息。
- 可靠性: RabbitMQ 提供了持久性,確保即使在代理重新啟動后,隊列和消息也不會丟失。
- 靈活的路由: RabbitMQ 具有靈活的路由機制,可以根據不同的條件將消息路由到不同的隊列。
- 多協議支持: RabbitMQ 支持多種消息傳遞協議,包括 AMQP、STOMP、MQTT 等。
- 插件系統: RabbitMQ 具有豐富的插件系統,可以通過插件擴展其功能,如集群、身份驗證、可視化工具等。
- 可視化管理界面: RabbitMQ 提供了一個易于使用的管理界面,允許用戶監視和管理隊列、交換機等組件。
- 集群支持: RabbitMQ 支持構建集群,以提高可用性和性能。
- 消息確認: 客戶端可以向 RabbitMQ 確認已成功處理消息,確保消息不會因為消費者故障而丟失。
- 死信隊列: RabbitMQ 支持死信隊列,用于處理無法被消費的消息。
2.1 安裝
這里介紹 Docker 安裝 RabbitMQ 的過程。
2.1.1 拉取鏡像
docker pull rabbitmq:management # 拉取帶有web界面的鏡像
2.1.2 啟動容器
docker run -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root --name mq --hostname q -p 15672:15672 -p 5672:5672 -d rabbitmq:management
# -e RABBITMQ_DEFAULT_USER=root 用戶名
# -e RABBITMQ_DEFAULT_PASS=root 密碼
端口介紹:
- 4369/tcp: Erlang Port Mapper Daemon (EPMD)端口,用于Erlang節點之間的通信。
- 5671/tcp: AMQP over TLS/SSL端口,用于安全的AMQP連接。
- 5672/tcp: 默認的AMQP端口,用于客戶端連接和消息傳遞。
- 15671/tcp: RabbitMQ Management插件的安全AMQP端口。
- 25672/tcp: 集群端口,用于Erlang分布式節點之間的內部通信。
- 15672/tcp: RabbitMQ Management插件的Web管理界面端口,通過Web界面進行RabbitMQ的管理。
2.1.3 訪問管理頁面
http://ip地址:15672
RabbitMQ對應的架構如圖:
其中包含幾個角色:
publisher
:生產者,也就是發送消息的一方consumer
:消費者,也就是消費消息的一方queue
:隊列,存儲消息。生產者投遞的消息會暫存在消息隊列中,等待消費者處理exchange
:交換機,負責消息路由。生產者發送的消息由交換機決定投遞到哪個隊列。virtual host
:虛擬主機,起到數據隔離的作用。每個虛擬主機相互獨立,有各自的exchange、queue
2.2 收發信息
2.2.1 交換機(Exchange)
打開Exchanges
選項,可以看到RabbitMQ預定義了7種交換機,這7種交換機有4種類型。
關于交換機類型的簡介:
-
Direct Exchange(直連交換機):
直連交換機是最簡單的交換機類型。
它將消息路由到與消息中的路由鍵完全匹配的隊列中。
在消息生產者指定的路由鍵和隊列的綁定鍵完全相同時,消息將被發送到相應的隊列。
-
Fanout Exchange(扇出交換機):
- 扇出交換機將消息廣播到與交換機綁定的所有隊列,忽略路由鍵。
- 適用于廣播消息給多個消費者的場景,不關心消息的具體內容。
-
Topic Exchange(主題交換機):
- 主題交換機通過匹配路由鍵和模式的方式來路由消息到隊列。
- 路由鍵可以包含通配符
*
(匹配一個單詞)和#
(匹配零個或多個單詞),允許更復雜的路由規則。
-
Headers Exchange(頭交換機):
- 頭交換機使用消息的頭部信息而不是路由鍵來決定如何路由消息。
- 在綁定隊列時指定一組鍵值對(headers),只有當消息的頭部信息與這些鍵值對完全匹配時,消息才會被路由到隊列。
關于預定義的7種交換機簡介:
- AMQP default:
- 這是一個直連交換機,是 RabbitMQ 的默認交換機。
- 當消息的路由鍵與隊列的綁定鍵完全匹配時,消息將被發送到相應的隊列。
- amq.direct:
- 也是一個直連交換機。
- 類似于 AMQP 默認交換機,將消息路由到與消息中的路由鍵完全匹配的隊列。
- amq.fanout:
- 是一個扇出交換機。
- 將消息廣播到與交換機綁定的所有隊列,忽略路由鍵。
- amq.headers:
- 是一個頭交換機。
- 使用消息的頭部信息而不是路由鍵來決定如何路由消息。在綁定隊列時指定一組鍵值對(headers)。
- amq.match:
- 是一個主題交換機。
- 通過匹配路由鍵和模式的方式來路由消息到隊列。支持更復雜的路由規則。
- amq.rabbitmq.trace:
- 這是一個專用于消息追蹤的交換機。
- 用于在 RabbitMQ 中追蹤消息的流動,通常不用于常規消息傳遞。
- amq.topic:
- 也是一個主題交換機。
- 通過匹配路由鍵和模式的方式來路由消息到隊列。類似于 amq.match,支持更復雜的路由規則。
2.2.2 隊列
打開Queues
選項卡,新建一個隊列:
2.2.3 綁定關系
點擊 amq.fanout
交換機
綁定隊列 queue1
2.2.4 發送消息
點擊 amq.fanout
交換機
發送消息
隊列成功收到消息
2.3 數據隔離
RabbitMQ 使用 virtual host 實現數據隔離
新建一個虛擬主機
添加一個用戶
現在用戶test還沒有能訪問的虛擬主機
切換登錄用戶test后無法獲取沒有訪問權限的虛擬主機的隊列消息。
點擊test為用戶添加虛擬主機
右上角 虛擬主機切換為 /test 看不到其他虛擬主機的消息隊列,這就是基于virtual host
的隔離效果。