本文著重分析為consumequeue/topic/queueId
目錄下的索引文件。
1.ConsumeQueueStore
public class ConsumeQueueStore {protected final ConcurrentMap<String>, ConcurrentMap<Integer>, ConsumeQueueInterface>> consumeQueueTable;public boolean load() {String storePathRootDir = this.messageStoreConfig.getStorePathRootDir();String storePathConsumeQueue = getStorePathConsumeQueue(storePathRootDir);boolean cqLoadResult = loadConsumeQueues(storePathConsumeQueue, CQType.SimpleCQ);String storePathBatchConsumeQueue = getStorePathBatchConsumeQueue(storePathRootDir);boolean bcqLoadResult = loadConsumeQueues(storePathBatchConsumeQueue, CQType.BatchCQ);return cqLoadResult && bcqLoadResult;}//Broker啟動后加載本地的consumequeue文件private boolean loadConsumeQueues(String storePath, CQType cqType) {File dirLogic = new File(storePath);File[] fileTopicList = dirLogic.listFiles();if (fileTopicList != null) {for (File fileTopic : fileTopicList) {String topic = fileTopic.getName();File[] fileQueueIdList = fileTopic.listFiles();if (fileQueueIdList != null) {for (File fileQueueId : fileQueueIdList) {int queueId = Integer.parseInt(fileQueueId.getName());;queueTypeShouldBe(topic, cqType);//選擇 ConsumeQueue or BatchConsumeQueue 本文以 ConsumeQueue 作為分析案例ConsumeQueueInterface logic = createConsumeQueueByType(cqType, topic, queueId, storePath);this.putConsumeQueue(topic, queueId, logic);if (!this.load(logic)) {return false;}}}}}return true;}private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueueInterface consumeQueue) {ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface> map = this.consumeQueueTable.get(topic);if (null == map) {map = new ConcurrentHashMap<>();map.put(queueId, consumeQueue);this.consumeQueueTable.put(topic, map);} else {map.put(queueId, consumeQueue);}}public boolean load(ConsumeQueueInterface consumeQueue) {// 通過 topic & queueId 從consumeQueueTable 獲取到 對應的FileQueueLifeCycle 即ConsumeQueueFileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());return fileQueueLifeCycle.load();}
}
1.1.ConsumeQueue
public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {private final MappedFileQueue mappedFileQueue;@Overridepublic boolean load() {boolean result = this.mappedFileQueue.load();return result;}
}
1.2.MappedFileQueue
mappedFileQueue.load
核心功能就是加載consumequeue/topic/queueId
目錄下的消費索引本地文件。區別CommitLog加載的是/commitlog目錄下真正的用戶數據。
ConsumeQueue & CommitLog 均持有屬性類MappedFileQueue【mmap零拷貝之內存映射的磁盤文件】。
DefaultMessageStore#ReputMessageService
CommitLog & ConsumerQueue 目錄下的所有問題在Broker端啟動的時候默認都會加載到內存中建立與磁盤之間的映射關系。但是在CommitLog不斷增加數據過程中,ConsumerQueue是如何確認每條消息的索引文件呢?