一、導入相關依賴
????????在項目中引入MQ客戶端依賴,依賴版本最好和RocketMQ版本一致。
<!-- rocket客戶端--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>${rocketmq.version}</version></dependency>
二、消息發送
????????首先介紹一下消息發送的大致流程,當我們調用消息發送方法時該方法會先對待發送消息進行前置驗證,如果消息主題和消息內容均沒有問題的話,就會根據消息主題(Topic)去獲取路由信息,即消息主題對應的隊列,broker,broker的ip和端口信息,然后選擇一條隊列發送消息,成功的話返回發送成功,失敗的話會根據我們設置的重試次數進行重新發送,單向消息發送不會進行失敗重試。
1、RocketMQ初始化
? ? ? ? RocketMQ配置類案例:
package com.example.framework.mq.config;import cn.hutool.core.thread.ThreadUtil;
import com.example.framework.mq.handler.MQHandler;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Map;
import java.util.concurrent.ExecutorService;/*** MQ配置*/
@Configuration
public class MQConfig {@Value("${spring.application.name:application}")private String groupName;//集群名稱,這邊以應用名稱作為集群名稱/***************************消息消費者***************************/@Autowiredprivate Map<String, MQHandler> mqHandlerMap;// 消費者nameservice地址@Value("${rocketmq.consumer.namesrvAddr:127.0.0.1:9876}")private String cNamesrvAddr;// 最小線程數@Value("${rocketmq.consumer.consumeThreadMin:20}")private int consumeThreadMin;// 最大線程數@Value("${rocketmq.consumer.consumeThreadMax:64}")private int consumeThreadMax;// 消費者監聽主題,多個主題以分號隔開(topic~tag;topic~tag)@Value("${rocketmq.consumer.topics:test~*}")private String topics;// 一次消費消息的條數,默認為1條@Value("${rocketmq.consumer.consumeMessageBatchMaxSize:1}")private int consumeMessageBatchMaxSize;@Beanpublic DefaultMQPushConsumer getRocketMQConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr(cNamesrvAddr);consumer.setConsumeThreadMin(consumeThreadMin);consumer.setConsumeThreadMax(consumeThreadMax);consumer.registerMessageListener(getMessageListenerConcurrently());// 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費,如果非第一次啟動,那么按照上次消費的位置繼續消費consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 設置消費模型,集群還是廣播,默認為集群//consumer.setMessageModel(MessageModel.CLUSTERING);// 設置一次消費消息的條數,默認為1條consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);try {// 設置該消費者訂閱的主題和tag,如果是訂閱該主題下的所有tag,則tag使用*;如果需要指定訂閱該主題下的某些tag,則使用||分割,例如tag1||tag2||tag3String[] topicTagsArr = topics.split(";");for (String topicTags : topicTagsArr) {String[] topicTag = topicTags.split("~");consumer.subscribe(topicTag[0],topicTag[1]);}consumer.start();}catch (Exception e){throw new Exception(e);}return consumer;}// 并發消息偵聽器(如果對順序消費有需求則使用MessageListenerOrderly 有序消息偵聽器)@Beanpublic MessageListenerConcurrently getMessageListenerConcurrently() {return new MQListenerConcurrently(mqHandlerMap);}/***************************消息生產者***************************/// 事務消息監聽器@Autowiredprivate MQTransactionListener mqTransactionListener;// 生產者nameservice地址@Value("${rocketmq.producer.namesrvAddr:127.0.0.1:9876}")private String pNamesrvAddr;// 消息最大大小,默認4M@Value("${rocketmq.producer.maxMessageSize:4096}")private Integer maxMessageSize ;// 消息發送超時時間,默認3秒@Value("${rocketmq.producer.sendMsgTimeout:30000}")private Integer sendMsgTimeout;// 消息發送失敗重試次數,默認2次@Value("${rocketmq.producer.retryTimesWhenSendFailed:2}")private Integer retryTimesWhenSendFailed;// 執行任務的線程池private static ExecutorService executor = ThreadUtil.newExecutor(32);//普通消息生產者@Bean("default")public DefaultMQProducer getDefaultMQProducer() {DefaultMQProducer producer = new DefaultMQProducer(this.groupName);producer.setNamesrvAddr(this.pNamesrvAddr);producer.setMaxMessageSize(this.maxMessageSize);producer.setSendMsgTimeout(this.sendMsgTimeout);producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);try {producer.start();} catch (MQClientException e) {System.out.println(e.getErrorMessage());}return producer;}//事務消息生產者(rocketmq支持柔性事務)@Bean("transaction")public TransactionMQProducer getTransactionMQProducer() {//初始化事務消息基本與普通消息生產者一致TransactionMQProducer producer = new TransactionMQProducer("transaction_" + this.groupName);producer.setNamesrvAddr(this.pNamesrvAddr);producer.setMaxMessageSize(this.maxMessageSize);producer.setSendMsgTimeout(this.sendMsgTimeout);producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);//添加事務消息處理線程池producer.setExecutorService(executor);//添加事務消息監聽producer.setTransactionListener(mqTransactionListener);try {producer.start();} catch (MQClientException e) {System.out.println(e.getErrorMessage());}return producer;}
}
2、發送消息
????????消息發送根據消息功能主要分為普通消息、事務消息、順序消息、延時消息等!特別說明一下事務消息多用于保證多服務模塊間的事務一致性,事務消息發送后并不會直接通知消費者消費消息,而是會先生成一個半消息,會先進入事務消息監聽器中,確保該消息事務提交成功后才會向broker發送消息,從而被消費者獲取并進行消費。
????????根據發送方式可以分為同步消息,異步消息和單向消息等:
- 同步消息常用于比較重要的消息發送,需要等待broker響應告知消息發送狀態。
- 異步消息的話常用于對響應時間敏感,需要快速返回的模塊,我們會設置一個回調代碼塊去異步監聽Borker的響應。
- 單向消息的話主要用于對發送結果不敏感,不會影響業務的模塊,無需監聽broker響應,常用于日志發送等模塊。
? ? ? ? 下面代碼演示四種消息的使用方式:
package com.example.order.service.impl;import com.alibaba.fastjson.JSON;
import com.example.framework.utils.SonwflakeUtils;
import com.example.order.entity.Order;
import com.example.order.mapper.OrderMapper;
import com.example.order.service.OrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;/*** 服務實現類*/
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {@Qualifier("default")@Autowiredprivate DefaultMQProducer producer;@Autowiredprivate TransactionMQProducer transactionMQProducer;/*** 添加訂單(發送消息積分模塊同步添加積分)* @param order 訂單信息* @return org.apache.rocketmq.client.producer.TransactionSendResult**/@Overridepublic Order addOder(Order order) {order.setOrderId(SonwflakeUtils.get().id());if (order.getMessageType() == 1) {//普通消息this.save(order);Message message = new Message("points", "default", JSON.toJSONString(order).getBytes());try {//同步消息SendResult sendResult = producer.send(message);System.out.println("發送狀態:" + sendResult.getSendStatus() +",消息ID:" + sendResult.getMsgId() +",隊列:" + sendResult.getMessageQueue().getQueueId());
// producer.sendOneway(message);//單向消息
//----------------------------異步消息-----------------------------------
// producer.send(message, new SendCallback() {
// @Override
// public void onSuccess(SendResult sendResult) {
//
// }
//
// @Override
// public void onException(Throwable throwable) {
//
// }
// });} catch (RemotingException | MQBrokerException | InterruptedException | MQClientException e) {e.printStackTrace();}} else {//事務消息Message message = new Message("points", "transaction", JSON.toJSONString(order).getBytes());try {transactionMQProducer.sendMessageInTransaction(message, null);} catch (MQClientException e) {e.printStackTrace();}}return order;}
}
3、消息消費
????????邊對MessageListenerConcurrently有進行一定封裝,主要是為了在消息處理時通過注解定位消息Topic和tag而自動選擇對應的消息處理類進行業務處理;封裝代碼如下:
package com.example.framework.mq.config;import cn.hutool.core.util.StrUtil;
import com.example.framework.mq.annotation.MQHandlerActualizer;
import com.example.framework.mq.handler.MQHandler;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;import java.util.Arrays;
import java.util.List;
import java.util.Map;/*** 并發消息監聽器*/
public class MQListenerConcurrently implements MessageListenerConcurrently {@Autowiredprivate Map<String, MQHandler> mqHandlerMap;public MQListenerConcurrently(Map<String, MQHandler> mqHandlerMap) {this.mqHandlerMap = mqHandlerMap;}@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {if(CollectionUtils.isEmpty(list)){System.out.println("接受到的消息為空,不處理,直接返回成功");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}MessageExt messageExt = list.get(0);// 判斷該消息是否重復消費(RocketMQ不保證消息不重復,如果你的業務需要保證嚴格的不重復消息,需要你自己在業務端去重)// 獲取該消息重試次數int reconsume = messageExt.getReconsumeTimes();if(reconsume ==3){//消息已經重試了3次,需做告警處理,已經相關日志return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 處理對應的業務邏輯String topic = messageExt.getTopic();String tags = messageExt.getTags();System.out.println("接受到的消息主題為:" + topic + "; tag為:" + tags);MQHandler mqMsgHandler = null;//獲取消息處理類中的topic和tag注解,根據topic和tag進行策略分發出來具體業務for (Map.Entry<String, MQHandler> entry : mqHandlerMap.entrySet()) {MQHandlerActualizer msgHandlerActualizer = entry.getValue().getClass().getAnnotation(MQHandlerActualizer.class);if (msgHandlerActualizer == null) {//非消息處理類continue;}String annotationTopic = msgHandlerActualizer.topic();if (!StrUtil.equals(topic,annotationTopic)) {//非該主題處理類continue;}String[] annotationTags = msgHandlerActualizer.tags();if(StrUtil.equals(annotationTags[0],"*")){//獲取該實例mqMsgHandler = entry.getValue();break;}boolean isContains = Arrays.asList(annotationTags).contains(tags);if(isContains){//注解類中包含tag則獲取該實例mqMsgHandler = entry.getValue();break;}}if (mqMsgHandler == null) {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}ConsumeConcurrentlyStatus status = mqMsgHandler.handle(tags,messageExt);// 如果沒有return success,consumer會重新消費該消息,直到return successreturn status;}
}
????????事務消息監聽器封裝:
package com.example.framework.mq.config;import cn.hutool.core.util.StrUtil;
import com.example.framework.mq.annotation.MQHandlerActualizer;
import com.example.framework.mq.handler.MQTransactionHandler;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;import java.util.Arrays;
import java.util.Map;public class MQTransactionListener implements TransactionListener {@Autowiredprivate Map<String, MQTransactionHandler> mqTransactionHandlerMap;@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {MQTransactionHandler mqTransactionHandler = getListenner(message.getTopic(),message.getTags());return mqTransactionHandler.executeLocalTransaction(message,o);}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {MQTransactionHandler mqTransactionHandler = getListenner(messageExt.getTopic(),messageExt.getTags());return mqTransactionHandler.checkLocalTransaction(messageExt);}private MQTransactionHandler getListenner(String topic,String tags) {MQTransactionHandler mqTransactionHandler = null;for (Map.Entry<String, MQTransactionHandler> entry : mqTransactionHandlerMap.entrySet()) {MQHandlerActualizer msgHandlerActualizer = entry.getValue().getClass().getAnnotation(MQHandlerActualizer.class);if (msgHandlerActualizer != null) {String annotationTopic = msgHandlerActualizer.topic();String[] annotationTags = msgHandlerActualizer.tags();if (!StrUtil.equals(topic,annotationTopic)) {//非該主題處理類continue;}if(StrUtil.equals(annotationTags[0],"*")){//獲取該實例mqTransactionHandler = entry.getValue();break;}boolean isContains = Arrays.asList(annotationTags).contains(tags);if(isContains){//注解類中包含tag則獲取該實例mqTransactionHandler = entry.getValue();break;}}}return mqTransactionHandler;}
}
????????使用注解@MQHandlerActualizer標明該消息處理類的主題,默認監聽所有tag,如果需要對tag監聽進行分類,后面加上tag即可。消息監聽器在收到消息后會自動調用主題對應的處理類進行業務處理,示例如下:
package com.example.points.mqHandler;import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.example.framework.mq.annotation.MQHandlerActualizer;
import com.example.framework.mq.handler.MQHandler;
import com.example.framework.utils.SonwflakeUtils;
import com.example.points.entity.Points;
import com.example.points.service.PointsService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;import java.util.Map;/***/
@MQHandlerActualizer(topic = "points")
public class PointsMQHandler implements MQHandler {@Autowiredprivate PointsService pointsService;@Overridepublic ConsumeConcurrentlyStatus handle(String tag, MessageExt messageExt) {//消息監聽String messageStr = new String(messageExt.getBody());Map orderMap = (Map) JSON.parse(messageStr);Points points = new Points();Long orderId = (Long) orderMap.get("orderId");System.out.println("消息tag為:" + tag);System.out.println("消息監聽:" + "為訂單" + orderId + "添加積分");//查詢該訂單是否已經生成對應積分(rocketMQ可能會重復發送消息,需實現冪等)QueryWrapper<Points> pointsQueryWrapper = new QueryWrapper<>();pointsQueryWrapper.lambda().eq(Points::getOrderId,orderId);Points tempPoints = pointsService.getOne(pointsQueryWrapper);if (tempPoints != null) {//該訂單已經生成積分System.out.println(orderId + "已經生成積分");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}points.setPointsId(SonwflakeUtils.get().id());points.setOrderId(orderId);Integer orderAmout = (Integer) orderMap.get("orderAmout");points.setPoints(orderAmout * 10);pointsService.save(points);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}
使用消息隊列注意事項
- RocketMQ確保所有消息至少傳遞一次。雖然大多數情況下,消息不會重復,但還是需要對重復消息做。
- 盡量減小消息的體積,例如選擇輕量的協議,超過一定體積做壓縮處理,就消息協議而言, 二進制協議 < 文本協議。而文本協議中 json < xml 等等。