?學習之前呢需要會使用linux的基礎命令
一.RocketMQ 主題與隊列的協同作用解析
在 RocketMQ 中,?主題(Topic)?與?隊列(Queue)?的協同設計實現了消息系統的邏輯抽象與物理存儲分離。雖然隊列實際存儲在不同集群的 Broker 節點上,但主題作為邏輯層面的核心單元,仍具有不可替代的作用:
1. ?主題是消息的邏輯分類與路由標識?
- ?邏輯聚合?:主題將同一類業務消息聚合為邏輯單元(例如訂單消息歸入?
OrderTopic
),生產者只需關注發送到哪個主題,無需感知底層隊列的物理分布。 - ?路由依據?:消息通過主題名稱路由到對應集群的隊列中。例如,生產者發送到?
PaymentTopic
?的消息會被自動分配到該主題關聯的隊列(可能分布在多個集群)。 - ?跨集群協同?:若隊列分布在多個集群,主題的元數據(如隊列分布規則)由 NameServer 統一管理,確保生產者/消費者能準確找到目標隊列。
2. ?主題實現消息的訂閱與消費隔離?
- ?訂閱關系管理?:消費者通過訂閱主題來接收消息,而非直接綁定隊列。即使隊列分布在多個集群,消費者組仍可通過主題名稱統一訂閱,RocketMQ 自動完成隊列分配和負載均衡。
- ?權限控制?:主題支持獨立的權限配置(如讀寫權限),實現不同業務消息的訪問隔離。例如,財務系統僅能訪問?
FinanceTopic
,與訂單系統隔離。 - ?Tag 過濾擴展?:在主題基礎上,通過 Tag 進一步細分消息類型(如?
OrderTopic:TagA
),消費者可選擇性訂閱特定 Tag,減少無關消息處理。
3. ?主題為運維提供統一管理入口?
- ?隊列動態擴展?:當集群擴容時,通過調整主題的隊列數量(如從 8 隊列增至 16 隊列),新隊列可自動分配到新增集群的 Broker,無需修改業務代碼。
- ?監控與告警?:主題級別的監控指標(如消息堆積量、消費 TPS)便于快速定位問題。例如,
InventoryTopic
?的消費延遲告警可直接關聯到庫存業務。 - ?數據生命周期?:主題支持獨立的消息保留策略(如保存 3 天),不同業務可按需配置,避免全局策略的局限性。
4. ?主題與隊列的協同模型?
?維度? | ?主題(Topic)? | ?隊列(Queue)? |
---|---|---|
?定位? | 邏輯分類單元,定義消息身份與權限 | 物理存儲單元,實際承載消息數據 |
?擴展性? | 通過增加隊列數量實現橫向擴展 | 依賴 Broker 集群擴容提升單隊列容量 |
?消費模式? | 支持集群消費(單隊列單消費者)或廣播消費 | 僅作為存儲載體,不直接決定消費模式 |
?運維操作? | 配置權限、Tag 規則、保留策略等 | 調整副本數、存儲路徑、故障遷移等 |
實際場景示例
- ?場景?:電商系統的訂單消息(
OrderTopic
)需要支持每秒 10 萬條吞吐量。 - ?實現?:
- 創建?
OrderTopic
?并設置 32 個隊列。 - 將隊列分布在 4 個集群的 Broker 節點上(每個集群 8 隊列)。
- 消費者組訂閱?
OrderTopic
,RocketMQ 自動將 32 個隊列均分給消費者實例。 - 當某個集群故障時,NameServer 將故障隊列的路由指向其他集群的副本隊列,保障高可用
- 創建?
小結
主題的核心價值在于?邏輯抽象?,它屏蔽了底層隊列的物理復雜性,同時提供業務層面的統一管理入口。隊列的分布式存儲解決的是?性能與可靠性問題?,而主題則定義了消息的?業務語義與協作規則?,兩者共同構成 RocketMQ 高并發、高可用的基石。
二.RocketMQ管理命令?
1、updateTopic
作用:修改或創建一個Topic
命令:mqadmin updateTopic -b | -c [-h] [-n ] [-o ] [-p ] [-r ] [-s ] -t [-u ] [-w ]
參數:
- -n: name server地址列表
- -c: cluster 名稱,表示topic 建在該集群
- -t: 設置topic名稱
- -h: 打印help信息
- -o: 設置topic是否為有序的 取值:true、false(默認)
- -p: 設置topic的權限
示例:?
?mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic
2、deleteTopic
作用:從broker和nameserver刪除topic?
命令:mqadmin deleteTopic -c [-h] [-n ] -t
參數:
- -n: name server地址列表
- -c: cluster 名稱,表示topic 建在該集群
- -t: 設置topic名稱
- -h: 打印help信息
示例:
mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t testtopic
3、topicList
作用:從nameserver列出所有topic
命令:mqadmin topicList [-c] [-h] [-n ]
參數:
- -n: name server地址列表
- -c: cluster 名稱,表示topic 建在該集群
- -h: 打印help信息
示例:
mqadmin topicList -n localhost:9876
4、topicStatus
作用:檢查topic的狀態信息
命令:mqadmin topicStatus [-h] [-n ] -t
參數:
- -n: name server地址列表
- -c: cluster 名稱,表示topic 建在該集群
- -t: 設置topic名稱
- -h: 打印help信息
?示例
mqadmin topicStatus -n localhost:9876 -t testtopic
5、cleanUnusedTopic
作用:清理未使用的topic
命令:mqadmin cleanUnusedTopic [-b ] [-c ] [-h] [-n ]
參數:
- -n: name server地址列表
- -b: broker地址
- -c: 集群名稱
- -h: 打印help信息
6、關閉namesrv和broker服務
mqshutdown namesrv
mqshutdown broker
三.RocketMQ安裝與配置?
1、首先修改環境變量?
vim /etc/profile
# 文件末尾追加改信息
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=$PATH:$ROCKETMQ_HOME/bin
# 生效環境變量
source /etc/profile
2、解壓文件
unzip rocketmq-all-5.1.4-bin-release.zip
我們上傳安裝包至opt目錄下,使用該命令將其解壓至opt目錄下,最后使用mv命令對其進行位置移動和改名-----/usr/local/rocketmq
# 解壓到當前目錄(/opt)
unzip rocketmq-all-5.0.0-bin-release.zip# 將解壓后的目錄移動到/usr/local
mv rocketmq-all-5.0.0-bin-release /usr/local/# 改名
mv rocketmq-all-5.0.0-bin-release rocketmq
?進入bin目錄如下
3、啟動NameServer?
nohup sh mqnamesrv &
?
4、啟動broker?
nohup sh ./mqbroker -n localhost:9876 &?
使用jps命令查看后發現broker未啟動
啟動成功后的輸出結果?
使用 cat nohup.out?查看日志如下
?很常見的問題
1、java.lang.IllegalAccessError: class org.apache.rocketmq.common.UtilAll (in unnamed module @0x58acb723) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module?
解決:添加啟動參數
修改runbroker文件,添加紅色參數?$JAVA ${JAVA_OPT} --add-exports=java.base/sun.nio.ch=ALL-UNNAMED $@
2、堆空間初始值太大也報錯?
修改文件runbroker.sh(將光標位置修改為如下數字即可)
通過上述修改,將初始堆內存512M,最大堆內存設置為512M,新生代(Java中用于存儲創建對象的部分)設置為256M,修改完成后便可以正常啟動以及查看日志。?