項目demo地址:https://github.com/tian-qingzhao/rabbitmq-demo
一、RabbitMQ組件概念
1.1 Server:接收客戶端的連接,實現AMQP實體服務。
1.2 Connection:連接
應用程序與Server的網絡連接,TCP連接。
1.3 Channel:信道
消息讀寫等操作在信道中進行。客戶端可以建立多個信道,每個信道代表一個會話任務。
1.4 Message:消息
應用程序和服務器之間傳送的數據,消息可以非常簡單,也可以很復雜。由Properties和Body組成。Properties為外包裝,
可以對消息進行修飾,比如消息的優先級、延遲等高級特性;Body就是消息體內容。
1.5 Virtual Host:虛擬主機
用于邏輯隔離。一個虛擬主機里面可以有若干個Exchange和Queue,同一個虛擬主機里面不能有相同名稱的Exchange或Queue。
1.6 Exchange:交換器
接收消息,按照路由規則將消息路由到一個或者多個隊列。如果路由不到,或者返回給生產者,或者直接丟棄。RabbitMQ常用的交換器常用類型有direct、topic、fanout、headers四種。
1.7 Binding:綁定
交換器和消息隊列之間的虛擬連接,綁定中可以包含一個或者多個RoutingKey。
1.8 RoutingKey:路由鍵
生產者將消息發送給交換器的時候,會發送一個RoutingKey,用來指定路由規則,這樣交換器就知道把消息發送到哪個隊列。
路由鍵通常為一個“.”分割的字符串,例如“com.rabbitmq”。
1.9 Queue:消息隊列
用來保存消息,供消費者消費。
二、RabbitMQ七種模式
2.1 Hello Work
生產者發布消息到隊列,不需要交換機,消費者接收。也是順序消費的一種。
2.2 Work Queue
生產者發布消息到隊列,多個消費者隨機消費消息,也不需要交換機,可以通過 channel.basicQos(1)
指定權重。
2.3 Publish/Subscribe
生產者發送消息到交換機,交換機綁定隊列,消費者可以從該隊列上消費消息,所有該隊列的消費者都可以獲取到消息。
生產者如果不想聲明隊列和隊列綁定到交換機的步驟,也可以在消費者去實現, 生產者只需要定義交換機即可,消費者從該交換機上拿消息。
2.4 Routing
和發布訂閱不同的是,生產者發送消息還是到交換機,交換機不是發送到所有隊列,
而是根據Routing Key參數選擇性的發送到對應規則的隊列上,也就是在綁定交換機和隊列關系的時候指定一個路由鍵參數。
生產者如果不想聲明隊列和隊列綁定到交換機的步驟,也可以在消費者去實現, 生產者只需要定義交換機即可,消費者從該交換機上拿消息。
交換機是不存儲消息的,交換機轉發給隊列之后,消息存儲在隊列上。如果交換機找不到對應的隊列,則會將該消息丟棄掉。
2.5 Topic
和Routing模式不同的是,Routing模式只支持路由鍵全詞匹配,而Topic模式則支持通配符匹配。
2.6 RPC
實現“發送請求 → 等待響應”的同步調用機制,盡管底層仍是異步消息通信。不是傳統意義上的 RPC(如 gRPC、Dubbo)。
基于消息隊列的請求-響應模型。使用 reply_to 和 correlation_id 實現請求與響應的匹配。
2.7 Publish/Confirms
發布確認模式,通過 channel.confirmSelect()
開啟確認。
單獨發布消息,同步等待確認:簡單,但吞吐量非常有限。
批量發布消息,等待批量的同步確認:簡單、合理的吞吐量,但是很難判斷出什么時候出了問題。
異步處理:最佳的性能和資源利用,良好的控制情況下的錯誤,但涉及到正確的實現,相對復雜。
三、RabbitMQ四種交換機類型
3.1 Direct exchange:直連交換機
是一種帶路由功能的交換機,一個隊列會和一個交換機綁定,除此之外再綁定一個routing_key,
當消息被發送的時候,需要指定一個binding_key,這個消息被送達交換機的時候,就會被這個交換機送到指定的隊列里面去。
同樣的一個binding_key也是支持應用到多個隊列中的。 這樣當一個交換機綁定多個隊列時,就會被送到對應的隊列去處理。
3.2 Fanout exchange:扇形交換機
是最基本的交換機類型,它能做的事非常簡單——廣播消息,
扇形交換機會把能接收到的消息全部發送給綁定在自己身上的隊列。
因為廣播不需要"思考"(不需要路由鍵),所以扇形交換機處理消息的速度也是所有的交換機類型里面最快的。
3.3 Topic exchange:主題交換機
直連交換機的routing_key方法非常簡單,如果希望將一條消息發送給多個隊列,
那么這個交換機需要綁定非常多的routing_key,這樣的話消息的管理就會非常的困難。
所以RabbitMQ提供了一種主題交換機,發送到主題交換機上的消息需要攜帶制定規則的routing_key,主題交換機會根據這個規則將數據發送到對應的隊列上。
主題交換機的routing_key需要有一定的規則,交換機和隊列綁定時候設置的binding_key需要采用*.#.*…的格式,每個部分用.分開,其中:
*表示一個單詞, #表示任意數量(零個或多個)單詞。 假設有一條消息的routing_key為com.lrving.www,
那么帶有這樣binding_key的幾個隊列都有收到消息:
- com…
- …www
- com.#
- …
3.4 Headers exchange:首部交換機
是忽略routing_key的一種路由方式。路由器和交換機路由的規則是通過Headers信息來交換的,
這個有點像HTTP請求中的請求頭。將一個交換機聲明成首部交換機,綁定一個隊列的時候,定義一個Hash的數據結構,消息發送的時候,
會攜帶一組hash數據結構的信息,當Hash內容匹配上的時候,消息就會被寫入隊列。
綁定交換機和隊列的時候,Hash結構中要求攜帶一個鍵"x-match",這個鍵的Value可以是any或者all,
這代表消息攜帶的Hash是需要全部匹配(all), 還是僅僅匹配一個鍵(any)
就可以了。相比較直連交換機,首部交換機的優勢是匹配的規則不被限定為字符串(string)。
四、RabbitMQ三種隊列
4.1 classic:經典隊列
是Rabbit MQ默認的隊列,在單機模式下是最常用的。
單節點存儲: Classic隊列存儲在單一的RabbitMQ節點上,消息不會跨節點復制,雖然提高了速度,但在節點故障時,隊列的容錯性較差。
FIFO消息處理: 消息按接收順序存儲和消費,尤其適合對消息順序要求嚴格的任務。
持久與非持久消息: Classic隊列可以存儲內存(瞬態)或磁盤(持久)的消息。持久化消息會被保存到磁盤,以確保在服務器重啟或崩潰后不會丟失消息,但性能會有所下降。
高吞吐量: Classic隊列經過優化,能夠以低延遲處理大量消息,適用于對消息處理速度要求嚴格的應用,如實時系統或日志聚合服務。
4.2 quorum:仲裁隊列
3.8版本引入的,Quorum是針對鏡像隊列的一種優化,目前已經取代了鏡像隊列,作為Rabbit MQ集群部署保證高可用性的解決方案。
傳統的鏡像隊列,是將消息副本存儲在一組節點上,以提高可用性和可靠性。鏡像隊列將隊列中的消息復制到一個或多個其他節點上,并使這些節點上的隊列保持同步。
當一個節點失敗時,其他節點上的隊列不受影響,因為它們上面都有消息的備份。
鏡像隊列使用主從模式,所有消息寫入和讀取均通過主節點,并異步復制到鏡像節點。主節點故障時需重新選舉,期間隊列不可用。而仲裁隊列基于Raft
分布式共識算法,所有節點組成仲裁組。消息需被多數節點持久化后才確認成功,Leader故障時自動觸發選舉。
相比較于傳統的主從模式,避免了發生網絡分區時的腦裂問題(基于Raft分布式共識算法避免)。
相比較于普通隊列,仲裁隊列增加了一個對于有毒消息的處理。什么是有毒消息?首先,消費者從隊列中獲取到了元素,隊列會將該元素刪除,
但是消費者消費失敗了,會給隊列nack,并且可以設置消息重新入隊。這樣可能存在因為業務代碼的問題,某條消息一直處理不成功的問題。
仲裁隊列會記錄消息的重新投遞次數,判斷是否超過了設置的閾值,Quorum隊列會持續跟蹤消息的失敗投遞嘗 試次數,
并記錄在"x-delivery-count"這樣一個頭部參數中。然后,就可以通過設置 Delivery limit參數來定制一個毒消息的刪除策略。
當消息的重復投遞次數超過了Delivery limit參數閾值時, 就直接丟棄或者放入死信隊列人工處理。
仲裁隊列適用于集群環境下,隊列長期存在,并且對于消息可靠性要求高,允許犧牲一部分性能(因為raft算法,消息需被多數節點持久化后才確認成功)的場景。
4.3 stream:流式隊列
3.9版本引入的,在傳統的隊列模型中,同一條消息只能被一個消費者消費(一個隊列如果有多個消費者,是工作分發的機制。
消息1->消費者1,消息2->消費者2,消息3->消費者1, 不能兩個消費者讀同一條消息。),并且消息是閱后即焚的(消費者接收到消息后,隊列中的該消息就刪除,
如果消費者拒絕簽收并且設置了重新入隊,再把消息重新放入隊列中),無法重復從隊列中獲取相同的消息。并且在當隊列中積累的消息過多時,性能下降會非常明顯。
Stream隊列正是解決了以上的這些問題。Stream隊列的核心是用aof文件的形式存儲隊列,將消息以aof的方式追加到文件中。
允許用戶在日志的任何一個連接點開始重新讀取數據。(需要用戶自己記錄偏移量)
五、死信隊列
目前只有classic和quorum隊列支持死信隊列,stream不支持死信隊列。死信隊列和普通隊列是一樣的。
死信隊列如果也配置了死信隊列,那么就會往下傳遞。
5.1 消費失敗或者拒絕消費
channel.basicNack(envelope.getDeliveryTag(), false, true);
channel.basicReject(deliveryTag, true)
5.2 超過消息的TTL(Time To Live)時間
消息超過 x-message-ttl 設置的超時時間(單位:毫秒)而沒有被消費。消息在隊列中保存時間超過這個TTL,即會被認為死亡。死亡的消
息會被丟入死信隊列,如果沒有配置死信隊列的話,RabbitMQ會保證死了的消息不會再次被投遞,并且在未來版本中,會主動刪除掉這些死掉的消息。
5.3 隊列達到最大長度
隊列超過 x-max-length 設置的最大長度之后,新來的消息會被處理為死信,消息也會進入死信隊列。
5.4 如何確認一個消息是不是死信
消息被作為死信轉移到死信隊列后,會在Header當中增加一些消息。在官網的詳細介紹中,可以看到很多內容,
比如時間、原因(rejected,expired,maxlen)、隊列等。然后header中還會加上第一次成為死信的三個屬性,
并且這三個屬性在以后的傳遞過程中都不會更改:
- x-first-death-reason
- x-first-death-queue
- x-first-death-exchange
六、消息確認的三種模式
6.1 NONE(自動確認/不確認)
消費者收到消息后即自動確認,無論消息是否正確處理,都不會進一步檢查,直接刪除隊列中的消息 。
可能導致某些情況下消息丟失(如消費者處理失敗時,RabbitMQ仍認為消息已成功處理)
6.2 AUTO(自動處理確認)
如果消費者使用了 autoAck=true
,那么 RabbitMQ 在投遞給消費者的同時就會立即刪除隊列里的消息(不管客戶端是否處理成功),
如果發生異常時消息重新回到隊列。在springboot場景下可以通過 setDefaultRequeueReject 方法設置是否重新入隊。
6.3 MANUAL(手動確認)
若拋異常,消息不會丟失,一直處Unacked狀態,消息不會再次發送給其他消費者。可選擇顯式關閉連接,消息會恢復到Ready狀態并重新投遞。消費者需要顯式調用ack
方法確認消息成功處理。如果消費者沒有確認(如拋出異常或未處理消息),消息會保持在未確認狀態(Unacked),不會再次投遞。關閉消費者連接時,未確認的消息會重新回到隊列中。
手動確認模式(MANUAL)適用于需要更精細控制的場景,能夠確保消息不會因為處理失敗而丟失。
七、懶隊列
RabbitMQ從3.6.0版本開始,就引入了懶隊列(Lazy Queue)的概念。懶隊列會盡可能早的將消息內容保存到硬盤當中,并且只有在用戶請求到時,才臨時從硬盤加載到RAM內存當中。
懶隊列的設計目標是為了支持非常長的隊列(數百萬級別)。隊列可能會因為一些原因變得非常長-也就是數據堆積。
消費者服務宕機了,有一個突然的消息高峰,生產者生產消息超過消費者
消費者消費太慢了,默認情況下,RabbitMQ接收到消息時,會保存到內存以便使用,同時把消息寫到硬盤。但是,消息寫入
硬盤的過程中,是會阻塞隊列的。RabbitMQ雖然針對寫入硬盤速度做了很多算法優化,但是在長隊列中,
依然表現不是很理想,所以就有了懶隊列的出現。
懶隊列會嘗試盡可能早的把消息寫到硬盤中,而在消費者消費到相應的消息時才會被加載到內存中。
這意味著在正常操作的大多數情況下,RAM中要保存的消息要少得多。當然,這是以增加磁盤IO為代價的。
聲明懶隊列有兩種方式:
- 給隊列指定參數
Map<String, Object> args = new HashMap<>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
- 設定一個策略,在策略中指定queue-mode 為 lazy。
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues
要注意的是,當一個隊列被聲明為懶隊列,那即使隊列被設定為不持久化,消息依然會寫入到硬盤中。如果是在集群模式中使用,這會給集群資源帶來很大的負擔。
最后一句話總結:懶隊列適合消息量大且長期有堆積的隊列,可以減少內存使用,加快消費速度。但是這是以大量消耗集群的網絡及磁盤IO為代價的。
八、聯邦插件
8.1 確認聯邦插件
rabbitmq-plugins list|grep federation
[ ] rabbitmq_federation
[ ] rabbitmq_federation_management
8.2 啟用聯邦插件
rabbitmq-plugins.bat enable rabbitmq_federation
8.3 啟用聯邦插件的管理平臺支持
rabbitmq-plugins.bat enable rabbitmq_federation_management
8.4 在RabbitMQ控制臺配置federation
打開下游RabbitMQ的控制臺,在admin標簽下,點擊Federation Upstreams,然后配置Name和URI即可,
這里的Name和URI填寫成上游的信息,可以理解成mysql的slave填寫master的信息。
8.5 測試是否配置成功
生產者發送消息往上游RabbitMQ發送消息,消費者使用下游RabbitMQ的信息進行接收消息。
九、消息分片存儲插件
Lazy Queue懶隊列機制提升了消息在RabbitMQ中堆積的能力,但是最終,消息還是需要消費者處理消化。
但是如何在消費者的處理能力有限的前提下提升消費者的消費速度呢?RabbitMQ提供的Sharding插件,就提供了一種思路。
對于RabbitMQ同樣,針對單個隊列,如何增加吞吐量呢? 消費者并不能對消息增加消費并發度,所以,
RabbitMQ的集群機制并不能增加單個隊列的吞吐量。
上面的懶隊列其實就是針對這個問題的一種解決方案。但是很顯然,懶隊列的方式屬于治標不治本。
真正要提升RabbitMQ單隊列的吞吐量,還是要從數據也就是消息入手,只有將數據真正的分開存儲才行。
RabbitMQ提供的Sharding插件,就是一個可選的方案。他會真正將一個隊列中的消息分散存儲到不同的節點上,
并提供多個節點的負載均衡策略實現對等的讀與寫功能。
9.1 啟用Sharding插件
rabbitmq-plugins enable rabbitmq_sharding
9.2 添加Sharding策略
在RabbitMQ控制臺的admin標簽下,點擊Policies,在 Add / update a policy 里面添加一個策略即可。
9.3 使用生產者發送消息
生產者只需要聲明交換機即可,交換機的類型為 x-modulus-hash
。
9.4 使用消費者消費消息
消費者只需要聲明隊列即可,但是隊列的名稱一定要寫成生產者的交換機名稱,并且有幾個分片,就要消費幾次。
sharding創建出來的分片就是一個一個的隊列,和自己定義的隊列沒什么區別,都可以直接使用,
但是在分片場景下,我們都盡量不要單獨去使用它,否則會讓sharding出現消費不均勻的情況。
十、保證消息不丟失
10.1 生產者端
RabbitMQ的生產者確認機制分為同步確認和異步確認。
同步確認主要是通過在生產者端使用 Channel.waitForConfirmsOrDie()
指定一個等待確認的完成時間。
異步確認機制則是通過 channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)
在生產者端注入兩個回調確認函數。
如果發送批量消息,在RabbitMQ中,另外還有一種手動事務的方式,可以保證消息正確發送。
手動事務機制主要有幾個關鍵的方法: channel.txSelect() 開啟事務; channel.txCommit() 提交事務;
channel.txRollback() 回滾事務; 用這幾個方法來進行事務管理。但是這種方式需要手動控制事務邏輯,并且手動事務會對channel產生阻塞,造成吞吐量下降。
10.2 消費者使用ack機制
消費者接收到消息之后,將autoAck改成false,使用手動應答的方式。
10.3 服務端
對應classic隊列,使用持久化機制,將消息存儲在磁盤上。而quorum和stream默認都是持久化的,所以不用考慮。
使用主從架構,對于普通集群,消息是分散存儲的,不會進行消息同步。而對于鏡像模式集群,可以在各個節點之間同步,丟失消息的概率不高。
十一、保證消息冪等
可以使用 AMQP.BasicProperties#messageId
設置一個消息的id,這個需要生產者自己設置成唯一標識,
然后在消費者端拿到 messageId 后做邏輯判斷。也可以在消息體放入一個唯一標識,比如訂單編號之類的字段。
十二、保證消息順序性
rabbitmq一個隊列對應多個消費者的場景下,保證不了消息的順序性問題,只能通過一個生產者、一個隊列、一個消費者去保證消息的順序,
因為消息進入隊列之后是先進先出的。
十三、解決消息堆積問題
13.1 生產者端
降低消息生產的速度
13.2 消費者端
增加消費者數量,但是一個隊列的消費者數量盡量不要超過5個消費者。另外可以調大每個消費者每次消費消息的數量,以及增加消費者的線程數量。
# 單次推送消息數量
spring.rabbitmq.listener.simple.prefetch=100
# 消費者的消費線程數量
spring.rabbitmq.listener.simple.concurrency=5
13.3 服務端
可以使用懶隊列,或者使用sharding分片隊列。