源碼環境搭建
1、主要功能模塊
RocketMQ官方Git倉庫地址:GitHub - apache/rocketmq: Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
RocketMQ的官方網站下載:下載 | RocketMQ
重要模塊:
-
broker: Broker 模塊(broke 啟動進程)
-
client :消息客戶端,包含消息生產者、消息消費者相關類
-
example: RocketMQ 示例代碼
-
namesrv:NameServer模塊
-
store:消息存儲模塊
-
remoting:遠程訪問模塊
2、源碼啟動服務
將源碼導入IDEA后,需要先對源碼進行編譯。編譯指令clean install -Dmaven.test.skip=true
編譯完成后就可以開始調試代碼:
調試時,先在項目目錄下創建一個conf目錄,并從distribution
拷貝broker.conf
和logback_broker.xml
和logback_namesrv.xml
window10執行上面的編譯指令一直報錯A required class was missing while executing org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process: org/apache/commons/collections/ExtendedProperties,最后在Linux執行才成功
unzip rocketmq-rocketmq-all-4.9.5.zip
cd rocketmq-rocketmq-all-4.9.5/
mvn clean install -Dmaven.test.skip=true
啟動nameServer
雖然編譯指令在windows執行報錯,但是nameSrv還是可以正常啟動的
如果啟動時報錯,并提示需要配置ROCKETMQ_HOME環境變量,可以在工程里面添加:
啟動Broker
啟動Broker之前,需要先修改之前復制的broker.conf文件
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH# 自動創建Topic
autoCreateTopicEnable=true
# nameServ地址
namesrvAddr=127.0.0.1:9876
# 存儲路徑
storePathRootDir=E:\\RocketMQ\\data\\rocketmq\\dataDir
# commitLog路徑
storePathCommitLog=E:\\RocketMQ\\data\\rocketmq\\dataDir\\commitlog
# 消息隊列存儲路徑
storePathConsumeQueue=E:\\RocketMQ\\data\\rocketmq\\dataDir\\consumequeue
# 消息索引存儲路徑
storePathIndex=E:\\RocketMQ\\data\\rocketmq\\dataDir\\index
# checkpoint文件路徑
storeCheckpoint=E:\\RocketMQ\\data\\rocketmq\\dataDir\\checkpoint
# abort文件存儲路徑
abortFile=E:\\RocketMQ\\data\\rocketmq\\dataDir\\abort
啟動Broker時,還需要配置一個-c 參數,指向broker.conf配置文件
發送消息
在源碼的example模塊下,提供了非常詳細的測試代碼。
啟動example模塊下的org.apache.rocketmq.example.quickstart.Producer類即可發送消息
在測試源碼中,需要指定NameServer地址。這個NameServer地址有兩種指定方式,一種是配置一個NAMESRV_ADDR的環境變量。另一種是在源碼中指定。
// 源碼中指定
producer.setNamesrvAddr("127.0.0.1:9876");
消費消息
啟動example模塊下的org.apache.rocketmq.example.quickstart.Consumer類來消費消息
consumer.setNamesrvAddr("192.168.232.128:9876");
3、讀源碼的方法
1、帶著問題讀源碼。一定要自己思考!
2、小步快走。不要覺得一兩遍就能讀懂源碼
3、分步總結。帶上自己的理解,及時總結。對各種擴展功能,嘗試驗證
對于RocketMQ,試著去理解源碼中的各種單元測試。
源碼熱身階段
NameServer的啟動過程
關注重點
RocketMQ集群中,實際上消息存儲、推送等核心功能點是Broker。NameServer的作用,和微服務中的注冊中心非常類似,只是提供了Broker端的服務注冊與發現功能。
源碼重點
NameServer的啟動入口類是org.apache.rocketmq.namesrv.NamesrvStartup。其中的核心是構建并啟動一個NamesrvController。這個Controller對象就跟MVC中的Controller是很類似的,都是響應客戶端的請求。只不過,他響應的是基于Netty的客戶端請求。
另外,他的實際啟動過程,其實可以配合NameServer的啟動腳本進行更深入的理解。
從NameServer啟動和關閉這兩個關鍵步驟,我們可以總結出NameServer的組件其實并不是很多,整個NameServer的結構是這樣的:
這兩個配置類就可以用來指導如何優化Nameserver的配置。比如,如何調整nameserver的端口?
可以看出,RocketMQ的整體源碼風格就是典型的MVC思想。Controller響應請求,Service處理業務,各種Table保存消息
部分源碼示例:
// NamesrvStartup#main
public static void main(String[] args) {main0(args);
}public static NamesrvController main0(String[] args) {try {NamesrvController controller = createNamesrvController(args);start(controller);String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());if (null == commandLine) {System.exit(-1);return null;}// 兩個配置類在這里final NamesrvConfig namesrvConfig = new NamesrvConfig();final NettyServerConfig nettyServerConfig = new NettyServerConfig();// 端口號簡單粗暴nettyServerConfig.setListenPort(9876);// -c指令指定配置文件,可以替換端口號。配置文件內容:listenPort=9876if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);MixAll.printObjectProperties(console, namesrvConfig);MixAll.printObjectProperties(console, nettyServerConfig);System.exit(0);}... ...final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);return controller;
}// NamesrvController#NamesrvController
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {this.namesrvConfig = namesrvConfig;this.nettyServerConfig = nettyServerConfig;this.kvConfigManager = new KVConfigManager(this);this.routeInfoManager = new RouteInfoManager();this.brokerHousekeepingService = new BrokerHousekeepingService(this);this.configuration = new Configuration(log,this.namesrvConfig, this.nettyServerConfig);this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}// NamesrvController#initialize
public boolean initialize() {this.kvConfigManager.load();this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);... ...
}
Broker服務啟動過程
關注重點
Broker是整個RocketMQ的業務核心。所有消息存儲、轉發這些重要的業務都是Broker進行處理。
重點梳理Broker有哪些內部服務。這些內部服務將是整理Broker核心業務流程的起點
源碼重點
Broker啟動的入口在BrokerStartup這個類,可以從他的main方法開始調試
啟動過程關鍵點:重點也是圍繞一個BrokerController對象,先創建,然后再啟動
// BrokerStartup#main
public static void main(String[] args) {start(createBrokerController(args));
}public static BrokerController createBrokerController(String[] args) {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));try {//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),new PosixParser());if (null == commandLine) {System.exit(-1);}// Broker服務配置final BrokerConfig brokerConfig = new BrokerConfig();// Netty服務端占用了10911端口。同樣也可以在配置文件中覆蓋final NettyServerConfig nettyServerConfig = new NettyServerConfig();// Broker既要作為Netty服務端,向客戶端提供核心業務能力,又要作為Netty客戶端,向NameServer注冊心跳final NettyClientConfig nettyClientConfig = new NettyClientConfig();nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));nettyServerConfig.setListenPort(10911);// 消息存儲配置。 這兩個配置參數都可以在broker.conf文件中進行配置final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();... ...} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}
BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig 這幾個配置是了解如何優化 RocketMQ 使用的關鍵
在BrokerController.start方法啟動了一大堆Broker的核心服務,部分源碼:
// BrokerController#start
public void start() throws Exception {if (this.messageStore != null) {//啟動核心的消息存儲組件this.messageStore.start();}if (this.remotingServer != null) {this.remotingServer.start();}if (this.fastRemotingServer != null) {//啟動兩個Netty服務this.fastRemotingServer.start();}if (this.brokerOuterAPI != null) {//啟動客戶端,往外發請求this.brokerOuterAPI.start();}... ...this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {//向NameServer注冊心跳BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}if (this.brokerFastFailure != null) {//負責具體業務的功能組件this.brokerFastFailure.start();}
}
抽象出Broker的一個整體結構:
實際上,在應用中,可以通過producer.setSendMessageWithVIPChannel(true),讓少量比較重要的producer走VIP的通道。而在消費者端,也可以通過consumer.setVipChannelEnabled(true),讓消費者支持VIP通道的數據。
小試牛刀階段
開始理解一些比較簡單的業務邏輯
Netty服務注冊框架
關注重點
網絡通信服務是構建分布式應用的基礎,也是理解RocketMQ底層業務的基礎。
重點梳理RocketMQ的這個服務注冊框架,理解各個業務進程之間是如何進行RPC遠程通信的
Netty的所有遠程通信功能都由remoting模塊實現。RemotingServer模塊里包含了RPC的服務端RemotingServer以及客戶端RemotingClient。在RocketMQ中,NameServer主要是RPC的服務端RemotingServer,Broker對于客戶端來說,是RPC的服務端RemotingServer,而對于NameServer來說,又是RPC的客戶端。各種Client是RPC的客戶端RemotingClient。
RocketMQ基于Netty保持客戶端與服務端的長連接Channel。RemotingServer和RemotingClient都需要注冊自己的服務。
源碼重點
1、NameServer需要NettyServer。客戶端,Producer和Consumer,需要NettyClient。
2、所有的RPC請求數據都封裝成RemotingCommand對象。每個處理消息的服務邏輯,都會封裝成一個NettyRequestProcessor對象。
3、服務端和客戶端都維護一個processorTable,這是個HashMap。key是服務碼requestCode,value是對應的運行單元 Pair<NettyRequestProcessor,ExecutorService>類型,包含了處理Processor和執行線程的線程池。具體的Processor,由業務系統自行注冊。Broker服務注冊:BrokerController.registerProcessor(),客戶端的服務注冊:MQClientAPIImpl。NameServer則會注冊一個大的DefaultRequestProcessor,統一處理所有服務。
4、請求類型分為REQUEST和RESPONSE。這是為了支持異步的RPC調用。NettyServer處理完請求后,可以先緩存到responseTable中,等NettyClient下次來獲取,這樣就不用阻塞Channel了,可以提升請求吞吐量。
5、重點理解remoting包中是如何實現全流程異步化。
整體RPC框架流程:
RocketMQ使用Netty框架提供了一套基于服務碼的服務注冊機制,讓各種不同的組件都可以按照自己的需求,注冊自己的服務方法。RocketMQ的這一套服務注冊機制,是非常簡潔實用的。要開發一個大型的IM項目,要加減好友、發送文本,圖片,甚至紅包、維護群聊信息等等各種各樣的請求,這些請求如何封裝,就可以很好的參考這個框架。
關于RocketMQ的同步結果推送與異步結果推送
RocketMQ的RemotingServer服務端,會維護一個responseTable,這是一個線程同步的Map結構。 key為請求的ID,value是異步的消息結果。ConcurrentMap<Integer /* opaque */, ResponseFuture>
處理同步請求(NettyRemotingAbstract#invokeSyncImpl)時,處理的結果會存入responseTable,通過ResponseFuture提供一定的服務端異步處理支持,提升服務端的吞吐量。 請求返回后,立即從responseTable中移除請求記錄。
//NettyRemotingAbstract#invokeSyncImpl
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,final long timeoutMillis)throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {final int opaque = request.getOpaque();try {final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);this.responseTable.put(opaque, responseFuture);final SocketAddress addr = channel.remoteAddress();channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {responseFuture.setSendRequestOK(true);return;} else {responseFuture.setSendRequestOK(false);}responseTable.remove(opaque);responseFuture.setCause(f.cause());responseFuture.putResponse(null);log.warn("send a request command to channel <" + addr + "> failed.");}});RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}return responseCommand;} finally {this.responseTable.remove(opaque);}
}//ResponseFuture#waitResponse
//實際上,同步也是通過異步實現的
//發送消息后,通過countDownLatch阻塞當前線程,造成同步等待的效果
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);return this.responseCommand;
}//ResponseFuture#putResponse
//等待異步獲取到消息后,再通過countDownLatch釋放當前線程
public void putResponse(final RemotingCommand responseCommand) {this.responseCommand = responseCommand;this.countDownLatch.countDown();
}
處理異步請求(NettyRemotingAbstract#invokeAsyncImpl)時,處理的結果依然會存入responsTable,等待客戶端后續再來請求結果。但是他保存的依然是一個ResponseFuture,也就是在客戶端請求結果時再去獲取真正的結果。 另外,在RemotingServer啟動時,會啟動一個定時的線程任務,不斷掃描responseTable,將其中過期的response清除掉。
//NettyRemotingAbstract#invokeAsyncImpl
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {long beginStartTime = System.currentTimeMillis();final int opaque = request.getOpaque();boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {once.release();throw new RemotingTimeoutException("invokeAsyncImpl call timeout");}final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);this.responseTable.put(opaque, responseFuture);try {channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {responseFuture.setSendRequestOK(true);return;}requestFail(opaque);log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));}});} catch (Exception e) {responseFuture.release();log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}} else {if (timeoutMillis <= 0) {throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");} else {String info =String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",timeoutMillis,this.semaphoreAsync.getQueueLength(),this.semaphoreAsync.availablePermits());log.warn(info);throw new RemotingTimeoutException(info);}}
}//org.apache.rocketmq.remoting.netty.NettyRemotingServer
this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}
}, 1000 * 3, 1000);
Broker心跳注冊管理
關注重點
Broker會在啟動時向所有NameServer注冊自己的服務信息,并且會定時往NameServer發送心跳信息。而NameServer會維護Broker的路由列表,并對路由表進行實時更新。
源碼重點
Broker啟動后會立即發起向NameServer注冊心跳。方法入口:BrokerController.this.registerBrokerAll。 然后啟動一個定時任務,以10秒延遲,默認30秒的間隔持續向NameServer發送心跳。
NameServer內部會通過RouteInfoManager組件及時維護Broker信息。同時在NameServer啟動時,會啟動定時任務,掃描不活動的Broker。方法入口:NamesrvController.initialize方法。
極簡化的服務注冊發現流程
為什么RocketMQ要自己實現一個NameServer,而不用Zookeeper、Nacos這樣現成的注冊中心?
首先,依賴外部組件會對產品的獨立性形成侵入,不利于自己的版本演進。Kafka要拋棄Zookeeper就是一個先例。
另外,其實更重要的還是對業務的合理設計。NameServer之間不進行信息同步,而是依賴Broker端向所有NameServer同時發起注冊。這讓NameServer的服務可以非常輕量。
但是,要知道,這種極簡的設計,其實是以犧牲數據一致性為代價的。Broker往多個NameServer同時發起注冊,有可能部分NameServer注冊成功,而部分NameServer注冊失敗了。這樣,多個NameServer之間的數據是不一致的。作為注冊中心,這是不可接受的。但是對于RocketMQ,這又變得可以接受了。因為客戶端從NameServer上獲得的,只要有一個正常運行的Broker就可以了,并不需要完整的Broker列表。
Producer發送消息過程
關注重點
回顧Producer使用案例
Producer有兩種:
-
一種是普通發送者:DefaultMQProducer。只負責發送消息,發送完消息,就可以停止了。
-
另一種是事務消息發送者: TransactionMQProducer。支持事務消息機制。需要在事務消息過程中提供事務狀態確認的服務,這就要求事務消息發送者雖然是一個客戶端,但是也要完成整個事務消息的確認機制后才能退出。
事務消息機制后面將結合Broker進行整理分析。這一步暫不關注。這里只關注DefaultMQProducer的消息發送過程。
整個Producer的使用流程,大致分為兩個步驟:一是調用start方法,進行一大堆的準備工作。 二是各種send方法,進行消息發送。
重點關注以下幾個問題:
1、Producer啟動過程中啟動了哪些服務
2、Producer如何管理broker路由信息。 可以設想一下,如果Producer啟動了之后,NameServer掛了,那么Producer還能不能發送消息?
3、關于Producer的負載均衡。也就是Producer到底將消息發到哪個MessageQueue中。這里可以結合順序消息機制來理解一下。消息中那個莫名奇妙的MessageSelector到底是如何工作的。
源碼重點
-
Producer的核心啟動流程
所有Producer的啟動過程,最終都會調用到DefaultMQProducerImpl#start方法。在start方法中的通過一個mQClientFactory對象,啟動生產者的一大堆重要服務。
這里其實就是一種設計模式,雖然有很多種不同的客戶端,但是這些客戶端的啟動流程最終都是統一的,全是交由MQClientFactory對象來啟動。而不同之處在于這些客戶端在啟動過程中,按照服務端的要求注冊不同的信息。例如生產者注冊到producerTable,消費者注冊到consumerTable,管理控制端注冊到adminExtTable
-
發送消息的核心流程
1、發送消息時,會維護一個本地的topicPublishInfoTable緩存,DefaultMQProducer會盡量保證這個緩存數據是最新的。但是,如果NameServer掛了,那么DefaultMQProducer還是會基于這個本地緩存去找Broker。只要能找到Broker,還是可以正常發送消息到Broker的。
2、生產者如何找MessageQueue: 默認情況下,生產者是按照輪訓的方式,依次輪訓各個MessageQueue。但是如果某一次往一個Broker發送請求失敗后,下一次就會跳過這個Broker。
//org.apache.rocketmq.client.impl.producer.TopicPublishInfo
//如果進到這里lastBrokerName不為空,那么表示上一次向這個Broker發送消息是失敗的,這時就盡量不要再往這個Broker發送消息了。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();}
}
3、如果在發送消息時傳了Selector,那么Producer就不會走這個負載均衡的邏輯,而是會使用Selector去尋找一個隊列。 具體參見org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl 方法
//DefaultMQProducerImpl#sendSelectImpl
private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {List<MessageQueue> messageQueueList =mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());Message userMessage = MessageAccessor.cloneMessage(msg);String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue threw exception.", e);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}validateNameServerSetting();throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
Consumer拉取消息過程
關注重點
回顧消費者的幾個重點問題:
-
消費者也是有兩種,推模式消費者和拉模式消費者。優秀的MQ產品都會有一個高級的目標,就是要提升整個消息處理的性能。而要提升性能,服務端的優化手段往往不夠直接,最為直接的優化手段就是對消費者進行優化。所以在RocketMQ中,整個消費者的業務邏輯是非常復雜的,甚至某種程度上來說,比服務端更復雜,所以,在這里重點關注用得最多的推模式的消費者。
-
消費者組之間有集群模式和廣播模式兩種消費模式。就要了解下這兩種集群模式是如何做的邏輯封裝。
-
然后關注消費者端的負載均衡的原理。即消費者是如何綁定消費隊列的,那些消費策略到底是如何落地的。
-
最后關注在推模式的消費者中,MessageListenerConcurrently 和MessageListenerOrderly這兩種消息監聽器的處理邏輯到底有什么不同,為什么后者能保持消息順序。
源碼重點
Consumer的核心啟動過程和Producer是一樣的, 最終都是通過MQClientFactory對象啟動。不過之間添加了一些注冊信息。整體啟動過程:
廣播模式與集群模式的Offset處理
在DefaultMQPushConsumerImpl的start方法中,啟動了非常多的核心服務。 比如,對于廣播模式與集群模式的Offset處理
if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
廣播模式是使用LocalFileOffsetStore,在Consumer本地保存Offset,而集群模式是使用RemoteBrokerOffsetStore,在Broker端遠程保存offset。而這兩種Offset的存儲方式,最終都是通過維護本地的offsetTable緩存來管理Offset。
Consumer與MessageQueue建立綁定關系
start方法中還一個比較重要的東西是給rebalanceImpl設定了一個AllocateMessageQueueStrategy,用來給Consumer分配MessageQueue的。
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
//Consumer負載均衡策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
AllocateMessageQueueStrategy就是用來給Consumer和MessageQueue之間建立一種對應關系的。也就是說,只要Topic當中的MessageQueue以及同一個ConsumerGroup中的Consumer實例都沒有變動,那么某一個Consumer實例只是消費固定的一個或多個MessageQueue上的消息,其他Consumer不會來搶這個Consumer對應的MessageQueue。
為什么要讓一個MessageQueue只能由同一個ConsumerGroup中的一個Consumer實例來消費?
因為Broker需要按照ConsumerGroup管理每個MessageQueue上的Offset,如果一個MessageQueue上有多個同屬一個ConsumerGroup的Consumer實例,他們的處理進度就會不一樣。這樣的話,Offset就亂套了。
順序消費與并發消費
在start方法中,啟動了consumerMessageService線程,進行消息拉取。
//Consumer中自行指定的回調函數
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}this.consumeMessageService.start();
Consumer通過registerMessageListener方法指定的回調函數,都被封裝成了ConsumerMessageService的子實現類。
而對于這兩個服務實現類的調用,會延續到DefaultMQPushConsumerImpl的pullCallback對象中。也就是Consumer每拉過來一批消息后,就向Broker提交下一個拉取消息的的請求。
這里也可以印證一個點,就是順序消息,只對異步消費也就是推模式有效。同步消費的拉模式是無法進行順序消費的。因為這個pullCallback對象,在拉模式的同步消費時,根本就沒有往下傳。
當然,這并不是說拉模式不能鎖定隊列進行順序消費,拉模式在Consumer端應用就可以指定從哪個隊列上拿消息。
PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);switch (pullResult.getPullStatus()) {case FOUND:... ...DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);
這里提交的,實際上是一個ConsumeRequest線程。而提交的這個ConsumeRequest線程,在兩個不同的ConsumerService中有不同的實現。
這其中,兩者最為核心的區別在于ConsumerMessageOrderlyService是鎖定了一個隊列,處理完了之后,再消費下一個隊列。
@Override
public void run() {... ...final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);synchronized (objLock) {... ...}
}
為什么給隊列加個鎖,就能保證順序消費呢?
從源碼中可以看到,Consumer提交請求時,都是往線程池里異步提交的請求。如果不加隊列鎖,那么就算Consumer提交針對同一個MessageQueue的拉取消息請求,這些請求都是異步執行,他們的返回順序是亂的,無法進行控制。給隊列加個鎖之后,就保證了針對同一個隊列的第二個請求,必須等第一個請求處理完了之后,釋放了鎖,才可以提交。這也是在異步情況下保證順序的基礎思路。
實際拉取消息還是通過PullMessageService完成的
start方法中,相當于對很多消費者的服務進行初始化,包括指定一些服務的實現類,以及啟動一些定時的任務線程,比如清理過期的請求緩存等。最后,會隨著mQClientFactory組件的啟動,啟動一個PullMessageService。實際的消息拉取都交由PullMesasgeService進行。
所謂消息推模式,其實還是通過Consumer拉消息實現的。
//org.apache.rocketmq.client.impl.consumer.PullMessageService
private void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}
}
客戶端負載均衡管理總結
Producer負載均衡
Producer發送消息時,默認會輪詢目標Topic下的所有MessageQueue,并采用遞增取模的方式往不同的MessageQueue上發送消息,以達到讓消息平均落在不同的queue上的目的。而由于MessageQueue是分布在不同的Broker上的,所以消息也會發送到不同的broker。
Producer輪訓時,如果發現往某一個Broker上發送消息失敗了,那么下一次會盡量避免再往同一個Broker上發送消息。但是,如果你的應用場景允許發送消息長延遲,也可以給Producer設定setSendLatencyFaultEnable(true)。這樣對于某些Broker集群的網絡不是很好的環境,可以提高消息發送成功的幾率。
同時生產者在發送消息時,可以指定一個MessageQueueSelector。通過這個對象來將消息發送到自己指定的MessageQueue上。這樣可以保證消息局部有序。
Consumer負載均衡
Consumer也是以MessageQueue為單位來進行負載均衡。分為集群模式和廣播模式。
1、集群模式
在集群消費模式下,每條消息只需要投遞到訂閱這個topic的Consumer Group下的一個實例即可。RocketMQ采用主動拉取的方式拉取并消費消息,在拉取的時候需要明確指定拉取哪一條message queue。
而每當實例的數量有變更,都會觸發一次所有實例的負載均衡,這時候會按照queue的數量和實例的數量平均分配queue給每個實例。
每次分配時,都會將MessageQueue和消費者ID進行排序后,再用不同的分配算法進行分配。內置的分配的算法共有六種,分別對應AllocateMessageQueueStrategy下的六種實現類,可以在consumer中直接set來指定。默認情況下使用的是最簡單的平均分配策略。
-
AllocateMachineRoomNearby: 將同機房的Consumer和Broker優先分配在一起。
這個策略可以通過一個machineRoomResolver對象來定制Consumer和Broker的機房解析規則。然后還需要引入另外一個分配策略來對同機房的Broker和Consumer進行分配。一般也就用簡單的平均分配策略或者輪詢分配策略。
源碼中有測試代碼AllocateMachineRoomNearByTest。
-
AllocateMessageQueueAveragely:平均分配。將所有MessageQueue平均分給每一個消費者
-
AllocateMessageQueueAveragelyByCircle: 輪詢分配。輪流的給一個消費者分配一個MessageQueue。
-
AllocateMessageQueueByConfig: 不分配,直接指定一個messageQueue列表。類似于廣播模式,直接指定所有隊列。
-
AllocateMessageQueueByMachineRoom:按邏輯機房的概念進行分配。又是對BrokerName和ConsumerIdc有定制化的配置。
-
AllocateMessageQueueConsistentHash。源碼中有測試代碼AllocateMessageQueueConsitentHashTest。這個一致性哈希策略只需要指定一個虛擬節點數,是用的一個哈希環的算法,虛擬節點是為了讓Hash數據在環上分布更為均勻。
最常用的就是平均分配和輪訓分配了。
2、廣播模式
廣播模式下,每一條消息都會投遞給訂閱了Topic的所有消費者實例,所以也就沒有消息分配這一說。而在實現上,就是在Consumer分配Queue時,所有Consumer都分到所有的Queue。
廣播模式實現的關鍵是將消費者的消費偏移量不再保存到broker當中,而是保存到客戶端當中,由客戶端自行維護自己的消費偏移量。