RocketMQ的核心三流程
RocketMQ服務端由兩部分組成NameServer和Broker,NameServer是服務的注冊中心,Broker會把自己的地址注冊到NameServer,生產者和消費者啟動的時候會先從NameServer獲取Broker的地址,再去從Broker發送和接受消息。
Producer將消息寫入到RocketMQ集群中Broker中具體的Queue。
Comsumer從RocketMQ集群中拉取對應的消息并進行消費確認。
NameServer源碼分析
NameServer整體流程
NameServer是整個RocketMQ的“大腦”,它是RocketMQ的服務注冊中心,所以RocketMQ需要先啟動NameServer再啟動Rocket中的Broker。
- NameServer啟動
啟動監聽,等待Broker、Producer、Consumer連接。Broker在啟動時向所有NameServer注冊,生產者發送消息之前從NameServer獲取Broker服務器地址列表,然后根據負載均衡算法從列表中選擇一臺服務器進行消息消費發送。消費者在訂閱某個主題的消息之前從NameServer獲取Broker服務器地址列表(有可能是集群),但是消費者選擇從Broker中訂閱消息,訂閱規則由Broker配置決定。 - 路由注冊
Broker啟動后向所有NameServer發送路由及心跳信息。 - 路由剔除
一處心跳超時的Broker相關路由信息。NameServer與,每臺Broker服務保持長連接,并間隔10s檢查Broker是否存活,如果檢測到Broker宕機,則從路由注冊表中將其移除。這樣就可以實現RocketMQ的高可用。
NameServer啟動流程
啟動NameServer
加載KV配置
構建NRS(Netty Remoting Server)通訊,接受路由、心跳信息
構建定時任務(剔除超時的Broker)
加載KV配置
核心解讀NamesrvController類中createNamesrvController()
構建NRS通訊接收路由、心跳信息
構建定時任務剔除超時Broker
核心控制器會啟動定時任務: 每隔10s掃描一次Broker,移除不活躍的Broker。
Broker每隔30s向NameServer發送一個心跳包,心跳包包含BrokerId,Broker地址,Broker名稱,Broker所屬集群名稱、Broker關聯的FilterServer列表。但是如果Broker宕機,NameServer無法收到心跳包,此時NameServer如何來剔除這些失效的Broker呢?NameServer會每隔10s掃描brokerLiveTable狀態表,如果BrokerLive的lastUpdateTimestamp的時間戳距當前時間超過120s,則認為Broker失效,移除該Broker,關閉與Broker連接,同時更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。
RocketMQ有兩個觸發點來刪除路由信息:
-
NameServer定期掃描brokerLiveTable檢測上次心跳包與當前系統的時間差,如果時間超過120s,則需要移除broker。
-
Broker在正常關閉的情況下,會執行unregisterBroker指令這兩種方式路由刪除的方法都是一樣的,都是從相關路由表中刪除與該broker相關的信息。
在消費者啟動之后,第一步都要從NameServer中獲取Topic相關信息