前言
MQ就是接收并轉發消息
核心概念
admin是用戶
每個虛擬機上都有多個交換機
快速入門
引入依賴
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.22.0</version></dependency>
生產者
建立連接
消費者
其實這里消費了兩條消息的,因為釋放太快了,有一條還來不及打印
如果沒有釋放資源的話
這兩個就會一直存在東西
connection關閉的話,channel會自動關閉的,所以會報錯這里
工作模式的介紹
官網
簡單模式,一個生產者,一個消費者
工作隊列模式:一個生產者,多個消費者,c1和c2共同消費p生產的消息
交換機類型
作?: ?產者將消息發送到Exchange, 由交換機將消息按?定規則路由到?個或多個隊列中(上圖中?產
者將消息投遞到隊列中, 實際上這個在RabbitMQ中不會發?. )
RabbitMQ交換機有四種類型: fanout,direct, topic, headers, 不同類型有著不同的路由策略. AMQP協
議?還有另外兩種類型, System和?定義, 此處不再描述.
- Fanout:?播,將消息交給所有綁定到交換機的隊列(Publish/Subscribe模式),就是消息會重復消費的,交換機是廣播類型
- Direct:定向,把消息交給符合指定routing key的隊列(Routing模式)
- Topic:通配符,把消息交給符合routing pattern(路由模式)的隊列(Topics模式),就是bingdingkey是通配符,但是routingkey一定是字符串,看匹不匹配
- headers類型的交換器不依賴于路由鍵的匹配規則來路由消息, ?是根據發送的消息內容中的
headers屬性進?匹配. headers類型的交換器性能會很差,?且也不實?,基本上不會看到它的存在.
Exchange(交換機)只負責轉發消息, 不具備存儲消息的能?, 因此如果沒有任何隊列與Exchange綁
定,或者沒有符合路由規則的隊列,那么消息就會丟失
RoutingKey: 路由鍵.?產者將消息發給交換器時, 指定的?個字符串, ?來告訴交換機應該如何處理這個消息.
Binding Key:綁定. RabbitMQ中通過Binding(綁定)將交換器與隊列關聯起來, 在綁定的時候?般會指
定?個Binding Key, 這樣RabbitMQ就知道如何正確地將消息路由到隊列了
就是看routingkey匹配哪個bindingkey,匹配哪個的話,就去哪個隊列
當消息的Routing key與隊列綁定的Bindingkey相匹配時,消息才會被路由到這個隊列
BindingKey其實也屬于路由鍵中的?種, 官?解釋為:the routingkey to use for the binding.
可以翻譯為:在綁定的時候使?的路由鍵. ?多數時候,包括官??檔和RabbitMQJava API 中都把
BindingKey和RoutingKey看作RoutingKey, 為了避免混淆,可以這么理解:
- 在使?綁定的時候,需要的路由鍵是BindingKey.
- 在發送消息的時候,需要的路由鍵是RoutingKey
第三個,發布訂閱模式,x表示的是交換機,有特殊作用,每個生產者都收到的相同消息
也是廣播模式
第四個路由模式,增加了routingkey,發布訂閱模式沒有bingdingkey,會轉發到匹配的隊列,發布訂閱模式的升級
第五個是通配符模式,是路由模式的升級,就是bindingkey支持通配符
星表示一個單詞,#表示多個單詞
第六個是RPC模式,RPC通信
- 客?端發送消息到?個指定的隊列, 并在消息屬性中設置replyTo字段, 這個字段指定了?個回調隊
列, ?于接收服務端的響應. - 服務端接收到請求后, 處理請求并發送響應消息到replyTo指定的回調隊列
- 客?端在回調隊列上等待響應消息. ?旦收到響應,客?端會檢查消息的correlationId屬性,以
確保它是所期望的響應
第七個發布確認模式
Publisher Confirms模式是RabbitMQ提供的?種確保消息可靠發送到RabbitMQ服務器的機制。在這
種模式下,?產者可以等待RabbitMQ服務器的確認,以確保消息已經被服務器接收并處理.
- ?產者將Channel設置為confirm模式(通過調?channel.confirmSelect()完成)后, 發布的每?條消
息都會獲得?個唯?的ID, ?產者可以將這些序列號與消息關聯起來,以便跟蹤消息的狀態. - 當消息被RabbitMQ服務器接收并處理后,服務器會異步地向?產者發送?個確認(ACK)給?產者
(包含消息的唯?ID),表明消息已經送達
通過Publisher Confirms模式,?產者可以確保消息被RabbitMQ服務器成功接收, 從?避免消息丟失
的問題
是生產者和隊列之間的保證
工作隊列模式
簡單模式中的生產者,就是把消息轉發到和routingkey相同的隊列中
多個消費者
因為使用的是默認交換機,所以就可以根據routingkey找到相同隊列名稱
先啟動消費者,因為先啟動生產者的話,消費者一個就一下消費完了
而且消費者要關閉close
工作隊列模式就是共同消費的意思
一個生產者,一個隊列,多個消費者,默認交換機
發布訂閱模式
多了exchange的角色
在發布/訂閱模型中,多了?個Exchange??.
Exchange 常?有三種類型, 分別代表不同的路由規則
a) Fanout:?播,將消息交給所有綁定到交換機的隊列(Publish/Subscribe模式)
b) Direct:定向,把消息交給符合指定routing key的隊列(Routing模式)
c) Topic:通配符,把消息交給符合routing pattern(路由模式)的隊列(Topics模式)
也就分別對應不同的?作模式
就是廣播了
與上面的區別就是生產者要聲明交換機,然后隊列與交換機的綁定關系
發布訂閱模式就是把收到的消息全部轉發到所有的隊列中,每個隊列都有這個消息,所以routingkey為空
消費者的話,就是兩個消費者分別消費兩個隊列
路由模式
發布訂閱模式就是會把消息發給所有和這個交換機綁定的隊列
而路由模式交換機類型不同,第二就是與隊列建立綁定關系的時候,要建立bindingkey
第三就是發送消息的時候,要指定routingkey,routingkey與bindingkey一致的時候就會發送了
第三個參數就是bindingkey
通配符模式
在topic類型的交換機在匹配規則上, 有些要求:
- RoutingKey 是?系列由點( . )分隔的單詞, ?如 " stock.usd.nyse ", " nyse.vmw ",
" quick.orange.rabbit " - BindingKey 和RoutingKey?樣, 也是點( . )分割的字符串.
- Binding Key中可以存在兩種特殊字符串, ?于模糊匹配
? * 表??個單詞,不是一個字母
? # 表?多個單詞(0-N個)
消費者獲取到消息,默認就刪除了
RPC通信-客戶端
RPC(Remote Procedure Call), 即遠程過程調?. 它是?種通過?絡從遠程計算機上請求服務, ?不需要了解底層?絡的技術. 類似于Http遠程調?.
RabbitMQ實現RPC通信的過程, ?概是通過兩個隊列實現?個可回調的過程
RabbitMQ 消息確定機制
在RabbitMQ中,basicConsume?法的autoAck參數?于指定消費者是否應該?動向消息隊列確認
消息
?動確認(autoAck=true): 消息隊列在將消息發送給消費者后, 會?即從內存中刪除該消息. 這意味
著, 如果消費者處理消息失敗,消息將丟失,因為消息隊列認為消息已經被成功消費
?動確認(autoAck=false): 消息隊列在將消息發送給消費者后,需要消費者顯式地調?basicAck
?法來確認消息. ?動確認提供了更?的可靠性, 確保消息不會被意外丟失, 適?于消息處理重要且需
要確保每個消息都被正確處理的場景
發布確認
就是最后一個模式
作為消息中間件, 都會?臨消息丟失的問題.
消息丟失?概分為三種情況:
- ?產者問題. 因為應?程序故障, ?絡抖動等各種原因, ?產者沒有成功向broker發送消息.
- 消息中間件??問題. ?產者成功發送給了Broker, 但是Broker沒有把消息保存好, 導致消息丟失.
- 消費者問題. Broker 發送消息到消費者, 消費者在消費消息時, 因為沒有處理好, 導致broker將消費
失敗的消息從隊列中刪除了
RabbitMQ也對上述問題給出了相應的解決?案. 問題2可以通過持久化機制. 問題3可以采?消息應答機制. (后?詳細講)
針對問題1, 可以采?發布確認(Publisher Confirms)機制實現.
?產者將信道設置成confirm(確認)模式, ?旦信道進?confirm模式, 所有在該信道上?發布的消息都
會被指派?個唯?的ID(從1開始), ?旦消息被投遞到所有匹配的隊列之后, RabbitMQ就會發送?個確認給?產者(包含消息的唯?ID), 這就使得?產者知道消息已經正確到達?的隊列了, 如果消息和隊列是可持久化的, 那么確認消息會在將消息寫?磁盤之后發出. broker回傳給?產者的確認消息中
deliveryTag 包含了確認消息的序號, 此外 broker 也可以設置channel.basicAck?法中的multiple參
數, 表?到這個序號之前的所有消息都已經得到了處理
第一個是批量確認,第二個不是
發送?確認機制最?的好處在于它是異步的, ?產者可以同時發布消息和等待信道返回確認消息.
- 當消息最終得到確認之后, ?產者可以通過回調?法來處理該確認消息.
- 如果RabbitMQ因為??內部錯誤導致消息丟失, 就會發送?條nack(Basic.Nack)命令, ?產者同樣
可以在回調?法中處理該nack命令.
使?發送確認機制, 必須要信道設置成confirm(確認)模式.
發布確認是 AMQP 0.9.1 協議的擴展, 默認情況下它不會被啟?. ?產者通過channel.confirmSelect()將信道設置為confirm模式
有三種確認方式,單獨確認,批量確認(就是發布數量達到一定數量的時候開始去確認),異步確認(另外開一個線程卻確認)
可以去看官網
單獨確認
這個確認的話,是虛擬機自己確認的,不關我們的事,不關消費者的事,這個一個一個確認就很慢
如果隊列還要持久化這個數據的話,就更慢了
批量確認
異步確認
異步confirm?法的編程實現最為復雜. Channel 接?提供了?個?法addConfirmListener. 這個?法
可以添加ConfirmListener 回調接?.
ConfirmListener 接?中包含兩個?法: handleAck(long deliveryTag, boolean
multiple) 和 handleNack(long deliveryTag, boolean multiple) , 分別對應處理
RabbitMQ發送給?產者的ack和nack.
deliveryTag 表?發送消息的序號. multiple 表?是否批量確認
我們需要為每?個Channel 維護?個已發送消息的序號集合. 當收到RabbitMQ的confirm 回調時, 從集
合中刪除對應的消息. 當Channel開啟confirm模式后, channel上發送消息都會附帶?個從1開始遞增的
deliveryTag序號. 我們可以使?SortedSet 的有序性來維護這個已發消息的集合.
- 當收到ack時, 從序列中刪除該消息的序號. 如果為批量確認消息, 表??于等于當前序號
deliveryTag的消息都收到了, 則清除對應集合 - 當收到nack時, 處理邏輯類似, 不過需要結合具體的業務情況, 進?消息重發等操作
可以一下發布多個消息,然后multiple =true就是批量確認,deliveryTag =3,就是確認它之前的,包括它,要么multiple =false,就是確認單個
如果數據量很大的話,異步確認就要比批量確認快得多