Overview
As you can see, pulling messages and consuming messages are actually two separate processes, although this is transparent to the user.
Note the meaning of the three offsets: the physical offset is the global byte offset within the CommitLog; the queue offset is the logical index (0, 1, 2, ...) of an entry in a queue's ConsumeQueue; and the consume offset is the queue offset up to which the consumer group has consumed.
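To make the mapping concrete, here is a minimal sketch (not RocketMQ source) of how a queue offset is resolved to a physical offset. The 20-byte entry layout (8-byte CommitLog offset + 4-byte size + 8-byte tag hashcode) matches the real ConsumeQueue format; the in-memory buffer is only an illustration.

import java.nio.ByteBuffer;

public class ConsumeQueueEntryDemo {
    static final int CQ_STORE_UNIT_SIZE = 20;   // fixed-size ConsumeQueue entry

    public static void main(String[] args) {
        // Pretend this buffer is a ConsumeQueue file holding 3 index entries.
        ByteBuffer consumeQueue = ByteBuffer.allocate(3 * CQ_STORE_UNIT_SIZE);
        consumeQueue.putLong(0L).putInt(100).putLong(42L);      // queue offset 0
        consumeQueue.putLong(100L).putInt(200).putLong(42L);    // queue offset 1
        consumeQueue.putLong(300L).putInt(150).putLong(42L);    // queue offset 2

        long queueOffset = 2;                                   // "nextOffset" in a PullRequest
        int pos = (int) (queueOffset * CQ_STORE_UNIT_SIZE);     // fixed-size entries allow direct addressing
        long physicalOffset = consumeQueue.getLong(pos);        // where the message starts in the CommitLog
        int size = consumeQueue.getInt(pos + 8);                // how many bytes to read from there
        System.out.println("physicalOffset=" + physicalOffset + ", size=" + size);
    }
}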
Dispatch
As shown in the figure above, every queue of a Topic is bound to exactly one PullRequest object, and every PullRequest object is in turn bound to exactly one red-black-tree queue, the ProcessQueue.
Each dashed box represents a Java class; boxes of the same color represent calls made on the same thread; the blue box in the upper-right corner represents the callback thread.
- When the RebalanceService thread performs a rebalance, it allocates a dedicated PullRequest for every queue owned by each push consumer (pull consumers are not handled by the PullMessageService thread), and then calls PullMessageService#executePullRequestImmediately() to drop each PullRequest into the PullMessageService thread's blocking queue, pullRequestQueue.
- The PullMessageService thread's own run() method keeps taking PullRequests from pullRequestQueue and, using the offset recorded in each PullRequest, pulls messages from the broker into the local red-black tree.
Each consumer client has one PullMessageService thread, which is responsible for pulling messages for multiple topics.
As long as the broker has enough messages available, the PullMessageService thread keeps executing three steps in its while loop:
- take a PullRequest from pullRequestQueue;
- once the PullRequest is obtained, perform the flow-control checks;
- send an asynchronous pull request to the broker (passing a callback along with it).
When the pull request sent to the broker has found messages, the broker sends a response back to the client. The client's netty IO thread receives that response and hands it to a thread in the NettyClientPublicExecutor thread pool, and that thread invokes the callback mentioned above to write the messages into the TreeMap.
This is the typical asynchronous pattern: a callback or listener is passed in with the call, and another IO thread eventually invokes that callback or listener, as sketched below.
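A minimal sketch of that callback style, with a hypothetical asyncPull method standing in for the real remoting client: the caller returns immediately, and a separate executor thread, playing the role of the netty IO / NettyClientPublicExecutor thread, invokes the callback later.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncCallbackDemo {
    interface PullCallback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    // Stands in for the netty IO thread / NettyClientPublicExecutor pool.
    private static final ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();

    // Hypothetical async API: sends the request and returns without waiting.
    static void asyncPull(final String request, final PullCallback callback) {
        callbackExecutor.submit(() -> {
            try {
                String response = "messages for " + request;   // pretend this came back over the wire
                callback.onSuccess(response);                   // callback runs on the executor thread
            } catch (Throwable t) {
                callback.onException(t);
            }
        });
    }

    public static void main(String[] args) {
        asyncPull("queue-0", new PullCallback() {
            @Override public void onSuccess(String result) {
                System.out.println(Thread.currentThread().getName() + " got: " + result);
            }
            @Override public void onException(Throwable e) {
                e.printStackTrace();
            }
        });
        System.out.println(Thread.currentThread().getName() + " returned immediately");
        callbackExecutor.shutdown();
    }
}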
Two situations wake up the doRebalance thread (see the sketch after this list):
- it wakes up on its own after waitForRunning has waited 20 s;
- the broker explicitly wakes the doRebalance thread.
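A simplified sketch of that wait/wake pattern (an assumption-level re-creation, not the real ServiceThread/RebalanceService code): the loop waits up to 20 s per round, and wakeup() lets a broker-triggered notification cut the wait short.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RebalanceLoopDemo extends Thread {
    private volatile CountDownLatch waitPoint = new CountDownLatch(1);
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    private volatile boolean stopped = false;

    @Override
    public void run() {
        while (!stopped) {
            waitForRunning(20_000);   // wake up by timeout...
            doRebalance();            // ...or because wakeup() was called
        }
    }

    private void waitForRunning(long intervalMillis) {
        if (hasNotified.compareAndSet(true, false)) {
            return;                   // a rebalance was already requested, run immediately
        }
        waitPoint = new CountDownLatch(1);
        try {
            waitPoint.await(intervalMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ignored) {
        } finally {
            hasNotified.set(false);
        }
    }

    public void wakeup() {            // e.g. the broker notifies that the consumer group changed
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown();
        }
    }

    public void shutdown() {
        stopped = true;
        wakeup();
    }

    private void doRebalance() {
        System.out.println("rebalancing at " + System.currentTimeMillis());
    }

    public static void main(String[] args) throws InterruptedException {
        RebalanceLoopDemo service = new RebalanceLoopDemo();
        service.start();
        Thread.sleep(1000);
        service.wakeup();             // simulate a notification arriving from the broker
        Thread.sleep(1000);
        service.shutdown();
    }
}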
Pull & Push
Rebalance
A rebalance is triggered whenever the number of queues of a topic changes or the number of consumers in a consumer group changes.
It follows that the more queues a topic has, the more potential parallelism it offers.
RocketMQ's rebalance: every consumer client executes the logic above, sorting all the queues and all the consumers, so that every client computes exactly the same view.
Kafka's rebalance, in contrast, is executed on a single machine; Kafka relies on ZooKeeper to elect the leader that performs it.
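Because every consumer sorts the same queue list and the same consumer-id list, each one can compute the same assignment independently. A simplified sketch in the spirit of AllocateMessageQueueAveragely (plain strings instead of MessageQueue objects, so it is only an illustration of the idea):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class AverageAllocationDemo {
    static List<String> allocate(String currentCid, List<String> cids, List<String> queues) {
        List<String> sortedCids = new ArrayList<>(cids);
        List<String> sortedQueues = new ArrayList<>(queues);
        Collections.sort(sortedCids);          // identical view on every consumer
        Collections.sort(sortedQueues);

        int index = sortedCids.indexOf(currentCid);
        int mod = sortedQueues.size() % sortedCids.size();
        int average = sortedQueues.size() <= sortedCids.size() ? 1
                : (mod > 0 && index < mod ? sortedQueues.size() / sortedCids.size() + 1
                                          : sortedQueues.size() / sortedCids.size());
        int start = index < mod ? index * average : index * average + mod;

        List<String> result = new ArrayList<>();
        for (int i = 0; i < average && start + i < sortedQueues.size(); i++) {
            result.add(sortedQueues.get(start + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> cids = List.of("cid-2", "cid-1", "cid-3");
        List<String> queues = List.of("q0", "q1", "q2", "q3", "q4", "q5", "q6", "q7");
        for (String cid : cids) {
            // every consumer runs the same computation and prints a disjoint slice of queues
            System.out.println(cid + " -> " + allocate(cid, cids, queues));
        }
    }
}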
Pull Message
Implementation of the PullMessageService thread
public class PullMessageService extends ServiceThread {
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
    private final MQClientInstance mQClientFactory;
    private final ScheduledExecutorService scheduledExecutorService = Executors
        .newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "PullMessageServiceScheduledThread");
            }
        });

    public PullMessageService(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }

    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    PullMessageService.this.executePullRequestImmediately(pullRequest);
                }
            }, timeDelay, TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

    private void pullMessage(final PullRequest pullRequest) {
        // One consumer machine is represented by one MQClientInstance.
        // An MQClientInstance may hold many different consumers, each subscribed under a different consumer group.
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            // Note: only DefaultMQPushConsumerImpl shows up here; pull consumers never take this path.
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }
}
A consumer machine is represented by a single MQClientInstance.
public class MQClientInstance {
    private final ClientConfig clientConfig;
    private final String clientId;
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
}
The consumerTable of one MQClientInstance may hold many different consumers subscribed under different consumer groups. Each of those consumers corresponds to its own DefaultMQPushConsumerImpl instance, and every DefaultMQPushConsumerImpl instance starts, through its own start() method, a set of services that belong to it alone.
For example, every DefaultMQPushConsumerImpl instance starts its own dedicated "ConsumeMessageThread_" consume thread pool with both core size and max size set to 20.
Note that pull consumers are registered in the MQClientInstance's consumerTable as well, but no PullRequest is ever enqueued for them, which is why the cast to DefaultMQPushConsumerImpl above only ever sees push consumers.
PullRequest
public class PullRequest {
    private String consumerGroup;
    private MessageQueue messageQueue;
    private ProcessQueue processQueue; // local red-black-tree cache queue
    private long nextOffset;
    private boolean lockedFirst = false;
}
A PullRequest is bound to one specific queue consumed by one consumer of a consumer group. If two consumers belonging to two different consumer groups on the same machine subscribe to the same queue of the same topic, there will be two distinct PullRequests, and both will be dropped into the same PullMessageService's blocking queue, pullRequestQueue.
The PullMessage box on the left is the PullMessageService thread;
once the thread on the left has executed channel.writeAndFlush() and the PullRequest has been sent from the consumer into the broker-side recv_queue of that channel, the thread can return immediately and go do something else; it does not have to keep waiting for the broker's response to that PullRequest. The benefit is that the thread stays extremely lightweight.
The pink bars represent business thread pools. The RocketMQ broker defines many key/value pairs in which the key is the request code of one specific kind of operation and the value is the thread pool bound to that code, which is how the different kinds of work are isolated from each other.
Later, when a netty IO thread (one of the worker-group threads) receives a READ event on a channel registered with its selector, it calls ChannelInboundHandler#channelRead(), reads the serialized bytes of the PullRequest sent by the consumer out of that channel's kernel recv_queue, and hands the PullRequest to the business thread pool dedicated to searching the CommitLog (after the hand-off the IO thread is free to do other work). A thread in that business pool then uses the offset carried in the PullRequest to go through the ConsumeQueue into the CommitLog and fetch the current batch of 32 messages; once the fetch succeeds, that same business thread calls channel.writeAndFlush() to wrap the 32 messages into a PullResult and send it back to the consumer.
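A stripped-down sketch (not broker source) of that "request code -> dedicated thread pool" isolation: the IO thread only looks up the pair registered for the code and hands the work off. The codes 10 and 11 are used here merely to echo SEND_MESSAGE and PULL_MESSAGE.

import java.util.Map;
import java.util.concurrent.*;

public class ProcessorTableDemo {
    interface RequestProcessor { String process(String request); }

    // code -> (processor, executor) pair, so slow pull requests cannot starve sends
    static final Map<Integer, Map.Entry<RequestProcessor, ExecutorService>> processorTable =
            new ConcurrentHashMap<>();

    public static void main(String[] args) {
        ExecutorService sendPool = Executors.newFixedThreadPool(4,  r -> new Thread(r, "SendMessageThread"));
        ExecutorService pullPool = Executors.newFixedThreadPool(16, r -> new Thread(r, "PullMessageThread"));
        processorTable.put(10, Map.entry((RequestProcessor) req -> "stored " + req, sendPool));         // e.g. SEND_MESSAGE
        processorTable.put(11, Map.entry((RequestProcessor) req -> "found msgs for " + req, pullPool)); // e.g. PULL_MESSAGE

        dispatch(11, "queue-0 offset=42");
        dispatch(10, "hello");
        sendPool.shutdown();
        pullPool.shutdown();
    }

    // What the netty IO thread does: look up the pair, hand off, return at once.
    static void dispatch(int code, String request) {
        Map.Entry<RequestProcessor, ExecutorService> pair = processorTable.get(code);
        pair.getValue().submit(() ->
            System.out.println(Thread.currentThread().getName() + ": " + pair.getKey().process(request)));
    }
}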
When the netty thread's (NettyClientHandler) channelRead() receives the PullResult sent back by the broker, it hands it to the NettyClientPublicExecutor_ thread pool; the threads of that pool are exactly the threads that execute the callback (or the asynchronous listener).
A thread of the callback-executing NettyClientPublicExecutor pool is therefore the first thread on the client that actually gets hold of the messages:
1. A thread in the NettyClientPublicExecutor pool deserializes the contents of the PullResult into a batch of messages (normally 32);
2. it then puts these messages into the red-black tree;
3. finally it hands the 32 messages that are now in the red-black tree to the thread pool dedicated to consumption.
Consume progress is reported at two moments (see the sketch after this list):
- a timer reports the consume progress held in the local offsetTable every 5 s;
- every time the client sends an asynchronous PullRequest to the broker, it also carries the local consume progress along with it.
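A minimal sketch (not RocketMQ source) of those two reporting paths: an in-memory offset table that a 5 s timer pushes to the broker, and the same value being piggy-backed on each pull as commitOffsetValue. The map key is a plain string standing in for a MessageQueue.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class OffsetReportDemo {
    // stands in for the client's ConcurrentMap<MessageQueue, AtomicLong> offset table
    static final Map<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        offsetTable.put("TopicTest@queue-0", new AtomicLong(0));

        // Path 1: a timer, roughly persistConsumerOffsetInterval = 5 s in the real client
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(() ->
            offsetTable.forEach((mq, off) ->
                System.out.println("persist to broker: " + mq + " -> " + off.get())),
            5, 5, TimeUnit.SECONDS);

        // Path 2: every pull request carries the current local offset as commitOffsetValue
        long commitOffsetValue = offsetTable.get("TopicTest@queue-0").get();
        System.out.println("pull request piggy-backs commitOffsetValue=" + commitOffsetValue);

        // consuming advances the local offset; the next timer tick / pull reports the new value
        offsetTable.get("TopicTest@queue-0").set(32);
        Thread.sleep(6_000);
        timer.shutdown();
    }
}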
The consumer keeps a local cache of type ConcurrentMap<MessageQueue, AtomicLong>. Whatever queue the current consume thread is working on, after it finishes a message, whether consumption succeeded or failed, it updates that queue's AtomicLong to the firstKey of the red-black tree, i.e. the smallest message offset still in the tree (this value is the consume progress the consumer stores locally).
The 32 messages are first put into the red-black-tree queue and then wrapped into 32 Runnable ConsumeRequests, which are submitted to the client's consume thread pool. Threads 1, 2, 3, ... 20 of that pool consume the 32 messages concurrently, each calling the consumeMessage() method of the ConsumeMessageListener registered by the application to run the real consumption logic. Whether consumeMessage() succeeds, fails, or throws, each consume thread removes the message it has just processed from the red-black-tree queue and updates the consumer's local consume-progress cache. Note that because the consume thread pool processes messages concurrently, removals from the red-black tree may also happen concurrently, so the tree's remove method has to be protected by a lock.
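A simplified sketch (not the real ProcessQueue) of why the removal must be locked and of which value ends up being reported: the smallest offset still in the tree, or max-consumed + 1 once the tree is empty.

import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ProcessQueueDemo {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile long queueOffsetMax = 0;

    public void putMessages(List<Long> offsets) {
        lock.writeLock().lock();
        try {
            for (Long off : offsets) {
                msgTreeMap.put(off, "msg-" + off);
                queueOffsetMax = Math.max(queueOffsetMax, off);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Called by each consume thread after it finishes a message (success or failure). */
    public long removeMessage(long offset) {
        lock.writeLock().lock();           // concurrent removes, so the tree must be guarded
        try {
            msgTreeMap.remove(offset);
            // the progress that may be reported: the smallest un-acked offset,
            // or (max consumed + 1) once the tree has been drained
            return msgTreeMap.isEmpty() ? queueOffsetMax + 1 : msgTreeMap.firstKey();
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        ProcessQueueDemo pq = new ProcessQueueDemo();
        pq.putMessages(List.of(0L, 1L, 2L, 3L));
        System.out.println("report " + pq.removeMessage(2L));  // 0 is still in the tree -> report 0
        System.out.println("report " + pq.removeMessage(0L));  // now 1 is the smallest -> report 1
    }
}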
The only difference is that when consumeMessage() fails, retryTimes is incremented by 1 and the message is sent back to the broker for another round of consumption, whereas on success the message is not sent back. Either way, the offset of that message has been passed over and a message at that offset will never be consumed again; even if a message that was sent back for retry is later pulled back to the client, it no longer sits at its original offset, and its offset by then is certainly larger than before.
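A short, hedged example of that retry behaviour from the application's point of view: returning RECONSUME_LATER makes the client send the message back to the broker for a delayed retry, while the original offset is passed over either way. The group and topic names are placeholders.

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class RetryAwareConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_retry_group");
        consumer.subscribe("DemoTopic", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) -> {
            for (MessageExt msg : msgs) {
                try {
                    process(msg);
                } catch (Exception e) {
                    if (msg.getReconsumeTimes() < 3) {
                        // the message goes back to the broker; its original offset is still passed over
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    // too many retries: record it somewhere and let the offset advance
                    System.err.println("giving up on " + msg.getMsgId());
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }

    private static void process(MessageExt msg) { /* business logic */ }
}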
This chunk of page cache stays in memory, and for a period of time all reads hit this hot region, so read performance is assured; this heavy use of the PageCache is one of the reasons RocketMQ is so efficient.
Off-heap memory was the author's number-one performance-tuning tool at work; JVM GC tuning only comes second.
This off-heap memory pool has one drawback, though: if the JVM crashes, the data still in it is lost. As long as the JVM runs normally no messages are lost, latency stays flat with no spikes, and it sustained several hundred thousand TPS during Alibaba's Double 11.
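For reference, a hedged sketch of turning that off-heap write path on through the broker-side MessageStoreConfig setters (in practice the equivalent keys usually go into broker.conf); the pool is only used by an async-flush master, which is exactly why a JVM crash can lose whatever has not been committed yet.

import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;

public class TransientPoolConfigDemo {
    public static void main(String[] args) {
        MessageStoreConfig storeConfig = new MessageStoreConfig();
        storeConfig.setTransientStorePoolEnable(true);           // write into off-heap DirectByteBuffers first
        storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH); // the pool only takes effect with async flush
        storeConfig.setBrokerRole(BrokerRole.ASYNC_MASTER);      // slaves do not use the pool
        System.out.println("transientStorePoolEnable=" + storeConfig.isTransientStorePoolEnable());
    }
}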
Flow control on message pulling
Pulling is paused when the local red-black tree holds more than 1000 messages, when the cached messages exceed 100 MB, or when the gap between the smallest and largest message offsets in the tree exceeds 2000; the sketch below shows where these thresholds come from.
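Those three thresholds are the defaults of DefaultMQPushConsumer and can be tuned through its setters; a short sketch with placeholder group and topic names:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

public class FlowControlConfigDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_flow_control_group");
        consumer.setPullThresholdForQueue(1000);       // pause pulling when >1000 cached messages per queue
        consumer.setPullThresholdSizeForQueue(100);    // ...or when cached messages exceed 100 MiB
        consumer.setConsumeConcurrentlyMaxSpan(2000);  // ...or when maxOffset - minOffset in the tree > 2000
        consumer.setPullBatchSize(32);                 // how many messages one pull brings back
        consumer.subscribe("DemoTopic", "*");
    }
}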
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
A colleague once made a remote call to another company's service inside the registered listener, and as a result one of the 16 queues simply stopped being consumed. In a case like this, the first suspicion should be that some remote call inside the listener is blocking; the prime suspect was the HttpClient call in that listener, which had no timeout configured and therefore hung forever, causing the client to keep printing flow-control logs such as "the cached message count exceeds the threshold {}, so do flow control".
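The general fix is to give every remote call made inside the listener an explicit timeout, so a stuck dependency can only delay one message rather than freeze a consume thread forever. A hedged illustration using java.net.http purely as a stand-in for whatever HTTP client is involved:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

public class BoundedRemoteCall {
    private static final HttpClient client = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(2))          // fail fast instead of hanging the consume thread
            .build();

    static String callDownstream(String url) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofSeconds(3))             // per-request timeout
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callDownstream("https://example.org/"));
    }
}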
Client
package org.apache.rocketmq.client.impl.consumer;public class PullMessageService extends ServiceThread {private final InternalLogger log = ClientLogger.getLog();private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();private final MQClientInstance mQClientFactory;// 大量使用調度線程池,來做延時調度private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "PullMessageServiceScheduledThread");}});public PullMessageService(MQClientInstance mQClientFactory) {this.mQClientFactory = mQClientFactory;}public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {if (!isStopped()) {this.scheduledExecutorService.schedule(new Runnable() {@Overridepublic void run() {PullMessageService.this.executePullRequestImmediately(pullRequest);}}, timeDelay, TimeUnit.MILLISECONDS);} else {log.warn("PullMessageServiceScheduledThread has shutdown");}}public void executePullRequestImmediately(final PullRequest pullRequest) {try {this.pullRequestQueue.put(pullRequest);} catch (InterruptedException e) {log.error("executePullRequestImmediately pullRequestQueue.put", e);}}public void executeTaskLater(final Runnable r, final long timeDelay) {if (!isStopped()) {this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);} else {log.warn("PullMessageServiceScheduledThread has shutdown");}}public ScheduledExecutorService getScheduledExecutorService() {return scheduledExecutorService;}private void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 從阻塞隊列中拿 pullRequest PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}@Overridepublic void shutdown(boolean interrupt) {super.shutdown(interrupt);ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);}@Overridepublic String getServiceName() {return PullMessageService.class.getSimpleName();}}
Every queue of a Topic has exactly one corresponding PullRequest.
public class PullRequest {
    private String consumerGroup;
    private MessageQueue messageQueue;
    private ProcessQueue processQueue;
    private long nextOffset;
    private boolean lockedFirst = false;
}
nextOffset is the position in the messageQueue from which the next pull will start; here nextOffset is the messageQueue's queue offset (0, 1, 2, ...).
public class DefaultMQPushConsumerImpl implements MQConsumerInner {public void pullMessage(final PullRequest pullRequest) {/** 給每個pullRequest都附帶一個紅黑樹,然后每個pullRequest從broker端拿回消息后,就往自己附帶的紅黑樹中丟;* */final ProcessQueue processQueue = pullRequest.getProcessQueue();if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return;}pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());try {this.makeSureStateOK();} catch (MQClientException e) {log.warn("pullMessage exception, consumer state not ok", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);return;}if (this.isPause()) {log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);return;}long cachedMessageCount = processQueue.getMsgCount().get();long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);/** 限流:消息消費端紅黑樹隊列中積壓的消息不超過 1000 條* 當限流判斷不通過時,是直接就return了,* 都不會再去走調用broker端的邏輯,從而緩解broker端的壓力* * */if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn( "the cached message count exceeds the threshold {}, so do flow control, " +"minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(),processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}/** 消息處理隊列中積壓的消息總大小超過 100M* */if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control," +" minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(),processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}if (!this.consumeOrderly) {/** 如果是非順序消費:* 消息處理隊列中盡管積壓沒有超過 1000 條,但紅黑樹隊列中,消息的最大偏移量與最小偏移量的差值超過 2000* */if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {log.warn( "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);}return;}} else {if (processQueue.isLocked()) {if (!pullRequest.isLockedFirst()) {final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());boolean brokerBusy = offset < pullRequest.getNextOffset();log.info("the first time to pull message, so fix offset from broker. 
pullRequest: {} NewOffset: {} brokerBusy: {}",pullRequest, offset, brokerBusy);if (brokerBusy) {log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",pullRequest, offset);}pullRequest.setLockedFirst(true);pullRequest.setNextOffset(offset);}} else {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.info("pull message later because not locked in broker, {}", pullRequest);return;}}final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if (null == subscriptionData) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.warn("find the consumer's subscription failed, {}", pullRequest);return;}final long beginTimestamp = System.currentTimeMillis();PullCallback pullCallback = new PullCallback() {/** 這里,拿到broker端發送給客戶端的pullResult*channelRead()拿到broker端傳過來的pullResult后,把它丟入 NettyClientPublicExecutor 這個線程池中去,NettyClientPublicExecutor線程實際上也就是執行callback的線程; callback線程就根據成功還是失敗調用下面的函數1. NettyClientPublicExecutor線程池中的線程,對pullResult內的東西進行反序列化,得到一些msg(一般為32條)2. 先將這批msg放入紅黑樹中,3. 再把這批消息,丟入專門負責消費的一個線程池中去NettyClientPublicExecutor*/@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {/** 1. 客戶端發送了一個拉消息的請求,broker端返回FOUND代表查到了消息** 實際上,broker端發回的消息,是很多的二進制流,但是業務程序員在listener內,拿到的消息是List<Msg>這里涉及的轉換從左,就是在processPullResult這里* */pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(),pullResult,subscriptionData);switch (pullResult.getPullStatus()) {/** 1. 拿到broker返回回來的消息響應,先反序列化* */case FOUND:/** getNextBeginOffset,表示下一輪要拉的消息的offset,然后繼續發送拉消息請求* */long prevRequestOffset = pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());long pullRT = System.currentTimeMillis() - beginTimestamp;DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullRT);long firstMsgOffset = Long.MAX_VALUE;if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else {firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());/** Topic的每個queue,都綁定了唯一的一個pullRequest對象** 給每個pullRequest都附帶一個紅黑樹,然后每個pullRequest拿到消息以后,就往自己附帶的紅黑樹中丟;* processQueue里面實際放的就是一個TreeMap紅黑樹** 將從broker拿回來的消息(默認一次拉32條),丟入processQueue中* */boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());/** 將拉回來的32條消息,丟入消費線程池* */DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);/** 將更新過nextOffset后的新的pullRequest,重新放入到pullRequestQueue中去;* */if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());} else {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}}if (pullResult.getNextBeginOffset() < prevRequestOffset|| firstMsgOffset < prevRequestOffset) {log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: 
{}",pullResult.getNextBeginOffset(),firstMsgOffset,prevRequestOffset);}break;case NO_NEW_MSG:pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);break;case NO_MATCHED_MSG:pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);break;case OFFSET_ILLEGAL:log.warn("the pull request offset illegal, {} {}",pullRequest.toString(), pullResult.toString());pullRequest.setNextOffset(pullResult.getNextBeginOffset());pullRequest.getProcessQueue().setDropped(true);DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {@Overridepublic void run() {try {DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),pullRequest.getNextOffset(), false);DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());log.warn("fix the pull request offset, {}", pullRequest);} catch (Throwable e) {log.error("executeTaskLater Exception", e);}}}, 10000);break;default:break;}}}@Overridepublic void onException(Throwable e) {if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("execute the pull request exception", e);}DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}};boolean commitOffsetEnable = false;long commitOffsetValue = 0L;if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);if (commitOffsetValue > 0) {commitOffsetEnable = true;}}String subExpression = null;boolean classFilter = false;SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if (sd != null) {if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {subExpression = sd.getSubString();}classFilter = sd.isClassFilterMode();}int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, // commitOffsettrue, // suspendsubExpression != null, // subscriptionclassFilter // class filter);try {/** 這里把queue id,nextOffset傳給broker端,* broker端的 PullMessageProcessor#processRequest()接收到拉消息的請求* */this.pullAPIWrapper.pullKernelImpl(/** 告訴broker,要從哪個topic的哪個MessageQueue取消息* */pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),/** 告訴broker,要從哪個topic的哪個MessageQueue,取該queue的 queue offset為幾的索引塊,比如取第1或者第2塊* 拿到索引塊,就能拿到該索引塊中的 全局物理偏移量 + 消息size* */pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),sysFlag,/** 這里也在上報消費進度,即每次拉消息的同時,也在上報消費進度** 也就是說,上報消費進度有兩個途徑:* 1. 這里拉消息是上報* 2. 每隔persistConsumerOffsetInterval = 1000 * 5,往broker端的 ConsumerOffsetManager 的offsetTable中,更新一次消費進度* */commitOffsetValue,/** 長輪詢的最大超時時間,不能讓broker無限制的hold住consumer端的拉請求* 不然這樣,當broker hold住的請求太多時,broker的內存會扛不住的(設置超時時間,是系統兜底的重要策略)* */BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,/*向broker發送拉消息的請求,并向broker傳遞一個pullCallback*/pullCallback);} catch (Exception e) {log.error("pullKernelImpl exception", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}}}
Message decoding and second-stage (client-side) filtering
public class PullAPIWrapper {

    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            /*
             * decodes: deserialization.
             * The broker returns the messages as a binary stream over the network;
             * here that stream is decoded into the actual individual messages.
             */
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

            /*
             * Second-stage (client-side) filtering.
             * Unlike the broker, which compares tag hash codes, the client compares the tag strings themselves.
             */
            List<MessageExt> msgListFilterAgain = msgList;
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                        // compare by the tag string itself
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }

            if (this.hasHook()) {
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(unitMode);
                filterMessageContext.setMsgList(msgListFilterAgain);
                this.executeHook(filterMessageContext);
            }

            for (MessageExt msg : msgListFilterAgain) {
                String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (Boolean.parseBoolean(traFlag)) {
                    msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
                }
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                    Long.toString(pullResult.getMinOffset()));
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                    Long.toString(pullResult.getMaxOffset()));
            }

            pullResultExt.setMsgFoundList(msgListFilterAgain);
        }

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }
}
Structure of the pull result class
public class PullResult {
    private final PullStatus pullStatus;
    private final long nextBeginOffset;
    private final long minOffset;
    private final long maxOffset;
    private List<MessageExt> msgFoundList;
}
Server side
package org.apache.rocketmq.remoting.netty;

public abstract class NettyRemotingAbstract {

    /**
     * Entry of incoming command processing.
     *
     * Both NettyRemotingClient and NettyRemotingServer extend NettyRemotingAbstract,
     * so both of them go through this processMessageReceived() method.
     */
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
}
Where network requests are routed and dispatched
public abstract class NettyRemotingAbstract {/*** Process incoming request command issued by remote peer.** @param ctx channel handler context.* @param cmd request command.*/public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;final int opaque = cmd.getOpaque();if (pair != null) {Runnable run = new Runnable() {@Overridepublic void run() {try {doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);/** 根據不同的請求類型(不同的requestCode)* 選擇不同的NettyRequestProcessor的子類,來執行請求** 比如,如果是拉消息請求,那么就調用PullMessageProcessor的processRequest()來處理請求* 當然,最終還是要依靠 Pair<NettyRequestProcessor, ExecutorService> pair中的 ExecutorService線程池,來真正執行請求** 所以,也可以認識到PullMessageProcessor的processRequest()方法,是活在線程池中的,* 也就是說,它是會被多線程調用的,也就是說它內部的容器,是可能會有線程安全問題的;* */final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();try {ctx.writeAndFlush(response);} catch (Throwable e) {log.error("process request over, but response failed", e);log.error(cmd.toString());log.error(response.toString());}} else {}}} catch (Throwable e) {log.error("process request exception", e);log.error(cmd.toString());if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};if (pair.getObject1().rejectRequest()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[REJECTREQUEST]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {/** 接收到客戶端發來的,拉取消息的請求后,調用NettyRemotingAbstract#processRequestCommand()方法* 將客戶端發過來的拉消息的request,重新組裝成一個runnable,丟入新線程池 pullMessageExecutor 中;** 也就是說,這里netty線程,在得到客戶端的拉消息請求后,并沒有直接就開始處理* netty線程而是,將請求又丟給業務線程池,目的是為了保持netty線程的輕量** 這里的線程池,也就是BrokerController#registerProcessor()中注冊的指令與線程池組* */final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {if ((System.currentTimeMillis() % 10000) == 0) {log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())+ ", too many requests and system thread pool busy, RejectedExecutionException "+ pair.getObject2().toString()+ " request code: " + cmd.getCode());}if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[OVERLOAD]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);}}} else {String error = " request type " + cmd.getCode() + " not supported";final RemotingCommand response =RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);response.setOpaque(opaque);ctx.writeAndFlush(response);log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);}}/*** Process response from remote peer to the previous issued requests.** @param ctx channel handler context.* @param cmd response command instance.*/public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand 
cmd) {final int opaque = cmd.getOpaque();final ResponseFuture responseFuture = responseTable.get(opaque);if (responseFuture != null) {responseFuture.setResponseCommand(cmd);responseTable.remove(opaque);if (responseFuture.getInvokeCallback() != null) {executeInvokeCallback(responseFuture);} else {responseFuture.putResponse(cmd);responseFuture.release();}} else {log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(cmd.toString());}}}
Requests are forwarded according to their request-type code.
Since these XxxxxProcessors are used here, there must be a place where they are registered; it is in BrokerController's initialization logic.
public class BrokerController {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);private static final InternalLogger LOG_WATER_MARK = InternalLoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);private final BrokerConfig brokerConfig;private final NettyServerConfig nettyServerConfig;private final NettyClientConfig nettyClientConfig;private final MessageStoreConfig messageStoreConfig;/** 管理topic隊列的消費進度* */private final ConsumerOffsetManager consumerOffsetManager;private final ConsumerManager consumerManager;private final ConsumerFilterManager consumerFilterManager;private final ProducerManager producerManager;private final ClientHousekeepingService clientHousekeepingService;private final PullMessageProcessor pullMessageProcessor;private final PullRequestHoldService pullRequestHoldService;private final MessageArrivingListener messageArrivingListener;private final Broker2Client broker2Client;private final SubscriptionGroupManager subscriptionGroupManager;private final ConsumerIdsChangeListener consumerIdsChangeListener;private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();private final BrokerOuterAPI brokerOuterAPI;private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerControllerScheduledThread"));private final SlaveSynchronize slaveSynchronize;/** 各種業務阻塞隊列,和下方的各種隔離的業務線程池對應** */private final BlockingQueue<Runnable> sendThreadPoolQueue;private final BlockingQueue<Runnable> pullThreadPoolQueue;private final BlockingQueue<Runnable> replyThreadPoolQueue;private final BlockingQueue<Runnable> queryThreadPoolQueue;private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;private final FilterServerManager filterServerManager;private final BrokerStatsManager brokerStatsManager;private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();private MessageStore messageStore;private RemotingServer remotingServer;private RemotingServer fastRemotingServer;private TopicConfigManager topicConfigManager;/** 各種隔離的業務線程池** */private ExecutorService sendMessageExecutor;private ExecutorService pullMessageExecutor;private ExecutorService replyMessageExecutor;private ExecutorService queryMessageExecutor;private ExecutorService adminBrokerExecutor;private ExecutorService clientManageExecutor;private ExecutorService heartbeatExecutor;private ExecutorService consumerManageExecutor;private ExecutorService endTransactionExecutor;private boolean updateMasterHAServerAddrPeriodically = false;private BrokerStats brokerStats;private InetSocketAddress storeHost;private BrokerFastFailure brokerFastFailure;private Configuration configuration;private FileWatchService fileWatchService;private TransactionalMessageCheckService transactionalMessageCheckService;private TransactionalMessageService transactionalMessageService;private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;private Future<?> slaveSyncFuture;private Map<Class,AccessValidator> accessValidatorMap = new HashMap<Class, 
AccessValidator>();public BrokerController(final BrokerConfig brokerConfig,final NettyServerConfig nettyServerConfig,final NettyClientConfig nettyClientConfig,final MessageStoreConfig messageStoreConfig) {this.brokerConfig = brokerConfig;this.nettyServerConfig = nettyServerConfig;this.nettyClientConfig = nettyClientConfig;this.messageStoreConfig = messageStoreConfig;this.consumerOffsetManager = new ConsumerOffsetManager(this);/** autoCreateTopicEnable機制-Step1:在Broker啟動流程中,會構建TopicConfigManager對象,其構造方法中首先會判斷是否開啟了允許自動創建主題,* 如果啟用了自動創建主題,則向topicConfigTable中添加默認主題的路由信息。** 在Broker端的topic配置管理器中存在的路由信息,* 一方面會向Nameserver發送心跳包,匯報到Nameserver,* 另一方面會有一個定時任務,定時存儲在broker端,具體路徑為${ROCKET_HOME}/store/config/topics.json中,這樣在Broker關閉后再重啟,并不會丟失路由信息。* */this.topicConfigManager = new TopicConfigManager(this);this.pullMessageProcessor = new PullMessageProcessor(this);this.pullRequestHoldService = new PullRequestHoldService(this);this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);this.consumerFilterManager = new ConsumerFilterManager(this);this.producerManager = new ProducerManager();this.clientHousekeepingService = new ClientHousekeepingService(this);this.broker2Client = new Broker2Client(this);this.subscriptionGroupManager = new SubscriptionGroupManager(this);this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);this.filterServerManager = new FilterServerManager(this);this.slaveSynchronize = new SlaveSynchronize(this);/** 初始化各種業務阻塞隊列* */this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));/** 初始化快速失敗對象,并將brokerController自身傳入* */this.brokerFastFailure = new BrokerFastFailure(this);this.configuration = new Configuration(log,BrokerPathConfigHelper.getBrokerConfigPath(),this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig);}}
The broker's initialization logic: BrokerController#initialize()
public class BrokerController {public boolean initialize() throws CloneNotSupportedException {boolean result = this.topicConfigManager.load();result = result && this.consumerOffsetManager.load();result = result && this.subscriptionGroupManager.load();result = result && this.consumerFilterManager.load();if (result) {try {this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);if (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//load pluginMessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore = MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result = false;log.error("Failed to initialize", e);}}result = result && this.messageStore.load();if (result) {this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);/** 猜測應該是,用多線程的模式,來調用SendMessageProcessor的processRequest()來完成單條消息或者批量消息的保存** 消息發送者向 Broker 發送消息寫入請求,* Broker 端在接收到請求后會首先放入一個隊列中(SendThreadPoolQueue),默認容量為 10000。* Broker 會專門使用一個線程池(SendMessageExecutor)去從隊列中獲取任務并執行消息寫入請求,為了保證消息的順序處理,該線程池默認線程個數為1。* */this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl("SendMessageThread_"));this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getPullMessageThreadPoolNums(),this.brokerConfig.getPullMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,new ThreadFactoryImpl("PullMessageThread_"));this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getProcessReplyMessageThreadPoolNums(),this.brokerConfig.getProcessReplyMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.replyThreadPoolQueue,new ThreadFactoryImpl("ProcessReplyMessageThread_"));this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getQueryMessageThreadPoolNums(),this.brokerConfig.getQueryMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.queryThreadPoolQueue,new ThreadFactoryImpl("QueryMessageThread_"));this.adminBrokerExecutor =Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl("AdminBrokerThread_"));this.clientManageExecutor = new ThreadPoolExecutor(this.brokerConfig.getClientManageThreadPoolNums(),this.brokerConfig.getClientManageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.clientManagerThreadPoolQueue,new ThreadFactoryImpl("ClientManageThread_"));this.heartbeatExecutor = new 
BrokerFixedThreadPoolExecutor(this.brokerConfig.getHeartbeatThreadPoolNums(),this.brokerConfig.getHeartbeatThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.heartbeatThreadPoolQueue,new ThreadFactoryImpl("HeartbeatThread_", true));this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getEndTransactionThreadPoolNums(),this.brokerConfig.getEndTransactionThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.endTransactionThreadPoolQueue,new ThreadFactoryImpl("EndTransactionThread_"));this.consumerManageExecutor =Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl("ConsumerManageThread_"));/** 注冊各種處理器* */this.registerProcessor();final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();final long period = 1000 * 60 * 60 * 24;this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error("schedule record error.", e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error("schedule persist consumerOffset error.", e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error("schedule persist consumer filter error.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error("protectBroker error.", e);}}}, 3, 3, TimeUnit.MINUTES);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error("printWaterMark error.", e);}}}, 10, 1, TimeUnit.SECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());} catch (Throwable e) {log.error("schedule dispatchBehindBytes error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);if (this.brokerConfig.getNamesrvAddr() != null) {this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.brokerOuterAPI.fetchNameServerAddr();} catch (Throwable e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}if (!messageStoreConfig.isEnableDLegerCommitLog()) {if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {if (this.messageStoreConfig.getHaMasterAddress() != null&& this.messageStoreConfig.getHaMasterAddress().length() >= 6) {this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically = false;} else {this.updateMasterHAServerAddrPeriodically = true;}} else {// 定時打印master 與 
slave的差距this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printMasterAndSlaveDiff();} catch (Throwable e) {log.error("schedule printMasterAndSlaveDiff error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);}}if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {// Register a listener to reload SslContexttry {fileWatchService = new FileWatchService(new String[] {TlsSystemConfig.tlsServerCertPath,TlsSystemConfig.tlsServerKeyPath,TlsSystemConfig.tlsServerTrustCertPath},new FileWatchService.Listener() {boolean certChanged, keyChanged = false;@Overridepublic void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {log.info("The trust certificate changed, reload the ssl context");reloadServerSslContext();}if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged = true;}if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged = true;}if (certChanged && keyChanged) {log.info("The certificate and private key changed, reload the ssl context");certChanged = keyChanged = false;reloadServerSslContext();}}private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();((NettyRemotingServer) fastRemotingServer).loadSslContext();}});} catch (Exception e) {log.warn("FileWatchService created error, can't load the certificate dynamically");}}initialTransaction();initialAcl();initialRpcHooks();}return result;}
}
The processors that are actually registered
public class BrokerController {/*** broker在啟動時,給每種命令,注冊了對應的處理器* */public void registerProcessor() {/*** SendMessageProcessor*/SendMessageProcessor sendProcessor = new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList);/** 不同的RequestCode對應不同的動作** 這里就是,讓下面這幾個動作,都交給(sendProcessor, this.sendMessageExecutor)來執行* */this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);/*** PullMessageProcessor*/this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);/*** ReplyMessageProcessor*/ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);replyMessageProcessor.registerSendMessageHook(sendMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);/*** QueryMessageProcessor*/NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);/*** ClientManageProcessor*/ClientManageProcessor clientProcessor = new ClientManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);/*** ConsumerManageProcessor** 負責處理,客戶端來broker端* 查詢消費進度* 
更新消費進度**/ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);/*** EndTransactionProcessor*/this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);/*** Default*/AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);}}