文章目錄
- 1. 前言
- 2. startScheduledTask 啟動定時任務
- 2.1 fetchNameServerAddr 拉取名稱服務地址
- 2.2 updateTopicRouteInfoFromNameServer 更新 topic 路由信息
- 2.2.1 topic 路由信息
- 2.2.2 updateTopicRouteInfoFromNameServer 獲取 topic
- 2.2.3 updateTopicRouteInfoFromNameServer 獲取路由信息并更新本地緩存
- 2.2.4 getTopicRouteInfoFromNameServer 從 NameServer 中獲取指定的路由信息
- 2.2.5 NameServer 處理 GET_ROUTEINFO_BY_TOPIC 請求
- 2.2.6 topicRouteDataIsChange 判斷 topic 路由信息是否發生了變化
- 2.2.7 isNeedUpdateTopicRouteInfo 是否需要更新 topic 路由配置
- 2.2.7.1 DefaultMQProducerImpl#isPublishTopicNeedUpdate 是否需要更新 topic 信息
- 2.2.7.2 DefaultMQPushConsumerImpl#isSubscribeTopicNeedUpdate 是否需要更新 topic 訂閱信息
- 2.2.8 DefaultMQProducerImpl#updateTopicPublishInfo
- 2.2.9 DefaultMQPushConsumerImpl#updateTopicSubscribeInfo
- 2.2.10 topicRouteData2TopicPublishInfo 將 TopicRouteData 轉成 TopicPublishInfo
- 3. cleanOfflineBroker 清除下線的 broker
- 4. persistAllConsumerOffset 持久化消費進度
- 4.1 LocalFileOffsetStore#persistAll
- 4.2 RemoteBrokerOffsetStore#persistAll
- 4.3 broker 處理 UPDATE_CONSUMER_OFFSET 請求
- 5. adjustThreadPool 調整消費者線程數
- 6. 小結
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源碼系列目錄
- 【RocketMQ 生產者消費者】- 同步、異步、單向發送消費消息
- 【RocketMQ 生產者和消費者】- 生產者啟動源碼-啟動流程(1)
- 【RocketMQ 生產者和消費者】- 生產者啟動源碼-創建 MQClientInstance(2)
- 【RocketMQ 生產者和消費者】- 生產者啟動源碼-上報生產者和消費者心跳信息到 broker(3)
上面幾篇文章我們介紹了生產的啟動流程,這篇文章就來看下生產者啟動的時候在 MQClientInstance 里面通過 startScheduledTask
啟動的定時任務,在上面的文章中也介紹過這個類,MQClientInstance 是 RocketMQ 客戶端的一個核心實例,負責管理消息的生產和消費,封裝了與消息隊列服務端的通信邏輯,包括消息的發送、接收以及相關配置的管理。所以可以說這些定時任務在生產者和消費者啟動的時候都會啟動。
2. startScheduledTask 啟動定時任務
MQClientInstance 在啟動的時候會通過 startScheduledTask
方法啟動一批定時任務,一共有 5 個:
- 每隔 2min 定時拉取 namesrv 地址
- 每隔 30s 定時拉取最新的 topic 路由信息
- 每隔 30s 定時清除無效的 broker,同時向所有 broker 發送心跳
- 每隔 5s 嘗試持久化消費者偏移量,就是消費進度
- 每隔 1min 調整一次消費者的線程數
2.1 fetchNameServerAddr 拉取名稱服務地址
// 1.定時任務,默認一開始延時 10s 執行,之后每 2min 執行一次
if (null == this.clientConfig.getNamesrvAddr()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 定時從 nameserver 服務器去拉取名稱服務地址// 這里獲取 ws 地址,默認是 http://jmenv.tbsite.net:8080/rocketmq/nsaddr// 全部構成應該是 http:// + wsDomainName + :8080/rocketmq/ + wsDomainSubgroup// wsDomainName 通過 rocketmq.namesrv.domain 獲取,默認值是 jmenv.tbsite.net// wsDomainSubgroup 通過 rocketmq.namesrv.domain.subgroup 獲取,默認值是 nsaddr// 當然了如果 wsDomainName 設置成 ip:port 的格式,那么最終的地址就是 http:// + ip:port + /rocketmq/ + wsDomainSubgroup// 比如 wsDomainName 設置成了 127.0.0.1:9876,那么結果就是 http://127.0.0.1:9876/rocketmq/nsaddrMQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();} catch (Exception e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
這個定時任務就是定時從 NameServer 服務器拉取 NameServer 服務地址,默認是 http://jmenv.tbsite.net:8080/rocketmq/nsaddr
,不過一般創建出生產者或者消費者之后都會通過 set 方法手動設置 NameServer 地址,所以也不需要用到這個定時任務。
/*** 拉取 nameserver 的地址* @return*/
public String fetchNameServerAddr() {try {// 這里獲取 ws 地址,默認是 http://jmenv.tbsite.net:8080/rocketmq/nsaddr// 全部構成應該是 http:// + wsDomainName + :8080/rocketmq/ + wsDomainSubgroup// wsDomainName 通過 rocketmq.namesrv.domain 獲取,默認值是 jmenv.tbsite.net// wsDomainSubgroup 通過 rocketmq.namesrv.domain.subgroup 獲取,默認值是 nsaddr// 當然了如果 wsDomainName 設置成 ip:port 的格式,那么最終的地址就是 http:// + ip:port + /rocketmq/ + wsDomainSubgroup// 比如 wsDomainName 設置成了 127.0.0.1:9876,那么結果就是 http://127.0.0.1:9876/rocketmq/nsaddrString addrs = this.topAddressing.fetchNSAddr();if (addrs != null) {// 地址發生了變化,更新 nameSrv 地址if (!addrs.equals(this.nameSrvAddr)) {log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs);// 更新 namesrvAddrList 列表,就是更新里面的 namesrv 地址this.updateNameServerAddressList(addrs);// 更新 nameSrvAddrthis.nameSrvAddr = addrs;return nameSrvAddr;}}} catch (Exception e) {log.error("fetchNameServerAddr Exception", e);}return nameSrvAddr;
}
可以看到這里面拉取到 NameServer 地址之后設置到 nameSrvAddr 屬性中,在設置之前會通過 updateNameServerAddressList
更新 NettyRemotingClient 的 namesrvAddrList 地址。為什么要更新這個呢?不知道大家還記得 RocketMQ 的通信架構嗎。
生產者和消費者都只設置 NameServer 地址,但是最終消息發送和消費都是操作 broker 的,所以肯定是通過某個方法將 broker 地址存儲到了本地,那么 broker 地址從哪里獲取呢?當然就是從 NameServer 了,broker 啟動的時候會將自身信息上報給 NameServer,所以 NameServer 里面是存儲了 broker 信息的,因此直到了 NameServer 地址,生產者和消費者只需要定時向 NameServer 發起請求查詢,之后將 broker 信息存儲到本地即可,而這里的定時查詢存儲就是下面 2.2 小結的定時任務要做的,這里還是先回到 updateNameServerAddressList
方法。
/*** 更新 namesrv 地址,addrs 是傳入的最新的地址* @param addrs*/
public void updateNameServerAddressList(final String addrs) {// 可以傳入 namesrv 集群地址,通過 ';' 分割就行了String[] addrArray = addrs.split(";");List<String> list = Arrays.asList(addrArray);// 更新 namesrvAddrList 列表this.remotingClient.updateNameServerAddressList(list);
}
這個方法會將獲取到的 addrs 通過 ;
分割,然后通過 updateNameServerAddressList
更新 namesrvAddrList 列表。
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();/*** 更新 nameserver 地址* @param addrs 傳入的新地址*/
@Override
public void updateNameServerAddressList(List<String> addrs) {// 獲取舊的 namesrv 列表List<String> old = this.namesrvAddrList.get();boolean update = false;// 如果新的不是空if (!addrs.isEmpty()) {if (null == old) {// 原來還沒有數組,說明第一次更新update = true;} else if (addrs.size() != old.size()) {// 如果大小不一樣那也是需要更新update = true;} else {// 如果其中有一條不一樣,那也是需要更新for (int i = 0; i < addrs.size() && !update; i++) {if (!old.contains(addrs.get(i))) {update = true;}}}if (update) {// 需要更新舊打亂地址Collections.shuffle(addrs);log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);// 重新設置進去this.namesrvAddrList.set(addrs);if (!addrs.contains(this.namesrvAddrChoosed.get())) {// 再更新 namesrvAddrChoosed,namesrvAddrChoosed 就是當前被選擇的 namesrvthis.namesrvAddrChoosed.set(null);}}}
}
這個 namesrvAddrList 也是一個原子類,而且可以看到這個方法里面的更新是屬于全量更新,也就是一次性更新所有 NameServer 地址,因此更新前需要有一個 update 字段來標記 NameServer 有沒有發生變化,出現下面的幾種情況:namesrvAddrList 是第一次設置
,NameServer 地址集合長度不一樣
、長度一樣但是地址有不同的
,這幾種情況就屬于發生了變化,需要重新設置 namesrvAddrList。
而最后還需要更新一個屬性 namesrvAddrChoosed
,namesrvAddrChoosed 就是當前被選擇的 NameServer 地址,broker 注冊的時候會向所有 NameServer 注冊信息,所以生產者消費者只需要和一個 broker 建立連接就行,因此 namesrvAddrChoosed 指向的就是建立連接的 broker 地址。
2.2 updateTopicRouteInfoFromNameServer 更新 topic 路由信息
// 2.初始延時 10ms,之后每隔 30s 執行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 定時從 nameserver 中拉取 topic 路由信息來更新MQClientInstance.this.updateTopicRouteInfoFromNameServer();} catch (Exception e) {log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);}}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
第二個定時任務就是每隔 30s 定時去獲取 topic 路由信息,然后更新本地緩存,在更新本地緩存前,我們還是來看下 topic 路由信息里面包括什么。
2.2.1 topic 路由信息
topic 路由信息封裝到了 TopicRouteData
對象中,下面就是這個對象里面的屬性:
// 順序 topic 的配置信息,來自 ${user.home}\namesrv\kvConfig.json,存儲到 KVConfigManager#configTable(namespace, (key, value))
// 這里賦值的時候通過 configTable.get(ORDER_TOPIC_CONFIG).get(topic) 來獲取這個 topic 下面的配置,比如 "broker-a:4;broker-b:4",
// 意思是在 broker-a 下面有 4 個隊列,在 broker-b 下面有 4 個隊列,一般都是 null
private String orderTopicConf;
// topic 下面的隊列信息(數量、分布 ...)
private List<QueueData> queueDatas;
// topic 存儲的 broker 信息,一個 topic 是可以存儲到多個 broker 的
private List<BrokerData> brokerDatas;
// broker 地址對應的過濾器地址列表
// 在 4.3 版本以前的過濾器是會在 broker 的服務器上運行一個 filterServer 的進程,并在 broker 的配置中加上:filterServerNums=1
// 比如: String filterCode = MixAll.file2String("過濾類的絕對路徑");
// System.out.println(filterCode);
// consumer.subscribe("TopicFilter","xjf.filter.MessageFilterImpl(類的相對路徑)",filterCode);
// 所以用戶可以自定義一些過濾信息上傳到過濾服務器上面
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
- orderTopicConf: 順序 topic 的配置信息,來自
${user.home}\namesrv\kvConfig.json
,存儲到KVConfigManager#configTable(namespace, (key, value))
,這里賦值的時候通過configTable.get(ORDER_TOPIC_CONFIG).get(topic)
來獲取這個 topic 下面的配置,比如"broker-a:4;broker-b:4"
,意思是在 broker-a 下面有 4 個隊列,在 broker-b 下面有 4 個隊列,一般都是 null。 - queueDatas: topic 下面的隊列信息(數量、分布 …)。
- brokerDatas: topic 存儲的 broker 信息,一個 topic 是可以存儲到多個 broker 的。
- filterServerTable: broker 地址對應的過濾服務器地址列表,在 4.3 版本以前的過濾器是會在 broker 的服務器上運行一個 filterServer 的進程,并在 broker 的配置中加上:filterServerNums=1,用戶可以自己訂閱過濾邏輯,將代碼傳到過濾服務器上去過濾,但是現階段版本已經不用了,使用 SQL92 代替了過濾服務器。
QueueData 是隊列信息,一個 topic 可以有很多個讀寫隊列,這些隊列可以分不到不同的 broker 上,所以 QueueData 包括了下面的屬性:
- brokerName: 分布的 broker 名稱(broker 集群)。
- readQueueNums: 讀隊列的數量,即該隊列下有多少個讀隊列,這個值在創建隊列時由用戶指定,通常是 4。
- writeQueueNums: 寫隊列的數量,即該隊列下有多少個寫隊列,這個值在創建隊列時由用戶指定,通常是 4。
- perm: 權限,PermName 里面就配置了相關的權限,比如 6 是讀寫權限,4 是只讀,也就是只能消費,2 是只寫,也就是只能發送消息,1 表示繼承 topic 的配置。
- topicSysFlag: topic 的同步配置,具體的配置值可以到 TopicSysFlag 類中去看,默認是 0,但是這個參數現在我也沒搞懂有什么用。
BrokerData 是 broker 信息,當然了 topic 路由配置里面的 broker 信息比較簡單,包括了 broker 集群名稱以及 broker 節點,下面是里面的屬性:
- cluster: broker 所屬集群,比如最經典的 DefaultCluster。
- brokerName: broker 名稱,又或者說是 broker 集群中的一個主從集群的名稱。
- brokerAddrsbrokerAddrs: broker 地址,brokerName 這個主從集群下面的 broker 節點地址,Map 類型,key 是 brokerId,value 是 broker 地址,0 是主節點,其他是從節點。
2.2.2 updateTopicRouteInfoFromNameServer 獲取 topic
從方法名也可以看出,這個方法就是從 NameServer 中獲取路由信息,然后更新本地緩存,在獲取路由信息之前我們要知道有哪些 topic,所以這個方法的核心就是從 consumerTable
和 producerTable
兩個集合中獲取進程消費者和生產者訂閱的 topic,最后再調用 updateTopicRouteInfoFromNameServer
去獲取路由信息。
/*** 從 namesrv 中拉取 topic 路由消息并更新*/
public void updateTopicRouteInfoFromNameServer() {// 去除重復數據Set<String> topicList = new HashSet<String>();// 遍歷當前 MQClientInstance 下面的所有消費者來獲取對應的訂閱信息,主要就是里面的 topic{Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {// 獲取每個消費者下面的訂閱列表Set<SubscriptionData> subList = impl.subscriptions();if (subList != null) {// 遍歷所有訂閱信息,將這個 Consumer 訂閱的 topic 添加到 topicList 中for (SubscriptionData subData : subList) {topicList.add(subData.getTopic());}}}}}// 遍歷當前 MQClientInstance 下面的所有生產者來獲取對應的訂閱信息,主要就是里面的 topic{Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {// 獲取生產者內部的 topicPublishInfoTable 列表的 topicSet<String> lst = impl.getPublishTopicList();// 將 topic 添加到集合中topicList.addAll(lst);}}}// 上面是從生產者和消費者中獲取的, 但是生產者和消費者都有自己的 MQClientInstance, 所以 topicList 只會包括生產者的或者是消費者的for (String topic : topicList) {// 遍歷每一個 topic,從 nameserver 中獲取這個 topic 的路由信息并更新this.updateTopicRouteInfoFromNameServer(topic);}
}
2.2.3 updateTopicRouteInfoFromNameServer 獲取路由信息并更新本地緩存
/*** 從 nameserver 中獲取 topic 的路由信息并嘗試更新* @param topic* @return*/
public boolean updateTopicRouteInfoFromNameServer(final String topic) {return updateTopicRouteInfoFromNameServer(topic, false, null);
}
這個方法就是上面 2.2.2 小節最后調用的方法,可以看到里面也是調用了另外一個 updateTopicRouteInfoFromNameServer 方法,傳入三個參數,分別是:topic、是否是默認的 topic、默認生產者,默認 topic 就是 "TBW102"
,當然我們這里肯定不是了。
/*** 從 nameserver 中獲取 topic 的路由信息并嘗試更新* @param topic* @return*/
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {try {// 加鎖防止并發,加鎖時間是 3sif (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {// topic 路由信息TopicRouteData topicRouteData;// 1.如果通過默認 topic 去獲取路由信息if (isDefault && defaultMQProducer != null) {// 如果是默認 topic,就是獲取 TBW102 的路由信息topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),clientConfig.getMqClientApiTimeout());if (topicRouteData != null) {// 更新隊列信息,設置讀寫隊列數量for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {// 2. 定時獲取路由信息的邏輯會走這里,會從 nameserver 獲取指定的 topic 的路由信息topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());}// 3.獲取到的路由信息不為空if (topicRouteData != null) {// 那么從路由列表里面獲取本地緩存的舊的 topic 路由信息TopicRouteData old = this.topicRouteTable.get(topic);// 判斷路由信息是否發生了修改boolean changed = topicRouteDataIsChange(old, topicRouteData);if (!changed) {// 如果沒有發生修改,判斷下這個 topic 路由信息是否需要更新了,如果說這個 topic 已經不可用了,那么別管一// 不一樣,都是需要更新的changed = this.isNeedUpdateTopicRouteInfo(topic);} else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}// 4.如果需要更新本地路由信息if (changed) {// 將 nameserver 的路由信息克隆一份TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();// 遍歷這個 topic 分布的所有 broker,因為一個 topic 可以分布到不同 broker 下面for (BrokerData bd : topicRouteData.getBrokerDatas()) {// 4.1 首先是更新 broker 信息this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// 4.2 然后是更新生產者的 topicPublishInfoTable 信息,就是生產者的路由信息if (!producerTable.isEmpty()) {// 將 TopicRouteData 轉換成 TopicPublishInfoTopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);// 因為將 topicRouteData 設置到了 publishInfo 里面,所以這個屬性也得設置為 true,表示包括路由信息publishInfo.setHaveTopicRouterInfo(true);// 遍歷所有生產者Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {// group(生產者組) -> MQProducerInner(生產者實例)Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {// 更新生產者的 topicPublishInfoTable 集合impl.updateTopicPublishInfo(topic, publishInfo);}}}// 4.3 然后是更新消費者的 topicSubscribeInfoTable 信息,就是消費者的訂閱路由信息if (!consumerTable.isEmpty()) {// 消費者訂閱的 topic 集合Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {// 遍歷這個 MQClientInstance 下面的所有消費者Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {// 更新消費者的訂閱信息impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);// 4.4 最后更新本地的 topic 信息this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);}} catch (MQClientException e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}} catch (RemotingException e) {log.error("updateTopicRouteInfoFromNameServer Exception", e);throw new IllegalStateException(e);} finally {this.lockNamesrv.unlock();}} else {log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);}} catch (InterruptedException e) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}return false;
}
上面就是更新本地緩存的邏輯,在更新之前需要先加個鎖,防止并發更新,比如一個進程里面的生產者和消費者同時跑定時任務來更新。
然后就是獲取路由信息,如果說傳入了默認 topic,就從 NameServer 里面拉取 TBW102 的路由信息,然后再從傳入的默認生產者里面獲取讀寫隊列數設置到路由信息里面。定時任務調用的就沒有傳入這個默認 topic,因此定時任務會直接從 NameServer 里面拉取傳入的 topic 的路由信息。
獲取到路由信息之后,首先就是先從本地 topicRouteTable
緩存,這個緩存就是一個本地的 topic 路由緩存。
/**
* 本地路由信息緩存
*/
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
接著通過 topicRouteDataIsChange
判斷路由信息是否發生了變化,如果發生了變化,就可以修改,如果沒有發生變化,再判斷下路由信息是否需要更新,也就是 isNeedUpdateTopicRouteInfo
,這個方法后面會講解。
那如果需要更新,首先就是更新 brokerAddrTable
集合,上面 2.2.1 小節我們也說了路由信息里面是有 brokerDatas
屬性的,所以這里更新 brokerAddrTable 的邏輯就是直接往里面設置 broker 信息,這個 brokerAddrTable 是一個 Map 集合。
// brokerName -> (id, address),一個集群里面的 broker 的 brokerName 是一樣的
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =new ConcurrentHashMap<String, HashMap<Long, String>>();
從這里的定義也可以看出,broker 主從集群共用一個 brokerName,依靠 brokerId 區分是主節點還是從節點。
回到源碼,繼續往下,接著就是更新 producerTable
的 topicPublishInfoTable
集合,這個集合代表生產者的訂閱 topic 的路由信息,以及更新 consuemerTable
的 topicSubscribeInfoTable
集合,這個集合代表消費者的訂閱信息,最后更新本地的 topicRouteTable
集合。
好了,這里就是更新 topic 路由信息的全部邏輯,可以看到的是主要更新了下面四個集合:
- topicRouteTable: topic 路由集合
- brokerAddrTable: broker 地址集合
- DefaultMQProducerImpl#topicPublishInfoTable: 生產者的 topic 路由信息
- DefaultMQPushConsumerImpl#RebalanceImpl#topicSubscribeInfoTable: 消費者下面的 topic 訂閱信息
所以這里也就回收了 2.1 小節的問題,就是 broker 地址是在哪里更新的。
2.2.4 getTopicRouteInfoFromNameServer 從 NameServer 中獲取指定的路由信息
/*** 從 NameServer 中拉取 topic 的路由信息* @param topic* @param timeoutMillis* @param allowTopicNotExist* @return* @throws MQClientException* @throws InterruptedException* @throws RemotingTimeoutException* @throws RemotingSendRequestException* @throws RemotingConnectException*/
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {// 構建獲取路由信息的請求頭GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();// 要獲取哪個 topic 的路由信息requestHeader.setTopic(topic);// 獲取請求命令對象,構建出來的請求 CODE 是 GET_ROUTEINFO_BY_TOPICRemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);// 同步遠程調用,這里傳入的 addr 為空,意思是會隨機選擇一個 NameServer 來發送請求RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.TOPIC_NOT_EXIST: {// 找不到這個 topic 的路由信息if (allowTopicNotExist) {log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);}break;}case ResponseCode.SUCCESS: {// 成功獲取,對返回消息體解碼byte[] body = response.getBody();if (body != null) {return TopicRouteData.decode(body, TopicRouteData.class);}}default:break;}throw new MQClientException(response.getCode(), response.getRemark());
}
可以看到這里就是發送了一個同步請求,請求 Code 為 GET_ROUTEINFO_BY_TOPIC
,請求參數是 topic。
2.2.5 NameServer 處理 GET_ROUTEINFO_BY_TOPIC 請求
NameServer 會通過 getRouteInfoByTopic 方法來處理生產者和消費者的獲取路由配置請求。
/*** 處理根據 topic 查詢路由配置的請求* @param ctx* @param request* @return* @throws RemotingCommandException*/
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {// 創建響應結果final RemotingCommand response = RemotingCommand.createResponseCommand(null);// 獲取請求參數final GetRouteInfoRequestHeader requestHeader =(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);// 根據 topic 獲取對應的路由配置TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());if (topicRouteData != null) {// 順序消息相關的if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {String orderTopicConf =this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,requestHeader.getTopic());topicRouteData.setOrderTopicConf(orderTopicConf);}// 設置返回結果byte[] content = topicRouteData.encode();response.setBody(content);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;
}
可以看到上面是通過 pickupTopicRouteData
來獲取對應 topic 的路由配置的,同時獲取到配置之后如果 topic 允許順序消息,那么設置下順序 topic 的配置,這個后面再研究吧,現在來看下 pickupTopicRouteData
里面的邏輯,不過在進入這個方法之前還是先回顧下 TopicRouteData 里面有什么屬性。
上面設置了 orderTopicConf,下面這個 pickupTopicRouteData
就是要處理剩下的三個屬性。
/*** 獲取 topic 的路由配置* @param topic* @return*/
public TopicRouteData pickupTopicRouteData(final String topic) {// 初始化路由配置TopicRouteData topicRouteData = new TopicRouteData();boolean foundQueueData = false;boolean foundBrokerData = false;// 最重要的三個屬性: queueDatas(隊列信息)、brokerDatas(broker 信息)、filterServerTable(過濾服務器信息)Set<String> brokerNameSet = new HashSet<String>();List<BrokerData> brokerDataList = new LinkedList<BrokerData>();topicRouteData.setBrokerDatas(brokerDataList);HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();topicRouteData.setFilterServerTable(filterServerMap);try {try {// 加讀鎖this.lock.readLock().lockInterruptibly();// 從 topicQueueTable 中獲取這個 topic 下面的隊列信息List<QueueData> queueDataList = this.topicQueueTable.get(topic);if (queueDataList != null) {// 設置隊列信息topicRouteData.setQueueDatas(queueDataList);foundQueueData = true;Iterator<QueueData> it = queueDataList.iterator();// 遍歷所有的隊列信息,獲取這個 topic 的隊列會存儲到哪些 broker,簡單說就是獲取這個 topic 存儲到哪些 brokerwhile (it.hasNext()) {QueueData qd = it.next();brokerNameSet.add(qd.getBrokerName());}// 遍歷這些 brokerfor (String brokerName : brokerNameSet) {// 從 brokerAddrTable 中獲取對應的 broker 信息BrokerData brokerData = this.brokerAddrTable.get(brokerName);// 這里就是獲取到 broker 信息了if (null != brokerData) {// 相當于克隆一份數據到 brokerDataClone 中BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());// 添加到 brokerDataList 集合brokerDataList.add(brokerDataClone);foundBrokerData = true;// 獲取過濾服務器的地址for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {List<String> filterServerList = this.filterServerTable.get(brokerAddr);filterServerMap.put(brokerAddr, filterServerList);}}}}} finally {// 解鎖this.lock.readLock().unlock();}} catch (Exception e) {log.error("pickupTopicRouteData Exception", e);}log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);if (foundBrokerData && foundQueueData) {// 如果兩個都找到了,就返回結果return topicRouteData;}// 如果 broker 信息或者隊列信息沒找到,就返回空return null;
}
上面就是 pickupTopicRouteData
方法的代碼,這里面注釋也寫的比較清除,所以就簡單說下這幾個屬性的來源:
- queueDatas:從
topicQueueTable
獲取,這個集合存儲了 topic -> 隊列信息,因為一個 topic 可以存儲到多個 broker 下面,所以是Map<String, List< QueueData>>
類型。 - brokerDatas:從
brokerAddrTable
獲取,這個集合之前也說過了,就是存儲了 brokerName -> broker 地址信息,是Map<String, BrokerData>
類型集合,BrokerData 上面 2.2.1 小節也有介紹里面的一些屬性,這里不多說。還要注意的一點是代碼中是獲取了 queueDatas 之后,遍歷這些隊列,然后獲取這些隊列分布的 broker,再去獲取 brokerDatas 信息,所以是一個前后的關系。 - filterServerTable:從
filterServerTable
中獲取,這個集合存儲了 broker 地址 -> 這個 broker 的過濾服務器地址的映射關系,因為涉及到 broker 地址,所以獲取到 brokerDatas 之后再遍歷這個 broker 集群下面的所有 broker 地址來獲取過濾服務器信息。
2.2.6 topicRouteDataIsChange 判斷 topic 路由信息是否發生了變化
private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {if (olddata == null || nowdata == null)return true;TopicRouteData old = olddata.cloneTopicRouteData();TopicRouteData now = nowdata.cloneTopicRouteData();Collections.sort(old.getQueueDatas());Collections.sort(old.getBrokerDatas());Collections.sort(now.getQueueDatas());Collections.sort(now.getBrokerDatas());return !old.equals(now);}
判斷 TopicRouteData 是否發生了變化就是判斷里面的幾個屬性是否不同,如果不同就說明有變化,直接用 equals 方法判斷就行了。
2.2.7 isNeedUpdateTopicRouteInfo 是否需要更新 topic 路由配置
是否需要更新就得看生產者和消費者里面對應的訂閱 topic 集合信息是否需要更新。
private boolean isNeedUpdateTopicRouteInfo(final String topic) {boolean result = false;{Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext() && !result) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {result = impl.isPublishTopicNeedUpdate(topic);}}}{Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext() && !result) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {result = impl.isSubscribeTopicNeedUpdate(topic);}}}return result;
}
從上面可以看到,生產者是遍歷了 producerTable
,每一個生產者都去使用 isPublishTopicNeedUpdate 方法判斷是否需要更新 topic 信息。而消費者則是遍歷 consumerTable
集合,一個一個消費者去判斷訂閱的這個 topic 下面的訂閱信息是否需要更新。
2.2.7.1 DefaultMQProducerImpl#isPublishTopicNeedUpdate 是否需要更新 topic 信息
@Override
public boolean isPublishTopicNeedUpdate(String topic) {TopicPublishInfo prev = this.topicPublishInfoTable.get(topic);return null == prev || !prev.ok();
}/*** 當前 topic 是否是可以使用的,如果說 topic 下面的隊列不為空,那么就說明還可以使用* @return*/
public boolean ok() {return null != this.messageQueueList && !this.messageQueueList.isEmpty();
}
對于生產者來說 topic 配置信息不可用當然就是沒辦法向 topic 發送消息,而我們之前也說過如果需要發送消息,topic 下面的消息隊列肯定是需要可用,所以這里的判斷就是如果這個 topic 下面的消息隊列為空,就是沒辦法發送消息,這種情況下就是不可用,需要更新 topic 信息。
2.2.7.2 DefaultMQPushConsumerImpl#isSubscribeTopicNeedUpdate 是否需要更新 topic 訂閱信息
對于消費者來說,消費者組里面的消費者會通過負載均衡分配到 topic 下面的某個隊列,所以如果這個消費者的負載均衡服務下面的 topicSubscribeInfoTable 集合沒有找到這個 topic 對應的隊列信息就說明需要更新。
/*** topic -> 消費隊列*/
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =new ConcurrentHashMap<String, Set<MessageQueue>>();
不過大家要注意 MQClientInstance 是生產者和消費者獨有的,也就是說消費者找不到這個 topic 的訂閱信息,就說明需要更新,雖然 2.2.2 小節 updateTopicRouteInfoFromNameServer 是從生產者和消費者里面都獲取了 topic 然后遍歷所有 topic 去請求路由信息,但是實際上一個進程的生產者只會請求生產者的 topic 路由信息,消費者只會請求消費者的,不會串起來。因為 MQClientInstance 是生產者和消費者的私有屬性,會通過 clientId 去標識,在前面的文章我們也說過了當前版本生產 clientId 的時候是加了時間戳的,所以一般一個進程里面的生產者消費者啟動都會創建自己的 MQClientInstance,而這些定時任務又是 MQClientInstance 下面的。
回到 isSubscribeTopicNeedUpdate,既然有了上面的概念,那么下面就直接給出源碼就行,直接判斷是否在負載均衡服務的 topic 訂閱信息集合中即可。
@Override
public boolean isSubscribeTopicNeedUpdate(String topic) {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {if (subTable.containsKey(topic)) {// 判斷是否在負載均衡服務的 topic 訂閱信息集合中return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);}}return false;
}
2.2.8 DefaultMQProducerImpl#updateTopicPublishInfo
當獲取到 topic 路由配置之后,轉成 TopicPublishInfo,然后通過 DefaultMQProducerImpl#updateTopicPublishInfo
設置到生產者的 topicPublishInfoTable
集合中。
/*** 更新生產者的 topic 路由信息* @param topic* @param info*/
@Override
public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {if (info != null && topic != null) {// 更新 topic 下面的路由信息TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info);if (prev != null) {// 這里就是將之前的覆蓋了log.info("updateTopicPublishInfo prev is not null, " + prev.toString());}}
}
2.2.9 DefaultMQPushConsumerImpl#updateTopicSubscribeInfo
消費者也差不多,跟上面 2.2.7.2 是匹配的,也是更新 topicSubscribeInfoTable 就好了。
@Override
public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {// 獲取消費者訂閱的 topicMap<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {// 如果訂閱的 topic 包括傳入的 topic,才更新下面的隊列信息if (subTable.containsKey(topic)) {this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info);}}
}
2.2.10 topicRouteData2TopicPublishInfo 將 TopicRouteData 轉成 TopicPublishInfo
最后來看下 topicRouteData2TopicPublishInfo 方法,就是將 TopicRouteData 轉成生產者所需的 TopicPublishInfo,注釋寫的比較詳細了,直接看代碼。
/*** 將 TopicRouteData 轉化為 TopicPublishInfo,TopicPublishInfo 就是本地路由信息存儲類* @param topic* @param route* @return*/
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {// TopicPublishInfo 是本地存儲的 topic 路由信息TopicPublishInfo info = new TopicPublishInfo();// 將路由信息設置到 TopicPublishInfo 里面info.setTopicRouteData(route);// 如果說 orderTopicConf 不為空if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {// 按照分號進行分割,這個屬性會標記 topic 在這個 broker 下面有多少個 MessageQueueString[] brokers = route.getOrderTopicConf().split(";");// 比如 broker-a:4;broker-b:4for (String broker : brokers) {String[] item = broker.split(":");// MessageQueue 數量int nums = Integer.parseInt(item[1]);for (int i = 0; i < nums; i++) {// 生成 MessageQueue,指定 MessaegQueue 的 topic、所屬的 broker、編號MessageQueue mq = new MessageQueue(topic, item[0], i);// 添加到 TopicPublishInfo 里面info.getMessageQueueList().add(mq);}}// 如果設置了 orderTopicConf 這個配置,那么說明這個 topic 是一個順序 topicinfo.setOrderTopic(true);} else {// 這里就不是順序消息了,獲取這個 topic 下面的隊列消息List<QueueData> qds = route.getQueueDatas();// 按照 brokerName 排序Collections.sort(qds);for (QueueData qd : qds) {// 遍歷每一個隊列,如果說這個隊列是可寫的,也就是可以接收消息的if (PermName.isWriteable(qd.getPerm())) {BrokerData brokerData = null;// 遍歷這個 topic 存儲的所有 broker 信息,一個 topic 是可以存儲到多個 broker 的for (BrokerData bd : route.getBrokerDatas()) {if (bd.getBrokerName().equals(qd.getBrokerName())) {// 獲取下這個 QueueData 所屬的 broker 信息brokerData = bd;break;}}// 沒找到,繼續處理下一個 QueueDataif (null == brokerData) {continue;}// 如果這個隊列所屬的 broker 集群里面沒有主節點if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {// 繼續處理下一個隊列的信息,因為消息發送和消費都是以主節點為主的continue;}// 這里就是根據 writeQueueNums 屬性來生成 MessageQueuefor (int i = 0; i < qd.getWriteQueueNums(); i++) {// 生成 MessageQueue,指定 MessaegQueue 的 topic、所屬的 broker、編號MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);// 添加到 TopicPublishInfo 中info.getMessageQueueList().add(mq);}}}// 設置順序 topic 屬性為 falseinfo.setOrderTopic(false);}return info;
}
還是要注意下,非順序 topic 前提下,遍歷 topic 下面的隊列,因為隊列可以分配到不同的 brokerName 集群,所以需要遍歷所有的隊列,然后判斷這些隊列分配到的 brokerName 集群中有沒有主節點,怎么判斷呢?topic 路由配置里面的 brokerDatas 就存儲了這個 topic 分配的 broker 信息,所以遍歷 QueueData,然后從 brokerDatas 中找到 QueueData 里面的 brokerName 集群,如果沒有主節點,就不處理,因為生產者寫消息是寫給主節點的。
如果有主節點,就 根據 QueueData#writeQueueNums 屬性來生成 MessageQueue,指定這個隊列的 topic、brokerName、下標,接著添加到 TopicPublishInfo#messageQueueList 屬性中,生產者生產消息的時候可以直接調用 send 方法指定里面的隊列去發送消息。
3. cleanOfflineBroker 清除下線的 broker
// 3.初始延時 1s,之后每隔 30s 執行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 清除下線的 brokerMQClientInstance.this.cleanOfflineBroker();// 給所有 broker 發送心跳MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();} catch (Exception e) {log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);}}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
這個定時任務會初始延時 1s,之后每隔 30s 執行一次,主要做兩件事:
- 清除下線的 broker
- 給所有 broker 發送心跳
這個 sendHeartbeatToAllBrokerWithLock 在文章 【RocketMQ 生產者和消費者】- 生產者啟動源碼-上報生產者和消費者心跳信息到 broker(3) 有說過,所以這里就不說 sendHeartbeatToAllBrokerWithLock 了,來看下第一個 cleanOfflineBroker 是如何清除下線的 broker 的。
/*** 定時清除下線的 broker*/
private void cleanOfflineBroker() {try {// 加鎖if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))try {ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>();Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();while (itBrokerTable.hasNext()) {// 下面會遍歷所有 brokerName 下面的所有主從節點Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();String brokerName = entry.getKey();HashMap<Long, String> oneTable = entry.getValue();// 克隆一份,避免下面邊處理,上面被修改了HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();cloneAddrTable.putAll(oneTable);Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<Long, String> ee = it.next();// 遍歷所有的 broker 地址String addr = ee.getValue();// 判斷這個 broker 地址是否還存在 topicRouteTable 里面任意一個 topic 的路由信息中// 這個 topicRouteTable 在 startScheduledTask 方法中會每隔 30s 拉取一次 topic 路由信息來更新// 所以可以直接通過這個集合去判斷 brokerAddrTable 里面的地址有沒有過期if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {// 不存在就刪掉it.remove();log.info("the broker addr[{} {}] is offline, remove it", brokerName, addr);}}// 如果為空,那么刪掉這個 brokerName 下面的所有地址if (cloneAddrTable.isEmpty()) {itBrokerTable.remove();log.info("the broker[{}] name's host is offline, remove it", brokerName);} else {// 否則就將更新之后的集合重新設置進去updatedTable.put(brokerName, cloneAddrTable);}}// 更新完成了,如果不為空,更新 brokerAddrTableif (!updatedTable.isEmpty()) {this.brokerAddrTable.putAll(updatedTable);}} finally {this.lockNamesrv.unlock();}} catch (InterruptedException e) {log.warn("cleanOfflineBroker Exception", e);}
}
邏輯就是遍歷 brokerAddrTable
里面的所有 broker 地址,這個集合在第 2 節定時任務就維護了,所以直接遍歷就行,接著使用 isBrokerAddrExistInTopicRouteTable
方法判斷這個 broker 是否存在于 topicRouteTable 集合中,topicRouteTable 就是 topic 路由集合,如果說 TopicRouteData#brokerDatas 里面不包括當前 broker 地址,那么就說明這個地址失效了,直接刪掉。
/*** 判斷 addr 是否在 topic 路由配置中* @param addr* @return*/
private boolean isBrokerAddrExistInTopicRouteTable(final String addr) {// 遍歷 topic 路由集合Iterator<Entry<String, TopicRouteData>> it = this.topicRouteTable.entrySet().iterator();while (it.hasNext()) {Entry<String, TopicRouteData> entry = it.next();TopicRouteData topicRouteData = entry.getValue();// 獲取 topic 分布的 broker 信息List<BrokerData> bds = topicRouteData.getBrokerDatas();for (BrokerData bd : bds) {// 遍歷所有路由信息, 然后一個一個判斷 broker 地址是否在 topic 路由配置中if (bd.getBrokerAddrs() != null) {boolean exist = bd.getBrokerAddrs().containsValue(addr);if (exist)return true;}}}return false;
}
4. persistAllConsumerOffset 持久化消費進度
// 4.初始化延遲 10s 執行,每隔 5s 執行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 持久化消費者偏移量,就是消費進度// - 如果是廣播模式持久化到本地// - 如果是集群默認就推送到 brokerMQClientInstance.this.persistAllConsumerOffset();} catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);}}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
消費者消息拉取的時候會根據偏移量拉取,所以這里會定時持久化消費者偏移量到本地或者 broker。
/*** 持久化 consumerTable 消費者偏移量集合*/
private void persistAllConsumerOffset() {Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {// 遍歷所有消費者,廣播模式下持久化到本地,集群模式下推送到 broker 端Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();// 依次調用每一個 MQConsumerInner#persistConsumerOffset 方法去持久化impl.persistConsumerOffset();}
}
這里就是遍歷所有消費者,然后依次調用每一個 MQConsumerInner#persistConsumerOffset 方法去持久化。
/*** 持久化偏移量*/
@Override
public void persistConsumerOffset() {try {// 首先確定當前 Consumer 服務狀態是正常的this.makeSureStateOK();// 獲取所有的隊列Set<MessageQueue> mqs = new HashSet<MessageQueue>();Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();mqs.addAll(allocateMq);// 持久化所有隊列的偏移量到本地或者遠程 brokerthis.offsetStore.persistAll(mqs);} catch (Exception e) {log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);}
}
4.1 LocalFileOffsetStore#persistAll
/*** 持久化隊列的消費偏移量到本地* @param mqs*/
@Override
public void persistAll(Set<MessageQueue> mqs) {if (null == mqs || mqs.isEmpty())return;// 創建一個偏移量的序列化包裝類,里面就是封裝了 offsetTableOffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {if (mqs.contains(entry.getKey())) {// 獲取偏移量AtomicLong offset = entry.getValue();// 存到 offsetSerializeWrapper 中offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);}}// 通過 JSON.toJSON 轉成 JSON 字符串,同時將消費者偏移量持久化到本地String jsonString = offsetSerializeWrapper.toJson(true);if (jsonString != null) {try {// 存儲路徑默認是: ${user.home}/.rocketmq_offsets/#{clientId}/#{groupName}/offsets.jsonMixAll.string2File(jsonString, this.storePath);} catch (IOException e) {log.error("persistAll consumer offset Exception, " + this.storePath, e);}}
}
持久化到本地的邏輯就是將消息隊列的偏移量存儲到本地 ${user.home}/.rocketmq_offsets/#{clientId}/#{groupName}/offsets.json
文件中。
4.2 RemoteBrokerOffsetStore#persistAll
/*** 持久化所有的 MQ 的消費偏移量到遠程 broker* @param mqs 當前消費者需要消費的所有隊列*/
@Override
public void persistAll(Set<MessageQueue> mqs) {// 如果隊列為空,直接返回if (null == mqs || mqs.isEmpty())return;// 未上報的 MQ 集合final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {MessageQueue mq = entry.getKey();AtomicLong offset = entry.getValue();if (offset != null) {if (mqs.contains(mq)) {try {// 遍歷 offsetTable,如果隊列在 mqs 中,就將這個隊列的偏移量信息上報偏移量到 broker// 這個偏移量會被存儲到 broker 的 ConsumerOffsetManager#offsetTable 中,但是只是存到內存,但是最終會// 持久化到 ${user.home}/store/config/consumerOffset.json 文件this.updateConsumeOffsetToBroker(mq, offset.get());log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",this.groupName,this.mQClientFactory.getClientId(),mq,offset.get());} catch (Exception e) {log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);}} else {// 這些就是存在 offsetTable 中但是不在 mqs 中的unusedMQ.add(mq);}}}if (!unusedMQ.isEmpty()) {// 從 offsetTable 中刪除那些不需要上報的 MQfor (MessageQueue mq : unusedMQ) {this.offsetTable.remove(mq);log.info("remove unused mq, {}, {}", mq, this.groupName);}}
}
遠程持久化偏移量的方法會遍歷 offsetTable
中的隊列,如果隊列在傳入的 mqs
中也存在,就調用 updateConsumeOffsetToBroker
方法上報偏移量到 broker,持久化到 ${user.home}/store/config/consumerOffset.json
文件中。
傳入的 mqs 是從 RebalanceImpl 重平衡服務獲取的,所以如果 offsetTable 里面的隊列不在 mqs 中就代表過期了,不需要這個隊列了,因此后面如果 unusedMQ 集合不為空,就會將這部分隊列從 offsetTable 中刪掉。
下面來接著看下 updateConsumeOffsetToBroker 是如何請求 broker 存儲偏移量的。
/*** Update the Consumer Offset in one way, once the Master is off, updated to Slave, here need to be optimized.* 上報消費偏移量到 broker*/
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,MQBrokerException, InterruptedException, MQClientException {updateConsumeOffsetToBroker(mq, offset, true);
}/*** Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.* 同步更新消費者偏移量,一旦主節點宕機,就將更新操作切換到從節點,此處需要進行優化*/
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,MQBrokerException, InterruptedException, MQClientException {// 從 brokerAddrTable 中根據隊列里面設置的 brokerName 獲取 broker 地址FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);if (null == findBrokerResult) {// 找不到就從 NameServer 拉取 topic 路由消息,同時更新本地的 brokerAddrTablethis.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());// 再次從 brokerAddrTable 中根據隊列里面設置的 brokerName 獲取 broker 地址findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);}// 獲取到了if (findBrokerResult != null) {// 構建請求頭UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();// 設置隊列的 topicrequestHeader.setTopic(mq.getTopic());// 設置消費者組requestHeader.setConsumerGroup(this.groupName);// 設置隊列 IDrequestHeader.setQueueId(mq.getQueueId());// 設置消費偏移量requestHeader.setCommitOffset(offset);if (isOneway) {// 單向發送this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);} else {// 同步發送this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);}} else {throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}
}/*** 向 broker 發送單向更新消費偏移量的請求* @param addr* @param requestHeader* @param timeoutMillis* @throws RemotingConnectException* @throws RemotingTooMuchRequestException* @throws RemotingTimeoutException* @throws RemotingSendRequestException* @throws InterruptedException*/
public void updateConsumerOffsetOneway(final String addr,final UpdateConsumerOffsetRequestHeader requestHeader,final long timeoutMillis
) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,InterruptedException {// 請求 Code 為 UPDATE_CONSUMER_OFFSETRemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);// 發送單向請求,走 VIP 通道,就是異步發送,不求返回結果this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
}
可以看到首先就是從 brokerAddrTable 中根據隊列里面設置的 brokerName 獲取主節點的 broker 地址,如果找不到就從 NameServer 拉取 topic 路由消息,同時更新本地的 brokerAddrTable。
獲取到 broker 地址之后,構建請求頭,在請求頭中設置隊列 ID、消費者組、消費偏移量、隊列 topic,然后使用 VIP 通道(10909 端口)單向發送,請求 code 是 UPDATE_CONSUMER_OFFSET。
4.3 broker 處理 UPDATE_CONSUMER_OFFSET 請求
broker 會通過 updateConsumerOffset 處理 Consumer 發送的更新隊列的消費偏移量的請求。
/*** 更新消費者偏移量* @param ctx* @param request* @return* @throws RemotingCommandException*/
private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {// 創建命令行響應結果final RemotingCommand response =RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);// 獲取消費者請求final UpdateConsumerOffsetRequestHeader requestHeader =(UpdateConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);// 調用 consumerOffsetManager@commitOffset 來存儲偏移量this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;
}
可以看到里面也是掉用了 commitOffset
來處理隊列偏移量。
/*** 存儲 Consumer 上報過來的消費隊列偏移量* @param clientHost Consumer 客戶端的地址* @param group Consumer 消費者組* @param topic topic 主題* @param queueId 隊列 ID* @param offset 消費偏移量*/
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,final long offset) {// topic@groupString key = topic + TOPIC_GROUP_SEPARATOR + group;// 存入偏移量this.commitOffset(clientHost, key, queueId, offset);
}/*** 提交偏移量,broker 使用 offsetTable 存儲了消費者上報過來的偏移量,對應了 ${user.home}/store/config/consumerOffset.json* 這里是將偏移量存入緩存中,還沒有持久化, 什么時候初始化呢? 在 broker 會有一個定時任務每隔 5s 就持久化偏移量到上面這個文件中* @param clientHost 客戶端地址* @param key 緩存key* @param queueId 隊列id* @param offset 消費偏移量*/
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {// 獲取 topic@group 下面對應的所有隊列的消費偏移量// topic@group -> (queueId, offset),一個 topic 可以被多個消費者組下面的多個消費去消費,同時每一個消費者都會分配到這個 topic// 下面的幾個隊列,而這個 table 不會記錄隊列是哪一個消費者消費的的,只會記錄這個隊列被消費到哪個偏移量了ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);if (null == map) {// 如果為空,就是第一次添加map = new ConcurrentHashMap<Integer, Long>(32);map.put(queueId, offset);this.offsetTable.put(key, map);} else {// 這里就是更新偏移量邏輯Long storeOffset = map.put(queueId, offset);if (storeOffset != null && offset < storeOffset) {// 如果更新之后的偏移量比原來的還小,可能有問題,因為消費肯定是往后消費的, 這里也是打印下日志log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);}}
}
這里意思就是將生產者消費者上報的偏移量存到 broker 的 offsetTable,注意這里的偏移量是下一條將要拉取的消息在 ConsumerQueue 中的索引 index,這里更新也只是更新了 offsetTable 緩存,在 broker 會有一個定時任務每隔 5s 就持久化偏移量到 ${user.home}/store/config/consumerOffset.json
這個文件中。
{"offsetTable": {"TopicTest@consumer_group": {0: 6,1: 7,2: 9,3: 8},"SQL92Test@consumer_group": {0: 3,1: 2,2: 2,3: 3},"TopicTest@testGroupConsumer": {0: 4,1: 4,2: 6,3: 6},"%RETRY%consumer_group@consumer_group": {0: 0},"TopicMasterSlave@testGroupConsumer": {1: 0,2: 0,3: 0},"%RETRY%testGroupConsumer@testGroupConsumer": {0: 0}}
}
5. adjustThreadPool 調整消費者線程數
// 5.初始化延遲 1min 執行,每隔 1min 執行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 調整消費者的線程數量MQClientInstance.this.adjustThreadPool();} catch (Exception e) {log.error("ScheduledTask adjustThreadPool exception", e);}}
}, 1, 1, TimeUnit.MINUTES);
這里是調整消費者線程數,屬于動態調整,會根據還有多少消息沒有拉取來調整消費者線程池數。
/*** TODO 調整消費者線程池的線程個數*/
public void adjustThreadPool() {// 計算消費者分配的隊列下面還有多少消息沒有消費long computeAccTotal = this.computeAccumulationTotal();// 閾值是 100000long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);if (computeAccTotal >= incThreshold) {// 如果沒有消費的消息大于閾值, 增加核心線程數this.consumeMessageService.incCorePoolSize();}if (computeAccTotal < decThreshold) {// 如果沒有消費的消息小于閾值, 減少核心線程數this.consumeMessageService.decCorePoolSize();}
}
但是 incCorePoolSize
和 decCorePoolSize
這兩方法都是空實現,所以這個方法就不用管了。
6. 小結
好了,這篇文章我們講解了 MQClientInstance 啟動的時候啟動的 5 個定時任務,最后我們簡單總結下這幾個定時任務,分別是:
- 每隔 2min 定時拉取 namesrv 地址。
- 每隔 30s 定時拉取最新的 topic 路由信息。
- 每隔 30s 定時清除無效的 broker,同時向所有 broker 發送心跳。
- 每隔 5s 嘗試持久化消費者偏移量,就是消費進度。
- 每隔 1min 調整一次消費者的線程數。
如有錯誤,歡迎指出!!!