目錄
前言
獲取源碼
總概論
生產者實例
源碼
A-01:設置生產者組名稱
A-02:生產者服務啟動
B-01:初始化狀態
B-02:該方法再次對生產者組名稱進行校驗
B-03:判斷是否為默認生產者組名稱
B-04: 該方法是為了實例化MQClientInstance對象,mq客戶端對象實例
B-05: 該方法就是將當前生產者對象注冊到mqClientInstance中的producerTable集合中,并且生產者組名稱作為key
B-06: 啟動相關核心服務以及開啟一系列定時任務(核心邏輯)
1. 開啟請求-響應通道- this.mQClientAPIImpl.start();
2. 開啟拉動式服務- this.mQClientAPIImpl.start();
3. 開啟負載均衡服務- this.rebalanceService.start();
4. 開啟推送服務- this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
5. 啟動各種計劃任務- this.startScheduledTask();
a. 啟動定時任務獲取MQ注冊中心nameServer的地址- MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
b. 定時從nameServer拉取topic信息到本地存儲 -????????????????? ? ? ??????????????MQClientInstance.this.updateTopicRouteInfoFromNameServer();
c. 定時清除離線的broker服務并給所有在線的broker發送心跳
d. 定時持久化消費偏移量數據
e. 定時調整消費者消息的線程池數量
B-07:初始化topic路由信息、topic訂閲信息以及topic端點映射信息
B-08:開啟定時監測broker故障信息任務
B-09:發送心跳給所有的broker服務
B-10:開啟定時掃描異步請求響應任務
A-03:開啟監控和處理同步發送和異步發送操作的守護線程
A-04:開啟消息軌跡和發送機制
總結
展望
前言
大概一年半前自己寫了一篇《云原生》一文搞懂RocketMQ隊列概述,這篇對rocketmq的相關概念和使用方法進行了整理概述,就像結尾說的都太局限于表面,簡單使用還能將就,但一出現問題自己也很難排查,為了邁向技能的下一個階段,還得是要讀源碼,學習大佬們的編碼風格和技巧,對于使用mq以及排除問題也會更得心應手。因為最近一年自己的工作充滿了波折,讓自己沒法靜下心來學習整理,雖然現在也好不了多少,但可算能回歸本心。本章篇幅比較長,將近萬字,博主也是自己讀源碼一步一步跟蹤的,所以盡量想描述得通俗易懂一些。
獲取源碼
首先我們從github上拉取rocketmqd的源碼鏈接到本地,使用idea打開。
源碼地址:https://github.com/apache/rocketmq
目前最新版本為:5.2.0
那么我們在idea上切換分支為 release-5.2.0
注:rocketmq5.x與4.x官方改動的東西比較多,盡量使用一直的版本,具體差異可查看官網,這里只對源碼邏輯進行分析
總概論
我們知道rocketmq的組成需要四大模塊構成,缺一不可
- nameserver mq注冊中心(狀態管理)
- broker mq的服務端(核心)
- producer 生產者
- consumer 消費者
本章我們先從應該大家接觸最多的生產者開始學習源碼吧。
生產者實例
- 在idea的rocketmq源碼中找到 example 模塊,這個模塊中都是官方給出的簡單案例
- 然后找到simple 包下面的 Producer類打開
- 然后在producer類中配置自己的mq的地址,topic以及tag就能成功啟動生產者并且發送消息
注意:這里成功啟動的前提是必須提前啟動了mq的nameserver服務和broker服務才能成功,若沒有可不用啟動,直接跳過看下面源碼
源碼
根據上面簡單生產者實例可知,生產者端的兩大核心就是,啟動生產者和發送消息,分別對應下面兩行代碼。看似簡單的兩行其實里面的功能邏輯很強大。
-
producer.start();
-
producer.send(msg);
生產者包含4中狀態:
-
CREATE_JUST 服務剛剛創建,尚未啟動
-
RUNNING 服務運行中
-
SHUTDOWN_ALREADY 服務已關閉
-
START_FAILED 啟動出錯
按照順序,我們從 生產者的啟動開始
public void start() throws MQClientException {//A-01:設置生產者組名稱this.setProducerGroup(withNamespace(this.producerGroup));//A-02:生產者服務啟動this.defaultMQProducerImpl.start();//A-03:開啟監控和處理同步發送和異步發送操作的守護線程if (this.produceAccumulator != null) {this.produceAccumulator.start();}//A-04:開啟消息軌跡和發送機制if (null != traceDispatcher) {try {traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {logger.warn("trace dispatcher start failed ", e);}}
}
A-01:設置生產者組名稱
- 該方法中顧名思義主要用于設置生產者組的名稱
- withNamespace()進入該方法發現,其實對生產者組的名稱就行各種非空校驗和長度校驗,最后根據固定格式拼接名稱后返回。(對于開源組件大佬,校驗方式也是和我們無異的)
A-02:生產者服務啟動
該方法為本次的啟動核心方法,我們直接深入了解下其內部實現。
方法邏輯太長我們進行分段拆分來解析
public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {//B-01:初始化狀態case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;//B-02:校驗this.checkConfig();//B-03:生產者組名設置if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}//...
B-01:初始化狀態
因為現在還是正在啟動中,所以狀態還是默認未啟動狀態,那么直接進入第一個case邏輯中,進入后里面把狀態至為啟動失敗,我認為這是一種防御性編碼,并且防止未成功啟動的生產者被重復啟動
B-02:該方法再次對生產者組名稱進行校驗
B-03:判斷是否為默認生產者組名稱
????????前面可知我們已經成功設置自定義名稱,所以直接進入if中
- changeInstanceNameToPID(),該方法就設置實例名稱,進入方法可以看到名稱的生成規則,this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
當前運行的虛擬機的名稱截取拼接上當前納米時間戳,保證唯一性
public void start(final boolean startFactory) throws MQClientException {/......///B-04:該方法是為了實例化MQClientInstance對象,mq客戶端對象實例this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);//B-05:注冊生產者boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);/....../
B-04: 該方法是為了實例化MQClientInstance對象,mq客戶端對象實例
- 內部首先生成一個唯一的clientId,其組成包含ip地址與之前生成的實例名稱instanceName組成,然后new 了一個MQClientInstance對象并設置對應屬性。
- 將clientId作為key維護到一個Map對象中,private final ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable;
注:MQClientInstance對象,該對象非常重要,因為生產者和消費者都在使用
進入該對象我們可以發現,里面維護了兩個Map集合,就是分別存儲當前客戶端的生產者和消費者的對象數據
private final ConcurrentMap<String, MQProducerInner> producerTable ;
private final ConcurrentMap<String, MQConsumerInner> consumerTable ;
B-05: 該方法就是將當前生產者對象注冊到mqClientInstance中的producerTable集合中,并且生產者組名稱作為key
public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {/....///B-06: 啟動相關核心服務以及開啟一系列定時任務if (startFactory) {mQClientFactory.start();}/.../
B-06: 啟動相關核心服務以及開啟一系列定時任務(核心邏輯)
1. 開啟請求-響應通道- this.mQClientAPIImpl.start();
2. 開啟拉動式服務- this.mQClientAPIImpl.start();
3. 開啟負載均衡服務- this.rebalanceService.start();
4. 開啟推送服務- this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
- 這個方法是否有點眼熟,沒錯這就是我們最開始調用的啟動方法A-2,參數傳的false,說明上面if代碼塊中startFactory=false,則不進入B-06的代碼塊中
- 并且A-2代碼塊方法中,因為第一次進入時狀態已經從CREATE_JUST變更為START_FAILED,所以也不會再次進入第一個case中
- 閱讀后續代碼可知,核心就是調用了 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); 向所有Broker服務發送一次心跳(具體后面會詳解)
5. 啟動各種計劃任務- this.startScheduledTask();
所有任務都是使用Executors線程池創建一個單獨的的單線程定時任務實現,如下格式
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread"));
//....
this.scheduledExecutorService.scheduleAtFixedRate(() -> {try {//業務邏輯} catch (Exception e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
a. 啟動定時任務獲取MQ注冊中心nameServer的地址- MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
首次啟動延遲時間:2s
定時間隔時間:2m
mQClientAPIImpl對象是否眼熟,沒錯就是上面B-06-1啟動的服務,所以該服務必須在任務執行之前啟動,查看源碼如此
- 深入方法中會發現其實就是獲取地址處理后存儲在一個List集合中,為什么使用集合,我認為如果是集群那就就會有多條地址存在。 private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<>();
- 繼續深入會發現有Netty的身影,用于服務間遠程通信,這里不再研究。
- private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables;
- 該Map就是用nameserver地址作為key,而value為ChannelWrapper對象,該對象內部就使用了netty框架 包中的對象,一個地址對應一個通道封裝器。但是該邏輯中并沒有使用put操作,只是get獲取。
b. 定時從nameServer拉取topic信息到本地存儲 -????????????????? ? ? ??????????????MQClientInstance.this.updateTopicRouteInfoFromNameServer();
首次啟動延遲時間:10ms
定時間隔時間:30s
- 深入方法內部可知,其實就是分別對producerTable與consumerTable的map進行操作遍歷,取出對象里面的topic名稱,由前面B-04中可知,分別用于存儲生產者對象和消費者對象信息
- 再將topic名稱的set集合進行遍歷去遠程獲取nameserver中的topic的路由詳細信息,并將信息存儲在另一個map對象中。作用: 用于管理和查詢主題的路由信息,幫助生產者和消費者確定消息的發送和接收路徑。
- private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<>();
c. 定時清除離線的broker服務并給所有在線的broker發送心跳
????????MQClientInstance.this.cleanOfflineBroker(); 清除離線的broker
????????MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); 給所有的broker發送心跳
首次啟動延遲時間:1s
定時間隔時間:30s
- 清除離線的broker,查看源碼可知道,大概意思為首先從private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = new ConcurrentHashMap<>(); map中
- 獲取所有broker的地址數據,然后進行遍歷,
- 在遍歷中取出 topicRouteTable,該map存放的是topic的對象信息
- 再對topic map的values進行遍歷,取出topic信息對象中存儲的對應broker集合,
- 判斷上面的brokerAddrTable中的broker是否在topic維護的broker集合中,沒有則清除
d. 定時持久化消費偏移量數據
? ? ?MQClientInstance.this.persistAllConsumerOffset();
首次啟動延遲時間:10s
定時間隔時間:5s
同樣的維護了一個Map對象:
private ConcurrentMap<MessageQueue, ControllableOffset> offsetTable;
key則為消息隊列對象
- 深入源碼可知,它的消費者持久化實現方式有三種
- lite pull
- mp pull
- mp push
e. 定時調整消費者消息的線程池數量
????MQClientInstance.this.adjustThreadPool();
首次啟動延遲時間:1m
定時間隔時間:1m
public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {//...//B-07:初始化topic路由信息、topic訂閲信息以及topic端點映射信息this.initTopicRoute();//B-08:開啟定時監測broker故障信息任務this.mqFaultStrategy.startDetector();//...
B-07:初始化topic路由信息、topic訂閲信息以及topic端點映射信息
- 深入源碼可知,首先獲取開發者自定義的topic集合,然后分別處理成MQ要求的格式newTopic,然后創建TopicPublishInfo對象,用于存儲topic訂閱信息newTopic作為key,同樣最后放入map中
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable;
- 查看TopicPublishInfo對象可知,對象里面包含了TopicRouteData對象,我們知道這個對象在上面定時器B-06-5-b中出現過用于存儲topic路由信息,并且存儲在topicRouteTable map中
- 所以在本方法中也會通過newTopic去遠程從nameserver中拉去TopicRouteData信息,設置到TopicPublishInfo對象中,同樣也會對比topic新獲取的TopicRouteData與原來定時器存儲的topicRouteTable中的是否有變化,有則更新
- 有變化同時還會更新,上面定時器B-06-5-c中出現的brokerAddrTable map,更新broker的地址信息
- 同時更新topic端點映射信息-記錄每個主題的消息隊列與 Broker 之間的映射?
- private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>> topicEndPointsTable;這是一個嵌套map,因為一個topic可能對應多個broker,那么消息隊列也會是對應多個broker, 可以幫助管理和均衡負載,確保消息被分布到不同的 Broker 上。
B-08:開啟定時監測broker故障信息任務
深入源碼可知,里面維護了一個定時任務,定時監測 Broker 的故障詳細信息
首次啟動延遲時間:3s
定時間隔時間:3s
- 同時也維護了一個map,用于存儲每一個broker的 故障詳細信息,包括故障時間、故障持續時間和可用狀態
????????private final ConcurrentHashMap<String, FaultItem> faultItemTable;
- 邏輯處理中還會去查詢brokerAddrTable中是否還存在當前broker地址信息,不存在則從faultItemTable中移除,然后再去監測broker服務是否可用,若可用則將可用狀態 設置為true
public void start(final boolean startFactory) throws MQClientException {//...//B-09:發送心跳給所有的broker服務this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();//B-10:開啟定時掃描異步請求響應任務RequestFutureHolder.getInstance().startScheduledTask(this);}
B-09:發送心跳給所有的broker服務
????????發送心跳其實在上面定時器B-06-5-c中已經出現過了,但是沒有深入了解,那么定時器中既然已經在發送心跳了,為什么生產者啟動最后還要發送呢?
- 定時任務的作用:定時任務確保客戶端在運行過程中定期發送心跳,保持與 Broker 的連接。
- 啟動時的心跳:生產者在啟動完成時立即發送心跳,以確保初始化成功、快速檢測連接狀態并更新路由信息。
- 同樣的心跳機制中也維護了一個map, 用于記錄和管理每個 Broker 的心跳信息,private final ConcurrentMap<String, Integer> brokerAddrHeartbeatFingerprintTable;
- 其中value值稱為心跳指紋, MQ通過比較當前心跳指紋和上次記錄的指紋,可以判斷 Broker 是否正常工作
B-10:開啟定時掃描異步請求響應任務
????????深入源碼可知,里面維護了一個定時任務,定時掃描MQ存儲的生產者發布的異步請求以及響應的信息,幫助MQ實現異步請求的超時、回調和狀態管理,增強系統的異步處理能力。
次啟動延遲時間:3s
定時間隔時間:1s
同樣的是維護了一個map數據用于存儲異步請求以及響應的信息:
private ConcurrentHashMap<String, RequestResponseFuture> requestFutureTable;
那么key為請求時生成的唯一標識,value為RequestResponseFuture對象則記錄了請求信息、超時時間、響應信息、回調信息等,mq根據記錄的信息做出響應的處理。
- 源碼內部邏輯有一個地方就判斷了isTimeout是否請求超時,為true則拋出異常
該map requestFutureTable在本次啟動中只是使用,具體在什么地方存儲的,應該會在后續的生產者發送消息源碼中再次出現,本次啟動使用到的requestFutureTable應該都是沒數據的。日常開發看似只是簡單的調用了發送消息的api方法,而mq內部則做了許多復雜的處理來保證消息的可靠性和高可用性
A-03:開啟監控和處理同步發送和異步發送操作的守護線程
- guardThreadForSyncSend.start();
- guardThreadForAsyncSend.start();
????????這些線程中,可以實現具體的監控和處理邏輯,例如檢測發送超時、重試失敗的發送操作等。 并且這些線程在 JVM 退出時會自動終止
A-04:開啟消息軌跡和發送機制
通過收集消息軌跡信息,可以了解消息在 RocketMQ 中的流轉路徑,幫助系統監控和故障排查。
總結
對于RocketMQ我們都知道生產者會從nameserver中拉取數據,并且會在本地存儲,就算nameserver服務意外離線了,也能通過本地保存的數據進行消息通信。那么如何遠程拉取數據以及心跳監測,如何在本地存儲,我想大家通過對上面start啟動源碼的學習,疑惑都解開了吧。
- 數據更新以及心跳無非就是通過一系列的定時器在不斷請遠程請求
- 本地存儲則是使用已 table為后綴命名的Map集合來存儲的
對本章源碼中遇到的定時器和table進行了整理,方便大家快速記憶
展望
本章內容比較多,博主也是肝了幾天才完成,希望對大家都有所收獲,下一章我們繼續對生產者send消息源碼進行學習!