RocketMQ-源碼架構二

梳理一些比較完整,比較復雜的業務線

消息持久化設計

RocketMQ的持久化文件結構

消息持久化也就是將內存中的消息寫入到本地磁盤的過程。而磁盤IO操作通常是一個很耗性能,很慢的操作,所以,對消息持久化機制的設計,是一個MQ產品提升性能的關鍵,甚至可以說是最為重要的核心也不為過。接下來梳理RocketMQ是如何在本地磁盤中保存消息的

RocketMQ消息直接采用磁盤文件保存消息,默認路徑在${user_home}/store目錄。這些存儲目錄可以在broker.conf中自行指定。

存儲文件主要分為三個部分:

  • CommitLog:存儲消息的元數據。所有消息都會順序存入到CommitLog文件當中。CommitLog由多個文件組成,每個文件固定大小1G。以第一條消息的偏移量為文件名。

  • ConsumerQueue:存儲消息在CommitLog的索引。一個MessageQueue一個文件,記錄當前MessageQueue被哪些消費者組消費到了哪一條CommitLog。

  • IndexFile:為消息查詢提供了一種通過key或時間區間來查詢消息的方法,這種通過IndexFile來查找消息的方法不影響發送與消費消息的主流程

另外,還有幾個輔助的存儲文件,主要記錄一些描述消息的元數據:

  • checkpoint:數據存盤檢查點。里面主要記錄commitlog文件、ConsumeQueue文件以及IndexFile文件最后一次刷盤的時間戳。

  • config/*.json:這些文件是將RocketMQ的一些關鍵配置信息進行存盤保存。例如Topic配置、消費者組配置、消費者組消息偏移量Offset 等等一些信息。

  • abort:這個文件是RocketMQ用來判斷程序是否正常關閉的一個標識文件。正常情況下,會在啟動時創建,而關閉服務時刪除。但是如果遇到一些服務器宕機,或者kill -9這樣一些非正常關閉服務的情況,這個abort文件就不會刪除,因此RocketMQ就可以判斷上一次服務是非正常關閉的,后續就會做一些數據恢復的操作。

整體的消息存儲結構,官方做了個圖進行描述:

Producer發過來的所有消息,不管是屬于哪個Topic,Broker都統一存在CommitLog文件當中,然后分別構建ConsumeQueue文件和IndexFile兩個索引文件,用來輔助消費者進行消息檢索。這種設計最直接的好處是可以較少查找目標文件的時間,讓消息以最快的速度落盤。對比Kafka存文件時,需要尋找消息所屬的Partition文件,再完成寫入。當Topic比較多時,這樣的Partition尋址就會浪費非常多的時間。所以Kafka不太適合多Topic的場景。而RocketMQ的這種快速落盤的方式,在多Topic的場景下,優勢就比較明顯了。

在文件形式上:CommitLog文件的大小是固定的。文件名就是當前CommitLog文件當中存儲的第一條消息的Offset。

ConsumeQueue文件主要是加速消費者進行消息索引。每個文件夾對應RocketMQ中的一個MessageQueue,文件夾下的文件記錄了每個MessageQueue中的消息在CommitLog文件當中的偏移量。這樣,消費者通過ConsumeQueue文件,就可以快速找到CommitLog文件中感興趣的消息記錄。而消費者在ConsumeQueue文件中的消費進度,會保存在config/consumerOffset.json文件當中。

IndexFile文件主要是輔助消費者進行消息索引。消費者進行消息消費時,通過ConsumeQueue文件就足夠完成消息檢索了,但是如果消費者指定時間戳進行消費,或者要按照MessageId或者MessageKey來檢索文件,比如RocketMQ管理控制臺的消息軌跡功能,ConsumeQueue文件就不夠用了。IndexFile文件就是用來輔助這類消息檢索的。他的文件名比較特殊,不是以消息偏移量命名,而是用的時間命名。但是其實,他也是一個固定大小的文件。

這是對RocketMQ存盤文件最基礎的了解,但是只有這樣的設計,是不足以支撐RocketMQ的三高性能的。RocketMQ如何保證ConsumeQueue、IndexFile兩個索引文件與CommitLog中的消息對齊?如何保證消息斷電不丟失?如何保證文件高效的寫入磁盤?等等。如果你想要去抓住RocketMQ這些三高問題的核心設計,那么還是需要到源碼當中去深究。

commitLog寫入

消息存儲的入口在: DefaultMessageStore.asyncPutMessage方法

CommitLog的asyncPutMessage方法中會給寫入線程加鎖,保證一次只會允許一個線程寫入。寫入消息的過程是串行的,一次只會允許一個線程寫入。

最終進入CommitLog中的DefaultAppendMessageCallback#doAppend方法,這里就是Broker寫入消息的實際入口。這個方法最終會把消息追加到MappedFile映射的一塊內存里,并沒有直接寫入磁盤。而是在隨后調用ComitLog#submitFlushRequest方法,提交刷盤申請。刷盤完成之后,內存中的文件才真正寫入到磁盤當中。

在提交刷盤申請之后,就會立即調用CommitLog#submitReplicaRequest方法,發起主從同步申請。

文件同步刷盤與異步刷盤

入口:CommitLog.submitFlushRequest

這里涉及到了對于同步刷盤與異步刷盤的不同處理機制。這里有很多極致提高性能的設計,對于我們理解和設計高并發應用場景有非常大的借鑒意義。

同步刷盤和異步刷盤是通過不同的FlushCommitLogService的子服務實現的。

//org.apache.rocketmq.store.CommitLog的構造方法
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {this.flushCommitLogService = new GroupCommitService();
} else {this.flushCommitLogService = new FlushRealTimeService();
}
?
this.commitLogService = new CommitRealTimeService();

同步刷盤采用的是GroupCommitService子線程。雖然是叫做同步刷盤,但是從源碼中能看到,他實際上并不是來一條消息就刷一次盤。而是這個子線程每10毫秒執行一次doCommit方法,掃描文件的緩存。只要緩存當中有消息,就執行一次Flush操作。

而異步刷盤采用的是FlushRealTimeService子線程。這個子線程最終也是執行Flush操作,只不過他的執行時機會根據配置進行靈活調整。所以可以看到,這里異步刷盤和同步刷盤的最本質區別,實際上是進行Flush操作的頻率不同。

我們經常說使用RocketMQ的同步刷盤,可以保證Broker斷電時,消息不會丟失。但是可以看到,RocketMQ并不可能真正來一條消息就進行一次刷盤,這樣在海量數據下,操作系統是承受不了的。而只要不是來一次消息刷一次盤,那么在Broker直接斷電的情況接下,就總是會有內存中的消息沒有刷入磁盤的情況,這就會造成消息丟失。所以,對于消息安全性的設計,其實是重在取舍,無法做到絕對。

同步刷盤和異步刷盤最終落地到FileChannel的force方法。這個force方法就會最終調用一次操作系統的fsync系統調用,完成文件寫入。

//org.apache.rocketmq.store.MappedFile#flush
public int flush(final int flushLeastPages) {if (this.isAbleToFlush(flushLeastPages)) {if (this.hold()) {int value = getReadPosition();
?try {//We only append data to fileChannel or mappedByteBuffer, never both.if (writeBuffer != null || this.fileChannel.position() != 0) {this.fileChannel.force(false);} else {this.mappedByteBuffer.force();}} catch (Throwable e) {log.error("Error occurred when force data to disk.", e);}
?this.flushedPosition.set(value);this.release();} else {log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());this.flushedPosition.set(getReadPosition());}}return this.getFlushedPosition();
}

另外一個CommitRealTimeService這個子線程則是用來寫入堆外內存的。應用可以通過配置TransientStorePoolEnable參數開啟堆外內存,如果開啟了堆外內存,會在啟動時申請一個跟CommitLog文件大小一致的堆外內存,這部分內存就可以確保不會被交換到虛擬內存中。而CommitRealTimeService處理消息的方式則只是調用mappedFileQueue的commit方法。這個方法只是往操作系統的PagedCache里寫入消息,并不主動進行刷盤操作。會由操作系統通過Dirty Page機制,在某一個時刻進行統一刷盤。例如我們在正常關閉操作系統時,經常會等待很長時間。這里面大部分的時間其實就是在做PageCache的刷盤。

//org.apache.rocketmq.store.MappedFileQueue
public boolean commit(final int commitLeastPages) {boolean result = true;MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);if (mappedFile != null) {int offset = mappedFile.commit(commitLeastPages);long where = mappedFile.getFileFromOffset() + offset;result = where == this.committedWhere;this.committedWhere = where;}
?return result;
}

在梳理同步刷盤與異步刷盤的具體實現時,可以看到一個小點,RocketMQ是如何讓兩個刷盤服務間隔執行的?RocketMQ提供了一個自己實現的CountDownLatch2工具類來提供線程阻塞功能,使用CAS驅動CountDownLatch2的countDown操作。每來一個消息就啟動一次CAS,成功后,調用一次countDown。而這個CountDonwLatch2在Java.util.concurrent.CountDownLatch的基礎上,實現了reset功能,這樣可以進行對象重用

CommigLog主從復制

入口:CommitLog.submitReplicaRequest

主從同步時,也體現到了RocketMQ對于性能的極致追求。最為明顯的,RocketMQ整體是基于Netty實現的網絡請求,而在主從復制這一塊,卻放棄了Netty框架,轉而使用更輕量級的Java的NIO來構建。

在主要的HAService中,會在啟動過程中啟動三個守護進程。

//HAService#start
public void start() throws Exception {this.acceptSocketService.beginAccept();this.acceptSocketService.start();this.groupTransferService.start();this.haClient.start();
}

這其中與Master相關的是acceptSocketService和groupTransferService。其中acceptSocketService主要負責維護Master與Slave之間的TCP連接。groupTransferService主要與主從同步復制有關。而slave相關的則是haClient。

至于其中關于主從的同步復制與異步復制的實現流程,還是比較復雜的,有興趣的同學可以深入去研究一下。

推薦一篇可供參考的博客 RocketMQ源碼分析之主從數據復制-CSDN博客

分發ConsumeQueue和IndexFile

當CommitLog寫入一條消息后,在DefaultMessageStore的start方法中,會啟動一個后臺線程reputMessageService。源碼就定義在DefaultMessageStore中。這個后臺線程每隔1毫秒就會去拉取CommitLog中最新更新的一批消息。如果發現CommitLog中有新的消息寫入,就會觸發一次doDispatch。

//org.apache.rocketmq.store.DefaultMessageStore中的ReputMessageService線程類
public void doDispatch(DispatchRequest req) {for (CommitLogDispatcher dispatcher : this.dispatcherList) {dispatcher.dispatch(req);}
}

dispatchList中包含兩個關鍵的實現類CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex。源碼就定義在DefaultMessageStore中。他們分別用來構建ConsumeQueue索引和IndexFile索引。

并且,如果服務異常宕機,會造成CommitLog和ConsumeQueue、IndexFile文件不一致,有消息寫入CommitLog后,沒有分發到索引文件,這樣消息就丟失了。DefaultMappedStore的load方法提供了恢復索引文件的方法,入口在load方法。

過期文件刪除機制

入口: DefaultMessageStore.addScheduleTask -> DefaultMessageStore.this.cleanFilesPeriodically()

在這個方法中會啟動兩個線程,cleanCommitLogService用來刪除過期的CommitLog文件,cleanConsumeQueueService用來刪除過期的ConsumeQueue和IndexFile文件。

在刪除CommitLog文件時,Broker會啟動后臺線程,每60秒,檢查CommitLog、ConsumeQueue文件。然后對超過72小時的數據進行刪除。也就是說,默認情況下, RocketMQ只會保存3天內的數據。這個時間可以通過fileReservedTime來配置。

觸發過期文件刪除時,有兩個檢查的緯度,一個是,是否到了觸發刪除的時間,也就是broker.conf里配置的deleteWhen屬性。另外還會檢查磁盤利用率,達到閾值也會觸發過期文件刪除。這個閾值默認是72%,可以在broker.conf文件當中定制。但是最大值為95,最小值為10。

然后在刪除ConsumeQueue和IndexFile文件時,會去檢查CommitLog當前的最小Offset,然后在刪除時進行對齊。

需要注意的是,RocketMQ在刪除過期CommitLog文件時,并不檢查消息是否被消費過。 所以如果有消息長期沒有被消費,是有可能直接被刪除掉,造成消息丟失的。

RocketMQ整個文件管理的核心入口在DefaultMessageStore的start方法中,整體流程總結如下:

文件索引結構

了解了大部分的文件寫入機制之后,最后我們來理解一下RocketMQ的索引構建方式。

1、CommitLog文件的大小是固定的,但是其中存儲的每個消息單元長度是不固定的,具體格式可以參考org.apache.rocketmq.store.CommitLog中計算消息長度的方法

protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;final int msgLen = 4 //TOTALSIZE+ 4 //MAGICCODE+ 4 //BODYCRC+ 4 //QUEUEID+ 4 //FLAG+ 8 //QUEUEOFFSET+ 8 //PHYSICALOFFSET+ 4 //SYSFLAG+ 8 //BORNTIMESTAMP+ bornhostLength //BORNHOST+ 8 //STORETIMESTAMP+ storehostAddressLength //STOREHOSTADDRESS+ 4 //RECONSUMETIMES+ 8 //Prepared Transaction Offset+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY+ 1 + topicLength //TOPIC+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength+ 0;return msgLen;
}

因為消息的記錄大小不固定,所以RocketMQ在每次存CommitLog文件時,都會去檢查當前CommitLog文件空間是否足夠,如果不夠的話,就重新創建一個CommitLog文件。文件名為當前消息的偏移量。

2、ConsumeQueue文件主要是加速消費者的消息索引。他的每個文件夾對應RocketMQ中的一個MessageQueue,文件夾下的文件記錄了每個MessageQueue中的消息在CommitLog文件當中的偏移量。這樣,消費者通過ComsumeQueue文件,就可以快速找到CommitLog文件中感興趣的消息記錄。而消費者在ConsumeQueue文件當中的消費進度,會保存在config/consumerOffset.json文件當中。

文件結構: 每個ConsumeQueue文件固定由30萬個固定大小20byte的數據塊組成,數據塊的內容包括:msgPhyOffset(8byte,消息在文件中的起始位置)+msgSize(4byte,消息在文件中占用的長度)+msgTagCode(8byte,消息的tag的Hash值)。

msgTag是和消息索引放在一起的,所以消費者根據Tag過濾消息的性能是非常高的。

在ConsumeQueue.java當中有一個常量CQ_STORE_UNIT_SIZE=20,這個常量就表示一個數據塊的大小。

private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) {
?if (offset + size <= this.maxPhysicOffset) {log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);return true;}
?this.byteBufferIndex.flip();//在ConsumeQueue.java當中構建一條ConsumeQueue索引的方法中,記錄一個單元塊的數據this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);this.byteBufferIndex.putLong(offset);this.byteBufferIndex.putInt(size);this.byteBufferIndex.putLong(tagsCode);
?final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;... ...
}

3、IndexFile文件主要是輔助消息檢索。他的作用主要是用來支持根據key和timestamp檢索消息。他的文件名比較特殊,不是以消息偏移量命名,而是用的時間命名。但是其實,他也是一個固定大小的文件。

文件結構: 他的文件結構由 indexHeader(固定40byte)+ slot(固定500W個,每個固定20byte) + index(最多500W*4個,每個固定20byte) 三個部分組成。

indexFile的詳細結構有大廠之前面試過,參考博文: RocketMQ之底層IndexFile存儲協議_rocketmq index_roykingw的博客-CSDN博客

延遲消息機制

關注重點

延遲消息是RocketMQ非常有特色的一個功能,其他MQ產品中,往往需要開發者使用一些特殊方法來變相實現延遲消息功能。而RocketMQ直接在產品中實現了這個功能,開發者只需要設定一個屬性就可以快速實現。

延遲消息的核心使用方法就是在Message中設定一個MessageDelayLevel參數,對應18個延遲級別。然后Broker中會創建一個默認的Schedule_Topic主題,這個主題下有18個隊列,對應18個延遲級別。消息發過來之后,會先把消息存入Schedule_Topic主題中對應的隊列。然后等延遲時間到了,再轉發到目標隊列,推送給消費者進行消費。

源碼重點

延遲消息的處理入口在scheduleMessageService這個組件中。 會在broker啟動時也一起加載。

1、消息寫入到系統內置的Topic中

代碼見CommitLog.putMessage方法。

在CommitLog寫入消息時,會判斷消息的延遲級別,然后修改Message的Topic和Queue,將消息轉儲到系統內部的Topic中,這樣消息就對消費者不可見了。而原始的目標信息,會作為消息的屬性,保存到消息當中。

//should be consistent with the old version
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery// 延遲消息轉到系統Topicif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}
?
?String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
?// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));// 修改消息的Topic和Queue,轉儲到系統的Topic中msg.setTopic(topic);msg.setQueueId(queueId);}
}

2、消息轉儲到目標Topic

接下來就是需要過一點時間,再將消息轉回到Producer提交的Topic和Queue中,這樣就可以正常往消費者推送了。

這個轉儲的核心服務是scheduleMessageService,他也是Broker啟動過程中的一個功能組件。隨DefaultMessageStore組件一起構建。這個服務只在master節點上啟動,而在slave節點上會主動關閉這個服務。

//org.apache.rocketmq.store.DefaultMessageStore
@Override
public void handleScheduleMessageService(final BrokerRole brokerRole) {if (this.scheduleMessageService != null) {if (brokerRole == BrokerRole.SLAVE) {this.scheduleMessageService.shutdown();} else {this.scheduleMessageService.start();}}
}

由于RocketMQ的主從節點支持切換,所以就需要考慮這個服務的冪等性。在節點切換為slave時就要關閉服務,切換為master時就要啟動服務。并且,即便節點多次切換為master,服務也只啟動一次。所以在ScheduleMessageService的start方法中,就通過一個CAS操作來保證服務的啟動狀態。

if (started.compareAndSet(false, true)) {

這個CAS操作還保證了在后面,同一時間只有一個DeliverDelayedMessageTimerTask執行。這種方式,給整個延遲消息服務提供了一個基礎保證。

ScheduleMessageService會每隔1秒鐘執行一個executeOnTimeup任務,將消息從延遲隊列中寫入正常Topic中。 代碼見ScheduleMessageService中的DeliverDelayedMessageTimerTask.executeOnTimeup方法。

在executeOnTimeup方法中,就會去掃描SCHEDULE_TOPIC_XXXX這個Topic下的所有messageQueue,然后掃描這些MessageQueue對應的ConsumeQueue文件,找到沒有處理過的消息,計算他們的延遲時間。如果延遲時間沒有到,就等下一秒再重新掃描。如果延遲時間到了,就進行消息轉儲。將消息轉回到原來的目標Topic下。

整個延遲消息的實現方式:

ScheduleMessageService中掃描延遲消息的主要邏輯:

//ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup
public void executeOnTimeup() {//找到延遲隊列對應的ConsumeQueue文件ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));
?if (cq == null) {this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);return;}//通過計算,找到這一次掃描需要處理的的ConsumeQueue文件SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);... ...long nextOffset = this.offset;try {int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();//循環過濾ConsumeQueue文件當中的每一條消息索引for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {//解析每一條ConsumeQueue記錄long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();
?... ...//計算延遲時間long now = System.currentTimeMillis();long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);//延遲時間沒到就等下一次掃描long countdown = deliverTimestamp - now;if (countdown > 0) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}
?... ...//時間到了就進行轉儲boolean deliverSuc;if (ScheduleMessageService.this.enableAsyncDeliver) {deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);} else {deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);}
?if (!deliverSuc) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}}//計算下一次掃描時的Offset起點nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);} catch (Exception e) {log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);} finally {bufferCQ.release();}//部署下一次掃描任務this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}

如果清楚了ConsumeQueue文件的結構,就可以很清晰的感受到RocketMQ其實就是在Broker端,像一個普通消費者一樣去進行消費,然后擴展出了延遲消息的整個擴展功能。而這,其實也是很多互聯網大廠對RocketMQ進行自定義功能擴展的很好的參考。

長輪詢機制

功能回顧

RocketMQ對消息消費者提供了Push推模式和Pull拉模式兩種消費模式。但是這兩種消費模式的本質其實都是Pull拉模式,Push模式可以認為是一種定時的Pull機制。但是這時有一個問題,當使用Push模式時,如果RocketMQ中沒有對應的數據,那難道一直進行空輪詢嗎?如果是這樣的話,那顯然會極大的浪費網絡帶寬以及服務器的性能,并且,當有新的消息進來時,RocketMQ也沒有辦法盡快通知客戶端,而只能等客戶端下一次來拉取消息了。針對這個問題,RocketMQ實現了一種長輪詢機制 long polling。

長輪詢機制簡單來說,就是當Broker接收到Consumer的Pull請求時,判斷如果沒有對應的消息,不用直接給Consumer響應(給響應也是個空的,沒意義),而是就將這個Pull請求給緩存起來。當Producer發送消息過來時,增加一個步驟去檢查是否有對應的已緩存的Pull請求,如果有,就及時將請求從緩存中拉取出來,并將消息通知給Consumer。

源碼重點

Consumer請求緩存,代碼入口PullMessageProcessor#processRequest方法

PullRequestHoldService服務會隨著BrokerController一起啟動。

生產者線:從DefaultMessageStore.doReput進入

整個流程以及源碼重點:

關于零拷貝與順序寫

刷盤機制保證消息不丟失

在操作系統層面,當應用程序寫入一個文件時,文件內容并不會直接寫入到硬件當中,而是會先寫入到操作系統中的一個緩存PageCache中。PageCache緩存以4K大小為單位,緩存文件的具體內容。這些寫入到PageCache中的文件,在應用程序看來,是已經完全落盤保存好了的,可以正常修改、復制等等。但是,本質上,PageCache依然是內存狀態,所以一斷電就會丟失。因此,需要將內存狀態的數據寫入到磁盤當中,這樣數據才能真正完成持久化,斷電也不會丟失。這個過程就稱為刷盤。

Java當中使用FileOutputStream類或者BufferedWriter類,進行write操作,就是寫入的Pagecache。

RocketMQ中通過fileChannel.commit方法寫入消息,也是寫入到Pagecache。

PageCache是源源不斷產生的,而Linux操作系統顯然不可能時時刻刻往硬盤寫文件。所以,操作系統只會在某些特定的時刻將PageCache寫入到磁盤。例如當我們正常關機時,就會完成PageCache刷盤。另外,在Linux中,對于有數據修改的PageCache,會標記為Dirty(臟頁)狀態。當Dirty Page的比例達到一定的閾值時,就會觸發一次刷盤操作。例如在Linux操作系統中,可以通過/proc/meminfo文件查看到Page Cache的狀態。

[root@192-168-65-174 ~]# cat /proc/meminfo 
MemTotal: ? ? ? 16266172 kB
.....
Cached: ? ? ? ? ? 923724 kB
.....
Dirty: ? ? ? ? ? ? ? ?32 kB
Writeback: ? ? ? ? ? ? 0 kB
.....
Mapped: ? ? ? ? ? 133032 kB
.....

但是,只要操作系統的刷盤操作不是時時刻刻執行的,那么對于用戶態的應用程序來說,那就避免不了非正常宕機時的數據丟失問題。因此,操作系統也提供了一個系統調用,應用程序可以自行調用這個系統調用,完成PageCache的強制刷盤。在Linux中是fsync,同樣我們可以用man 2 fsync 指令查看。

RocketMQ對于何時進行刷盤,也設計了兩種刷盤機制,同步刷盤和異步刷盤。只需要在broker.conf中進行配置就行。

零拷貝加速文件讀寫

零拷貝(zero-copy)是操作系統層面提供的一種加速文件讀寫的操作機制,非常多的開源軟件都在大量使用零拷貝,來提升IO操作的性能。對于Java應用層,對應著mmap和sendFile兩種方式。

理解CPU拷貝和DMA拷貝

操作系統對于內存空間,是分為用戶態和內核態的。用戶態的應用程序無法直接操作硬件,需要通過內核空間進行操作轉換,才能真正操作硬件。這其實是為了保護操作系統的安全。正因為如此,應用程序需要與網卡、磁盤等硬件進行數據交互時,就需要在用戶態和內核態之間來回的復制數據。而這些操作,原本都是需要由CPU來進行任務的分配、調度等管理步驟的,早先這些IO接口都是由CPU獨立負責,所以當發生大規模的數據讀寫操作時,CPU的占用率會非常高。

之后,操作系統為了避免CPU完全被各種IO調用給占用,引入了DMA(直接存儲器存儲)。由DMA來負責這些頻繁的IO操作。DMA是一套獨立的指令集,不會占用CPU的計算資源。這樣,CPU就不需要參與具體的數據復制的工作,只需要管理DMA的權限即可。

DMA拷貝極大的釋放了CPU的性能,因此他的拷貝速度會比CPU拷貝要快很多。但是,其實DMA拷貝本身,也在不斷優化。

引入DMA拷貝之后,在讀寫請求的過程中,CPU不再需要參與具體的工作,DMA可以獨立完成數據在系統內部的復制。但是,數據復制過程中,依然需要借助數據總進線。當系統內的IO操作過多時,還是會占用過多的數據總線,造成總線沖突,最終還是會影響數據讀寫性能。

為了避免DMA總線沖突對性能的影響,后來又引入了Channel通道的方式。Channel,是一個完全獨立的處理器,專門負責IO操作。既然是處理器,Channel就有自己的IO指令,與CPU無關,他也更適合大型的IO操作,性能更高。

這也解釋了,為什么Java應用層與零拷貝相關的操作都是通過Channel的子類實現的。這其實是借鑒了操作系統中的概念。

而所謂的零拷貝技術,其實并不是不拷貝,而是要盡量減少CPU拷貝。

再來理解下mmap文件映射機制是怎么回事

mmap機制的具體實現參見配套示例代碼。主要是通過java.nio.channels.FileChannel的map方法完成映射。

以一次文件的讀寫操作為例,應用程序對磁盤文件的讀與寫,都需要經過內核態與用戶態之間的狀態切換,每次狀態切換的過程中,就需要有大量的數據復制。

在這個過程中,總共需要進行四次數據拷貝。而磁盤與內核態之間的數據拷貝,在操作系統層面已經由CPU拷貝優化成了DMA拷貝。而內核態與用戶態之間的拷貝依然是CPU拷貝。所以,在這個場景下,零拷貝技術優化的重點,就是內核態與用戶態之間的這兩次拷貝。

而mmap文件映射的方式,就是在用戶態不再保存文件的內容,而只保存文件的映射,包括文件的內存起始地址,文件大小等。真實的數據,也不需要在用戶態留存,可以直接通過操作映射,在內核態完成數據復制。

這個拷貝過程都是在操作系統的系統調用層面完成的,在Java應用層,其實是無法直接觀測到的,但是我們可以去JDK源碼當中進行間接驗證。在JDK的NIO包中,java.nio.HeapByteBuffer映射的就是JVM的一塊堆內內存,在HeapByteBuffer中,會由一個byte數組來緩存數據內容,所有的讀寫操作也是先操作這個byte數組。這其實就是沒有使用零拷貝的普通文件讀寫機制。

HeapByteBuffer(int cap, int lim) { ? ? ? ? ? ?// package-privatesuper(-1, 0, lim, cap, new byte[cap], 0);/*hb = new byte[cap];offset = 0;*/
}

而NIO把包中的另一個實現類java.nio.DirectByteBuffer則映射的是一塊堆外內存。在DirectByteBuffer中,并沒有一個數據結構來保存數據內容,只保存了一個內存地址。所有對數據的讀寫操作,都通過unsafe魔法類直接交由內核完成,這其實就是mmap的讀寫機制。

最后,這種mmap的映射機制由于還是需要用戶態保存文件的映射信息,數據復制的過程也需要用戶態的參與,這其中的變數還是非常多的。所以,mmap機制適合操作小文件,如果文件太大,映射信息也會過大,容易造成很多問題。通常mmap機制建議的映射文件大小不要超過2G 。而RocketMQ的CommitLog文件保持在1G固定大小,也是為了方便文件映射。

梳理下sendFile機制是怎么運行的

sendFile機制的具體實現參見配套示例代碼。主要是通過java.nio.channels.FileChannel的transferTo方法完成

sourceReadChannel.transferTo(0,sourceFile.length(),targetWriteChannel);

早期的sendfile實現機制其實還是依靠CPU進行頁緩存與socket緩存區之間的數據拷貝。但是,在后期的不斷改進過程中,sendfile優化了實現機制,在拷貝過程中,并不直接拷貝文件的內容,而是只拷貝一個帶有文件位置和長度等信息的文件描述符FD,這樣就大大減少了需要傳遞的數據。而真實的數據內容,會交由DMA控制器,從頁緩存中打包異步發送到socket中。

最后,sendfile機制在內核態直接完成了數據的復制,不需要用戶態的參與,所以這種機制的傳輸效率是非常穩定的。sendfile機制非常適合大數據的復制轉移。

順序寫加速文件寫入磁盤

通常應用程序往磁盤寫文件時,由于磁盤空間不是連續的,會有很多碎片。所以我們去寫一個文件時,也就無法把一個文件寫在一塊連續的磁盤空間中,而需要在磁盤多個扇區之間進行大量的隨機寫。這個過程中有大量的尋址操作,會嚴重影響寫數據的性能。而順序寫機制是在磁盤中提前申請一塊連續的磁盤空間,每次寫數據時,就可以避免這些尋址操作,直接在之前寫入的地址后面接著寫就行。

Kafka官方詳細分析過順序寫的性能提升問題。Kafka官方曾說明,順序寫的性能基本能夠達到內存級別。而如果配備固態硬盤,順序寫的性能甚至有可能超過寫內存。而RocketMQ很大程度上借鑒了Kafka的這種思想。

例如可以看下org.apache.rocketmq.store.CommitLog#DefaultAppendMessageCallback中的doAppend方法。在這個方法中,會以追加的方式將消息先寫入到一個堆外內存byteBuffer中,然后再通過fileChannel寫入到磁盤。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/211304.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/211304.shtml
英文地址,請注明出處:http://en.pswp.cn/news/211304.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

華為機試真題 C++ 實現【字符串重新排列】

題目 給定一個字符串s&#xff0c;s包括以空格分隔的若干個單詞&#xff0c;請對s進行如下處理后輸出&#xff1a; 1、單詞內部調整&#xff1a;對每個單詞字母重新按字典序排序 2、單詞間順序調整&#xff1a; 1&#xff09;統計每個單詞出現的次數&#xff0c;并按次數降序…

蒙特霍爾問題(選擇三扇門后的車與羊)及其貝葉斯定理數學解釋

1. 蒙特霍爾問題 有一個美國電視游戲節目叫做“Let’s Make a Deal”&#xff0c;游戲中參賽者將面對3扇關閉的門&#xff0c;其中一扇門背后有一輛汽車&#xff0c;另外兩扇門后是山羊&#xff0c;參賽者如果能猜中哪一扇門后是汽車&#xff0c;就可以得到它。 通常&#xf…

筆記68:Pytorch中repeat函數的用法

repeat 相當于一個broadcasting的機制 repeat(*sizes) 沿著指定的維度重復tensor。不同與expand()&#xff0c;本函數復制的是tensor中的數據。 import torch import torch.nn.functional as F import numpy as np a torch.Tensor(128,1,512) B a.repeat(1,5,1) print(B.s…

OpenGL 著色器程序的保存和加載(二進制)

背景 為了提高OpenGL 著色器程序的編譯和鏈接速度&#xff0c;我們可以將程序保存為二進制進行加載&#xff0c;可以大幅度提升加載效率。 方法 以下是加載和保存二進制程序的方法。 // 加載著色器程序的二進制文件到已創建的著色器程序中 bool loadPragram(const std::str…

javaee實驗:文件上傳及攔截器的使用

目錄 文件上傳ModelAttribute注解實驗目的實驗內容實驗過程項目結構編寫代碼結果展示 文件上傳 Spring MVC 提供 MultipartFile 接口作為參數來處理文件上傳。 MultipartFile 提供以下方法來獲取上傳的文件信息&#xff1a; ? getOriginalFilename 獲取上傳的文件名字&#x…

華為OD機試真題-測試用例執行計劃-2023年OD統一考試(C卷)

題目描述: 某個產品當前迭代周期內有N個特性( F1,F2,.......FN)需要進行覆蓋測試,每個特性都被評估了對應的優先級,特性使用其ID作為下標進行標識。 設計了M個測試用例(T1,T2......,TM ),每個用例對應了一個覆蓋特性的集合,測試用例使用其ID作為下標進行標識,測試用例…

特權FPGA學習筆記

C/C/system C-----vivado HLS------------->RTL門電路&#xff0c;省去了HDL語言的中間轉換&#xff0c;可以看作是C向C#的演進&#xff0c;基于zynq面向以前使用C的開發人員&#xff0c;但是個人覺得&#xff0c;HDL存在且未被C取代&#xff0c;工具的著眼點就是面向底層調…

Spring Cloud 與微服務學習總結(19)—— Spring Cloud Alibaba 之 Nacos 2.3.0 史上最大更新版本發布

Nacos 一個用于構建云原生應用的動態服務發現、配置管理和服務管理平臺,由阿里巴巴開源,致力于發現、配置和管理微服務。說白了,Nacos 就是充當微服務中的的注冊中心和配置中心。 Nacos 2.3.0 新特性 1. 反脆弱插件 Nacos 2.2.0 版本開始加入反脆弱插件,從 2.3.0 版本開…

飛天使-linux操作的一些技巧與知識點2

TCP 的三次握手 第一次&#xff0c;客戶端與服務端建立鏈接&#xff0c;需要發送請求連接的消息 第二次&#xff0c;服務端接口到數據后&#xff0c;返回一個確認的操作*&#xff08;至此客戶端和服務端鏈路建立成功&#xff09; 第三次&#xff0c;服務端還需要發送要與客戶端…

【Linux】探索Linux進程狀態 | 僵尸進程 | 孤兒進程

最近&#xff0c;我發現了一個超級強大的人工智能學習網站。它以通俗易懂的方式呈現復雜的概念&#xff0c;而且內容風趣幽默。我覺得它對大家可能會有所幫助&#xff0c;所以我在此分享。點擊這里跳轉到網站。 目錄 一、進程狀態1.1運行狀態1.2阻塞狀態1.3掛起狀態 二、具體L…

React中使用react-json-view展示JSON數據

文章目錄 一、前言1.1、在線demo1.2、Github倉庫 二、實踐2.1、安裝react-json-view2.2、組件封裝2.3、效果2.4、參數詳解2.4.1、src(必須) &#xff1a;JSON Object2.4.2、name&#xff1a;string或false2.4.3、theme&#xff1a;string2.4.4、style&#xff1a;object2.4.5、…

[ROS2] --- service

1 service介紹 1.1 service概念 話題通信是基于訂閱/發布機制的&#xff0c;無論有沒有訂閱者&#xff0c;發布者都會周期發布數據&#xff0c;這種模式適合持續數據的收發&#xff0c;比如傳感器數據。機器人系統中還有另外一些配置性質的數據&#xff0c;并不需要周期處理&…

C#,圖算法——以鄰接節點表示的圖最短路徑的迪杰斯特拉(Dijkstra)算法C#程序

1 文本格式 using System; using System.Text; using System.Linq; using System.Collections; using System.Collections.Generic; namespace Legalsoft.Truffer.Algorithm { public class Node // : IComparable<Node> { private int vertex, weigh…

第7章-使用統計方法進行變量有效性測試-7.5.4-模型評估

目錄 混淆矩陣 準確率 定義 局限性 精準率 定義 局限性

【分布式微服務專題】從單體到分布式(一、SpringCloud項目初步升級)

目錄 前言閱讀對象閱讀導航前置知識筆記正文一、單體服務介紹二、服務拆分三、分布式微服務升級前的思考3.1 關于SpringBoot/SpringCloud的思考【有點門檻】 四、SpringCloud升級整合4.1 新建父子項目 學習總結感謝 前言 從本節課開始&#xff0c;我將自己手寫一個基于SpringC…

如何輕松恢復 Windows 中刪除的文件夾

我們都曾經歷過這樣的事&#xff0c;而且我們中的大多數人可能很快就會再次這樣做。我們討論的是在 Windows 中按“Delete”或“ShiftDelete”鍵意外刪除重要文件夾的情況。 如果您剛剛按下刪除鍵且未超過 30 天&#xff0c;或者尚未清空回收站&#xff0c;則可以恢復文件夾。…

操作系統學習筆記---內存管理

目錄 概念 功能 內存空間的分配和回收 地址轉換 邏輯地址&#xff08;相對地址&#xff09; 物理地址&#xff08;絕對地址&#xff09; 內存空間的擴充 內存共享 存儲保護 方式 源程序變為可執行程序步驟 鏈接方式 裝入方式 覆蓋 交換 連續分配管理方式 單一連…

python安裝與工具PyCharm

摘要&#xff1a; 周末閑來無事學習一下python&#xff01;不是你菜雞&#xff0c;只不過是對手太強了&#xff01;所以你要不斷努力&#xff0c;去追求更高的未來&#xff01;下面先了解python與環境的安裝與工具的配置&#xff01; python安裝&#xff1a; 官網 進入官網下載…

lua腳本串口收發與CRC16校驗及使用方法

lua腳本CRC16校驗 --calculate CRC16校驗 --data : t, data to be verified --n : number of verified --return : check result function add_crc16(start, n, data)local carry_flag, a 0local result 0xfffflocal i startwhile(true)doresult result ~ data[i]for j…

git 關于分支、merge、commit提交

最近開始用git終端提交代碼&#xff0c;梳理了一些知識點 一 關于分支 關于分支&#xff0c;git的分支分為本地分支遠程分支兩種分支&#xff0c;在上傳代碼時&#xff0c;我們要確保當前本地分支連接了一個遠程分支。 我們可以通過下面代碼查看當前的本地分支&#xff1a; g…