摘要
RocketMQ只要有CommitLog文件就可以正常運行了,那為何還要維護ConsumeQueue文件呢?
ConsumeQueue是消費隊列,引入它的目的是為了提高消費者的消費速度。畢竟RocketMQ是基于Topic主題訂閱模式的,消費者往往只關心自己訂閱的消息,如果每次消費都從CommitLog文件中檢索數據,無疑性能是非常差的。有了ConsumeQueue,消費者就可以根據消息在CommitLog文件中的偏移量快速定位到消息進行消費了。
Broker會將客戶端發送的消息寫入CommitLog文件,持久化存儲。但是整個流程并沒有涉及到ConsumeQueue文件的操作,那么ConsumeQueue文件是如何被構建的呢?
一、CommitLog文件構建分析
ReputMessageService是消息重放服務,請允許我這么命名。Broker在啟動的時候,會開啟一個線程每毫秒執行一次doReput()方法。
它的目的就是對寫入CommitLog文件里的消息進行「重放」,它有一個屬性reputFromOffset,記錄的是消息重放的偏移量,MessageStore啟動的時候會對其進行賦值。
它的工作原理是,根據重放偏移量reputFromOffset去讀取CommitLog里的待重放的消息,并構建DispatchRequest對象,然后將DispatchRequest對象分發出去,交給各個CommitLogDispatcher處理。
MessageStore維護了CommitLogDispatcher對象集合,目前只有三個處理器:
- CommitLogDispatcherBuildConsumeQueue:構建ConsumeQueue索引。
- CommitLogDispatcherBuildIndex:構建Index索引。
- CommitLogDispatcherCalcBitMap:構建布隆過濾器,加速SQL92過濾效率。
doReput()
方法1毫秒執行一次,它的方法體是一個for循環,只要reputFromOffset沒有到達CommitLog文件的最大偏移量,就會一直繼續重放消息。
private boolean isCommitLogAvailable() {return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}
它首先會根據reputFromOffset去CommitLog文件中截取一段ByteBuffer,這個緩沖區里就是待重放的消息數據。
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {// CommitLog單個文件的大小int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();// 根據索引構建進度找到等待構建的文件,文件名就是起始Offset,遍歷文件即可找到MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);if (mappedFile != null) {// 計算Offset在當前文件的讀指針位置int pos = (int) (offset % mappedFileSize);/*** 基于MappedFile的MappedByteBuffer派生出一個ByteBuffer對象* 共享同一塊內存,但是擁有自己的指針*/SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);return result;}return null;
}
SelectMappedBufferResult類屬性如下:
// 起始偏移量
private final long startOffset;
// 緩沖區
private final ByteBuffer byteBuffer;
// 長度
private int size;
// 關聯的MappedFile對象
private MappedFile mappedFile;
有了SelectMappedBufferResult,就可以讀取消息數據了。由于消息重放并不需要知道消息主體內容,因此不會讀取消息Body,只是讀取相關屬性,并構建DispatchRequest對象。讀取的屬性如下:
// 消息所屬Topic
private final String topic;
// 消息所屬隊列ID
private final int queueId;
// 消息在CommitLog文件中的偏移量
private final long commitLogOffset;
// 消息大小
private int msgSize;
// 消息Tag哈希碼
private final long tagsCode;
// 消息存盤時間
private final long storeTimestamp;
// 邏輯消費隊列位點
private final long consumeQueueOffset;
private final String keys;
private final boolean success;
// 消息唯一鍵
private final String uniqKey;
// 消息系統標記
private final int sysFlag;
// 事務消息偏移量
private final long preparedTransactionOffset;
// 屬性
private final Map<String, String> propertiesMap;
有了DispatchRequest對象,接下來就是調用doDispatch
方法將請求分發出去了。此時CommitLogDispatcherBuildConsumeQueue將被觸發,它會將請求轉交給DefaultMessageStore執行。
DefaultMessageStore.this.putMessagePositionInfo(request);
MessageStore先根據消息Topic和QueueID定位到ConsumeQueue文件,然后將索引追加到文件中。
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {// 根據Topic和QueueID定位到ConsumeQueue文件ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());// 追加索引到文件cq.putMessagePositionInfoWrapper(dispatchRequest);
}
寫索引之前,會先確保消息倉庫是可寫狀態:
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
然后,初始化一個ByteBuffer,容量為20字節,依次往里面寫入:消息Offset、size、tagsCode。
// 每個索引的長度是20字節,byteBufferIndex是循環使用的
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
/**
* 索引結構:Offset+size+tagsCode
* 8字節 4字節 8字節
*/
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
根據消費隊列位點和單個索引的長度計算索引應該寫入的文件位置,因為是順序寫的嘛,所以獲取最新的ConsumeQueue文件,如果文件寫滿會創建新的繼續寫。?
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
寫之前,校驗預期的偏移量和邏輯偏移量是否相等,正常情況下兩者應該相等,如果不等說明數據構建錯亂了,需要重新構建了。、
if (cqOffset != 0) {// 偏移量:當前文件的寫指針位置+文件起始偏移量(文件名)long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();// 正常情況下,expectLogicOffset和currentLogicOffset應該相等if (expectLogicOffset < currentLogicOffset) {log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);return true;}if (expectLogicOffset != currentLogicOffset) {LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset,currentLogicOffset,this.topic,this.queueId,expectLogicOffset - currentLogicOffset);}
}
檢驗通過后,就可以正常寫了。先更新當前ConsumeQueue記錄消息的最大偏移量maxPhysicOffset,再將20個字節的索引數據寫入到文件。?
至此,就完成了CommitLog中的消息到ConsumeQueue文件里的索引同步。
ConsumeQueue是RocketMQ用來加速消費者消費效率的索引文件,它是一個邏輯消費隊列,并不保存消息本身,只是一個消息索引。索引長度為20個字節,記錄了消息在CommitLog文件里的偏移量,消息長度,和消息Tag的哈希值。Consumer消費消息時可以根據Tag哈希值快速過濾消息,然后根據偏移量快速定位到消息,再根據消息長度讀取出一條完整的消息。
?
Broker將消息寫入CommitLog后并不會馬上寫ConsumeQueue,而是由一個異步線程ReputMessageService將消息進行重放,重放的過程中由CommitLogDispatcherBuildConsumeQueue將消息構建到ConsumeQueue文件,構建的頻率為1毫秒一次,幾乎是近實時的,不用擔心消費會延遲。
二、Consumer消息拉取和消費分析
MQConsumer是RocketMQ提供的消費者接口,從接口定義上可以看到,它主要的功能是訂閱感興趣的Topic、注冊消息監聽器、啟動生產者開始消費消息。
消費者獲取消息的模式有兩種:推模式和拉模式,對應的類分別是DefaultMQPushConsumer和DefaultMQPullConsumer,需要注意的是,在4.9.0版本,DefaultMQPullConsumer已經被廢棄了。
Push模式下,由Broker接收到消息后主動推送給消費者,實時性較高,但是會增加Broker的壓力。Pull模式下,由消費者主動從Broker拉取消息,主動權在消費者,這種方式更靈活,消費者可以根據自己的消費能力拉取適量的消息。
實際上,Push模式也是通過Pull的方式實現的,消息統一由消費者主動拉取,那如何保證消息的實時性呢?
Consumer和Broker會建立長連接,一旦分配到MessageQueue,就會立馬構建PullRequest去拉取消息,在不觸發流控的情況下,不管有沒有拉取到新的消息,Consumer都會立即再次拉取,這樣就保證了消息消費的實時性。
如果Broker長時間沒有新的消息,Consumer一直拉取,豈不是空轉CPU浪費資源?
Consumer在拉取消息時,會攜帶參數suspendTimeoutMillis,它表示Broker在沒有新的消息時,阻塞等待的時間,默認是15秒。如果沒有消息,Broker等待15秒再返回結果,避免客戶端頻繁拉取。如果15秒內有新的消息了,立馬返回,保證消息消費的時效性。
2.1 Consumer相關組件
DefaultMQPushConsumer
RocketMQ暴露給開發者使用的基于Push模式的默認生產者類,和DefaultMQProducer一樣,它也僅僅是一個外觀類,基本沒有業務邏輯,幾乎所有操作都轉交給生產者實現類DefaultMQPushConsumerImpl完成。這么做的好處是RocketMQ屏蔽了內部實現,方便在后續的版本中隨時更換實現類,而用戶無感知。
DefaultMQPushConsumerImpl
默認的基于Push模式的消費者實現類,擁有消費者的所有功能,例如:拉取消息、執行鉤子函數、消費者重平衡等等。
PullAPIWrapper
調用拉取消息API的包裝類,它是Consumer拉取消息的核心類,它有一個方法特別重要pullKernelImpl,是拉取消息的核心方法。它會根據拉取的MessageQueue去查找對應的Broker,然后構建拉取消息請求頭PullMessageRequestHeader發送到Broker,然后執行拉取回調,在回調里會通知消費者消費拉取到的消息。
OffsetStore
OffsetStore是RocketMQ提供的,用來幫助Consumer管理消費位點(消費進度)的接口,它有兩個實現類:LocalFileOffsetStore和RemoteBrokerOffsetStore,從名字就可以看出來,一個是將消費進度存儲在本地,一個是將消費進度存儲在Broker上。
LocalFileOffsetStore會將消費進度持久化到本地磁盤,Consumer啟動后會從指定目錄讀取文件,恢復消費進度。
RemoteBrokerOffsetStore將消費進度交給Broker管理,Consumer不會存儲到文件,沒有意義,但是消費消息時會暫存消費進度在內存,然后在拉取消息時上報消費進度,由Broker負責存儲。
什么場景下需要將消費進度存儲在本地呢?這和RocketMQ消息消費模式有關,RocketMQ支持兩種消息消費模式:集群消費和廣播消費。一個ConsumerGroup下可以有多個消費者實例,集群模式下,消息只會投遞給其中一個Consumer實例消費,而廣播模式下,消息會投遞給每個Consumer實例。
綜上所述,集群模式下,消費進度由Broker管理,使用RemoteBrokerOffsetStore。廣播模式下,因為消息需要被每個Consumer實例消費,每個實例消費的進度是不一樣的,因此由實例自己存儲消費進度,使用LocalFileOffsetStore。
ConsumeMessageService
消費消息的服務,客戶端拉取到消息后,是需要有線程去消費的,因此它是一個線程池,線程數由consumeThreadMin
和consumeThreadMax
設置,默認線程數為20。
它是一個接口,比較重要的兩個方法如下:
// 當前線程直接消費消息
ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);// 提交消費請求,由線程池去調度
void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispathToConsume);
一個是由當前線程直接消費消息,另一個是提交消費請求ConsumeRequest由線程池去負責調度,一般情況下使用的還是后者。
RocketMQ提供了兩個實現類,分別是ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService,前者用來并發消費消息,后者用來消費有序消息。
PullMessageService
消息拉取服務,負責從Broker拉取消息,然后提交給ConsumeMessageService消費。它也是一個線程,它的run方法是一個死循環,通過監聽阻塞隊列來判斷是否需要拉取消息。阻塞隊列里存放的就是PullRequest對象,當Consumer實例上線后,會做一次負載均衡,從眾多MessageQueue中給自己完成分配,當有新的MessageQueue被分配給自己,就會創建PullRequest對象提交到阻塞隊列,然后PullMessageService就會開始拉取消息,在拉取完成的回調函數中,不管有沒有拉取到新的消息,在不觸發流控的情況下,都會一直拉取。
2.2?Consumer相關源碼分析
整個Consumer的運行可以大致分為四個過程:
- 消費者啟動
- 消費者組負載均衡
- 消息的拉取
- 消息的消費
Consumer的啟動流程和Producer有部分重合之處。
首先自然是創建DefaultMQPushConsumer,在它的構造函數中,會創建消費者實現類DefaultMQPushConsumerImpl。
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,AllocateMessageQueueStrategy allocateMessageQueueStrategy) {// 消費組名this.consumerGroup = consumerGroup;// 命名空間this.namespace = namespace;// 消息隊列分配策略算法this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;// 消費者實現類defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
前面說過,DefaultMQPushConsumer只是一個外觀類,它更多的職責只是保存Consumer的配置,它的屬性可以重點關注一下:
// 消費者實現
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
// 消費者組名
private String consumerGroup;
// 消費類型:集群消費/廣播消費
private MessageModel messageModel = MessageModel.CLUSTERING;
// 新加入的ConsumerGroup,從哪里開始消費消息?
// 只針對Broker沒有消費位點的新ConsumerGroup,已經存在的消費組設置無意義。
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
// 當ConsumeFromWhere設為CONSUME_FROM_TIMESTAMP時,
// 從哪個時間點開始消費?默認半小時前。
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
// 消費者分配消息的策略算法
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
// Topic訂閱關系
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
// 消息監聽器
private MessageListener messageListener;
/*** 消費進度管理* 1.集群消費:RemoteBrokerOffsetStore* 2.廣播消費:LocalBrokerOffsetStore*/
private OffsetStore offsetStore;
// 最小消費線程數
private int consumeThreadMin = 20;
// 最大消費線程數
private int consumeThreadMax = 20;
// 動態調整線程池,代碼被刪除,暫不支持,忽略。
private long adjustThreadPoolNumsThreshold = 100000;
// 并發消息的最大位點差,超過該值說明客戶端消息積壓較多,降低拉取速度
private int consumeConcurrentlyMaxSpan = 2000;
// 單個Queue緩存的消息閾值,達到閾值流控處理
private int pullThresholdForQueue = 1000;
// 單個Queue緩存的消息字節數閾值,單位MB
private int pullThresholdSizeForQueue = 100;
// 單個Topic緩存的消息數閾值
private int pullThresholdForTopic = -1;
// 單個Topic緩存的消息字節數閾值
private int pullThresholdSizeForTopic = -1;
// 消息拉取間隔,單位ms
private long pullInterval = 0;
// 消費者批量消費的消息數
private int consumeMessageBatchMaxSize = 1;
// 批量拉取的消息數
private int pullBatchSize = 32;
// 每次拉取消息是否更新訂閱關系?
private boolean postSubscriptionWhenPull = false;
private boolean unitMode = false;
// 最大消費重試次數,默認16
private int maxReconsumeTimes = -1;
// 需要降低拉取速度時,暫停拉取的時間
private long suspendCurrentQueueTimeMillis = 1000;
// 消費超時時間,單位:分鐘
private long consumeTimeout = 15;
// 關閉消費者時,等待消息消費的時間,默認不等待。
private long awaitTerminationMillisWhenShutdown = 0;
// 消息軌跡跟蹤
private TraceDispatcher traceDispatcher = null;
SubscriptionData代表了消費者的訂閱關系,屬性如下:
// 啟用Broker類過濾模式
private boolean classFilterMode = false;
// 訂閱的Topic
private String topic;
// 子表達式 Tag/SQL92語法
private String subString;
// Tag集合
private Set<String> tagsSet = new HashSet<String>();
// Tag哈希集合,Broker根據Tag哈希快速過濾消息
private Set<Integer> codeSet = new HashSet<Integer>();
private long subVersion = System.currentTimeMillis();
// 表達式類型,默認是Tag,也可以用SQL92語法
private String expressionType = ExpressionType.TAG;
// 使用FilterClass過濾的源碼
private String filterClassSource;
如果使用TAG的方式,會計算出Tag哈希值,Broker在ConsumeQueue索引中記錄了Tag哈希,這樣就可以根據Tag哈希快速過濾消息了。
訂閱完Topic,就是注冊消息監聽MessageListener,就是一個賦值操作。
以上操作執行完,Consumer就可以啟動了,接下來才是重頭戲。
外觀類啟動的時候,會啟動消費者實現類DefaultMQPushConsumerImpl,我們直接看它就好。啟動主要做了以下事情:
- 校驗消費者配置
- 拷貝訂閱關系
- 創建MQClientInstance
- 設置RebalanceImpl
- 創建PullAPIWrapper,消息拉取核心類
- 加載消費進度(Local)
- 啟動ConsumeMessageService
- 啟動MQClientInstance
- 拉取訂閱的Topic路由信息
- SQL表達式上傳到Broker編譯
- 給Broker發心跳,通知其他Consumer重平衡
- 自己重平衡,拉取消息
?
checkConfig方法,會在Consumer啟動前做一系列的校驗,確保服務滿足啟動條件,校驗的事項有:
- 校驗GroupName
- 校驗消費模式:集群/廣播
- 校驗ConsumeFromWhere
- 校驗開始消費的指定時間
- 校驗AllocateMessageQueueStrategy
- 校驗訂閱關系
- 校驗是否注冊消息監聽
- 校驗消費線程數
- 校驗單次拉取的最大消息數
- 校驗單次消費的最大消息數
- 啟動前校驗通過,說明配置沒有問題,具備啟動的基本條件。
copySubscription
方法會拷貝訂閱關系到RebalanceImpl,Consumer在重平衡時需要用到,除了拷貝給定的Topic訂閱關系,Consumer還會自動訂閱ConsumerGroup的重試隊列。
// 集群消費模式下,自動訂閱重試Topic
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
創建客戶端實例MQClientInstance,消息拉取核心對象PullAPIWrapper。
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
根據消息消費模式,創建對應的OffsetStore
switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;
}
// 從磁盤恢復消費進度(Local)
this.offsetStore.load();
創建ConsumeMessageService并啟動,如果是有序消息,創建ConsumeMessageOrderlyService,并發消費創建ConsumeMessageConcurrentlyService。
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
ConsumeMessageService是一個線程池,消息拉取服務拉取到消息后,會構建ConsumeRequest對象交給線程池調度執行。
前置操作完成后,就可以啟動客戶端實例了。MQClientInstance啟動主要做了以下事情:
- 發請求獲取NameServerAddr
- 啟動Netty客戶端
- 啟動各種定時任務
- 啟動消息拉取服務
- 啟動重均衡服務
客戶端如果沒有指定NameServer地址,RocketMQ會讀取環境變量rocketmq.namesrv.domain,它的期望值是一個URL鏈接,每隔2分鐘發一個請求更新NameServer地址。在集群環境下,NameServer機器數和IP都是不固定的,通過配置中心下發比硬編碼更靈活。
if (null == this.clientConfig.getNamesrvAddr()) {// 讀取環境變量,發請求更新NameServer地址this.mQClientAPIImpl.fetchNameServerAddr();
}
RocketMQ基于Netty來完成網絡通信,Consumer作為客戶端是要和Broker通信的,因此還需要啟動Netty客戶端。
啟動各種定時任務,這些任務包括:獲取NameServer地址,從NameServer拉取Topic路由信息、清理下線的Broker、給Broker發心跳、持久化消費進度。
?
啟動消息拉取服務PullMessageService,它是一個單獨的線程,run方法會監聽阻塞隊列pullRequestQueue,只要隊列中有拉取請求,它就會去Broker拉取消息。
public void run() {while (!this.isStopped()) {PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);}
}
啟動重平衡服務RebalanceService,也是一個單獨的線程,默認會每隔20秒重新做一次負載均衡,給Consumer重新分配MessageQueue。例如,TopicA下有4個MessageQueue,此時只有一個消費者實例訂閱了,那么這4個MessageQueue都會分配給它消費。過了一會兒,新的消費者實例上線,此時會做一次重平衡,重新分配,因為有兩個消費者實例了,因此每個實例會分配2個MessageQueue。
@Override
public void run() {while (!this.isStopped()) {// 默認20秒做一次重新負載均衡this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}
}
相關服務啟動完成后,Consumer會自動向NameServer拉取訂閱的Topic路由信息。
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);}}
}