1.緒論
前面的幾篇文章都剖析了broker的存儲文件。那么生產者發送一條消息到達broker過后是如何處理的?這條消息經過什么處理過后,消費者才能夠消費這條消息?接下來,我們將仔細剖析一下一條消息從生產者生產消息,到到達broker持久化到commitLog后,消費者來消費的具體流程。
2.生產者推送消息
生產者發送消息我們后面會詳細講解,這里我們先看一看推送消息的核心函數。其實就是根據消息的queue找到所屬的broker,然后通過netty將消息發送到對應的broker上去。
/**
 * Core producer send routine: delivers {@code msg} to the broker that owns {@code mq}.
 *
 * <p>Steps visible in this method: resolve the broker address (with one re-lookup after
 * {@code tryToFindTopicPublishInfo}), assign a unique id, optionally compress the body,
 * run the check-forbidden and before-send hooks, build the request header, dispatch the
 * request according to {@code communicationMode}, run the after-send hook, and finally
 * restore the message body and the namespace-free topic on {@code msg}.
 *
 * @return the result returned by the client API's {@code sendMessage} call
 * @throws MQClientException if no broker address can be resolved for {@code mq.getBrokerName()}
 * @throws RemotingException rethrown after the after-send hook ran; also raised as
 *         {@code RemotingTooMuchRequestException} when {@code timeout} is already exceeded
 * @throws MQBrokerException rethrown after the after-send hook ran
 * @throws InterruptedException rethrown after the after-send hook ran
 */
private SendResult sendKernelImpl(final Message msg, // message to send
    final MessageQueue mq, // queue the message is sent to
    final CommunicationMode communicationMode, // network mode: SYNC, ASYNC or ONEWAY
    final SendCallback sendCallback, // callback passed to the ASYNC send
    final TopicPublishInfo topicPublishInfo, // publish info of the topic, passed to the ASYNC send
    final long timeout // overall time budget; compared against elapsed milliseconds
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // Look up the broker address by the queue's broker name.
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    // Address unknown: refresh the topic publish info (presumably from the NameServer — confirm) and retry once.
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
    SendMessageContext context = null;
    if (brokerAddr != null) {
        // Possibly switch to the VIP channel address, depending on producer configuration.
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
        // Remember the original body so it can be restored after a compressed send.
        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }
            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }
            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            // Try to compress the body; on success record the compression flags in sysFlag.
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                sysFlag |= compressType.getCompressionFlag();
                msgBodyCompressed = true;
            }
            // Mark transactional "prepared" messages in sysFlag.
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
            // Run the check-forbidden hook, if one is registered.
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
            // Run the before-send hook, if one is registered; this is the only place context is created.
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }
                // A delay property overrides the type set above.
                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }
            // Build the send-message request header.
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); // producer group
            requestHeader.setTopic(msg.getTopic()); // topic
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); // default queue count
            requestHeader.setQueueId(mq.getQueueId()); // target queue id
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis()); // time the message was produced
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); // extra message properties
            requestHeader.setReconsumeTimes(0); // starts at 0; overridden below for retry-topic messages
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            requestHeader.setBname(mq.getBrokerName()); // broker name
            // Messages addressed to a retry topic carry their reconsume counters in properties.
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    // Move the reconsume count from the message properties into the header.
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }
                // Same for the maximum reconsume count.
                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }
                    if (topicWithNamespace) {
                        // Send a clone and strip the namespace from the caller's message topic.
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }
                    // Fail fast when the time budget is already used up.
                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    // Core of the ASYNC path: hand the (possibly cloned) message to the client API.
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    // ONEWAY and SYNC share the same call; the mode is passed through.
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    // Send the original message with the remaining time budget.
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    // No other mode is handled; sendResult stays null.
                    assert false;
                    break;
            }
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }
            return sendResult;
        } catch (RemotingException e) {
            // NOTE(review): context is only assigned inside the try when hasSendMessageHook() was true;
            // the handlers below assume it is non-null whenever hasSendMessageHook() is true here.
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            // Always hand the caller's message back with its original body and namespace-free topic.
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }
    // Still no address after the re-lookup.
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
3. broker接收生產者的推送的消息-SendMessageProcessor
消息到達broker過后是交給誰處理呢,我們可以看到producer發送消息的code為SEND_MESSAGE,所以在broker中會交給SendMessageProcessor來處理請求。
public class RequestCode {public static final int SEND_MESSAGE = 10;
}
3.1 處理請求
public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {RemotingCommand response = null;try {//調用異步處理請求的函數response = asyncProcessRequest(ctx, request).get();} catch (InterruptedException | ExecutionException e) {log.error("process SendMessage error, request : " + request.toString(), e);}return response;}
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {//1.先執行異步處理請求 2.然后再執行回調函數asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getPutMessageFutureExecutor());}
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default://解析請求頭SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}//榮國請求頭獲取到消息上下文,比如消息所屬的broker或broker地址等mqtraceContext = buildMsgContext(ctx, requestHeader);//執行消息處理前的前置鉤子this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {//真正處理消息的邏輯return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}
/**
 * Handles a single (non-batch) send-message request: converts it into a
 * {@code MessageExtBrokerInner} and stores it, either through the transactional message
 * service (prepared messages) or through the message store (everything else).
 *
 * @param ctx            channel context of the incoming request
 * @param request        the raw request; its body becomes the message body
 * @param mqtraceContext trace context passed on to the result handling
 * @param requestHeader  parsed send-message header
 * @return a future of the response; already completed when the request is rejected early
 */
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
    SendMessageContext mqtraceContext,
    SendMessageRequestHeader requestHeader) {
    // Prepare the response object (preSend is not shown here; presumably it validates the request
    // and copies the opaque so the caller can correlate the response — confirm).
    final RemotingCommand response = preSend(ctx, request, requestHeader);
    // Custom header of the response (not of the request).
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
    // Any code other than -1 is returned to the caller as-is.
    if (response.getCode() != -1) {
        return CompletableFuture.completedFuture(response);
    }
    // Request body = message body.
    final byte[] body = request.getBody();
    // Requested queue id.
    int queueIdInt = requestHeader.getQueueId();
    // Topic configuration for the message's topic.
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    // A negative queue id means "pick a random write queue".
    if (queueIdInt < 0) {
        queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
    }
    // Broker-side representation of the message.
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);
    // Retry / dead-letter handling; on failure the response is returned as it stands.
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return CompletableFuture.completedFuture(response);
    }
    // Fill in body, flag, properties, born timestamp/host, store host and reconsume count.
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    MessageAccessor.setProperties(msgInner, origProps);
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    // Record the cluster name as a message property.
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
    if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
        // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.
        // It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.
        String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later
        origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
    } else {
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    }
    CompletableFuture<PutMessageResult> putMessageResult = null;
    // Is this the "prepared" half of a transactional message?
    String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (Boolean.parseBoolean(transFlag)) {
        // Broker may be configured to reject transactional messages.
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
            return CompletableFuture.completedFuture(response);
        }
        // Prepared message: store it through the transactional message service.
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    } else {
        // Otherwise store it through the message store.
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    // Turn the store result into the response.
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
其實在這里,我們很清晰了,如果是事務的prepare消息,便調用TransactionalMessageService這個組件寫入消息,否則調用messageStore的asyncPutMessage寫入消息。接下來我們來看看這兩種情況發生了什么。
3.2 普通消息持久化到commitLog-DefaultMessageStore
普通消息到了這里我們應該是很清楚的,其實就是調用commitLog的asyncPutMessage方法將消息寫入到commitLog中。如果我們采用的是DledgerCommitLog,會采用二階段寫入,主broker會將這條消息寫入到commitLog中,然后通知follower節點寫日志,如果follower寫入成功,主節點會更新committed的索引,代表真正的寫入成功。
不清楚的小伙伴可以看:
深度解析RocketMq源碼-持久化組件(四) CommitLog
深度解析RocketMq源碼-高可用存儲組件(二)Dledger框架概覽-CSDN博客
下面是DefaultMessageStore的asyncPutMessage方法的源碼:
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {//校驗存儲組件的狀態PutMessageStatus checkStoreStatus = this.checkStoreStatus();if (checkStoreStatus != PutMessageStatus.PUT_OK) {return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));}//校驗,essahe的內容PutMessageStatus msgCheckStatus = this.checkMessage(msg);if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));}PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) {return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null));}long beginTime = this.getSystemClock().now();//調用commitLog的asyncPutMessage寫入的消息中CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);//構建返回結果putResultFuture.thenAccept(result -> {long elapsedTime = this.getSystemClock().now() - beginTime;if (elapsedTime > 500) {log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);if (null == result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().add(1);}});return putResultFuture;}
3.3 事務消息持久化到commitLog-TransactionalMessageServiceImpl
/**
 * Stores a transactional "prepared" message by delegating to the transactional message bridge.
 *
 * @param messageInner the prepared message
 * @return the future returned by the bridge's {@code asyncPutHalfMessage}
 */
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
    final CompletableFuture<PutMessageResult> halfMessageFuture =
        transactionalMessageBridge.asyncPutHalfMessage(messageInner);
    return halfMessageFuture;
}
/**
 * Converts the message into its half-message form and stores it through the message store.
 *
 * @param messageInner the prepared message
 * @return the future returned by the store's {@code asyncPutMessage}
 */
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
    // Rewrite topic/queue first, then persist.
    final MessageExtBrokerInner halfMessage = parseHalfMessageInner(messageInner);
    return store.asyncPutMessage(halfMessage);
}
可以看出,事務消息本質上還是調用的DefaultMessageStore來將日志信息持久化到commitLog中,但是在這之前將消息轉換成了half消息。其實就是重新設置了消息的topic為RMQ_SYS_TRANS_HALF_TOPIC,queueId為0,然后持久化到commitlog中。
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msgInner.getQueueId()));//設置事務消息為TRANSACTION_NOT_TYPEmsgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));//重新設置topic為RMQ_SYS_TRANS_HALF_TOPICmsgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());//事務消息的queueId都為0msgInner.setQueueId(0);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));return msgInner;}
看到這里,可能發現消息已經完成持久化了,那什么時候才能消費消息,什么時候建立的IndexFile呢,我們接下來就來探討一下這個問題。
4.消息重投遞-ReputMessageService
broker中有一個ReputMessageService的線程,它會一直遍歷commitLog中的消息,并且轉換成IndexFile和consumeQueue。
//重投遞就是一個線程,一致執行doReput方法@Overridepublic void run() {DefaultMessageStore.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {Thread.sleep(1);this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}DefaultMessageStore.log.info(this.getServiceName() + " service end");}
/**
 * Reads the commit log starting at {@code reputFromOffset}, dispatches every message found
 * (see {@code doDispatch}) and, on non-slave brokers with long polling enabled, notifies the
 * message-arriving listener. {@code reputFromOffset} is advanced as messages are consumed.
 */
private void doReput() {
    // The reput offset fell behind the commit log's minimum offset: jump forward to the minimum.
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
        log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
            this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    // Keep going while isCommitLogAvailable() holds and no round asked to stop (doNext).
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

        // With duplication enabled, never go past the confirm offset.
        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }

        // Fetch the commit log data starting at reputFromOffset.
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                this.reputFromOffset = result.getStartOffset();

                // Walk the returned buffer message by message; readSize counts consumed bytes.
                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    // Decode the next message into a DispatchRequest.
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                    // Use the buffer size when it is set (!= -1), otherwise the message size.
                    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            // Hand the request to all registered dispatchers.
                            DefaultMessageStore.this.doDispatch(dispatchRequest);

                            // Not a slave, long polling enabled and a listener present:
                            // notify the listener that a message arrived.
                            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
                                && DefaultMessageStore.this.messageArrivingListener != null) {
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                notifyMessageArrive4MultiQueue(dispatchRequest);
                            }

                            // Advance both counters by the size of the message just processed.
                            this.reputFromOffset += size;
                            readSize += size;
                            // On slaves, update the per-topic put statistics here.
                            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                    .add(dispatchRequest.getMsgSize());
                            }
                        } else if (size == 0) {
                            // Size 0: move to the next commit log file and end this buffer's loop.
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {

                        if (size > 0) {
                            // Failed decode with a known size: log and skip those bytes.
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else {
                            doNext = false;
                            // If user open the dledger pattern or the broker is master node,
                            // it will not ignore the exception and fix the reputFromOffset variable
                            if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                    this.reputFromOffset);
                                // Skip the unread remainder of this buffer.
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                // Always release the buffer result.
                result.release();
            }
        } else {
            // No data at this offset: stop this invocation.
            doNext = false;
        }
    }
}
其實這個方法主要是兩個作用:
遍歷commitLog中的消息,然后:
1.建立consumequeue和IndexFile;
2.如果該broker為主節點,會喚醒消費者拉取消息的線程,(通知消費者消息到達)開始拉取消息。這里長輪詢其實就是消費者會來broker中拉取消息,如果有就返回,如果沒有便阻塞一段時間,等待消息產生。而這里其實就是通過messageArrivingListener來喚醒消費線程的。
4.2 建立consumeQueue和indexFile
/**
 * Passes the dispatch request to every registered {@code CommitLogDispatcher}, in list order.
 *
 * @param req the request built from a commit log message
 */
public void doDispatch(DispatchRequest req) {
    this.dispatcherList.forEach(dispatcher -> dispatcher.dispatch(req));
}
可以看出CommitLogDispatcher有多個實現類,我們接下來看一看建立consumequeue的CommitLogDispatcherBuildConsumeQueue和建立indexFile的CommitLogDispatcherBuildIndex。
4.2.1 建立consumeQueue-CommitLogDispatcherBuildConsumeQueue
其實就是根據topic和queueId找到對應的consumequeue,并順序寫入對應的索引數據。
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {@Overridepublic void dispatch(DispatchRequest request) {//獲取事務類型final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());switch (tranType) {//如果普通消息或者事務的commit消息會建立consumeQueue,表示消息可以被立即消費case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:DefaultMessageStore.this.putMessagePositionInfo(request);break;//如果是事務消息,不能建立consumequeuecase MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;}}}
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {//根據topic和queueId找到對應的consumequeueConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());//根據consumequeue順序寫入consumemequeue的索引數據cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));}
4.2.2 建立indexFile-CommitLogDispatcherBuildIndex
前面講過,indexFile其實就是索引數據,它根據msgKey的hash值定位到hash槽,然后得到對應index數據,里面包含commitLog的物理偏移量,由此可以定位到具體的消息位置。
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {@Overridepublic void dispatch(DispatchRequest request) {//如果開啟了IndexFile,便構建一條IndexFile數據,并且存儲到IndexFile文件中if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {DefaultMessageStore.this.indexService.buildIndex(request);}}}
4.3喚醒長輪詢消費線程-NotifyMessageArrivingListener
首先獲取到訂閱這批消息的consumer請求,然后根據bitMap來進行判斷這批消息的tag是否在consumer的訂閱范圍內,如果在,便將這批消息交給PullMessageProcessor推送給consumer。
/**
 * Wakes up held pull requests for {@code topic}/{@code queueId} after a message arrived.
 *
 * <p>Each held request is re-executed when new data is available and matches the request's
 * filter, or when the request has timed out; all other requests are put back on hold.
 *
 * @param topic        topic of the arrived message
 * @param queueId      queue id of the arrived message
 * @param maxOffset    offset reported by the caller, compared with each request's pull offset
 * @param tagsCode     tags code passed to the consume-queue filter
 * @param msgStoreTime store timestamp passed to the consume-queue filter
 * @param filterBitMap bit map passed to the consume-queue filter
 * @param properties   message properties; when non-null the commit-log filter is evaluated too
 */
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    // Key identifying the topic/queue pair (format defined by buildKey, not shown here).
    String key = this.buildKey(topic, queueId);
    // Held pull requests for this topic/queue.
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (mpr != null) {
        // Take ownership of the current list; requests that stay on hold are re-added below.
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if (requestList != null) {
            List<PullRequest> replayList = new ArrayList<PullRequest>();

            // Examine every held request.
            for (PullRequest request : requestList) {
                // Start from the reported offset; if it is not ahead of the request,
                // ask the message store for the queue's current max offset.
                long newestOffset = maxOffset;
                if (newestOffset <= request.getPullFromThisOffset()) {
                    newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                }

                // There is data beyond the request's pull offset.
                if (newestOffset > request.getPullFromThisOffset()) {
                    // Does the arrived message pass the request's consume-queue filter?
                    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                    // match by bit map, need eval again when properties is not null.
                    if (match && properties != null) {
                        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                    }

                    if (match) {
                        try {
                            // Filter matched: re-execute the pull request now.
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
                }

                // The request has been held for at least its timeout: re-execute it regardless.
                if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                            request.getRequestCommand());
                    } catch (Throwable e) {
                        log.error("execute request when wakeup failed.", e);
                    }
                    continue;
                }

                // Neither matched nor timed out: keep holding it.
                replayList.add(request);
            }

            if (!replayList.isEmpty()) {
                mpr.addPullRequest(replayList);
            }
        }
    }
}
public void executeRequestWhenWakeup(final Channel channel,final RemotingCommand request) throws RemotingCommandException {Runnable run = new Runnable() {@Overridepublic void run() {try {//根據通過netty將這部分消息推送給消費者final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);if (response != null) {response.setOpaque(request.getOpaque());response.markResponseType();try {channel.writeAndFlush(response).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {log.error("processRequestWrapper response to {} failed",future.channel().remoteAddress(), future.cause());log.error(request.toString());log.error(response.toString());}}});} catch (Throwable e) {log.error("processRequestWrapper process request over, but response failed", e);log.error(request.toString());log.error(response.toString());}}} catch (RemotingCommandException e1) {log.error("excuteRequestWhenWakeup run", e1);}}};//構建好推送任務過后,交給線程池執行this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));}
4.4 broker接收consumer拉取消息請求-PullMessageProcessor
PullMessageProcessor主要用于響應consumer拉取消息的請求,processRequest由于代碼太長,我們就介紹一下它大概干了什么。
1.檢查消費者組的訂閱關系。
2.根據consume的topic和queueid和consumeoffset在consumequeue中定位到consumequeue的日志數據,并且根據consumequeue中的commitLog的物理偏移量獲取到具體的commitLog的日志消息。
3.如果沒有找到消息,便阻塞線程1s鐘,再來拉取消息。
4.如果是長輪詢的方式,這里不會自動的提交offset。
5.consumer消費消息
前面講過,consumer消費有推、拉和長輪詢3種方式,而我們常用的方式就是長輪詢,接下來我們來看看長輪詢的方式是如何拉取消息的。
5.1 長輪詢拉取消息-DefaultMQPushConsumerImpl
其步驟主要如下:
1.獲取pullRequest正在處理的queue信息,其實每個pullRequest都會拉取消息到本地緩存起來,而緩存的位置就是processqueue;
2.如果當前請求對應的緩存的消息數量大于了1000條或者緩存大小超過100m,便讓該請求過50ms再拉取;
3.如果是集群模式,從內存中讀取消費偏移量,因為每個消費者消費的信息是不一樣的,所以這個信息只能保存在每個消費者實例本地;
4.發送網絡請求,根據messagequeue和消費的consumeoffset從broker中拉取消息;
5.拉取到消息過后根據tag過濾消息;
6.調用ConsumeMessageService的submitConsumeRequest方法,將消息封裝成消費請求,放入到阻塞隊列中,等待業務方消費消息;
7.如果pullInterval這個參數大于0,便會間隔一會兒,再去broker拉取消息,否則立刻再次拉取下一批次的消息。
至此,consumer便已經從broker中拉取到了消息,并且交到阻塞隊列中,業務方只需要根據阻塞隊列中的內容,便能拉取到消息并實現自己的業務邏輯。
/**
 * Pulls the next batch of messages for the queue described by {@code pullRequest}.
 *
 * <p>The method first runs a series of checks (dropped queue, consumer state, pause flag,
 * local cache thresholds, ordered-consumption lock, subscription lookup). When a check
 * fails the request is either abandoned or re-scheduled through
 * {@code executePullRequestLater}. Otherwise an asynchronous pull is issued through
 * {@code pullAPIWrapper.pullKernelImpl}; the {@link PullCallback} built here stores the
 * pulled messages in the process queue, submits them to the consume service and schedules
 * the next pull.
 *
 * @param pullRequest the request carrying the message queue, its process queue and the
 *                    next offset to pull from
 */
public void pullMessage(final PullRequest pullRequest) {
    // The process queue is the local cache into which pulled messages for this queue are put.
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }
    // Record the time of this pull attempt.
    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
    try {
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    }
    if (this.isPause()) {
        log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }
    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    // Flow control on cached message count: when it exceeds pullThresholdForQueue, retry later.
    // Only every 1000th occurrence is logged.
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
    // Flow control on cached message size (in MiB): when it exceeds pullThresholdSizeForQueue, retry later.
    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
    // Non-ordered consumption: flow control when the process queue's max span exceeds consumeConcurrentlyMaxSpan.
    if (!this.consumeOrderly) {
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
            if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes);
            }
            return;
        }
    } else {
        // Ordered consumption: the process queue must be locked before pulling.
        if (processQueue.isLocked()) {
            if (!pullRequest.isPreviouslyLocked()) {
                long offset = -1L;
                try {
                    // Work out the offset this consumer should start pulling from.
                    offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
                } catch (Exception e) {
                    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                    log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
                    return;
                }
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy);
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset);
                }
                // Remember that the offset was fixed so this branch runs only once per request.
                pullRequest.setPreviouslyLocked(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }
    // Look up the subscription registered for this queue's topic.
    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        log.warn("find the consumer's subscription failed, {}", pullRequest);
        return;
    }
    final long beginTimestamp = System.currentTimeMillis();
    // Callback invoked with the outcome of the asynchronous pull.
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                // Post-process the raw pull result against the subscription
                // (the original article describes this step as tag filtering).
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        // Remember the offset that was requested, then advance to the next one.
                        long prevRequestOffset = pullRequest.getNextOffset();
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        // Record how long the pull took.
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);
                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            // Nothing left after post-processing: pull again right away.
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            // Queue offset of the first message in the batch.
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                            // Put the messages into the local process queue cache.
                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            // Hand the messages to the consume service so the business listener can consume them.
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
                            // With a positive pullInterval, wait that long before the next pull; otherwise pull immediately.
                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }
                        // Sanity check: offsets returned by the broker should not be behind the requested offset.
                        if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) {
                            log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset);
                        }
                        break;
                    case NO_NEW_MSG:
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        // Stop using this process queue; the delayed task below stores the corrected offset
                        // and removes the process queue from the rebalance implementation.
                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false);
                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            // Failures on retry-group topics are not logged.
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }
            // Use a dedicated delay when the broker reported flow control; otherwise the generic exception delay.
            if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL);
            } else {
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            }
        }
    };
    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    // Clustering mode only: read the consume offset held in memory and, when positive,
    // send it along with the pull so it can be committed.
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }
    String subExpression = null;
    boolean classFilter = false;
    // Fetch the subscription again to derive the expression and class-filter flag sent with the pull.
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }
        classFilter = sd.isClassFilterMode();
    }
    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
    );
    // Issue the asynchronous pull for this message queue starting at the request's next offset.
    try {
        this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, pullCallback);
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}
5.2 從本地消息緩存中消費消息-ConsumeMessageService
ConsumeMessageService也有兩種實現分別是并行消費和順序消費。
5.2.1 從本地緩存中并行消費消息-ConsumeMessageConcurrentlyService
1.將一批消息拆分成一條一條的消息,并封裝成消費請求,交給線程池執行
可以看出,如果一批消息的數量不超過consumeMessageBatchMaxSize(默認為1條),便直接構建消息消費請求ConsumeRequest,并提交給線程池異步執行消費邏輯;否則先按consumeMessageBatchMaxSize把這批消息切分成多個批次,再逐批提交。如果消費失敗,這批消息會重試。前面講重復消費的時候說過這個參數,最好是每次從本地隊列中取出一條消息,這樣消費失敗,只會重試本條消息,而不會導致其他消息重復消費。
public void submitConsumeRequest(final List<MessageExt> msgs, //消息內容final ProcessQueue processQueue, //具體的消息內容便在processQueue中,里面有一個map用來存儲消息,并且總的消息數量不能超過1000條,大小不能超過300mfinal MessageQueue messageQueue,//具體從broker哪個topic哪個queueId中開始消費的final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {//如果一次消息的數量小于1條(consumeMessageBatchMaxSize可以由它設置),便構建消息消費請求ConsumeRequest,并請交個線程池異步執行消費邏輯、//前面講重復消費的時候說過這個參數,最好是每次從本地隊列中取出一條消息,這樣消費失敗,只會重試本條消息,而不會導致重復消費ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {this.submitConsumeRequestLater(consumeRequest);}} else {//如果超過一條,便將消息進行切割成一條的大小for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}//并且提交消費請求ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}//如果消費失敗,過一段時間再重試this.submitConsumeRequestLater(consumeRequest);}}}}
2.消費者開始執行消費邏輯-ConsumeMessageConcurrentlyService$ConsumeRequest
ConsumeRequest是提交給消費線程池執行的任務(它實現了run方法),線程池中的線程會執行提交來的消費請求,并做對應的邏輯處理,主要包括如下步驟:
1.如果擁有消費者前置鉤子函數,便執行鉤子函數中的before方法;
2.調用messageListener的consumeMessage方法,這個方法里面包含了我們真正消費消息的邏輯;
3.如果執行我們的消費邏輯的耗時超過了consumeTimeout(單位為分鐘,代碼中會乘以60 * 1000換算成毫秒),便會把返回類型記為消費超時(TIME_OUT);
4.這里會執行后續邏輯,主要是返回消費進度consumeoffset;
/**
 * Consumes the messages held by this request with the registered concurrent listener.
 *
 * <p>Steps: skip if the process queue is dropped; run the "before" hooks when hooks are
 * registered; invoke {@code listener.consumeMessage}; classify the outcome into a
 * {@code ConsumeReturnType}; run the "after" hooks; record the consume time; and, unless
 * the process queue was dropped meanwhile, pass the status to {@code processConsumeResult}.
 */
public void run() {
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }
    // The listener is the user-supplied implementation that contains the business logic.
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
    ConsumeMessageContext consumeMessageContext = null;
    // When consume hooks are registered, build the hook context and run the "before" hooks.
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
        consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
        consumeMessageContext.setProps(new HashMap<String, String>());
        consumeMessageContext.setMq(messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }
    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        // Stamp every message with the time consumption started.
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        // The user's consume logic runs here; the listener receives a read-only view of the messages.
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue), e);
        hasException = true;
    }
    // Elapsed consume time in milliseconds.
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
    if (null == status) {
        if (hasException) {
            returnType = ConsumeReturnType.EXCEPTION;
        } else {
            returnType = ConsumeReturnType.RETURNNULL;
        }
        // consumeTimeout is multiplied by 60 * 1000 before the comparison, i.e. it is treated as minutes.
    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
        returnType = ConsumeReturnType.TIME_OUT;
    } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
        returnType = ConsumeReturnType.FAILED;
    } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
        returnType = ConsumeReturnType.SUCCESS;
    }
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
    }
    // A null status (listener returned null or threw) is treated as RECONSUME_LATER.
    if (null == status) {
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue);
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    // Run the "after" hooks with the final status.
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.setStatus(status.toString());
        consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
        consumeMessageContext.setAccessChannel(defaultMQPushConsumer.getAccessChannel());
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
    }
    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
    if (!processQueue.isDropped()) {
        // Follow-up handling of the result, including maintaining the local consume offset.
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}
3.維護本地消費進度
這一步主要是判斷有多少消息消費成功,并且在本地維護消費進度。
public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest) {int ackIndex = context.getAckIndex();if (consumeRequest.getMsgs().isEmpty())return;//這里是consumer對消費者消費成功和失敗批次消息的統計switch (status) {case CONSUME_SUCCESS://如果消費成功,初始時,將已經消費的index設置為0if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}//將消費成功的批次加1int ok = ackIndex + 1;int failed = consumeRequest.getMsgs().size() - ok;this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);break;case RECONSUME_LATER:ackIndex = -1;this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),consumeRequest.getMsgs().size());break;default:break;}//這里是返回消費成功的消息switch (this.defaultMQPushConsumer.getMessageModel()) {//如果是廣播模式,打印消息,case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;//如果是集群模式,統計消費成功和失敗的消息,再次重試case CLUSTERING:List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);boolean result = this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}//獲取到消費成功的offsetlong offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && 
!consumeRequest.getProcessQueue().isDropped()) {//維護存儲在本地的consumeoffsetthis.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}}
5.2.2 從本地緩存中順序消費消息-ConsumeMessageOrderlyService
1.提交消費請求
public void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispathToConsume) {if (dispathToConsume) {//直接構建消費,并且存儲到消費隊列中,讓線程池消費ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);this.consumeExecutor.submit(consumeRequest);}}
2.消費者執行消費邏輯-ConsumeMessageOrderlyService$ConsumeRequest
其實這一邏輯和并發消息的邏輯一模一樣,只是會加鎖,防止多線程場景下,并發消費,打亂消息的順序。
/**
 * Consumes messages of one queue in order.
 *
 * <p>The whole run is synchronized on a per-queue lock object. While the queue may be
 * consumed (broadcasting mode, or the process queue is locked and its lock has not
 * expired) the loop repeatedly takes up to {@code consumeMessageBatchMaxSize} messages
 * from the process queue, invokes the listener while holding the process queue's consume
 * lock, runs the hooks, and lets {@code processConsumeResult} decide whether to continue.
 * When the queue may not be consumed, a later lock-and-reconsume attempt is scheduled.
 */
public void run() {
    if (this.processQueue.isDropped()) {
        log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
    }
    // Serialize consumption of this message queue so message order is not disturbed.
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) {
        // Proceed in broadcasting mode, or when the process queue is locked and the lock has not expired.
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
            final long beginTime = System.currentTimeMillis();
            for (boolean continueConsume = true; continueConsume; ) {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    break;
                }
                // Clustering mode: the queue is no longer locked, so try to lock and consume later.
                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && !this.processQueue.isLocked()) {
                    log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }
                // Clustering mode: the lock has expired, so try to lock and consume later.
                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && this.processQueue.isLockExpired()) {
                    log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }
                // This loop has run longer than MAX_TIME_CONSUME_CONTINUOUSLY: resubmit the request and stop.
                long interval = System.currentTimeMillis() - beginTime;
                if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                    break;
                }
                // Ordered consumption also takes at most consumeMessageBatchMaxSize messages per round.
                final int consumeBatchSize =
                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                // Take the next batch out of the local process queue cache.
                List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                if (!msgs.isEmpty()) {
                    final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
                    ConsumeOrderlyStatus status = null;
                    ConsumeMessageContext consumeMessageContext = null;
                    // When consume hooks are registered, build the hook context and run the "before" hooks.
                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext = new ConsumeMessageContext();
                        consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                        consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                        consumeMessageContext.setMq(messageQueue);
                        consumeMessageContext.setMsgList(msgs);
                        consumeMessageContext.setSuccess(false);
                        // init the consume context type
                        consumeMessageContext.setProps(new HashMap<String, String>());
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                    }
                    long beginTimestamp = System.currentTimeMillis();
                    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                    boolean hasException = false;
                    try {
                        // Hold the process queue's consume lock while the listener runs; released in finally.
                        this.processQueue.getConsumeLock().lock();
                        if (this.processQueue.isDropped()) {
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            break;
                        }
                        // The user's consume logic runs here.
                        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                    } catch (Throwable e) {
                        log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue), e);
                        hasException = true;
                    } finally {
                        this.processQueue.getConsumeLock().unlock();
                    }
                    if (null == status
                        || ConsumeOrderlyStatus.ROLLBACK == status
                        || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                        log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue);
                    }
                    long consumeRT = System.currentTimeMillis() - beginTimestamp;
                    if (null == status) {
                        if (hasException) {
                            returnType = ConsumeReturnType.EXCEPTION;
                        } else {
                            returnType = ConsumeReturnType.RETURNNULL;
                        }
                        // consumeTimeout is multiplied by 60 * 1000 before the comparison, i.e. it is treated as minutes.
                    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                        returnType = ConsumeReturnType.TIME_OUT;
                    } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                        returnType = ConsumeReturnType.FAILED;
                    } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                        returnType = ConsumeReturnType.SUCCESS;
                    }
                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                    }
                    // A null status (listener returned null or threw) is treated as SUSPEND_CURRENT_QUEUE_A_MOMENT.
                    if (null == status) {
                        status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    // Run the "after" hooks with the final status.
                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext.setStatus(status.toString());
                        consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                        consumeMessageContext.setAccessChannel(defaultMQPushConsumer.getAccessChannel());
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                    }
                    ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
                    // Post-consume handling (e.g. maintaining consume progress); its result decides whether to keep looping.
                    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                } else {
                    // Nothing left in the local cache: stop looping.
                    continueConsume = false;
                }
            }
        } else {
            if (this.processQueue.isDropped()) {
                log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }
            // Not allowed to consume right now: try to lock and reconsume later.
            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
        }
    }
}
// NOTE(review): the excerpt carries one more closing brace than run() opens; presumably it
// closes the enclosing ConsumeRequest class, whose header is not part of this excerpt.
}
3.維護消費進度
這一邏輯和并發消費一模一樣,會更新本地的消費進度。
6.總結
至此,消息從產生到存儲,再到消費的整個邏輯我們應該是很清晰了。
在生產端,生產者根據負載均衡策略(比如輪詢或者一致性hash等)選擇對應的messagequeue及其所在的broker,然后通過Netty,將消息發送到broker去;
broker收到消息過后,會直接將消息持久化到commitLog中,然后再單獨的啟動一個線程,根據持久化的commitLog建立IndexFile和consumequeue,并且會喚醒因為長輪詢阻塞的消費線程,通過Netty將消息發送到consumer中;
消費者收到消息過后,會先存儲到本地的一個Map中,提交一個消費請求到消費線程池中,消費線程會每次從Map中取出一條消息,調用我們重寫的consumeMessage方法,進行消費,消費完成過后,維護好消費進度。