消息隊列場景
什么是消息隊列?
??消息隊列是一個使用隊列來通信的組件,它的本質就是個轉發器,包含發消息、存消息、消費消息。
消息隊列怎么選型?
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
單機吞吐量 | 萬級 | 萬級 | 10萬級 | 10萬級 |
時效性 | 毫秒級 | 微秒級 | 毫秒級 | 毫秒級 |
可用性 | 高(主從) | 高(主從) | 非常高(分布式) | 非常高(分布式) |
消息重復 | 至少一次 | 至少一次 | 至少一次 最多一次 | 至少一次最多一次 |
消息順序性 | 有序 | 有序 | 有序 | 分區有序 |
支持主題數 | 千級 | 百萬級 | 千級 | 百級,多了性能嚴重下滑 |
消息回溯 | 不支持 | 不支持 | 支持(按時間回溯) | 支持(按offset回溯) |
管理界面 | 普通 | 普通 | 完善 | 普通 |
消息隊列使用場景有哪些?
- 異步處理:縮短用戶響應時間,提高系統吞吐量,各服務可獨立運行,互不干擾。
- 應用解耦:降低系統間耦合度,一個系統的變更或故障不易影響其他系統,提升系統可維護性與擴展性。
- 流量削峰:保護后端服務不被高流量沖垮,可按下游處理能力調節流量,避免系統崩潰 。
消息重復消費怎么解決?
??業務端對于已經消費成功的消息,保存在本地數據庫或Redis緩存業務中,進行業務表示,每次處理前先進行校驗,保證冪等性。
消息丟失怎么解決的?
??消息生產階段:只要能正常接收到MQ中間件的ack確認響應,就表示發送成功,所以只要處理號消息的返回值和異常,如果返回異常則進行消息重發,那么這個階段是不會出現消息丟失的。
??消息存儲階段:生產者在發布消息是,MQ中間件通常會寫入多個節點,也就是創建多個副本,即便其中一個節點掛掉,也能保證集群的數據不丟失。
??消息消費階段:消費者接收消息并處理消息之后,才回復ack的話,那么消息是不會丟失的。不能收到消息就會回ack,否則可能消息處理中途就掛掉了,消息便丟失了。
消息隊列的可靠性怎么保證?
??消息持久化:在系統崩潰、重啟或者網絡故障等情況下,未處理的消息不會丟失。
??消息確認機制:消費者在成功處理消息后,應該向消息隊列發送確認(ack)。消息隊列只有收到確認后,才會將消息從隊列中移除。如果沒有收到確認,消息隊列會在一定時間內重發消息給消費者。
??消息重試策略:當消費者處理消息失敗后,需要選擇合適的重試策略。可以是設置重試次數和重試間隔時間;也可以是發送消息到死信隊列中,以便后續的排查和處理。
消息隊列的順序性怎么保證?
??有序消息處理場景的識別:明確業務場景中哪些消息是需要保證順序的,對于需要順序處理的消息,要確保消息隊列和消費者能夠按照特定的順序進行處理。
??消息隊列對順序性的支持:Kafka可以通過將消息劃分到同一個分區(Partition)來保證消息在分區內是有序的,消費者按照分區順序讀取消息就可以保證消息順序。但這也可能會限制消息的并行處理程度,需要在順序性和吞吐量之間進行權衡。
??消費者順序處理消息:消費者在處理消息時,應該避免并發處理可能導致的打亂情況。可以使用單線程或者使用對順序消息進行串行化處理后的線程池等方法,確保消息按照正確的順序被消費。
如何保證冪等性?
冪等性:同一操作的多次執行對系統狀態的影響與一次執行結果一致。
實現冪等性的核心方案:
- 唯一標識(冪等鍵):客戶端為每一個請求生成全局唯一ID,服務端校驗該ID是否已處理,適用于場景:接口調用、消息消費等。
- 數據庫事務+樂觀鎖:通過版本號或狀態字段控制并發更新,確保多次更新等同于單次操作,適用場景:數據庫記錄更新(如余額扣減、訂單狀態變更)。
- 數據庫唯一約束:利用數據庫唯一索引防止重復數據寫入,適用場景:數據插入(如訂單創建)。
- 分布式鎖:通過鎖機制保證同一時刻僅有一個請求執行關鍵操作,適用場景:高并發下的資源搶奪(如秒殺)。
- 消息去重:消息隊列生產者為每一條消息生成唯一的消息ID,消費者在處理消息前,先檢查該消息ID是否已經處理過,如果已經處理過則丟棄該消息。
如何處理消息隊列的消息積壓問題?
原因:生產者的生產速度大于消費者的消費速度。
解決方案:
- 批量處理消息
- 增加Topic的隊列數和消費組機器的數量
- 臨時緊急擴容
臨時緊急擴容的大概思路:
1.先修復consumer消費者的問題,以確保其恢復消費速度,然后將現有consumer都停掉。
2.新建一個topic,partition是原來的10倍。臨時建立好原先10倍數量的queue。
3.寫一個臨時的分發數據的cunsumer程序,這個程序部署上去,消費積壓的數據,消費之后不做耗時的處理,直接輪詢寫入臨時建立好的10倍數量的queue。
4.接著臨時征用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的數據。這個做法相當于是臨時將queue資源和consumer資源擴大10倍,以正常的10倍速度來消費數據。
5.等消息消費完積壓的數據后,恢復原先的部署架構,重新用原先的consumer機器來消費消息。
如何保證數據一致性,事務消息如何實現?
- 生產者產生消息,發送帶MQ服務器
- MQ收到消息后,將消息持久化到存儲系統。
- MQ服務器返回Ack到生產者。
- MQ服務器把消息push給消費者
- 消費者消費完消息,響應ACK
- MQ服務器收到ACK,認為消息消費成功,即在存儲中刪除消息。
- 生產者產生消息,發送一條半事務消息到MQ服務器
- MQ收到消息后,將消息持久化到存儲系統,這條消息的狀態是待發送狀態。
- MQ服務器返回ACK確認到生產者,此時MQ不會觸發消息推送事件
- 生產者執行本地事務
- 如果本地事務執行成功,即commit執行結果到MQ服務器;如果執行失敗,發送rollback。
- 如果是正常的commit,MQ服務器更新消息狀態為可發送;如果是rollback,即刪除消息。
- 如果消息狀態更新為可發送,則MQ服務器會push消息給消費者。消費者消費完就回ACK。
- 如果MQ服務器長時間沒有收到生產者的commit或者rollback,它會反查生產者,然后根據查詢到的結果執行最終狀態。
消息隊列是參考哪種設計模式?
觀察者模式:
觀察者模式實際上就是一對多的關系,即存在一個主題和多個觀察者,主題也是被觀察者,當主題發布消息時,會通知各個觀察者,觀察者將會收到最新消息。
發布訂閱模式
發布訂閱模式和觀察者模式的區別就是發布者和訂閱者完全解耦,通過中間的發布訂閱中心進行消息通知,發布者并不知道自己發布的消息會通知給誰。
讓你寫一個消息隊列,該如何進行架構設計?
- 首先是消息隊列的整體流程,producer發送消息給broker,broker存儲好,broker再發送給consumer消費,consumer回復消費確認等。
- producer發送消息給broker,broker發消息給consumer消費,那就需要兩次RPC了,RPC如何設計呢?可以參考開源框架Dubbo,你可以說說服務發現、序列化協議等等
- broker考慮如何持久化呢,是放文件系統還是數據庫呢,會不會消息堆積呢,消息堆積如何處理呢。
- 消費關系如何保存呢?點對點還是廣播方式呢?廣播關系又是如何維護呢?zk還是config server
- 消息可靠性如何保證呢?如果消息重復了,如何冪等處理呢?
- 消息隊列的高可用如何設計呢?可以參考Kafka的高可用保障機制。多副本 -> leader & follower -> broker掛了重新選舉leader即可對外服務。
- 消息事務特性,與本地業務同個事務,本地消息落庫;消息投遞到服務端,本地才刪除;定時任務掃描本地消息庫,補償發送。
- MQ得伸縮性和可擴展性,如果消息積壓或者資源不夠時,如何支持快速擴容,提高吞吐?可以參照一下Kafka的設計理念,broker -> topic -> partition,每個partition放一個機器,就存一部分數據。如果現在資源不夠了,簡單啊,給topic增加partition,然后做數據遷移,增加機器,不就可以存放更多數據,提供更高的吞吐量了。
RocketMQ
消息隊列為什么選擇RocketMQ的?
- 開發語言優勢:RocketMQ使用Java語言開發,更容易上手和閱讀源碼。
- 社區氛圍活躍:RocketMQ是阿里巴巴開源且內部在大量使用的消息隊列,是經得起考驗的,并且能夠針對線上的復雜環境提供相應的解決方案。
- 特性豐富:RocketMQ的高級特性達到了12種,例如順序消息、事務消息、消息過濾、定時消息等。豐富的特性,能夠為我們復雜的業務場景盡可能多地提供思路和解決方案。
RocketMQ和Kafka的區別是什么?如何做技術選型?
Kafka的優缺點:
- 優點:Kafka的最大優勢在于它的高吞吐量。Kafka支持集群部署,如果部分機器宕機不可用,則不影響Kafka的正常使用。
- 缺點:Kafka有可能造成數據丟失,因為它在收發消息的時候,并不是直接寫入物理磁盤中,而是寫入到磁盤緩沖區里面的。Kafka功能比較單一,主要就是支持收發消息,造成使用場景受限。
RocketMQ的優缺點:
- 優點:支持功能多,比如延遲隊列、消息事務等等,吞吐量也高,支持大規模集群部署,線性擴展方便,Java語言開發,滿足了國內絕大多數公司的技術棧。
- 缺點:性能相比Kafka弱一些,因為kafka用到了sendfile的零拷貝技術,而RockMQ主要是用mmap+write來實現零拷貝。
怎么選擇呢?
- 如果業務只是收發消息,而且允許小部分的數據丟失,但要求極高的吞吐量和高性能的話,選擇kafka。
- 如果公司需要通過mq實現一些業務需求,比如延遲隊列、消息事務等,且公司技術棧主要是Java的話,選RocketMQ
RocketMQ延時消息的底層原理
??broker在接受到消息的時候,會將延時消息存入到延時的Topic的隊列中,然后ScheduleMessageService對每個queue對應的定時任務不停的執行,檢查queue中哪些消息已到設定時間,然后轉發到消息的原始Topic,這些消息就會被各自的producer消費了。
RocektMQ怎么處理分布式事務?
??RocketMQ是一種最終一致性的分布式事務。
分布式事務的流程如上圖:
- A服務先發送個Half Message (是指暫不能被Consumer消費的消息。Producer已經把消息成功發送到了Broker端,但此消息被標記為暫不能投遞狀態,處于該種狀態下的消息稱為半消息。需要Producer對消息的二次確認后,Consumer才能去消費它) 給Brock端。
- 當A服務知道Half Message發送成功后,那么開始第3步執行本地事務。
- 執行本地事務(會有三種情況1、執行成功。2、執行失敗。3、網絡等原因導致沒有響應)
- 如果本地事務成功,那么Product像Brock服務器發送Commit,這樣B服務就可以消費該message。
- 如果本地事務失敗,那么Product像Brock服務器發送Rollback,那么就會直接刪除上面這條半消息。
- 如果因為網絡等原因遲遲沒有返回失敗還是成功,那么會執行RocketMQ的回調接口,來進行事務的回查。
從上面流程可以得知 只有A服務本地事務執行成功 ,B服務才能消費該message。
如果B最終執行失敗,幾乎可以斷定就是代碼有問題導致的異常,因為消費端RocketMQ有重試機制,如果不是代碼問題一般重試擊此就可以成功。
如果是代碼的原因引起多次重試失敗后,也沒有關系,將異常記錄下來,由人工處理。
RocketMQ消息順序怎么保證?
- RocketMQ采用了局部順序一致性的機制,實現了單個隊列中的消息嚴格有序。
- 在Producer(生產者)把一批需要保證順序的消息發送給同一個MessageQueue。
- Consumer(消費者)則通過加鎖的機制來保證消息消費的順序性,Broker端通過對MessageQueue進行加鎖,保證同一個MessageQueue只能被同一個Consumer進行消費。
RocketMQ怎么保證消息不被重復消費
??在業務邏輯中實現冪等性,確保即使消息被重復消費,也不會影響業務狀態。
RocketMQ消息積壓了,怎么辦?
- 擴展消費端的實例數來提升總體的消費能力。
- 如果短時間內沒有足夠的服務器資源進行擴容,可進行系統降級,通過關閉一些不重要的業務,減少發送方的數據量,最低限度地讓系統還能正常運轉,服務一些重要業務。