全面解剖 消息中間件 RocketMQ-(3)
一、RocketMQ – mqadmin 命令介紹
1、mqadmin 管理工具 使用方式
進入 RocketMQ 安裝位置,在 bin 目錄下執行 ./mqadmin {command} {args}
# 進入 RocketMQ 安裝目錄的 bin 目錄下:
cd /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/bin/# mqadmin 管理工具 使用方式
./mqadmin {command} {args}
2、mqadmin 命令介紹
1)Topic 相關
名稱 | 含義 | 命令選項 | 說明 |
---|---|---|---|
updateTopic | 創建更新 Topic 配置 | -b | Broker 地址,表示 topic 所在 Broker,只支持單臺 Broker,地址為 ip:port |
-c | cluster 名稱,表示 topic 所在集群(集群可通過 clusterList查詢) | ||
-h- | 打印幫助 | ||
-n | NameServer服務地址,格式 ip:port | ||
-p | 指定新topic的讀寫權限(W=2 | ||
-r | 可讀隊列數(默認為8) | ||
-w | 可寫隊列數(默認為8) | ||
-t | topic 名稱(名稱只能使用字符 【^ [a-zA-Z0-9_-]+$) 】 | ||
deleteTopic | 刪除 Topic | -h | 打印幫助 |
-n | NameServer服務地址,格式 ip:port | ||
-t | topic 名稱(名稱只能使用字符 【 ^ [a-zA-Z0-9_-]+$) 】 | ||
topicList | 查看 Topic 列表信息 | -c | 不配置-c只返回 topic 列表,增加-c返回 clusterName, topic,consumerGroup 信息,即 topic 的所屬集群和訂閱關系,沒有參數 |
-n | NameServer服務地址,格式 ip:port | ||
-t | topic 名稱 | ||
topicRoute | 查看 Topic 路由信息 | -h | 打印幫助 |
-n | NameServer 服務地址,格式 ip:port | ||
-t | topic 名稱 | ||
topicStatus | 查看 Topic 消息隊列 offset | -h | 打印幫助 |
-n | NameServer服務地址,格式 ip:port | ||
topicClusterList | 查看 Topic 所在集群列表 | -h | 打印幫助 |
-n | NameServer 服務地址,格式 ip:port | ||
updateTopicPerm | 更新 Topic 讀寫權限 | -t | topic 名稱 |
-h | 打印幫助 | ||
-n | NameServer服務地址,格式 ip:port | ||
-b | Broker 地址,表示 topic 所在 Broker,只支持單臺 Broker,地址為:ip:port | ||
-p | 指定新 topic 的讀寫權限(W=2 | ||
-c | cluster 名稱,表示 topic 所在集群(集群可通過 clusterList 查詢,-b 優先,如果沒有 -b,則對集群中所有 Broker 執行命令 | ||
updateOrderConf | 從 NameServer 上創建,刪除,獲取特定命名空間的 kv 配置,目前還未啟用 | -t | topic 鍵 |
-h | 打印幫助 | ||
-n | NameServer服務地址,格式 ip:port | ||
-v | orderConf,值 | ||
-m | method,可選 get,put,delete | ||
allocateMQ | -t | topic 名稱 | |
-h | 打印幫助 | ||
-n | NameServer服務地址,格式 ip:port | ||
-v | Broker 地址,表示 topic 所在 Broker,只支持單臺 Broker,地址為:ip:port | ||
2)集群相關
名稱 | 含義 | 命令選項 | 說明 |
---|---|---|---|
clusterList | 查看集群信息,集群,BrokerName,Brokerld, TPS 等信息 | -m | #OutTotalYest,#InTotalToday,#OutTotalToday |
-h | 打印幫助 | ||
-n | NameServer服務地址,格式 ip:port | ||
-i | 打印間隔,單位秒 | ||
-a | amount,每次探測總數,RT=總時間/amount | ||
-s | 消息大小,單位 B | ||
-c | 探測那個集群 | ||
-p | 是否打印格式化日志,以 | ||
-i | 打印間隔,單位秒 | ||
clusterRT | 發送消息檢測集群各 BrokerRT,消息發往 ${BrokerName}Topic | -a | amount,每次探測總數,RT=總時間/amount |
-s | 消息大小,單位 B | ||
-c | 探測那個集群 | ||
-p | 是否打印格式化日志,以 | ||
-h | 打印幫助 | ||
-m | 所屬機房,打印使用 | ||
-i | 打印間隔,單位秒 | ||
-n | NameServer服務地址,格式 ip:port | ||
3)Broker 相關
名稱 | 含義 | 命令選項 | 說明 |
---|---|---|---|
updateBrokerConfig | 更新 Broker 配置文件,會修改 Broker.conf | -b | Broker 地址,格式 ip:port |
-c | cluster 名稱 | ||
-k | key 值 | ||
-v | value 值 | ||
-h | 打印幫助 | ||
4)消息相關
名稱 | 含義 | 命令選項 | 說明 |
---|---|---|---|
queryMsgById | 根據 offsetMsgId 查詢 msg,如果使用開源控制臺,應使用,此命令還有其他參數,具體作用請閱讀QueryMsgByIdSubCommand | -i | msgId |
-h | 打印幫助 | ||
-n | NameServer服務地址,格式 ip:port | ||
queryMsgByKey | 根據消息 Key 查詢消息 | -k | msgKey |
-t | Topic 名稱 | ||
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
-b | Broker 名稱,(這里需要注意填寫的是 Broker 的名稱,不是 Broker 的地址,Broker 名稱可以在 clusterList 查到 | ||
queryMsgByOffset | 根據Offset 查詢消息 | -i | query 隊列 id |
-o | offset 值 | ||
-t | Topic 名稱 | ||
-h | 打印幫助 | ||
-n | NameServer服務地址,格式 ip:port | ||
queryMsgByUniqueKey | 根據 msgId 查詢,msgId 不同于 offsetMsgId,區別詳見常見運維問題,-g,-d 配合使用,查到消息后嘗試讓特定的消費者消費消息并返回消費結果 | -h | 打印幫助 |
-n | NameServer服務地址,格式 ip:port | ||
-i | uniqe msg id | ||
-g | consumerGroup | ||
-d | clientId | ||
-t | Topic 名稱 | ||
checkMsgSendRT | 檢測向 topic 發消息的 RT,功能類似 clusterRT | -h | 打印幫助 |
-n | NameServer服務地址,格式 ip:port | ||
-o | offset 值 | ||
-t | Topic 名稱 | ||
-a | 探測次數 | ||
sendMessage | 發送一條消息,可以根據配置發往特定 MessageQueue 或普通發送 | -s | 消息大小 |
-h | 打印幫助 | ||
-n | NameServer服務地址,格式 ip:port | ||
-t | Topic 名稱 | ||
-p | body 消息體 | ||
-k | keys | ||
-c | tags | ||
-b | BrokerName | ||
-i | queueId | ||
-h | 打印幫助 | ||
-n | NameServer服務地址,格式 ip:port | ||
-t | Topic 名稱 | ||
5)消費者、消費組相關
名稱 | 含義 | 命令選項 | 說明 |
---|---|---|---|
consumerProgress | 查看訂閱組消費狀態,可以查看具體的 client IP 的消息積累量 | -g | 消費者所屬組名 |
-s | 是否打印 client IP | ||
-h | 打印幫助 | ||
-n | NameServer服務地址,格式 ip:port | ||
consumerStatus | 查看消費者狀態,包括同一個分組中是否是相同的訂閱,分析 ProcessQueue 是否堆積,返回消費者 jstack 結果,內容較多,使用者參見 ConsumerStatusSubCommand | -h | 打印幫助 |
-n | NameServer服務地址,格式 ip:port | ||
-g | consumer group | ||
-i | clientId | ||
-s | 是否執行 jstack | ||
getConsumerStatus | 獲取 Consumer 消費進度 | -g | 消費者所屬組名 |
-t | 查詢主題 | ||
-i | Consumer 客戶端 IP | ||
-h | 打印幫助 | ||
-n | NameServer服務地址,格式 ip:port | ||
updateSubGroup | 更新或創建訂閱關系 | -h | 打印幫助 |
-n | NameServer服務地址,格式 ip:port | ||
-b | Broker 地址 | ||
-c | 集群名稱 | ||
-g | 消費者分組名稱 | ||
-s | 分組是否允許消費 | ||
-m | 是否從最小 offset 開始消費 | ||
-d | 是否是廣播模式 | ||
-q | 重試隊列數量 | ||
-r | 最大重試次數 | ||
-i | 當 slaveReadEnable 開啟時有效,且還未達到從 slave 消費時建議從哪個 Brokerld 消費,可以配置備機 id,主動從備機消費 | ||
-w | 如果 Broker 建議從 slave 消費,配置決定從哪個 slave 消費,配置 Brokerld,例如 | ||
6)連接相關
名稱 | 含義 | 命令選項 | 說明 |
---|---|---|---|
consumerConnection | 查詢 Consumer 的網絡連接 | -g | 消費者所屬組名 |
-n | NameServer服務地址,格式 ip:port | ||
-h | 打印幫助 | ||
-g | 生產者所屬組名 | ||
-t | 主題名稱 | ||
producerConnection | 查詢 Producer 的網絡連 | -n | NameServer服務地址,格式 ip:port |
-h | 打印幫助 | ||
7)NameServer 相關
名稱 | 含義 | 命令選項 | 說明 |
---|---|---|---|
updateKvConfig | 更新 NameServer 的 kv 配置,目前還未使用 | -s | 命名空間 |
-k | key | ||
-v | value | ||
-n | NameServer服務地址,格式 ip:port | ||
8)其它
名稱 | 含義 | 命令選項 | 說明 |
---|---|---|---|
startMonitoring | 開啟監控進程,監控消息誤刪,重試隊列消息數等 | -n | NameServer 服務地址,格式 ip:port |
-h | 打印幫助 |
3、使用 mqadmin 命令 注意事項:
- 幾乎所有命令都需要配置-n表示NameServer地址,格式為 ip:port。
- 幾乎所有命令都可以通過-h獲取幫助。
- 如果既有 Broker 地址(-b)配置項又有 clusterName(-c) 配置項,則優先以 Broker 地址執行命令;如果不配置 Broker 地址,則對集群中所有主機執行命令。
二、RocketMQ – console 集群監控平臺搭建
1、下載安裝 RocketMQ – console 監控平臺
RocketMQ 有一個對其擴展的開源項目 incubator-rocketmg-externals,這個項目中有一個子模塊叫 rocketmq-console,這個便是管
理控制臺項目了,先將 incubator-rocketmq-externals 拉到本地,因為我們需要自己對 rocketmq-console 進行編譯打包運行。
下載地址:https://github.com/apache/rocketmq-externals
rocketmq-externals-master.zip
git clone https://github.com/apache/rocketmq-externals
cd rocketmq-console
mvn clean package -Dmaven.test.skip=true
2、解壓即安裝
/rocketmq-externals-master/rocketmq-console/
3、修改配置文件:在 rocketmq-console 中配置 namesrv 集群地址:
rocketmq.config.namesrvAddr=192.168.25.135:9876;192.168.25.138:9876
4、打包: 在項目的 pom.xml 所在的目錄,打開 cmd 命令提示符
# 進入項目目錄
cd /rocketmq-externals-master/rocketmq-console/# 打包
mvn clean package -Dmaven.test.skip=true
5、打包完成: 在項目的 pom.xml 所在的目錄,會在 target 文件夾下,生成 rocketmq-console-ng-1.0.1.jar
將此 jar 包上傳到兩個虛擬機(192.168.25.135:9876;192.168.25.138:9876)的 /usr/local/rocketmq/ 下(/usr/soft/)。
6、啟動 rocketmq-console 控制臺
java -jar rocketmq-console-ng-1.0.0.jar
7、啟動成功后,通過瀏覽器訪問:http://localhost:8080 進入 控制臺界面
http://192.168.25.135:8080
http://192.168.25.138:8080
三、RocketMQ – 消息發送樣例介紹和步驟分析
1、消息發送樣例
1.1 導入 MQ 客戶端依賴
<dependency><groupid>org.apache.rocketmq</groupId><artifactid>rocketmq-client</artifactid><version>4.4.0</version>
</dependency>
1.2 消息發送者步驟分析
1.創建消息生產者 producer,并制定生產者組名2.指定 Nameserver 地址3.啟動 producer4.創建消息對象,指定主題 Topic、Tag 和消息體5.發送消息6.關閉生產者 producer。
1.3 消息消費者步驟分析
1.創建消費者 Consumer,制定消費者組名2.指定 Nameserver 地址3.訂閱主題 Topic 和 Tag4.設置回調函數,處理消息5.啟動消費者 consumer。
2、打開 idea,創建 rocketmq_demo 的 maven 工程。
--> idea --> File --> New --> Project --> Maven Project SDK: ( 1.8(java version "1.8.0_131" ) --> Next --> Groupld : ( djh.it )Artifactld : ( rocketmq_demo )Version : 1.0-SNAPSHOT--> Name: ( rocketmq_demo )Location: ( \rocketmq_demo\ ) --> Finish
3、在工程 rocketmq_demo (模塊)中的 pom.xml 中導入依賴
<dependency><groupid>org.apache.rocketmq</groupId><artifactid>rocketmq-client</artifactid><version>4.4.0</version>
</dependency>
4、在工程 rocketmq_demo (模塊)中,創建包結構:
src/main/java/djh/it/mq/rocketmq/base/
src/main/java/djh/it/mq/rocketmq/batch/
src/main/java/djh/it/mq/rocketmq/delay/
src/main/java/djh/it/mq/rocketmq/filter/
src/main/java/djh/it/mq/rocketmq/order/
src/main/java/djh/it/mq/rocketmq/transaction/
四、RocketMQ – 發送同步消息
1、RocketMQ – 發送同步消息
發送同步消息,這種可靠性同步地發送方式使用的比較廣泛,比如:重要的消息通知、短信通知等。
2、在工程 rocketmq_demo (模塊)中,創建 發送同步消息類 SyncProducer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\base\producer\SyncProducer.java** 2024-5-24 創建 發送同步消息類 SyncProducer.java*/
package djh.it.mq.rocketmq.base.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer;public class SyncProducer {public static void main(String[] args) throws Exception {//1.創建消息生產者 producer,并制定生產者組名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定 Nameserver 地址producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.啟動 producerproducer.start();for(int i=0; i<10; i++){//4.創建消息對象,指定主題 Topic、Tag 和消息體//參數一:消息主題 Topic, 參數二:消息 Tag, 參數三:消息內容Message msg = new Message("base", "Tag1", ("Hello World"+i).getBytes());//5.發送消息SendResult result = producer.send(msg);//發送狀態SendStatus status = result.getSendStatus();//消息 ID //String msgId = result.getMsgId();//消息接受隊列 ID //int queueId = result.getMessageQueue().getQueueId();//System.out.println("發送狀態:"+result+", 消息ID"+msgId+", 隊列"+queueId);System.out.println("發送結果:"+result);TimeUnit.SECONDS.sleep(1); //線程睡1秒}//6.關閉生產者 producer。producer.shutdown(); }
}
3、運行測試類,進行測試。查看發送結果。
五、RocketMQ – 發送異步消息
1、RocketMQ – 發送異步消息
異步消息通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間的等待 Broker 的響應。
2、在工程 rocketmq_demo (模塊)中,創建 發送異步消息類 AsyncProducer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\base\producer\AsyncProducer.java** 2024-5-24 創建 發送異步消息類 AsyncProducer.java*/
package djh.it.mq.rocketmq.base.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer;public class AsyncProducer {public static void main(String[] args) throws Exception {//1.創建消息生產者 producer,并制定生產者組名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定 Nameserver 地址producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.啟動 producerproducer.start();for(int i=0; i<10; i++){//4.創建消息對象,指定主題 Topic、Tag 和消息體//參數一:消息主題 Topic, 參數二:消息 Tag, 參數三:消息內容Message msg = new Message("base", "Tag2", ("Hello World"+i).getBytes());//5.發送 異步 消息producer.send(msg, new SendCallback(){//發送成功的回調函數public void onSuccess(SendResult sendResult){System.out.println("發送結果:"+sendResult);}//發送失敗的回調函數public void onException(Throwable e){System.out.println("發送異常:"+e);}});System.out.println("發送結果:"+result);TimeUnit.SECONDS.sleep(1); //線程睡1秒}//6.關閉生產者 producer。producer.shutdown(); }
}
3、運行測試類,進行測試。查看發送結果。
六、RocketMQ – 發送單向消息
1、RocketMQ – 發送單向消息
單向消息這種方式主要用在不特別關心發送結果的場景,例如日志發送。
2、在工程 rocketmq_demo (模塊)中,創建 發送單向消息類 OneWayProducer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\base\producer\OneWayProducer.java** 2024-5-24 創建 發送單向消息類 OneWayProducer.java*/
package djh.it.mq.rocketmq.base.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import java.util.concurrent.TimeUnit;public class OneWayProducer {public static void main(String[] args) throws Exception {//1.創建消息生產者 producer,并制定生產者組名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定 Nameserver 地址producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.啟動 producerproducer.start();for(int i=0; i<3; i++){//4.創建消息對象,指定主題 Topic、Tag 和消息體//參數一:消息主題 Topic, 參數二:消息 Tag, 參數三:消息內容Message msg = new Message("base", "Tag3", ("Hello World 單向消息"+i).getBytes());//5.發送 單向 消息(無返回值)producer.sendOneway(msg);TimeUnit.SECONDS.sleep(5); //線程睡5秒}//6.關閉生產者 producer。producer.shutdown(); }
}
3、運行測試類,進行測試。
七、RocketMQ – 消息消費基本流程
1、消息消費者步驟分析
1.創建消費者 Consumer,制定消費者組名2.指定 Nameserver 地址3.訂閱主題 Topic 和 Tag4.設置回調函數,處理消息5.啟動消費者 consumer。
2、在工程 rocketmq_demo (模塊)中,創建 消息消費類 Consumer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\base\consumer\Consumer.java** 2024-5-24 創建 消息消費類 Consumer.java*/
package djh.it.mq.rocketmq.base.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import java.util.concurrent.TimeUnit;public class Consumer {public static void main(String[] args) throws Exception {//1.創建消費者 Consumer,制定消費者組名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定 Nameserver 地址consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.訂閱主題 Topic 和 Tag//consumer.subscribe("base", "Tag1"); //接收同步消息consumer.subscribe("base", "Tag2"); //接收異步消息前,可以讓先發送異步消息。//4.設置回調函數,處理消息consumer.registerMessageListener(new MessageListenerConcurrently(){//接受消息內容public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){//System.out.println(msgs); //接收到的消息是未轉換的字節碼for(MessageExt msg : msgs){System.out.println(new String(msg.getBody())); //轉換為字符串消息}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.啟動消費者 consumer。consumer.start();}
}
3、運行測試類,進行測試。
八、RocketMQ – 消費者廣播模式和負載均衡模式
1、RocketMQ – 消費者 廣播模式
消費者采用廣播的方式消費消息,每個消費者消費的消息都是相同的。
2、RocketMQ – 消費者 負載均衡模式
消費者采用負載均衡方式消費消息,多個消費者共同消費隊列消息,每個消費者處理的消息不同。
3、在工程 rocketmq_demo (模塊)中,修改 消息消費測試類 Consumer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\base\consumer\Consumer.java** 2024-5-24 修改 消息消費類 Consumer.java 添加消費模式。*/
package djh.it.mq.rocketmq.base.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import java.util.concurrent.TimeUnit;public class Consumer {public static void main(String[] args) throws Exception {//1.創建消費者 Consumer,制定消費者組名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定 Nameserver 地址consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.訂閱主題 Topic 和 Tag//consumer.subscribe("base", "Tag1"); //接收同步消息//consumer.subscribe("base", "Tag1 | Tag2"); //接收同步消息 和 異步消息//consumer.subscribe("base", "*"); //接收所有消息consumer.subscribe("base", "Tag1"); //接收同步消息前,可以讓先發送同步消息。//添加消費模式 //consumer.setMessageModel(MessageModel.CLUSTERING); //默認是負載均衡模式消費consumer.setMessageModel(MessageModel.BROADCASTING); //廣播模式消費//4.設置回調函數,處理消息consumer.registerMessageListener(new MessageListenerConcurrently(){//接受消息內容public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){//System.out.println(msgs); //接收到的消息是未轉換的字節碼for(MessageExt msg : msgs){System.out.println(new String(msg.getBody())); //轉換為字符串消息}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.啟動消費者 consumer。consumer.start();}
}
4、運行測試類,進行測試。
上一節關聯鏈接請點擊:
# 全面解剖 消息中間件 RocketMQ-(2)