文章目錄
- 簡單介紹RabbitMQ
- RabbitMQ架構
- 什么是 RabbitMQ?有什么顯著的特點?
- RabbitMQ 有那些基本概念?
- RabbitMQ routing 路由模式
- 消息怎么路由?
- RabbitMQ publish/subscribe 發布訂閱(共享資源)
- 能夠在地理上分開的不同數據中心使用 RabbitMQ cluster 么?
- 什么情況下會出現 blackholed 問題?
- 消息如何分發?
- Basic.Reject 的用法是什么?
- 什么是 Binding 綁定?
- RabbitMQ如何確保消息的不丟失
- RabbitMQ如何避免消息堆積
- 如何保證RabbitMQ的高可用
- RabbitMQ 如何構建集群?
- RabbitMQ 支持哪些消息模式?
- RabbitMQ 中有哪幾種交換機類型?
- RabbitMQ 如何實現消息的持久化?
- RabbitMQ 是如何實現死信隊列的?
- RabbitMQ 中如何進行事務處理?
- 聊一聊常用的 RabbitMQ 插件
簡單介紹RabbitMQ
RabbitMQ是一款基于AMQP協議的、穩定易用的消息中間件
其穩定體現在其確保消息的不丟失能力,通過從生產端、broker端、消費者端來保障。
另外其支持延時隊列、死信機制等,提高了它的使用覆蓋場景。
RabbitMQ架構
RabbitMQ架構會涉及如下模型:Producer、Consumer、Queue、Exchange、Broker、RoutingKey、BindingKey。
什么是 RabbitMQ?有什么顯著的特點?
RabbitMQ 是一個開源的消息中間件,使用 Erlang 語言開發。這種語言天生非常適合分布式場景,RabbitMQ 也就非常適用于在分布式應用程序之間傳遞消息。RabbitMQ 有非常多顯著的特點:
- 消息傳遞模式:RabbitMQ 支持多種消息傳遞模式,包括發布/訂閱、點對點和工作隊列等,使其更靈活適用于各種消息通信場景。
- 消息路由和交換機:RabbitMQ 引入了交換機(Exchange)的概念,用于將消息路由到一個或多個隊列。這允許根據消息的內容、標簽或路由鍵進行靈活的消息路由,從而實現更復雜的消息傳遞邏輯。
- 消息確認機制:RabbitMQ 支持消息確認機制,消費者可以確認已成功處理消息。這確保了消息不會在傳遞后被重復消費,增加了消息的可靠性。
- 可擴展性:RabbitMQ 是高度可擴展的,可以通過添加更多的節點和集群來增加吞吐量和可用性。這使得 RabbitMQ 適用于大規模的分布式系統。
- 多種編程語言支持:RabbitMQ 提供了多種客戶端庫和插件,支持多種編程語言,包括 Java、Python、Ruby、Node.js 等,使其在不同技術棧中都能方便地集成和使用。
- 消息持久性:RabbitMQ 允許消息和隊列的持久性設置,確保消息在 RabbitMQ 重新啟動后不會丟失。這對于關鍵的業務消息非常重要。
- 靈活的插件系統:RabbitMQ 具有豐富的插件系統,使其可以擴展功能,包括管理插件、數據復制插件、分布式部署插件等。
- 管理界面:RabbitMQ 提供了一個易于使用的 Web 管理界面,用于監視和管理隊列、交換機、連接和用戶權限等。
總之,RabbitMQ 是一個功能豐富、高度可擴展且靈活的消息中間件,適用于各種分布式應用程序和消息通信需求。它的強大功能和廣泛的社區支持使其成為一個流行的消息中間件解決方案。
RabbitMQ 有那些基本概念?
- Broker:簡單來說就是消息隊列服務器實體。
- Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
- Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
- Binding:綁定,它的作用就是把 exchange 和 queue 按照路由規則綁定起來。
- Routing Key:路由關鍵字,exchange 根據這個關鍵字進行消息投遞。
- VHost:vhost 可以理解為虛擬 broker ,即 mini-RabbitMQ server。其內部均含有獨立的 queue、exchange 和 binding 等,但最最重要的是,其擁有獨立的權限 系統,可以做到 vhost 范圍的用戶控制。當然,從RabbitMQ 的全局?度,vhost 可以作為不同權限隔離的手段(一個典型的例子就是不同的應用可以跑在不同的 vhost 中)。
- Producer:消息生產者,就是投遞消息的程序。
- Consumer:消息消費者,就是接受消息的程序。
- Channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個 channel代表一個會話任務。
- 由 Exchange、Queue、RoutingKey 三個才能決定一個從 Exchange 到 Queue 的唯一的線路。
RabbitMQ routing 路由模式
消息生產者將消息發送給交換機按照路由判斷,路由是字符串(info) 當前產生的消息攜 帶路由字符(對象的方法),交換機根據路由的 key,只能匹配上路由 key 對應的消息隊列, 對應的消費者才能消費消息。
根據業務功能定義路由字符串。
從系統的代碼邏輯中獲取對應的功能字符串,將消息任務扔到對應的隊列中。
業務場景:error 通知、EXCEPTION、錯誤通知的功能、傳統意義的錯誤通知、客戶 通知、利用 key 路由,可以將程序中的錯誤封裝成消息傳入到消息隊列中,開發者可以自 定義消費者,實時接收錯誤。
消息怎么路由?
消息提供方->路由->一至多個隊列消息發布到交換器時,消息將擁有一個路由鍵 (routing key),在消息創建時設定。通過隊列路由鍵,可以把隊列綁定到交換器上。消 息到達交換器后,RabbitMQ 會將消息的路由鍵與隊列的路由鍵進行匹配(針對不同的交換器有不同的路由規則)。
常用的交換器主要分為一下三種:
- fanout:如果交換器收到消息,將會廣播到所有綁定的隊列上。
- direct:如果路由鍵完全匹配,消息就被投遞到相應的隊列。
- topic:可以使來自不同源頭的消息能夠到達同一個隊列。 使用 topic 交換器時,可以使用通配符。
RabbitMQ publish/subscribe 發布訂閱(共享資源)
- 每個消費者監聽自己的隊列。
- 生產者將消息發給 broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收到消息。
能夠在地理上分開的不同數據中心使用 RabbitMQ cluster 么?
不能。
- 第一,你無法控制所創建的 queue 實際分布在 cluster 里的哪個 node 上(一般使用HAProxy + cluster 模型時都是這樣),這可能會導致各種跨地域訪問時的常?問題。
- 第二,Erlang 的 OTP 通信框架對延遲的容忍度有限,這可能會觸發各種超時,導致業務 疲于處理。
- 第三,在廣域網上的連接失效問題將導致經典的“腦裂”問題,而 RabbitMQ 目前無法 處理(該問題主要是說Mnesia)。
什么情況下會出現 blackholed 問題?
blackholed 問題是指,向 exchange 投遞了 message ,而由于各種原因導致該 message 丟失,但發送者卻不知道。可導致 blackholed 的情況:
- 向未綁定 queue 的 exchange 發送 message;
- exchange 以binding_key key_A 綁定了 queue queue_A,但向該 exchange 發送 message 使用的 routing_key 卻是key_B。
消息如何分發?
- 若該隊列至少有一個消費者訂閱,消息將以循環(round-robin)的方式發送給消費者。每條消息只會分發給一個訂閱的消費者(前提是消費者能夠正常處理消息并進行確認)。
- 通過路由可實現多消費的功能
Basic.Reject 的用法是什么?
該信令可用于 consumer 對收到的 message 進行 reject 。若在該信令中設置 requeue=true,則當RabbitMQ server 收到該拒絕信令后,會將該 message 重新發 送到下一個處于 consume 狀態的 consumer處(理論上仍可能將該消息發送給當前 consumer)。若設置 requeue=false ,則 RabbitMQ server 在收到拒絕信令后,將直 接將該 message 從 queue 中移除。
另外一種移除 queue 中 message 的小技巧是,consumer 回復 Basic.Ack 但不對獲 取到的 message 做任何處理。而 Basic.Nack 是對 Basic.Reject 的擴展,以支持一次 拒絕多條 message 的能力。
什么是 Binding 綁定?
通過綁定將交換器和隊列關聯起來,一般會指定一個 BindingKey,這樣 RabbitMq 就知道 如何正確路由消息到隊列了。
RabbitMQ如何確保消息的不丟失
RabbitMQ針對消息傳遞過程中可能發生問題的各個地方,給出了針對性的解決方案:
- 生產者發送消息時可能因為網絡問題導致消息沒有到達交換機:RabbitMQ提供了publisher confirm機制,生產者發送消息后,可以編寫ConfirmCallback函數;消息成功到達交換機后,RabbitMQ會調用ConfirmCallback通知消息的發送者,返回ACK;消息如果未到達交換機,RabbitMQ也會調用ConfirmCallback通知消息的發送者,返回NACK;消息超時未發送成功也會拋出異常。
- 消息到達交換機后,如果未能到達隊列,也會導致消息丟失:RabbitMQ提供了publisher return機制,生產者可以定義ReturnCallback函數,消息到達交換機,未到達隊列,RabbitMQ會調用ReturnCallback通知發送者,告知失敗原因。
- 消息到達隊列后,MQ宕機也可能導致丟失消息:RabbitMQ提供了持久化功能,集群的主從備份功能;消息持久化,RabbitMQ會將交換機、隊列、消息持久化到磁盤,宕機重啟可以恢復消息;鏡像集群,仲裁隊列,都可以提供主從備份功能,主節點宕機,從節點會自動切換為主,數據依然在。
- 消息投遞給消費者后,如果消費者處理不當,也可能導致消息丟失:SpringAMQP基于RabbitMQ提供了消費者確認機制、消費者重試機制,消費者失敗處理策略。
消費者的確認機制:
消費者處理消息成功,未出現異常時,Spring返回ACK給RabbitMQ,消息才被移除
消費者處理消息失敗,拋出異常,宕機,Spring返回NACK或者不返回結果,消息不被異常
消費者重試機制:
默認情況下,消費者處理失敗時,消息會再次回到MQ隊列,然后投遞給其它消費者。Spring提供的消費者重試機制,則是在處理失敗后不返回NACK,而是直接在消費者本地重試。多次重試都失敗后,則按照消費者失敗處理策略來處理消息。避免了消息頻繁入隊帶來的額外壓力。
消費者失敗策略:
當消費者多次本地重試失敗時,消息默認會丟棄。
Spring提供了Republish策略,在多次重試都失敗,耗盡重試次數后,將消息重新投遞給指定的異常交換機,并且會攜帶上異常棧信息,幫助定位問題。
RabbitMQ如何避免消息堆積
消息堆積問題產生的原因往往是因為消息發送的速度超過了消費者消息處理的速度。因此解決方案無外乎以下三點:
- 提高消費者處理速度:優化業務代碼,批量處理業務,多線程并發處理業務。
- 增加更多消費者:一個隊列綁定多個消費者,共同爭搶任務,自然可以提供消息處理的速度。
- 增加隊列消息存儲上限:在RabbitMQ的1.8版本后,加入了新的隊列模式:Lazy Queue。這種隊列不會將消息保存在內存中,而是在收到消息后直接寫入磁盤中,理論上沒有存儲上限。可以解決消息堆積問題。
如何保證RabbitMQ的高可用
要實現RabbitMQ的高可用無外乎下面兩點:
- 做好交換機、隊列、消息的持久化
- 搭建RabbitMQ的鏡像集群,做好主從備份。當然也可以使用仲裁隊列代替鏡像集群。
RabbitMQ 如何構建集群?
RabbitMQ 支持兩種主要類型的集群:普通集群(Classic Cluster)和鏡像集群(Mirrored Cluster)。他們之間有一些重要的區別:
- 普通集群: 這種模式使用Erlang語言天生具備的集群方式搭建。這種集群模式下,集群的各個節點之間只會有相同的元數據,即隊列結構,而消息不會進行冗余,只存在一個節點中。消費時,如果消費的不是存有數據的節點, RabbitMQ會臨時在節點之間進行數據傳輸,將消息從存有數據的節點傳輸到消費的節點。很顯然,這種集群模式的消息可靠性不是很高。因為如果其中有個節點服務宕機了,那這個節點上的數據就無法消費了,需要等到這個節點服務恢復后才能消費,而這時,消費者端已經消費過的消息就有可能給不了服務端正確應答,服務起來后,就會再次消費這些消息,造成這部分消息重復消費。 另外,如果消息沒有做持久化,重啟就消息就會丟失。并且,這種集群模式也不支持高可用,即當某一個節點服務掛了后,需要手動重啟服務,才能保證這一部分消息能正常消費。所以這種集群模式只適合一些對消息安全性不是很高的場景。而在使用這種模式時,消費者應該盡量的連接上每一個節點,減少消息在集群中的傳輸。
- 鏡像集群:這種模式是在普通集群模式基礎上的一種增強方案,這也就是RabbitMQ的官方HA高可用方案。需要在搭建了普通集群之后再補充搭建。其本質區別在于,這種模式會在鏡像節點中間主動進行消息同步,而不是在客戶端拉取消息時臨時同步。并且在集群內部有一個算法會選舉產生master和slave,當一個master掛了后,也會自動選出一個來。從而給整個集群提供高可用能力。這種模式的消息可靠性更高,因為每個節點上都存著全量的消息。而他的弊端也是明顯的,集群內部的網絡帶寬會被這種同步通訊大量的消耗,進而降低整個集群的性能。這種模式下,隊列數量最好不要過多
總的來說,普通集群適用于對性能要求高,但可以接受數據丟失的情況。而鏡像集群則適用于對數據持久性和可用性有更高要求,并愿意付出一些性能代價的場景。
RabbitMQ 支持哪些消息模式?
RabbitMQ 支持多種消息傳遞模式,這些模式允許應用程序在不同的場景下進行靈活的消息交流。以下是幾種最常見的消息分發機制:
- workQueue 工作序列機制: Producer 將消息發送到 queue,多個 Consumer 同時消費Queue 上的消息。消息會均勻的分配給多個 Consumer 處理。
- Publish/Subscribe 訂閱發布機制: Producer 只負責將消息發送到exchange交換機上。Exchange 將消息轉發到所有訂閱的 Queue,并由對應的 Consumer 去進行消費
- Routing 基于內容路由機制:在訂閱發布機制的基礎上,增加一個routingKey,并根據routingKey判斷 Exchange 將消息轉發到哪些 Queue 上。
- Topic 基于話題路由機制:在基于內容路由的基礎上,對routingKey增加了模糊匹配的功能。
另外,RabbitMQ 還支持雙向同步的 RPC 機制,不過一般用得比較少。這些消息模式允許開發者根據應用程序的需求選擇合適的消息通信方式,以滿足不同的業務場景和可靠性要求。不同的模式可以用于構建各種類型的分布式系統和應用程序。
RabbitMQ 中有哪幾種交換機類型?
RabbitMQ 支持多種交換機(Exchange)類型,每種類型都用于不同的消息路由和分發策略:
- Direct Exchange:這種交換機根據消息的路由鍵(Routing Key)將消息發送到與之完全匹配的隊列。只有當消息的路由鍵與隊列綁定時指定的路由鍵完全相同時,消息才會被路由到隊列。這是一種簡單的路由策略,適用于點對點通信。
- Topic Exchange:這種交換機根據消息的路由鍵與隊列綁定時指定的路由鍵模式(通配符)匹配程度,將消息路由到一個或多個隊列。路由鍵可以使用通配符符號 *(匹配一個單詞)和 #(匹配零個或多個單詞),允許更靈活的消息路由。用于發布/訂閱模式和復雜的消息路由需求。
- Headers Exchange:這種交換機根據消息的標頭信息(Headers)來決定消息的路由,而不是使用路由鍵。隊列和交換機之間的綁定規則是根據標頭鍵值對來定義的,只有當消息的標頭與綁定規則完全匹配時,消息才會被路由到隊列。適用于需要復雜消息匹配的場景。
- Fanout Exchange:這種交換機將消息廣播到與之綁定的所有隊列,無論消息的路由鍵是什么。用于發布/訂閱模式,其中一個消息被廣播給所有訂閱者。
- Default Exchange:這是 RabbitMQ 默認實現的一種交換機,它不需要手動創建。當消息發布到默認交換機時,路由鍵會被解釋為隊列的名稱,消息會被路由到與路由鍵名稱相同的隊列。默認交換機通常用于點對點通信,但不支持復雜的路由策略。
這些不同類型的交換機允許你在 RabbitMQ 中實現各種不同的消息路由和分發策略,以滿足不同的應用需求。選擇適當的交換機類型對于有效的消息傳遞非常重要。
RabbitMQ 如何實現消息的持久化?
RabbitMQ 允許消息的持久化,以確保即使在 RabbitMQ 服務器重新啟動后,消息也不會丟失。RabbitMQ 可以通過以下方式實現消息的持久化:
- 消息持久化:在 RabbitMQ 中,只需要在發送消息時,將delivery_mode屬性設置為 2,就可以將消息標記為持久化。
- 隊列持久化:在 RabbitMQ 中聲明隊列時,也可以將隊列聲明為持久化。RabbitMQ 中的隊列分為三種不同類型經典隊列,仲裁隊列和流式隊列。其中,經典隊列需要將durable屬性設置為true。而仲裁隊列和流式隊列默認必須持久化保存。
- 交換機持久化:與經典隊列類似,RabbitMQ 也可以在聲明交換機時,將交換機的 durable 屬性設置為true,這樣就可以將交換機標記為持久化。
RabbitMQ 的持久化機制會對其性能產生影響。因此,需要根據具體的業務場景和需求來權衡是否需要持久化以及需要哪種類型的持久化。
RabbitMQ 是如何實現死信隊列的?
死信隊列是 RabbitMQ 提供的一種特殊序列,處理那些無法被正常消費的消息。有三種情況會產生死信:
- 消息被消費者明確拒絕。
- 消息達到預設的過期時間仍沒有消費者消費。
- 消息由于隊列已經達到最大長度限制而被丟棄。
在 RabbitMQ 中,實現死信隊列只需要給正常隊列增加三個核心參數即可:
- dead-letter-exchange:指定當前隊列對應的死信隊列
- dead-letter-routing-key:指定消息轉入死信隊列時的路由鍵
- message-ttl:消息在隊列中的過期時間。
接下來,就可以往正常隊列中發送消息。如果消息滿足了某些條件,就會成為死信,并被重新發送到對應的死信隊列中。而此時,RabbitMQ 會在消息的頭部添加一些與死信相關的補充信息,例如時間、成為死信的原因、原隊列等。應用程序可以按需處理這些補充的信息。
最后,死信隊列中的消息都是正常業務處理失敗的消息,應用程序需要創建一個消費者來專門處理這些被遺漏的消息。例如記錄日志、發送警報等。這樣才能保證業務數據的完整性。
RabbitMQ 中如何進行事務處理?
RabbitMQ 提供了事務處理機制,允許生產者在發送消息時將操作包裝在一個事務中,以確保消息的可靠性傳遞。在 RabbitMQ 中,事務是通過通道(Channel)來實現的。可以通過以下步驟進行事務處理:
- 開啟事務:在生產者端,可以通過調用 Channel 的 tx_select 方法來開啟一個事務。這將啟動一個新的事務,并將所有后續的消息發布操作放在該事務內。
- 發送消息:接下來在事務中,可以正常發送消息。如果消息發送失敗,事務會自動回滾。
- 提交事務:如果事務中所有消息發送成功后,需要提交事務。可以通過調用 Channel 的tx_commit方法提交事務。
- 處理異常:如果在事務過程中發生異常,可以使用 try/catch 快來捕獲異常。然后在異常處理過程中,調用 Channel 的 tx_rollback 方法來回滾 RabbitMQ 相關的事務操作。
需要注意的是,RabbitMQ 的事務處理是基于存儲過程的,它可以保證在事務中的操作要么全部成功,要么全部失敗。但是,由于 RabbitMQ 是一個異步的消息隊列系統,事務處理可能會對其性能產生影響。因此,需要根據具體的應用場景和需求來權衡是否需要使用事務以及如何使用事務。
聊一聊常用的 RabbitMQ 插件
RabbitMQ 支持許多插件,這些插件可以擴展 RabbitMQ 的功能和特性。以下是一些常用的 RabbitMQ 插件:
- Management Plugin:RabbitMQ 管理插件提供了一個 Web 管理界面,用于監控和管理 RabbitMQ 服務器。可以查看隊列、交換機、連接、通道等的狀態,并進行配置和操作。
- Shovel Plugin:Shovel 插件用于將消息從一個 RabbitMQ 服務器傳遞到另一個 RabbitMQ 服務器,實現消息復制和跨集群通信。它可以用于實現數據復制、故障恢復、數據中心間同步等。
- Federation Plugin:Federation 插件允許不同 RabbitMQ 集群之間建立聯合,實現消息的跨集群傳遞。這對于構建分布式系統、將消息從一個地理位置傳遞到另一個地理位置非常有用。
- STOMP Plugin:STOMP插件允許使用 STOMP 協議與 RabbitMQ 進行通信。這對于使用非 AMQP 協議的客戶端與 RabbitMQ 交互非常有用,例如使用 WebSocket 的 Web 應用程序。
- Prometheus Plugin:Prometheus 插件用于將 RabbitMQ 的性能指標導出到 Prometheus 監控系統,以便進行性能監控和警報。
- Delayed Message Plugin:延遲消息插件允許發布延遲交付的消息,使你能夠在稍后的時間點將消息傳遞給消費者。這對于實現定時任務、延遲重試等場景非常有用。
這些插件提供了豐富的功能擴展,可以根據需求選擇并配置它們