本文嘗試從Apache RocketMQ的簡介、主要組件及其作用、3種部署模式、Controller集群模式工作流程、最佳實踐等方面對其進行詳細分析。希望對您有所幫助!
一、Apache RocketMQ 簡介
Apache RocketMQ 是一個開源的分布式消息中間件,由阿里巴巴集團開發并貢獻給 Apache 軟件基金會。它旨在提供高吞吐量、低延遲和高可靠性的消息傳遞和流處理服務。廣泛應用于金融、互聯網、物聯網等領域,支持多種應用場景。
核心特性
1. 高性能
- 高吞吐量:支持每秒處理數百萬條消息,適用于高并發的業務場景。
- 低延遲:具備低于毫秒級的延遲,確保消息能夠快速傳遞。
2. 高可靠性
- 數據一致性:通過消息確認和重新傳遞機制,確保消息的可靠傳遞。
- 消息持久化:支持消息持久化,保證在系統故障時消息不丟失。
3. 可擴展性
- 水平擴展:支持通過增加節點的方式水平擴展,滿足業務增長需求。
- 彈性伸縮:根據業務負載動態調整資源,提升資源利用效率。
4. 靈活性
- 多種消息模型:支持點對點、發布/訂閱、流處理等多種消息傳遞模型,滿足不同應用需求。
- 豐富的接口:提供Java、C++、Python等多種語言的客戶端接口,便于集成和使用。
關鍵功能和支持特性
- Producer(生產者):負責發送消息到 RocketMQ 集群。生產者可以指定消息的主題(Topic)和標簽(Tag),便于消息分類和篩選。
- Consumer(消費者):負責從 RocketMQ 集群接收和處理消息。消費者分為集群消費和廣播消費兩種模式。
- NameServer(命名服務器):提供輕量級的路由服務,存儲生產者和消費者與 Broker 之間的路由信息。支持動態擴展,確保系統的高可用性。
- Broker(代理服務器):負責存儲和轉發消息。Broker 將消息存儲在磁盤上,并根據需要將消息傳遞給消費者。支持主從架構,提升系統的容錯能力和數據可靠性。
- 事務消息:支持分布式事務消息,確保在分布式系統中實現最終一致性。事務消息可以確保消息在事務成功時被發送,事務失敗時被回滾。
- 順序消息:支持順序消息傳遞,確保消息按照發送順序被消費。
- 定時消息和延時消息:支持消息定時發送和延時發送,滿足特定的業務需求。
- 批量消息:支持批量消息發送,提高消息傳遞效率。
- 消息過濾:支持基于標簽的消息過濾,消費者可以根據標簽選擇性地接收消息。
- 消息重試:支持消息重試機制,確保消息在消費失敗時可以重新消費。
- 死信隊列(DLQ):支持死信隊列,確保處理失敗的消息不會丟失。
- 流量控制:支持流量控制,防止系統過載。
- 分布式架構:支持多數據中心部署和跨地域部署,提升系統的高可用性和容災能力。
- 監控和管理:提供完善的監控和管理工具,支持消息統計、系統監控和運維管理。
- 安全性:支持基于權限的訪問控制,確保消息傳遞的安全性。
使用場景
- 異步通信:在分布式系統中實現異步消息傳遞,解耦系統組件,提升系統的響應速度和可靠性。
- 事件驅動架構:構建基于事件的系統,通過消息隊列實現事件驅動的業務邏輯,提升系統的靈活性和可維護性。
- 日志收集:集中收集和處理分布式系統中的日志信息,實現統一的日志管理和分析。
- 流處理:實時處理和分析數據流,支持大數據分析、實時監控等應用場景。
優勢
- 高性能、高吞吐量:適用于大規模、高并發的業務場景。
- 強一致性:通過嚴格的消息確認機制,確保數據一致性。
- 靈活的擴展性:支持彈性伸縮,能夠適應業務的快速變化。
- 多語言支持:提供多種語言的客戶端接口,便于開發和集成。
Apache RocketMQ 以其高性能、高可靠性和靈活性,成為眾多企業實現分布式消息傳遞和流處理的首選解決方案。
二、RocketMQ 主要組件及其作用
Apache RocketMQ 主要組件及其作用如下:
1. NameServer(命名服務器)
作用:提供輕量級的路由服務,存儲生產者和消費者與 Broker 之間的路由信息。
- 路由管理:維護 Broker 的地址列表和路由信息,供生產者和消費者查詢。
- 動態注冊:Broker 啟動時向 NameServer 注冊,定期發送心跳以保持連接。
- 高可用性:可以部署多個 NameServer,實現高可用。
2. Broker(代理服務器)
作用:負責存儲和轉發消息,是 RocketMQ 的核心組件。
- 消息存儲:持久化存儲消息,確保數據的可靠性和持久性。
- 消息轉發:將消息從生產者傳遞到消費者。
- 主從架構:支持主從模式,主 Broker 負責消息處理,從 Broker 備份數據,提供容錯能力。
- 消息索引:通過建立消息索引,提高消息檢索效率。
3. Producer(生產者)
作用:負責發送消息到 RocketMQ 集群。
- 消息生成:創建并發送消息到指定的主題(Topic)。
- 異步發送:支持異步消息發送,提高發送性能。
- 路由獲取:通過 NameServer 獲取 Broker 路由信息,將消息發送到合適的 Broker。
4. Consumer(消費者)
作用:負責從 RocketMQ 集群接收和處理消息。
- 消息消費:從指定的主題(Topic)接收并處理消息。
- 消費模式:支持集群消費和廣播消費兩種模式。
- 集群消費:同一消費組內的多個消費者負載均衡地處理消息,每條消息只會被一個消費者處理一次。
- 廣播消費:每個消費者都會處理所有的消息,每條消息會被所有消費者處理一次。
- 消息過濾:可以根據標簽(Tag)進行消息過濾,選擇性接收和處理消息。
5. Controller(控制器)
作用:管理主從 Broker 的自動故障轉移和高可用性。
- 事務管理:負責分布式事務的管理,確保消息在事務成功時被發送,事務失敗時被回滾。
- 狀態監控:監控 Broker 的狀態,在主 Broker 發生故障時自動將從 Broker 提升為新的主 Broker。
- 協調主從:在 Broker 間協調主從關系,確保系統的高可用性和數據一致性。
6. RocketMQ Console(管理控制臺)
作用:提供可視化的運維管理工具。
- 集群監控:實時監控 RocketMQ 集群的運行狀態,包括 Broker、Topic、Consumer 等信息。
- 消息查詢:支持消息的精確查詢和模糊查詢,方便運維人員排查問題。
- 配置管理:提供 Topic、Consumer Group、Broker 配置的管理功能。
7. Store(消息存儲)
作用:實現消息的持久化存儲。
- CommitLog:存儲所有消息的物理文件,按順序寫入,支持快速寫入操作。
- ConsumeQueue:消息的邏輯隊列,記錄消息在 CommitLog 中的位置,便于消費者快速檢索。
- IndexFile:消息索引文件,通過消息的屬性(如消息鍵)建立索引,支持快速查詢。
8. Client(客戶端)
作用:包含 Producer 和 Consumer 的客戶端 SDK。
- API 提供:提供 Java、C++、Python 等多種語言的客戶端接口,便于集成和使用。
- 消息操作:支持消息的發送、接收、過濾、批處理等操作。
總結
最新版本的 Apache RocketMQ 通過這些組件的協同工作,提供了高性能、高可靠性和高可擴展性的分布式消息傳遞和流處理服務。各個組件分工明確,確保系統能夠高效、穩定地運行,并滿足各種復雜業務場景的需求。
三、RocketMQ 部署方式介紹
Apache RocketMQ 5.0 版本完成基本消息收發,包括 NameServer、Broker、Proxy 組件。 在 5.0 版本中 Proxy 和 Broker 根據實際訴求可以分為 Local 模式和 Cluster 模式,一般情況下如果沒有特殊需求,或者遵循從早期版本平滑升級的思路,可以選用Local模式。
Local 模式
- 在 Local 模式下,Broker 和 Proxy 是同進程部署,只是在原有 Broker 的配置基礎上新增 Proxy 的簡易配置就可以運行。
Cluster 模式
- 在 Cluster 模式下,Broker 和 Proxy 分別部署,即在原有的集群基礎上,額外再部署 Proxy 即可。
主備自動切換模式
- 主備自動切換模式部署方式下,主要增加支持自動主從切換的 Controller 組件,它可以獨立部署也可以內嵌在 NameServer 中。
下文分別介紹三種部署方式:
四、Local模式部署
由于 Local 模式下 Proxy 和 Broker 是同進程部署,Proxy本身無狀態,因此主要的集群配置仍然以 Broker 為基礎進行即可。
啟動 NameServer
NameServer需要先于Broker啟動,且如果在生產環境使用,為了保證高可用,建議一般規模的集群啟動3個NameServer,各節點的啟動命令相同,如下:
### 首先啟動Name Server
$ nohup sh mqnamesrv &### 驗證Name Server 是否啟動成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
啟動Broker+Proxy
單組節點單副本模式
警告
這種方式風險較大,因為 Broker 只有一個節點,一旦Broker重啟或者宕機時,會導致整個服務不可用。不建議線上環境使用, 可以用于本地測試。
啟動 Broker+Proxy
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &### 驗證Broker 是否啟動成功,例如Broker的IP為:192.168.1.2,且名稱為broker-a
$ tail -f ~/logs/rocketmqlogs/broker_default.log
The broker[xxx, 192.169.1.2:10911] boot success...
多組節點(集群)單副本模式
一個集群內全部部署 Master 角色,不部署Slave 副本,例如2個Master或者3個Master,這種模式的優缺點如下:
-
優點:配置簡單,單個Master宕機或重啟維護對應用無影響,在磁盤配置為RAID10時,即使機器宕機不可恢復情況下,由于RAID10磁盤非常可靠,消息也不會丟(異步刷盤丟失少量消息,同步刷盤一條不丟),性能最高;
-
缺點:單臺機器宕機期間,這臺機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受到影響。
啟動Broker+Proxy集群
### 在機器A,啟動第一個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties --enable-proxy &### 在機器B,啟動第二個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties --enable-proxy &...
備注
如上啟動命令是在單個NameServer情況下使用的。對于多個NameServer的集群,Broker啟動命令中-n
后面的地址列表用分號隔開即可,例如 192.168.1.1:9876;192.161.2:9876
。
多節點(集群)多副本模式-異步復制
每個Master配置一個Slave,有多組 Master-Slave,HA采用異步復制方式,主備有短暫消息延遲(毫秒級),這種模式的優缺點如下:
-
優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,同時Master宕機后,消費者仍然可以從Slave消費,而且此過程對應用透明,不需要人工干預,性能同多Master模式幾乎一樣;
-
缺點:Master宕機,磁盤損壞情況下會丟失少量消息。
啟動Broker+Proxy集群
### 在機器A,啟動第一個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties --enable-proxy &### 在機器B,啟動第二個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties --enable-proxy &### 在機器C,啟動第一個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties --enable-proxy &### 在機器D,啟動第二個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties --enable-proxy &
多節點(集群)多副本模式-同步雙寫
每個Master配置一個Slave,有多對 Master-Slave,HA采用同步雙寫方式,即只有主備都寫成功,才向應用返回成功,這種模式的優缺點如下:
-
優點:數據與服務都無單點故障,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高;
-
缺點:性能比異步復制模式略低(大約低10%左右),發送單個消息的RT會略高,且目前版本在主節點宕機后,備機不能自動切換為主機。
啟動 Broker+Proxy 集群
### 在機器A,啟動第一個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties --enable-proxy &### 在機器B,啟動第二個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties --enable-proxy &### 在機器C,啟動第一個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties --enable-proxy &### 在機器D,啟動第二個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties --enable-proxy &
提示
以上 Broker 與 Slave 配對是通過指定相同的 BrokerName 參數來配對,Master 的 BrokerId 必須是 0,Slave 的 BrokerId 必須是大于 0 的數。另外一個 Master 下面可以掛載多個 Slave,同一 Master 下的多個 Slave 通過指定不同的 BrokerId 來區分。$ROCKETMQ_HOME指的RocketMQ安裝目錄,需要用戶自己設置此環境變量。
5.0 HA新模式
提供更具靈活性的HA機制,讓用戶更好的平衡成本、服務可用性、數據可靠性,同時支持業務消息和流存儲的場景。詳見
五、Cluster模式部署
在 Cluster 模式下,Broker 與 Proxy分別部署,我可以在 NameServer和 Broker都啟動完成之后再部署 Proxy。
在 Cluster模式下,一個 Proxy集群和 Broker集群為一一對應的關系,可以在 Proxy的配置文件 rmq-proxy.json
中使用 rocketMQClusterName
進行配置
啟動 NameServer
### 首先啟動Name Server
$ nohup sh mqnamesrv &### 驗證Name Server 是否啟動成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
單組節點單副本模式
警告
這種方式風險較大,因為 Broker 只有一個節點,一旦Broker重啟或者宕機時,會導致整個服務不可用。不建議線上環境使用, 可以用于本地測試。
### 在機器A,啟動第一個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 &
多組節點(集群)單副本模式
一個集群內全部部署 Master 角色,不部署Slave 副本,例如2個Master或者3個Master,這種模式的優缺點如下:
-
優點:配置簡單,單個Master宕機或重啟維護對應用無影響,在磁盤配置為RAID10時,即使機器宕機不可恢復情況下,由于RAID10磁盤非常可靠,消息也不會丟(異步刷盤丟失少量消息,同步刷盤一條不丟),性能最高;
-
缺點:單臺機器宕機期間,這臺機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受到影響。
### 在機器A,啟動第一個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &### 在機器B,啟動第二個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &...
如上啟動命令是在單個NameServer情況下使用的。對于多個NameServer的集群,Broker啟動命令中-n
后面的地址列表用分號隔開即可,例如 192.168.1.1:9876;192.161.2:9876
。
多節點(集群)多副本模式-異步復制
每個Master配置一個Slave,有多組 Master-Slave,HA采用異步復制方式,主備有短暫消息延遲(毫秒級),這種模式的優缺點如下:
-
優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,同時Master宕機后,消費者仍然可以從Slave消費,而且此過程對應用透明,不需要人工干預,性能同多Master模式幾乎一樣;
-
缺點:Master宕機,磁盤損壞情況下會丟失少量消息。
### 在機器A,啟動第一個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &### 在機器B,啟動第二個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &### 在機器C,啟動第一個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &### 在機器D,啟動第二個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &
多節點(集群)多副本模式-同步雙寫
每個Master配置一個Slave,有多對 Master-Slave,HA采用同步雙寫方式,即只有主備都寫成功,才向應用返回成功,這種模式的優缺點如下:
-
優點:數據與服務都無單點故障,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高;
-
缺點:性能比異步復制模式略低(大約低10%左右),發送單個消息的RT會略高,且目前版本在主節點宕機后,備機不能自動切換為主機。
### 在機器A,啟動第一個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &### 在機器B,啟動第二個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &### 在機器C,啟動第一個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &### 在機器D,啟動第二個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &
提示
以上 Broker 與 Slave 配對是通過指定相同的 BrokerName 參數來配對,Master 的 BrokerId 必須是 0,Slave 的 BrokerId 必須是大于 0 的數。另外一個 Master 下面可以掛載多個 Slave,同一 Master 下的多個 Slave 通過指定不同的 BrokerId 來區分。$ROCKETMQ_HOME指的RocketMQ安裝目錄,需要用戶自己設置此環境變量。
5.0 HA新模式
提供更具靈活性的HA機制,讓用戶更好的平衡成本、服務可用性、數據可靠性,同時支持業務消息和流存儲的場景。詳見
啟動 Proxy
可以在多臺機器啟動多個Proxy
### 在機器A,啟動第一個Proxy,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqproxy -n 192.168.1.1:9876 &### 在機器B,啟動第二個Proxy,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqproxy -n 192.168.1.1:9876 &### 在機器C,啟動第三個Proxy,例如NameServer的IP為:192.168.1.1
$ nohup sh bin/mqproxy -n 192.168.1.1:9876 &
若需要指定配置文件,可以使用 -pc
或者 --proxyConfigPath
進行指定
### 自定義配置文件
$ nohup sh bin/mqproxy -n 192.168.1.1:9876 -pc /path/to/proxyConfig.json &
六、主備自動切換模式部署
如何部署支持自動主從切換的 RocketMQ 集群? 其架構如上圖所示,主要增加支持自動主從切換的 Controller 組件,其可以獨立部署也可以內嵌在 NameServer 中。
Controller 部署
Controller 組件提供選主能力,若需要保證 Controller 具備容錯能力,Controller 部署需要三副本及以上(遵循 Raft 的多數派協議)。
注意
Controller 若只部署單副本也能完成 Broker Failover,但若該單點 Controller 故障,會影響切換能力,但不會影響存量集群的正常收發。
Controller 部署有兩種方式。一種是嵌入于 NameServer 進行部署,可以通過配置 enableControllerInNamesrv 打開(可以選擇性打開,并不強制要求每一臺 NameServer 都打開),在該模式下,NameServer 本身能力仍然是無狀態的,也就是內嵌模式下若 NameServer 掛掉多數派,只影響切換能力,不影響原來路由獲取等功能。另一種是獨立部署,需要單獨部署 Controller 組件。
Controller 嵌入 NameServer 部署
嵌入 NameServer 部署時只需要在 NameServer 的配置文件中設置 enableControllerInNamesrv=true,并填上 Controller 的配置即可。
enableControllerInNamesrv = true
controllerDLegerGroup = group1
controllerDLegerPeers = n0-127.0.0.1:9877;n1-127.0.0.1:9878;n2-127.0.0.1:9879
controllerDLegerSelfId = n0
controllerStorePath = /home/admin/DledgerController
enableElectUncleanMaster = false
notifyBrokerRoleChanged = true
參數解釋:
- enableControllerInNamesrv:Nameserver 中是否開啟 controller,默認 false。
- controllerDLegerGroup:DLedger Raft Group 的名字,同一個 DLedger Raft Group 保持一致即可。
- controllerDLegerPeers:DLedger Group 內各節點的端口信息,同一個 Group 內的各個節點配置必須要保證一致。
- controllerDLegerSelfId:節點 id,必須屬于 controllerDLegerPeers 中的一個;同 Group 內各個節點要唯一。
- controllerStorePath:controller 日志存儲位置。controller 是有狀態的,controller 重啟或宕機需要依靠日志來恢復數據,該目錄非常重要,不可以輕易刪除。
- enableElectUncleanMaster:是否可以從 SyncStateSet 以外選舉 Master,若為 true,可能會選取數據落后的副本作為 Master 而丟失消息,默認為 false。
- notifyBrokerRoleChanged:當 Broker 副本組上角色發生變化時是否主動通知,默認為 true。
參數設置完成后,指定配置文件啟動 Nameserver 即可。
$ nohup sh bin/mqnamesrv -c namesrv.conf &
Controller 獨立部署
獨立部署執行以下腳本即可
$ nohup sh bin/mqcontroller -c controller.conf &
mqcontroller 腳本在源碼包 distribution/bin/mqcontroller,配置參數與內嵌模式相同。
注意
獨立部署Controller后,仍然需要單獨部署NameServer提供路由發現能力
Broker 部署
Broker 啟動方法與之前相同,增加以下參數
- enableControllerMode:Broker controller 模式的總開關,只有該值為 true,自動主從切換模式才會打開。默認為 false。
- controllerAddr:controller 的地址,多個 controller 中間用分號隔開。例如
controllerAddr = 127.0.0.1:9877;127.0.0.1:9878;127.0.0.1:9879
- syncBrokerMetadataPeriod:向 controller 同步 Broker 副本信息的時間間隔。默認 5000(5s)。
- checkSyncStateSetPeriod:檢查 SyncStateSet 的時間間隔,檢查 SyncStateSet 可能會 shrink SyncState。默認5000(5s)。
- syncControllerMetadataPeriod:同步 controller 元數據的時間間隔,主要是獲取 active controller 的地址。默認10000(10s)。
- haMaxTimeSlaveNotCatchup:表示 Slave 沒有跟上 Master 的最大時間間隔,若在 SyncStateSet 中的 slave 超過該時間間隔會將其從 SyncStateSet 移除。默認為 15000(15s)。
- storePathEpochFile:存儲 epoch 文件的位置。epoch 文件非常重要,不可以隨意刪除。默認在 store 目錄下。
- allAckInSyncStateSet:若該值為 true,則一條消息需要復制到 SyncStateSet 中的每一個副本才會向客戶端返回成功,可以保證消息不丟失。默認為 false。
- syncFromLastFile:若 slave 是空盤啟動,是否從最后一個文件進行復制。默認為 false。
- asyncLearner:若該值為 true,則該副本不會進入 SyncStateSet,也就是不會被選舉成 Master,而是一直作為一個 learner 副本進行異步復制。默認為false。
- inSyncReplicas:需保持同步的副本組數量,默認為1,allAckInSyncStateSet=true 時該參數無效。
- minInSyncReplicas:最小需保持同步的副本組數量,若 SyncStateSet 中副本個數小于 minInSyncReplicas 則 putMessage 直接返回 PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH,默認為1。
在Controller模式下,Broker配置必須設置 enableControllerMode=true,并填寫 controllerAddr,并以下面命令啟動:
$ nohup sh bin/mqbroker -c broker.conf &
注意
自動主備切換模式下Broker無需指定brokerId和brokerRole,其由Controller組件進行分配
兼容性
該模式未對任何客戶端層面 API 進行新增或修改,不存在客戶端的兼容性問題。
Nameserver 本身能力未做任何修改,Nameserver 不存在兼容性問題。如開啟 enableControllerInNamesrv 且 controller 參數配置正確,則開啟 controller 功能。
Broker若設置 enableControllerMode=false,則仍然以之前方式運行。若設置 enableControllerMode=true,則需要部署 controller 且參數配置正確才能正常運行。
具體行為如下表所示:
舊版 Nameserver | 舊版 Nameserver+獨立部署 Controller | 新版 Nameserver 開啟 controller功能 | 新版 Nameserver 關閉 controller 功能 | |
---|---|---|---|---|
舊版 Broker | 正常運行,無法切換 | 正常運行,無法切換 | 正常運行,無法切換 | 正常運行,無法切換 |
新版 Broker 開啟 Controller 模式 | 無法正常上線 | 正常運行,可以切換 | 正常運行,可以切換 | 無法正常上線 |
新版 Broker 不開啟 Controller 模式 | 正常運行,無法切換 | 正常運行,無法切換 | 正常運行,無法切換 | 正常運行,無法切換 |
升級注意事項
從上述兼容性表述可以看出,NameServer 正常升級即可,無兼容性問題。在不想升級 Nameserver 情況,可以獨立部署 Controller 組件來獲得切換能力。
針對 Broker 升級,分為兩種情況:
(1)Master-Slave 部署升級成 Controller 切換架構
可以帶數據進行原地升級,對于每組 Broker,停機主、備 Broker,保證主、備的 CommitLog 對齊(可以在升級前禁寫該組 Broker 一段時間,或則通過拷貝方式保證一致),升級包后重新啟動即可。
注意
若主備 CommitLog 不對齊,需要保證主上線以后再上線備,否則可能會因為數據截斷而丟失消息。
(2)原 DLedger 模式升級到 Controller 切換架構
由于原 DLedger 模式消息數據格式與 Master-Slave 下數據格式存在區別,不提供帶數據原地升級的路徑。在部署多組 Broker 的情況下,可以禁寫某一組 Broker 一段時間(只要確認存量消息被全部消費即可,比如根據消息的保存時間來決定),然后清空 store 目錄下除 config/topics.json、subscriptionGroup.json 下(保留 topic 和訂閱關系的元數據)的其他文件后,進行空盤升級。
七、Rocketmq Controller集群模式的工作流程
在 RocketMQ 的集群模式下,Controller 負責管理多個 Broker 的元數據和協調它們的操作。下面是集群模式下的工作流程和 Mermaind 圖示例。
工作流程
-
啟動 Controller
- 啟動多個 Controller 節點,形成一個高可用的 Controller 集群。
- Controller 節點通過 Raft 協議選舉出一個 Leader,其他節點為 Follower。
-
啟動 Broker
- Broker 啟動時,會向 Controller 集群注冊,獲取元數據和配置信息。
- Broker 定期向 Controller 報告其狀態和心跳信息。
-
元數據管理
- Controller 負責維護 Broker 的元數據,包括 Broker 地址、Topic 分配信息等。
- Controller 處理 Broker 的注冊、注銷和狀態變更,并將這些變更通知給其他 Broker。
-
協調操作
- 當有新的 Topic 或者 Queue 需要創建時,客戶端向 Controller 發送請求。
- Controller 負責選擇合適的 Broker 并執行創建操作。
- Controller 負責處理 Broker 故障,重新分配 Topic 和 Queue。
-
數據同步
- 主 Broker 負責處理寫請求,從 Broker 負責處理讀請求。
- 主從 Broker 之間通過同步機制保持數據一致性。
Mermaind 圖
以下是表示 RocketMQ 集群模式下 Controller 工作流程的 Mermaind 圖示例:
這張圖展示了 Controller 和 Broker 之間的交互流程,Controller 負責管理元數據、協調 Broker 操作,并在 Broker 之間分配任務和同步數據。
八、RocketMQ最佳實踐
1. 集群部署
主備部署
- 多主多備:在生產環境中,推薦使用多主多備的部署方式。多個主節點(Master)和備份節點(Slave)可以確保在主節點發生故障時,備份節點能夠快速接管,保證系統的高可用性。
- 跨機房部署:將 Broker 分布在不同的機房,以防止單個機房故障導致服務不可用。
NameServer 高可用
- 多 NameServer 部署:至少部署三個 NameServer 節點,以確保高可用性。NameServer 是無狀態的,客戶端可以隨機選擇一個 NameServer 進行連接。
- 定期檢查:定期檢查 NameServer 的狀態,確保其處于正常工作狀態。
Broker 分片
- 分片部署:將 Broker 分成多個分片,分布在不同的物理服務器上。這樣即使某個 Broker 出現故障,也不會影響到整個系統的運行。
- 合理分配 Topic:將 Topic 分配到不同的分片中,避免單個分片承載過多的消息流量。
2. 消息存儲
消息持久化
- 開啟持久化:確保消息的持久化功能開啟,防止因 Broker 異常重啟導致的消息丟失。可以在 Broker 配置文件中設置
flushDiskType=SYNC_FLUSH
來開啟同步刷盤。
存儲路徑
- 選擇高性能磁盤:將消息存儲路徑設置在性能較好的 SSD 磁盤上,提升存儲和讀取效率。
- 分開存儲日志和數據:將 CommitLog 和 ConsumerQueue 存儲在不同的磁盤上,減少磁盤 I/O 的競爭。
清理過期消息
- 設置消息過期時間:根據業務需求,在 Broker 配置文件中設置消息過期時間(默認是72小時)。過期的消息會被定期清理,釋放存儲空間。
- 手動清理:在必要時,可以手動清理過期消息,確保存儲空間的充足。
3. 性能優化
批量發送
- 批量發送配置:在 Producer 端配置批量發送,減少網絡 I/O 次數。例如,可以通過
sendBatchMessage
方法發送批量消息,提高發送效率。 - 批量大小控制:合理設置批量消息的大小,避免單次發送的數據量過大,導致網絡擁堵。
消費端并發
- 并發消費:在 Consumer 端設置合理的并發線程數,以提高消息的消費能力。可以通過
consumeThreadMin
和consumeThreadMax
參數來調整消費線程數。 - 順序消費:對于需要保證順序的消息,使用順序消費模式(Orderly),避免消息亂序。
異步發送與回調
- 異步發送:使用異步發送方式,通過
sendAsync
方法發送消息,避免同步等待,提高發送性能。 - 回調處理:結合回調函數處理發送結果,及時處理發送失敗的情況。
4. 消息重試和補償
消息重試機制
- 消費重試:利用 RocketMQ 內置的重試機制,確保消息消費成功。可以通過
maxReconsumeTimes
參數設置最大重試次數。 - 重試間隔:合理設置重試間隔,避免頻繁重試導致的資源浪費。
業務補償機制
- 冪等性設計:在關鍵業務場景中,設計冪等性操作,確保多次處理同一消息不會產生副作用。
- 補償事務:對于分布式事務場景,設計補償機制,確保事務的一致性。
5. 監控和報警
系統監控
- RocketMQ Console:使用 RocketMQ Console 監控系統的運行狀態,包括 Broker、Producer 和 Consumer 的狀態。
- Prometheus 和 Grafana:將 RocketMQ 的監控數據導入 Prometheus,并使用 Grafana 可視化監控數據。
日志記錄
- 詳細日志:啟用詳細的日志記錄功能,記錄消息的發送、接收和消費的詳細信息。可以通過調整
logLevel
參數設置日志級別。 - 日志輪轉:定期輪轉日志文件,避免日志文件過大,影響系統性能。
報警設置
- 閾值報警:設置合理的報警閾值,當系統指標(如消息堆積、發送失敗率等)超過閾值時,觸發報警。
- 郵件和短信通知:配置報警通知,通過郵件和短信及時通知運維人員,進行處理。
6. 安全性
權限控制
- ACL 配置:啟用 RocketMQ 的訪問控制列表(ACL)功能,對 Producer 和 Consumer 進行權限管理,防止非法訪問。可以在配置文件中設置 ACL 規則。
- 用戶認證:對接入 RocketMQ 的用戶進行認證,確保只有合法用戶才能訪問消息系統。
數據加密
- 傳輸加密:對消息傳輸過程進行加密,使用 SSL/TLS 協議保護數據的安全。
- 存儲加密:對敏感數據進行加密存儲,確保數據在磁盤上的安全。
網絡隔離
- 內網部署:將 RocketMQ 部署在內網環境中,通過防火墻和 VPN 等手段進行網絡隔離,防止外部攻擊。
- IP 白名單:配置 IP 白名單,只允許特定 IP 地址訪問 RocketMQ 服務。
7. 災備和容災
跨地域部署
- 多地域部署:在多個地域部署 Broker 集群,確保在一個地域發生故障時,能夠自動切換到其他地域繼續提供服務。
- 數據同步:利用 RocketMQ 的數據同步功能,在不同地域之間進行數據同步,確保數據的一致性。
數據備份
- 定期備份:定期進行數據備份,確保在發生數據丟失時,能夠快速恢復數據。可以通過定期備份 CommitLog 和 ConsumerQueue 數據來實現。
- 異地備份:將備份數據存儲在異地,防止同一地點的災難導致備份數據丟失。
總結
通過遵循以上詳細的最佳實踐,可以有效提升 RocketMQ 的穩定性、性能和安全性,確保消息系統能夠高效、可靠地運行。結合具體的業務需求和系統環境,合理配置和優化 RocketMQ,可以實現最佳的使用效果。
九、Go語言實踐RocketMQ樣例
在 Go 語言中使用 RocketMQ,你需要使用 RocketMQ 的 Go 客戶端庫來進行消息的生產和消費。下面是一個簡單的實踐示例,包括如何配置 RocketMQ Go 客戶端、發送消息和消費消息的步驟。
1. 安裝 RocketMQ Go 客戶端庫
首先,你需要安裝 RocketMQ 的 Go 客戶端庫。可以通過 Go 的包管理工具 go get
來安裝:
go get github.com/apache/rocketmq-client-go/v2
2. 發送消息
以下是一個簡單的 Go 語言程序,用于向 RocketMQ 發送消息:
package mainimport ("fmt""log""github.com/apache/rocketmq-client-go/v2/producer""github.com/apache/rocketmq-client-go/v2/primitive"
)func main() {// 創建一個 RocketMQ 生產者實例p, err := producer.NewProducer(producer.WithNameServer([]string{"localhost:9876"}))if err != nil {log.Fatalf("create producer failed: %s", err.Error())}// 啟動生產者err = p.Start()if err != nil {log.Fatalf("start producer failed: %s", err.Error())}defer p.Shutdown()// 創建消息msg := &primitive.Message{Topic: "TestTopic",Body: []byte("Hello RocketMQ!"),}// 發送消息res, err := p.SendSync(context.Background(), msg)if err != nil {log.Fatalf("send message failed: %s", err.Error())}fmt.Printf("Send message success: %s\n", res.String())
}
3. 接收消息
以下是一個簡單的 Go 語言程序,用于從 RocketMQ 消費消息:
package mainimport ("context""fmt""log""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
)func main() {// 創建一個 RocketMQ 消費者實例c, err := consumer.NewPushConsumer(consumer.WithNameServer([]string{"localhost:9876"}),consumer.WithGroupName("TestConsumerGroup"),)if err != nil {log.Fatalf("create consumer failed: %s", err.Error())}// 定義消息處理函數c.RegisterMessageListener(consumer.MessageListenerFunc(func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for _, msg := range msgs {fmt.Printf("Received message: %s\n", string(msg.Body))}return consumer.ConsumeSuccess, nil}))// 訂閱主題err = c.Subscribe("TestTopic", consumer.MessageSelector{}, nil)if err != nil {log.Fatalf("subscribe topic failed: %s", err.Error())}// 啟動消費者err = c.Start()if err != nil {log.Fatalf("start consumer failed: %s", err.Error())}defer c.Shutdown()// 阻塞主線程,保持消費者運行select {}
}
4. 運行示例
-
確保你已經啟動了 RocketMQ 服務,并且
localhost:9876
是你的 NameServer 地址。 -
編譯并運行發送消息程序:
go run producer.go
-
編譯并運行接收消息程序:
go run consumer.go
-
你應該能看到發送的消息被消費者接收到并輸出。
注意事項
- NameServer 地址:確保你在創建生產者和消費者時指定的 NameServer 地址是正確的,并且 RocketMQ 服務正在運行。
- 主題和消費者組:確保在 RocketMQ 中已經創建了相應的主題(Topic),并且消費者組(Consumer Group)設置正確。
- 錯誤處理:在生產環境中,你應該實現更詳細的錯誤處理和日志記錄,以便調試和監控。
以上就是如何在 Go 語言中實踐 RocketMQ 的基本步驟。
十、RocketMQ歷史演進
RocketMQ 是一個高性能、高可用的分布式消息中間件,最初由阿里巴巴開源,并在社區中逐漸發展和演進。以下是 RocketMQ 的主要歷史演進過程:
1. 初始版本 (2012-2015)
-
2012年:RocketMQ 的前身被阿里巴巴開發為一個內部消息中間件,用于支持大規模的電商平臺和業務系統。
-
2013年:阿里巴巴開始開源 RocketMQ,最初以阿里巴巴內部的需求和經驗為基礎開發。早期版本主要用于阿里巴巴內部系統,支持基本的消息隊列功能。
-
2015年:RocketMQ 被正式開源,并成為 Apache 頂級項目。Apache RocketMQ 提供了更加成熟的消息中間件功能,滿足了高吞吐量、低延遲和高可靠性的需求。
2. 成為 Apache 頂級項目 (2016-2019)
-
2016年:RocketMQ 成為 Apache 頂級項目,標志著其在開源社區中的認可和穩定性。Apache RocketMQ 提供了生產環境所需的多種特性,包括事務消息、消息過濾、消息順序消費等。
-
2017年:引入了支持多種語言的客戶端 SDK,包括 Java、C++、Python 和 Go 等。
-
2018年:引入了新的特性和改進,包括改進的消息存儲機制、增強的高可用性和容錯能力、以及更多的管理和監控工具。
-
2019年:發布了 RocketMQ 4.x 版本,引入了更高的性能優化和新特性,如消息軌跡(Message Track)、增強的客戶端性能等。
3. 云原生和微服務支持 (2020-2023)
-
2020年:RocketMQ 5.0 版本發布,引入了云原生支持和對微服務架構的支持,包括對 Kubernetes 的支持、集成了 Prometheus 監控、增強的分布式事務能力等。RocketMQ 5.0 還引入了新的存儲引擎和更加靈活的消息路由策略。
-
2021年:進一步改進了 RocketMQ 的高可用性、可擴展性和性能,提供了更多的集群管理工具和性能監控功能。
-
2022年:引入了 RocketMQ 5.1 版本,改進了消息順序消費的可靠性和性能,同時增強了對云平臺的支持,優化了與大數據平臺的集成能力。
-
2023年:發布了 RocketMQ 5.2 版本,增加了新的功能和改進,包括更好的分布式事務支持、消息歷史追蹤功能和更高效的消息存儲機制。
4. 未來發展
- 未來計劃:RocketMQ 繼續關注云原生架構的演進、微服務架構的支持以及與其他大數據和流處理平臺的集成。未來的版本可能會引入更多對多租戶和彈性伸縮的支持,以及對新的消息通信協議的支持。
總結
RocketMQ 從一個內部使用的消息中間件發展成為一個成熟的開源項目,經過不斷的改進和演進,逐步成為 Apache 頂級項目,并支持現代的云原生和微服務架構。通過持續的開發和社區貢獻,RocketMQ 不斷適應新的技術需求和業務場景。
完。
希望對您有所幫助!關注鍋總,及時獲得更多花里胡哨的運維實用操作!
十一、一個秘密
鍋總個人博客
https://gentlewok.blog.csdn.net/
鍋總微信公眾號