文章目錄
- 前言
- 延遲隊列介紹
- ttl+死信隊列存在問題
- 延遲隊列插件安裝
- 延遲插件使用
- 事務
- 消息分發概念介紹
- 限流
- 非公平分發(負載均衡)
- 限流
- 負載均衡
- RabbitMQ應用問題-冪等性保障
- 順序性保障介紹1
- 順序性保障介紹2
- 消息積壓
- 總結
前言
延遲隊列介紹
延遲隊列(Delayed Queue),即消息被發送以后, 并不想讓消費者?刻拿到消息, ?是等待特定時間后,消費者才能拿到這個消息進?消費
延遲隊列的使?場景有很多, ?如:
- 智能家居: ??希望通過?機遠程遙控家?的智能設備在指定的時間進??作. 這時候就可以將??指令發送到延遲隊列, 當指令設定的時間到了再將指令推送到智能設備.
- ?常管理: 預定會議后,需要在會議開始前?五分鐘提醒參會?參加會議
- ??注冊成功后, 7天后發送短信, 提???活躍度等
RabbitMQ本?沒有直接?持延遲隊列的的功能, 但是可以通過前?所介紹的TTL+死信隊列的?式組合模擬出延遲隊列的功能.
假設?個應?中需要將每條消息都設置為10秒的延遲, ?產者通過 normal_exchange 這個交換器將
發送的消息存儲在 normal_queue 這個隊列中. 消費者訂閱的并?是 normal_queue 這個隊列, ?
是 dlx_queue 這個隊列. 當消息從 normal_queue 這個隊列中過期之后被存? dlx_queue 這個
隊列中,消費者就恰巧消費到了延遲10秒的這條消息
就是消費ttl到了的死信隊列的信息
這個就是死信隊列就是一個延遲10s的延遲隊列
這樣就成功了
ttl+死信隊列存在問題
問題就是消息的ttl,先發送一個20s的,再來10s的,那么10s就不會立馬過期,第一個過期了,第二個才會過期,到達消費端的時候,才會去判定過沒過期
如果是隊列的ttl就沒事了
所以這樣的話,過期時間都變成一樣的了
如果先10s后20s就沒事了
延遲隊列插件安裝
官網安裝
點擊就可以跳轉了
點擊版本,下載ez
然后要把插件放在linux下載的rabbitmq的文件下
第一個就是ubuntu的
我們把插件放在這個目錄下面就可以了
兩個目錄放一個就可以了
/usr/lib/rabbitmq/plugins 是?個附加?錄, RabbitMQ包本?不會在此安裝任何內容, 如果
沒有這個路徑, 可以??進?創建
然后把ez包弄在這個目錄下
#查看插件列表
rabbitmq-plugins list
#啟動插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 禁用插件
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
#重啟服務
service rabbitmq-server restart
可以看到就放進來了
可以看到版本對不上,我們換一個版本3.9.27
這個時候我們的交換機就多了一個類型了
就是延遲隊列類型
延遲插件使用
我們只需要在聲明交換機的時候多聲明.delayed()這個參數就可以了
然后就是生產者,就不是說明過期時間了,而是說明延遲時間setDelayLong
然后再寫消費者
這個消息就不會亂序了
這個延遲隊列的原理是
把消息暫停在交換機上,到了時間后才會到達隊列,不信的話,就把消費者注釋一下就知道了
事務
RabbitMQ是基于AMQP協議實現的, 該協議實現了事務機制, 因此RabbitMQ也?持事務機制. Spring
AMQP也提供了對事務相關的操作. RabbitMQ事務允許開發者確保消息的發送和接收是原?性的, 要么全部成功, 要么全部失敗
這種發送消息的話,就只會發送成功一個消息,就是第一個消息,這個就是沒有采用事務的方式
要啟動事務的話,就要對template進行單獨的配置,不然每個請求都有事務了
還是自己構建一個template
這樣我們就創建了一個有事務的template了,開啟setChannelTransacted即可
然后還要在方法的上面加上注解Transactional,這樣就開啟事務了
這樣兩條消息都不會發送成功了
但是這樣還是不行的,還要做一點
我們還要創建一個rabbitmq的事務管理器,加入bean
這樣就可以完成事務了
只有這三個條件同時滿足的時候,那么這兩個才不會同時發送
如果三個條件少了一個,那么都不是rabbitmq的事務,都會發送成功一個
1.不加 @Transactional , 會發現消息1發送成功
2. 添加 @Transactional , 消息1和消息2全部發送失敗
而且值得注意的就是rabbitmq的事務與手動確認模式和發布確認模式是矛盾的,意思是不能同時存在和使用
所以最好改為自動確認
acknowledge-mode: auto # 使用自動確認(配合事務)publisher-confirm-type: none # 關閉發布確認(使用事務保證)
就是這樣
那么就有效果了
消息分發概念介紹
RabbitMQ隊列擁有多個消費者時, 隊列會把收到的消息分派給不同的消費者. 每條消息只會發送給訂閱列表?的?個消費者. 這種?式?常適合擴展, 如果現在負載加重,那么只需要創建更多的消費者來消費處理消息即可.默認情況下, RabbitMQ是以輪詢的?法進?分發的, ?不管消費者是否已經消費并已經確認了消息. 這種?式是不太合理的, 試想?下, 如果某些消費者消費速度慢, ?某些消費者消費速度快, 就可能會導致某些消費者消息積壓, 某些消費者空閑, 進?應?整體的吞吐量下降.
如何處理呢? 我們可以使?前?章節講到的channel.basicQos(int prefetchCount) ?法, 來限制當前信道上的消費者所能保持的最?未確認消息的數量
?如: 消費端調?了 channelbasicQos(5) , RabbitMQ會為該消費者計數, 發送?條消息計數+1, 消費?條消息計數-1, 當達到了設定的上限, RabbitMQ就不會再向它發送消息了,直到消費者確認了某條消息.類似TCP/IP中的"滑動窗?".prefetchCount 設置為0時表?沒有上限.basicQos 對拉模式的消費?效(后?再講)
消息分發的常?應?場景有如下:
- 限流
- ?公平分發
限流
訂單系統每秒最多處理5000請求, 正常情況下, 訂單系統可以正常滿?需求
但是在秒殺時間點, 請求瞬間增多, 每秒1萬個請求, 如果這些請求全部通過MQ發送到訂單系統, ?疑會把訂單系統壓垮
RabbitMQ提供了限流機制, 可以控制消費端?次只拉取N個請求
通過設置prefetchCount參數, 同時也必須要設置消息應答?式為?動應答
prefetchCount: 控制消費者從隊列中預取(prefetch)消息的數量, 以此來實現流控制和負載均衡
非公平分發(負載均衡)
我們也可以?此配置,來實現"負載均衡"
如下圖所?, 在有兩個消費者的情況下,?個消費者處理任務?常快, 另?個?常慢,就會造成?個消費者會?直很忙, ?另?個消費者很閑. 這是因為 RabbitMQ 只是在消息進?隊列時分派消息. 它不考慮消費者未確認消息的數量
我們可以使?設置prefetch=1 的?式, 告訴 RabbitMQ ?次只給?個消費者?條消息, 也就是說, 在處理并確認前?條消息之前, 不要向該消費者發送新消息. 相反, 它會將它分派給下?個不忙的消費者
限流
配置prefetch參數, 設置應答?式為?動應答
channel.basicQos(int prefetchCount)這個是sdk提供的方法
這個就表示每一個消費者,未確認就只能為五個
我們這個消費者一直沒有確認的話,那么就會有15條消息沒有發過來
這個的意思就是隊列中有15條消息,消費者有五條未確認的消息,總共20條消息
剩下的15條消息就不會給消費者了
沒有限流的話,20條消息就全部一下子去給消費者了
限流就是限制消費者最多獲得的資源數
負載均衡
設置為1,意思就是干完一個才能干下一個—》這樣就不會擠壓了
設置為2的意思就是一次給你兩個任務,干完了才能領取下一次的兩個任務
然后現在改為兩個消費者
一個快點,立馬確認,一個慢點,一直不確認
就變成這樣了
再改一下
這樣就是負載均衡了
看得出來111每處理一個222就要處理兩個
那個tag是每個信道的tag,左邊的test編號才是資源的編號
每個信道的tag是互不干擾的
RabbitMQ應用問題-冪等性保障
冪等性是數學和計算機科學中某些運算的性質, 它們可以被多次應?, ?不會改變初始應?的結果.
應?程序的冪等性介紹
在應?程序中, 冪等性就是指對?個系統進?重復調?(相同參數), 不論請求多少次, 這些請求對系統的影響都是相同的效果.
?如數據庫的 select 操作. 不同時間兩次查詢的結果可能不同, 但是這個操作是符合冪等性的. 冪等
性指的是對資源的影響, ?不是返回結果. 查詢操作對數據資源本?不會產?影響, 之所以結果不同, 可能是因為兩次查詢之間有其他操作對資源進?了修改.
?如 i++ 這個操作, 就是?冪等性的. 如果調??沒有控制好邏輯, ?次流程重復調?好?次, 結果
就會不同
MQ的冪等性介紹
對于MQ??, 冪等性是指同?條消息, 多次消費, 對系統的影響是相同的.
?般消息中間件的消息傳輸保障分為三個層級.
- At most once:最多?次. 消息可能會丟失,但絕不會重復傳輸.
- At least once:最少?次. 消息絕不會丟失,但可能會重復傳輸.–》要保證冪等性
- Exactly once:恰好?次. 每條消息肯定會被傳輸?次且僅傳輸?次.
RabbitMQ?持"最多?次"和"最少?次".
對于"恰好?次", ?前RabbitMQ還做不到, 不僅是RabbitMQ, ?前市?上主流的消息中間件, 都做不到這?點.
在業務使?中, 對于可靠性要求?較?的場景, 建議使?"最少?次", 以防?消息丟失. “最多?次” 會因為消息發送過程中, ?絡問題, 消費出現異常等種種原因, 導致消息丟失.
以下場景可能會導致消息發送重復(包含但不限于)
? 發送時消息重復: 當?條消息已被成功發送到服務端并完成持久化, 此時出現了?絡閃斷或者客?
端宕機, 導致服務端對客?端應答失敗. 如果此時Producer意識到消息發送失敗并嘗試再次發送消
息, Consumer后續會收到兩條內容相同并且Message ID也相同的消息.
? 投遞時消息重復: 消息消費的場景下, 消息已投遞到Consumer并完成業務處理, 當客?端給服務端
反饋應答的時候?絡閃斷. 為了保證消息?少被消費?次, 云消息隊列 RabbitMQ 版的服務端將在
?絡恢復后再次嘗試投遞之前已被處理過的消息, Consumer后續會收到兩條內容相同并且
Message ID也相同的消息
但是"最少?次", 就會造成?個問題, 消費端會收到重復的消息, 也會造成對同?條消息進?多次處理. ?些不重要的業務還好?點, 對于重要的業務, 如果不對重復的消息進?處理, 會造成嚴重事故.
?如: 當??對?個訂單付款之后, 因為?絡問題, 付款成功的結果未返回給訂單系統, 當??再次點擊付款時, 如果系統未做冪等性處理, 那就會造成兩次扣款
MQ消費者的冪等性的解決?法, ?般有以下?種:
全局唯?ID
- 為每條消息分配?個唯?標識符, ?如UUID或者MQ消息中的唯?ID,但是?定要保證唯?性.
- 消費者收到消息后, 先?該id判斷該消息是否已經消費過, 如果已經消費過, 則放棄處理.
- 如果未消費過, 消費者開始消費消息, 業務處理成功后, 把唯?ID保存起來(數據庫或Redis等)
可以使?Redis 的原?性操作setnx來保證冪等性, 將唯?ID作為key放到redis中 (SETNX
messageID 1) . 返回1, 說明之前沒有消費過, 正常消費. 返回0, 說明這條消息之前已消費過, 拋
棄.
SETNX = set if not exsts
業務邏輯判斷
在業務邏輯層?實現消息處理的冪等性.
例如: 通過檢查數據庫中是否已存在相關數據記錄, 或者使?樂觀鎖機制來避免更新已被其他事務更改的數據, 再或者在處理消息之前, 先檢查相關業務的狀態, 確保消息對應的操作尚未執?, 然后才進?處理, 具體根據業務場景來處理
順序性保障介紹1
消息的順序性是指消費者消費的消息和?產者發送消息的順序是?致的.
?如?產者發送的消息分別是msg1, msg2, msg3, 那么消費者也是按照msg1, msg2, msg3的順序進?消費的.
很多業務場景下, 消息的消費是不?保證順序的, ?如使?MQ實現訂單超時的處理. 但有些業務場景, 可能存在多個消息順序處理的情況. ?如??信息修改, 對同?個??的同?個資料進?修改, 需要保證消息的順序
?些資料顯?RabbitMQ的消息能夠保障順序性, 這是不嚴謹的. 在不考慮消息丟失, ?絡故障等異常的情況下, 如果只有?個消費者, 最好也只有?個?產者的情況下, 是可以保證消息的順序性**. 如果有多個?產者同時發送消息, ?法確定消息到達RabbitMQ Broker的前后順序, 也就?法驗證消息的順序性**.哪些情況可能會打破RabbitMQ的順序性呢? 下?介紹?種常?的場景:一個生產者
- 多個消費者: 當隊列配置了多個消費者時, 消息可能會被不同的消費者并?處理, 從?導致消息處理
的順序性?法保證. - ?絡波動或異常: 在消息傳遞過程中, 如果出現?絡波動或異常, 可能會導致消息確認(ACK) 丟失, 從?使得消息被重新?隊和重新消費, 造成順序性問題.
- 消息重試:如果消費者在處理消息后未能及時發送確認, 或者確認消息在傳輸過程中丟失, 那么MQ可能會認為消息未被成功消費?進?重試, 這也可能導致消息處理的順序性問題.
- 消息路由問題: 在復雜的路由場景中, 消息可能會根據路由鍵被發送到不同的隊列, 從??法保證全
局的順序性. - 死信隊列: 消息因為某些原因(如消費端拒絕消息)被放?死信隊列, 死信隊列被消費時, ?法保證消息的順序和?產者發送消息的順序?致
包括但不僅限于以上?種情形會使RabbitMQ消息錯序, 如果要保證消息的順序性, 需要業務?使?
RabbitMQ之后做進?步的處理
順序性保障介紹2
消息順序性保障分為: 局部順序性保證和全局順序性保證.
局部順序性通常指的是在單個隊列內部保證消息的順序. 全局順序性是指在多個隊列或多個消費者之間保證消息的順序.
在實際應?中, 全局順序性很難實現, 可以考慮使?業務邏輯來保證順序性, ?如在消息中嵌?序列號,并在消費端進?排序處理. 相對??, 局部順序性更常?, 也更容易實現.
RabbitMQ作為?個分布式消息隊列, 主要優化的是吞吐量和可?性, ?不是嚴格的順序性保證. 如果業務場景確實需要嚴格的消息順序, 可能需要在應?層?進?額外的設計和實現.
接下來說?下消息的順序性保證的常?策略.
-
單隊列單消費者
最簡單的?法是使?單個隊列, 并由單個消費者進?處理. 同?個隊列中的消息是先進先出的, 這是
RabbitMQ來幫助我們保證的 -
分區消費
單個消費者的吞吐太低了, 當需要多個消費者以提?處理速度時, 可以使?分區消費. 把?個隊列分割成多個分區, 每個分區由?個消費者處理, 以此來保持每個分區內消息的順序性.
?如??修改資料后, 發送?條??資料消息. 消費者在處理時, 需要保證消息發送的先后順序
但這種場合并不需要保證全局順序. 只需要保證同?個??的消息順序消費就可以. 這時候就可以采
?把消費按照?定的規則, 分為多個區, 每個分區由?個消費者處理
RabbitMQ本?并不?持分區消費, 需要業務邏輯去實現, 或者借助spring-cloud-stream來實現
參考: https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_partitions.html
-
消息確認機制
使??動消息確認機制, 消費者在處理完?條消息后, 顯式地發送確認, 這樣RabbitMQ才會移除并繼續發送下?條消息. -
業務邏輯控制
在某些情況下, 即使消息亂序到達, 也可以在業務邏輯層?實現順序控制. ?如通過在消息中嵌?序列
號, 并在消費時根據這些信息來處理
RabbitMQ本?并不保證全局的嚴格順序性, 特別是在分布式系統中. 在實際應?開發中, 根據具體的業務需求, 可能需要結合多種策略來實現所需要的順序保證
消息積壓
原因分析
消息積壓是指在消息隊列(如RabbitMQ)中, 待處理的消息數量超過了消費者處理能?, 導致消息在隊列中不斷堆積的現象.
通常有以下?種原因:
- 消息?產過快: 在?流量或者?負載的情況下, ?產者以極?的速率發送消息, 超過了消費者的處理
能?. - 消費者處理能?不?: 消費者處理處理消息的速度跟不上消息?產的速度, 也會導致消息在隊列中積壓.
可能原因有:
- 消費端業務邏輯復雜, 耗時?
- 消費端代碼性能低
- 系統資源限制, 如CPU、內存、磁盤I/O等也會限制消費者處理消息的效率.
- 異常處理處理不當. 消費者在處理消息時出現異常, 導致消息?法被正確處理和確認.
- ?絡問題: 因為?絡延遲或不穩定, 消費者?法及時接收或確認消息, 最終導致消息積壓
- RabbitMQ 服務器配置偏低
消息積壓可能會導致系統性能下降, 影響??體驗, 甚?導致系統崩潰. 因此, 及時發現消息積壓并解決對于維護系統穩定性?關重要
解決?案
遇到消息積壓時, ?先要分析消息積壓造成的原因. 根據原因來調整策略.
主要從以下?個??來解決:
- 提?消費者效率
a. 增加消費者實例數量, ?如新增機器
b. 優化業務邏輯, ?如使?多線程來處理業務
c. 設置prefetchCount, 當?個消費者阻塞時, 消息轉發到其他未阻塞的消費者.
d. 消息發?異常時, 設置合適的重試策略, 或者轉?到死信隊列 - 限制?產者速率. ?如流量控制, 限流算法等.
a. 流量控制: 在消息?產者中實現流量控制邏輯, 根據消費者處理能?動態調整發送速率
b. 限流: 使?限流?具, 為消息發送速率設置?個上限
c. 設置過期時間. 如果消息過期未消費, 可以配置死信隊列, 以避免消息丟失, 并減少對主隊列的壓
? - 資源與配置優化. ?如升級RabbitMQ服務器的硬件, 調整RabbitMQ的配置參數等
上述這些策略選擇時, 需要綜合考慮業務需求和系統的實際承載能?