深入理解RocketMQ三高架構設計
高性能
- 順序寫磁盤 + mmap 零拷貝
- 異步刷盤 + 刷盤策略可配置
- 輕量網絡協議 + 長連接復用
高可用
- 主從復制機制、controller、dledger集群
- NameServer 多副本無狀態
- 客戶端自動切換 Broker
- 消息刷盤機制保障可靠性
高可擴展性
- Broker 水平擴展
- Consumer 分組機制
- Topic/Queue 靈活路由
- 插件式架構設計
快速梳理RocketMQ客戶端消息模型
三大核心角色
角色 | 說明 |
---|---|
Producer(生產者) | 發送消息到 Broker。支持同步、異步、單向三種發送方式。 |
Consumer(消費者) | 從 Broker 拉取消息進行消費。支持推模式(Push)和拉模式(Pull)。 |
NameServer | 提供路由發現服務。Producer/Consumer 都通過它查找 Broker 地址。 |
五大關鍵過程
-
Producer 啟動流程
- 初始化 MQClientInstance;
- 向 NameServer 拉取路由信息;
- 建立與 Broker 的連接(Netty 長連接);
- 注冊自身到 Topic 路由表。
-
消息發送流程
生產者發送消息時:- 從緩存中查找 Topic 對應的路由信息;
- 按策略選擇一個隊列(MessageQueue);
- 通過 Netty 將消息發送到對應的 Broker;
- 根據配置選擇:
- 同步發送:等待返回確認;
- 異步發送:注冊回調函數;
- 單向發送:不關心發送結果,適用于日志類數據。
-
Consumer 啟動流程
消費者啟動時:- 初始化 MQClientInstance;
- 向 NameServer 拉取 Topic 路由;
- 與 Broker 建立連接;
- 根據消費模式(Push/Pull)拉取消息。
-
消息消費流程
支持兩種消費模式:模式 說明 Push 模式(默認) 實際是 Broker 定期向 Consumer 主動推送拉取請求。 Pull 模式 Consumer 主動向 Broker 拉取消息。 消費進度(offset)根據消費模式不同,也有兩種:
- 集群模式(Clustering) :隊列在多個消費者之間分攤;
- 廣播模式(Broadcasting) :每個消費者都消費所有消息。 -
消費確認與重試機制
- 消費成功:Consumer 會定期上報消費進度;
- 消費失敗:
- 可自動重試(重投到
RETRY_TOPIC
); - 或轉移到死信隊列(
DLQ
)。
- 可自動重試(重投到
結合源碼理解RocketMQ高性能實現細節
方面 | 實現機制 |
---|---|
消息寫入 | 順序寫磁盤 + MappedByteBuffer + 異步刷盤 |
消息讀取 | 消費隊列(ConsumeQueue)+ 索引文件(IndexFile) |
通信框架 | 高性能 Netty + 自定義輕量協議 |
路由發現 | NameServer 提前緩存路由,無需頻繁請求 |
網絡效率 | 長連接復用 + 請求壓縮 + 線程池模型 |
全面思考RocketMQ的集群架構
RocketMQ 集群核心角色
角色 | 描述 |
---|---|
NameServer | 類似于注冊中心,管理路由信息,支持無狀態集群部署 |
Broker | 真正存儲消息的服務。可部署為主從結構 |
Producer | 消息生產者,連接 NameServer 獲取路由,再將消息發送至 Broker |
Consumer | 消息消費者,從 Broker 拉取并消費消息 |
架構特性與設計思想
- NameServer(服務發現)
- 無狀態部署,支持多個節點;
- Producer/Consumer 啟動時從多個 NameServer 拉取 Broker 路由信息;
- 路由信息是 Broker 主動注冊 到 NameServer 的;
- 支持故障容忍(某個 NameServer 掉線不影響整體)。
- Broker(核心)
每個 Broker 有唯一標識(brokerName
+brokerId
):
brokerId = 0
:MasterbrokerId > 0
:Slave
每個 Topic 可以配置多個隊列分布在不同的 Broker 上。
主從同步方式:
同步模式 | 描述 |
---|---|
ASYNC_MASTER | 異步同步(默認),寫成功不等待 Slave,同步失敗不影響寫入 |
SYNC_MASTER | 同步刷盤,寫消息時等待 Slave 確認,提高可靠性 |
SLAVE | 只做備份,不接收寫請求,不參與消費 |
- Producer 工作機制
- 從 NameServer 獲取最新 Topic 路由;
- 通過負載均衡策略選擇隊列(MessageQueue);
- 支持三種發送方式(同步/異步/單向);
- 自動感知路由變化,動態調整發送目標。
- Consumer 工作機制
-
支持兩種消費模式:
- 集群模式(Clustering):多個消費者共享消息
- 廣播模式(Broadcasting):每個消費者都消費所有消息
-
支持 Push 和 Pull 模式;
-
消費進度保存在 Broker(默認)或本地(廣播模式);
-
支持負載均衡重新分配隊列(Rebalance)。
集群高可用與容錯機制
機制 | 實現 |
---|---|
主從容災 | Master 掛了,Slave 不自動轉正,需人工或運維系統切換 |
NameServer 容災 | Producer/Consumer 配置多個 NameServer,自動重試 |
消息重試機制 | 消費失敗支持自動重試、死信隊列 |
刷盤策略保障數據 | 同步刷盤 + SYNC_MASTER 可實現消息 0 丟失(犧牲部分性能) |
生產環境RocketMQ常見問題處理思路
MQ消息零丟失方案總結
各種防止MQ消息丟失的方案,本質上都是以犧牲系統性能和吞吐量為代價的。這種資源消耗必然會導致集群整體效率的下降。在實際業務場景中,我們需要根據具體需求對這些安全方案進行權衡取舍。
- 生產者發送消息如何保證不丟失
- 同步發送+多次嘗試(降低吞吐)
- 異步發送(增加生產者客戶端負擔)
- 事務消息機制(多次網絡請求)
- Broker寫入數據如何保證不丟失
- 同步刷盤(I/O負擔)
- Dledger集群(網絡負擔)
- 消費者消費消息如何不丟失
- 同步處理消息,再提交offset(無法通過異步提高吞吐)
- 如果MQ服務全部掛了,如何保證不丟失
- 增加臨時的降級存儲
MQ如何保證消息的順序性
強調局部有序,而不是全局有序。
- Producer將一組有序的消息寫入到同一個MessageQueue中。
- Consumer每次只有單個線程能從一個同一個TopicMessageQueue中拿取消息。
MQ如何保證消息冪等性
-
生產者發送消息到服務端如何保持冪等
Producer發送消息時,如果采用發送者確認的機制,Producer發送消息會等待Broker的響應。若未收到響應,Producer將自動重試發送。然而,這種情況也可能發生在消息已被處理成功處理但確認響應丟失的場景中,從而導致消息重復發送的問題。
RocketMQ的處理方式,是會在發送消息時,給每條消息分配一個唯一的ID。 -
消費者消費消息如何保持冪等、
RocketMQ官網明確做了回答:RocketMQ確保所有消息至少傳遞一次。在大多數情況下,消息不會重復。
防止重復消費的關鍵在于確定一個可靠的唯一性標識。RocketMQ為每條消息自動分配了唯一的messageId,消費者可以通過獲取這個messageId來實現去重。將已處理的messageId記錄下來,就能有效判斷消息是否重復消費。數據庫的兜底方案則是在某些適用的場景下設置唯一鍵,插入重復的唯一鍵自然會報錯回滾。
MQ如何快速處理積壓的消息
-
消息積壓會有哪些問題
RocketMQ和Kafka都具備出色的消息積壓處理能力,短期的消息堆積通常不會造成問題。然而需要警惕的是,若積壓問題長期得不到解決,當日志文件過期時,系統會自動刪除這些過期文件,導致其中未被消費的消息永久丟失。 -
怎么處理大量積壓的消息
-
RabbitMQ
如果是Classic Queue經典對列,那么針對同一個Queue的多個消費者,是按照Work Queue的模式,在多個Consuemr之間依次分配消息的。所以這時,如果Consumer消費能力不夠,那么直接加更多的Consumer實例就可以了。這里需要注意下的是如果各個Consumer實例他們的運行環境,或者是處理消息的速度有差別。那么可以優化一下每個Consumer的比重(Qos屬性),從而盡量大的發揮Consumer實例的性能。 -
RocketMQ和Kafka
因為同一個消費者組下的多個Cosumer需要和對應Topic下的MessageQueue建立對應關系,而一個MessageQueue最多只能被一個Consumer消費,因此,增加的Consumer實例最多也只能和Topic下的MessageQueue個數相同。如果此時再繼續增加Consumer的實例,那么就會有些Consumer實例是沒有MessageQueue去消費的,因此也就沒有用了。
如果Topic下的MessageQueue配置本來就不夠多的話,那就無法一直增加Consumer節點個數了。
如果要快速處理積壓的消息,可以創建一個新的Topic,配置足夠多的MessageQueue。然后把Consumer實例的Topic轉向新的Topic,并緊急上線一組新的消費者,只負責消費舊Topic中的消息,并轉存到新的Topic中。這個速度明顯會比普通Consumer處理業務邏輯要快很多。然后在新的Topic上,就可以通過添加消費者個數來提高消費速度了。之后再根據情況考慮是否要恢復成正常情況。類似固定級別的延遲消息機制,把消息臨時轉到一個系統內部的Topic下,處理過后,再轉回來。