RocketMQ
- 基本架構
- 消息模型
- 消費者消費消息模式
- 順序消息機制
- 延遲消息
- 批量消息
- 事務消息
- 消息重試
- 最佳實踐
基本架構
nameServer: 維護broker列表信息,客戶端連接時只需要連接nameServer。可配置成集群。
broker:broker分為master和slave,master負責消息的發送和消費,slave是master的備份。master-slaver的集群方式,master掛掉時候slave不能主動轉換為master提供服務(5.X版本后可以通過配置實現mater掛掉后slave轉為master提供服務)。
leader-follower的集群方式,即高可用集群,各個broker是對等的,通過選舉產生leader(在dashboart中顯示為master),如果leader掛掉,在剩下的follower(顯示為slave)中選舉再產生新的leader。注意,只有超過半數的幾點存活,才能選舉出leader。
消息模型
?產者和消費者都可以指定?個Topic發送消息或者拉取消息。?Topic是?個邏輯概念。
Topic中的消息會分布在后?多個MessageQueue當中。這些MessageQueue會分布到?個或者多個broker中。
消費者消費消息模式
廣播模式:所有關注topic的消費者都收到消息。廣播模式下消息隊列的消費位點由客戶端自己維護,消費失敗服務端不會重發。
集群模式:同一個消費者組只有一個成員收到消息。集群模式下消費點位由服務端維護,消費者組的所有成員共用一個位點,消費失敗服務端會重發。
順序消息機制
- ?產者只有將?批有順序要求的消息,放到同?個MesasgeQueue上,通過MessageQueue的FIFO特性保證這?批消息的順序。如果不指定MessageSelector對象,
那么?產者會采?輪詢的?式將多條消息依次發送到不同的MessageQueue上。 - 消費者需要實現MessageListenerOrderly接?,實際上在服務端,處理MessageListenerOrderly時,會給?個MessageQueue加鎖,拿到MessageQueue上所有的消息,然后再去讀取下?個MessageQueue的消息。
- 消費消息失敗時,不建議拋出異常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作為替代。因為消費者端只進?有限次數的重試。如果?條消息處理失敗,RocketMQ會將后續消息阻塞住,讓消費者進?重試。但是,如果消費者?直處理失敗,超過最?重試次數,那么RocketMQ就會跳過這?條消息,處理后?的消息,這會造成消息亂序。
延遲消息
- 定固定的延遲級別:對于指定固定延遲級別的延遲消息,RocketMQ的實現?式是預設?個系統Topic,名字叫做SCHEDULE_TOPIC_XXXXX。在這個Topic下,預設了18個MessageQueue。這?每個對列就對應了?種延遲級別。然后每次掃描這18個隊列?的消息,進?延遲操作就可以了。
- 指定消息發送時間:RocketMQ是通過時間輪算法實現。
批量消息
?產者要發送的消息?較多時,可以將多條消息合并成?個批量消息,?次性發送出去。這樣可以減少?絡IO,提升消息發送的吞吐量。同?批消息的Topic必須相同,另外,不?持延遲消息。還有批量消息的??不要超過1M,如果太?就需要??分割。
事務消息
- ?產者將消息發送?ApacheRocketMQ服務端。
- ApacheRocketMQ服務端將消息持久化成功之后,向?產者返回Ack確認消息已經發送成功,此時消息被標記為"暫不能投遞",這種狀態下的消息即為半事務消息。
- ?產者開始執?本地事務邏輯。
- ?產者根據本地事務執?結果向服務端提交?次確認結果(Commit或是Rollback),服務端收到確認結果后處理邏輯如下:?次確認結果為Commit:服務端將半事務消息標記為可投遞,并投遞給消費者。?次確認結果為Rollback:服務端將回滾事務,不會將半事務消息投遞給消費者。
- 在斷?或者是?產者應?重啟的特殊情況下,若服務端未收到發送者提交的?次確認結果,或服務端收到的?次確認結果為Unknown未知狀態,經過固定時間后,服務端將對消息?產者即?產者集群中任??產者實例發起消息回查。
- ?產者收到消息回查后,需要檢查對應消息的本地事務執?的最終結果。
- ?產者根據檢查到的本地事務的最終狀態再次提交?次確認,服務端仍按照步驟4對半事務消息進?處理。
消息重試
RocketMQ的消費者端,如果處理消息失敗了,Broker是會將消息重新進?投送的。?在重試時,RocketMQ實際上會為每個消費者組創建?個對應的重試隊列。重試的消息會進??個“%RETRY%”+ConsumeGroup的隊列中。
RocketMQ默認允許每條消息最多重試16次,每次重試的間隔時間如下:
如果消息重試16次后仍然失敗,消息將不再投遞,轉為進?死信隊列。重試次數可以通過consumer.setMaxReconsumeTimes(20);將重試次數設定為20次。當定制的重試次數超過16次后,消息的重試時間間隔均為2?時。
如果消息超過最?重試次數,RocketMQ會將消息發送到死信隊列。?個死信隊列對應?個消費組。死信隊列的默認權限為2(禁讀)。如果需要處理死信隊列的消息,需要把權限修改為6(可讀可寫后)消費該Topic的消息進行處理。隊列中超過有效期(默認3天)的消息會被刪除,不管有沒有消費。
最佳實踐
- ?個應?盡可能??個Topic,?消息?類型則可以?tags來標識。tags過濾消息的性能很高,相當于索引。
- 消費端冪等控制:RocketMQ的每條消息都有?個唯?的MessageId,這個參數在多次投遞的過程中是不會改變的,所以業務上可以?這個MessageId來作為判斷冪等的關鍵依據。但是,這個MessageId是?法保證全局唯?的,也會有沖突的情況。所以在?些對冪等性要求嚴格的場景,最好是使?業務上唯?的?個標識?較靠譜。例如訂單ID。?這個業務標識可以使?Message的Key來進?傳遞。