一、引言
隨著物聯網(IoT)技術的快速發展,MQTT(Message Queuing Telemetry Transport)協議因其輕量級、低功耗和高效的特點,已成為物聯網設備通信的事實標準。本文將詳細介紹如何使用SpringBoot框架整合MQTT協議,基于開源MQTT代理EMQX實現設備與服務器之間的雙向通信。
二、技術選型與環境準備
2.1 技術棧介紹
-
SpringBoot 2.7.x:簡化Spring應用初始搭建和開發過程
-
EMQX 5.0:開源的大規模分布式MQTT消息服務器
-
Eclipse Paho:流行的MQTT客戶端庫
-
Lombok:簡化Java Bean編寫
2.2 環境準備
-
安裝EMQX服務器(可使用Docker快速部署):
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.14
-
確保Java開發環境(JDK 11+)和Maven已安裝
三、SpringBoot項目集成MQTT
3.1 創建SpringBoot項目并添加依賴
在pom.xml
中添加必要的依賴:
<dependencies><!-- SpringBoot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- MQTT Paho Client --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- JSON處理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
</dependencies>
3.2 配置MQTT連接參數
在application.yml
中添加配置:
mqtt:broker-url: tcp://localhost:1883username: emqxpassword: publicclient-id: springboot-serverdefault-topic: device/statustimeout: 30keepalive: 60qos: 1clean-session: true
創建配置類MqttProperties.java
:
@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {private String brokerUrl;private String username;private String password;private String clientId;private String defaultTopic;private int timeout;private int keepalive;private int qos;private boolean cleanSession;
}
3.3 實現MQTT客戶端配置
創建MqttConfiguration.java
:
@Configuration
@RequiredArgsConstructor
public class MqttConfiguration {private final MqttProperties mqttProperties;@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{mqttProperties.getBrokerUrl()});options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepalive());options.setCleanSession(mqttProperties.isCleanSession());options.setAutomaticReconnect(true);return options;}@Beanpublic IMqttClient mqttClient() throws MqttException {IMqttClient client = new MqttClient(mqttProperties.getBrokerUrl(), mqttProperties.getClientId(), new MemoryPersistence());client.connect(mqttConnectOptions());return client;}
}
3.4 實現MQTT消息發布服務
創建MqttPublisher.java
:
@Service
@RequiredArgsConstructor
@Slf4j
public class MqttPublisher {private final IMqttClient mqttClient;private final MqttProperties mqttProperties;public void publish(String topic, String payload) throws MqttException {if (!mqttClient.isConnected()) {mqttClient.reconnect();}MqttMessage message = new MqttMessage(payload.getBytes());message.setQos(mqttProperties.getQos());message.setRetained(true);mqttClient.publish(topic, message);log.info("MQTT message published to topic: {}, payload: {}", topic, payload);}public void publish(String payload) throws MqttException {publish(mqttProperties.getDefaultTopic(), payload);}
}
3.5 實現MQTT消息訂閱服務
創建MqttSubscriber.java
:
@Service
@RequiredArgsConstructor
@Slf4j
public class MqttSubscriber {private final IMqttClient mqttClient;private final MqttProperties mqttProperties;@PostConstructpublic void init() throws MqttException {subscribe(mqttProperties.getDefaultTopic());}public void subscribe(String topic) throws MqttException {if (!mqttClient.isConnected()) {mqttClient.reconnect();}mqttClient.subscribe(topic, mqttProperties.getQos(), this::handleMessage);log.info("Subscribed to MQTT topic: {}", topic);}private void handleMessage(String topic, MqttMessage message) {String payload = new String(message.getPayload());log.info("Received MQTT message from topic: {}, payload: {}", topic, payload);// 這里可以添加業務邏輯處理接收到的消息processMessage(topic, payload);}private void processMessage(String topic, String payload) {// 示例:解析JSON格式的消息try {ObjectMapper mapper = new ObjectMapper();JsonNode jsonNode = mapper.readTree(payload);// 根據不同的topic和payload內容進行業務處理if (topic.startsWith("device/status")) {handleDeviceStatus(jsonNode);} else if (topic.startsWith("device/control")) {handleDeviceControl(jsonNode);}} catch (JsonProcessingException e) {log.error("Failed to parse MQTT message payload: {}", payload, e);}}private void handleDeviceStatus(JsonNode jsonNode) {// 處理設備狀態上報String deviceId = jsonNode.get("deviceId").asText();String status = jsonNode.get("status").asText();log.info("Device {} status updated to: {}", deviceId, status);}private void handleDeviceControl(JsonNode jsonNode) {// 處理設備控制指令響應String deviceId = jsonNode.get("deviceId").asText();String command = jsonNode.get("command").asText();String result = jsonNode.get("result").asText();log.info("Device {} executed command {} with result: {}", deviceId, command, result);}
}
四、實現雙向通信
4.1 服務器向設備發送控制指令
創建REST API接口用于發送控制指令:
@RestController
@RequestMapping("/api/device")
@RequiredArgsConstructor
@Slf4j
public class DeviceController {private final MqttPublisher mqttPublisher;@PostMapping("/control")public ResponseEntity<String> sendControlCommand(@RequestBody DeviceCommand command) {try {ObjectMapper mapper = new ObjectMapper();String payload = mapper.writeValueAsString(command);String topic = "device/control/" + command.getDeviceId();mqttPublisher.publish(topic, payload);return ResponseEntity.ok("Control command sent successfully");} catch (Exception e) {log.error("Failed to send control command", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to send control command: " + e.getMessage());}}@Data@NoArgsConstructor@AllArgsConstructorpublic static class DeviceCommand {private String deviceId;private String command;private Map<String, Object> params;}
}
4.2 設備模擬客戶端
為了測試雙向通信,我們可以創建一個簡單的設備模擬客戶端:
@Component
@Slf4j
public class DeviceSimulator {private final MqttPublisher mqttPublisher;private final MqttProperties mqttProperties;private IMqttClient deviceClient;public DeviceSimulator(MqttPublisher mqttPublisher, MqttProperties mqttProperties) {this.mqttPublisher = mqttPublisher;this.mqttProperties = mqttProperties;initDeviceClient();}private void initDeviceClient() {try {String deviceId = "device-" + UUID.randomUUID().toString().substring(0, 8);deviceClient = new MqttClient(mqttProperties.getBrokerUrl(), deviceId, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setAutomaticReconnect(true);deviceClient.connect(options);// 訂閱控制主題String controlTopic = "device/control/" + deviceId;deviceClient.subscribe(controlTopic, (topic, message) -> {String payload = new String(message.getPayload());log.info("Device received control command: {}", payload);// 模擬設備執行命令并返回響應executeCommand(payload, deviceId);});// 模擬設備定期上報狀態simulatePeriodicStatusReport(deviceId);} catch (MqttException e) {log.error("Failed to initialize device simulator", e);}}private void executeCommand(String payload, String deviceId) {try {ObjectMapper mapper = new ObjectMapper();JsonNode jsonNode = mapper.readTree(payload);String command = jsonNode.get("command").asText();// 模擬命令執行Thread.sleep(1000); // 模擬執行耗時// 構造響應ObjectNode response = mapper.createObjectNode();response.put("deviceId", deviceId);response.put("command", command);response.put("result", "success");response.put("timestamp", System.currentTimeMillis());// 發布響應String responseTopic = "device/control/response/" + deviceId;mqttPublisher.publish(responseTopic, response.toString());} catch (Exception e) {log.error("Failed to execute command", e);}}private void simulatePeriodicStatusReport(String deviceId) {ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();executor.scheduleAtFixedRate(() -> {try {ObjectMapper mapper = new ObjectMapper();ObjectNode status = mapper.createObjectNode();status.put("deviceId", deviceId);status.put("status", "online");status.put("cpuUsage", Math.random() * 100);status.put("memoryUsage", 30 + Math.random() * 50);status.put("timestamp", System.currentTimeMillis());String topic = "device/status/" + deviceId;mqttPublisher.publish(topic, status.toString());} catch (Exception e) {log.error("Failed to send status report", e);}}, 0, 10, TimeUnit.SECONDS);}
}
五、測試與驗證
5.1 測試設備狀態上報
-
啟動SpringBoot應用
-
觀察日志輸出,應該能看到設備模擬客戶端定期上報狀態信息
5.2 測試服務器控制指令
使用Postman或curl發送控制指令:
curl -X POST http://localhost:8080/api/device/control \
-H "Content-Type: application/json" \
-d '{"deviceId": "device-123456","command": "restart","params": {"delay": 5}
}'
5.3 驗證雙向通信
-
服務器發送控制指令到特定設備
-
設備接收指令并執行
-
設備發送執行結果回服務器
-
服務器接收并處理設備響應
六、高級功能擴展
6.1 消息持久化與QoS級別
-
QoS 0:最多一次,消息可能丟失
-
QoS 1:至少一次,消息不會丟失但可能重復
-
QoS 2:恰好一次,消息不丟失且不重復
根據業務需求選擇合適的QoS級別:
// 在發布消息時設置QoS
message.setQos(2); // 使用最高級別的QoS
6.2 安全配置
-
啟用TLS加密:
mqtt:broker-url: ssl://localhost:8883
-
配置EMQX的ACL規則,限制客戶端權限
6.3 集群部署
對于生產環境,可以部署EMQX集群:
# 啟動第一個節點
docker run -d --name emqx1 -p 1883:1883 -p 8081:8081 -e EMQX_NODE_NAME=emqx@node1.emqx.io -e EMQX_CLUSTER__DISCOVERY=static -e EMQX_CLUSTER__STATIC__SEEDS="emqx@node1.emqx.io,emqx@node2.emqx.io" emqx/emqx:5.0.14# 啟動第二個節點
docker run -d --name emqx2 -p 1884:1883 -p 8082:8081 -e EMQX_NODE_NAME=emqx@node2.emqx.io -e EMQX_CLUSTER__DISCOVERY=static -e EMQX_CLUSTER__STATIC__SEEDS="emqx@node1.emqx.io,emqx@node2.emqx.io" emqx/emqx:5.0.14
6.4 消息橋接與WebHook
通過EMQX的橋接功能,可以將消息轉發到其他MQTT服務器或Kafka等消息隊列。也可以通過WebHook將消息推送到HTTP服務。
七、總結
本文詳細介紹了如何使用SpringBoot整合MQTT協議,基于EMQX實現設備與服務器之間的雙向通信。主要內容包括:
-
SpringBoot項目中集成MQTT客戶端
-
實現消息發布和訂閱功能
-
設計雙向通信機制
-
設備模擬與測試驗證
-
高級功能擴展建議
這種架構非常適合物聯網場景,能夠支持海量設備連接和實時消息通信。開發者可以根據實際業務需求,在此基礎上進行擴展和優化,構建穩定可靠的物聯網平臺。
八、參考資料
-
EMQX官方文檔:Introduction | EMQX 5.0 Docs
-
Eclipse Paho項目:Eclipse Paho | The Eclipse Foundation
-
MQTT協議規范:MQTT Version 3.1.1
-
Spring Boot官方文檔:Spring Boot