SpringBoot集成RocketMQ
首先依舊是引入依賴
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>
然后就可以編寫發送不同類型消息的代碼了
package blossom.project.springbootkp.seckillproducer;import blossom.project.springbootkp.seckillproducer.entity.MsgModel;
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.util.Arrays;
import java.util.List;@SpringBootTest
class SecKillProducerApplicationTests {private List<MsgModel> msgModels = Arrays.asList(new MsgModel("qwer", 1L, "下單"),new MsgModel("qwer", 1L, "短信"),new MsgModel("qwer", 1L, "物流"),new MsgModel("zxcv", 2L, "下單"),new MsgModel("zxcv", 2L, "短信"),new MsgModel("zxcv", 2L, "物流"));@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testvoid syncProducer() {rocketMQTemplate.syncSend("bootTestTopic","使用springboot集成rocketmq");}@Testvoid asyncProducer(){rocketMQTemplate.asyncSend("bootTestTopic", "發送一條異步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("發送成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("發送失敗"+throwable.getMessage());}});}@Testvoid oneWayProducer(){rocketMQTemplate.sendOneWay("bootTestTopic","發送一個單向消息");}@Testvoid delayProducer(){Message<String> message = MessageBuilder.withPayload("這是一條延遲消息").build();rocketMQTemplate.syncSend("bootTestTopic",message,3000,2);}@Testvoid orderedProducer(){msgModels.forEach(x->{String s = JSONObject.toJSONString(x);rocketMQTemplate.syncSendOrderly("orderlyTopic", s,x.getOrderSn());});}@Testvoid tagProducer(){rocketMQTemplate.syncSend("bootTestTopic:tagA","我是一個帶標簽的消息");}@Testvoid keyProducer(){Message<String> message = MessageBuilder.withPayload("我是一個帶有key的消息").setHeader(RocketMQHeaders.KEYS, "testKey").build();rocketMQTemplate.syncSend("bootTestTopic",message);}
}
對于不同的消息類型,我們可以使用不同的方式去接收。
創建一個順序消息的監聽器
@Component
@RocketMQMessageListener(topic = "orderlyTopic",consumerGroup = "boot-orderly-consumer-group",consumeMode = ConsumeMode.ORDERLY, //順序消費模式 單線程maxReconsumeTimes = 5) //最大重試次數
public class OrderlyMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {MsgModel msgModel = JSON.parseObject(new String(messageExt.getBody()), MsgModel.class);System.out.println(msgModel);}
}
普通的創建一個監聽器
@Component
@RocketMQMessageListener(topic = "bootTestTopic",consumerGroup = "boot-consumuer-group")
public class SimpleMessageListener implements RocketMQListener<MessageExt> {/*** 這個方法就是消費者方法* 這里的String就是消息內容* 這里的泛型就是這里的參數類型* 如果泛型指定了固定的類型 那么消息體就是我們的參數* 如果我們的類型設定為具體的類型 那么我們只能拿到消息體* 而如果我們把消息類型設定為MessageExt類型,那么我們可以拿到消息頭* ------------------------------------------------* 只要這個方法不報錯 就會直接完成消息的接收 而如果報錯了 就會重試* @param msg*/@Overridepublic void onMessage(MessageExt msg) {String keys = msg.getKeys();System.out.println("接收到的keys為"+keys);String body = new String(msg.getBody());System.out.println("接收到的消息體為"+body);}
}
創建一個識別tag標簽的監聽器
@RocketMQMessageListener(topic = "bootTestTopic",consumerGroup = "boot-tag-consumer-group",
selectorType = SelectorType.TAG, //tag過濾模式
selectorExpression = "tagA || tagB") //tag標簽匹配模式
public class TagMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt msg) {System.out.println(new String(msg.getBody()));}
}