RocketMQ常用基本操作

文章中的rabbitmq使用的是rocketmq-all-5.1.3-bin-release版本,需要安裝包的可自行下載

RockerMQ啟動停止命令

啟動命令

nohup sh bin/mqnamesrv &

nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

查看日志

tail -f ~/logs/rocketmqlogs/namesrv.log

tail -f ~/logs/rocketmqlogs/proxy.log

停止命令

sh bin/mqshutdown broker

sh bin/mqshutdown namesrv

集群狀態

sh mqadmin clusterList -n 127.0.0.1:9876

創建topic

sh mqadmin updateTopic?-n 127.0.0.1:9876?rocket_test

查看所有topic信息

sh mqadmin topicList -n 127.0.0.1:9876

sh mqadmin topicList -n 127.0.0.1:9876 -c

查看 Topic 路由信息

sh mqadmin topicRoute -n 127.0.0.1:9876 -t TopicTest

發送測試消息

export NAMESRV_ADDR=localhost:9876

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

消費消息

sh?bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

Java代碼收發消息

Producer

package com.rocket.demo;

import com.alibaba.fastjson.JSON;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.HashMap;

import java.util.Map;

public class RocketProducerDemo {

????private final static String nameServer = "127.0.0.1:9876";

????private final static String producerGroup = "my_group2";

????// debezium-mysql-source-topic topic-test

????private final static String topic = "TopicTest";

????public static void main(String[] args) {

????????try {

????????????// 初始化一個producer并設置Producer group name

????????????DefaultMQProducer producer = new DefaultMQProducer(producerGroup);

// ???????????DefaultMQProducer producer = new DefaultMQProducer();

????????????// 設置NameServer地址

????????????producer.setNamesrvAddr(nameServer);

????????????// 啟動producer

????????????producer.start();

????????????for (int i = 0; i < 100; i++) {

????????????????Map<String, String> data = new HashMap();

????????????????data.put("id", i+"");

????????????????data.put("name", i+","+System.currentTimeMillis());

????????????????// 創建一條消息,并指定topic、tag、body等信息,tag可以理解成標簽,對消息進行再歸類,RocketMQ可以在消費端對tag進行過濾

????????????????Message msg = new Message("TopicTest", "tagA", JSON.toJSONString(data).getBytes(RemotingHelper.DEFAULT_CHARSET));

????????????????// 利用producer進行發送,并同步等待發送結果

????????????????SendResult sendResult = producer.send(msg, 10000);

????????????????System.out.println(sendResult);

????????????}

????????????// 一旦producer不再使用,關閉producer

????????????producer.shutdown();

????????} catch (Exception e) {

????????????e.printStackTrace();

????????}

????}

}

Consumer

package com.rocket.demo;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class RocketConsumerDemo {

????public static void main(String[] args) throws Exception {

????????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_group");

????????consumer.setNamesrvAddr("localhost:9876");

????????// debezium-mysql-source-topic ?topic-test debezium-mysql-source db-history-debezium-topic debezium-mysql-source

????????consumer.subscribe("TopicTest", "*"); // 訂閱主題和標簽,* 表示訂閱所有標簽

????????consumer.registerMessageListener(new MessageListenerConcurrently() {

????????????@Override

????????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {

????????????????for (MessageExt message : messages) {

????????????????????System.out.println("Received message: " + new String(message.getBody()));

????????????????}

????????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

????????????}

????????});

????????consumer.start();

????????System.out.println("Consumer started");

????}

}

常見問題

service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 0.96 CQ: 0.96 INDEX: 0.96], messages are put to the slave, message store has been shut down

錯誤原因:博主測試的服務器磁盤使用率到0.96了,rocketmq不允許磁盤超過0.9,清理下磁盤數據即可

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/38563.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/38563.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/38563.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

多線程編程的挑戰與解決方案

多線程編程的挑戰與解決方案 大家好&#xff0c;我是免費搭建查券返利機器人省錢賺傭金就用微賺淘客系統3.0的小編&#xff0c;也是冬天不穿秋褲&#xff0c;天冷也要風度的程序猿&#xff01; 1. 多線程編程的挑戰 在現代軟件開發中&#xff0c;多線程編程成為處理并發任務…

PatchTST創新點

這篇論文的創新點主要集中在PatchTST模型的設計和應用中。以下是對其創新點的詳細說明&#xff1a; 創新點 頻道獨立補丁設計&#xff1a;PatchTST模型通過將多變量時間序列分割成不同的頻道&#xff0c;每個頻道作為單變量時間序列處理。每個頻道獨立地通過實例歸一化操作和補…

明星中藥企業系列洞察(九)一手好牌打的稀爛!近500年老字號鎖定退市,太安堂為何“塌房”了?

近日&#xff0c;太安堂發布公告稱&#xff0c;公司已收到深交所下發的《關于廣東太安堂藥業股份有限公司股票終止上市的決定》&#xff0c;深交所決定終止公司股票上市&#xff0c;預計其最后交易日期為7月4日。太安堂曾作為國內知名的中成藥上市公司之一&#xff0c;是國家級…

matlab仿真 通信信號和系統分析(上)

&#xff08;內容源自詳解MATLAB&#xff0f;SIMULINK 通信系統建模與仿真 劉學勇編著第三章內容&#xff0c;有興趣的讀者請閱讀原書&#xff09; 一、求離散信號卷積和 主要還是使用卷積函數conv&#xff0c;值得注意的是&#xff0c;得到的卷積和長度結果為81&#xff0…

node.js+uniapp(vue),阿里云短信驗證碼

reg.vue: 思路是&#xff1a;前端調用獲取驗證碼的接口 > 后端生成驗證碼返回給前端 > 前端渲染驗證碼 <template> <div> <input class"sl-input" v-model"phone" type"tel" maxlength"11" placeholder"手…

微信小程序畢業設計-微信食堂線上訂餐系統項目開發實戰(附源碼+論文)

大家好&#xff01;我是程序猿老A&#xff0c;感謝您閱讀本文&#xff0c;歡迎一鍵三連哦。 &#x1f49e;當前專欄&#xff1a;微信小程序畢業設計 精彩專欄推薦&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python畢業設計…

【在線評論】不同視角下在線評論對客戶滿意度和推薦度的影響—推文分析—2024-07-01

今天的推文主題是【在線評論】&#xff0c;重點關注可以關注第四篇&#xff0c;很全面地分析了在線評論的信息多維性。 第一篇從客戶的在線評論入手&#xff0c;將客戶消費的動機為功利、享受、社會滿足&#xff1b;第二篇是關于在線評論對消費者再次選擇同一家酒店的機制探索…

MySQL之主從同步、分庫分表

1、主從同步的原理 MySQL主從復制的核心是二進制日志 二進制日志&#xff08;binlog&#xff09;記錄了所有DDL語句和DML語句&#xff0c;但不包括數據查詢&#xff08;select、show&#xff09;語句。 1.1、復制分三步 master主庫在事務提交時&#xff0c;會把數據變更記錄…

電子戰學習筆記01:電子戰概論

0、寫在文前 本人在學習電子戰相關理論知識時&#xff0c;一直感覺無從下手&#xff0c;之后在老師的推薦下購買了《EW101&#xff1a;電子戰基礎》紙質書籍學習&#xff0c;所以將自己的學習筆記在CSDN上記錄一下&#xff0c;也供有需要的同學參考。 1、電子戰定義 電子戰&…

Spring Boot與Apache Kafka集成的深度指南

Spring Boot與Apache Kafka集成的深度指南 大家好&#xff0c;我是免費搭建查券返利機器人省錢賺傭金就用微賺淘客系統3.0的小編&#xff0c;也是冬天不穿秋褲&#xff0c;天冷也要風度的程序猿&#xff01; 在現代分布式系統中&#xff0c;消息隊列的作用愈發重要&#xff0…

【鴻蒙學習筆記】鴻蒙ArkTS學習筆記

應用開發導讀&#xff1a;https://developer.huawei.com/consumer/cn/doc/harmonyos-guides-V5/application-dev-guide-V5 【鴻蒙培訓】第&#xff11;天?環境安裝 【鴻蒙培訓】第&#xff12;天?裝飾器?組件和頁面生命周期 【鴻蒙學習筆記】數據類型 【鴻蒙學習筆記】運算…

Spring Cloud中的服務發現與注冊

Spring Cloud中的服務發現與注冊 大家好&#xff0c;我是免費搭建查券返利機器人省錢賺傭金就用微賺淘客系統3.0的小編&#xff0c;也是冬天不穿秋褲&#xff0c;天冷也要風度的程序猿&#xff01;今天我們將探討Spring Cloud中的服務發現與注冊&#xff0c;這是微服務架構中至…

全網最詳細的 gin框架請求數據綁定Bind 源碼解析 -- 幫助你全面了解gin框架的請求數據綁定原理和方法

在gin框架中&#xff0c;我們可以將多種請求數據&#xff08;json, form,uri&#xff0c;header等&#xff09;直接綁定到我們定義的結構體&#xff0c;底層是通過反射方式獲取我們定義在結構體上面的tag來實現請求數據到我們的結構體數據的綁定的。 在gin的底層有2大體系的數據…

Python pip install模塊時C++編譯環境問題

pip install模塊時C編譯環境問題 在接觸和使用python后&#xff0c;常常會通過pip install命令安裝第三方模塊&#xff0c;大多數模塊可以直接安裝&#xff0c;但許多新同學仍會遇見某些模塊需要實時編譯后才能安裝&#xff0c;如報錯信息大概是缺乏C編譯環境&#xff0c;本文則…

【Elasticsearch】Elasticsearch索引創建與管理詳解

文章目錄 &#x1f4d1;引言一、Elasticsearch 索引的基礎概念二、創建索引2.1 使用默認設置創建索引2.2 自定義設置創建索引2.3 創建索引并設置映射 三、索引模板3.1 創建索引模板3.2 使用索引模板創建索引 四、管理索引4.1 查看索引4.2 更新索引設置4.3 刪除索引 五、索引別名…

Go-知識測試-性能測試

Go-知識測試-性能測試 1. 定義2. 例子3. testing.common 測試基礎數據4. testing.TB 接口5. 關鍵函數5.1 testing.runBenchmarks5.2 testing.B.runN5.3 testing.B.StartTimer5.4 testing.B.StopTimer5.5 testing.B.ResetTimer5.6 testing.B.Run5.7 testing.B.run15.8 testing.B…

監聽藍牙對話的BlueSpy技術復現

本文是之前文章的BlueSpy技術的復現過程&#xff1a;https://mp.weixin.qq.com/s/iCeImLLPAwwKH1avLmqEpA 2個月前&#xff0c;網絡安全和情報公司Tarlogic在西班牙安全大會RootedCon 2024上提出了一項利用藍牙漏洞的BlueSpy技術&#xff0c;并在之后發布了一個名為BlueSpy的概…

深度學習之生成對抗網絡StyleGAN3

StyleGAN3 是由 NVIDIA 團隊提出的第三代生成對抗網絡(GAN),在前代 StyleGAN 和 StyleGAN2 的基礎上進行了改進,以實現更高質量的圖像生成。StyleGAN3 的主要改進在于解決了 StyleGAN2 中存在的偽影(artifacts)問題,并且提升了生成圖像的一致性和穩定性。 StyleGAN3 的…

git 提交代碼忽略eslint代碼檢測

在暫存代碼的時候會出現以上情況因為在提交代碼的時候會默認運行代碼進行檢測&#xff0c;如果不符合代碼規范就會進行報錯 解決&#xff1a; 使用 git commit --no-verify -m xxx 忽略eslint的檢測

Laravel 謹慎使用Storage::append()

在 driver 為 local 時&#xff0c;Storage::append()在高并發下&#xff0c;會存在丟失數據問題&#xff0c;文件被覆寫&#xff0c;而非尾部添加&#xff0c;如果明確是本地文件操作&#xff0c;像日志寫入&#xff0c;建議使用 Illuminate\Filesystem\Filesystem或者php原生…