從零開始讀RocketMq源碼(二)Message的發送詳解

目錄

前言

準備

消息發送方式

深入源碼

消息發送模式

選擇發送方式

同步發送消息

校驗消息體

獲取Topic訂閱信息

高級特性-消息重投

選擇消息隊列-負載均衡

裝載消息體發送消息

壓縮消息內容

構造發送message的請求的Header

更新broker故障信息

異步發送消息

總結


前言

上一篇我們已經對RocketMq生產者啟動源碼進行了學習《從零開始讀RocketMq源碼(一)生產者啟動》那么本篇我們將對生產者發送消息的源碼進行學習

準備

如果沒看前一篇的,這里還是要強調本篇的rocketmq版本

首先我們從github上拉取rocketmqd的源碼鏈接到本地,使用idea打開。

源碼地址:https://github.com/apache/rocketmq

目前最新版本為:5.2.0

那么我們在idea上切換分支為 release-5.2.0

注:請保持和本篇的版本一直,方便后面文章中給出的代碼塊定位

消息發送方式

在讀源碼之前我們先了解下mq支持的發送消息的類型。

消息的發送方式有三種,但我們最常用的是同步的方式發送

  • sync 同步:消息發送后,必須等待消息的發送結果返回后,才能發送下一條消息
  • async 異步:消息發送后,不用等待返回結果,直接發送下一條數據,但會設置一個回調方法接收返回結果
  • oneway 單向:消息發送后,不會返回結果,也不會等待,也不會設置回調方法。適用場景日志收集、監控數據和快速通知等對可靠性要求不高但需要高性能的場景

深入源碼

首先進入外層的producer.send()方法中

//源碼位置:
//包名:org.apache.rocketmq.example.simple
//文件名:Producer
//行數:42
SendResult sendResult = producer.send(msg);

消息發送模式

//源碼位置:
//包名:org.apache.rocketmq.client.producer
//文件名:DefaultMQProducer
//行數:431
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));//批量發送if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {return sendByAccumulator(msg, null, null);} else {//單條發送return sendDirect(msg, null, null);}
}
  1. 自動批處理發送 -sendByAccumulator()
  • 該方法用于將消息累積到一個批處理容器中,等待足夠的消息數量或達到某個時間間隔后,再進行批量發送。
  • 可以顯著減少發送次數,提高吞吐量。

? ? ?2. 直接發送 -sendDirect()

  • 適用于即時發送或消息已經是批處理消息的情況

本章的重點就是直接發送消息,這也是開發中使用最頻發的方式

選擇發送方式

//源碼位置:
//包名:org.apache.rocketmq.client.producer
//文件名:DefaultMQProducer
//行數:720
public SendResult sendDirect(Message msg, MessageQueue mq,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {// send in sync modeif (sendCallback == null) {if (mq == null) {//同步不指定隊列return this.defaultMQProducerImpl.send(msg);} else {//同步指定隊列return this.defaultMQProducerImpl.send(msg, mq);}} else {if (mq == null) {//異步不指定隊列this.defaultMQProducerImpl.send(msg, sendCallback);} else {//異步指定隊列this.defaultMQProducerImpl.send(msg, mq, sendCallback);}return null;}
}

有上面代碼可以知道,方法中提供了三個參數設置:

  • msg :消息體,這個為必填項
  • sendCallback :消息回調對象,如果這個參數不為空,則為異步發送,為空則為同步發送
  • mq :指定的隊列(指定與不指定的區別在于后續是否需要對隊列負載均衡,下面源碼中會講到)

根據最開始生產者發送消息,我們只傳入了msg,所以本次重點看同步不指定隊列代碼實現

同步發送消息

//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:1525
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

跟蹤代碼我們可以看到,方法中我們默認設置了CommunicationMode.SYNC 同步發送模式,并且回調參數為空,以及設置了默認超時時間3s

校驗消息體

//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:704
Validators.checkMessage(msg, this.defaultMQProducer);

該方法就是校驗消息內容是否合規

  • 校驗消息內容是否不為空,消息大小是否超過最大值maxMessageSize = 1024 * 1024 * 4; // 4M
  • 校驗消息發送的topic是否為不為空,以及topic的長度是否超過默認最長值127

獲取Topic訂閱信息

//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:709
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

該方法通過消息體中的topic名稱獲取topic的訂閱信息,該方法在我們上一篇生產者啟動中已經出現過了,深入方法內部其實就是先從本地topicPublishInfoTable map中獲取數據,沒有則從遠程nameserver中拉取

高級特性-消息重投

這是rocketMq中一個重要的特性,消息如果投遞失敗了,會重新投遞

//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:715
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

這段代碼就是獲取總過重投的次數:

不難看出,只有發送方式為同步發送時才為1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() =3次,其余發送方式都只有一次機會。

只有同步發送消息才支持消息重投,如果第一次投遞失敗了,mq還回重試2次投遞

找到上面源碼位置往下看,其實可以看到下面代碼就是使用了一個for循環來進行重投

選擇消息隊列-負載均衡

通過上面我們知道,最開始并沒有指定隊列,所以需要程序來獲取一個隊列。

//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:724
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);

因為自動創建的topic,會被默認分配4個隊列(生產環境為手動創建topic以及設置隊列數量),所以我們必須使用負載均衡保證隊列的合理分配到不同隊列上,減輕單個隊列的壓力

  • topicPublishInfo:為消息發送到指定topic的訂閱信息
  • lastBrokerName :為上一次選擇的broker名稱(如果在集群模式下,topic也會存在于多個broker上,因此記錄上一次選擇的broker名稱可以避免連續選擇同一個 Broker,從而實現更好的負載均衡和容錯處理
  • resetIndex :重置隊列索引位置(根據源碼邏輯可知,當消息進行重新投遞時會重置topic訂閱消息中隊列的索引位置)

深入上面源碼會發現,隊列負載均衡的算法獲取索引策略默認就是輪詢

//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:TopicPublishInfo
//行數:101
int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());

負載均衡策略

  1. 輪詢策略 (Round-Robin)
  2. 隨機策略 (Random)
  3. 一致性哈希策略 (Consistent Hashing)
  4. 權重隨機策略 (Weighted Random)
  5. 最少連接策略 (Least Connections)

裝載消息體發送消息

//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:740
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

該方法就是發現消息的核心方法了,不管是同步發送還是異步發送都會執行該方法

做一些發送消息前的準備,接下深入該方法查看

壓縮消息內容

//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:898
if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;sysFlag |= compressType.getCompressionFlag();msgBodyCompressed = true;
}
  • 首先判斷消息是否大于4k( compressMsgBodyOverHowmuch = 1024 * 4),大于則進行壓縮,小于則不處理
//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:1070
byte[] data = compressor.compress(body, compressLevel);
  • 傳入消息體以及壓縮的等級,這里大佬們提供了三種壓縮實現,分別基于三種不同的壓縮框架

在我們日常工作中,如果需要壓縮內容,也可以參考大佬們的實現,學習源碼不僅僅是了解框架的本身,也要吸取優秀的地方合理運用

構造發送message的請求的Header

message是Producer發送給Broker的一個請求,我們可以把內容抽象成兩部分組成:請求頭請求體

  • 請求體就是消息本身數據
  • 請求頭 SendMessageRequestHeader 則包含了各種必要的數據,比如topicmessaeQueue等等,更多可直接查看請求頭對象源碼

最后就是使用基于netty實現的遠程調用發送消息到broker中

//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:1016
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,brokerName,msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);

更新broker故障信息

//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:742
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);

程序執行到這個位置,說明前面消息發送的流程全部執行完成了,那么我們也知道了消息發送的結果,從而知道broker服務的狀態情況,我們需要把當前的broker故障情況更新到 faultItemTable 本地map中,供后續對broker服務的故障規避faultItemTable 該map在前一篇生產者啟動中也提到過。

異步發送消息

選擇發送方式代碼中當sendCallback!=null時則進入異步發送消息

跟蹤源碼我們可知,異步發送其實就是創建了一個單獨的線程,使用Runnable對象實現,因為會返回一個執行結果

//源碼位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行數:550
Runnable runnable = new Runnable() {@Overridepublic void run() {long costTime = System.currentTimeMillis() - beginStartTime;if (timeout > costTime) {try {sendDefaultImpl(msg, CommunicationMode.ASYNC, newCallBack, timeout - costTime);} catch (Exception e) {newCallBack.onException(e);}} else {newCallBack.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));}}executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
};
  • sendDefaultImpl() 該方法就是和同步發送調用的同一個了,唯一區別就是類型 CommunicationMode.ASYNC 和存在回調方法newCallBack
  • executeAsyncMessageSend() 執行異步消息發送

總結

本篇對生產者發送消息源碼進行了跟蹤學習,你是否也有所收獲呢。下一篇我們將對rocketMq的核心組件Broker進行源碼解讀,Broker負責接收和存儲消息,管理消息隊列,并將消息分發給消費者, 是擔任連接生產者和消費者,確保消息的高效傳輸和存儲,保證系統的可靠性和性能的重要角色。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/42188.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/42188.shtml
英文地址,請注明出處:http://en.pswp.cn/web/42188.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Open3D KDtree的建立與使用

目錄 一、概述 1.1kd樹原理 1.2kd樹搜索原理 1.3kd樹構建示例 二、常見的領域搜索方式 2.1K近鄰搜索(K-Nearest Neighbors, KNN Search) 2.2半徑搜索(Radius Search) 2.3混合搜索(Hybrid Search) …

ai native 模型微調

AI native 模型微調(fine-tuning)是指在預訓練模型的基礎上,通過對其參數進行進一步訓練,使其在特定任務上表現更佳。以下是關于模型微調的一些基本步驟和概念: ### 1. 準備數據集 - **數據收集**:收集適用…

后端之路——登錄校驗前言(Cookie\ Session\ JWT令牌)

前言:Servlet 【登錄校驗】這個功能技術的基礎是【會話技術】,那么在講【會話技術】的時候必然要談到【Cookie】和【Session】這兩個東西,那么在這之前必須要先講一下一個很重要但是很多人都會忽略的一個知識點:【Servlet】 什么是…

Oracle PL/SQL 循環批量執行存儲過程

1. 查詢存儲過程 根據數據字典USER_OBJECTS查詢出所有存儲過程。 2. 動態拼接字符串(參數等) 根據數據字典USER_ARGUMENTS動態拼接參數。 3. 動態執行 利用EXECUTE IMMEDIATE動態執行無名塊。 4. 輸出執行信息 利用DBMS_OUTPUT.PUT_LINE輸出執行成功與…

Android Gradle 開發與應用 (十): Gradle 腳本最佳實踐

目錄 1. 使用Gradle Kotlin DSL 1.1 什么是Gradle Kotlin DSL 1.2 遷移到Kotlin DSL 1.3 優勢分析 2. 優化依賴管理 2.1 使用依賴版本管理文件 2.2 使用依賴分組 3. 合理使用Gradle插件 3.1 官方插件和自定義插件 3.2 插件管理的最佳實踐 4. 任務配置優化 4.1 使用…

Oracle 19c 統一審計表清理

zabbix 收到SYSAUX表空間告警超過90%告警,最后面給出的清理方法只適合ORACLE 統一審計表的清理,傳統審計表的清理SYS.AUD$不適合,請注意。 SQL> Col tablespace_name for a30 Col used_pct for a10 Set line 120 pages 120 select total.…

STM32實戰篇:閃燈 × 流水燈 × 蜂鳴器

IO引腳初始化 即開展某項活動之前所做的準備工作,對于一個IO引腳來說,在使用它之前必須要做一些參數配置(例如:選擇工作模式、速率)的工作(即IO引腳的初始化)。 IO引腳初始化流程 1、使能IO引…

LED燈的呼吸功能

"呼吸功能"通常是指 LED 燈的一種工作模式,它模擬人類的呼吸節奏,即 LED 燈的亮度會周期性地逐漸增強然后逐漸減弱,給人一種 LED 在"呼吸"的感覺。這種效果通常用于指示設備的狀態或者簡單地作為裝飾效果。(就…

Spring Boot Security自定義AuthenticationProvider

以下是一個簡單的示例,展示如何使用AuthenticationProvider自定義身份驗證。首先,創建一個繼承自標準AuthenticationProvider的類,并實現authenticate方法。 import com.kamier.security.web.service.MyUser; import org.springframework.se…

【Adobe】Photoshop圖層的使用

Adobe Photoshop(簡稱PS)中的圖層是圖像處理中一個核心概念,它允許用戶以堆疊的方式組織圖像的不同部分,從而實現對圖像的復雜編輯和處理而不影響原始圖像。以下是關于Adobe Photoshop圖層的詳細介紹: 一、圖層的定義 圖層就像是透明的紙張,你可以在上面繪制、添加圖像…

YOLOv10改進 | EIoU、SIoU、WIoU、DIoU、FocusIoU等二十余種損失函數

一、本文介紹 這篇文章介紹了YOLOv10的重大改進,特別是在損失函數方面的創新。它不僅包括了多種IoU損失函數的改進和變體,如SIoU、WIoU、GIoU、DIoU、EIOU、CIoU,還融合了“Focus”思想,創造了一系列新的損失函數。這些組合形式的…

Android Init Language自學筆記

Android Init Language由五個元素組成:Acttions、Commands、Services、Options和Imports。 Actions和Services隱式聲明了一個新的section。所以的Commands和Options都屬于最近聲明的section。 Services具有唯一的名稱,如果重名會報錯。 Actions Acti…

解決Spring Boot中的高可用性設計

解決Spring Boot中的高可用性設計 大家好,我是微賺淘客系統3.0的小編,也是冬天不穿秋褲,天冷也要風度的程序猿! 1. 高可用性設計概述 1.1 什么是高可用性? 高可用性指系統在面對各種故障和異常情況時,仍…

獨立開發者系列(22)——API調試工具apifox的使用

接口的邏輯已經實現,需要對外發布接口,而發布接口的時候,我們需要能自己簡單調試接口。當然,其實自己也可以寫簡單的代碼調試自己的接口,因為其實就是簡單的request請求或者curl庫讀取,調整請求方式get或者…

如果MySQL出現 “Too many connections“ 錯誤,該如何解決?

當你想要連接MySQL時出現"Too many connections" 報錯的情況下,該如何解決才能如愿以償呢?都是哥們兒,就教你兩招吧! 1.不想重啟數據庫的情況下 你可以嘗試采取以下方法來解決: 增加連接數限制&#xff1a…

RxJava學習記錄

文章目錄 1. 總覽1.1 基本原理1.2 導入包和依賴 2. 操作符2.1 創建操作符2.2 轉換操作符2.3 組合操作符2.4 功能操作符 1. 總覽 1.1 基本原理 參考文獻 構建流:每一步操作都會生成一個新的Observable節點(沒錯,包括ObserveOn和SubscribeOn線程變換操作…

asp.netWebForm(.netFramework) CSRF漏洞

asp.netWebForm(.netFramework) CSRF漏洞 CSRF(Cross-Site Request Forgery)跨站請求偽造是一種常見的 Web 應用程序安全漏 洞,攻擊者通過誘使已認證用戶在受信任的網站上執行惡意操作,從而利用用戶的身份 執行未經授權的操作。攻…

echarts實現3D餅圖

先看下最終效果 實現思路 使用echarts-gl的曲面圖&#xff08;surface&#xff09;類型 通過parametric繪制曲面參數實現3D效果 代碼實現 <template><div id"surfacePie"></div> </template> <script setup>import {onMounted} fro…

簡單的找到自己需要的flutter ui 模板

簡單的找到自己需要的flutter ui 模板 網站 https://flutterawesome.com/ 簡介 我原本以為會很難用 實際上不錯 很簡單 打開后界面類似于,右上角可以搜索 點擊view github 相當簡單 很oks

RabbitMq,通過prefetchCount限制消費并發數

1.問題:項目瓶頸,通過rabbitMq來異步上傳圖片,由于并發上傳的圖片過多導致阿里OSS異常, 解決方法:通過prefetchCount限制圖片上傳OSS的并發數量 2.定義消費者 Component AllArgsConstructor Slf4j public class ReceiveFaceImageEvent {private final UPloadService uploadSe…