package com.ldj.rocketmq.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** User: ldj* Date: 2024/3/26* Time: 2:26* Description: 單向消息生產者 簡單場景:如發送日志*/
public class OneWayProducer {public static void main(String[] args) {String msg = "我要和你來一場浪漫之旅!";DefaultMQProducer producer = new DefaultMQProducer("produce-group1");producer.setNamesrvAddr("192.168.208.190:9876;192.168.208.191:9876;192.168.208.192:9876");try {producer.start();producer.sendOneway(new Message("love_letter", "appointment", msg.getBytes(StandardCharsets.UTF_8)));System.out.println("消息已發送");} catch (Exception e) {e.printStackTrace();} finally {producer.shutdown();}}
}
package com.ldj.rocketmq.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;/*** User: ldj* Date: 2024/3/26* Time: 2:36* Description: 推送消費*/
public class PushConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group1");consumer.setNamesrvAddr("192.168.208.190:9876;192.168.208.191:9876;192.168.208.192:9876");//接收Topic 'love_letter'所有消息//consumer.subscribe("love_letter", "*");//只接收接收Topic 'love_letter'下的 'appointment'標簽的消息consumer.subscribe("love_letter", "appointment");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(( List<MessageExt> msgs,ConsumeConcurrentlyContext context)->{msgs.forEach(msg->{System.out.println(new String(msg.getBody()));});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});//設置消息消費模式: 集群模式(輪詢:你一個我一個 || 公平:分發能者多勞)、廣播模式 (每個消費者都消費全量消息)//consumer.setMessageModel(MessageModel.BROADCASTING);consumer.setMessageModel(MessageModel.CLUSTERING);consumer.start();System.out.println("消費者準備就緒");}
}