在物聯網(IoT)和智能設備橫行的今天,你有沒有遇到這樣的問題:
服務端需要實時把報警、狀態更新、控制指令推送給客戶端;
安卓 App、嵌入式設備、網頁等終端,需要輕量且穩定的連接方式;
HTTP 太“重”、WebSocket 配置又麻煩?
這時,輕量級消息傳輸協議 MQTT(Message Queuing Telemetry Transport)登場!
一句話理解 MQTT:專為低帶寬、高并發、實時通信設計的發布-訂閱協議。
那么問題來了 —— 在 Spring Boot 項目中,如何快速、優雅、高可控地落地 MQTT?
-01-
MQTT 接入方案選擇?
MQTT 本身只是一種通信協議,并不指定你用哪個消息中間件。而目前支持 MQTT 的主流 Broker 包括:
Broker | 特點簡述 |
---|---|
Mosquitto | 輕量級,C語言實現,非常穩定 |
RabbitMQ | 插件支持 MQTT,易與現有系統整合 |
EMQX | 高性能 MQTT Broker,專為 IoT 優化 |
HiveMQ | 商用支持強,價格偏貴 |
本次我們采用的是:RabbitMQ + MQTT 插件,實現服務端到安卓客戶端的推送通知,配合 Spring Boot 框架,集成簡便,生產可用!
-02-
MQTT 三大角色?
MQTT,就像微信一樣:
Publisher(發布者):你發朋友圈
Broker(中間人):微信服務器
Subscriber(訂閱者):看到你朋友圈的朋友
也就是說,消息不是點對點的,而是“你說一句,誰訂閱了就能聽到”。
-03-
實戰解析
Spring Boot + RabbitMQ MQTT 實現推送系統
整體架構:
[Spring Boot服務]?--發布消息-->?[RabbitMQ MQTT插件]?-->?[MQTT客戶端訂閱接收消息]
RabbitMQ 開啟 MQTT 插件
rabbitmq-plugins?enable?rabbitmq_mqtt ? ? ? ?# 服務端 MQTT 協議,端口1883
rabbitmq-plugins?enable?rabbitmq_web_mqtt ? ?# Web前端用 MQTT 協議,端口15675
引入依賴
<dependency>
??<groupId>org.springframework.integration</groupId>
??<artifactId>spring-integration-mqtt</artifactId>
</dependency>
配置 application.yml
mqtt-push:
? clientId: mqtt_client_
? serverClientId: mqtt_server_
? servers: tcp://127.0.0.1:1883
? username: guest
? password: guest
? defaultTopic: sensor/+/temperature
配置連接工廠
@Bean
public?MqttPahoClientFactory?mqttClientFactory()?{
? ??DefaultMqttPahoClientFactory?factory?=?new?DefaultMqttPahoClientFactory();
? ??MqttConnectOptions?options?=?new?MqttConnectOptions();
? ? options.setServerURIs(servers.split(","));
? ? options.setCleanSession(false);
? ? options.setUserName(username);
? ? options.setPassword(password.toCharArray());
? ? options.setKeepAliveInterval(20);
? ? factory.setConnectionOptions(options);
? ??return?factory;
}
服務端推送消息
@Bean
@ServiceActivator(inputChannel =?"mqttOutboundChannel")
public?MessageHandler?mqttOutbound() {
? ??MqttPahoMessageHandler?handler =?new?MqttPahoMessageHandler(serverClientId +?"producer_"?+?RandomUtil.getRandomStr(),?mqttClientFactory());
? ? handler.setAsync(true);
? ? handler.setDefaultQos(1);
? ? handler.setDefaultTopic(defaultTopic);
? ??return?handler;
}
使用接口發送消息:
@MessagingGateway(defaultRequestChannel =?"mqttOutboundChannel")
public?interface?MqttGateway?{
? ? void sendMessage2Mqtt(String?data,?@Header(MqttHeaders.TOPIC)?String topic);
}
服務端監聽客戶端消息
@Bean
public?MessageProducer?inbound() {
? ??MqttPahoMessageDrivenChannelAdapter?adapter =?new?MqttPahoMessageDrivenChannelAdapter(
? ? ? ? clientId +?"consumer_"?+?RandomUtil.getRandomStr(),?mqttClientFactory(), defaultTopic);
? ? adapter.setQos(2);
? ? adapter.setOutputChannel(mqttInputChannel());
? ??return?adapter;
}
處理消息回調:
@Bean
@ServiceActivator(inputChannel =?"mqttInputChannel")
public?MessageHandler?mqttInMessageHandler() {
? ??return?message -> {
? ? ? ??String?topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
? ? ? ??String?payload = message.getPayload().toString();
? ? ? ? log.info("收到消息:主題 [{}] 內容 [{}]", topic, payload);
? ? };
}
MQTTBox 測試
MQTTBox 是一款強大的 MQTT 測試工具,可以模擬發送消息,也能訂閱查看接收到的消息:
1. 發布測試
使用 MQTTBox 向?sensor/s123/temperature
?發布消息
服務端通過通配符?sensor/+/temperature
?成功收到消息!
2. 控制器測試
@PostMapping("/sendMessage")
public?String?sendMqtt(@RequestBody?ReqSendMsgDTO dto) {
? ? mqttGateway.sendMessage2Mqtt(dto.getTopic(), dto.getPayload());
? ??return?"SUCCESS";
}
-04-
總結
實踐建議
clientId?必須唯一,推薦使用 UUID 或服務實例標識;
QoS 建議使用 1(至少一次),避免消息丟失;
若用 RabbitMQ,也可以使用 Exchange + Topic Binding 方式做高級路由;
對于高并發或長連接推送,推薦結合 Netty 或 Gateway 層限流處理。
技術方案
能力點 | 技術實現 |
---|
協議支持 | MQTT(通過 rabbitmq_mqtt 插件) |
服務端推送 | Spring Integration + MqttGateway |
客戶端訂閱 | MqttPahoMessageDrivenChannelAdapter |
工具聯調 | MQTTBox / Postman / 模擬器 |
安全與穩定性 | 唯一 clientId、QoS 保證、自動重連 |