從零開始讀RocketMq源碼(三)Broker存儲Message流程解析

目錄

前言

準備

消息載體CommitLog

文件持久化位置?

源碼解析

broker消息對象MessageExtBrokerInner

異步存儲message

CommitLog的真相

創建MappedFile文件

加入異步刷盤隊列

Message異步存儲MappedByteBuffer

總結


前言

在面試中我們經常會聽到這樣的回答,生產者將message發送給broker服務,然后消費者從broker中獲取消息并消費,為了保證message在broker服務中不丟失,mq會對消息數據進行持久化到磁盤中。那么message到達broker服務后是如何進行存儲并持久化到磁盤中的呢?這就是本篇要學習的內容。

準備

源碼地址:https://github.com/apache/rocketmq

目前最新版本為:5.2.0

那么我們在idea上切換分支為 release-5.2.0

消息載體CommitLog

該對象是broker服務接收到message后進行存儲的數據對象,一般就把存儲消息的文件就稱為commitLog文件也就是最終存儲磁盤上的數據文件。

大致的message流向如圖:

根據源碼可以知道,一個commitLog文件最大存儲1G數據,文件寫滿了,則會寫入下一個文件中

文件持久化位置?

commitlog文件的持久化存放的位置是通過broker.conf配置文件中storePathCommitLog配置

storePathCommitLog = /Users/leonsh/rocketmqnamesrv/store/commitlog

最后生成的文件為這樣

文件命名

查看上面圖片可知文件的名稱是一串數字20個0組成,因為文件名稱是按照偏移量offset來命名的,

因為這是第一個文件所以offset為0,補全20位,所以文件名稱為20個0

,以此類推第二個文件名稱則為00000000001073741824

上面說過一個commitlog文件最大存儲1G,而1G=1024*1024*1024=1073741824bit,這就是第二個文件的偏移量

源碼解析

前面說到Producer發送message到broker后,broker會對接收的message請求進行處理

//源碼位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行數:87
public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request)

上面的方法名中顧名思義就是處理請求的,并且所在的文件命名SendMessageProcessor也說明了該類的作用。那么我們就從該方法深入源碼中

看方法引用位置我們會發現許多地方調用了該方法,先拋開前面broker如何接收的,反正最后消息會到達這里,從該方法開始就是broker處理message的核心流程也是本篇學習的重點

broker消息對象MessageExtBrokerInner

MessageExtBrokerInner該對象就是用來后續對message處理的封裝

//源碼位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行數:255
//獲取請求對象中的消息體
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
//初始化消息對象
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
  • requestHeader 該對象就是在上一篇中講到的發送message的消息請求頭
  • 從請求頭中獲取設置的隊列id,如果沒有設置,則會從對應的topic中隨機獲取一個randomQueueId()
  • 從請求頭中獲取topic名稱,通過名稱再去獲取broker中存儲的topic對應的數據對象,深入源碼會發現,broker中存儲topic數據也是使用的map,ConcurrentMap<String, TopicConfig> topicConfigTable
  • 最后就是創建MessageExtBrokerInner對象并設值

異步存儲message

//源碼位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行數:255
CompletableFuture<PutMessageResult> asyncPutMessageFuture;
if (sendTransactionPrepareMessage) {//事務消息asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {//普通消息asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}

或許大家和博主開始一樣都有一個疑惑,我們生產者發送的是同步消息,為何到了broker卻是異常存儲呢?

1.其實生產者發送同步消息和broker異步存儲都是相互獨立互不干擾的,broker異步存儲只是為了提高mq接收消息的寫入性能吞吐量broker異步存儲會將寫入內存的message進行異步刷盤。

2.就算broker是異步存儲但也不會立即返回結果給生產者,需要等待broker異步刷盤成功才會返回結果給生產者,通過broker提供的CompletableFuture機制實現。

什么,看完解釋還是有點懵,有點抽象,我們繼續向下深入源碼,一步一步解開疑惑,我相信看完后面的解析同樣會豁然開朗的!

CommitLog的真相

//源碼位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行數:255
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

到這里,本文開頭提到的commitLog對象終于出現了,查看該源碼可知,commitlog對象中定義了一個MappedFileQueue對象這個對象又是做什么的,我們繼續深入源碼

//源碼位置
//包名:org.apache.rocketmq.store
//文件:CommitLog
//行數:942
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

深入該方法,大概意思就是從MappedFileQueue對象中的CopyOnWriteArrayList<MappedFile> mappedFiles集合中取出里面的最后一個MappedFile對象,至此贏來大結局MappedFile對象才是最終映射到磁盤文件的,而CommitLog可以理解為MappedFile對象的外層封裝。但落到磁盤上的文件我們依然稱為commitLog文件

擴展:

CopyOnWriteArrayList 是 Java 中的一種線程安全的 List 實現,屬于 java.util.concurrent 包

讀操作:不需要加鎖,直接操作底層數組,底層數組在寫操作時是一個副本,讀操作不會影響正在進行的寫操作,能夠保證高效的并發讀性能。

寫操作:會創建底層數組的一個新的副本,對這個副本進行修改, 修改完成后,新的副本會替換原來的數組

創建MappedFile文件

//源碼位置
//包名:org.apache.rocketmq.store
//文件:CommitLog
//行數:1001
if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noiseif (isCloseReadAhead()) {setFileReadMode(mappedFile, LibC.MADV_RANDOM);}
}

因為broker是啟動后首次存儲數據,所以上面獲取出來的mappedFile一定為空則進入if代碼塊

因此偏移量也是初始值0

生成MappedFile文件路徑名稱

//源碼位置
//包名:org.apache.rocketmq.store
//文件:MappedFileQueue
//行數:345
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset+ this.mappedFileSize);
  • this.storePath:該字段就是前面在broker.conf文件中配置的文件地址
  • File.separator:分隔符
  • UtilAll.offset2FileName(createOffset):生成20位數字組成的文件名稱,當前createOffset=0。

為何會生成兩個地址nextFilePathnextNextFilePath呢?

因為mq在生成當前需要使用的文件時同時生成下一個使用的文件,當第一個文件存儲滿后,直接使用下一個文件,減小了創建文件的開銷,提高mq的性能。所以會同時生成2個文件

那么問題來了,為何本文開頭生成的文件怎么只有一個?

我們查看源碼提交記錄可知,nextNextFilePath第二個文件是2021年9月才新增的

查看rocketMq在github上各個版本的發布時間,2021年9月并沒有發布新版本,但是2021年10月發布了rocketmq-all-4.9.2

那么由此可得,rocketMq同時創建2個文件從版本4.9.2開始支持,之前的版本都只會創建1個文件

因為博主的broker服務是通過docker鏡像啟動的,但是查看鏡像版本顯示的確為最新版本

其實這只是rocketMq鏡像的版本,而我們看的是鏡像中使用rocketMq框架版本

執行命令查看鏡像的詳細信息

docker inspect apacherocketmq/rocketmq:latest

由此可得博主的rocketMq版本為:4.6.0,所以只會創建一個commitLog文件

加入異步刷盤隊列

//源碼位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行數:62
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
//...
//加入隊列觸發異步刷盤操作
boolean offerOK = this.requestQueue.offer(nextReq);
//...
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
//...
boolean offerOK = this.requestQueue.offer(nextNextReq);
  • AllocateRequest:就是message異步存儲請求最后的封裝
  • this.requestTable:也是一個map對象 ConcurrentMap<String, AllocateRequest> requestTable;key為文件的路徑,value則為AllocateRequest
  • this.requestQueue這是一個隊列PriorityBlockingQueue<AllocateRequest> requestQueue隊列元素為AllocateRequest

PriorityBlockingQueue是如何做到異步刷盤的呢?

該隊列就是為broker實現異步存儲核心,可能大家對這個隊列比較陌生

它是Java 中 java.util.concurrent 包提供的一個線程安全的優先級隊列。它基于優先級堆實現,能夠保證元素按照自然順序或者指定的比較器順序進行排序

因為它是一個隊列那么我們首先就會想到生產者消費者,那么就起到了異步解耦的作用

他有兩個非常重要的方法:

  • offer(): 將一個元素插入到隊列中
  • take(): 從隊列中獲取并移除元素 由于 PriorityBlockingQueue 是一個阻塞隊列,如果隊列為空,take 方法會一直阻塞直到有元素可用

總結:由上面我們知道offer()一般用于生產者調用,而take()則是消費者調用,當隊列為空時消費者線程會一直阻塞,只要隊列中存入對象,消費者就會感知到并消費。可以理解為消費者和生產者共享PriorityBlockingQueue對象

//源碼位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行數:99
AllocateRequest result = this.requestTable.get(nextFilePath);
//...
//阻塞等待刷盤結果
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);

上面源碼的作用就是等待異步刷盤結果

  • 第一段就是取出之前存入的第一個請求對象AllocateRequest
  • 第二段則是判斷異步刷盤是否完成,成功則返回,還沒有處理完則一直阻塞,直到達到超時時間waitTimeOut

result.getCountDownLatch().await為何能做到阻塞等待結果呢?

進入AllocateRequest對象中可知,操作的是這個對象CountDownLatch countDownLatch = new CountDownLatch(1)

CountDownLatch或許大家不太熟悉,但ReentrantLock大家并不陌生吧,面試中經常問到,他們同屬于java并發包JUC( java.util.concurrent 下的對象.

概念:它允許一個或多個線程等待,直到在其他線程中執行的一組操作完成。它是通過一個計數器實現的,該計數器初始化為一個給定的值。每當一個線程完成了它的一項操作后,這個計數器就遞減。當計數器的值到達零時,等待在這個計數器上的線程將被喚醒并繼續執行

總結:通過源碼我們看到AllocateRequest被創建時里面屬性CountDownLatch中計數器默認就是1所以需要一直等待被修改為0時才會繼續執行后續邏輯,那就是等待異步刷盤完成。

Message異步存儲MappedByteBuffer

//源碼位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行數:155
AllocateRequest req = null;
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());

該源碼就是對之前加入隊列的AllocateRequest取出來,并執行后續的存儲操作,可以說就是消費者消費的地方,我們可以結合源碼上下文代碼可以知道,所在的類的頂級繼承類是Runnable,而上面代碼所在方法就是被重寫的run()方法調用,可以認為消費者是在單獨的一個線程中執行的。

獲取緩沖區

//源碼位置
//包名:org.apache.rocketmq.store.logfile
//文件:DefaultMappedFile
//行數:607
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();

被操作的對象是MappedByteBuffer

MappedByteBuffer是什么?

是 Java NIO(New Input/Output)中的一個類,它允許將文件直接映射到內存中,從而提高文件的讀寫效率。RocketMQ 使用 MappedByteBuffer 來管理 CommitLog 文件,以實現高效的消息存儲和檢索通過將文件映射到內存,RocketMQ 可以直接操作內存數據,而無需頻繁的磁盤 I/O 操作。

MappedByteBuffer也是mmap的一種實現方式

什么是mmap?

mmap(內存映射文件)是一種將文件內容映射到進程的地址空間的技術。這樣一來,文件內容就可以像訪問內存一樣被讀寫,從而顯著提高 I/O 操作的效率。

調用mappedByteBuffer.slice()方法的作用是什么?

用于創建一個新的緩沖區,該緩沖區與原始緩沖區共享相同的底層內存,但具有獨立的位置、限制和標記。這在需要操作內存映射文件的某一部分時非常有用,而不影響整個映射文件的其他部分。

MappedByteBuffer有兩大特點:

  • 延遲寫入:數據寫入 MappedByteBuffer 時,實際上是寫入了內存中的映射區域,操作系統會在合適的時候將這些數據同步到磁盤,而不是立即進行磁盤 I/O 操作。
  • 強制刷新:為了確保數據的一致性和持久性,MappedByteBuffer 提供了 force() 方法,可以將內存中的修改強制刷新到磁盤
//源碼位置
//包名:org.apache.rocketmq.store.logfile
//文件:DefaultMappedFile
//行數:611
byteBuffer.put((int) i, (byte) 0);
//...
mappedByteBuffer.force();

總結:那么在RocketMQ 中,MappedFile 類通過使用 MappedByteBuffer 來管理 CommitLog 文件,并且使用 slice() 方法來創建子緩沖區進行局部操作,通過延遲寫入減少了頻繁的磁盤 I/O 操作,定期調用 force() 方法,將內存中的數據同步到磁盤,減少數據丟失的風險。這樣可以提高性能和靈活性,特別是在處理大量消息時。


內存數據的刷盤過程本篇就不在深究,只要知道是通過MappedByteBuffer對延遲寫入配置相關策略,并在設定的時期將內存數據寫入磁盤文件中就可以了


基于上面所有內容重新修改一版簡易的流程圖如下

總結

本篇涉及到的知識面比較廣,在broker存儲message中出現了許多我們在日常開發中并不常見但功能強大的對象,比如PriorityBlockingQueueCountDownLatchMappedByteBufferRocketMq正是合理的運用了他們,從而造就了rocketMq本身這款優秀的消息隊列框架,這也是我們讀源碼所要學習的。下一篇我們將學習RocketMq的“大腦”NameServer!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/45005.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/45005.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/45005.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

國產化趨勢下源代碼數據防泄密的信創沙盒的方案分享

隨著國產化的大力推進&#xff0c;越來越多的企事業單位在逐步替換Windows、Linux等操作系統的使用。那么什是國產化了&#xff1f;國產化是指在產品或服務中采用國內自主研發的技術和標注&#xff0c;替代過去依賴的他國的產品和服務&#xff0c;國產化又被稱之為“信創”&…

GitLab CI/CD實現項目自動化部署

1 GitLab CI/CD介紹 GitLab CI/CD 是 GitLab 中集成的一套用于軟件開發的持續集成&#xff08;Continuous Integration&#xff09;、持續交付&#xff08;Continuous Delivery&#xff09;和持續部署&#xff08;Continuous Deployment&#xff09;工具。這套系統允許開發團隊…

vue里實現點擊按鈕回到頁面頂部功能,博客必備!

效果 步驟 1-標簽結構 動態綁定樣式style&#xff0c;監聽點擊事件&#xff0c;后續控制opacity透明度。和滾動距離 <div class"toTop" :style"dynamicStyles" click"toTop"><!--<i class"fa fa-arrow-up"></i>…

Django ORM中的F 對象

F 對象非常強大&#xff0c;可以在查詢和更新操作中進行復雜的字段間運算。 假設我們有一個包含商品信息的模型 Product&#xff1a; from django.db import modelsclass Product(models.Model):name models.CharField(max_length100)price models.DecimalField(max_digits…

MySQL向自增列插入0失敗問題

問題 在一次上線時&#xff0c;發現通過腳本添加的狀態表中&#xff0c;待提交的狀態不正確&#xff0c;本來應該是0&#xff0c;線上是101。 原因 默認情況下&#xff0c;MySQL對應自增列&#xff0c;認為0和null等價&#xff08;因為mysql認為0不是最佳實踐不推薦使用&…

超簡單的通配證書簽發工具,免費,無需安裝任何插件到本地

常見的acme.sh 或者 lego等工具需要配置&#xff0c;安裝不靈活&#xff0c;續簽需要配置計劃任務&#xff0c;簽發單域名證書或者通配證書需要不同的指令和配置&#xff0c;繁瑣&#xff0c;如果自己程序想要對接簽發證書的api有的不支持&#xff0c;有的用起來繁瑣。 最近發…

[手機Linux PostmarketOS]三, Alpine Linux命令使用

Alpine Linux 一些常用的指令&#xff1a; 添加國內源下載鏈接&#xff1a; 編譯以下文件&#xff0c;添加鏈接進去&#xff1a; sudo vi /etc/apk/repositories##清華源&#xff1a; https://mirror.tuna.tsinghua.edu.cn/alpine/latest-stable/main https://mirror.tuna.tsi…

【VIVADO SDK調試遇到DataAbortHandler】

問題 SDK調試遇到DataAbortHandler問題。 運行后不顯示結果&#xff0c;debug模式下發現進入DataAbortHandler異常函數。程序中存在大數組。 原因:SDK默認的堆棧為1024bytes,需要將堆棧調大。 修改方法&#xff1a; 解決:對application中src下的lscript.ld雙擊&#xff0c;…

android 添加一個水平線

在Android中&#xff0c;添加一個水平線通常可以通過幾種方式實現&#xff0c;最常見的是使用View組件或者自定義的Drawable。下面是一個簡單的例子&#xff0c;展示如何在布局文件中添加一個水平線&#xff1a; 使用View組件 在你的布局XML文件中&#xff0c;你可以添加一個…

Linux 程序卡死的特殊處理

一、前言 Linux環境。 我們在日常編寫的程序中&#xff0c;可能會出現一些細節問題&#xff0c;導致程序卡死&#xff0c;即程序沒法正常運行&#xff0c;界面卡住&#xff0c;也不會閃退... 當這種問題出現在客戶現場&#xff0c;那就是大問題了。。。 當我們暫時還無法排…

Python如何調用C++

ctypes 有以下優點: Python內建&#xff0c;不需要單獨安裝Python可以直接調用C/C 動態鏈接庫(.dll 或 .so)在Python一側&#xff0c;不需要了解 c/c dll 內部的工作方式提供了 C/C 數據類型與Python類型的相互映射&#xff0c;以及轉換&#xff0c;包括指針類型。 在使用cty…

如何定量選擇孔銷基準?-DTAS來幫你!

在當今快速發展的工程領域&#xff0c;公差仿真的作用日漸重要&#xff0c;在公差仿真中&#xff0c;基準體系的選擇對于最終結果更是至關重要。基準體系不同可能導致仿真過程中的參數計算、誤差分析以及最終的工程設計都有所不同。基準體系作為評估和比較的參照&#xff0c;直…

Suricata引擎二次開發之命中規則定位

二開背景 suricata是一款高性能的開源網絡入侵檢測防御引擎&#xff0c;旨在檢測、預防和應對網絡中的惡意活動和攻擊。suricata引擎使用多線程技術&#xff0c;能夠快速、準確地分析網絡流量并識別潛在的安全威脅&#xff0c;是眾多IDS和IPS廠商的底層規則檢測模塊。 前段時間…

強制升級最新系統,微軟全面淘汰Win10和部分11用戶

說出來可能不信&#xff0c;距離 Windows 11 正式發布已過去整整三年時間&#xff0c;按理說現在怎么也得人均 Win 11 水平了吧&#xff1f; 然而事實卻是&#xff0c;三年時間過去 Win 11 占有率僅僅突破到 29%&#xff0c;也就跳起來摸 Win 10 屁股的程度。 2024 年 6 月 Wi…

【Linux】磁盤性能壓測-FIO工具

一、FIO工具介紹 fio&#xff08;Flexible I/O Tester&#xff09;是一個用于評估計算機系統中 I/O 性能的強大工具。 官網&#xff1a;fio - fio - Flexible IO Tester 注意事項&#xff01; 1、不要指定文件系統名稱&#xff08;如/dev/mapper/centos-root)&#xff0c;避…

react啟用mobx @decorators裝飾器語法

react如果沒有經過配置&#xff0c;直接使用decorators裝飾器語法會報錯&#xff1a; Support for the experimental syntax ‘decorators’ isn’t currently enabled 因為react默認是不支持裝飾器語法&#xff0c;需要做一些配置來啟用裝飾器語法。 step1: 在 tsconfig.js…

【學術會議征稿】第三屆能源互聯網及電力系統國際學術會議(ICEIPS 2024)

第三屆能源互聯網及電力系統國際學術會議&#xff08;ICEIPS 2024&#xff09; 2024 3rd International Conference on Energy Internet and Power Systems 能源互聯網是實現新一代電力系統智能互動、開放共享的重要支撐技術之一&#xff0c;也是提升能源調度效率&#xff0…

SQL 存儲過程

SQL&#xff08;Structured Query Language&#xff09;的存儲過程&#xff08;Stored Procedure&#xff09;是一組為了完成特定功能的SQL語句集&#xff0c;它經編譯后存儲在數據庫中&#xff0c;用戶通過指定存儲過程的名字并給它傳遞參數&#xff08;如果有的話&#xff09…

Jetson-AGX-Orin 非docker環境源碼編譯安裝CyberRT

Jetson-AGX-Orin 非docker環境源碼編譯安裝CyberRT 1、安裝依賴 sudo apt update sudo apt-get install g gdb gcc cmake sudo apt install libpoco-dev uuid-dev libncurses5-dev python3-dev python3-pip python3 -m pip install protobuf3.14.02、下載CyberRT源碼 git cl…

【代碼隨想錄算法訓練Day65】卡碼網47.參加科學大會、卡碼網94. 城市間貨物運輸 I

Day65 圖論第九天 卡碼網47.參加科學大會 #include <iostream> #include <vector> #include <list> #include <queue> #include <climits> using namespace std; // 小頂堆 class mycomparison { public:bool operator()(const pair<int, …