SpringBoot整合MQTT實戰:基于EMQX構建高可靠物聯網通信,從零到一實現設備云端雙向對話

一、引言

隨著物聯網(IoT)技術的快速發展,MQTT(Message Queuing Telemetry Transport)協議因其輕量級、低功耗和高效的特點,已成為物聯網設備通信的事實標準。本文將詳細介紹如何使用SpringBoot框架整合MQTT協議,基于開源MQTT代理EMQX實現設備與服務器之間的雙向通信。

二、技術選型與環境準備

2.1 技術棧介紹

  • SpringBoot 2.7.x:簡化Spring應用初始搭建和開發過程

  • EMQX 5.0:開源的大規模分布式MQTT消息服務器

  • Eclipse Paho:流行的MQTT客戶端庫

  • Lombok:簡化Java Bean編寫

2.2 環境準備

  1. 安裝EMQX服務器(可使用Docker快速部署):

    docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.14

  2. 確保Java開發環境(JDK 11+)和Maven已安裝

三、SpringBoot項目集成MQTT

3.1 創建SpringBoot項目并添加依賴

pom.xml中添加必要的依賴:

<dependencies><!-- SpringBoot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- MQTT Paho Client --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- JSON處理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
</dependencies>

3.2 配置MQTT連接參數

application.yml中添加配置:

mqtt:broker-url: tcp://localhost:1883username: emqxpassword: publicclient-id: springboot-serverdefault-topic: device/statustimeout: 30keepalive: 60qos: 1clean-session: true

創建配置類MqttProperties.java

@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {private String brokerUrl;private String username;private String password;private String clientId;private String defaultTopic;private int timeout;private int keepalive;private int qos;private boolean cleanSession;
}

3.3 實現MQTT客戶端配置

創建MqttConfiguration.java

@Configuration
@RequiredArgsConstructor
public class MqttConfiguration {private final MqttProperties mqttProperties;@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{mqttProperties.getBrokerUrl()});options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepalive());options.setCleanSession(mqttProperties.isCleanSession());options.setAutomaticReconnect(true);return options;}@Beanpublic IMqttClient mqttClient() throws MqttException {IMqttClient client = new MqttClient(mqttProperties.getBrokerUrl(), mqttProperties.getClientId(), new MemoryPersistence());client.connect(mqttConnectOptions());return client;}
}

3.4 實現MQTT消息發布服務

創建MqttPublisher.java

@Service
@RequiredArgsConstructor
@Slf4j
public class MqttPublisher {private final IMqttClient mqttClient;private final MqttProperties mqttProperties;public void publish(String topic, String payload) throws MqttException {if (!mqttClient.isConnected()) {mqttClient.reconnect();}MqttMessage message = new MqttMessage(payload.getBytes());message.setQos(mqttProperties.getQos());message.setRetained(true);mqttClient.publish(topic, message);log.info("MQTT message published to topic: {}, payload: {}", topic, payload);}public void publish(String payload) throws MqttException {publish(mqttProperties.getDefaultTopic(), payload);}
}

3.5 實現MQTT消息訂閱服務

創建MqttSubscriber.java

@Service
@RequiredArgsConstructor
@Slf4j
public class MqttSubscriber {private final IMqttClient mqttClient;private final MqttProperties mqttProperties;@PostConstructpublic void init() throws MqttException {subscribe(mqttProperties.getDefaultTopic());}public void subscribe(String topic) throws MqttException {if (!mqttClient.isConnected()) {mqttClient.reconnect();}mqttClient.subscribe(topic, mqttProperties.getQos(), this::handleMessage);log.info("Subscribed to MQTT topic: {}", topic);}private void handleMessage(String topic, MqttMessage message) {String payload = new String(message.getPayload());log.info("Received MQTT message from topic: {}, payload: {}", topic, payload);// 這里可以添加業務邏輯處理接收到的消息processMessage(topic, payload);}private void processMessage(String topic, String payload) {// 示例:解析JSON格式的消息try {ObjectMapper mapper = new ObjectMapper();JsonNode jsonNode = mapper.readTree(payload);// 根據不同的topic和payload內容進行業務處理if (topic.startsWith("device/status")) {handleDeviceStatus(jsonNode);} else if (topic.startsWith("device/control")) {handleDeviceControl(jsonNode);}} catch (JsonProcessingException e) {log.error("Failed to parse MQTT message payload: {}", payload, e);}}private void handleDeviceStatus(JsonNode jsonNode) {// 處理設備狀態上報String deviceId = jsonNode.get("deviceId").asText();String status = jsonNode.get("status").asText();log.info("Device {} status updated to: {}", deviceId, status);}private void handleDeviceControl(JsonNode jsonNode) {// 處理設備控制指令響應String deviceId = jsonNode.get("deviceId").asText();String command = jsonNode.get("command").asText();String result = jsonNode.get("result").asText();log.info("Device {} executed command {} with result: {}", deviceId, command, result);}
}

四、實現雙向通信

4.1 服務器向設備發送控制指令

創建REST API接口用于發送控制指令:

@RestController
@RequestMapping("/api/device")
@RequiredArgsConstructor
@Slf4j
public class DeviceController {private final MqttPublisher mqttPublisher;@PostMapping("/control")public ResponseEntity<String> sendControlCommand(@RequestBody DeviceCommand command) {try {ObjectMapper mapper = new ObjectMapper();String payload = mapper.writeValueAsString(command);String topic = "device/control/" + command.getDeviceId();mqttPublisher.publish(topic, payload);return ResponseEntity.ok("Control command sent successfully");} catch (Exception e) {log.error("Failed to send control command", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to send control command: " + e.getMessage());}}@Data@NoArgsConstructor@AllArgsConstructorpublic static class DeviceCommand {private String deviceId;private String command;private Map<String, Object> params;}
}

4.2 設備模擬客戶端

為了測試雙向通信,我們可以創建一個簡單的設備模擬客戶端:

@Component
@Slf4j
public class DeviceSimulator {private final MqttPublisher mqttPublisher;private final MqttProperties mqttProperties;private IMqttClient deviceClient;public DeviceSimulator(MqttPublisher mqttPublisher, MqttProperties mqttProperties) {this.mqttPublisher = mqttPublisher;this.mqttProperties = mqttProperties;initDeviceClient();}private void initDeviceClient() {try {String deviceId = "device-" + UUID.randomUUID().toString().substring(0, 8);deviceClient = new MqttClient(mqttProperties.getBrokerUrl(), deviceId, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setAutomaticReconnect(true);deviceClient.connect(options);// 訂閱控制主題String controlTopic = "device/control/" + deviceId;deviceClient.subscribe(controlTopic, (topic, message) -> {String payload = new String(message.getPayload());log.info("Device received control command: {}", payload);// 模擬設備執行命令并返回響應executeCommand(payload, deviceId);});// 模擬設備定期上報狀態simulatePeriodicStatusReport(deviceId);} catch (MqttException e) {log.error("Failed to initialize device simulator", e);}}private void executeCommand(String payload, String deviceId) {try {ObjectMapper mapper = new ObjectMapper();JsonNode jsonNode = mapper.readTree(payload);String command = jsonNode.get("command").asText();// 模擬命令執行Thread.sleep(1000); // 模擬執行耗時// 構造響應ObjectNode response = mapper.createObjectNode();response.put("deviceId", deviceId);response.put("command", command);response.put("result", "success");response.put("timestamp", System.currentTimeMillis());// 發布響應String responseTopic = "device/control/response/" + deviceId;mqttPublisher.publish(responseTopic, response.toString());} catch (Exception e) {log.error("Failed to execute command", e);}}private void simulatePeriodicStatusReport(String deviceId) {ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();executor.scheduleAtFixedRate(() -> {try {ObjectMapper mapper = new ObjectMapper();ObjectNode status = mapper.createObjectNode();status.put("deviceId", deviceId);status.put("status", "online");status.put("cpuUsage", Math.random() * 100);status.put("memoryUsage", 30 + Math.random() * 50);status.put("timestamp", System.currentTimeMillis());String topic = "device/status/" + deviceId;mqttPublisher.publish(topic, status.toString());} catch (Exception e) {log.error("Failed to send status report", e);}}, 0, 10, TimeUnit.SECONDS);}
}

五、測試與驗證

5.1 測試設備狀態上報

  1. 啟動SpringBoot應用

  2. 觀察日志輸出,應該能看到設備模擬客戶端定期上報狀態信息

5.2 測試服務器控制指令

使用Postman或curl發送控制指令:

curl -X POST http://localhost:8080/api/device/control \
-H "Content-Type: application/json" \
-d '{"deviceId": "device-123456","command": "restart","params": {"delay": 5}
}'

5.3 驗證雙向通信

  1. 服務器發送控制指令到特定設備

  2. 設備接收指令并執行

  3. 設備發送執行結果回服務器

  4. 服務器接收并處理設備響應

六、高級功能擴展

6.1 消息持久化與QoS級別

  • QoS 0:最多一次,消息可能丟失

  • QoS 1:至少一次,消息不會丟失但可能重復

  • QoS 2:恰好一次,消息不丟失且不重復

根據業務需求選擇合適的QoS級別:

// 在發布消息時設置QoS
message.setQos(2); // 使用最高級別的QoS

6.2 安全配置

  1. 啟用TLS加密:

mqtt:broker-url: ssl://localhost:8883
  1. 配置EMQX的ACL規則,限制客戶端權限

6.3 集群部署

對于生產環境,可以部署EMQX集群:

# 啟動第一個節點
docker run -d --name emqx1 -p 1883:1883 -p 8081:8081 -e EMQX_NODE_NAME=emqx@node1.emqx.io -e EMQX_CLUSTER__DISCOVERY=static -e EMQX_CLUSTER__STATIC__SEEDS="emqx@node1.emqx.io,emqx@node2.emqx.io" emqx/emqx:5.0.14# 啟動第二個節點
docker run -d --name emqx2 -p 1884:1883 -p 8082:8081 -e EMQX_NODE_NAME=emqx@node2.emqx.io -e EMQX_CLUSTER__DISCOVERY=static -e EMQX_CLUSTER__STATIC__SEEDS="emqx@node1.emqx.io,emqx@node2.emqx.io" emqx/emqx:5.0.14

6.4 消息橋接與WebHook

通過EMQX的橋接功能,可以將消息轉發到其他MQTT服務器或Kafka等消息隊列。也可以通過WebHook將消息推送到HTTP服務。

七、總結

本文詳細介紹了如何使用SpringBoot整合MQTT協議,基于EMQX實現設備與服務器之間的雙向通信。主要內容包括:

  1. SpringBoot項目中集成MQTT客戶端

  2. 實現消息發布和訂閱功能

  3. 設計雙向通信機制

  4. 設備模擬與測試驗證

  5. 高級功能擴展建議

這種架構非常適合物聯網場景,能夠支持海量設備連接和實時消息通信。開發者可以根據實際業務需求,在此基礎上進行擴展和優化,構建穩定可靠的物聯網平臺。

八、參考資料

  1. EMQX官方文檔:Introduction | EMQX 5.0 Docs

  2. Eclipse Paho項目:Eclipse Paho | The Eclipse Foundation

  3. MQTT協議規范:MQTT Version 3.1.1

  4. Spring Boot官方文檔:Spring Boot

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/81824.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/81824.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/81824.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

zData X zStorage 為什么采用全閃存架構而非混閃架構?

點擊藍字 關注我們 最近有用戶問到 zData X 的存儲底座 zStorage 分布式存儲為什么采用的是全閃存架構而非混閃架構&#xff1f;主要原因還是在于全閃存架構在性能和可靠性方面具有更顯著的優勢。zData X 的上一代產品 zData 的早期版本也使用了SSD盤作為緩存的技術架構&#x…

Fiddler抓包教程->HTTP和HTTPS基礎知識

1.簡介 有的伙伴可能會好奇&#xff0c;不是講解和分享抓包工具,怎么這里開始講解HTTP和HTTPS協議了。這是因為你對HTTP協議越了解&#xff0c;你就能越掌握Fiddler的使用方法&#xff0c;反過來你越使用Fiddler&#xff0c;就越能幫助你了解HTTP協議。 Fiddler無論對開發人員…

虛擬機NAT模式獲取不到ip

虛擬機NAT模式獲取不到ip 如圖所示 解決方案&#xff1a; 先查看NetworkManager是否啟動 systemctl status NetworkManager如果沒啟動就啟動一遍 使用DHCP手動獲取一遍ip sudo dhclient ens33成功得到ip 這是后遇到了另一個問題&#xff0c;ip釋放后&#xff0c;不能自動…

Sass 基礎用法速覽

Sass 基礎用法速覽 目錄 Sass 基礎用法速覽1. 什么是 Sass&#xff1f;2. 安裝 Sass2.1 使用 npm 安裝&#xff08;推薦&#xff09;2.2 使用 Dart Sass&#xff08;官方推薦&#xff09;2.3 使用 GUI 工具 3. Sass 基本用法3.1 編譯 Sass 4. Sass 語法詳解4.1 變量4.2 嵌套4.3…

洛谷B3840 [GESP202306 二級] 找素數

題目描述 小明剛剛學習了素數的概念&#xff1a;如果一個大于 1 的正整數&#xff0c;除了 1 和它自身外&#xff0c;不能被其他正整數整除&#xff0c;則這個正整數是素數。現在&#xff0c;小明想找到兩個正整數 A 和 B 之間&#xff08;包括 A 和 B&#xff09;有多少個素數…

idea部署本地倉庫和連接放送遠程倉庫

1.下載git&#xff0c;安裝好后任意地方又鍵會出現兩個帶git的東西 2.點擊bash here的那個&#xff0c;召喚出git的小黑窗&#xff0c;輸入 git config --global user.name "你自己取名" git config --global user.email "你自己輸入你的郵箱" 3.打開id…

C++(20): 文件輸入輸出庫 —— <fstream>

目錄 一、 的核心功能 二、核心類及功能 三、核心操作示例 1. 文本文件寫入&#xff08;ofstream&#xff09; 2. 文本文件讀取&#xff08;ifstream&#xff09; 3. 二進制文件操作&#xff08;fstream&#xff09; 四、文件打開模式 五、文件指針操作 六、錯誤處理技巧…

elementUI 循環出來的表單,怎么做表單校驗?

數據結構如下&#xff1a; diversionParamList: [ { length: null, positionNumber: null, value: null, } ] 思路&#xff1a;可根據 index 動態綁定 :props 屬性值&#xff0c;校驗規則寫在:rules <div class"config-item" v-for"(item, index) in form.…

x-cmd install | Pillager:Go 語言打造的敏感信息文件系統掃描利器

目錄 Pillager 的獨特優勢安裝Pillager 的應用場景Pillager 的核心功能 還在為文件系統中潛在的敏感信息泄露而擔憂嗎&#xff1f;Pillager 是一款由 Go 語言編寫的強大工具&#xff0c;旨在幫助你輕松掃描文件系統&#xff0c;發現隱藏的密鑰、密碼、API 令牌等敏感信息。 Pil…

大模型(2)——提示工程(Prompt Engineering)

文章目錄 一、提示工程的核心概念為什么需要提示工程&#xff1f; 二、提示設計的基本原則三、實用提示工程技巧1. 角色設定法2. 示例引導法&#xff08;Few-Shot Learning&#xff09;3. 分階段提問4. 負面約束5. 溫度&#xff08;Temperature&#xff09;控制 四、不同任務類…

環境搭建

一個簡單的請求在加入spring security之前的樣子, 在瀏覽器中輸入地址就可以直接訪問 <!--引入spring security依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId>&…

院校機試刷題第六天:1134矩陣翻轉、1052學生成績管理、1409對稱矩陣

一、1134矩陣翻轉 1.題目描述 2.解題思路 很簡單的模擬題&#xff0c;甚至只是上下翻轉&#xff0c;遍歷輸出的時候先把最下面那一行輸出即可。 3.代碼 #include <iostream> #include <vector> using namespace std;int main() {int n;cin >> n;vector&l…

軟件架構風格系列(5):數據共享架構

數據共享架構&#xff1a;如何讓數據在系統間自由“流淌”&#xff1f; 引言 在企業數字化轉型的浪潮中&#xff0c;“數據孤島”成為橫在業務創新面前的大山&#xff1a;營銷系統的用戶畫像無法同步到客服系統&#xff0c;供應鏈的庫存數據難以為銷售決策提供支撐…… 此時&…

SAP-13-內表與工作區

內表 作用&#xff1a; 內表是 ABAP 程序中一種非常重要的數據結構&#xff0c;它類似于數據庫表&#xff0c;用于在程序運行時存儲和處理數據。與數據庫表不同的是&#xff0c;內表存在于程序的內存中&#xff0c;數據的讀寫速度比從數據庫中讀取要快很多。它可以存儲多條具有…

dali本地安裝和使用

Dali&#xff08;Distance-matrix ALIgnment&#xff09;是一種廣泛使用的蛋白質結構比對工具&#xff0c;主要用于比較蛋白質三維結構之間的相似性。它通過計算蛋白質結構之間的距離矩陣來評估結構之間的相似性&#xff0c;并生成比對結果。 1. 安裝 wget http://ekhidna2.b…

Unreal 從入門到精通之SceneCaptureComponent2D實現UI層3D物體360°預覽

文章目錄 前言SceneCaptureComponent2D實現步驟新建渲染目標新建材質UI控件激活3DPreview鼠標拖動旋轉模型最后前言 我們在(電商展示/角色預覽/裝備查看)等應用場景中,經常會看到這種3D展示的頁面。 即使用相機捕獲一個3D的模型的視圖,然后把這個視圖顯示在一個UI畫布上,…

2024CCPC遼寧省賽 個人補題 ABCEGJL

Dashboard - 2024 CCPC Liaoning Provincial Contest - Codeforces 過題難度 B A J C L E G 銅獎 4 953 銀獎 6 991 金獎 8 1664 B&#xff1a; 模擬題 // Code Start Here string s;cin >> s;reverse(all(s));cout << s << endl;A&#xff1a;很…

Java基礎 Day17

一、遞歸 方法直接或者間接調用本身 將大問題, 層層轉化為一個與原問題相似的、規模更小的問題來解決 二、異常 程序在編譯或執行過程中&#xff0c;出現的非正常的情況 (錯誤) 語法錯誤不是異常 1、閱讀異常信息 從下往上看&#xff1a;發生異常的位置、異常名稱、發生異…

hook原理和篡改猴編寫hook腳本

hook原理&#xff1a; hook是常用于js反編譯的技術&#xff1b;翻譯就是鉤子&#xff0c;他的原理就是劫持js的函數然后進行篡改 一段簡單的js代碼 &#xff1a;這個代碼是順序執行的 function test01(){console.log(test01)test02() } function test02(){console.log(02)tes…

使用 Vue 展示 Markdown 文本

使用 Vue 展示 Markdown 文本可以通過以下幾種方法&#xff1a; 方法 1&#xff1a;使用 v-html 指令 可以使用 v-html 指令來渲染 Markdown 文本&#xff1a; <template><div v-html"markdownText"></div> </template> <script>e…