前言
? RocketMQ架構上主要分為四部分, Broker、Producer、Consumer、NameServer,其他三個都會與NameServer進行通信。
Producer:
? **消息發布的角色,可集群部署。**通過NameServer集群獲得Topic的路由信息,包括Topic下面有哪些Queue,這些Queue分布在哪些Broker上等。(Producer只會將消息發送到Master節點,因此只需與Master節點建立連接)。
Consumer:
? **消息消費的角色,可集群部署。**通過NameServer集群獲得Topic的路由信息,連接到對應的Broker上拉取和消費消息。(Master和Slave都可以拉取消息,因此Consumer會與Master和Slave都建立連接)。
Broker:
? 主要負責消息的存儲、投遞和查詢以及服務高可用保證。
介紹
? NameServer 是專為 RocketMQ 設計的輕量級名稱服務,也是一個簡單的Topic路由注冊中心。具有簡單、可集群橫向擴展、無狀態,節點之間互不通信等特點。
? NameServer 通常以集群的方式部署,各實例間相互不進行信息通訊,只是互為備份,達到高可用的效果。
? NameServer具有兩大功能:Broker管理和topic路由信息管理。
Broker管理:
? **NameServer接受Broker的注冊請求,處理請求數據作為路由信息的基礎數據。**對broker進行心跳檢測機制,檢測是否還存活(120s);
Topic路由信息管理:
? 每個NameServer都保存整個Broker集群的路由信息,用于Producer和Conumser查詢的路由信息,從而進行消息的投遞和消費。
? NameServer 僅僅處理其他模塊的請求,而不會主動向其他模塊發起請求。它其實本質上就是一個 NettyServer,它主要有 3 個模塊:Topic 路由管理模塊(RouteInfoManager
)、通信模塊(DefaultRequestProcessor
、ClusterTestRequestProcessor
)、KV 數據存儲模塊(KVConfigManager
)。
RouteInfoManager
? **RouteInfoManager 中存儲 5 個 HashMap
,這就是 NameServer 中主要存儲的數據。它們僅存在于內存中,并不會持久化。**其中數據內容如下:
- **topicQueueTable:保存 Topic 的隊列信息,也是真正的路由信息。**隊列信息中包含了其所在的 Broker 名稱和讀寫隊列數量。
- brokerAddrTable:保存 Broker 信息,包含其名稱、集群名稱、主備 Broker 地址。
- clusterAddrTable:保存 Cluster信息,包含每個集群中所有的 Broker 名稱列表。
- brokerLiveTable:Broker 狀態信息,包含當前所有存活的 Broker,和它們最后一次上報心跳的時間。
- filterServerTable:Broker 上的 FilterServer 列表,用于類模式消息過濾,該機制在 4.4 版本后被廢棄。
? RequestProcessor 繼承了 AsyncNettyRequestProcessor。作為 NameServer 的請求處理器,根據不同種類的請求做不同類型的處理。 其中
KV_CONFIG
類型的請求用于 KVConfig 模塊,當前不會用到。其他請求類型由 Broker 和 Producer、Consumer 發起。? KVConfigManager 內部保存了一個二級
HashMap
:configTable
,并且會將該對象進行持久化。
心跳機制
Broker
- 每隔 30s 向 NameServer 集群的每臺機器都發送心跳包,包含自身 Topic 隊列的路由信息。
- 當有 Topic 改動(創建/更新),Broker 會立即發送 Topic 增量信息到 NameServer,同時觸發 NameServer 的數據版本號發生變更(+1)。
NameServer
- 將路由信息保存在內存中。它只被其他模塊調用(被 Broker 上傳,被客戶端拉取),不會主動調用其他模塊。
- 啟動一個定時任務線程,每隔 10s 掃描 brokerAddrTable 中所有的 Broker 上次發送心跳時間,如果超過 120s 沒有收到心跳,則從存活 Broker 表中移除該 Broker。
Client
- 生產者第一次發送消息時,向 NameServer 拉取該 Topic 的路由信息。
- 消費者啟動過程中會向 NameServer 請求 Topic 路由信息。
- 每隔 30s 向 NameServer 發送請求,獲取它們要生產/消費的 Topic 的路由信息。
啟動流程
- 由啟動腳本調用
NamesrvStartup#main
函數觸發啟動流程 NamesrvStartup#createNamesrvController
函數中先解析命令行參數,然后初始化 NameServer 和 Netty remote server 配置,最后創建NamesrvController
的實例。NamesrvStartup#start
初始化NamesrvController
;調用NamesrvController#start()
方法,啟動 Netty remoting server;最后注冊關閉鉤子函數,在 JVM 線程關閉之前,關閉 Netty remoting server 和處理線程池,關閉定時任務線程。
? NamesrvController
實例是 NameServer 的核心控制器,它的初始化方法 initialize()
先加載 KVConfig manager,然后初始化 Netty remoting server。最后添加 2 個定時任務:一個每 10s 打印一次 KV 配置,一個每 10s 掃描 Broker 列表,移除掉線的 Broker。
為什么選擇重新開發一個NameServer?
? RocketMQ的架構設計決定了只需一個輕量級的元數據服務器,只需保持最終一致,所以AP模式直接pass。
? NameServer互相獨立,彼此沒有通信關系,由于Broker向每個NameServer注冊自己的路由信息,所以每個NameServer都保存一份完整的路由信息。
? 單臺NameServer掛掉,Broker仍然可以向其它NameServer同步路由信息,不影響其他NameServer,所以Producer,Consumer仍然可以動態感知Broker的路由的信息。
源碼分析
? NameServer的路由數據來源是broker注冊提供,然后內部加工處理,而路由的數據的使用者是producer和consumer。
1、路由數據結構
? RouteInfoManager是NameServer核心邏輯類,其代碼作用就是維護路由信息管理,提供路由注冊/查詢等核心功能。
? 由于路由信息都是保存在NameServer應用內存里,其本質就是維護HashMap,而為了防止并發操作,添加了ReentrantReadWriteLock讀寫鎖。
? RouteInfoManager源碼結構如下:
QueueData 屬性解析:
/*** 隊列信息*/
public class QueueData implements Comparable<QueueData> {// 隊列所屬的Broker名稱private String brokerName;// 讀隊列數量 默認:16private int readQueueNums;// 寫隊列數量 默認:16private int writeQueueNums;//todo Topic的讀寫權限(2是寫 4是讀 6是讀寫)private int perm;/** 同步復制還是異步復制--對應TopicConfig.topicSysFlag* {@link org.apache.rocketmq.common.sysflag.TopicSysFlag}*/private int topicSynFlag;...省略...}
map: topicQueueTable 數據格式demo(json):
{"TopicTest":[{"brokerName":"broker-a","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4}]
}
BrokerData 屬性解析:
/*** broker的數據:Master與Slave 的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0 表示Master,非0表示Slave。*/
public class BrokerData implements Comparable<BrokerData> {// broker所屬集群private String cluster;// brokerNameprivate String brokerName;// 同一個brokerName下可以有一個Master和多個Slave,所以brokerAddrs是一個集合// brokerld=O表示 Master,大于 O表示從 Slaveprivate HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;// 用于查找broker地址private final Random random = new Random();...省略...}
map: brokerAddrTable 數據格式demo(json):
{"broker-a":{"brokerAddrs":{"0":"172.16.62.75:10911"},"brokerName":"broker-a","cluster":"DefaultCluster"}
}
BrokerLiveInfo 屬性解析:
/*** 存放存活的Broker信息,當前存活的 Broker,該信息不是實時的,NameServer 每10S掃描一次所有的 broker,根據心跳包的時間得知 broker的狀態,* 該機制也是導致當一個 Broker 進程假死后,消息生產者無法立即感知,可能繼續向其發送消息,導致失敗(非高可用)*/
class BrokerLiveInfo {//最后一次更新時間private long lastUpdateTimestamp;//版本號信息private DataVersion dataVersion;//Netty的Channelprivate Channel channel;//HA Broker的地址 是Slave從Master拉取數據時鏈接的地址,由brokerIp2+HA端口構成private String haServerAddr;...省略...}
map: brokerLiveTable 數據格式demo(json):
{"172.16.62.75:10911":{"channel":{"active":true,"inputShutdown":false,"open":true,"outputShutdown":false,"registered":true,"writable":true},"dataVersion":{"counter":2,"timestamp":1630907813571},"haServerAddr":"172.16.62.75:10912","lastUpdateTimestamp":1630907814074}
}
brokerAddrTable -Map 數據格式demo(json)
{"DefaultCluster":["broker-a"]}
? 從RouteInfoManager維護的HashMap數據結構和QueueData、BrokerData、BrokerLiveInfo類屬性得知,NameServer維護的信息既簡單但極其重要。
2、路由注冊
路由注冊流程:
roker主動注冊的幾種情況:
- 啟動時向集群中所有的NameServer注冊
- 定時30s向集群中所有NameServer發送心跳包注冊
- 當broker中topic信息發送變更(新增/修改/刪除)發送心跳包注冊。
? NameServer中注冊的核心處理邏輯是RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(final String clusterName, final String brokerAddr,final String brokerName, final long brokerId,final String haServerAddr,//TopicConfigSerializeWrapper比較復雜的數據結構,主要包含了broker上所有的topic信息final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList, final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {try {this.lock.writeLock().lockInterruptibly(); // 鎖//1: 此處維護 clusterAddrTable 集群元數據Set<String> brokerNames = this.clusterAddrTable.get(clusterName);if (null == brokerNames) {brokerNames = new HashSet<String>();this.clusterAddrTable.put(clusterName, brokerNames);}brokerNames.add(brokerName);//2:此處維護 brokerAddrTable broker元數據boolean registerFirst = false;//是否第一次注冊(如果Topic配置信息發生變更或者該broker為第一次注冊)BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());this.brokerAddrTable.put(brokerName, brokerData);}//3: 此處維護 topicQueueTable 主題隊列數據,數據更新操作方法在:createAndUpdateQueueDataString oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);registerFirst = registerFirst || (null == oldAddr);if (null != topicConfigWrapper&& MixAll.MASTER_ID == brokerId) { //小知識點:只處理主節點請求,因為備節點的topic信息是同步主節點的// 如果Topic配置信息發生變更或者該broker為第一次注冊if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}//4: 此處維護:brokerLiveTable數據,關鍵點:BrokerLiveInfo構造器第一個參數:System.currentTimeMillis(),用于存活判斷BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}//5-維護:filterServerTable 數據if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}//返回值(如果當前broker為slave節點)則將haServerAddr、masterAddr等信息設置到result返回值中if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("registerBroker Exception", e);}return result;
}
? 從源碼中分析,Broker注冊的路由信息對于NameServer來說,其實就是維護clusterAddrTable、brokerAddrTable、topicQueueTable、brokerLiveTable、filterServerTable。
3、路由刪除
? **路由刪除有兩種方式,一種是broker主動上報刪除,一種是NameServer主動刪除。**從根本上來說,是刪除clusterAddrTable、brokerAddrTable、topicQueueTable、brokerLiveTable、filterServerTable相關信息.
1)Broker主動上報刪除
? Broker在正常被關閉的情況下,會執行unregisterBroker指令。向NameServer發送取消注冊請求,之后執行刪除信息操作(加寫鎖)。
? 1-1)直接刪除 brokerLiveTable 信息,無需判斷時間
? 1-2)刪除 filterServerTable 信息
? 1-3)維護刪除 brokerAddrTable 信息
? 1-4)維護刪除 clusterAddrTable 信息
? 1-5)根據 BrokerName,從 topicQueueTable
中移除該 Broker 的隊列。也就是維護刪除 topicQueueTable 信息
2)NameServer主動刪除:
? NameServer定時(10s)掃描brokerLiveTable,檢測上次心跳包與當前系統時間的時間差,如果時間戳大于120s,則需要移除該Broker信息。
? 2-1)判斷是否有時間戳大于120s的,若是有,則執行移除操作
? 2-2)移除前,加讀鎖,查詢需要刪除的broker信息。
? 2-3)根據broker信息brokerAddrFound(加寫鎖),刪除相關信息brokerLiveTable、filterServerTable、brokerAddrTable、clusterAddrTable、topicQueueTable 信息。
4、路由發現
? 當Topic路由出現變化后,NameServer不會主動推送給客戶端,而是由生產端和消費端定時拉取主題最新的路由,所以路由信息非實時的。NameServer 收到客戶端獲取路由信息請求后,調用 DefaultRequestProcessor#getRouteInfoByTopic()
方法,返回 Topic 路由信息。
? 源碼中,實際上是調用 RouteInfoManager#pickupTopicRouteData()
方法從topicQueueTable、brokerAddrTable、filterServerTable這些Map中查詢數據,組裝給TopicRouteData,然后返回給客戶端使用。
? 如果該主題為順序消息,從 KVConfig 中獲取順序消息相關的配置,填充進 TopicRouteData
對象。之后將 TopicRouteData
對象編碼,并返回給客戶端。
RouteInfoManager#pickupTopicRouteData
public TopicRouteData pickupTopicRouteData(final String topic) {TopicRouteData topicRouteData = new TopicRouteData();boolean foundQueueData = false;boolean foundBrokerData = false;Set<String> brokerNameSet = new HashSet<String>();List<BrokerData> brokerDataList = new LinkedList<BrokerData>();topicRouteData.setBrokerDatas(brokerDataList);HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();topicRouteData.setFilterServerTable(filterServerMap);try {try {this.lock.readLock().lockInterruptibly();List<QueueData> queueDataList = this.topicQueueTable.get(topic);if (queueDataList != null) {topicRouteData.setQueueDatas(queueDataList);foundQueueData = true;Iterator<QueueData> it = queueDataList.iterator();while (it.hasNext()) {QueueData qd = it.next();brokerNameSet.add(qd.getBrokerName());}// 處理構建:BrokerData數據for (String brokerName : brokerNameSet) {BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null != brokerData) {BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());brokerDataList.add(brokerDataClone);foundBrokerData = true;for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {List<String> filterServerList = this.filterServerTable.get(brokerAddr);filterServerMap.put(brokerAddr, filterServerList);}}}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("pickupTopicRouteData Exception", e);}log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);if (foundBrokerData && foundQueueData) {return topicRouteData;}return null;
}public class TopicRouteData extends RemotingSerializable {//topic排序的配置,和"ORDER_TOPIC_CONFIG"這個NameSpace有關,參照DefaultRequestProcessor#getRouteInfoByTopic,后續可講解此小知識點private String orderTopicConf;// topic 隊列元數據private List<QueueData> queueDatas;// topic分布的 broker元數據private List<BrokerData> brokerDatas;// broker上過濾服務器地址列表private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;...省略...
}
為什么不用ConcurrentHashMap存儲呢?
? 首先ConcurrentHashMap如果是1.8之前,初始化容量之后,是不能擴容的。其容量默認是16,且鎖的粒度是segment,那么最大并發度也就是容量大小。
? 如果是1.8之后的版本是支持擴容,且在加鎖方面有優化,調整為synchronized+CAS,但是同時會要求jdk版本。
? 最主要是RocketMQ 中的路由信息比較多。
?
疑問
1、假設Broker異常宕機,導致生產者發送消息失敗怎么解決?
? 假設Broker異常宕機,NameServer至少等120s才將該Broker從路由信息中剔除,在Broker故障期間,消息生產者Producer根據topic獲取到的路由信息包含已經宕機的Broker,會導致消息在短時間內發送失敗。
? 那么生產者是否也有相關的嘗試策略?
之后再做補充…
總結
? NameServer作為RocketMQ的“大腦”,保存著集群MQ的路由信息,具體就是記錄維護Topic、Broker的信息,及監控Broker的運行狀態,為client提供路由能力。
? 而從源代碼的角度總結:NameServer就是維護了多個HashMap,Broker的注冊,Client的查詢都是圍繞其Map操作,當然為了解決并發問題添加了ReentrantReadWriteLock(讀寫鎖)。