MQTT
1.MQTT協議概述
MQTT是一種基于發布/訂閱模式的輕量級消息傳輸協議,設計用于低帶寬、高延遲或不穩定的網絡環境,廣泛應用于物聯網領域
1.1 MQTT協議的應用場景
1.智能家居、車聯網、工業物聯網:MQTT可以用于連接各種家電設備和傳感器,實現設備之間的通信和控制
2.遠程監控和控制 : MQTT可以用于將傳感器數據發布到云平臺,供其他設備或應用程序訂閱和使用
3.消息通知:MQTT可以用于提供實時消息通知功能
4.資源監控與管理:MQTT能夠提供對物聯網設備的實時監控和管理功能
5.數據采集和分析:MQTT也可以用于數據采集和分析
1.2 MQTT協議優勢
1.常見的計算機語言(C/C++、Java、Python、Go…)都有支持MQTT協議的客戶端
2.MQTT協議是建立在TCP/IP協議基礎之上,所以MQTT協議安全可靠
3.服務質量設置,MQTT協議提供了三種服務質量配置分別為:
Qos 0:消息可能丟失
Qos 1:消息不會丟失,但是可能重復
Qos 2:消息不會丟失也不會重復4.心跳保活:由于網絡問題可能造成連接陷于假死狀態,為了判斷客戶端和代理是否出現異常,MQTT定義自己的心跳機制,定期向代理發送報文,以便于快速識別出異常連接,讓客戶端快速與代理斷開連接
5.持久會話: 代理保留客戶端發送過來的消息,以便于消息訂閱端上線立刻獲取消息
1.3 MQTT協議報文
MQTT報文(數據包)由三部分組成:
1.固定報頭(Fixed header):所有數據包中都包含此報頭,用來表示數據包的類型,以及數據包的分組累標識
2.可變報頭(Variable header):存在于部分MQTT數據包中,數據包類型決定了可變頭是否存在及其具體內容
3.有效載荷(Payload):存在于部分MQTT數據包中,表示客戶端收到的具體內容
1.4 MQTT協議的工作原理
MQTT協議基于TCP/IP協議,TCP/IP協議是一個安全穩定的協議,通信需要服務端和客戶端經歷三次握手四次揮手,建立一個穩定的通道然后在進行數據傳輸
MQTT協議建立在TCP/IP協議之上,也是需要編寫服務端(上圖中的Broker)和客戶端(消息發布者和消息訂閱者)
2.MQTT代理服務器介紹和搭建
EMQX官網地址: https://www.emqx.io/zh
介紹:
1.開源大規模分布式MQTT代理服務器
2.單臺并發連接數可以高達一億,每秒處理百萬級消息
3.安全可靠的消息傳遞
2.1 EMQX安裝: windows
1.下載地址: https://www.emqx.io/zh/get-started
2.安裝步驟:
第一步: 下載 emqx-5.3.2-windows-amd64.zip 安裝包,版本可能和我這個不同
第二步: 解壓
第三步: 打開命令行(以管理員身份運行),切換到解壓目錄的bin目錄下
第四步: 安裝,在bin目錄下執行EMQX安裝命令 emqx.cmd install,完成之后有類似下面的輸出,說明安裝成功,只需要安裝一次(運行命令)
? D:\app\emqx-5.3.2-windows-amd64\bin>emqx.cmd install
? EMQX_NODE__DB_ROLE [node.role]: core
? EMQX_NODE__DB_BACKEND [node.db_backend]: mnesia
? D:\app\emqx-5.3.2-windows-amd64\erts-13.2.2.4\bin\erlsrv.exe: Service emqx_5.3.2 added to system.
? [SC] ChangeServiceConfig 成功
第五步(可選擇):如果想將EMQX從windows上卸載,可以執行 emqx.cmd uninstall 命令
第六步:去windows服務列表中找到第四步安裝的EMQX的服務,鼠標右鍵啟動
第七步:在命令行輸入 emqx.cmd console 命令,查看是否啟動成功,如果有類似以下日志啟動成功
? D:\app\emqx-5.3.2-windows-amd64\bin>emqx.cmd console
? EMQX_LOG__CONSOLE_HANDLER__ENABLE [log.console.enable]: true
? EMQX_NODE__DB_ROLE [node.role]: core
? EMQX_NODE__DB_BACKEND [node.db_backend]: mnesiaD:\app\emqx-5.3.2-windows-amd64>D:\app\emqx-5.3.2-windows-amd64\erts-13.2.2.4\bin\erl.exe -mode embedded -boot “D:\app\emqx-5.3.2-windows-amd64\releases\5.3.2\start” -config “D:\app\emqx-5.3.2-windows-amd64\data\configs\app.2024.05.06.16.38.19.config” -args_file “D:\app\emqx-5.3.2-windows-amd64\data\configs\vm.2024.05.06.16.38.19.args” -mnesia dir ‘d:/app/emqx-5.3.2-windows-amd64/data/mnesia/emqx@127.0.0.1’
Listener ssl:default on 0.0.0.0:8883 started.
Listener tcp:default on 0.0.0.0:1883 started.
Listener ws:default on 0.0.0.0:8083 started.
Listener wss:default on 0.0.0.0:8084 started.
Listener http:dashboard on :18083 started.
EMQX 5.3.2 is running now!
Eshell V13.2.2.4 (abort with ^G)
v5.3.2(emqx@127.0.0.1)1>
第八步:通過瀏覽器訪問控制臺http://127.0.0.1:18083,默認初始化用戶名: admin,默認密碼: public,進入之后會讓你重新修改密碼
注意事項: (第六步+第七步)這種啟動方式在開發時使用,如果想正式環境使用請遵循官網命令啟動介紹: 正式環境啟動在bin目錄下直接輸入 emqx start進行EMQX啟動,這時不需要(第六步和第七步)
官網命令詳細使用地址: https://www.emqx.io/docs/zh/latest/admin/cli.html
2.2 MQTT客戶端工具MQTTX
EMQX官網自帶工具MQTTX,官網地址: https://mqttx.app/zh/downloads
傻瓜式安裝,無腦下一步
若依框架融合mqtt
倉庫地址:https://gitee.com/peng-chuanbin/iot-mqtt.git
實現效果:(初步Demo實現)
點擊網頁的按鈕(發送數據),mqtt能夠接收到發送的數據
mqtt發送數據,Java程序能夠接收到,并且存儲到數據庫中
1.下載mqttx
2.運行項目
1.新建一個mqtt數據庫,然后運行sql文件,修改yaml中的數據庫配置
2.pom.xml添加mqtt的依賴
<!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
3.utils包下新建一個mqtt包,添加三個文件
package com.ruoyi.common.utils.mqtt;@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {@Autowiredprivate MqttPushClient mqttPushClient;/*** 用戶名*/private String username;/*** 密碼*/private String password;/*** 連接地址*/private String hostUrl;/*** 客戶Id*/private String clientId;/*** 默認連接話題*/private String defaultTopic;/*** 超時時間*/private int timeout;/*** 保持連接數*/private int keepalive;/*** mqtt功能使能*/private boolean enabled;public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public String getHostUrl() {return hostUrl;}public void setHostUrl(String hostUrl) {this.hostUrl = hostUrl;}public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}public String getDefaultTopic() {return defaultTopic;}public void setDefaultTopic(String defaultTopic) {this.defaultTopic = defaultTopic;}public int getTimeout() {return timeout;}public void setTimeout(int timeout) {this.timeout = timeout;}public int getKeepalive() {return keepalive;}public void setKeepalive(int keepalive) {this.keepalive = keepalive;}public boolean isEnabled() {return enabled;}public void setEnabled(boolean enabled) {this.enabled = enabled;}@Beanpublic MqttPushClient getMqttPushClient() {if (enabled == true) {mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//訂閱java主題 #重要,這里確定主題是哪一個mqttPushClient.subscribe("java",0);}return mqttPushClient;}
}
PushCallback
package com.ruoyi.common.utils.mqtt;@Component
public class PushCallback implements MqttCallback {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate MqttConfig mqttConfig;private static MqttClient client;private static String _topic;private static String _qos;private static String _msg;@Overridepublic void connectionLost(Throwable throwable) {// 連接丟失后,一般在這里面進行重連logger.info("連接斷開,可以做重連");if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {// subscribe后得到的消息會執行到這里面logger.info("接收消息主題 : " + topic);logger.info("接收消息Qos : " + mqttMessage.getQos());logger.info("接收消息內容 : " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());}}
MqttPushClient
package com.ruoyi.common.utils.mqtt;@Component
public class MqttPushClient {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate PushCallback pushCallback;private static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttPushClient.client = client;}/*** 客戶端連接** host ip+端口* clientID 客戶端Id* username 用戶名* password 密碼* timeout 超時時間* keepalive 保留數*/public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {MqttClient client;try {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);MqttPushClient.setClient(client);try {client.setCallback(pushCallback);client.connect(options);} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 發布* qos 連接方式* retained 是否保留* topic 主題* pushMessage 消息體*/public AjaxResult publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);if (null == mTopic) {logger.error("topic not exist");}MqttDeliveryToken token;try {token = mTopic.publish(message);token.waitForCompletion();return success();} catch (MqttPersistenceException e) {e.printStackTrace();return error();} catch (MqttException e) {e.printStackTrace();return error();}}/*** 訂閱某個主題* topic 主題* qos 連接方式*/public void subscribe(String topic, int qos) {logger.info("開始訂閱主題" + topic);try {MqttPushClient.getClient().subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}}
4.application.yml
如果有服務器了,直接修改broker.emqx.io這個就可以了,其他的都無所謂
broker.emqx.io:官方測試的
# mqttmqtt:username: pcb # 用戶名password: 123456 # 密碼hostUrl: tcp://broker.emqx.io:1883 # tcp://ip:端口 #重要clientId: clientIdBamBam # 客戶端iddefaultTopic: topic,topic1 # 訂閱主題 #重要timeout: 100 # 超時時間 (單位:秒)keepalive: 60 # 心跳 (單位:秒)enabled: true # 是否使能mqtt功能
5.啟動mqttx,運行項目,訪問 http://localhost/ruoyi
mqttx發送數據,Java程序接收數據
3.設計前端界面
新建一個WlwController
package com.ruoyi.project.system.wlw.controller;@Controller
@RequestMapping("/system/wlw")
public class WlwController {private String prefix = "system/wlw";@GetMapping()public String wlw(){return prefix + "/w";}}
在resources包下的templates包中system包新建一個w.html界面,用來發送數據和顯示數據
<!DOCTYPE html>
<html lang="zh" xmlns:th="http://www.thymeleaf.org">
<head><meta charset="UTF-8"><title>wlw</title>
</head>
<body>1
</body>
</html>
ShiroConfig放開攔截
/*** Shiro過濾器配置*/@Beanpublic ShiroFilterFactoryBean shiroFilterFactoryBean(SecurityManager securityManager){// Shiro連接約束配置,即過濾鏈的定義LinkedHashMap<String, String> filterChainDefinitionMap = new LinkedHashMap<>();// 對靜態資源設置匿名訪問filterChainDefinitionMap.put("/favicon.ico**", "anon");filterChainDefinitionMap.put("/ruoyi.png**", "anon");filterChainDefinitionMap.put("/html/**", "anon");filterChainDefinitionMap.put("/css/**", "anon");filterChainDefinitionMap.put("/docs/**", "anon");filterChainDefinitionMap.put("/fonts/**", "anon");filterChainDefinitionMap.put("/img/**", "anon");filterChainDefinitionMap.put("/ajax/**", "anon");filterChainDefinitionMap.put("/js/**", "anon");filterChainDefinitionMap.put("/ruoyi/**", "anon");filterChainDefinitionMap.put("/captcha/captchaImage**", "anon");// 退出 logout地址,shiro去清除sessionfilterChainDefinitionMap.put("/logout", "logout");// 不需要攔截的訪問filterChainDefinitionMap.put("/login", "anon,captchaValidate");// 不需要攔截的訪問 wlw 添加這句話filterChainDefinitionMap.put("/system/wlw", "anon,captchaValidate");}
訪問 http:/localhost:80/ruoyi/system/wlw
4.設計假數據
新建數據庫表w
使用若依自動生成代碼
將生成的代碼放到指定位置
xml文件,并且添加一句話:在執行插入(INSERT)操作之前,先生成一個主鍵值id,并將其設置到要插入的對象中
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ruoyi.project.system.w.mapper.WMapper"><resultMap type="W" id="WResult"><result property="id" column="id" /><result property="topic" column="topic" /><result property="data" column="data" /></resultMap><sql id="selectWVo">select id, topic, data from w</sql><insert id="insertW" parameterType="W">//添加這句話<selectKey keyProperty="id" resultType="String" order="BEFORE" >SELECT REPLACE(UUID(),'-','') from dual</selectKey>insert into w<trim prefix="(" suffix=")" suffixOverrides=","><if test="id != null">id,</if><if test="topic != null">topic,</if><if test="data != null">data,</if></trim><trim prefix="values (" suffix=")" suffixOverrides=","><if test="id != null">#{id},</if><if test="topic != null">#{topic},</if><if test="data != null">#{data},</if></trim></insert></mapper>
前端文件
啟動項目,新建菜單
新增加一個Java訂閱,新增加了之后數據庫就會顯示一條數據:id=uuid,dtopic=java,data=null
這里要和MqttConfig中,選擇訂閱的主題一樣
注意:代碼寫的位置
package com.ruoyi.common.utils.mqtt;@Component
public class PushCallback implements MqttCallback {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate MqttConfig mqttConfig;private static MqttClient client;private static String _topic;private static String _qos;private static String _msg;@Overridepublic void connectionLost(Throwable throwable) {// 連接丟失后,一般在這里面進行重連logger.info("連接斷開,可以做重連");if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {// subscribe后得到的消息會執行到這里面logger.info("接收消息主題 : " + topic);logger.info("接收消息Qos : " + mqttMessage.getQos());logger.info("接收消息內容 : " + new String(mqttMessage.getPayload()));//todo 代碼一般寫在這里......}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());}}
注意:在MqttConfig中,選擇訂閱的主題
@Beanpublic MqttPushClient getMqttPushClient() {if (enabled == true) {mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//訂閱java主題 mqttPushClient.subscribe("java",0);}return mqttPushClient;}
5.接收數據
編寫代碼,硬件傳來的數據更新保存在數據庫中
先獲取topic=Java的數據,然后可存不存在,存在就j更新,將硬件傳過來的數據更新到data中
package com.ruoyi.common.utils.mqtt;@Component
public class PushCallback implements MqttCallback {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate MqttConfig mqttConfig;@Autowiredprivate IWService wService;private static MqttClient client;private static String _topic;private static String _qos;private static String _msg;@Overridepublic void connectionLost(Throwable throwable) {// 連接丟失后,一般在這里面進行重連logger.info("連接斷開,可以做重連");if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {// subscribe后得到的消息會執行到這里面logger.info("接收消息主題 : " + topic);logger.info("接收消息Qos : " + mqttMessage.getQos());logger.info("接收消息內容 : " + new String(mqttMessage.getPayload()));//硬件傳來的數據update在數據庫中
// @Log(title = "物聯網接收數據", businessType = BusinessType.UPDATE)
// @PostMapping("/edit")
// @ResponseBody
// public AjaxResult editSave (W w){
// return toAjax(wService.updateW(w));
// }//查詢更新操作,查詢topic=Java的數據,如果有,就把數據更新到數據庫中(data)W w = new W();w.setTopic("java");//查詢List<W> list = wService.selectWList(w);if (list.size() > 0) {//根據id去查詢w.setId(list.get(0).getId());w.setTopic(null);w.setData(new String(mqttMessage.getPayload()));try {wService.updateW(w);//alt+ctrl+t:拋異常} catch (Exception e) {throw new RuntimeException(e);}}}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());}}
運行系統,此時數據庫的data為空(新增加一個Java訂閱的數據)
使用mqttx,模擬硬件發Java發消息
發送111
5.1 前端顯示接收的數據
wlw包的w.html編寫前端,編寫一個定時器localhost/ruoyi/system/wlw
<!DOCTYPE html>
<html lang="zh" xmlns:th="http://www.thymeleaf.org">
<head><meta charset="UTF-8"><title>wlw</title><script th:src="@{/js/jquery.min.js}"></script><script th:inline="javascript">var ctx = [[@{/}]];//定時器,每五秒定時接收數據setInterval(function(){$.ajax({type: "post",url: ctx + "system/w/list",dataType: "json",success: function (result) {console.log(result);}})}, 1000);</script>
</head>
<body><button>開</button>
</body>
</html>
shiroConfig,放開攔截
// 不需要攔截的訪問 接收
filterChainDefinitionMap.put("/system/w/list", "anon,captchaValidate");
控制臺接收數據
把查詢的數據顯示到界面上
<!DOCTYPE html>
<html lang="zh" xmlns:th="http://www.thymeleaf.org">
<head><meta charset="UTF-8"><title>wlw</title><script th:src="@{/js/jquery.min.js}"></script><script th:inline="javascript">var ctx = [[@{/}]];//定時器,每五秒定時接收數據setInterval(function () {$.ajax({type: "post",url: ctx + "system/w/list",data: {//根據topic為Java的查topic: "java",},dataType: "json",success: function (result) {if (result.total > 0) {//顯示數據$("div").text(result.rows[0].data);}}})}, 1000);</script>
</head>
<body>
<button>開</button>
<div></div>
</body>
</html>
使用mqttx模擬硬件發送數據,Java程序接收并顯示在界面上
6.發送消息
修改主題(三個地方)
PushCallback,修改為test
W w = new W();w.setTopic("test"); //test//查詢List<W> list = wService.selectWList(w);if (list.size() > 0) {//根據id去查詢w.setId(list.get(0).getId());w.setTopic(null);w.setData(new String(mqttMessage.getPayload()));try {wService.updateW(w);//alt+ctrl+t:拋異常} catch (Exception e) {throw new RuntimeException(e);}}
MqttConfig
@Beanpublic MqttPushClient getMqttPushClient() {if (enabled == true) {mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//訂閱java主題mqttPushClient.subscribe("test",0);}
// if(enabled == true){
// String mqtt_topic[] = StringUtils.split(defaultTopic, ",");
// mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//連接
// for(int i=0; i<mqtt_topic.length; i++){
// mqttPushClient.subscribe(mqtt_topic[i], 0);//訂閱主題
// }
// }return mqttPushClient;}
}
數據庫中的topic修改為test
前端代碼
<!DOCTYPE html>
<html lang="zh" xmlns:th="http://www.thymeleaf.org">
<head><meta charset="UTF-8"><title>wlw</title><script th:src="@{/js/jquery.min.js}"></script><script th:inline="javascript">var ctx = [[@{/}]];$(function () {$("button").click(function () {$.ajax({type: "post",url: ctx + "system/wlw/open",//添加這里的代碼data: {//向java主題發送open數據topic: "java",msg: "open"},dataType: "json",success: function (result) {console.log(result);}// data: {// topic: "java1",// msg: "open"// },// dataType: "json",// success: function(result) {// console.log(result);// }})})})</script>
</head>
<body>
<button>開</button>
<div></div>
</body>
</html>
運行項目,點擊開按鈕,查看mqttx是否接收到了數據
注意:發送的訂閱主題不能和接收的主題一樣,所以前面我們要修改主題為test
7.iot-mqtt
半成品項目https://gitee.com/peng-chuanbin/iot-mqtt.git