目錄
前言
準備
消息載體CommitLog
文件持久化位置?
源碼解析
broker消息對象MessageExtBrokerInner
異步存儲message
CommitLog的真相
創建MappedFile文件
加入異步刷盤隊列
Message異步存儲MappedByteBuffer
總結
前言
在面試中我們經常會聽到這樣的回答,生產者將message發送給broker服務,然后消費者從broker中獲取消息并消費,為了保證message在broker服務中不丟失,mq會對消息數據進行持久化到磁盤中。那么message到達broker服務后是如何進行存儲并持久化到磁盤中的呢?這就是本篇要學習的內容。
準備
源碼地址:https://github.com/apache/rocketmq
目前最新版本為:5.2.0
那么我們在idea上切換分支為 release-5.2.0
消息載體CommitLog
該對象是broker服務接收到message后進行存儲的數據對象,一般就把存儲消息的文件就稱為commitLog文件也就是最終存儲磁盤上的數據文件。
大致的message流向如圖:
根據源碼可以知道,一個commitLog文件最大存儲1G數據,文件寫滿了,則會寫入下一個文件中
文件持久化位置?
commitlog文件的持久化存放的位置是通過broker.conf配置文件中storePathCommitLog配置
storePathCommitLog = /Users/leonsh/rocketmqnamesrv/store/commitlog
最后生成的文件為這樣
文件命名
查看上面圖片可知文件的名稱是一串數字20個0組成,因為文件名稱是按照偏移量offset來命名的,
因為這是第一個文件所以offset為0,補全20位,所以文件名稱為20個0
,以此類推第二個文件名稱則為00000000001073741824
上面說過一個commitlog文件最大存儲1G,而1G=1024*1024*1024=1073741824bit,這就是第二個文件的偏移量
源碼解析
前面說到Producer發送message到broker后,broker會對接收的message請求進行處理
//源碼位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行數:87
public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request)
上面的方法名中顧名思義就是處理請求的,并且所在的文件命名SendMessageProcessor也說明了該類的作用。那么我們就從該方法深入源碼中
看方法引用位置我們會發現許多地方調用了該方法,先拋開前面broker如何接收的,反正最后消息會到達這里,從該方法開始就是broker處理message的核心流程,也是本篇學習的重點
broker消息對象MessageExtBrokerInner
MessageExtBrokerInner該對象就是用來后續對message處理的封裝
//源碼位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行數:255
//獲取請求對象中的消息體
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
//初始化消息對象
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
- requestHeader 該對象就是在上一篇中講到的發送message的消息請求頭
- 從請求頭中獲取設置的隊列id,如果沒有設置,則會從對應的topic中隨機獲取一個randomQueueId()
- 從請求頭中獲取topic名稱,通過名稱再去獲取broker中存儲的topic對應的數據對象,深入源碼會發現,broker中存儲topic數據也是使用的map,ConcurrentMap<String, TopicConfig> topicConfigTable;
- 最后就是創建MessageExtBrokerInner對象并設值
異步存儲message
//源碼位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行數:255
CompletableFuture<PutMessageResult> asyncPutMessageFuture;
if (sendTransactionPrepareMessage) {//事務消息asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {//普通消息asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
或許大家和博主開始一樣都有一個疑惑,我們生產者發送的是同步消息,為何到了broker卻是異常存儲呢?
1.其實生產者發送同步消息和broker異步存儲都是相互獨立互不干擾的,broker異步存儲只是為了提高mq接收消息的寫入性能和吞吐量。broker異步存儲會將寫入內存的message進行異步刷盤。
2.就算broker是異步存儲,但也不會立即返回結果給生產者,需要等待broker異步刷盤成功才會返回結果給生產者,通過broker提供的CompletableFuture機制實現。
什么,看完解釋還是有點懵,有點抽象,我們繼續向下深入源碼,一步一步解開疑惑,我相信看完后面的解析同樣會豁然開朗的!
CommitLog的真相
//源碼位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行數:255
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
到這里,本文開頭提到的commitLog對象終于出現了,查看該源碼可知,commitlog對象中定義了一個MappedFileQueue對象,這個對象又是做什么的,我們繼續深入源碼
//源碼位置
//包名:org.apache.rocketmq.store
//文件:CommitLog
//行數:942
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
深入該方法,大概意思就是從MappedFileQueue對象中的CopyOnWriteArrayList<MappedFile> mappedFiles;集合中取出里面的最后一個MappedFile對象,至此贏來大結局MappedFile對象才是最終映射到磁盤文件的,而CommitLog可以理解為MappedFile對象的外層封裝。但落到磁盤上的文件我們依然稱為commitLog文件
擴展:
CopyOnWriteArrayList 是 Java 中的一種線程安全的 List 實現,屬于 java.util.concurrent 包
讀操作:不需要加鎖,直接操作底層數組,底層數組在寫操作時是一個副本,讀操作不會影響正在進行的寫操作,能夠保證高效的并發讀性能。
寫操作:會創建底層數組的一個新的副本,對這個副本進行修改, 修改完成后,新的副本會替換原來的數組
創建MappedFile文件
//源碼位置
//包名:org.apache.rocketmq.store
//文件:CommitLog
//行數:1001
if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noiseif (isCloseReadAhead()) {setFileReadMode(mappedFile, LibC.MADV_RANDOM);}
}
因為broker是啟動后首次存儲數據,所以上面獲取出來的mappedFile一定為空則進入if代碼塊
因此偏移量也是初始值0
生成MappedFile文件路徑名稱
//源碼位置
//包名:org.apache.rocketmq.store
//文件:MappedFileQueue
//行數:345
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset+ this.mappedFileSize);
- this.storePath:該字段就是前面在broker.conf文件中配置的文件地址
- File.separator:分隔符
- UtilAll.offset2FileName(createOffset):生成20位數字組成的文件名稱,當前createOffset=0。
為何會生成兩個地址nextFilePath與nextNextFilePath呢?
因為mq在生成當前需要使用的文件時同時生成下一個使用的文件,當第一個文件存儲滿后,直接使用下一個文件,減小了創建文件的開銷,提高mq的性能。所以會同時生成2個文件。
那么問題來了,為何本文開頭生成的文件怎么只有一個?
我們查看源碼提交記錄可知,nextNextFilePath第二個文件是2021年9月才新增的
查看rocketMq在github上各個版本的發布時間,2021年9月并沒有發布新版本,但是2021年10月發布了rocketmq-all-4.9.2
那么由此可得,rocketMq同時創建2個文件從版本4.9.2開始支持,之前的版本都只會創建1個文件
因為博主的broker服務是通過docker鏡像啟動的,但是查看鏡像版本顯示的確為最新版本
其實這只是rocketMq鏡像的版本,而我們看的是鏡像中使用rocketMq框架版本
執行命令查看鏡像的詳細信息
docker inspect apacherocketmq/rocketmq:latest
由此可得博主的rocketMq版本為:4.6.0,所以只會創建一個commitLog文件
加入異步刷盤隊列
//源碼位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行數:62
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
//...
//加入隊列觸發異步刷盤操作
boolean offerOK = this.requestQueue.offer(nextReq);
//...
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
//...
boolean offerOK = this.requestQueue.offer(nextNextReq);
- AllocateRequest:就是message異步存儲請求最后的封裝
- this.requestTable:也是一個map對象 ConcurrentMap<String, AllocateRequest> requestTable;key為文件的路徑,value則為AllocateRequest
- this.requestQueue:這是一個隊列PriorityBlockingQueue<AllocateRequest> requestQueue;隊列元素為AllocateRequest
PriorityBlockingQueue是如何做到異步刷盤的呢?
該隊列就是為broker實現異步存儲的核心,可能大家對這個隊列比較陌生
它是Java 中 java.util.concurrent 包提供的一個線程安全的優先級隊列。它基于優先級堆實現,能夠保證元素按照自然順序或者指定的比較器順序進行排序
因為它是一個隊列,那么我們首先就會想到生產者和消費者,那么就起到了異步解耦的作用
他有兩個非常重要的方法:
- offer(): 將一個元素插入到隊列中
- take(): 從隊列中獲取并移除元素 由于 PriorityBlockingQueue 是一個阻塞隊列,如果隊列為空,take 方法會一直阻塞直到有元素可用
總結:由上面我們知道offer()一般用于生產者調用,而take()則是消費者調用,當隊列為空時消費者線程會一直阻塞,只要隊列中存入對象,消費者就會感知到并消費。可以理解為消費者和生產者共享PriorityBlockingQueue對象
//源碼位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行數:99
AllocateRequest result = this.requestTable.get(nextFilePath);
//...
//阻塞等待刷盤結果
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
上面源碼的作用就是等待異步刷盤結果
- 第一段就是取出之前存入的第一個請求對象AllocateRequest
- 第二段則是判斷異步刷盤是否完成,成功則返回,還沒有處理完則一直阻塞,直到達到超時時間waitTimeOut
result.getCountDownLatch().await為何能做到阻塞等待結果呢?
進入AllocateRequest對象中可知,操作的是這個對象CountDownLatch countDownLatch = new CountDownLatch(1)
CountDownLatch或許大家不太熟悉,但ReentrantLock大家并不陌生吧,面試中經常問到,他們同屬于java并發包JUC( java.util.concurrent )下的對象.
概念:它允許一個或多個線程等待,直到在其他線程中執行的一組操作完成。它是通過一個計數器實現的,該計數器初始化為一個給定的值。每當一個線程完成了它的一項操作后,這個計數器就遞減。當計數器的值到達零時,等待在這個計數器上的線程將被喚醒并繼續執行
總結:通過源碼我們看到AllocateRequest被創建時里面屬性CountDownLatch中的計數器默認就是1,所以需要一直等待被修改為0時才會繼續執行后續邏輯,那就是等待異步刷盤完成。
Message異步存儲MappedByteBuffer
//源碼位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行數:155
AllocateRequest req = null;
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
該源碼就是對之前加入隊列的AllocateRequest取出來,并執行后續的存儲操作,可以說就是消費者消費的地方,我們可以結合源碼上下文代碼可以知道,所在的類的頂級繼承類是Runnable,而上面代碼所在方法就是被重寫的run()方法調用,可以認為消費者是在單獨的一個線程中執行的。
獲取緩沖區
//源碼位置
//包名:org.apache.rocketmq.store.logfile
//文件:DefaultMappedFile
//行數:607
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
被操作的對象是MappedByteBuffer
MappedByteBuffer是什么?
是 Java NIO(New Input/Output)中的一個類,它允許將文件直接映射到內存中,從而提高文件的讀寫效率。RocketMQ 使用 MappedByteBuffer 來管理 CommitLog 文件,以實現高效的消息存儲和檢索。通過將文件映射到內存,RocketMQ 可以直接操作內存數據,而無需頻繁的磁盤 I/O 操作。
MappedByteBuffer也是mmap的一種實現方式
什么是mmap?
mmap(內存映射文件)是一種將文件內容映射到進程的地址空間的技術。這樣一來,文件內容就可以像訪問內存一樣被讀寫,從而顯著提高 I/O 操作的效率。
調用mappedByteBuffer.slice()方法的作用是什么?
用于創建一個新的緩沖區,該緩沖區與原始緩沖區共享相同的底層內存,但具有獨立的位置、限制和標記。這在需要操作內存映射文件的某一部分時非常有用,而不影響整個映射文件的其他部分。
MappedByteBuffer有兩大特點:
- 延遲寫入:數據寫入 MappedByteBuffer 時,實際上是寫入了內存中的映射區域,操作系統會在合適的時候將這些數據同步到磁盤,而不是立即進行磁盤 I/O 操作。
- 強制刷新:為了確保數據的一致性和持久性,MappedByteBuffer 提供了 force() 方法,可以將內存中的修改強制刷新到磁盤
//源碼位置
//包名:org.apache.rocketmq.store.logfile
//文件:DefaultMappedFile
//行數:611
byteBuffer.put((int) i, (byte) 0);
//...
mappedByteBuffer.force();
總結:那么在RocketMQ 中,MappedFile 類通過使用 MappedByteBuffer 來管理 CommitLog 文件,并且使用 slice() 方法來創建子緩沖區進行局部操作,通過延遲寫入減少了頻繁的磁盤 I/O 操作,定期調用 force() 方法,將內存中的數據同步到磁盤,減少數據丟失的風險。這樣可以提高性能和靈活性,特別是在處理大量消息時。
內存數據的刷盤過程本篇就不在深究,只要知道是通過MappedByteBuffer對延遲寫入配置相關策略,并在設定的時期將內存數據寫入磁盤文件中就可以了
基于上面所有內容重新修改一版簡易的流程圖如下
總結
本篇涉及到的知識面比較廣,在broker存儲message中出現了許多我們在日常開發中并不常見但功能強大的對象,比如PriorityBlockingQueue、CountDownLatch、MappedByteBuffer,RocketMq正是合理的運用了他們,從而造就了rocketMq本身這款優秀的消息隊列框架,這也是我們讀源碼所要學習的。下一篇我們將學習RocketMq的“大腦”NameServer!