RocketMq系列整體欄目
內容 | 鏈接地址 |
---|---|
【一】RocketMq安裝和基本概念 | https://zhenghuisheng.blog.csdn.net/article/details/134486709 |
【二】RocketMq的架構解析和高性能設計/font> | https://zhenghuisheng.blog.csdn.net/article/details/134559514 |
RocketMq的架構解析和高性能設計
- 一,RocketMq的架構解析和高性能設計
- 2,rocketmq底層原理
- 2.1,事務的底層實現
- 2.2,如何保證消息不丟失
- 2.3,rocketmq積壓問題
- 2.4,如何保證順序消費
- 2.5,rocketmq的持久化
- 2.6,死信隊列
- 2.7,消息的冪等性
- 3,rocketmq高性能的設計
- 3.1,零拷貝技術
- 3.2,順序寫技術
- 3.3,刷盤機制
一,RocketMq的架構解析和高性能設計
在rocketMq中,其整體架構如下,在RocketMqServer中,主要有NameServer,Broker,MessageQueue,Message等組件,并且存在Topic這種邏輯組件,表示一種主題
NameServer是topic的注冊中心,NameServer會和topic建立長連接,將broker的信息通過topic注冊到NameServer中,然后生產者和消費者都會先通過這個NameServer獲取相關信息,再和對應的broker建立長連接。
在微服務中,有Nacos,zookeeper等作為注冊中心:
但是zk很明顯不適合作為這種高可用的注冊的這中心,因為內部可能會因為選舉出現腦裂問題,并且因為這個問題可能會導致整個服務出現一定時間的不可用的問題,而rocketmq主要就是高吞吐量,低延遲的特性,因此不可能去選擇zk作為注冊中心的;
而nacos和eureka也不適合作為rocketmq的注冊中心,如nacos中會記錄很多信息,如心跳信息,端口,host等信息,而Nameserver中只需要記錄這個Broker的信息,如果使用nacos來做的話,有點大材小用了。并且如果引用nacos,還要考慮版本沖突這些,做一些適配器等,相對來說是更加復雜的
在topic中的Consumer配置中,每個topic都會對應一個或者多個消費者組,topic主題和消費者組是多對多的關系,一個consumer消費者組,代表的是一組邏輯相同的消費者,一個message消息,只能被消費者組中的一個消費者消費,這個和kafka中的消息消費是一樣的
上面提到了消費者組的概念,在生產者中,也有生產者組。在事務機制中,當生產者給broker發送數據之后,broker需要給生產者一個數據回調,那么就需要指定生產者名字,那么此時生產者組就能發揮其作用
生產者producer在本地會有一個緩存存儲Nameserver中存儲的broker,在往broker投遞之前,會向注冊中心中發起一個請求判斷是否需要拉取最新的配置,然后再往對應的broker發送數據
2,rocketmq底層原理
2.1,事務的底層實現
rocketmq的事務實現,相當于一個簡單的分布式事務,主要是保證生產者本地事務和發送到broker事務的原子性。而broker到consumer端是一定可以保證消息消費成功的,如果一個消費者失敗,那么可以往別的消費者里面推送,如果最終依舊失敗,那么可以先重試,最后加入到死信隊列里面
事務消息的底層實現如下圖,首先生產者會發送一個half消息給Broker,Broker在接收到這個half消息之后,就會向broker返回一個確認的標志,然后事務的發送者就會執行本地事務,通過這個execute去執行本地事務。如果本地事務執行成功,那么生產者會返回一個提在交的狀態給Broker,隨后Broker將消息投遞到消費者中;如果是回滾狀態,那么消息會直接丟掉;如果是在4的時候,本地事務需要的時間過長,那么本地會先返回一個unknow的未知狀態,然后broker會等一段時間,隨后再回生產者中定時回查,消息生產者會去檢查事務,默認是回查15次,如果是15次之后檢查還是沒有完成,那么消息就會直接丟棄掉
half消息有點類似于建立tcp連接,主要是做為一種嗅探機制,判斷當前broker服務是否正常,如果broker服務掛了,那么連本地事務,也可以直接不執行了。
如一個訂單場景,30s檢查一次是否支付,那么就可以直接通過這種事務去實現,通過execute方法去執行本地事務,然后通過這個check的方式去銀行進行對賬。如果最終超時,那么最終將消息放入到死信隊列中,在私信隊列中寫對應的邏輯,如將庫存加回等。
2.2,如何保證消息不丟失
在mq中,消息丟失主要有四個地方,分別是生產者到broker、broker到消費者,broker的master到slave以及操作系統自身的緩存。
- 生產者到broker的解決方案可以如下:可以選擇最簡單的同步+多次試錯的方式,或者可以直接選擇事務消息
- broker到消費者之間:消費者本身具有重試功能,消費者不應答就會往別的消費者投遞
- 操作系統主要是因為數據在緩存,如果出現斷電而未來得及刷盤導致,因此應該采用同步刷盤解決
- broker到的master到slave之間:也可以采用同步的方式,來一條消息就往slave寫入,或者通過Dledger集群
操作系統和主從之間保證消息不丟失,主要是通過同步的方式解決,但是在保證安全的情況下,會在一定的程度上影響吞吐量和性能
2.3,rocketmq積壓問題
在rocketmq中,其處理數據積壓問題時比其他mq的能力強的,如果出現積壓,那么可以直接通過控制臺上面的topic,通過內部的代理者位點和消費者位點所產生的差值查看,如果差值為0,則表示有消息積壓未處理。
在rocketmq內部,一個MessageQueue隊列的消息只能由一個消費者組中的一個消費者去消費,其底層實現和kafka是一樣的,因此如果出現消息積壓,那么首先可以查看消費者組中的消費者個數和隊列的個數是否相同,如果消費者個數小于隊列的個數,那么可以增加消費者個數,直到和隊列的個數一致,如默認隊列的個數為4,那么將消費者組中的消費者個數設置成4
當然,消費者個數調大是沒有用的,因為最大只能和topic中的隊列一致,那么就可以通過重寫一個topic,調大topic中隊列的數量,如原來的隊列個數只有4,那么可以創建一個新的topic,設置隊列的個數為8,并且原來的消費者對消息不消費,而是做一個轉發功能,將4個隊列的topic的數據轉發到8個隊列的topic中,那么在消費者組中,其個數就可以設置成8,那么這樣子就很好的處理消息積壓的問題了。
數據的搬運可以在具體的消費者代碼里面去編寫,主要功能有接收四個topic隊列的數據,然后轉發到八個topic的隊列中,最后再寫一個消費者去消費八個隊列topic的消息
2.4,如何保證順序消費
這里的順序消息只能保證局部有序,而不是全局有序。在rocketmq內部,在生產者端,消息會根據id做一個取模運算,會將同一個區取模運算的值放入一個隊列里面,在消費者端,會鎖定隊列消費,就是會先消費完一個隊列再消費下一個隊列,從而保證單個隊列消費的有序性
2.5,rocketmq的持久化
rocketmq為了保證消息的安全性,在broker內部都會做一個持久化的操作,首先當生產者將消息發送到broker之后,會現將消息存儲到 coimmit 文件中,每個topic都會有對應的commit文件,每個文件大小為1g,如果消息滿了則會創建新的文件,文件的格式為二進制格式。
在消費者中,會有一個 comsumeQueue 文件,改文件不存數據,只存索引信息,如存一些偏移量等,在消費時可以更快的定位到commit文件中的數據,隨后去消費里面的數據,并且可以通過Tag標簽去過濾消息
除了上面兩個文件之外,還有維護一個index文件,內部會記錄Commit日志的偏移量等
2.6,死信隊列
當broker和consumer之間重試16次之后,消息依舊沒能被消費,那么消息就會加入到死信隊列中。一個私信隊列會對應一個消費者組,其perm對應的權限值為2。死信隊列的消息默認不會被消費,而是需要開發者自身去處理該隊列中的數據。
并且私信隊列中消息的有效期也是三天,可以在broker.conf配置文件設置,當超過這個時間,消息都會被刪除。
2.7,消息的冪等性
在rocketmq中,消息的冪等性為 at least once 至少被消費一次。官方建議使用里面的key去做冪等性,key是一個唯一值,就是一個唯一id。除了這些方式之外,在分布式場景下,也可以開率分布式鎖這些做冪等。
3,rocketmq高性能的設計
3.1,零拷貝技術
零拷貝是操作系統層面的一種加速文件讀寫的操作機制,可以通過這種零拷貝的形式提升IO操作的性能。在java中,主要是通過這種 fileChannel 的方式實現零拷貝,其具體實現由 mmap和sendFile 兩種形式
以一個文件的拷貝為例,正常來說,需要從用戶態切換到內核態,然后再去執行io操作,并且需要通過cpu的調度,從磁盤中將文件加載到內存,再加載到網卡。而在引入零拷貝技術之后,可以讓channel代替cpu去做io操作,cpu只需要給channel對應的權限即可。在操作系統層面,就是利用這種DMA技術,將原來四次的cpu拷貝,變成了兩次,從而提高整體性能。
3.2,順序寫技術
本人在寫過一個順序io和隨機io的文章:https://zhenghuisheng.blog.csdn.net/article/details/129080088 ,順序寫可以減少磁頭的移動去尋址,不管是插入數據還是查詢數據,都可以提升其性能,并且可以減少磁盤的碎片。
3.3,刷盤機制
rocketmq為了保證數據的安全性,在broker中會持久化到commitlog中,在刷盤時有兩種方式,分別是:同步刷盤和異步刷盤 ,默認采用的刷盤機制時異步刷盤
flushDiskType=ASYNC_FLUSH