依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>java_sc_alibaba</artifactId><groupId>jkw.life</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>test-rocketmq8009</artifactId><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><!-- SpringMVC--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency></dependencies></project>
application.yml
server:port: 8009
spring:application:name: test-rocketmq8009
rocketmq:# nameserver地址name-server: 192.168.66.101:9876producer:# 生產組group: my-group1# 發送消息超時時間send-message-timeout: 300000
demo:rocketmq:topic: testtopicconsumer: test_customer
啟動類
package jkw;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;@Slf4j
@SpringBootApplication
public class Main8009 {public static void main(String[] args) {SpringApplication.run(Main8009.class, args);log.info("************** 服務提供者 8009 啟動成功 ************");}
}
RocketMQ SpringBoot 3.0不兼容解決方案
我們要在resources文件夾中,新建META-INF/spring文件夾,在里面新建一個叫 org.springframework.boot.autoconfigure.AutoConfiguration.imports 的文件里面填入 org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
創建topic
mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic
生產者服務
package jkw.service;import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class MessageProduce {@Autowiredprivate RocketMQTemplate rocketMQTemplate; // 直接注入生產者@Value("${demo.rocketmq.topic}")private String topic;/*** 發送消息** @param message* @return*/public SendResult sendMessage(String message) {return rocketMQTemplate.syncSend(topic, message);}
}
生產者控制器
package jkw.controller;import jkw.service.MessageProduce;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class RocketmqProduceCon {@Autowiredprivate MessageProduce messageProduce;/*** 發送消息** @param message* @return*/@GetMapping("/send")public SendResult sendMessage(String message) {return messageProduce.sendMessage(message);}
}
消費者服務
package jkw.service;import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/*** 消費者主要使用RocketMQMessageListener接口進行監聽配置*/
@Service
@RocketMQMessageListener(//主題topic = "${demo.rocketmq.topic}",//消費組consumerGroup = "${demo.rocketmq.consumer}",// 過濾方式:默認為Tag過濾selectorType = SelectorType.TAG,// 過濾值:默認為全部消費,不過濾【*】selectorExpression = "*",// 消費模式:順序消費ORDERLY,并發消費CONCURRENTLYconsumeMode = ConsumeMode.ORDERLY,// 消息模式:有集群消費CLUSTERING,廣播消費messageModel = MessageModel.CLUSTERING)
public class MessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println(s);}
}
測試
1.發送消息:http://localhost:8009/send?message=test
2.idea的控制臺查看監控后輸出的內容