【RocketMQ Broker 相關源碼】- broker 啟動源碼(2)

文章目錄

  • 1. 前言
  • 2. 創建 DefaultMessageStore
  • 3. DefaultMessageStore#load
    • 3.1 CommitLog#load
    • 3.2 loadConsumeQueue 加載 ConsumeQueue 文件
    • 3.3 創建 StoreCheckpoint
    • 3.4 indexService.load 加載 IndexFile 文件
    • 3.5 recover 文件恢復
    • 3.6 延時消息服務加載
  • 4. registerProcessor 注冊 code 處理器
  • 5. ConfigManager.persist
  • 6. protectBroker 保護 broker
  • 7. fetchNameServerAddr 定時任務定時拉取 NameServer 地址
  • 8. 小結


本文章基于 RocketMQ 4.9.3

1. 前言

  • 【RocketMQ】- 源碼系列目錄
  • 【RocketMQ Broker 相關源碼】- broker 啟動源碼(1)

在前面【RocketMQ Broker 相關源碼】- broker 啟動源碼(1)這篇文章中,我們對 broker 啟動的源碼進行了簡要梳理,著重介紹了 broker 啟動過程中所涉及的各項服務,以及啟動流程中具體執行的操作。至于這些服務的具體功能,以及其背后源碼的詳細實現,還有上一篇文章中遺留下來的一些方法和比較重要的類,我們在這篇文章中去介紹。


2. 創建 DefaultMessageStore

DefaultMessageStore 是 RocketMQ 中默認的消息存儲核心類,broker 對消息的操作都需要經過這個類,比如消息查詢、消息添加、消息拉取 … 所以在調用 BrokerController#initialize 的時候第二步就是創建 DefaultMessageStore,那么我們就來看下這個類的構造方法。

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {// 消息到達監聽器, 消息重放之后會通知 pullRequestHoldService 服務消息到達, 接著就可以處理消費者的消息拉取請求this.messageArrivingListener = messageArrivingListener;// broker 配置this.brokerConfig = brokerConfig;// 消息存儲配置this.messageStoreConfig = messageStoreConfig;// broker 數據統計管理類this.brokerStatsManager = brokerStatsManager;// MappedFile 分配服務this.allocateMappedFileService = new AllocateMappedFileService(this);// 實例化 CommitLog, 默認是不支持 Dleger 高可用模式的if (messageStoreConfig.isEnableDLegerCommitLog()) {this.commitLog = new DLedgerCommitLog(this);} else {this.commitLog = new CommitLog(this);}// 創建 ConsumeQueue 集合, 存儲了 topic -> (queueId, ConsumeQueue) 的關系this.consumeQueueTable = new ConcurrentHashMap<>(32);// 初始化 ConsumeQueue 刷盤服務this.flushConsumeQueueService = new FlushConsumeQueueService();// 初始化過期 CommitLog 文件清除服務this.cleanCommitLogService = new CleanCommitLogService();// 初始化過期 ConsumeQueue 文件清除服務this.cleanConsumeQueueService = new CleanConsumeQueueService();// 初始化消息存儲統計服務this.storeStatsService = new StoreStatsService();// 初始化 Index 索引服務this.indexService = new IndexService(this);// 判斷是否初始化 HAService 主從同步服務if (!messageStoreConfig.isEnableDLegerCommitLog()) {this.haService = new HAService(this);} else {this.haService = null;}// 初始化消息重放服務this.reputMessageService = new ReputMessageService();// 初始化延時消息服務this.scheduleMessageService = new ScheduleMessageService(this);// 初始化堆外緩存this.transientStorePool = new TransientStorePool(messageStoreConfig);// 初始化堆外緩存的 ByteBuffer, 默認大小是 5if (messageStoreConfig.isTransientStorePoolEnable()) {this.transientStorePool.init();}// 啟動 MappedFile 分配線程this.allocateMappedFileService.start();// 啟動 Index 文件服務線程this.indexService.start();// 創建消息重放列表, 用于構建 ConsumeQueue、Index 索引, 默認順序是 ConsumeQueue -> Indexthis.dispatcherList = new LinkedList<>();this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());// 創建 DefaultMessageStore 的根目錄文件File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));// 確保根文件父目錄, 也就是 ${user.home}/store(又或者 broker.conf 里面設置的 storePathRootDir) 被創建MappedFile.ensureDirOK(file.getParent());// 確保 CommitLog 文件父目錄, 也就是 ${user.home}/store/commitlog(又或者 broker.conf 里面設置的 storePathCommitLog) 被創建MappedFile.ensureDirOK(getStorePathPhysic());// 確保 ConsumeQueue 文件父目錄, 也就是 ${user.home}/store/consumequeue(又或者 broker.conf 里面設置的 storePathConsumeQueue) 被創建MappedFile.ensureDirOK(getStorePathLogic());// 設置文件權限是可讀寫, 在 DefaultMessageStore 啟動的時候會加鎖, 確保只有一個 broker 對這個根目錄下的文件操作lockFile = new RandomAccessFile(file, "rw");
}

可以看到在這個構造器里面就是初始化了一堆服務,按照順序一個一個往下說:

  • messageArrivingListener:消息到達監聽器, 消息重放之后會通知 pullRequestHoldService 服務消息到達, 接著就可以處理消費者的消息拉取請求,這個監聽器是跟消費者消息拉取有關的,后面講到消費者拉取消息的時候會重點說下。
  • brokerConfig:broker 配置。
  • messageStoreConfig:消息存儲配置,里面記錄了消息存儲服務的一些參數。
  • brokerStatsManager:broker 數據統計管理類,比如 broker 里面添加了多少次消息,添加的消息的總大小 …
  • allocateMappedFileService:MappedFile 分配服務,用來創建 MappedFile 的,這個服務在文章 【RocketMQ 存儲】- RocketMQ 如何高效創建 MappedFile 有詳細介紹。
  • CommitLog:就是存儲消息的 CommitLog,一個 broker 中有一個 CommitLog,但是 CommitLog 中包含了多個 MappedFile。
  • consumeQueueTable:存儲了 topic -> <queueId, ConsumeQueue> 的關系,一個 topic 下面有多個隊列,而這些隊列可以分配到不同的 broker,所以這個集合保存了當前這個 broker 下面存儲的 topic -> (id, queue) 的映射,也就是說通過 topic 可以找到所有的 ConsumeQueue。
  • flushConsumeQueueService:ConsumeQueue 刷盤服務,這個服務在文章 【RocketMQ 存儲】ConsumeQueue 刷盤服務 FlushConsumeQueueService 有詳細介紹。
  • cleanCommitLogService:CommitLog 文件清除服務,清除 CommitLog 的過期文件,這個服務在文章 【RocketMQ 存儲】- CommitLog 過期清除服務 CleanCommitLogService 有詳細介紹。
  • cleanConsumeQueueService:ConsumeQueue 文件清除服務,清除 ConsumeQueue 的過期文件,這個服務在文章 【RocketMQ 存儲】- ConsumeQueue 過期清除服務 CleanConsumeQueueService 中有詳細介紹。
  • storeStatsService:消息存儲統計服務。
  • indexService:消息存儲統計服務。
  • storeStatsService:Index 索引服務,用于管理 Index 索引,構建、刪除 … 邏輯都是在里面進行。
  • haService:主從同步服務,在【RocketMQ 高可用】這幾篇文章中有詳細介紹這個類。
  • reputMessageService:消息重放服務,用于構建 ConsumeQueue 索引、Index 索引、SQL92 的 bitMap 過濾處理,在 【RocketMQ 存儲】消息重放服務-ReputMessageService
    中有詳細介紹。
  • scheduleMessageService:延時消息服務,處理延時消息的。
  • transientStorePool:堆外緩存,實現讀寫分離,雖然通過 MappedByteBuffer 的 mmap 進行文件映射到了 page cache,同時通過 mlock 鎖定了這篇內存進行 swap 交換。但是當 page cache 臟頁數據越來越多的時候,內核的 pdflush 線程就會將 page cache 中的臟頁回寫到磁盤中,這里是我們不能控制的。所以當臟頁寫入磁盤之后,我們通過 MappedByteBuffer 寫入數據還是會觸發缺頁中斷,這樣就會導致寫入數據延遲,性能產生毛刺現象。為了解決這個問題,RocketMQ 引入了堆外緩存提供讀寫分離,當數據要寫入的時候,先寫入 writeBuffer,這段 writeBuffer 只是普通堆外緩存,不涉及臟頁回寫,所以寫入的時候不會阻塞,而后臺線程就會不斷將這部分數據 commit 到 page cache 中。
  • lockFile:${user.home}/store/lock(storePathRootDir),當 DefaultMessageStore 啟動的時候會往里面寫入 lock,這是為了確保只有一個 broker 對這個根目錄下的文件操作,根目錄默認是 ${user.home}/store,但是也可以在 broker.conf 中設置 storePathRootDir 為根目錄。

3. DefaultMessageStore#load

在 BrokerController#initialize 方法中,通過 DefaultMessageStore#load 去加載各個目錄下面的文件到內存中,如 CommitLog、ConsumeQueue、StoreCheckPoint 等文件,下面就來看下這個方法的源碼。

/*** 加載 MessageStore 下面的消息存儲文件* @throws IOException*/
public boolean load() {boolean result = true;try {// 判斷下上次 RocketMQ 關閉是不是正常關閉boolean lastExitOK = !this.isTempFileExist();log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");// 1.加載 CommitLog 文件result = result && this.commitLog.load();// 2.加載 ConsumeQueue 文件result = result && this.loadConsumeQueue();if (result) {// 3.加載 checkpoint 檢查點文件,文件位置是 ${home}/store/checkpointthis.storeCheckpoint =new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));// 4.加載 IndexFile 文件this.indexService.load(lastExitOK);// 5.恢復 ConsumeQueue 和 CommitLog 文件,也就是將這些數據恢復到內存中this.recover(lastExitOK);log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());if (null != scheduleMessageService) {// 6.加載延時消息服務result =  this.scheduleMessageService.load();}}} catch (Exception e) {log.error("load exception", e);result = false;}if (!result) {// 如果上面的操作有異常,這里就停止創建 MappedFile 的服務this.allocateMappedFileService.shutdown();}return result;
}

3.1 CommitLog#load

這個方法就是用于加載 CommitLog 文件,其實就是加載 Commit 存儲文件夾下面的文件到內存中,下面是源碼。

/*** 加載 CommitLog 文件* @return*/
public boolean load() {// 加載 CommitLog 文件boolean result = this.mappedFileQueue.load();log.info("load commit log " + (result ? "OK" : "Failed"));return result;
}/*** 加載文件* @return*/
public boolean load() {// 文件存放目錄// ${home}/store/commitlogFile dir = new File(this.storePath);// 獲取內部的文件集合File[] ls = dir.listFiles();if (ls != null) {// 如果文件夾下存在文件,那么進行加載return doLoad(Arrays.asList(ls));}return true;
}

因為 CommitLog 一個文件大小是 1GB,所以這個文件夾下面可能有多個文件,需要用到 doLoad 方法,傳入一個集合。

/*** 加載文件* @param files* @return*/
public boolean doLoad(List<File> files) {// 將文件按照文件名排序,這里的文件名就是文件的起始偏移量files.sort(Comparator.comparing(File::getName));// 遍歷所有文件for (File file : files) {if (file.length() != this.mappedFileSize) {// 到這里就是文件實際大小如果不等于設定的文件大小,就直接返回,不加載其他文件,這里 length 就是創建文件的時候設定的文件大小log.warn(file + "\t" + file.length()+ " length not matched message store config value, please check it manually");return false;}try {// 每一個 CommitLog 或者 ConsumeQueue 文件都需要創建一個MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);// 將下面三個指針位置設置成文件大小// MappedByteBuffer 或者 writeBuffer 中 position 的位置mappedFile.setWrotePosition(this.mappedFileSize);// MappedByteBuffer 或者 writeBuffer 中 flush 刷盤的位置mappedFile.setFlushedPosition(this.mappedFileSize);// writeBuffer 中 commit 提交數據到 page cache 中的位置mappedFile.setCommittedPosition(this.mappedFileSize);// 將 MappedByteBuffer 添加到集合 mappedFiles 中this.mappedFiles.add(mappedFile);log.info("load " + file.getPath() + " OK");} catch (IOException e) {log.error("load file " + file + " error", e);return false;}}return true;
}

可以看到就是在加載文件的時候,會把三個指針 wrotePosition、flushedPosition、committedPosition 設置成 mappedFileSize,這個 mappedFileSize 在 CommitLog 文件下就是 1GB,因為默認從磁盤讀取出來的數據肯定就已經是刷盤成功的了,所以這里直接設置為 mappedFileSize,但是有一個問題就是如果你的 RocketMQ 是突然崩潰了,就比如我這里直接 windows 啟動,在這之前我只往 CommitLog 里面寫入了幾條消息,但是這里的設置會導致這三個指針的值都是 1G,就跟實際不符合。
在這里插入圖片描述
但是這里設置了 1G 之后,在 DefaultMessageStore#load 的第 5 步 this.recover(lastExitOK) 會恢復 ConsumeQueue 和 CommitLog 文件,也就是將這些數據恢復到內存中,同時糾正偏移量,所以是沒問題的。


3.2 loadConsumeQueue 加載 ConsumeQueue 文件

上面加載完 CommitLog 之后,這里就要加載 ConsumeQueue 文件了。

/*** 加載 ConsumeQueue 文件* @return*/
private boolean loadConsumeQueue() {// 文件路徑是 ${user.home}/store/consumequeue, 當然這個 ${user.home}/store 也可以在 broker.conf 里面通過 storePathRootDir 配置根目錄File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));// 獲取這個目錄下面的所有文件File[] fileTopicList = dirLogic.listFiles();if (fileTopicList != null) {// 遍歷文件for (File fileTopic : fileTopicList) {// 獲取文件名, ConsumeQueue 文件和 CommitLog 不一樣, CommitLog 是所有 topic 都存到一起, ConsumeQueue 是 topic 分// 開存儲, 文件名就是 topicString topic = fileTopic.getName();// 獲取 ${storePathRootDir}/consumequeue/${topic} 下面的文件列表File[] fileQueueIdList = fileTopic.listFiles();if (fileQueueIdList != null) {// 遍歷, 因為 ConsumeQueue 默認就是 4 個隊列, 所以需要遍歷這些隊列for (File fileQueueId : fileQueueIdList) {int queueId;try {// 隊列 IDqueueId = Integer.parseInt(fileQueueId.getName());} catch (NumberFormatException e) {continue;}// 創建 ConsumeQueue, 這里傳了 topic 和 queueId 進去構造器, 因為文件路徑就是 ${storePathRootDir}/consumequeue/${topic}/${queueId}ConsumeQueue logic = new ConsumeQueue(topic,queueId,StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),this);// 將創建出來的 ConsumeQueue 添加到 consumeQueueTable 中this.putConsumeQueue(topic, queueId, logic);// ConsumeQueue#load 方法if (!logic.load()) {return false;}}}}}log.info("load logics queue all over, OK");return true;
}

這里的文件加載和前面 CommitLog 的差不多,只是獲取文件名的時候 ConsumeQueue 文件和 CommitLog 不一樣,CommitLog 是所有 topic 都存到一起,ConsumeQueue 是 topic 分開存儲,所以 ConsumeQueue 的文件路徑就是 ${storePathRootDir}/consumequeue/${topic}/${queueId},最后創建出 ConsumeQueue 之后需要添加到 consumeQueueTable 中,添加的邏輯如下。

/*** 添加 ConsumeQueue 映射關系* @param topic* @param queueId* @param consumeQueue*/
private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) {// 獲取 topic 下面的 ConsumeQueue 集合ConcurrentMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic);if (null == map) {// 初始化map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>();// 添加map.put(queueId, consumeQueue);this.consumeQueueTable.put(topic, map);if (MixAll.isLmq(topic)) {this.lmqConsumeQueueNum.getAndIncrement();}} else {// 直接添加, 會覆蓋map.put(queueId, consumeQueue);}
}

添加完之后調用 ConsumeQueue 的 load 方法去加載 ConsumeQueue 文件,但是其實加載的邏輯跟 CommitLog 是一樣的,只是加載的文件不同,因為都是走的 mappedFileQueue.load() 的邏輯,這個邏輯上面 3.1 小結也說過了,這里不再贅述。
在這里插入圖片描述


3.3 創建 StoreCheckpoint

加載 checkpoint 檢查點文件,文件位置是 ${storePathRootDir}/checkpoint,這個 checkpoint 就是檢查點文件,CommitLog、ConsumeQueue、IndexFile 這三個文件的最新消息的存儲時間點會被記錄,如果 broker 異常重啟了,這時候就會根據這三個時間點來恢復文件。
在這里插入圖片描述
當然這里就是來看一下創建的構造器邏輯,關于這三個值的使用在 recover 方法有用到,之前的文件也講過,所以如果有需要可以去看下之前的文章,就是正常異常退出恢復那兩篇。

public StoreCheckpoint(final String scpPath) throws IOException {// 判斷是否存在當前文件// D:\\javaCode\\rocketmq-source\\config\\store\\checkpointFile file = new File(scpPath);MappedFile.ensureDirOK(file.getParent());// 文件是否存在boolean fileExists = file.exists();// mmap 映射this.randomAccessFile = new RandomAccessFile(file, "rw");this.fileChannel = this.randomAccessFile.getChannel();this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MappedFile.OS_PAGE_SIZE);if (fileExists) {log.info("store checkpoint file exists, " + scpPath);// 記錄最新 CommitLog 文件的最新添加到 ByteBuffer 的時間戳this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);// 最新 ConsumeQueue 文件的最新添加到 ByteBuffer 的時間戳this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);// 最新 IndexFile 文件的最新添加到 ByteBuffer 的時間戳this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "+ UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", "+ UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "+ UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));} else {// 文件不存在log.info("store checkpoint file not exists, " + scpPath);}
}

3.4 indexService.load 加載 IndexFile 文件

/*** 加載 IndexFile 文件, 路徑是 ${storePathRootDir}/index* @param lastExitOK 上一次 RocketMQ 是否正常退出* @return*/
public boolean load(final boolean lastExitOK) {// 首先獲取這個文件夾路徑下面的所有文件File dir = new File(this.storePath);File[] files = dir.listFiles();if (files != null) {// 文件目錄名稱排序Arrays.sort(files);for (File file : files) {try {// 構建 IndexFileIndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);// 加載 IndexFilef.load();// 如果上一次不是正常退出if (!lastExitOK) {// 如果說 IndexFile 最后一條消息的 storeTimeStamp 比 StoreCheckPoint 中記錄的要大if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp()) {// 說明這個 IndexFile 有一部分數據是臟數據, 刪掉這個文件f.destroy(0);continue;}}log.info("load index file OK, " + f.getFileName());// 加到 indexFileList 里面this.indexFileList.add(f);} catch (IOException e) {log.error("load file {} error", file, e);return false;} catch (NumberFormatException e) {log.error("load file {} error", file, e);}}}return true;
}

這個方法就用到了我們提到的 StoreCheckPoint 文件里面的數據,如果 IndexFile 記錄的最新的索引的消息在 CommitLog 的存儲時間比 StoreCheckPoint 文件里面記錄的 indexMsgTimestamp 要大,說明這個 IndexFile 文件就是一個不合法的文件,至少是數據不合法的,因為 indexMsgTimestamp 這個變量就是在 IndexFile 滿了刷盤的時候才會記錄,比如將一個寫滿了的 IndexFile 文件刷盤就記錄一下這個 indexMsgTimestamp 變量。
在這里插入圖片描述
所以看上面圖也能看出來,刷盤的時候必須要文件寫滿了才會去更新 indexMsgTimestamp,這就意味者這個變量就是記錄完整的 IndexFile 刷盤的時間,而不是刷盤一次就記錄一次,因此如果說 IndexFile 最后一條消息的 storeTimeStamp 比 StoreCheckPoint 中記錄的要大,就說明這個 IndexFile 肯定是有問題的。

至于為什么可以這么記錄,因為 IndexFile 不像 CommitLog 和 ConsumeQueue 那樣,一個 IndexFile 就是一個 MappedFile 來存儲所有數據,像 CommitLog 和 ConsumeQueue 都是用 MappedFileQueue 存儲 MappedFile 集合的,所以這里還是有點不同的。


3.5 recover 文件恢復

這里的文件恢復代碼如下:

/*** 恢復 CommitLog 和 ConsumeQueue 中的數據到內存中* @param lastExitOK*/
private void recover(final boolean lastExitOK) {// 恢復所有 ConsumeQueue 文件,返回的是 ConsumeQueue 中存儲的最大有效 CommitLog 偏移量long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();// 上一次 Broker 退出是正常退出還是異常退出if (lastExitOK) {// 這里就是正常退出,所以正常恢復 CommitLogthis.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);} else {// 這里就是異常退出,所以異常恢復 CommitLogthis.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);}// 最后恢復 topicQueueTablethis.recoverTopicQueueTable();
}

正常異常恢復的邏輯在前面講 RocketMQ 存儲部分的時候已經講過了,所以這里就不再多說。

  • 【RocketMQ 存儲】- 異常退出恢復邏輯 recoverAbnormally
  • 【RocketMQ 存儲】- 正常退出恢復邏輯 recoverNormally

3.6 延時消息服務加載

延時消息屬于消息消費的一種,這里后面講到消費者的時候也會詳細講解,這里先不多說。


4. registerProcessor 注冊 code 處理器

/*** 注冊 Code 處理器*/
public void registerProcessor() {/*** SendMessageProcessor*/SendMessageProcessor sendProcessor = new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList);// 消息發送的處理器this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);/*** PullMessageProcessor, 專門用于處理 PULL_MESSAGE 這個請求 CODE, 也就是消息拉取請求*/this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);/*** ReplyMessageProcessor*/ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);replyMessageProcessor.registerSendMessageHook(sendMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);/*** QueryMessageProcessor*/NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);/*** ClientManageProcessor*/ClientManageProcessor clientProcessor = new ClientManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);/*** ConsumerManageProcessor*/ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);/*** EndTransactionProcessor*/this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);/*** Default*/AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}

這里就是注冊處理器的邏輯,上一篇文章我們說過,RocketMQ 發送請求的時候會在請求頭設置 code 表示這個請求是什么類型的請求,這個請求需要使用什么處理器來處理就是在這里注冊的,同時 broker 處理請求的時候不可能是單線程處理的,肯定是封裝成一個 runnable 線程任務去處理,所以需要傳入一個線程池來并發執行。


5. ConfigManager.persist

在 BrokerController#initialize 中啟動了多個定時任務來定時持久化,下面來看下這個持久化的源碼。

/*** 持久化到文件中, 持久化 json 和 json.bak 文件*/
public synchronized void persist() {// 需要持久化的數據String jsonString = this.encode(true);if (jsonString != null) {// 需要持久化的文件路徑String fileName = this.configFilePath();try {// 持久化邏輯MixAll.string2File(jsonString, fileName);} catch (IOException e) {log.error("persist file " + fileName + " exception", e);}}
}

這里面就是用了模板方法的設計模式,ConfigManager 是一個抽象類,需要持久化的數據 encode、需要持久化的文件路徑 configFilePath 都是子類實現的,下面是 ConfigManager 的子類。
在這里插入圖片描述


6. protectBroker 保護 broker

RocketMQ 作為消息隊列,肯定會存在消息堆積的情況,如果是堆積量比較少那還行,可以新啟動多幾個消費者來加速消費,但是如果堆積量比較大,這就不得不懷疑是不是消費者出什么問題了,比如代碼出 bug 導致消費進度一致不變。

我們知道消費者消費之前都會先去 broker 拉取消息到本地,然后再在本地消費,所以如果消費者拉取消息的進度一直不變,而生產者又在不斷新增消息,就會導致消息堆積量越來越多。

這個 protectBroker 方法就是去檢測如果這個消費者組的消息拉取進度落后最新消息超過 16G, 說明有可能這個消費者組里面的消費者出問題了, 不能正常消費消息, 所以這時候就會將消費者組的訂閱配置設置為禁止消費, 需要人為介入去修復。

public void protectBroker() {// 如果禁止消費者緩慢讀取if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();while (it.hasNext()) {final Map.Entry<String, MomentStatsItem> next = it.next();// fallBehindBytes 記錄的是消費者組剩余可以拉取的消息大小final long fallBehindBytes = next.getValue().getValue().get();// 如果超過了 16Gif (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {final String[] split = next.getValue().getStatsKey().split("@");final String group = split[2];LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes);// 設置這個消費者組的 consumeEnable 為 false, 禁止這個消費者組里面的消費者對這個 topic 消費this.subscriptionGroupManager.disableConsume(group);}}}}

7. fetchNameServerAddr 定時任務定時拉取 NameServer 地址

broker 啟動的時候在 start 方法會啟動一個定時任務去定期拉取 NameServer 地址,當然這里的定期拉取 NameServer 地址的前提是用戶沒有在 broker.conf 文件中設置 namesrvAddr 配置。

public String fetchNameServerAddr() {try {// 從地址服務器拉取 nameserver 的地址String addrs = this.topAddressing.fetchNSAddr();if (addrs != null) {if (!addrs.equals(this.nameSrvAddr)) {log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);this.updateNameServerAddressList(addrs);// 更新 nameSrvAddr 屬性this.nameSrvAddr = addrs;return nameSrvAddr;}}} catch (Exception e) {log.error("fetchNameServerAddr Exception", e);}return nameSrvAddr;
}

核心源碼是下面的 fetchNSAddr,來看下代碼。

/*** 拉取 nameserver 的地址* @return*/
public final String fetchNSAddr() {return fetchNSAddr(true, 3000);
}/*** 拉取 nameserver 的地址* @param verbose* @param timeoutMills* @return*/
public final String fetchNSAddr(boolean verbose, long timeoutMills) {String url = this.wsAddr;try {// 向 wsAddr 地址發送查詢 NameServer 的請求if (!UtilAll.isBlank(this.unitName)) {url = url + "-" + this.unitName + "?nofix=1";}HttpTinyClient.HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills);if (200 == result.code) {String responseStr = result.content;if (responseStr != null) {// 處理返回結果return clearNewLine(responseStr);} else {log.error("fetch nameserver address is null");}} else {log.error("fetch nameserver address failed. statusCode=" + result.code);}} catch (IOException e) {if (verbose) {log.error("fetch name server address exception", e);}}if (verbose) {String errorMsg ="connect to " + url + " failed, maybe the domain name " + MixAll.getWSAddr() + " not bind in /etc/hosts";errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL);log.warn(errorMsg);}return null;
}private static String clearNewLine(final String str) {// 去除前后的空格String newString = str.trim();int index = newString.indexOf("\r");if (index != -1) {// 截取 \r 前面的字符串return newString.substring(0, index);}index = newString.indexOf("\n");if (index != -1) {// 截取 \n 前面的字符串return newString.substring(0, index);}// 沒有 \r 和 \n 字符, 直接返回原來的字符串return newString;
}

拉取的邏輯就是往 url 發送 get 請求來獲取地址,所以這里的核心就是如何找到這個 url,也就是代碼中的 wsAddr。由于這個屬性是 TopAddressing 的,我們就看下這個屬性是怎么設置的。

在這里插入圖片描述
在這里插入圖片描述
可以看到這個屬性就是從 MixAll.getWSAddr 這個方法獲取到的,這個方法就是用于獲取地址服務器的請求地址。

/*** 獲取 WS 地址* @return*/
public static String getWSAddr() {// 屬性 rocketmq.namesrv.domain,默認值 jmenv.tbsite.netString wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);// 屬性 rocketmq.namesrv.domain.subgroup,默認值 nsaddrString wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");// wsAddr 的默認值是 http://jmenv.tbsite.net:8080/rocketmq/nsaddrString wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;// 如果存在 ':', 比如說 127.0.0.1:9876if (wsDomainName.indexOf(":") > 0) {// wsAddr 的會被設置成 http://127.0.0.1:9876/rocketmq/nsaddrwsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;}// 返回 ws 地址return wsAddr;
}

如果在沒有配置系統變量的情況下,默認就是 http://jmenv.tbsite.net:8080/rocketmq/nsaddr,這個一般也用不到吧,除非是用一個服務來統一管理 NameServer 地址,這里就看下注釋即可。


8. 小結

承接上一篇文章,本篇對其中一些關鍵方法進行了補充說明。將這兩篇文章結合起來,我們就梳理出了 broker 啟動的大致邏輯。

在 broker 啟動過程中,涉及眾多方法,本文已著重對部分重要方法進行了補充介紹。不過,還有一些同樣關鍵的操作尚未展開,比如將 broker 信息注冊到 NameServer 這一操作。考慮到其重要性和復雜性,我會專門撰寫一篇文章深入剖析。

至此,關于 broker 啟動的大致邏輯就講解到這里了。后續文章,我們再一同深入探討其他關鍵環節。





如有錯誤,歡迎指出!!!!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/904097.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/904097.shtml
英文地址,請注明出處:http://en.pswp.cn/news/904097.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

認識含鹽褶皺和沖斷帶中的前造山運動蒸發巖變形

主要觀察結果的示意圖模型&#xff0c;并提出了三疊紀前造山蒸發巖變形的演化模型。&#xff08;a&#xff09; 蒸發巖的初始平緩隆起&#xff0c;在鹽枕和鹽背斜頂部有滑塌褶皺和同沉積伸展斷層。&#xff08;b&#xff09; 底辟形成&#xff0c;主動刺穿和拖動折疊長底辟&…

數據庫MySQL學習——day8(復習與鞏固基礎知識)

文章目錄 1. 數據庫基礎概念復習2. 常用SQL命令復習2.1 SELECT 查詢數據2.2 INSERT 插入數據2.3 UPDATE 更新數據2.4 DELETE 刪除數據 3. 表操作復習3.1 創建表3.2 修改表3.3 刪除表 4. 實踐任務4.1 創建樣例數據庫和表4.2 插入和更新數據4.3 使用WHERE、ORDER BY、LIMIT進行查…

TS 字面量類型

str是string類型l str2是常量&#xff0c;類型是字面量類型 用途&#xff1a;配合聯合類型確定更嚴謹精確的可選值利恩

【PostgreSQL數據分析實戰:從數據清洗到可視化全流程】1.3 客戶端工具選擇(psql/PGAdmin/DBeaver)

?? 點擊關注不迷路 ?? 點擊關注不迷路 ?? 點擊關注不迷路 文章大綱 1.3 客戶端工具選擇(psql/PGAdmin/DBeaver)1.3.1 工具定位與適用場景1.3.2 深度解析三大工具1.3.2.1 psql:命令行的終極掌控核心特性基礎操作示例優缺點分析1.3.2.2 PGAdmin:PostgreSQL 專屬管理平臺…

2023年第十四屆藍橋杯省賽B組Java題解【 簡潔易懂】

2023年第十四屆藍橋杯省賽B組Java 題解 比賽整體情況說明 第十四屆藍橋杯大賽省賽B組Java賽道共有10道題目&#xff0c;題型分布如下&#xff1a; 題目類型題量難度分布考察重點結果填空題53簡單 2中等基礎算法、數學邏輯、字符串處理編程大題52中等 3困難動態規劃、圖論、…

深度學習中的數據增強:提升食物圖像分類模型性能的關鍵策略

深度學習中的數據增強&#xff1a;提升食物圖像分類模型性能的關鍵策略 在深度學習領域&#xff0c;數據是模型訓練的基石&#xff0c;數據的數量和質量直接影響著模型的性能表現。然而&#xff0c;在實際項目中&#xff0c;獲取大量高質量的數據往往面臨諸多困難&#xff0c;…

jakarta.mail(javax.mail)包中關于SMTP協議支持的屬性參數配置

一、程序屬性配置 NO.NameTypeDescription1mail.smtp.userStringSMTP的默認用戶名。2mail.smtp.hostString要連接的SMTP服務器。3mail.smtp.portint如果 connect()方法未明確指定一個&#xff0c;則要連接的SMTP服務器端口。默認為25。4mail.smtp.connectiontimeoutint套接字連…

《解鎖SCSS算術運算:構建靈動樣式的奧秘》

SCSS作為CSS預處理器&#xff0c;算術運算功能猶如一顆璀璨明珠&#xff0c;賦予我們動態計算樣式屬性值的強大能力&#xff0c;讓網頁樣式不再是一成不變的刻板呈現&#xff0c;而是能夠根據各種條件和需求靈動變化。 在SCSS的世界里&#xff0c;算術運算絕非孤立的存在&…

Java 入門篇

文章目錄 一、注釋1、單行注釋2、多行注釋3、文檔注釋 二、字面量注意 三、變量四、標識符 一、注釋 1、單行注釋 // 同 C992、多行注釋 /* 同 C89和 C99 等 */3、文檔注釋 /** 文檔注釋&#xff0c;其內容是可以提取到一個程序說明文檔中去的*/二、字面量 同 C 常量 注意…

PowerShell安裝Chocolatey

文章目錄 環境背景安裝參考 環境 Windows 11 專業版PowerShell 7.5.1.NET Framework 4.0Chocolatey v2.4.3 背景 Chocolatey是Windows上的包管理工具&#xff0c;有點類似于Linux的 yum 和 apt 命令。比如&#xff0c;PowerShell里默認沒有 grep 命令&#xff0c;則可以通過…

將Airtable導入NocoDB

將Airtable導入NocoDB 0. 前提條件1. 詳細步驟 NocoDB提供了一種簡化流程&#xff0c;可在幾分鐘內將您的Airtable數據庫無縫轉移到各種數據庫管理系統&#xff0c;包括MySQL、Postgres和SQLite。此功能特別適合希望將Airtable數據庫遷移到更強大且可擴展的數據庫管理系統中的用…

微軟發布了最新的開源推理模型套件“Phi-4-Reasoning

每周跟蹤AI熱點新聞動向和震撼發展 想要探索生成式人工智能的前沿進展嗎&#xff1f;訂閱我們的簡報&#xff0c;深入解析最新的技術突破、實際應用案例和未來的趨勢。與全球數同行一同&#xff0c;從行業內部的深度分析和實用指南中受益。不要錯過這個機會&#xff0c;成為AI領…

Elasticsearch入門速通01:核心概念與選型指南

一、Elasticsearch 是什么&#xff1f; 一句話定義&#xff1a; 開源分布式搜索引擎&#xff0c;擅長處理海量數據的實時存儲、搜索與分析&#xff0c;是ELK技術棧&#xff08;ElasticsearchKibanaBeatsLogstash&#xff09;的核心組件。 核心能力&#xff1a; 近實時搜索&…

【angular19】入門基礎教程(三):關于angular里面的響應式數據入門使用

三個框架&#xff0c;都有響應式數據的概念。在angular里面有專門的叫法&#xff0c;響應式數據叫信號,英文名signal。其他兩個框架式沒有專門的名字的&#xff0c;統稱為動態數據。這點可以說&#xff0c;angular還是太細了&#xff0c;細節值得點贊&#xff01;&#xff01; …

GitHub Actions 和 GitLab CI/CD 流水線設計

以下是關于 GitHub Actions 和 GitLab CI/CD 流水線設計 的基本知識總結: 一、核心概念對比 維度GitHub ActionsGitLab CI/CD配置方式YAML 文件(.github/workflows/*.yml).gitlab-ci.yml執行環境GitHub 托管 Runner / 自托管GitLab 共享 Runner / 自托管市場生態Actions Mar…

【網絡編程】HTTP(超文本傳輸協議)詳解

&#x1f984;個人主頁:修修修也 &#x1f38f;所屬專欄:網絡編程 ??操作環境:Visual Studio 2022 目錄 &#x1f4cc;HTTP定義 &#x1f4cc;HTTP工作原理 1.客戶端發起請求: 2.服務器處理請求: 3.客戶端處理響應: &#x1f4cc;HTTP關鍵特性 &#x1f38f;HTTP請求方法 &am…

Centos小白之在CentOS8.5中安裝Rabbitmq 3.10.8

注意事項 安裝以及運行等其他操作&#xff0c;要使用root賬號進行&#xff0c;否則會遇到很多麻煩的事情。 使用命令行進行遠程登錄 ssh root192.168.0.167 安裝make 執行安裝命令 yum -y install make gcc gcc-c kernel-devel m4 ncurses-devel openssl-devel這里有可能會…

SQL筆記——左連接、右連接、內連接

前言&#xff1a;總是忘記表連接的區別&#xff0c;在面試的時候也容易被問到&#xff0c;因此就好記性不如爛筆頭吧 集合運算 有并集、交集、差集 聯合查詢*&#xff08;針對行合并的&#xff09;* union為關鍵字&#xff0c;就是將兩個select的結果求并集&#xff08;此時重…

LeetCode - 19.刪除鏈表的倒數第N個結點

目錄 題目 解法一 雙指針算法 核心思想 執行流程 具體例子 代碼 解法二 兩次遍歷法 核心思想 執行流程 具體例子 代碼 題目 19. 刪除鏈表的倒數第 N 個結點 - 力扣&#xff08;LeetCode&#xff09; 解法一 雙指針算法 核心思想 利用雙指針間隔固定距離(n1)&a…

C# 編程核心:控制流與方法調用詳解

在編程中&#xff0c;控制流和方法調用是構建程序邏輯的兩大基石。它們決定了代碼的執行順序和模塊化協作方式。本文將從基礎概念出發&#xff0c;結合代碼示例&#xff0c;深入解析這兩部分內容。 控制流&#xff1a;程序執行的指揮棒 控制流決定了代碼的執行路徑&#xff0…