RocketMQ-源碼架構

源碼環境搭建

1、主要功能模塊

RocketMQ官方Git倉庫地址:GitHub - apache/rocketmq: Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.

RocketMQ的官方網站下載:下載 | RocketMQ

重要模塊:

  • broker: Broker 模塊(broke 啟動進程)

  • client :消息客戶端,包含消息生產者、消息消費者相關類

  • example: RocketMQ 示例代碼

  • namesrv:NameServer模塊

  • store:消息存儲模塊

  • remoting:遠程訪問模塊

2、源碼啟動服務

將源碼導入IDEA后,需要先對源碼進行編譯。編譯指令clean install -Dmaven.test.skip=true

編譯完成后就可以開始調試代碼:

調試時,先在項目目錄下創建一個conf目錄,并從distribution拷貝broker.conflogback_broker.xmllogback_namesrv.xml

window10執行上面的編譯指令一直報錯A required class was missing while executing org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process: org/apache/commons/collections/ExtendedProperties,最后在Linux執行才成功

unzip rocketmq-rocketmq-all-4.9.5.zip
cd rocketmq-rocketmq-all-4.9.5/
mvn clean install -Dmaven.test.skip=true
啟動nameServer

雖然編譯指令在windows執行報錯,但是nameSrv還是可以正常啟動的

如果啟動時報錯,并提示需要配置ROCKETMQ_HOME環境變量,可以在工程里面添加:

啟動Broker

啟動Broker之前,需要先修改之前復制的broker.conf文件

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH# 自動創建Topic
autoCreateTopicEnable=true
# nameServ地址
namesrvAddr=127.0.0.1:9876
# 存儲路徑
storePathRootDir=E:\\RocketMQ\\data\\rocketmq\\dataDir
# commitLog路徑
storePathCommitLog=E:\\RocketMQ\\data\\rocketmq\\dataDir\\commitlog
# 消息隊列存儲路徑
storePathConsumeQueue=E:\\RocketMQ\\data\\rocketmq\\dataDir\\consumequeue
# 消息索引存儲路徑
storePathIndex=E:\\RocketMQ\\data\\rocketmq\\dataDir\\index
# checkpoint文件路徑
storeCheckpoint=E:\\RocketMQ\\data\\rocketmq\\dataDir\\checkpoint
# abort文件存儲路徑
abortFile=E:\\RocketMQ\\data\\rocketmq\\dataDir\\abort

啟動Broker時,還需要配置一個-c 參數,指向broker.conf配置文件

發送消息

在源碼的example模塊下,提供了非常詳細的測試代碼。

啟動example模塊下的org.apache.rocketmq.example.quickstart.Producer類即可發送消息

在測試源碼中,需要指定NameServer地址。這個NameServer地址有兩種指定方式,一種是配置一個NAMESRV_ADDR的環境變量。另一種是在源碼中指定。

// 源碼中指定
producer.setNamesrvAddr("127.0.0.1:9876");
消費消息

啟動example模塊下的org.apache.rocketmq.example.quickstart.Consumer類來消費消息

consumer.setNamesrvAddr("192.168.232.128:9876");

3、讀源碼的方法

1、帶著問題讀源碼。一定要自己思考!

2、小步快走。不要覺得一兩遍就能讀懂源碼

3、分步總結。帶上自己的理解,及時總結。對各種擴展功能,嘗試驗證

對于RocketMQ,試著去理解源碼中的各種單元測試。

源碼熱身階段

NameServer的啟動過程

關注重點

RocketMQ集群中,實際上消息存儲、推送等核心功能點是Broker。NameServer的作用,和微服務中的注冊中心非常類似,只是提供了Broker端的服務注冊與發現功能。

源碼重點

NameServer的啟動入口類是org.apache.rocketmq.namesrv.NamesrvStartup。其中的核心是構建并啟動一個NamesrvController。這個Controller對象就跟MVC中的Controller是很類似的,都是響應客戶端的請求。只不過,他響應的是基于Netty的客戶端請求。

另外,他的實際啟動過程,其實可以配合NameServer的啟動腳本進行更深入的理解。

從NameServer啟動和關閉這兩個關鍵步驟,我們可以總結出NameServer的組件其實并不是很多,整個NameServer的結構是這樣的:

這兩個配置類就可以用來指導如何優化Nameserver的配置。比如,如何調整nameserver的端口?

可以看出,RocketMQ的整體源碼風格就是典型的MVC思想。Controller響應請求,Service處理業務,各種Table保存消息

部分源碼示例:

// NamesrvStartup#main
public static void main(String[] args) {main0(args);
}public static NamesrvController main0(String[] args) {try {NamesrvController controller = createNamesrvController(args);start(controller);String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());if (null == commandLine) {System.exit(-1);return null;}// 兩個配置類在這里final NamesrvConfig namesrvConfig = new NamesrvConfig();final NettyServerConfig nettyServerConfig = new NettyServerConfig();// 端口號簡單粗暴nettyServerConfig.setListenPort(9876);// -c指令指定配置文件,可以替換端口號。配置文件內容:listenPort=9876if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);MixAll.printObjectProperties(console, namesrvConfig);MixAll.printObjectProperties(console, nettyServerConfig);System.exit(0);}... ...final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);return controller;
}// NamesrvController#NamesrvController
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {this.namesrvConfig = namesrvConfig;this.nettyServerConfig = nettyServerConfig;this.kvConfigManager = new KVConfigManager(this);this.routeInfoManager = new RouteInfoManager();this.brokerHousekeepingService = new BrokerHousekeepingService(this);this.configuration = new Configuration(log,this.namesrvConfig, this.nettyServerConfig);this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}// NamesrvController#initialize
public boolean initialize() {this.kvConfigManager.load();this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);... ...
}

Broker服務啟動過程

關注重點

Broker是整個RocketMQ的業務核心。所有消息存儲、轉發這些重要的業務都是Broker進行處理。

重點梳理Broker有哪些內部服務。這些內部服務將是整理Broker核心業務流程的起點

源碼重點

Broker啟動的入口在BrokerStartup這個類,可以從他的main方法開始調試

啟動過程關鍵點:重點也是圍繞一個BrokerController對象,先創建,然后再啟動

// BrokerStartup#main
public static void main(String[] args) {start(createBrokerController(args));
}public static BrokerController createBrokerController(String[] args) {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));try {//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),new PosixParser());if (null == commandLine) {System.exit(-1);}// Broker服務配置final BrokerConfig brokerConfig = new BrokerConfig();// Netty服務端占用了10911端口。同樣也可以在配置文件中覆蓋final NettyServerConfig nettyServerConfig = new NettyServerConfig();// Broker既要作為Netty服務端,向客戶端提供核心業務能力,又要作為Netty客戶端,向NameServer注冊心跳final NettyClientConfig nettyClientConfig = new NettyClientConfig();nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));nettyServerConfig.setListenPort(10911);// 消息存儲配置。 這兩個配置參數都可以在broker.conf文件中進行配置final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();... ...} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}

BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig 這幾個配置是了解如何優化 RocketMQ 使用的關鍵

在BrokerController.start方法啟動了一大堆Broker的核心服務,部分源碼:

// BrokerController#start
public void start() throws Exception {if (this.messageStore != null) {//啟動核心的消息存儲組件this.messageStore.start();}if (this.remotingServer != null) {this.remotingServer.start();}if (this.fastRemotingServer != null) {//啟動兩個Netty服務this.fastRemotingServer.start();}if (this.brokerOuterAPI != null) {//啟動客戶端,往外發請求this.brokerOuterAPI.start();}... ...this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {//向NameServer注冊心跳BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}if (this.brokerFastFailure != null) {//負責具體業務的功能組件this.brokerFastFailure.start();}
}

抽象出Broker的一個整體結構:

實際上,在應用中,可以通過producer.setSendMessageWithVIPChannel(true),讓少量比較重要的producer走VIP的通道。而在消費者端,也可以通過consumer.setVipChannelEnabled(true),讓消費者支持VIP通道的數據。

小試牛刀階段

開始理解一些比較簡單的業務邏輯

Netty服務注冊框架

關注重點

網絡通信服務是構建分布式應用的基礎,也是理解RocketMQ底層業務的基礎。

重點梳理RocketMQ的這個服務注冊框架,理解各個業務進程之間是如何進行RPC遠程通信的

Netty的所有遠程通信功能都由remoting模塊實現。RemotingServer模塊里包含了RPC的服務端RemotingServer以及客戶端RemotingClient。在RocketMQ中,NameServer主要是RPC的服務端RemotingServer,Broker對于客戶端來說,是RPC的服務端RemotingServer,而對于NameServer來說,又是RPC的客戶端。各種Client是RPC的客戶端RemotingClient。

RocketMQ基于Netty保持客戶端與服務端的長連接Channel。RemotingServer和RemotingClient都需要注冊自己的服務。

源碼重點

1、NameServer需要NettyServer。客戶端,Producer和Consumer,需要NettyClient。

2、所有的RPC請求數據都封裝成RemotingCommand對象。每個處理消息的服務邏輯,都會封裝成一個NettyRequestProcessor對象。

3、服務端和客戶端都維護一個processorTable,這是個HashMap。key是服務碼requestCode,value是對應的運行單元 Pair<NettyRequestProcessor,ExecutorService>類型,包含了處理Processor和執行線程的線程池。具體的Processor,由業務系統自行注冊。Broker服務注冊:BrokerController.registerProcessor(),客戶端的服務注冊:MQClientAPIImpl。NameServer則會注冊一個大的DefaultRequestProcessor,統一處理所有服務。

4、請求類型分為REQUEST和RESPONSE。這是為了支持異步的RPC調用。NettyServer處理完請求后,可以先緩存到responseTable中,等NettyClient下次來獲取,這樣就不用阻塞Channel了,可以提升請求吞吐量。

5、重點理解remoting包中是如何實現全流程異步化。

整體RPC框架流程:

RocketMQ使用Netty框架提供了一套基于服務碼的服務注冊機制,讓各種不同的組件都可以按照自己的需求,注冊自己的服務方法。RocketMQ的這一套服務注冊機制,是非常簡潔實用的。要開發一個大型的IM項目,要加減好友、發送文本,圖片,甚至紅包、維護群聊信息等等各種各樣的請求,這些請求如何封裝,就可以很好的參考這個框架。

關于RocketMQ的同步結果推送與異步結果推送

RocketMQ的RemotingServer服務端,會維護一個responseTable,這是一個線程同步的Map結構。 key為請求的ID,value是異步的消息結果。ConcurrentMap<Integer /* opaque */, ResponseFuture>

處理同步請求(NettyRemotingAbstract#invokeSyncImpl)時,處理的結果會存入responseTable,通過ResponseFuture提供一定的服務端異步處理支持,提升服務端的吞吐量。 請求返回后,立即從responseTable中移除請求記錄。

//NettyRemotingAbstract#invokeSyncImpl
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,final long timeoutMillis)throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {final int opaque = request.getOpaque();try {final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);this.responseTable.put(opaque, responseFuture);final SocketAddress addr = channel.remoteAddress();channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {responseFuture.setSendRequestOK(true);return;} else {responseFuture.setSendRequestOK(false);}responseTable.remove(opaque);responseFuture.setCause(f.cause());responseFuture.putResponse(null);log.warn("send a request command to channel <" + addr + "> failed.");}});RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}return responseCommand;} finally {this.responseTable.remove(opaque);}
}//ResponseFuture#waitResponse
//實際上,同步也是通過異步實現的
//發送消息后,通過countDownLatch阻塞當前線程,造成同步等待的效果
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);return this.responseCommand;
}//ResponseFuture#putResponse
//等待異步獲取到消息后,再通過countDownLatch釋放當前線程
public void putResponse(final RemotingCommand responseCommand) {this.responseCommand = responseCommand;this.countDownLatch.countDown();
}

處理異步請求(NettyRemotingAbstract#invokeAsyncImpl)時,處理的結果依然會存入responsTable,等待客戶端后續再來請求結果。但是他保存的依然是一個ResponseFuture,也就是在客戶端請求結果時再去獲取真正的結果。 另外,在RemotingServer啟動時,會啟動一個定時的線程任務,不斷掃描responseTable,將其中過期的response清除掉。

//NettyRemotingAbstract#invokeAsyncImpl
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {long beginStartTime = System.currentTimeMillis();final int opaque = request.getOpaque();boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {once.release();throw new RemotingTimeoutException("invokeAsyncImpl call timeout");}final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);this.responseTable.put(opaque, responseFuture);try {channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {responseFuture.setSendRequestOK(true);return;}requestFail(opaque);log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));}});} catch (Exception e) {responseFuture.release();log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}} else {if (timeoutMillis <= 0) {throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");} else {String info =String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",timeoutMillis,this.semaphoreAsync.getQueueLength(),this.semaphoreAsync.availablePermits());log.warn(info);throw new RemotingTimeoutException(info);}}
}//org.apache.rocketmq.remoting.netty.NettyRemotingServer
this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}
}, 1000 * 3, 1000);

Broker心跳注冊管理

關注重點

Broker會在啟動時向所有NameServer注冊自己的服務信息,并且會定時往NameServer發送心跳信息。而NameServer會維護Broker的路由列表,并對路由表進行實時更新。

源碼重點

Broker啟動后會立即發起向NameServer注冊心跳。方法入口:BrokerController.this.registerBrokerAll。 然后啟動一個定時任務,以10秒延遲,默認30秒的間隔持續向NameServer發送心跳。

NameServer內部會通過RouteInfoManager組件及時維護Broker信息。同時在NameServer啟動時,會啟動定時任務,掃描不活動的Broker。方法入口:NamesrvController.initialize方法。

極簡化的服務注冊發現流程

為什么RocketMQ要自己實現一個NameServer,而不用Zookeeper、Nacos這樣現成的注冊中心?

首先,依賴外部組件會對產品的獨立性形成侵入,不利于自己的版本演進。Kafka要拋棄Zookeeper就是一個先例。

另外,其實更重要的還是對業務的合理設計。NameServer之間不進行信息同步,而是依賴Broker端向所有NameServer同時發起注冊。這讓NameServer的服務可以非常輕量。

但是,要知道,這種極簡的設計,其實是以犧牲數據一致性為代價的。Broker往多個NameServer同時發起注冊,有可能部分NameServer注冊成功,而部分NameServer注冊失敗了。這樣,多個NameServer之間的數據是不一致的。作為注冊中心,這是不可接受的。但是對于RocketMQ,這又變得可以接受了。因為客戶端從NameServer上獲得的,只要有一個正常運行的Broker就可以了,并不需要完整的Broker列表。

Producer發送消息過程

關注重點

回顧Producer使用案例

Producer有兩種:

  • 一種是普通發送者:DefaultMQProducer。只負責發送消息,發送完消息,就可以停止了。

  • 另一種是事務消息發送者: TransactionMQProducer。支持事務消息機制。需要在事務消息過程中提供事務狀態確認的服務,這就要求事務消息發送者雖然是一個客戶端,但是也要完成整個事務消息的確認機制后才能退出。

事務消息機制后面將結合Broker進行整理分析。這一步暫不關注。這里只關注DefaultMQProducer的消息發送過程。

整個Producer的使用流程,大致分為兩個步驟:一是調用start方法,進行一大堆的準備工作。 二是各種send方法,進行消息發送。

重點關注以下幾個問題:

1、Producer啟動過程中啟動了哪些服務

2、Producer如何管理broker路由信息。 可以設想一下,如果Producer啟動了之后,NameServer掛了,那么Producer還能不能發送消息?

3、關于Producer的負載均衡。也就是Producer到底將消息發到哪個MessageQueue中。這里可以結合順序消息機制來理解一下。消息中那個莫名奇妙的MessageSelector到底是如何工作的。

源碼重點
  • Producer的核心啟動流程

所有Producer的啟動過程,最終都會調用到DefaultMQProducerImpl#start方法。在start方法中的通過一個mQClientFactory對象,啟動生產者的一大堆重要服務。

這里其實就是一種設計模式,雖然有很多種不同的客戶端,但是這些客戶端的啟動流程最終都是統一的,全是交由MQClientFactory對象來啟動。而不同之處在于這些客戶端在啟動過程中,按照服務端的要求注冊不同的信息。例如生產者注冊到producerTable,消費者注冊到consumerTable,管理控制端注冊到adminExtTable

  • 發送消息的核心流程

1、發送消息時,會維護一個本地的topicPublishInfoTable緩存,DefaultMQProducer會盡量保證這個緩存數據是最新的。但是,如果NameServer掛了,那么DefaultMQProducer還是會基于這個本地緩存去找Broker。只要能找到Broker,還是可以正常發送消息到Broker的。

2、生產者如何找MessageQueue: 默認情況下,生產者是按照輪訓的方式,依次輪訓各個MessageQueue。但是如果某一次往一個Broker發送請求失敗后,下一次就會跳過這個Broker。

//org.apache.rocketmq.client.impl.producer.TopicPublishInfo
//如果進到這里lastBrokerName不為空,那么表示上一次向這個Broker發送消息是失敗的,這時就盡量不要再往這個Broker發送消息了。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();}
}

3、如果在發送消息時傳了Selector,那么Producer就不會走這個負載均衡的邏輯,而是會使用Selector去尋找一個隊列。 具體參見org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl 方法

//DefaultMQProducerImpl#sendSelectImpl
private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {List<MessageQueue> messageQueueList =mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());Message userMessage = MessageAccessor.cloneMessage(msg);String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue threw exception.", e);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}validateNameServerSetting();throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

Consumer拉取消息過程

關注重點

回顧消費者的幾個重點問題:

  • 消費者也是有兩種,推模式消費者和拉模式消費者。優秀的MQ產品都會有一個高級的目標,就是要提升整個消息處理的性能。而要提升性能,服務端的優化手段往往不夠直接,最為直接的優化手段就是對消費者進行優化。所以在RocketMQ中,整個消費者的業務邏輯是非常復雜的,甚至某種程度上來說,比服務端更復雜,所以,在這里重點關注用得最多的推模式的消費者。

  • 消費者組之間有集群模式和廣播模式兩種消費模式。就要了解下這兩種集群模式是如何做的邏輯封裝。

  • 然后關注消費者端的負載均衡的原理。即消費者是如何綁定消費隊列的,那些消費策略到底是如何落地的。

  • 最后關注在推模式的消費者中,MessageListenerConcurrently 和MessageListenerOrderly這兩種消息監聽器的處理邏輯到底有什么不同,為什么后者能保持消息順序。

源碼重點

Consumer的核心啟動過程和Producer是一樣的, 最終都是通過MQClientFactory對象啟動。不過之間添加了一些注冊信息。整體啟動過程:

廣播模式與集群模式的Offset處理

在DefaultMQPushConsumerImpl的start方法中,啟動了非常多的核心服務。 比如,對于廣播模式與集群模式的Offset處理

if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();

廣播模式是使用LocalFileOffsetStore,在Consumer本地保存Offset,而集群模式是使用RemoteBrokerOffsetStore,在Broker端遠程保存offset。而這兩種Offset的存儲方式,最終都是通過維護本地的offsetTable緩存來管理Offset。

Consumer與MessageQueue建立綁定關系

start方法中還一個比較重要的東西是給rebalanceImpl設定了一個AllocateMessageQueueStrategy,用來給Consumer分配MessageQueue的。

this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
//Consumer負載均衡策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());

AllocateMessageQueueStrategy就是用來給Consumer和MessageQueue之間建立一種對應關系的。也就是說,只要Topic當中的MessageQueue以及同一個ConsumerGroup中的Consumer實例都沒有變動,那么某一個Consumer實例只是消費固定的一個或多個MessageQueue上的消息,其他Consumer不會來搶這個Consumer對應的MessageQueue。

為什么要讓一個MessageQueue只能由同一個ConsumerGroup中的一個Consumer實例來消費?

因為Broker需要按照ConsumerGroup管理每個MessageQueue上的Offset,如果一個MessageQueue上有多個同屬一個ConsumerGroup的Consumer實例,他們的處理進度就會不一樣。這樣的話,Offset就亂套了。

順序消費與并發消費

在start方法中,啟動了consumerMessageService線程,進行消息拉取。

//Consumer中自行指定的回調函數
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}this.consumeMessageService.start();

Consumer通過registerMessageListener方法指定的回調函數,都被封裝成了ConsumerMessageService的子實現類。

而對于這兩個服務實現類的調用,會延續到DefaultMQPushConsumerImpl的pullCallback對象中。也就是Consumer每拉過來一批消息后,就向Broker提交下一個拉取消息的的請求。

這里也可以印證一個點,就是順序消息,只對異步消費也就是推模式有效。同步消費的拉模式是無法進行順序消費的。因為這個pullCallback對象,在拉模式的同步消費時,根本就沒有往下傳。

當然,這并不是說拉模式不能鎖定隊列進行順序消費,拉模式在Consumer端應用就可以指定從哪個隊列上拿消息。

PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);switch (pullResult.getPullStatus()) {case FOUND:... ...DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);

這里提交的,實際上是一個ConsumeRequest線程。而提交的這個ConsumeRequest線程,在兩個不同的ConsumerService中有不同的實現。

這其中,兩者最為核心的區別在于ConsumerMessageOrderlyService是鎖定了一個隊列,處理完了之后,再消費下一個隊列。

@Override
public void run() {... ...final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);synchronized (objLock) {... ...}
}

為什么給隊列加個鎖,就能保證順序消費呢?

從源碼中可以看到,Consumer提交請求時,都是往線程池里異步提交的請求。如果不加隊列鎖,那么就算Consumer提交針對同一個MessageQueue的拉取消息請求,這些請求都是異步執行,他們的返回順序是亂的,無法進行控制。給隊列加個鎖之后,就保證了針對同一個隊列的第二個請求,必須等第一個請求處理完了之后,釋放了鎖,才可以提交。這也是在異步情況下保證順序的基礎思路。

實際拉取消息還是通過PullMessageService完成的

start方法中,相當于對很多消費者的服務進行初始化,包括指定一些服務的實現類,以及啟動一些定時的任務線程,比如清理過期的請求緩存等。最后,會隨著mQClientFactory組件的啟動,啟動一個PullMessageService。實際的消息拉取都交由PullMesasgeService進行。

所謂消息推模式,其實還是通過Consumer拉消息實現的。

//org.apache.rocketmq.client.impl.consumer.PullMessageService
private void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}
}

客戶端負載均衡管理總結

Producer負載均衡

Producer發送消息時,默認會輪詢目標Topic下的所有MessageQueue,并采用遞增取模的方式往不同的MessageQueue上發送消息,以達到讓消息平均落在不同的queue上的目的。而由于MessageQueue是分布在不同的Broker上的,所以消息也會發送到不同的broker。

Producer輪訓時,如果發現往某一個Broker上發送消息失敗了,那么下一次會盡量避免再往同一個Broker上發送消息。但是,如果你的應用場景允許發送消息長延遲,也可以給Producer設定setSendLatencyFaultEnable(true)。這樣對于某些Broker集群的網絡不是很好的環境,可以提高消息發送成功的幾率。

同時生產者在發送消息時,可以指定一個MessageQueueSelector。通過這個對象來將消息發送到自己指定的MessageQueue上。這樣可以保證消息局部有序。

Consumer負載均衡

Consumer也是以MessageQueue為單位來進行負載均衡。分為集群模式和廣播模式。

1、集群模式

在集群消費模式下,每條消息只需要投遞到訂閱這個topic的Consumer Group下的一個實例即可。RocketMQ采用主動拉取的方式拉取并消費消息,在拉取的時候需要明確指定拉取哪一條message queue。

而每當實例的數量有變更,都會觸發一次所有實例的負載均衡,這時候會按照queue的數量和實例的數量平均分配queue給每個實例。

每次分配時,都會將MessageQueue和消費者ID進行排序后,再用不同的分配算法進行分配。內置的分配的算法共有六種,分別對應AllocateMessageQueueStrategy下的六種實現類,可以在consumer中直接set來指定。默認情況下使用的是最簡單的平均分配策略。

  • AllocateMachineRoomNearby: 將同機房的Consumer和Broker優先分配在一起。

這個策略可以通過一個machineRoomResolver對象來定制Consumer和Broker的機房解析規則。然后還需要引入另外一個分配策略來對同機房的Broker和Consumer進行分配。一般也就用簡單的平均分配策略或者輪詢分配策略。

源碼中有測試代碼AllocateMachineRoomNearByTest。

  • AllocateMessageQueueAveragely:平均分配。將所有MessageQueue平均分給每一個消費者

  • AllocateMessageQueueAveragelyByCircle: 輪詢分配。輪流的給一個消費者分配一個MessageQueue。

  • AllocateMessageQueueByConfig: 不分配,直接指定一個messageQueue列表。類似于廣播模式,直接指定所有隊列。

  • AllocateMessageQueueByMachineRoom:按邏輯機房的概念進行分配。又是對BrokerName和ConsumerIdc有定制化的配置。

  • AllocateMessageQueueConsistentHash。源碼中有測試代碼AllocateMessageQueueConsitentHashTest。這個一致性哈希策略只需要指定一個虛擬節點數,是用的一個哈希環的算法,虛擬節點是為了讓Hash數據在環上分布更為均勻。

最常用的就是平均分配和輪訓分配了。

2、廣播模式

廣播模式下,每一條消息都會投遞給訂閱了Topic的所有消費者實例,所以也就沒有消息分配這一說。而在實現上,就是在Consumer分配Queue時,所有Consumer都分到所有的Queue。

廣播模式實現的關鍵是將消費者的消費偏移量不再保存到broker當中,而是保存到客戶端當中,由客戶端自行維護自己的消費偏移量。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/214666.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/214666.shtml
英文地址,請注明出處:http://en.pswp.cn/news/214666.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

現在多種數據庫的讀寫模型對比

目錄 mongDB read write ES read write MySql write 總結 mongDB 3.0 版本后的WiredTiger存儲引擎 read 1. 應用通過driver 發起Buffer I/O讀操作&#xff0c;由操作系統將磁盤數據頁加載到文件系統的頁緩存區 2. 引擎層讀取頁緩沖區的數據&#xff0c;進行解壓后放…

C++STL算法庫中謂詞的使用

什么是c的謂詞 謂詞概念&#xff1a; 謂詞函數是一個判斷式&#xff0c;一個返回bool值的函數或者仿函數&#xff0c;有幾個入參就是幾元謂詞。一般做一個函數的參數使用【引用自百度百科】。 常見的可以作為謂詞的東西&#xff1a;函數、函數指針、函數對象、lambda表達式&am…

2023 年浙江省職業院校技能大賽信息安全管理與評估賽項規程

*2023 年浙江省職業院校技能大賽“高職組”* *“信息安全管理與評估”賽項規程* *一、賽項名稱* 賽項名稱&#xff1a;信息安全管理與評估 英文名稱&#xff1a;Information Security Management and Evaluation 賽項組別&#xff1a;高職 賽項歸屬產業&#xff1a;電子信…

熱電廠發電機組常見故障及預測性維護方法

熱電廠的發電機組是關鍵的能源生產設備&#xff0c;在電力供應中扮演著關鍵角色。但經過長期運行和高負荷工作&#xff0c;一旦發生故障&#xff0c;可能導致停機、設備損壞甚至引發嚴重事故。因此&#xff0c;實施有效的預測性維護方法對于確保發電機組的穩定運行至關重要。本…

Linux(17):認識與分析登錄檔

什么是登錄檔 【詳細而確實的分析以及備份系統的登錄文件】是一個系統管理員應該要進行的任務之一。 登錄檔 就是記錄系統活動信息的幾個文件&#xff0c;例如&#xff1a;何時、何地(來源IP)、何人(什么服務名稱)、做了什么動作(訊息登錄啰)。 換句話說就是&#xff1a;記錄系…

【MySQL】:表的操作

表的操作 一.創建表二.查看表結構三.修改表四.刪除表 一.創建表 field 表示列名。 datatype 表示列的類型。 character set 字符集&#xff0c;如果沒有指定字符集&#xff0c;則以所在數據庫的字符集為準。 collate 校驗規則&#xff0c;如果沒有指定校驗規則&#xff0c;則以…

MySQL系列(二)——日志篇

MySQL日志 主要包括錯誤日志、查詢日志、慢查詢日志、事務日志、二進制日志幾大類。其中&#xff0c;比較重要的還要屬二進制日志binlog&#xff08;歸檔日志&#xff09;和事務日志redo log&#xff08;重做日志&#xff09;和undo log&#xff08;回滾日志&#xff09;。 今…

windows批處理腳本(.bat)如何激活Anconda Prompt虛擬環境

通過call 來調用激活腳本&#xff0c; activate myenv指的是要激活的環境&#xff0c;若省略&#xff0c;則激活的是base環境。 call : 從另一個批處理程序調用一個批處理程序&#xff0c;而不停止父批處理程序。 call C:\ProgramData\Anaconda3\Scripts\activate.bat activate…

fastdds共享內存實現原理

fastdds 共享內存分兩個部分&#xff0c;一部分用于保存數據&#xff0c;一部分用于通信。 fastrtps_“UUID”:共享內存包括又兩部分數據&#xff0c;BufferNode和segment_size, 用配置文件port_queue_capacity_指定BufferNode的數量&#xff0c;segment_size用于保存實際傳輸的…

imp導入數據發現的

遷移歷史數據到歷史庫&#xff0c;因為災備數據中心使用的DG&#xff0c;無法使用數據泵&#xff0c;只能通過exp導出&#xff0c;然后再通過imp導入 為防止undo表空間壓力過大&#xff0c;在導入時imp使用了commit參數及buffer參數 這次導入數據量達到1TB&#xff0c;剛到了1/…

智物發布MT6877平臺無線AR智能眼鏡參考設計,推動下一代無線AR發展

隨著增強現實(AR)技術的不斷發展&#xff0c;有線AR眼鏡在連接和使用方面存在一些限制。為了解決這些問題&#xff0c;無線AR智能眼鏡的推出勢在必行。 新一代無線AR智能眼鏡采用了天璣900&#xff08;MT6877&#xff09;平臺作為參考設計&#xff0c;搭載了2.4GHz的八核處理器…

【rabbitMQ】Exchanges交換機

上一篇&#xff1a;springboot整合rabbitMQ模擬簡單收發消息 https://blog.csdn.net/m0_67930426/article/details/134904766 本篇代碼基于上一篇繼續寫 目錄 Fanout 交換機 1. add queue 2. add Exchange 3.綁定隊列 Direct 交換機 1. add queue 2. add Exchange 3.…

011 數據結構_哈希

前言 本文將會向你介紹哈希概念&#xff0c;哈希方法&#xff0c;如何解決哈希沖突&#xff0c;以及閉散列與開散列的模擬實現 1. 哈希概念 順序結構以及平衡樹中&#xff0c;元素關鍵碼與其存儲位置之間沒有對應的關系&#xff0c;因此在查找一個元素時&#xff0c;必須要經…

CyclicBarrier、CountDownLatch、Semaphore 的用法

CyclicBarrier、CountDownLatch、Semaphore 的用法 CountDownLatch&#xff08;線程計數器 &#xff09; CountDownLatch 類位于 java.util.concurrent 包下&#xff0c;利用它可以實現類似計數器的功能。比如有一個任務 A&#xff0c;它要等待其他 4 個任務執行完畢之后才能執…

數據結構與算法-Rust 版讀書筆記-2線性數據結構-隊列

數據結構與算法-Rust 版讀書筆記-2線性數據結構-隊列 1、隊列&#xff1a;先進先出 隊列是項的有序集合&#xff0c;其中&#xff0c;添加新項的一端稱為隊尾&#xff0c;移除項的另一端稱為隊首。一個元素在從隊尾進入隊列后&#xff0c;就會一直向隊首移動&#xff0c;直到…

鴻蒙原生應用再添新丁!同花順入局鴻蒙

鴻蒙原生應用再添新丁&#xff01;同花順入局鴻蒙 來自 HarmonyOS 微博12月11日消息&#xff0c;同花順已完成#鴻蒙原生應用#beta版本&#xff0c;并正在進行全量版本開發&#xff0c;進一步豐富了#鴻蒙原生應用#的覆蓋領域。同花順作為股民和券商首選的一站式金融理財服務平臺…

擴展學習|商業智能和分析:從大數據到大影響

文獻來源&#xff1a;Chen H, Chiang R H L, Storey V C. Business intelligence and analytics: From big data to big impact[J]. MIS quarterly, 2012: 1165-1188. 下載鏈接&#xff1a;https://pan.baidu.com/s/1JoHcTbwdc1TPGnwXsL4kIA 提取碼&#xff1a;a8uy 在不同的組…

MySQL忘記密碼

根據提供的引用內容&#xff0c;當使用root用戶登錄MySQL時&#xff0c;如果密碼錯誤&#xff0c;會出現"Access denied for user ‘root’‘localhost’ (using password: NO)"的錯誤提示。這個錯誤提示表示使用了錯誤的密碼或者沒有輸入密碼就嘗試登錄MySQL。解決這…

SQL命令---查看數據庫表

介紹 使用sql命令查看數據表。 命令 show create table 表名\G;\G&#xff1a;使顯示結果整齊美觀。

Vue-第七天

智慧商城項目&#xff1a; 1.創建項目選項&#xff1a; 2.調整&#xff1a; 主要是增加兩個文件夾&#xff0c;刪除倒是沒什么 3.組件庫&#xff08;vant-ui&#xff09;&#xff1a; 點擊進入官網:Vant 2 - Mobile UI Components built on Vue 4.導入&#xff1a; 全部導入…