文章中的RocketMQ使用的是rocketmq-all-5.1.3-bin-release版本,需要安裝包的可自行下載
RocketMQ啟動停止命令
啟動命令
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log
tail -f ~/logs/rocketmqlogs/proxy.log
停止命令
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
查看集群狀態
sh mqadmin clusterList -n 127.0.0.1:9876
創建topic
sh mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t rocket_test
查看所有topic信息
sh mqadmin topicList -n 127.0.0.1:9876
sh mqadmin topicList -n 127.0.0.1:9876 -c
查看 Topic 路由信息
sh mqadmin topicRoute -n 127.0.0.1:9876 -t TopicTest
發送測試消息
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
消費消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
Java代碼收發消息
Producer
package com.rocket.demo;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.HashMap;
import java.util.Map;
public class RocketProducerDemo {
????private final static String nameServer = "127.0.0.1:9876";
????private final static String producerGroup = "my_group2";
????// debezium-mysql-source-topic topic-test
????private final static String topic = "TopicTest";
????public static void main(String[] args) {
????????try {
????????????// 初始化一個producer并設置Producer group name
????????????DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// ???????????DefaultMQProducer producer = new DefaultMQProducer();
????????????// 設置NameServer地址
????????????producer.setNamesrvAddr(nameServer);
????????????// 啟動producer
????????????producer.start();
????????????for (int i = 0; i < 100; i++) {
????????????????Map<String, String> data = new HashMap();
????????????????data.put("id", i+"");
????????????????data.put("name", i+","+System.currentTimeMillis());
????????????????// 創建一條消息,并指定topic、tag、body等信息,tag可以理解成標簽,對消息進行再歸類,RocketMQ可以在消費端對tag進行過濾
????????????????Message msg = new Message("TopicTest", "tagA", JSON.toJSONString(data).getBytes(RemotingHelper.DEFAULT_CHARSET));
????????????????// 利用producer進行發送,并同步等待發送結果
????????????????SendResult sendResult = producer.send(msg, 10000);
????????????????System.out.println(sendResult);
????????????}
????????????// 一旦producer不再使用,關閉producer
????????????producer.shutdown();
????????} catch (Exception e) {
????????????e.printStackTrace();
????????}
????}
}
Consumer
package com.rocket.demo;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class RocketConsumerDemo {
????public static void main(String[] args) throws Exception {
????????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_group");
????????consumer.setNamesrvAddr("localhost:9876");
????????// debezium-mysql-source-topic ?topic-test debezium-mysql-source db-history-debezium-topic debezium-mysql-source
????????consumer.subscribe("TopicTest", "*"); // 訂閱主題和標簽,* 表示訂閱所有標簽
????????consumer.registerMessageListener(new MessageListenerConcurrently() {
????????????@Override
????????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
????????????????for (MessageExt message : messages) {
????????????????????System.out.println("Received message: " + new String(message.getBody()));
????????????????}
????????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????????}
????????});
????????consumer.start();
????????System.out.println("Consumer started");
????}
}
常見問題
service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 0.96 CQ: 0.96 INDEX: 0.96], messages are put to the slave, message store has been shut down
錯誤原因:博主測試的服務器磁盤使用率到0.96了,rocketmq不允許磁盤超過0.9,清理下磁盤數據即可