1. 概述
先來看看 RocketMQ 消費過程中的輪詢機制是啥。首先需要補充一點消費相關的前置知識。
1.1 消息消費方式
RocketMQ 支持多種消費方式,包括 Push 模式和 Pull 模式
- Pull 模式:用戶自己進行消息的拉取和消費進度的更新
- Push 模式:Broker 將新的消息自動發送給用戶進行消費
1.2 Push 消費模式
我們一般使用 RocketMQ 時用的是 Push 模式,因為比較方便,不需要手動拉取消息和更新消費進度。
那么你有沒有想過 Push 模式是如何做到能夠立即消費新的消息?
1.2.1 Push 模式原理
實際上,在 Push 消費時,消費者是在不斷輪詢 Broker,詢問是否有新消息可供消費。一旦有新消息到達,馬上拉取該消息。也就是說 Push 模式內部也用了 Pull 消息的模式,這樣就可以立即消費到最新的消息。
1.3 如何進行輪詢?
那么 Push 模式或 Pull 模式如何進行消息的查詢?
能夠想到的比較笨的方法是,每隔一定的時間(如1ms)就向 Broker 發送一個查詢請求,如果沒有新消息則立刻返回。可想而知這種方法非常浪費網絡資源。
RocketMQ 為了提高網絡性能,在拉取消息時如果沒有新消息,不會馬上返回,而是會將該查詢請求掛起一段時間,然后再重試查詢。如果一直沒有新消息,直到輪詢時間超過設定的閾值才會返回。
根據輪詢設定的超時閾值大小的不同,RocketMQ 有兩種輪詢方式,分別為長輪詢(默認)和短輪詢。
1.4 長輪詢和短輪詢
RocketMQ 的 Broker 端參數 longPollingEnable
可以配置輪詢方式,默認為 true
- 短輪詢:
longPollingEnable=false
,輪詢時間為shortPollingTimeMills
,默認為 1s - 長輪詢:
longPollingEnable=true
,輪詢時間為 5s。拉取請求掛起時間:受DefaultMQPullConsumer
的brokerSuspendMaxTimeMillis
控制,默認push模式固定15s,pull模式固定20s。
2. 概要流程
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-b6pQzSWr-1646145661686)(https://raw.githubusercontent.com/HScarb/drawio-diagrams/main/rocketmq/consume/long_polling_activity.drawio.svg)]
根據上面的活動圖來看一下 RocketMQ 消費時的輪詢機制流程
- Consumer 發送拉取消息請求
- Broker 收到請求后交給請求處理模塊處理
- 嘗試從存儲的消息中拉取消息
- 如果能夠拉取消息,那么將拉取到的消息直接返回
- 如果沒有拉取到消息,那么根據 Broker 是否支持掛起和是否開啟長輪詢來判斷是否要進行輪詢以及進行哪種輪詢。
- 如果支持掛起,那么會將該拉取請求掛起
- 長輪詢等待 5s
- 短輪詢等待 1s
- 檢查消費隊列中是否有新消息到達,如果沒有則繼續等待,以此循環。如果有新消息,處理掛起的拉取消息請求并返回消費者。
- 如果沒有新消息到達,輪詢后會檢查每個掛起的拉取請求的掛起時間是否超過掛起時間閾值,如果超過那么也會直接返回消費者,否則繼續循環進行輪詢操作。
那么按照上述流程,開啟長輪詢的情況下,如果一次輪詢沒有找到消息,要等待 5s 才能進行下一次查詢。如果這 5s 當中有新的消息存入,如何保證能夠立刻消費到?
解決方案不難想到,就是新的消息寫入后,主動進行通知,讓掛起的拉取請求立刻進行拉取操作。
RocketMQ 就是這么做的,在消息存入 CommitLog 后的 doReput 方法中,會判斷是否是長輪詢,如果是則會發送一個通知,讓掛起的拉取請求立刻進行處理。
3. 詳細流程
3.1 涉及到的類
3.1.1 PullMessageProcessor
該類是 Broker 處理 Consumer 拉取清求的入口類。當 Broker 收到 Consumer 發送的拉取請求時,調用該類的 processRequest 方法
3.1.2 PullRequestHoldService
長輪詢請求管理線程,掛起的拉取請求會在這里進行保存。每等待一段時間(長輪詢/短輪詢等待時間)會檢查掛起的請求中是否有可以進行拉取的數據。
3.1.3 DefaultMessageStore#ReputMessageService
該線程負責將存儲到 CommitLog 的消息重新轉發,用以生成 ConsumeQueue 和 IndexFile 索引。在生成索引之后,會向長輪詢線程發送提醒,立刻喚醒相應隊列的拉取請求,執行消息拉取。
3.2 時序圖
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-brvsznhE-1646145661687)(https://raw.githubusercontent.com/HScarb/drawio-diagrams/main/rocketmq/consume/long_polling_sequence.drawio.svg)]
著重體現了長輪詢邏輯,其他邏輯有所省略
- 消費者調用
pullKernelImpl()
發送拉取請求,調用時用brokerSuspendMaxTimeMillis
指定了 Broker 掛起的最長時間,默認為 20s - Broker 中
PullMessageProcess
處理拉取請求,從ConsumeQueue
中查詢消息 - 如果沒有查詢到消息,判斷是否啟用長輪詢,調用
PullRequestHoldService#suspendPullRequest()
方法將該請求掛起 - PullRequestHoldService 線程
run()
方法循環等待輪詢時間,然后周期性調用checkHoldRequest()
方法檢查掛起的請求是否有消息可以拉取 - 如果檢查到有新消息可以拉取,調用
notifyMessageArriving()
方法 - ReputMessageService 的 doReput() 如果被調用,說明也有新消息到達,需要喚醒掛起的拉取請求。這里也會發送一個 notify,進而調用
notifyMessageArriving()
方法 notifyMessageArriving()
方法中也會查詢 ConsumeQueue 的最大 offset,如果確實有新消息,那么將喚醒對應的拉取請求,具體的方法是調用executeRequestWhenWakeup()
方法executeRequestWhenWakeup()
方法喚醒拉取請求,調用processRequest()
方法處理該請求
3.3 每個類的具體邏輯
3.3.1 PullMessageProcessor
Broker 處理 Consumer 拉取清求的入口類
-
RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
:處理 Consumer 拉取請求的入口方法,收到 Consumer 拉取請求時調用。該方法主要完成如下操作- 校驗
- 消息過濾
- 從存儲中查詢消息
- 返回響應給 Consumer
如果從存儲中沒有查詢到消息,會將響應碼設置為
ResponseCode.PULL_NOT_FOUND
,并且啟動長輪詢 -
void executeRequestWhenWakeup(Channel channel, final RemotingCommand request)
:將 Hold 的拉取請求喚醒,再次拉取消息- 該方法在長輪詢收到新消息時調用,立即喚醒掛起的拉取請求,然后對這些請求調用
processRequest
方法 - 何時需要提醒長輪詢新消息已經到達?上面說到,在長輪詢等待時如果有新消息到達,
CommitLog
的doReput
方法中會進行提醒,最終會調用executeRequestWhenWakeup
方法
- 該方法在長輪詢收到新消息時調用,立即喚醒掛起的拉取請求,然后對這些請求調用
3.3.2 PullRequestHoldService
該服務線程會從 pullRequestTable
本地緩存變量中取PullRequest請求,檢查輪詢條件“待拉取消息的偏移量是否小于消費隊列最大偏移量”是否成立,如果條件成立則說明有新消息達到Broker端,則通過PullMessageProcessor的executeRequestWhenWakeup()方法重新嘗試發起Pull消息的RPC請求
-
pullRequestTable
private ConcurrentMap<String/* topic@queueId */, ManyPullRequest/* 同一隊列積累的拉取請求 */> pullRequestTable = new ConcurrentHashMap<>(1024)
上面是掛起的消息拉取請求容器,它是一個
ConcurrentHashMap
,key 是拉取請求的隊列,value 是該隊列掛起的所有拉取請求。其中ManyPullRequest
底層是一個ArrayList
,它的 add 方法加了鎖。 -
suspendPullRequest(String topic, int queueId, PullRequest pullRequest)
:將 Consumer 拉取請求暫時掛起,會將請求加入到pullRequestTable
中 -
checkHoldRequest()
:檢查所有掛起的拉取請求,如果有數據滿足要求,就喚醒該請求,對其執行PullMessageProcessor#processRequest
方法 -
run()
:線程主循環,每等待一段時間就調用checkHoldRequest()
方法檢查是否有請求需要喚醒。等待的時間根據長輪詢/短輪詢的配置決定,長輪詢等待 5s,短輪詢默認等待 1s -
notifyMessageArriving()
:被checkHoldRequest()
和ReputMessageService#doReput()
調用,表示新消息到達,喚醒對應隊列掛起的拉取請求
3.3.3 DefaultMessageStore#ReputMessageService
該服務線程 doReput()
方法會在 Broker 端不斷地從數據存儲對象 CommitLog
中解析數據并分發請求,隨后構建出 ConsumeQueue
(邏輯消費隊列)和 IndexFile
(消息索引文件)兩種類型的數據。
同時從本地緩存變量 PullRequestHoldService#pullRequestTable
中,取出掛起的拉起請求并執行。
4. 源碼解析
4.1 PullMessageProcessor
4.1.1 processRequest
如果從存儲中沒有查詢到消息,會將響應碼設置為 ResponseCode.PULL_NOT_FOUND
,并且啟動長輪詢
以下三種情況會將響應碼設置為ResponseCode.PULL_NOT_FOUND
:
- NO_MESSAGE_IN_QUEUE:消費隊列中沒有任何消息
- OFFSET_FOUND_NULL:offset未找到任何數據
- OFFSET_OVERFLOW_ONE:待拉取偏移量等于隊列最大偏移量
/*** 處理客戶端請求入口** @param channel 網絡通道,通過該通道向消息拉取客戶端發送響應結果* @param request 消息拉取請求* @param brokerAllowSuspend Broker端是否允許掛起,默認true。true:如果未找到消息則掛起。false:未找到消息直接返回消息未找到* @return 響應* @throws RemotingCommandException 當解析請求發生異常時*/
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)throws RemotingCommandException {// ...switch (response.getCode()) {// ...// 如果從消費隊列中未找到新的可以拉取的消息,判斷并掛起該拉取請求case ResponseCode.PULL_NOT_FOUND:// 長輪詢if (brokerAllowSuspend && hasSuspendFlag) {long pollingTimeMills = suspendTimeoutMillisLong;if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();}String topic = requestHeader.getTopic();long offset = requestHeader.getQueueOffset();int queueId = requestHeader.getQueueId();PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);response = null;break;}// ...
}
4.1.2 executeRequestWhenWakeup
在PullMessageProcessor的executeRequestWhenWakeup()方法中,通過業務線程池pullMessageExecutor,異步提交重新Pull消息的請求任務,即為重新調了一次PullMessageProcessor業務處理器的processRequest()方法,來實現Pull消息請求的二次處理)。
/*** 將Hold的拉取請求喚醒,再次拉取消息* 該方法調用線程池,因此,不會阻塞** @param channel 通道* @param request Consumer拉取請求* @throws RemotingCommandException 當遠程調用發生異常*/
public void executeRequestWhenWakeup(final Channel channel,final RemotingCommand request) throws RemotingCommandException {Runnable run = new Runnable() {@Overridepublic void run() {try {// 處理Consumer拉取請求,獲取返回體final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);if (response != null) {response.setOpaque(request.getOpaque());response.markResponseType();try {// 將返回體寫入channel,返回給Consumerchannel.writeAndFlush(response).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {log.error("processRequestWrapper response to {} failed",future.channel().remoteAddress(), future.cause());log.error(request.toString());log.error(response.toString());}}});} catch (Throwable e) {log.error("processRequestWrapper process request over, but response failed", e);log.error(request.toString());log.error(response.toString());}}} catch (RemotingCommandException e1) {log.error("excuteRequestWhenWakeup run", e1);}}};// 異步執行請求處理和返回this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
}
4.2 PullRequestHoldService
4.2.1 suspendPullRequest
/*** 掛起(保存)客戶端請求,當有數據的時候觸發請求** @param topic 主題* @param queueId 隊列編號* @param pullRequest 拉取消息請求*/
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {// 根據topic和queueId構造map的keyString key = this.buildKey(topic, queueId);// map的key如果為空,創建一個空的request隊列,填充key和valueManyPullRequest mpr = this.pullRequestTable.get(key);if (null == mpr) {mpr = new ManyPullRequest();ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);if (prev != null) {mpr = prev;}}// 保存該次Consumer拉取請求mpr.addPullRequest(pullRequest);
}
4.2.2 checkHoldRequest
/*** 檢查所有已經掛起的長輪詢請求* 如果有數據滿足要求,就觸發請求再次執行*/
private void checkHoldRequest() {// 遍歷拉取請求容器中的每個隊列for (String key : this.pullRequestTable.keySet()) {String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);if (2 == kArray.length) {String topic = kArray[0];int queueId = Integer.parseInt(kArray[1]);// 從store中獲取隊列的最大偏移量final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);try {// 根據store中獲取的最大偏移量,判斷是否有新消息到達,如果有則執行拉取請求操作this.notifyMessageArriving(topic, queueId, offset);} catch (Throwable e) {log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);}}}
}
4.2.3 run
@Override
public void run() {log.info("{} service started", this.getServiceName());while (!this.isStopped()) {try {// 等待一定時間if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {// 開啟長輪詢,每5s判斷一次消息是否到達this.waitForRunning(5 * 1000);} else {// 未開啟長輪詢,每1s判斷一次消息是否到達this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}long beginLockTimestamp = this.systemClock.now();// 檢查是否有消息到達,可以喚醒掛起的請求this.checkHoldRequest();long costTime = this.systemClock.now() - beginLockTimestamp;if (costTime > 5 * 1000) {log.info("[NOTIFYME] check hold request cost {} ms.", costTime);}} catch (Throwable e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info("{} service end", this.getServiceName());
}
4.2.4 notifyMessageArriving
這個方法在兩個地方被調用,如下圖所示
這個方法是重新喚醒拉取請求的核心方法。調用這個方法,提醒 PullRequestHoldService 線程有新消息到達
我們來看看這個方法具體做了什么
- 根據 topic 和 queueId 獲取掛起的拉取請求列表
- 從 store 中獲取該隊列消息的最大offset
- 遍歷該隊列的所有拉取請求,符合以下兩種條件之一的拉取請求會被處理并返回
- 消費隊列最大offset比消費者拉取請求的offset大,說明有新的消息可以被拉取,處理該拉取請求
- 拉取請求掛起時間超過閾值,直接返回消息未找到
- 如果不滿足以上兩個條件,那么該拉取請求會重新放回
pullRequestTable
,等待下次檢查
/*** 當有新消息到達的時候,喚醒長輪詢的消費端請求** @param topic 消息Topic* @param queueId 消息隊列ID* @param maxOffset 消費隊列的最大Offset*/
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {// 根據topic和queueId從容器中取出掛起的拉取請求列表String key = this.buildKey(topic, queueId);ManyPullRequest mpr = this.pullRequestTable.get(key);if (mpr != null) {// 獲取掛起的拉取請求列表List<PullRequest> requestList = mpr.cloneListAndClear();if (requestList != null) {// 預先定義需要繼續掛起的拉取請求列表List<PullRequest> replayList = new ArrayList<PullRequest>();for (PullRequest request : requestList) {long newestOffset = maxOffset;// 從store中獲取該隊列消息的最大offsetif (newestOffset <= request.getPullFromThisOffset()) {newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);}// 消費隊列最大offset比消費者拉取請求的offset大,說明有新的消息可以被拉取if (newestOffset > request.getPullFromThisOffset()) {// 消息過濾匹配boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));// match by bit map, need eval again when properties is not null.if (match && properties != null) {match = request.getMessageFilter().isMatchedByCommitLog(null, properties);}if (match) {try {// 會調用PullMessageProcessor#processRequest方法拉取消息,然后將結果返回給消費者this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}}// 查看是否超時,如果Consumer請求達到了超時時間,也觸發響應,直接返回消息未找到if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}// 當前不滿足要求,重新放回Hold列表中replayList.add(request);}if (!replayList.isEmpty()) {mpr.addPullRequest(replayList);}}}
}
4.3 DefaultMessageStore#ReputMessageService
4.3.1 doReput
private void doReput() {// ...DefaultMessageStore.this.doDispatch(dispatchRequest);// 通知消息消費長輪詢線程,有新的消息落盤,立即喚醒掛起的消息拉取請求if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()&& DefaultMessageStore.this.messageArrivingListener != null) {DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
這里調用了 NotifyMessageArrivingListener#arriving() 方法,進而調用 PullRequestHoldService.notifyMessageArriving()。
為什么不直接調用 pullRequestHoldService.notifyMessageArriving() ?因為 doReput 所處的類所在的包是 store,存儲包,而 PullRequestHoldService 在 broker 包中
所以需要一個橋梁,就是 NotifyMessageArrivingListener。它在 Broker 初始化 DefaultMessageStore 時被寫入 DefaultMessageStore
4.3.2 NotifyMessageArrivingListener#arriving
public class NotifyMessageArrivingListener implements MessageArrivingListener {@Overridepublic void arriving(String topic, int queueId, long logicOffset, long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {// 提醒長輪詢請求管理容器,新的消息到達,立刻拉取最新消息this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,msgStoreTime, filterBitMap, properties);}
}
參考資料
- 源碼分析RocketMQ消息PULL-長輪詢模式
- 消息中間件—RocketMQ 消息消費(二)(push 模式實現)