一、發送 & 接收單向消息
1.1、概述
? ? ? ? 發送單向消息,適用于發送方不關心或者不在意消息的發送結果,這種方式的吞吐量很大,但是存在消息丟失的風險,對于重要消息要慎用!該種方式通常適用于對消息沒有那么嚴格的場景中,例如日志信息的采集。
1.2、Demo03MQTestApp?
/*** @Author : 一葉浮萍歸大海* @Date: 2023/12/25 10:12* @Description: 發送 & 接收單向消息*/
@Slf4j
public class Demo03MQTestApp {/*** 發送同步消息*/@Testpublic void demo3Producer() throws Exception {// 1、創建一個生產者DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");// 2、連接NameServerproducer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);// 3、啟動producer.start();// 4、創建消息Message message = new Message("oneway-topic", "這是一個單向消息)".getBytes(StandardCharsets.UTF_8));// 5、發送消息producer.sendOneway(message);log.info("【demo3Producer】發送消息成功!");// 6、關閉生產者producer.shutdown();}/*** 接收單向消息(Push方式)*/@Testpublic void demo3PushConsumer() throws Exception {// 1、創建一個消費者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("oneway-consumer-group");// 2、連接NameServerconsumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);// 3、訂閱消息,*表示訂閱該主題所有的消息consumer.subscribe("oneway-topic", "*");// 4、設置監聽器(采用異步回調方式,一直監聽)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {// 業務邏輯log.info("我是消費者【demo3PushConsumer】");for (MessageExt msg : msgs) {log.info("我是消費者【demo3PushConsumer】,我收到的消息是:{},當前時間:{}", StrUtil.utf8Str(msg.getBody()), LocalDateTimeUtil.format(LocalDateTime.now(),"yyyy-MM-dd HH:mm:ss"));}/*** 返回值:消費消息成功與否* CONSUME_SUCCESS:表明消費成功,消息會從MQ出隊* RECONSUME_LATER:表明消費失敗,消息會重新回到隊里,過一會兒再重新投遞出來給當前消費者或者其他消費者*/return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5、啟動consumer.start();log.info("【demo3PushConsumer】啟動成功,正在等待接收消息...");// 6、掛起當前JVMSystem.in.read();}}
1.3、測試
先后運行demo3PushConsumer和demo3Producer,觀察控制臺日志: