1,引用jar包
build.gradle文件添加jar包引用
compile group: 'org.apache.rocketmq', name: 'rocketmq-spring-boot-starter', version: '2.1.1'
2,配置文件
application.properties 配置文件
spring.application.name=app-demp
server.port=8081
###rocketmq###
rocketmq.name-server=192.168.1.107:9876
rocketmq.producer.timeout=10000
3,生成者
MQSender.java - 消息發生接口
import org.apache.rocketmq.client.producer.SendResult;
public interface MQSender{
/**
* 發送消息
*
* @param message 消息信息
* @param topic 主題
* @return 發送結果
*/
SendResult sendMessage(Object message, String topic);
/**
* 發送消息
*
* @param message 消息信息
* @param topic 主題
* @param tags 主題的標簽
* @return 發送結果
*/
SendResult sendMessage(Object message, String topic, String tags);
}
RocketMQSender.java - RockemtMQ實現
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class RocketMQSender implements MQSender {
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.producer.timeout}")
private int timeout;
@Value("${spring.application.name}")
private String group;
private DefaultMQProducer producer;
@PostConstruct
public void init() {
producer = new DefaultMQProducer(group); try {
producer.setNamesrvAddr(nameServer); producer.setSendMsgTimeout(timeout); producer.start(); log.info("RocketMQ Producer啟動成功! nameServer={}, group={}", nameServer, group);
} catch (MQClientException e) {
log.error("RocketMQ Producer啟動失敗! nameServer={}, group={} ", nameServer, group, e);
} } @Override
public SendResult sendMessage(Object message, String topic) {
try {
Message msg = new Message(topic, JSON.toJSONBytes(message)); SendResult sendResult = producer.send(msg); log.info("發送MQ成功:sendResult={},message={}", sendResult, message.toString());
return sendResult;
} catch (Exception e) {
log.error("消息發送失敗, topic:{}, message:{}", topic, message, e);
} return null;
} @Override
public SendResult sendMessage(Object message, String topic, String tags) {
try {
Message msg = new Message(topic, tags, JSON.toJSONBytes(message)); SendResult sendResult = producer.send(msg); log.info("發送MQ成功:sendResult={},message={}", sendResult, message.toString());
return sendResult;
} catch (Exception e) {
log.error("消息發送失敗, topic:{}, tags:{}, message:{}", topic, tags, message, e);
} return null;
}}
OrderProducer.java - 發送者實例
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class OrderProducer {
@Resource
private MQSender mqSender;
public void createOrder() {
mqSender.sendMessage("我是注冊訂單,請盡快處理", "TEMP");
}}
4,消費者
OrderConsumer.java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "TEMP-GROUP", topic = "TEMP")
public class OrderConsumer implements RocketMQListener {
@Override
public void onMessage(MessageExt messageExt) {
String message = new String(messageExt.getBody());
log.info("收到消息,topic:{}, tag:{}, msgId:{}, body:{}", messageExt.getTopic(), messageExt.getTags(),
messageExt.getMsgId(), message); }}
發送者執行結果
消費者執行結果