梳理一些比較完整,比較復雜的業務線
消息持久化設計
RocketMQ的持久化文件結構
消息持久化也就是將內存中的消息寫入到本地磁盤的過程。而磁盤IO操作通常是一個很耗性能,很慢的操作,所以,對消息持久化機制的設計,是一個MQ產品提升性能的關鍵,甚至可以說是最為重要的核心也不為過。接下來梳理RocketMQ是如何在本地磁盤中保存消息的
RocketMQ消息直接采用磁盤文件保存消息,默認路徑在${user_home}/store目錄。這些存儲目錄可以在broker.conf中自行指定。
存儲文件主要分為三個部分:
-
CommitLog:存儲消息的元數據。所有消息都會順序存入到CommitLog文件當中。CommitLog由多個文件組成,每個文件固定大小1G。以第一條消息的偏移量為文件名。
-
ConsumerQueue:存儲消息在CommitLog的索引。一個MessageQueue一個文件,記錄當前MessageQueue被哪些消費者組消費到了哪一條CommitLog。
-
IndexFile:為消息查詢提供了一種通過key或時間區間來查詢消息的方法,這種通過IndexFile來查找消息的方法不影響發送與消費消息的主流程
另外,還有幾個輔助的存儲文件,主要記錄一些描述消息的元數據:
-
checkpoint:數據存盤檢查點。里面主要記錄commitlog文件、ConsumeQueue文件以及IndexFile文件最后一次刷盤的時間戳。
-
config/*.json:這些文件是將RocketMQ的一些關鍵配置信息進行存盤保存。例如Topic配置、消費者組配置、消費者組消息偏移量Offset 等等一些信息。
-
abort:這個文件是RocketMQ用來判斷程序是否正常關閉的一個標識文件。正常情況下,會在啟動時創建,而關閉服務時刪除。但是如果遇到一些服務器宕機,或者kill -9這樣一些非正常關閉服務的情況,這個abort文件就不會刪除,因此RocketMQ就可以判斷上一次服務是非正常關閉的,后續就會做一些數據恢復的操作。
整體的消息存儲結構,官方做了個圖進行描述:
Producer發過來的所有消息,不管是屬于哪個Topic,Broker都統一存在CommitLog文件當中,然后分別構建ConsumeQueue文件和IndexFile兩個索引文件,用來輔助消費者進行消息檢索。這種設計最直接的好處是可以較少查找目標文件的時間,讓消息以最快的速度落盤。對比Kafka存文件時,需要尋找消息所屬的Partition文件,再完成寫入。當Topic比較多時,這樣的Partition尋址就會浪費非常多的時間。所以Kafka不太適合多Topic的場景。而RocketMQ的這種快速落盤的方式,在多Topic的場景下,優勢就比較明顯了。
在文件形式上:CommitLog文件的大小是固定的。文件名就是當前CommitLog文件當中存儲的第一條消息的Offset。
ConsumeQueue文件主要是加速消費者進行消息索引。每個文件夾對應RocketMQ中的一個MessageQueue,文件夾下的文件記錄了每個MessageQueue中的消息在CommitLog文件當中的偏移量。這樣,消費者通過ConsumeQueue文件,就可以快速找到CommitLog文件中感興趣的消息記錄。而消費者在ConsumeQueue文件中的消費進度,會保存在config/consumerOffset.json文件當中。
IndexFile文件主要是輔助消費者進行消息索引。消費者進行消息消費時,通過ConsumeQueue文件就足夠完成消息檢索了,但是如果消費者指定時間戳進行消費,或者要按照MessageId或者MessageKey來檢索文件,比如RocketMQ管理控制臺的消息軌跡功能,ConsumeQueue文件就不夠用了。IndexFile文件就是用來輔助這類消息檢索的。他的文件名比較特殊,不是以消息偏移量命名,而是用的時間命名。但是其實,他也是一個固定大小的文件。
這是對RocketMQ存盤文件最基礎的了解,但是只有這樣的設計,是不足以支撐RocketMQ的三高性能的。RocketMQ如何保證ConsumeQueue、IndexFile兩個索引文件與CommitLog中的消息對齊?如何保證消息斷電不丟失?如何保證文件高效的寫入磁盤?等等。如果你想要去抓住RocketMQ這些三高問題的核心設計,那么還是需要到源碼當中去深究。
commitLog寫入
消息存儲的入口在: DefaultMessageStore.asyncPutMessage方法
CommitLog的asyncPutMessage方法中會給寫入線程加鎖,保證一次只會允許一個線程寫入。寫入消息的過程是串行的,一次只會允許一個線程寫入。
最終進入CommitLog中的DefaultAppendMessageCallback#doAppend方法,這里就是Broker寫入消息的實際入口。這個方法最終會把消息追加到MappedFile映射的一塊內存里,并沒有直接寫入磁盤。而是在隨后調用ComitLog#submitFlushRequest方法,提交刷盤申請。刷盤完成之后,內存中的文件才真正寫入到磁盤當中。
在提交刷盤申請之后,就會立即調用CommitLog#submitReplicaRequest方法,發起主從同步申請。
文件同步刷盤與異步刷盤
入口:CommitLog.submitFlushRequest
這里涉及到了對于同步刷盤與異步刷盤的不同處理機制。這里有很多極致提高性能的設計,對于我們理解和設計高并發應用場景有非常大的借鑒意義。
同步刷盤和異步刷盤是通過不同的FlushCommitLogService的子服務實現的。
//org.apache.rocketmq.store.CommitLog的構造方法
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {this.flushCommitLogService = new GroupCommitService();
} else {this.flushCommitLogService = new FlushRealTimeService();
}
?
this.commitLogService = new CommitRealTimeService();
同步刷盤采用的是GroupCommitService子線程。雖然是叫做同步刷盤,但是從源碼中能看到,他實際上并不是來一條消息就刷一次盤。而是這個子線程每10毫秒執行一次doCommit方法,掃描文件的緩存。只要緩存當中有消息,就執行一次Flush操作。
而異步刷盤采用的是FlushRealTimeService子線程。這個子線程最終也是執行Flush操作,只不過他的執行時機會根據配置進行靈活調整。所以可以看到,這里異步刷盤和同步刷盤的最本質區別,實際上是進行Flush操作的頻率不同。
我們經常說使用RocketMQ的同步刷盤,可以保證Broker斷電時,消息不會丟失。但是可以看到,RocketMQ并不可能真正來一條消息就進行一次刷盤,這樣在海量數據下,操作系統是承受不了的。而只要不是來一次消息刷一次盤,那么在Broker直接斷電的情況接下,就總是會有內存中的消息沒有刷入磁盤的情況,這就會造成消息丟失。所以,對于消息安全性的設計,其實是重在取舍,無法做到絕對。
同步刷盤和異步刷盤最終落地到FileChannel的force方法。這個force方法就會最終調用一次操作系統的fsync系統調用,完成文件寫入。
//org.apache.rocketmq.store.MappedFile#flush
public int flush(final int flushLeastPages) {if (this.isAbleToFlush(flushLeastPages)) {if (this.hold()) {int value = getReadPosition();
?try {//We only append data to fileChannel or mappedByteBuffer, never both.if (writeBuffer != null || this.fileChannel.position() != 0) {this.fileChannel.force(false);} else {this.mappedByteBuffer.force();}} catch (Throwable e) {log.error("Error occurred when force data to disk.", e);}
?this.flushedPosition.set(value);this.release();} else {log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());this.flushedPosition.set(getReadPosition());}}return this.getFlushedPosition();
}
另外一個CommitRealTimeService這個子線程則是用來寫入堆外內存的。應用可以通過配置TransientStorePoolEnable參數開啟堆外內存,如果開啟了堆外內存,會在啟動時申請一個跟CommitLog文件大小一致的堆外內存,這部分內存就可以確保不會被交換到虛擬內存中。而CommitRealTimeService處理消息的方式則只是調用mappedFileQueue的commit方法。這個方法只是往操作系統的PagedCache里寫入消息,并不主動進行刷盤操作。會由操作系統通過Dirty Page機制,在某一個時刻進行統一刷盤。例如我們在正常關閉操作系統時,經常會等待很長時間。這里面大部分的時間其實就是在做PageCache的刷盤。
//org.apache.rocketmq.store.MappedFileQueue
public boolean commit(final int commitLeastPages) {boolean result = true;MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);if (mappedFile != null) {int offset = mappedFile.commit(commitLeastPages);long where = mappedFile.getFileFromOffset() + offset;result = where == this.committedWhere;this.committedWhere = where;}
?return result;
}
在梳理同步刷盤與異步刷盤的具體實現時,可以看到一個小點,RocketMQ是如何讓兩個刷盤服務間隔執行的?RocketMQ提供了一個自己實現的CountDownLatch2工具類來提供線程阻塞功能,使用CAS驅動CountDownLatch2的countDown操作。每來一個消息就啟動一次CAS,成功后,調用一次countDown。而這個CountDonwLatch2在Java.util.concurrent.CountDownLatch的基礎上,實現了reset功能,這樣可以進行對象重用。
CommigLog主從復制
入口:CommitLog.submitReplicaRequest
主從同步時,也體現到了RocketMQ對于性能的極致追求。最為明顯的,RocketMQ整體是基于Netty實現的網絡請求,而在主從復制這一塊,卻放棄了Netty框架,轉而使用更輕量級的Java的NIO來構建。
在主要的HAService中,會在啟動過程中啟動三個守護進程。
//HAService#start
public void start() throws Exception {this.acceptSocketService.beginAccept();this.acceptSocketService.start();this.groupTransferService.start();this.haClient.start();
}
這其中與Master相關的是acceptSocketService和groupTransferService。其中acceptSocketService主要負責維護Master與Slave之間的TCP連接。groupTransferService主要與主從同步復制有關。而slave相關的則是haClient。
至于其中關于主從的同步復制與異步復制的實現流程,還是比較復雜的,有興趣的同學可以深入去研究一下。
推薦一篇可供參考的博客 RocketMQ源碼分析之主從數據復制-CSDN博客
分發ConsumeQueue和IndexFile
當CommitLog寫入一條消息后,在DefaultMessageStore的start方法中,會啟動一個后臺線程reputMessageService。源碼就定義在DefaultMessageStore中。這個后臺線程每隔1毫秒就會去拉取CommitLog中最新更新的一批消息。如果發現CommitLog中有新的消息寫入,就會觸發一次doDispatch。
//org.apache.rocketmq.store.DefaultMessageStore中的ReputMessageService線程類
public void doDispatch(DispatchRequest req) {for (CommitLogDispatcher dispatcher : this.dispatcherList) {dispatcher.dispatch(req);}
}
dispatchList中包含兩個關鍵的實現類CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex。源碼就定義在DefaultMessageStore中。他們分別用來構建ConsumeQueue索引和IndexFile索引。
并且,如果服務異常宕機,會造成CommitLog和ConsumeQueue、IndexFile文件不一致,有消息寫入CommitLog后,沒有分發到索引文件,這樣消息就丟失了。DefaultMappedStore的load方法提供了恢復索引文件的方法,入口在load方法。
過期文件刪除機制
入口: DefaultMessageStore.addScheduleTask -> DefaultMessageStore.this.cleanFilesPeriodically()
在這個方法中會啟動兩個線程,cleanCommitLogService用來刪除過期的CommitLog文件,cleanConsumeQueueService用來刪除過期的ConsumeQueue和IndexFile文件。
在刪除CommitLog文件時,Broker會啟動后臺線程,每60秒,檢查CommitLog、ConsumeQueue文件。然后對超過72小時的數據進行刪除。也就是說,默認情況下, RocketMQ只會保存3天內的數據。這個時間可以通過fileReservedTime來配置。
觸發過期文件刪除時,有兩個檢查的緯度,一個是,是否到了觸發刪除的時間,也就是broker.conf里配置的deleteWhen屬性。另外還會檢查磁盤利用率,達到閾值也會觸發過期文件刪除。這個閾值默認是72%,可以在broker.conf文件當中定制。但是最大值為95,最小值為10。
然后在刪除ConsumeQueue和IndexFile文件時,會去檢查CommitLog當前的最小Offset,然后在刪除時進行對齊。
需要注意的是,RocketMQ在刪除過期CommitLog文件時,并不檢查消息是否被消費過。 所以如果有消息長期沒有被消費,是有可能直接被刪除掉,造成消息丟失的。
RocketMQ整個文件管理的核心入口在DefaultMessageStore的start方法中,整體流程總結如下:
文件索引結構
了解了大部分的文件寫入機制之后,最后我們來理解一下RocketMQ的索引構建方式。
1、CommitLog文件的大小是固定的,但是其中存儲的每個消息單元長度是不固定的,具體格式可以參考org.apache.rocketmq.store.CommitLog中計算消息長度的方法
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;final int msgLen = 4 //TOTALSIZE+ 4 //MAGICCODE+ 4 //BODYCRC+ 4 //QUEUEID+ 4 //FLAG+ 8 //QUEUEOFFSET+ 8 //PHYSICALOFFSET+ 4 //SYSFLAG+ 8 //BORNTIMESTAMP+ bornhostLength //BORNHOST+ 8 //STORETIMESTAMP+ storehostAddressLength //STOREHOSTADDRESS+ 4 //RECONSUMETIMES+ 8 //Prepared Transaction Offset+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY+ 1 + topicLength //TOPIC+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength+ 0;return msgLen;
}
因為消息的記錄大小不固定,所以RocketMQ在每次存CommitLog文件時,都會去檢查當前CommitLog文件空間是否足夠,如果不夠的話,就重新創建一個CommitLog文件。文件名為當前消息的偏移量。
2、ConsumeQueue文件主要是加速消費者的消息索引。他的每個文件夾對應RocketMQ中的一個MessageQueue,文件夾下的文件記錄了每個MessageQueue中的消息在CommitLog文件當中的偏移量。這樣,消費者通過ComsumeQueue文件,就可以快速找到CommitLog文件中感興趣的消息記錄。而消費者在ConsumeQueue文件當中的消費進度,會保存在config/consumerOffset.json文件當中。
文件結構: 每個ConsumeQueue文件固定由30萬個固定大小20byte的數據塊組成,數據塊的內容包括:msgPhyOffset(8byte,消息在文件中的起始位置)+msgSize(4byte,消息在文件中占用的長度)+msgTagCode(8byte,消息的tag的Hash值)。
msgTag是和消息索引放在一起的,所以消費者根據Tag過濾消息的性能是非常高的。
在ConsumeQueue.java當中有一個常量CQ_STORE_UNIT_SIZE=20,這個常量就表示一個數據塊的大小。
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) {
?if (offset + size <= this.maxPhysicOffset) {log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);return true;}
?this.byteBufferIndex.flip();//在ConsumeQueue.java當中構建一條ConsumeQueue索引的方法中,記錄一個單元塊的數據this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);this.byteBufferIndex.putLong(offset);this.byteBufferIndex.putInt(size);this.byteBufferIndex.putLong(tagsCode);
?final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;... ...
}
3、IndexFile文件主要是輔助消息檢索。他的作用主要是用來支持根據key和timestamp檢索消息。他的文件名比較特殊,不是以消息偏移量命名,而是用的時間命名。但是其實,他也是一個固定大小的文件。
文件結構: 他的文件結構由 indexHeader(固定40byte)+ slot(固定500W個,每個固定20byte) + index(最多500W*4個,每個固定20byte) 三個部分組成。
indexFile的詳細結構有大廠之前面試過,參考博文: RocketMQ之底層IndexFile存儲協議_rocketmq index_roykingw的博客-CSDN博客
延遲消息機制
關注重點
延遲消息是RocketMQ非常有特色的一個功能,其他MQ產品中,往往需要開發者使用一些特殊方法來變相實現延遲消息功能。而RocketMQ直接在產品中實現了這個功能,開發者只需要設定一個屬性就可以快速實現。
延遲消息的核心使用方法就是在Message中設定一個MessageDelayLevel參數,對應18個延遲級別。然后Broker中會創建一個默認的Schedule_Topic主題,這個主題下有18個隊列,對應18個延遲級別。消息發過來之后,會先把消息存入Schedule_Topic主題中對應的隊列。然后等延遲時間到了,再轉發到目標隊列,推送給消費者進行消費。
源碼重點
延遲消息的處理入口在scheduleMessageService這個組件中。 會在broker啟動時也一起加載。
1、消息寫入到系統內置的Topic中
代碼見CommitLog.putMessage方法。
在CommitLog寫入消息時,會判斷消息的延遲級別,然后修改Message的Topic和Queue,將消息轉儲到系統內部的Topic中,這樣消息就對消費者不可見了。而原始的目標信息,會作為消息的屬性,保存到消息當中。
//should be consistent with the old version
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery// 延遲消息轉到系統Topicif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}
?
?String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
?// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));// 修改消息的Topic和Queue,轉儲到系統的Topic中msg.setTopic(topic);msg.setQueueId(queueId);}
}
2、消息轉儲到目標Topic
接下來就是需要過一點時間,再將消息轉回到Producer提交的Topic和Queue中,這樣就可以正常往消費者推送了。
這個轉儲的核心服務是scheduleMessageService,他也是Broker啟動過程中的一個功能組件。隨DefaultMessageStore組件一起構建。這個服務只在master節點上啟動,而在slave節點上會主動關閉這個服務。
//org.apache.rocketmq.store.DefaultMessageStore
@Override
public void handleScheduleMessageService(final BrokerRole brokerRole) {if (this.scheduleMessageService != null) {if (brokerRole == BrokerRole.SLAVE) {this.scheduleMessageService.shutdown();} else {this.scheduleMessageService.start();}}
}
由于RocketMQ的主從節點支持切換,所以就需要考慮這個服務的冪等性。在節點切換為slave時就要關閉服務,切換為master時就要啟動服務。并且,即便節點多次切換為master,服務也只啟動一次。所以在ScheduleMessageService的start方法中,就通過一個CAS操作來保證服務的啟動狀態。
if (started.compareAndSet(false, true)) {
這個CAS操作還保證了在后面,同一時間只有一個DeliverDelayedMessageTimerTask執行。這種方式,給整個延遲消息服務提供了一個基礎保證。
ScheduleMessageService會每隔1秒鐘執行一個executeOnTimeup任務,將消息從延遲隊列中寫入正常Topic中。 代碼見ScheduleMessageService中的DeliverDelayedMessageTimerTask.executeOnTimeup方法。
在executeOnTimeup方法中,就會去掃描SCHEDULE_TOPIC_XXXX這個Topic下的所有messageQueue,然后掃描這些MessageQueue對應的ConsumeQueue文件,找到沒有處理過的消息,計算他們的延遲時間。如果延遲時間沒有到,就等下一秒再重新掃描。如果延遲時間到了,就進行消息轉儲。將消息轉回到原來的目標Topic下。
整個延遲消息的實現方式:
ScheduleMessageService中掃描延遲消息的主要邏輯:
//ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup
public void executeOnTimeup() {//找到延遲隊列對應的ConsumeQueue文件ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));
?if (cq == null) {this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);return;}//通過計算,找到這一次掃描需要處理的的ConsumeQueue文件SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);... ...long nextOffset = this.offset;try {int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();//循環過濾ConsumeQueue文件當中的每一條消息索引for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {//解析每一條ConsumeQueue記錄long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();
?... ...//計算延遲時間long now = System.currentTimeMillis();long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);//延遲時間沒到就等下一次掃描long countdown = deliverTimestamp - now;if (countdown > 0) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}
?... ...//時間到了就進行轉儲boolean deliverSuc;if (ScheduleMessageService.this.enableAsyncDeliver) {deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);} else {deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);}
?if (!deliverSuc) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}}//計算下一次掃描時的Offset起點nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);} catch (Exception e) {log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);} finally {bufferCQ.release();}//部署下一次掃描任務this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
如果清楚了ConsumeQueue文件的結構,就可以很清晰的感受到RocketMQ其實就是在Broker端,像一個普通消費者一樣去進行消費,然后擴展出了延遲消息的整個擴展功能。而這,其實也是很多互聯網大廠對RocketMQ進行自定義功能擴展的很好的參考。
長輪詢機制
功能回顧
RocketMQ對消息消費者提供了Push推模式和Pull拉模式兩種消費模式。但是這兩種消費模式的本質其實都是Pull拉模式,Push模式可以認為是一種定時的Pull機制。但是這時有一個問題,當使用Push模式時,如果RocketMQ中沒有對應的數據,那難道一直進行空輪詢嗎?如果是這樣的話,那顯然會極大的浪費網絡帶寬以及服務器的性能,并且,當有新的消息進來時,RocketMQ也沒有辦法盡快通知客戶端,而只能等客戶端下一次來拉取消息了。針對這個問題,RocketMQ實現了一種長輪詢機制 long polling。
長輪詢機制簡單來說,就是當Broker接收到Consumer的Pull請求時,判斷如果沒有對應的消息,不用直接給Consumer響應(給響應也是個空的,沒意義),而是就將這個Pull請求給緩存起來。當Producer發送消息過來時,增加一個步驟去檢查是否有對應的已緩存的Pull請求,如果有,就及時將請求從緩存中拉取出來,并將消息通知給Consumer。
源碼重點
Consumer請求緩存,代碼入口PullMessageProcessor#processRequest方法
PullRequestHoldService服務會隨著BrokerController一起啟動。
生產者線:從DefaultMessageStore.doReput進入
整個流程以及源碼重點:
關于零拷貝與順序寫
刷盤機制保證消息不丟失
在操作系統層面,當應用程序寫入一個文件時,文件內容并不會直接寫入到硬件當中,而是會先寫入到操作系統中的一個緩存PageCache中。PageCache緩存以4K大小為單位,緩存文件的具體內容。這些寫入到PageCache中的文件,在應用程序看來,是已經完全落盤保存好了的,可以正常修改、復制等等。但是,本質上,PageCache依然是內存狀態,所以一斷電就會丟失。因此,需要將內存狀態的數據寫入到磁盤當中,這樣數據才能真正完成持久化,斷電也不會丟失。這個過程就稱為刷盤。
Java當中使用FileOutputStream類或者BufferedWriter類,進行write操作,就是寫入的Pagecache。
RocketMQ中通過fileChannel.commit方法寫入消息,也是寫入到Pagecache。
PageCache是源源不斷產生的,而Linux操作系統顯然不可能時時刻刻往硬盤寫文件。所以,操作系統只會在某些特定的時刻將PageCache寫入到磁盤。例如當我們正常關機時,就會完成PageCache刷盤。另外,在Linux中,對于有數據修改的PageCache,會標記為Dirty(臟頁)狀態。當Dirty Page的比例達到一定的閾值時,就會觸發一次刷盤操作。例如在Linux操作系統中,可以通過/proc/meminfo文件查看到Page Cache的狀態。
[root@192-168-65-174 ~]# cat /proc/meminfo
MemTotal: ? ? ? 16266172 kB
.....
Cached: ? ? ? ? ? 923724 kB
.....
Dirty: ? ? ? ? ? ? ? ?32 kB
Writeback: ? ? ? ? ? ? 0 kB
.....
Mapped: ? ? ? ? ? 133032 kB
.....
但是,只要操作系統的刷盤操作不是時時刻刻執行的,那么對于用戶態的應用程序來說,那就避免不了非正常宕機時的數據丟失問題。因此,操作系統也提供了一個系統調用,應用程序可以自行調用這個系統調用,完成PageCache的強制刷盤。在Linux中是fsync,同樣我們可以用man 2 fsync 指令查看。
RocketMQ對于何時進行刷盤,也設計了兩種刷盤機制,同步刷盤和異步刷盤。只需要在broker.conf中進行配置就行。
零拷貝加速文件讀寫
零拷貝(zero-copy)是操作系統層面提供的一種加速文件讀寫的操作機制,非常多的開源軟件都在大量使用零拷貝,來提升IO操作的性能。對于Java應用層,對應著mmap和sendFile兩種方式。
理解CPU拷貝和DMA拷貝
操作系統對于內存空間,是分為用戶態和內核態的。用戶態的應用程序無法直接操作硬件,需要通過內核空間進行操作轉換,才能真正操作硬件。這其實是為了保護操作系統的安全。正因為如此,應用程序需要與網卡、磁盤等硬件進行數據交互時,就需要在用戶態和內核態之間來回的復制數據。而這些操作,原本都是需要由CPU來進行任務的分配、調度等管理步驟的,早先這些IO接口都是由CPU獨立負責,所以當發生大規模的數據讀寫操作時,CPU的占用率會非常高。
之后,操作系統為了避免CPU完全被各種IO調用給占用,引入了DMA(直接存儲器存儲)。由DMA來負責這些頻繁的IO操作。DMA是一套獨立的指令集,不會占用CPU的計算資源。這樣,CPU就不需要參與具體的數據復制的工作,只需要管理DMA的權限即可。
DMA拷貝極大的釋放了CPU的性能,因此他的拷貝速度會比CPU拷貝要快很多。但是,其實DMA拷貝本身,也在不斷優化。
引入DMA拷貝之后,在讀寫請求的過程中,CPU不再需要參與具體的工作,DMA可以獨立完成數據在系統內部的復制。但是,數據復制過程中,依然需要借助數據總進線。當系統內的IO操作過多時,還是會占用過多的數據總線,造成總線沖突,最終還是會影響數據讀寫性能。
為了避免DMA總線沖突對性能的影響,后來又引入了Channel通道的方式。Channel,是一個完全獨立的處理器,專門負責IO操作。既然是處理器,Channel就有自己的IO指令,與CPU無關,他也更適合大型的IO操作,性能更高。
這也解釋了,為什么Java應用層與零拷貝相關的操作都是通過Channel的子類實現的。這其實是借鑒了操作系統中的概念。
而所謂的零拷貝技術,其實并不是不拷貝,而是要盡量減少CPU拷貝。
再來理解下mmap文件映射機制是怎么回事
mmap機制的具體實現參見配套示例代碼。主要是通過java.nio.channels.FileChannel的map方法完成映射。
以一次文件的讀寫操作為例,應用程序對磁盤文件的讀與寫,都需要經過內核態與用戶態之間的狀態切換,每次狀態切換的過程中,就需要有大量的數據復制。
在這個過程中,總共需要進行四次數據拷貝。而磁盤與內核態之間的數據拷貝,在操作系統層面已經由CPU拷貝優化成了DMA拷貝。而內核態與用戶態之間的拷貝依然是CPU拷貝。所以,在這個場景下,零拷貝技術優化的重點,就是內核態與用戶態之間的這兩次拷貝。
而mmap文件映射的方式,就是在用戶態不再保存文件的內容,而只保存文件的映射,包括文件的內存起始地址,文件大小等。真實的數據,也不需要在用戶態留存,可以直接通過操作映射,在內核態完成數據復制。
這個拷貝過程都是在操作系統的系統調用層面完成的,在Java應用層,其實是無法直接觀測到的,但是我們可以去JDK源碼當中進行間接驗證。在JDK的NIO包中,java.nio.HeapByteBuffer映射的就是JVM的一塊堆內內存,在HeapByteBuffer中,會由一個byte數組來緩存數據內容,所有的讀寫操作也是先操作這個byte數組。這其實就是沒有使用零拷貝的普通文件讀寫機制。
HeapByteBuffer(int cap, int lim) { ? ? ? ? ? ?// package-privatesuper(-1, 0, lim, cap, new byte[cap], 0);/*hb = new byte[cap];offset = 0;*/
}
而NIO把包中的另一個實現類java.nio.DirectByteBuffer則映射的是一塊堆外內存。在DirectByteBuffer中,并沒有一個數據結構來保存數據內容,只保存了一個內存地址。所有對數據的讀寫操作,都通過unsafe魔法類直接交由內核完成,這其實就是mmap的讀寫機制。
最后,這種mmap的映射機制由于還是需要用戶態保存文件的映射信息,數據復制的過程也需要用戶態的參與,這其中的變數還是非常多的。所以,mmap機制適合操作小文件,如果文件太大,映射信息也會過大,容易造成很多問題。通常mmap機制建議的映射文件大小不要超過2G 。而RocketMQ的CommitLog文件保持在1G固定大小,也是為了方便文件映射。
梳理下sendFile機制是怎么運行的
sendFile機制的具體實現參見配套示例代碼。主要是通過java.nio.channels.FileChannel的transferTo方法完成
sourceReadChannel.transferTo(0,sourceFile.length(),targetWriteChannel);
早期的sendfile實現機制其實還是依靠CPU進行頁緩存與socket緩存區之間的數據拷貝。但是,在后期的不斷改進過程中,sendfile優化了實現機制,在拷貝過程中,并不直接拷貝文件的內容,而是只拷貝一個帶有文件位置和長度等信息的文件描述符FD,這樣就大大減少了需要傳遞的數據。而真實的數據內容,會交由DMA控制器,從頁緩存中打包異步發送到socket中。
最后,sendfile機制在內核態直接完成了數據的復制,不需要用戶態的參與,所以這種機制的傳輸效率是非常穩定的。sendfile機制非常適合大數據的復制轉移。
順序寫加速文件寫入磁盤
通常應用程序往磁盤寫文件時,由于磁盤空間不是連續的,會有很多碎片。所以我們去寫一個文件時,也就無法把一個文件寫在一塊連續的磁盤空間中,而需要在磁盤多個扇區之間進行大量的隨機寫。這個過程中有大量的尋址操作,會嚴重影響寫數據的性能。而順序寫機制是在磁盤中提前申請一塊連續的磁盤空間,每次寫數據時,就可以避免這些尋址操作,直接在之前寫入的地址后面接著寫就行。
Kafka官方詳細分析過順序寫的性能提升問題。Kafka官方曾說明,順序寫的性能基本能夠達到內存級別。而如果配備固態硬盤,順序寫的性能甚至有可能超過寫內存。而RocketMQ很大程度上借鑒了Kafka的這種思想。
例如可以看下org.apache.rocketmq.store.CommitLog#DefaultAppendMessageCallback中的doAppend方法。在這個方法中,會以追加的方式將消息先寫入到一個堆外內存byteBuffer中,然后再通過fileChannel寫入到磁盤。