環境
jdk1.8, springboot2.7.3
Maven依賴
<!-- Spring Boot parent POM: supplies dependency management for the project -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.3</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<!-- RocketMQ Spring Boot starter, with its transitive rocketmq-client excluded -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- rocketmq-client declared explicitly so its version is pinned to 4.9.5 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.5</version>
</dependency>
配置文件
rocketmq.name-server=192.168.6.128:9876
# Shared default producer group; a producer may also specify its own group
rocketmq.producer.group=springBootGroup
# Shared default consumer group; a consumer may also specify its own group
rocketmq.consumer.group=testGroup
server.port=9000
代碼
生產者發送消息
/**
 * REST entry points under {@code /producer} that forward the request's topic, tag and
 * message to {@link ProducerService}.
 */
@RestController
@RequestMapping("/producer")
public class ProducerController {

    @Autowired
    private ProducerService producerService;

    /**
     * Handles {@code POST /producer/sendSync}: sends the message synchronously.
     *
     * @param req request body providing the topic, tag and message
     * @return the value returned by {@link ProducerService#sendSyncMessage}
     */
    @PostMapping("/sendSync")
    public Object sendSync(@RequestBody MessageReq req) {
        final String topic = req.getTopic();
        final String tag = req.getTag();
        final String payload = req.getMessage();
        return producerService.sendSyncMessage(topic, tag, payload);
    }

    /**
     * Handles {@code POST /producer/sendAsync}: hands the message to
     * {@link ProducerService#sendAsyncMessage} and answers with a fixed string.
     *
     * @param req request body providing the topic, tag and message
     * @return the literal string {@code "200"}
     */
    @PostMapping("/sendAsync")
    public Object sendAsyncMessage(@RequestBody MessageReq req) {
        final String topic = req.getTopic();
        final String tag = req.getTag();
        final String payload = req.getMessage();
        producerService.sendAsyncMessage(topic, tag, payload);
        return "200";
    }
}
@Service
public class ProducerService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 發送同步消息.* @return 發送結果*/public SendResult sendSyncMessage(String topic, String tag, String message) {// param1: topic和tag冒號分隔return rocketMQTemplate.syncSend(topic + ":" + tag, message);}/*** 發送異步消息.*/public void sendAsyncMessage(String topic, String tag, String message) {rocketMQTemplate.convertAndSend(topic + ":" + tag, message);}
}
消費者
@Component
@RocketMQMessageListener(consumerGroup = "SimpleStringConsumerGroup", // consumerGroup:消費者組名topic = "MQ_sp_test1", // topic:訂閱的主題selectorExpression = "Tag-kk||Tag-kk2", // selectorExpression, 1. 根據Tag過濾, 多個用||分割, 也可設置*; 2. 根據SQL92語法過濾
// selectorExpression = "*",
// selectorType = SelectorType.SQL92, // 設置SQL92語法過濾, 不設置默認TAGmessageModel = MessageModel.CLUSTERING, // messageModel: 控制消息模式。MessageModel.CLUSTERING:負載均衡;MessageModel.BROADCASTING:廣播模式consumeMode= ConsumeMode.CONCURRENTLY // CONCURRENTLY: 無序消費; ORDERLY: 有序消費
)
public class SimpleConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message : "+ message);}
}
測試
同步消息
異步消息?
TAG過濾消息
1. 消費者指定了 TAG 時, TAG 不匹配的消息不會被消費, 其狀態為 CONSUMED_BUT_FILTERED