mqtt 發送消息過多_阿里云MQTT服務端注解式消息處理分發與同步調用實踐小結

一、前言

前段時間公司預研了設備app端與服務端的交互方案,出于多方面考量最終選用了阿里云的微服務隊列MQTT方案,基于此方案,本人主要實踐有:

1. 封裝了RocketMQ實現MQTT訂閱與發布的實現細節;

2. 實現了注解式分發處理,可利用如MqttController, MqttTopicMapping等相關自定義注解的方式來統一訂閱MQTT的Topic以及消息處理的分發;

3. 使用了一套請求和響應的同步機制來達到PUB/SUB異步通信的偽同步調用。

Github 地址點此鏈接

二、RocketMQ的接入細節

1. 為什么服務端要使用RocketMQ接入

阿里云微消息隊列MQTT是在以消息隊列 RocketMQ 為核心存儲的基礎上,實現更適合移動互聯網和IoT領域的無狀態網關,兩者之間具備天然的數據互通性。MQTT實例本身并不提供消息數據持久化功能,消息數據持久化需要搭配后端的消息存儲實例來使用。因此現階段每一個阿里云MQTT實例都必須配套一個消息存儲實例,即RocketMQ實例來提供消息數據持久化功能,因此他們之間可以說是消息互通的,即可用RocketMQ訂閱的方式來消費用MQTT協議發布的消息,同理也可用 MQTT協議訂閱的方式來消費RocketMQ發布的消息。

幫助文檔也給出了以下兩種產品的區別說明:

微消息隊列MQTT基于MQTT協議實現,單個客戶端的處理能力較弱。因此,微消息隊列MQTT適用于擁有大量在線客戶端(很多企業設備端過萬,甚至上百萬),但每個客戶端消息較少的場景。
相比之下,消息隊列RocketMQ是面向服務端的消息引擎,主要用于服務組件之間的解耦、異步通知、削峰填谷等,服務器規模較小(極少企業服務器規模過萬),但需要大量的消息處理,吞吐量要求高。因此,消息隊列RocketMQ適用于服務端進行大批量的數據處理和分析的場景。

基于以上區別,官方也推薦在移動端設備上使用微消息隊列MQTT,而在服務端應用中則使用消息隊列RocketMQ,具體則可以通過 MQTT SDK 以公網訪問方式來實現設備間的通信,通過MQ SDK以內網方式來實現服務端通信。

2. RocketMQ如何對接

RocketMQ與MQTT在消息結構和一些屬性字段上都有一定的映射關系,具體內容(摘自幫助文檔)如下。

微消息隊列MQTT使用MQTT協議接入,而消息隊列RocketMQ使用的是私有協議,因此,兩者的關鍵概念存在如下映射關系。

fb73025e85d4341240c5a9c1b93aec9e.png
MQ與MQTT消息結構映射關系

如上圖所示,MQTT協議中Topic是多級結構,而消息隊列RocketMQ的Topic 僅有一級,因此,MQTT中的一級Topic映射到消息隊列RocketMQ的Topic,而二級和三級Topic則映射到消息隊列RocketMQ的消息屬性(Properties)中。

消息隊列 RocketMQ 協議中的消息(Message)可以擁有自定義屬性(Properties),而MQTT協議目前的版本不支持屬性,但為了方便對MQTT協議中的Header信息和設備信息進行溯源,MQTT的部分信息將被映射到 RocketMQ的消息屬性中,方便使用消息隊列RocketMQ的SDK接入的用戶獲取。

目前,微消息隊列MQTT和消息隊列RocketMQ支持的屬性字段映射表如下圖所示。使用消息隊列RocketMQ的SDK的應用和使用消息隊列MQTT的SDK的應用進行交互時,可以通過讀寫這些屬性字段來達到信息獲取或者設置的目的。

6562f3a1a63d4e993ef95de3ce2290b9.png
屬性字段映射關系

3. RocketMQ對MQTT消息訂閱的實現

Properties properties = new Properties();
// 在控制臺創建的Group ID
properties.put(PropertyKeyConst.GROUP_ID, "xxx");
// 阿里云AccessKey 
properties.put(PropertyKeyConst.AccessKey, "xxx");
// 阿里云SecretKey
properties.put(PropertyKeyConst.SecretKey, "xxx");
// 在RocketMQ控制臺的實例基本信息中可查看到的TCP協議接入點
properties.put(PropertyKeyConst.NAMESRV_ADDR,"xxx");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("topic", "*", new MessageListener() { //訂閱全部 Tagpublic Action consume(Message message, ConsumeContext context) {//獲得mqtt消息中的第一級topicString mqttFirstTopic = message.getTopic();//獲得mqtt消息中除去1級后的所有topicString mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);//獲得mqtt消息中的messageIdString messageId = message.getUserProperties("UNIQ_KEY");//獲得mqtt消息中的消息體String messageBody = new String(message.getBody());//...return Action.CommitMessage;}});
consumer.start();

實現主要注意2點:

  • 這邊的 MQ 只需要訂閱 MQTT 的一級 Topic 。如果 MQTT 會發布2個 Topic 的消息 robot/alarmrobot/task/test ,則在此處只需要訂閱 robot 這個第一級Topic即可。
  • MQTT 的一些屬性字段可以從 RocketMQ 消息 MessageuserProperties 字段中獲得,比如上面代碼中通過 message.getUserProperties(PropertyKeyConst.MqttSecondTopic); 可以獲得 MQTT 中的 除去1級后的所有 Topic 字符串,如上述舉例的2個 Topic 可分別獲得 /alarm/task/test。 具體能夠獲得哪些字段可以參考上一節的屬性字段映射表,也可自行查看 PropertyKeyConst 類中定義的一些字符串常量來大概知曉。

使用阿里云MQTT控制臺發送一個MQTT消息,如圖所示:

f451754dda09e418f7b4d64a925c7019.png
MQTT控制臺發送消

在程序中加一個斷點獲得當前Message對象的字段如下:

1c4d74b33f6cdf96c40039d87619bf90.png
Message消息體

上圖可看到userProperties中的一些值,比如qoslevelmqttSecondTopic等,這些字段都可以在PropertyKeyConst 類中找到對應的字符串常量,但是UNIQ_KEYcleansessionflagPropertyKeyConst 類中并沒有對應的字符串常量,這邊暫時就message.getUserProperties("UNIQ_KEY")這樣使用自定義字符量來獲得。

4. RocketMQ對MQTT消息發布的實現

Properties properties = new Properties();
// 在控制臺創建的Group ID
properties.put(PropertyKeyConst.GROUP_ID, "xxx");
// 阿里云AccessKey 
properties.put(PropertyKeyConst.AccessKey, "xxx");
// 阿里云SecretKey
properties.put(PropertyKeyConst.SecretKey, "xxx");
// 在RocketMQ控制臺的實例基本信息中可查看到的TCP協議接入點
properties.put(PropertyKeyConst.NAMESRV_ADDR,"xxx");
//設置發送超時時間,單位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用 start 方法來啟動 Producer,只需調用一次即可
producer.start();//發送一個mqtt消息
String parentTopic = topic.substring(0, topic.indexOf("/"));
String subTopic = topic.substring(topic.indexOf("/"));
Message msg = new Message(parentTopic, "", message.getBytes());
msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
msg.putUserProperties(PropertyKeyConst.MqttQOS, qos);
msg.putUserProperties("cleansessionflag", "" + cleanSessionFlag);
SendResult result = producer.send(msg);
  • 該代碼僅實現了普通消息的同步發送,若需發送順序消息、延時消息等,可參考SDK幫助文檔創建不同的Producer實現即可。
  • 上述代碼將需要發送的MQTT全量Topic拆分成1級與2級,1級Topic設置為MQ中的Topic參數,2級Topic字符串則設為userPropertiesPropertyKeyConst.MqttSecondTopic的,其他屬相如qoslevelcleansessionflag等也是通過userProperties的相關字段來設置。

三、注解式分發處理的實現

1. 前置知識點

1.1 BeanPostProcessor

BeanPostProcessor是Spring IOC容器給我們提供的一個擴展接口。BeanPostProcessor接口定義了兩個方法:

public interface BeanPostProcessor {// 前置處理Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException;// 后置處理Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;
}

Spring中Bean的整個生命周期如圖所示:

992704fcfcdeba2ece274adf068d09a5.png
Bean生命周期

postProcessBeforeInitialization()方法與postProcessAfterInitialization()分別對應圖中前置處理和后置處理兩個步驟將執行的方法。這兩個方法中都傳入了bean對象實例的引用,為擴展容器的對象實例化過程提供了很大便利,在這兒幾乎可以對傳入的實例執行任何操作。

可以看到,Spring容器通過BeanPostProcessor給了我們一個機會對Spring管理的bean進行再加工,注解、AOP等功能的實現均大量使用了BeanPostProcessor。通過實現BeanPostProcessor的接口,在其中處理方法中判斷bean對象上是否有自定義的一些注解,如果有,則可以對這個bean實例繼續進行其他操作,這也是本例中使用該接口要實現的主要目的。

1.2 ApplicationListener

在IOC的容器的啟動過程,當所有的bean都已經處理完成之后,spring ioc容器會有一個發布事件的動作。從 AbstractApplicationContext 的源碼中就可以看出:

protected void finishRefresh() {// Initialize lifecycle processor for this context.initLifecycleProcessor();// Propagate refresh to lifecycle processor first.getLifecycleProcessor().onRefresh();// Publish the final event.publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.LiveBeansView.registerApplicationContext(this);
}

因此當所有的bean都初始化完成并被成功裝載后會觸發ContextRefreshedEvent事件。

ApplicationListener是spring中用來監聽事件(ApplicationEvent)的傳遞,每個實現了ApplicationListener接口的bean都會收到ApplicationEvent對象的通知,每個ApplicationListener可根據事件類型只接收處理自己感興趣的事件,因此利用實現ApplicationListener的接口可以收到監聽ContextRefreshedEvent動作,然后可以寫自己的一些處理邏輯,比如初始化環境,準備測試數據、加載一些數據到內存等等。用法如下:

@Component
public class TestApplicationListener  implements ApplicationListener<ContextRefreshedEvent>{@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//todo:一些處理邏輯}}

1.3 反射

Java反射機制是在運行狀態中,對于任意一個類,都能夠知道這個類的所有屬性和方法;對于任意一個對象,都能夠調用它的任意一個方法和屬性;這種動態獲取的信息以及動態調用對象的方法的功能稱為java語言的反射機制。

在Java中,Class類與java.lang.reflect類庫一起對反射技術進行了全力的支持。獲取Class對象有三種方式:

  • 通過實例對象獲得:Class<?> class = object.getClass();
  • 通過類名獲得:Class<?> class = ClassName.class;
  • 通過類名全路徑獲得:Class<?> class = Class.forName("類名全路徑");

反射包中常用的類主要有

  • Constructor,表示的類的構造方法信息,利用它可以在運行時動態創建對象
  • Field,表示類的成員變量信息,通過它可以在運行時動態修改成員變量的屬性值(包含private)
  • Method,表示類的成員方法信息,通過它可以動態調用對象的方法(包含private)

下面說明一下本例中用到的一些反射api:

//獲得Class對象
Class clazz= obj.getClass();//判斷注解B是否在此A上
boolean isAnnotation= A.isAnnotationPresent(B.class);//獲得該clazz上的注解對象
B b=clazz.getAnnotation(B.class));//獲得本類以及父類或者父接口中所有的公共方法
Method[] methods=clazz.getMethods();//獲取方法上的所有參數
Parameter[] parameters = method.getParameters();//執行某對象的方法,owner為該對象,paramValues為入參數組,method為Method對象
method.invoke(owner, paramValues);

2. 整體實現思路

  1. 自定義注解MqttControllerMqttTopicMappingMqttMessageIdMqttMessageBody
  2. 利用BeanPostProcessor,獲得所有注解了MqttController的bean及其注解值,獲得其所有注解了MqttTopicMapping的Method方法及其注解值,利用兩者的注解值作為其key,分別將bean,Method為value放入不同的map中,記錄所有注解了MqttController的注解值作為下一步需要訂閱的Topic;
  3. 利用ApplicationListener在所有bean加載完成后使用實例化的mqConsumer來訂閱所有需要訂閱的Topic;
  4. 在mq訂閱的處理方法中,根據消息的全Topic在上述步驟的map中獲得其對應的bean和Method,同時根據MqttMessageIdMqttMessageBody來設置相關參數,使用method.invoke(owner, paramValues);實現方法的調用,來達到消息的處理分發。

3. 實現細節

3.1 自定義注解

@MqttController在類上使用,其中parentTopic值為需要監聽的1級Topic,其中使用@Component可以使其注解的類實例化為為Bean對象放入到Spring容器中,基于此才能在利用BeanPostProcessor中獲得其對象。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface MqttController {/*** 監聽的父topic** @return 監聽的父topic*/String parentTopic();
}

@MqttTopicMapping在方法上使用,其中subTopic的值為需要訂閱的子Topic,與1級Topic共同組成MQTT的Topic

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqttTopicMapping {/*** 訂閱的子topic,默認可以只訂閱1級topic** @return 訂閱的子topic*/String subTopic() default "";
}

MqttMessageBody在方法參數上使用,使得參數自動獲得messageBody對象

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqttMessageBody {}

MqttMessageId在方法參數上使用,使得參數自動獲得messageId的值

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqttMessageId {}

自定義注解的使用示例如下:

@Slf4j
@MqttController(parentTopic = "robot1")
public class MqttRobot1 {@MqttTopicMappingpublic void dealFirstTopic() {log.info("MqttRobot1.dealAlarm 收到消息啦,只處理了一級topic");}@MqttTopicMapping(subTopic = "alarm")public void dealAlarm(@MqttMessageId String messageId, @MqttMessageBody AlarmVo alarmVo) {log.info("MqttRobot1.dealAlarm 收到消息啦");log.info("messageId:{}", messageId);log.info("alarmVo:{}", alarmVo);}@MqttTopicMapping(subTopic = "task")public void dealTask() {log.info("MqttRobot1.dealTask 收到消息啦");}
}

3.2 提取Method和Bean對象

在MqttHandlerFactory類中定義以下幾個容器,分別存儲mqtt處理類的bean,mqtt處理方法以及parentTopic列表

/*** 用于存儲mqtt處理類的bean,key為parentTopic/subTopic*/
private static Map<String, Object> mqttControllers = new HashMap<>();/*** 用于存儲mqtt處理方法,key為parentTopic/subTopic*/
private static Map<String, Method> mqttHandlers = new HashMap<>();/*** 存儲parentTopic列表*/
private static Set<String> parentTopicSet = new HashSet<>();

利用BeanPostProcessor接口來處理實現了自定義注解的Bean對象。

具體代碼及注釋如下:

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Class beanClazz = bean.getClass();//使用的MqttController注解的bean對象if (beanClazz.isAnnotationPresent(MqttController.class)) {//獲得MqttController的注解值//存儲parentTopic列表String parentTopic = ((MqttController) beanClazz.getAnnotation(MqttController.class)).parentTopic();MqttHandlerFactory.getParentTopicSet().add(parentTopic);for (Method method : beanClazz.getMethods()) {//獲得MqttTopicMapping的Method對象if (method.isAnnotationPresent(MqttTopicMapping.class)) {//獲得MqttTopicMapping的注解值String subTopic = method.getAnnotation(MqttTopicMapping.class).subTopic();String realTopic;if ("".equals(subTopic)) {realTopic = parentTopic + "/";} else {realTopic = (parentTopic + "/" + subTopic + "/").replaceAll("/+", "/");}if (null != MqttHandlerFactory.getMqttHandler(realTopic)) {throw new MqttBeansException(bean.getClass().getSimpleName() + " topic 重復定義,值為" + realTopic);}//存儲mqtt處理類的beanMqttHandlerFactory.registerMqttHandler(realTopic, method);//存儲mqtt處理方法MqttHandlerFactory.registerMqttController(realTopic, bean);log.info("MqttHandler Mapped "{}" onto {}", realTopic, method.toString());}}}return bean;
}

3.3 mq的消息訂閱與處理

實現ApplicationListener接口在所有Bean對象加載完之后根據前面記錄的parentTopicSet作為所有需要訂閱的1級Topic開始訂閱。

在訂閱消息處理中從message信息可以獲得其對應的1級Topic與2級Topic,將其處理成MQTT的全Toic并從前面記錄的mqttHandlers,mqttControllers中獲得對應的Method對象及Bean對象,從message信息中提取對應的messageId級messageBody并設置為使用了@MqttMessageBody@MqttMessageId的注解的參數中,利用反射method.invoke(MqttHandlerFactory.getMqttController(mqttTopic), paramValues);來實現方法的調用。

具體代碼及注釋如下:

@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//...//設置mqConsumer實例化所需propertiesConsumer mqConsumer = ONSFactory.createConsumer(properties);Set<String> parentTopicSet = MqttHandlerFactory.getParentTopicSet();if (parentTopicSet.size() == 0) {log.warn("當前應用并未有任何topic訂閱");}//根據parentTopic和subTopic訂閱parentTopicSet.forEach(parentTopic -> {log.info("Add a new rocketMq subscription,topic:{}", parentTopic);mqConsumer.subscribe(parentTopic, "*", (message, context) -> {log.debug("MqReceive Message: " + message);//獲得topicString mqttFirstTopic = message.getTopic();String mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);if (null == mqttSecondTopic) {//只有1級topic的情況mqttSecondTopic = "/";}if (!"/".equals(mqttSecondTopic.substring(mqttSecondTopic.length() - 1))) {mqttSecondTopic += "/";}String mqttTopic = mqttFirstTopic + mqttSecondTopic;Method method = MqttHandlerFactory.getMqttHandler(mqttTopic);if (null == method) {log.warn("當前沒有處理該topic的handler,topic:{}", mqttTopic);return Action.CommitMessage;} else {//獲得mqtt的一些數據String messageId = message.getUserProperties("UNIQ_KEY");String messageBody = new String(message.getBody());//處理入參Parameter[] parameters = method.getParameters();Object[] paramValues = new Object[parameters.length];for (int i = 0; i < parameters.length; i++) {if (parameters[i].isAnnotationPresent(MqttMessageId.class)) {//@MqttMessageId注解的參數paramValues[i] = messageId;} else if (parameters[i].isAnnotationPresent(MqttMessageBody.class)) {//@MqttMessageBody注解的參數Class parameterClazz = parameters[i].getType();try {paramValues[i] = JSONObject.parseObject(messageBody, parameterClazz);} catch (Exception e) {log.error("mqttMessageBody 格式錯誤,messageId:{},messageBody:{}", messageId, messageBody);//                              return Action.ReconsumeLater;return Action.CommitMessage;}} else {//自己定義的一些參數就給null把paramValues[i] = null;}}try {method.invoke(MqttHandlerFactory.getMqttController(mqttTopic), paramValues);} catch (Exception e) {log.error("處理失敗啦");}}return Action.CommitMessage;});});mqConsumer.start();log.info("MqConsumer Started");}

四、MQTT同步調用的實現

MQTT協議是基于PUB/SUB的異步通信模式,不適用于服務端同步控制設備端返回結果的場景。通過制定一套請求和響應的同步機制,可以無需改動MQTT協議來達到同步調用的目的。

1. 整體實現思路

MQTT的同步調用實際上是使用了兩個異步調用完成的,即生產者調用消費者的同時,自己也作為消費者等待某一隊列的返回消息,消費者接受到生產者的消息同時,也作為消息發送者發送一消息給生產者。

具體同步調用機制示意如下:

02a61bb89f885b09ae613bf35d540f0f.png
同步調用示意圖

首先服務端和設備端服務端都訂閱了相關的Topic,服務端發起同步調用即發布一個示意需同步返回的message到指定request Topic,設備端接收到該message后處理完業務邏輯則會將調用結果發布一個返回message到request消息體中攜帶的response Topic中,最后服務端接收到設備端返回的message可以從消息體中獲得其調用結果。

整個調用過程中客戶端需要做的工作有:

  1. 訂閱request Topic
  2. 收到消息,判斷消息是否是同步消息,是的話則處理完業務邏輯后異步發送特別的response Topic

服務端需要做的工作有:

  1. 統一訂閱特別的設備的response Topic
  2. 發送消息到特別的request Topic
  3. Future.get(timeout) 模式處理request和response的關系
  4. 異步超時請求線程處理(處理超時請求,30ms運行一次)
  5. 消息監控和異常補救

服務端處理異步為同步調用的邏輯借鑒了Dubbo底層將Netty的異步調用轉化成同步的方式,下面在實現細節中會具體闡述。

2. 實現細節

MQTT同步調用的代碼如下:

public String publishSync(String topic, String qos, boolean cleanSessionFlag, Object data, int timeout)throws MqttRemoteException {String parentTopic = topic.substring(0, topic.indexOf("/"));String subTopic = topic.substring(topic.indexOf("/"));String mId = UUID.randomUUID().toString().replaceAll("-", "");MqttMessage mqttMessage = new MqttMessage(mId, replyParentTopic + "/" + mId, data);Message msg = new Message(parentTopic, "", JSON.toJSONString(mqttMessage).getBytes());msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);msg.putUserProperties(PropertyKeyConst.MqttQOS, qos);msg.putUserProperties("cleansessionflag", "" + cleanSessionFlag);MqttFuture mqttFuture = new MqttFuture(mqttMessage, timeout);try {producer.send(msg);} catch (ONSClientException e) {mqttFuture.cancel();throw e;}return mqttFuture.get();
}

前部分就是正常RocketMQ發布MQTT消息的代碼,但是發送的消息是自定義的MqttMessage,同時這邊在調用producer.send(msg);前先構建了一個MqttFuture對象然后發送完消息后使用mqttFuture.get();來獲得同步調用的結果。

MqttMessage定義如下,其中syncFlag表示該消息是否需要同步返回,mId表示該消息的唯一id,用于本地判斷消息返回具體映射哪個同步調用的key,replyTopic表示該消息需要返回消息的Topic,data則是具體業務數據。

@Data
public class MqttMessage implements Serializable {private static final long serialVersionUID = 6648680154051903549L;/*** 是否需要同步返回,默認為true*/private boolean syncFlag = true;/*** 生成的id,用uuid生成把*/private final String mId;/*** 客戶端返回消息的Topic*/private final String replyTopic;/*** 發送數據*/private Object data;public MqttMessage(String mId, String replyTopic, Object data) {this.mId = mId;this.replyTopic = replyTopic;this.data = data;}
}

MqttFuture對象則是用來處理同步調用的邏輯,每一個MqttFuture對象都有有一個mId作為唯一標識,發送的message消息體,調用返回結果MqttResponse(包括正常mqtt返回或者超時等異常返回結果)和同步調用超時時間,還有一個鎖及其創建的Condition,用來處理線程的等待與通知,后面會對其具體邏輯進行分析。每創建一個新的MqttFuture對象都會將其放入到存儲MqttFuture的Map中,key即為該消息的mId。

@Slf4j
@Data
public class MqttFuture {public static final int DEFAULT_TIMEOUT = 1000;public static final Map<String, MqttFuture> FUTURES = new ConcurrentHashMap<>();private final Lock lock = new ReentrantLock();private final Condition done = lock.newCondition();/*** 唯一id*/private final String mId;/*** 發送的message消息體*/private final MqttMessage message;/*** 設置的同步調用超時時間*/private final int timeout;/*** 等待開始時間*/private final long start = System.currentTimeMillis();/*** 返回結果*/private volatile MqttResponse response;public MqttFuture(MqttMessage message, int timeout) {this.message = message;this.timeout = timeout;this.mId = message.getMId();FUTURES.put(mId, this);}//...

調用返回結果MqttResponse定義如下,如果是正常成功返回則返回狀態是OK且會有對應的消息體messageResult,如果超時或者其他異常情況則會返回對應的錯誤消息errorMessage。

@Data
public class MqttResponse {/*** ok狀態,正常返回result,否則返回errorMessage*/public static final Integer OK = 20000;/*** 客戶端超時未處理*/public static final Integer TIMEOUT = 40001;/*** 服務端主動取消*/public static final Integer CANCEL = 40002;private Integer mStatus = OK;/*** request生成的messageId*/private String mId;/*** 收到的消息體*/private String messageResult;/*** 狀態不是成功返回的錯誤信息*/private String errorMessage;public MqttResponse(String mId) {this.mId = mId;}
}

在上述MQ同步調用代碼中,若調用producer.send(msg);同步發送mqtt消息失敗的話,則會調用mqttFuture.cancel();來取消該MqttFture對象,代碼如下,這邊主要是設置了異常的相應response避過將該MqttFuture對象從存儲MqttFuture的Map中移除。

@Slf4j
@Data
public class MqttFuture {//...public void cancel() {MqttResponse errorResult = new MqttResponse(mId);errorResult.setMStatus(MqttResponse.CANCEL);errorResult.setErrorMessage("主動請求取消");response = errorResult;FUTURES.remove(mId);}//...

若調用producer.send(msg);同步發送mqtt消息成功的話則會調用mqttFuture.get();來獲得其同步調用的結果,具體代碼如下,首先判斷是否調用已完成(有響應結果mqttResponse,包括正常獲得返回結果或者超時等其他異常的情況),若完成的話則直接返回調用結果或者拋出相應的異常,若沒有調用完成則獲得鎖并循環判斷是否調用完成,沒有的話則調用done.await(timeout, TimeUnit.MILLISECONDS);來實現該線程的超時等待直至其他線程調用該Condition的signal方法將其喚醒。

@Slf4j
@Data
public class MqttFuture {//...public String get() throws MqttRemoteException {return this.get(timeout);}public String get(int timeout) throws MqttRemoteException {if (timeout <= 0) {timeout = DEFAULT_TIMEOUT;}if (!isDone()) {long start = System.currentTimeMillis();lock.lock();try {while (!isDone()) {done.await(timeout, TimeUnit.MILLISECONDS);if (isDone() || System.currentTimeMillis() - start > timeout) {break;}}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}if (!isDone()) {throw new MqttTimeoutException("Waiting client-side response timeout");}}if (response == null) {throw new IllegalStateException("response cannot be null");}if (response.getMStatus().equals(MqttResponse.OK)) {return response.getMessageResult();}if (response.getMStatus().equals(MqttResponse.TIMEOUT)) {throw new MqttTimeoutException("Waiting client-side response timeout");}throw new MqttRemoteException(response.getErrorMessage());}/*** 判斷是否有response結果** @return 是否返回結果*/public boolean isDone() {return response != null;}//...

將上述等待喚醒的代碼如下,只需調用下面的received方法并傳入相應的MqttResponse結果即可,該方法會在后面講的判斷調用超時和正常mqtt消息結果返回的情況中調用。

@Slf4j
@Data
public class MqttFuture {//...private void doReceived(MqttResponse res) {lock.lock();try {response = res;done.signal();} finally {lock.unlock();}}public static void received(MqttResponse response) {MqttFuture future = FUTURES.remove(response.getMId());if (future != null) {future.doReceived(response);} else {log.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response);}}//...

后臺會開啟一個線程來掃描超時任務,主要是遍歷上述存儲MqttFuture的Map,如果對應的MqttFuture并沒有完成(獲得對應的response)且調用時間已超過設置的超時時間,則設置一個超時異常的MqttResponse并調用MqttFuture.received(timeoutResponse);來喚醒上述代碼中的done.await(timeout, TimeUnit.MILLISECONDS);的線程等待。該線程每30s會遍歷一個存儲MqttFuture的Map。

@Slf4j
@Data
public class MqttFuture {//...private static class RemotingInvocationTimeoutScan implements Runnable {@Overridepublic void run() {while (true) {try {for (MqttFuture future : FUTURES.values()) {if (future == null || future.isDone()) {continue;}if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {//當前mqtt請求已超時MqttResponse timeoutResponse = new MqttResponse(future.getMId());timeoutResponse.setMStatus(MqttResponse.TIMEOUT);MqttFuture.received(timeoutResponse);}}//每30ms掃一次Thread.sleep(30);} catch (Throwable e) {log.error("Exception when scan the timeout invocation of remoting.", e);}}}}static {Thread th = new Thread(new RemotingInvocationTimeoutScan(), "MqttResponseTimeoutScanTimer");th.setDaemon(true);th.start();}//...

為了處理正常的mqtt的消息返回,除了使用上一節中講到的自定義注解MqttController來訂閱相關的Topic,還需要訂閱特殊的一個Topic來處理mqtt同步調用的返回消息,訂閱代碼如下,根據收到的mId和messageBody設置對應的MqttResponse并調用MqttFuture.received(response);來喚醒上述代碼中的done.await(timeout, TimeUnit.MILLISECONDS);的線程等待。

//訂閱偽同步請求回復
log.info("Add a new rocketMq subscription,topic:{}", replyParentTopic);
mqConsumer.subscribe(replyParentTopic, "*", (message, context) -> {String mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);if (null == mqttSecondTopic) {mqttSecondTopic = "";}String mId = mqttSecondTopic.replaceAll("/", "");String messageBody = new String(message.getBody());MqttResponse response = new MqttResponse(mId);response.setMessageResult(messageBody);MqttFuture.received(response);return Action.CommitMessage;
});

五、demo使用

1. 項目結構

由于使用了springboot框架來實現該demo,所以項目結構如下:

75e97126796c2b204d7d9d6295b43d66.png
demo項目結

其中mqtt工具包目錄如下:

8489402e6d28594778b8dbc51170e04d.png
mqtt工具包結構

2. mqtt工具包的使用

2.1 在yml配置中添加相關配置

配置示例如下,其中xxx改為自己使用的即可

ali:mqtt:accessKey: xxxsecretKey: xxxgroupId: xxxnamesrvAddr: xxxsendMsgTimeoutMillis: 3000#消費者線程固定位50個consumeThreadNums: 50
#    用于同步調用返回發送的topicreplyParentTopic: xxx

2.2 添加工具包中的MqttConfig

@Import({ MqttConfig.class})
@Configuration
public class MqttConfigure {}

2.3 自定義注解的使用

@Slf4j
@MqttController(parentTopic = "robot1")
public class MqttRobot1 {@MqttTopicMappingpublic void dealFirstTopic() {log.info("MqttRobot1.dealAlarm 收到消息啦,只處理了一級topic");}@MqttTopicMapping(subTopic = "alarm")public void dealAlarm(@MqttMessageId String messageId, @MqttMessageBody AlarmVo alarmVo) {log.info("MqttRobot1.dealAlarm 收到消息啦");log.info("messageId:{}", messageId);log.info("alarmVo:{}", alarmVo);}@MqttTopicMapping(subTopic = "task")public void dealTask() {log.info("MqttRobot1.dealTask 收到消息啦");}
}

2.4 測試同步調用,模擬MQTT客戶端消息返回代碼

mqtt客戶端實現代碼示例參考阿里云官方demo https://github.com/AliwareMQ/lmq-demo

其中xxx的地方都改成自己的即可,下面代碼中mqttClient2.publish(replyTopic, message);即將結果發送到replyTopic中。

public class MqttClientTest {public static void main(String[] args) throws Exception {String instanceId = "xxx";String endPoint = "xxx";String accessKey = "xxx";String secretKey = "xxx";String clientId = "xxx";final String parentTopic = "xxx";//這邊需自定義mqtt客戶端topic,final String mq4IotTopic = parentTopic + "/" + "xxx" + "/xxx";final int qosLevel = 0;ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey,clientId);final MemoryPersistence memoryPersistence = new MemoryPersistence();final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);final MqttClient mqttClient2 = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);/*** 客戶端設置好發送超時時間,防止無限阻塞*/mqttClient.setTimeToWait(5000);final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());mqttClient.setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {/*** 客戶端連接成功后就需要盡快訂閱需要的 topic*/System.out.println("connect success");executorService.submit(new Runnable() {@Overridepublic void run() {try {final String topicFilter[] = { mq4IotTopic };final int[] qos = { qosLevel };mqttClient.subscribe(topicFilter, qos);} catch (MqttException e) {e.printStackTrace();}}});}@Overridepublic void connectionLost(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));JSONObject jsonObject = JSON.parseObject(new String(mqttMessage.getPayload()));String mId = jsonObject.getString("mId");String replyTopic = jsonObject.getString("replyTopic");String result = mId + "回復啦";MqttMessage message = new MqttMessage(result.getBytes());message.setQos(qosLevel);//這邊會將結果發送到replyTopic中mqttClient2.publish(replyTopic, message);System.out.println("發送啦");}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);}});mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());mqttClient2.connect(connectionOptionWrapper.getMqttConnectOptions());Thread.sleep(Long.MAX_VALUE);}
}

3. demo體驗

啟動項目可以在控制臺看到有如下日志:

130792937b1d3c44e434e340c1af2018.png
MQTT處理Topic映射日志

28b9eac961f933d79ed8bd9307e393b9.png
MQTT訂閱Topic日志

從日志中可以看出程序自動處理了自定義注解的Mqtt消息處理的映射,并根據mqtt的1級Topic進行了RocketMQ的相關訂閱。

提供了一個測試MQTT消息簡單發送接口如下:

@GetMapping("/publish")
public String doPublish(@RequestParam("topic") String topic, @RequestParam("message") String message) {try {return mqttClient.publish(topic, message);} catch (ONSClientException e) {return e.getMessage();}
}

使用 http://localhost:8080/mqtt/publish?topic=robot1/alarm&message={id:"1",code:"heheh"}來調用該接口。

可以在控制臺看到如下日志打印:

7e955305f9c3939b875b87ee0dab5c1e.png
MQTT訂閱的消息日志

由于本demo中使用自定義注解訂閱了該Topic,所以調用該接口發送消息之后也會被本demo成功接收,并分發到對應的處理函數中,因此調用了該接口后可以在控制臺看到如上日志打印,可以看到MQTT消息成功發布,訂閱到該消息并實現了MQTT消息的處理分發。

提供了一個測試MQTT同步調用的接口如下:

@GetMapping("/publish-sync")public String publishSync(@RequestParam("topic") String topic, @RequestParam("message") String message) {try {return mqttClient.publishSync(topic, message, 5000);} catch (MqttRemoteException e) {return e.getMessage();}
}

使用http://localhost:8080/mqtt/publish-sync?topic=robot1/alarm/GID_ROBOT@@@DEVICEID_001&message=hehehe來調用接口。

可以看到下圖的結果:

456d585067873e676cce0e8a65b8c9d9.png
MQTT同步調用超時

這是MQTT同步調用超時的情況,因為此時還沒有開啟對應的MQTT客戶端,因此發送的MQTT消息并沒有客戶端進行回應,所以出現了調用超時的情況,如果運行上述模擬MQTT客戶端消息返回的代碼后再次調用該接口,可以看到同步調用成功返回了對應的結果,如圖所示。

abbfc919190ca8a25ea649f7217e726d.png
MQTT同步調用成功返回

源碼地址如下,僅供學習參考

DavidDingXu/panda-mqtt?github.com
460b53ce50b2c889a573697c96827e8f.png

六、參考

  • 阿里云微消息隊列 MQTT幫助文檔
  • 阿里云消息隊列 RocketMQ幫助文檔

本文原創,歡迎轉載,轉載請注明出處,如有不正確的地方懇請各位看官指正。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/534271.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/534271.shtml
英文地址,請注明出處:http://en.pswp.cn/news/534271.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

linux課堂筆記(8)

磁盤分區工具&#xff08;parted&#xff09; 1&#xff0c;語法&#xff1a; parted 磁盤 注&#xff1a;磁盤類型&#xff08;根據接口劃分&#xff09; &#xff08;1&#xff09;IDE-hd&#xff0c;hda&#xff0c;hdc&#xff08;光驅&#xff09;&#xff0c;hdd 加…

css引入矢量圖標_WEB 項目開發中的圖標管理解決方案

相信很多前端開發人員在項目開發過程中都會遇到這樣一個問題&#xff1a;頁面的圖標發生改動&#xff0c;需要往圖標文件中追加新的圖標&#xff0c;卻因為圖標文件已經打包好而無從下手&#xff0c;重新制作一份圖標文件吧&#xff0c;要考慮替換整個項目的圖標&#xff0c;工…

判斷按鍵值_Pygame(九)按鍵事件(2)

Pygame(九)按鍵事件(2)前情提要前情提要作業解析完整代碼# /usr/bin/python3# Author: 愛編程的章老師# Time: 2021/1/7 0007# E-mail: Bluesand2010163.comimport pygame, sys, timefrom random import randintdef homework():作業 擋板接球 一個寬100 ,高10的棕色擋板,…

動態添加input_前端提效必備:動態模版生成

前言在日常開發中&#xff0c;我們需要不停的新建頁面和組件。以 Vue 項目為例&#xff0c;我們在新建一個頁面的時候&#xff0c;需要經歷一遍又一遍重復的過程&#xff1a;1、先新建一個文件夾2、然后新建一個 .vue 文件&#xff0c;寫上 、", "" ],"…

在dom最前面插入_JavaScript中的DOM

1. 關于DOM文檔對象模型(DocumentObject Model)&#xff0c;是基于瀏覽器編程的一套API接口&#xff0c;W3C出臺的推薦標準&#xff0c;每個瀏覽器都有一些細微的差別&#xff0c;其中以Mozilla(火狐)的瀏覽器最與標準接近。通過 DOM&#xff0c;可以訪問所有的 HTML元素&#…

python 快速刪除程序_如何快速一次性卸載所有python包(第三方庫)呢

很多時候我們都會有一個麻煩事&#xff0c;就是打開pycharm或者VScode等的時候&#xff0c;都有可能因為自己電腦上面安裝的第三方模塊過多&#xff0c;導致電腦很卡&#xff0c;風扇轉速不停地增加&#xff0c;這時候我們就會想去卸載一些不太用得著的第三方模塊&#xff0c;但…

python正則表達式修飾符_Python正則表達式

正則表達式是一個特殊的字符序列&#xff0c;它能幫助你方便的檢查一個字符串是否與某種模式匹配。re 模塊使 Python 語言擁有全部的正則表達式功能。compile 函數根據一個模式字符串和可選的標志參數生成一個正則表達式對象。該對象擁有一系列方法用于正則表達式匹配和替換。r…

當代最值得收藏的畫家作品_當代最具潛力和收藏價值的十大畫家

原標題&#xff1a;當代最具潛力和收藏價值的十大畫家當代從事繪畫的人成千上萬&#xff0c;哪些名家作品值得收藏&#xff1f;當前都有哪些“潛力股”&#xff0c;相關專家綜合市場分析&#xff0c;純從藝術水準上列出值得收藏的“潛力股”&#xff0c;供愛好書畫收藏的各界人…

備抵附加賬戶的期末余額_會計賬戶的分類(二)

待攤費用賬戶的格式和運用方法同盤存類賬戶&#xff0c;即&#xff1a;期初如果有余額在借方&#xff0c;本期發生額的增加數在借方&#xff0c;本期發生額的減少數在貸方&#xff0c;期末如果有余額在借方。預提費用賬戶是指用來核算和監督按規定預先提取計入當期&#xff0c;…

mysql union all 別名_mysql union和union all

如下先創建2個表&#xff0c;aa bb.CREATE table aa(uid int(20) not null,name VARCHAR(30) not null)engineinnodb default charsetutf8mb4 COLLATE utf8mb4_general_ci;INSERT INTO aa(uid, name) VALUES (10, 張芳);INSERT INTO aa(uid, name) VALUES (11, 王凱);INSERT IN…

MySQL不走聯合索引_mysql group by 多列優化思路?為什么不走聯合索引?

explain SELECT a, b, COUNT(*) FROM tbnameGROUP BY a, border by a DESClimit 1a 和 b 列已經設置聯合索引&#xff0c; 為什么這種操作也會執行全表掃描呢&#xff1f;explain SELECT a, b FROM tbnameGROUP BY a, border by a DESC去掉 COUNT 和 limit 則走索引沒有掃描&am…

mysql ddl 鎖_MySQL Online DDL導致全局鎖表案例分析

MySQL Online DDL導致全局鎖表案例分析我這邊遇到了什么問題?線上給某個表執行新增索引SQL, 然后整個數據CPU打到100%, 連接數暴增到極限, 最后導致所有訪問數據庫的應用都奔潰.SQL如下:ALTER TABLE bookADD INDEX idx_sub_title (sub_title ASC);能看到什么?10063293, root,…

ci框架 亂碼 mysql_mysql容器亂碼問題

在docker-compose.yml文件中定義mysql導入utf-8的萬國碼services:mysql:image:mysql:5.7# command: [--character-set-serverutf8mb4, --collation-serverutf8mb4_unicode_ci]volumes:-./data/docker/mysql:/var/lib/mysql-./mysql/:/docker-entrypoint-initdb.d/-./conf/mys…

mysql分表 查詢 優化_MySQL性能管理及架構(查詢優化、分庫分表)一遍文章搞定...

相關配置參數&#xff1a;slow_query_log # 啟動停止記錄慢查日志&#xff0c;慢查詢日志默認是沒有開啟的可以在配置文件中開啟(on)slow_query_log_file # 指定慢查日志的存儲路徑及文件&#xff0c;日志存儲和數據從存儲應該分開存儲long_query_time # 指定記錄慢查詢日志SQL…

dokcer mysql修改編碼_默認支持utf8編碼的mysql docker鏡像

其他人都是不正確的使用mysql容器鏡像可以很快速的運行mysql&#xff0c;免去了傳統的虛擬機安裝方式的繁瑣配置。但是使用官方的mysql鏡像&#xff0c;你會遇到中文亂碼的問題&#xff0c;原因是官方鏡像的字符集默認值不是utf8。這時候你去google&#xff0c;會找到一些文章&…

mysql臨時表 清空_在數據庫中臨時表什么時候會被清除呢

展開全部我們仍使用 實驗 05 中的環境&#xff0c;略去準備數據的過程。我們仍然使用兩個會話&#xff0c;62616964757a686964616fe59b9ee7ad9431333433646439一個會話 run&#xff0c;用于運行主 SQL&#xff1b;另一個會話 ps&#xff0c;用于進行 performance_schema 的觀察…

locust mysql_locust性能壓測連接mysql,隨機取出班級,綁定學生

from locust import HttpLocust, TaskSet, taskimport pymysqlimport randomclass UserBehavior(TaskSet):def on_start(self):db pymysql.connect("ip地址", "用戶名", "密碼", "數據庫")cursor db.cursor()cursor.execute("S…

mysql無法添加或更新子行_違反完整性約束:1452無法添加或更新子行:

我試圖在我的注釋表中插入值&#xff0c;但出現錯誤。俗話說&#xff0c;我不能添加或更新子行&#xff0c;我也不知道這意味著什么。我的架構看起來像這樣-- ------------------------------ Table structure for comments-- ----------------------------DROP TABLE IF EXIST…

python醫學圖像分割_基于cv2的醫學圖像分割

例如&#xff0c;圖像如下所示&#xff1a;import cv2import numpy as npimg cv2.imread("mdb168.pgm",0)import matplotlib.pyplot as pltplt.imshow(img, cmap"gray")我想刪除圖像中所有的偽影和不必要的部分。在為此&#xff0c;我首先對圖像進行二值化…

python解析response的json_從HTML responseTex解析JSON

因此&#xff0c;我大體上同意&#xff0c;更好的解決方案是確保服務器只返回JSON&#xff0c;不過這是通過客戶端Javascript實現的一種快速方法&#xff0c;如Barmer所建議的那樣&#xff0c;將html解析到DOM&#xff0c;獲取body中的文本childNode并在其上運行JSONParse。在v…