目錄
前言
準備
消息發送方式
深入源碼
消息發送模式
選擇發送方式
同步發送消息
校驗消息體
獲取Topic訂閱信息
高級特性-消息重投
選擇消息隊列-負載均衡
裝載消息體發送消息
壓縮消息內容
構造發送message的請求的Header
更新broker故障信息
異步發送消息
總結
前言
上一篇我們已經對RocketMq生產者啟動源碼進行了學習《從零開始讀RocketMq源碼(一)生產者啟動》那么本篇我們將對生產者發送消息的源碼進行學習
準備
如果沒看前一篇的,這里還是要強調本篇的rocketmq版本
首先我們從github上拉取rocketmqd的源碼鏈接到本地,使用idea打開。
源碼地址:https://github.com/apache/rocketmq
目前最新版本為:5.2.0
那么我們在idea上切換分支為 release-5.2.0
注:請保持和本篇的版本一直,方便后面文章中給出的代碼塊定位
消息發送方式
在讀源碼之前我們先了解下mq支持的發送消息的類型。
消息的發送方式有三種,但我們最常用的是同步的方式發送
- sync 同步:消息發送后,必須等待消息的發送結果返回后,才能發送下一條消息
- async 異步:消息發送后,不用等待返回結果,直接發送下一條數據,但會設置一個回調方法接收返回結果
- oneway 單向:消息發送后,不會返回結果,也不會等待,也不會設置回調方法。適用場景日志收集、監控數據和快速通知等對可靠性要求不高但需要高性能的場景
深入源碼
首先進入外層的producer.send()方法中
//源碼位置:
//包名:org.apache.rocketmq.example.simple
//文件名:Producer
//行數:42
SendResult sendResult = producer.send(msg);
消息發送模式
//源碼位置:
//包名:org.apache.rocketmq.client.producer
//文件名:DefaultMQProducer
//行數:431
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));//批量發送if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {return sendByAccumulator(msg, null, null);} else {//單條發送return sendDirect(msg, null, null);}
}
- 自動批處理發送 -sendByAccumulator()
- 該方法用于將消息累積到一個批處理容器中,等待足夠的消息數量或達到某個時間間隔后,再進行批量發送。
- 可以顯著減少發送次數,提高吞吐量。
? ? ?2. 直接發送 -sendDirect()
- 適用于即時發送或消息已經是批處理消息的情況
本章的重點就是直接發送消息,這也是開發中使用最頻發的方式
選擇發送方式
//源碼位置:
//包名:org.apache.rocketmq.client.producer
//文件名:DefaultMQProducer
//行數:720
public SendResult sendDirect(Message msg, MessageQueue mq,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {// send in sync modeif (sendCallback == null) {if (mq == null) {//同步不指定隊列return this.defaultMQProducerImpl.send(msg);} else {//同步指定隊列return this.defaultMQProducerImpl.send(msg, mq);}} else {if (mq == null) {//異步不指定隊列this.defaultMQProducerImpl.send(msg, sendCallback);} else {//異步指定隊列this.defaultMQProducerImpl.send(msg, mq, sendCallback);}return null;}
}
有上面代碼可以知道,方法中提供了三個參數設置:
- msg :消息體,這個為必填項
- sendCallback :消息回調對象,如果這個參數不為空,則為異步發送,為空則為同步發送
- mq :指定的隊列(指定與不指定的區別在于后續是否需要對隊列負載均衡,下面源碼中會講到)
根據最開始生產者發送消息,我們只傳入了msg,所以本次重點看同步不指定隊列代碼實現
同步發送消息
//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:1525
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
跟蹤代碼我們可以看到,方法中我們默認設置了CommunicationMode.SYNC 同步發送模式,并且回調參數為空,以及設置了默認超時時間3s
校驗消息體
//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:704
Validators.checkMessage(msg, this.defaultMQProducer);
該方法就是校驗消息內容是否合規
- 校驗消息內容是否不為空,消息大小是否超過最大值maxMessageSize = 1024 * 1024 * 4; // 4M
- 校驗消息發送的topic是否為不為空,以及topic的長度是否超過默認最長值127
獲取Topic訂閱信息
//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:709
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
該方法通過消息體中的topic名稱獲取topic的訂閱信息,該方法在我們上一篇生產者啟動中已經出現過了,深入方法內部其實就是先從本地topicPublishInfoTable map中獲取數據,沒有則從遠程nameserver中拉取
高級特性-消息重投
這是rocketMq中一個重要的特性,消息如果投遞失敗了,會重新投遞
//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:715
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
這段代碼就是獲取總過重投的次數:
不難看出,只有發送方式為同步發送時才為1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() =3次,其余發送方式都只有一次機會。
只有同步發送消息才支持消息重投,如果第一次投遞失敗了,mq還回重試2次投遞
找到上面源碼位置往下看,其實可以看到下面代碼就是使用了一個for循環來進行重投
選擇消息隊列-負載均衡
通過上面我們知道,最開始并沒有指定隊列,所以需要程序來獲取一個隊列。
//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:724
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);
因為自動創建的topic,會被默認分配4個隊列(生產環境為手動創建topic以及設置隊列數量),所以我們必須使用負載均衡保證隊列的合理分配到不同隊列上,減輕單個隊列的壓力
- topicPublishInfo:為消息發送到指定topic的訂閱信息
- lastBrokerName :為上一次選擇的broker名稱(如果在集群模式下,topic也會存在于多個broker上,因此記錄上一次選擇的broker名稱可以避免連續選擇同一個 Broker,從而實現更好的負載均衡和容錯處理 )
- resetIndex :重置隊列索引位置(根據源碼邏輯可知,當消息進行重新投遞時會重置topic訂閱消息中隊列的索引位置)
深入上面源碼會發現,隊列負載均衡的算法獲取索引策略默認就是輪詢
//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:TopicPublishInfo
//行數:101
int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
負載均衡策略
- 輪詢策略 (Round-Robin)
- 隨機策略 (Random)
- 一致性哈希策略 (Consistent Hashing)
- 權重隨機策略 (Weighted Random)
- 最少連接策略 (Least Connections)
裝載消息體發送消息
//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:740
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
該方法就是發現消息的核心方法了,不管是同步發送還是異步發送都會執行該方法
做一些發送消息前的準備,接下深入該方法查看
壓縮消息內容
//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:898
if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;sysFlag |= compressType.getCompressionFlag();msgBodyCompressed = true;
}
- 首先判斷消息是否大于4k( compressMsgBodyOverHowmuch = 1024 * 4),大于則進行壓縮,小于則不處理
//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:1070
byte[] data = compressor.compress(body, compressLevel);
- 傳入消息體以及壓縮的等級,這里大佬們提供了三種壓縮實現,分別基于三種不同的壓縮框架
在我們日常工作中,如果需要壓縮內容,也可以參考大佬們的實現,學習源碼不僅僅是了解框架的本身,也要吸取優秀的地方合理運用
構造發送message的請求的Header
message是Producer發送給Broker的一個請求,我們可以把內容抽象成兩部分組成:請求頭、請求體
- 請求體就是消息本身數據
- 請求頭 SendMessageRequestHeader 則包含了各種必要的數據,比如topic、messaeQueue等等,更多可直接查看請求頭對象源碼
最后就是使用基于netty實現的遠程調用發送消息到broker中
//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:1016
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,brokerName,msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);
更新broker故障信息
//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:742
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
程序執行到這個位置,說明前面消息發送的流程全部執行完成了,那么我們也知道了消息發送的結果,從而知道broker服務的狀態情況,我們需要把當前的broker故障情況更新到 faultItemTable 本地map中,供后續對broker服務的故障規避,faultItemTable 該map在前一篇生產者啟動中也提到過。
異步發送消息
從選擇發送方式代碼中當sendCallback!=null時則進入異步發送消息
跟蹤源碼我們可知,異步發送其實就是創建了一個單獨的線程,使用Runnable對象實現,因為會返回一個執行結果
//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:550
Runnable runnable = new Runnable() {@Overridepublic void run() {long costTime = System.currentTimeMillis() - beginStartTime;if (timeout > costTime) {try {sendDefaultImpl(msg, CommunicationMode.ASYNC, newCallBack, timeout - costTime);} catch (Exception e) {newCallBack.onException(e);}} else {newCallBack.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));}}executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
};
- sendDefaultImpl() 該方法就是和同步發送調用的同一個了,唯一區別就是類型 CommunicationMode.ASYNC 和存在回調方法newCallBack
- executeAsyncMessageSend() 執行異步消息發送
總結
本篇對生產者發送消息源碼進行了跟蹤學習,你是否也有所收獲呢。下一篇我們將對rocketMq的核心組件Broker進行源碼解讀,Broker負責接收和存儲消息,管理消息隊列,并將消息分發給消費者, 是擔任連接生產者和消費者,確保消息的高效傳輸和存儲,保證系統的可靠性和性能的重要角色。