一、目的
本文主要記錄物聯網設備接入MQTT以及對接服務端SpringBoot整個的交互流程和使用。
二、概念
2.1什么是MQTT?
MQTT是基于TCP/IP協議棧構建的異步通信消息協議,是一種輕量級的發布、訂閱信息傳輸協議。可以在不可靠的網絡環境中進行擴展,適用于設備硬件存儲空間或網絡帶寬有限的場景。使用MQTT協議,消息發送者與接收者不受時間和空間的限制。物聯網平臺支持設備使用MQTT協議接入。
簡單來講:為物聯網弱聯網設備提供可靠的消息傳輸,比如傳感器、藍牙、手表等。本文的應用場景是葡萄糖檢測儀,實時上傳接受推送消息至服務器,服務器推送到設備端通知消息等。
2.2工作原理
當然不能直接使用,下邊簡單圖解一下
MQTT(Message Queuing Telemetry Transport)由IBM于1999年開發的一種基于發布訂閱模式"的輕量級的消息傳輸協議!
發布訂閱模式是一種傳統的客戶端-服務器架構的替代方案
,因為一般傳統的客戶端-服務器是客戶端能夠直接和服務器進行通信完成消息的傳輸。發布訂閱模式會將發送消息的發布者publisher與接收消息的訂閱者subscribers進行分離
,publisher與subscribers 并不會直接通信,他們甚至都不清楚對方是否存在,他們之間的交流由第三方組件broker
代理。
看到這個有沒有很熟悉?像不像RabbitMQ的發布訂閱?像不像RocketMQ?
沒錯,其實都是一個套路。
MQTT特殊的是,它只是一種通信協議,如果想要使用它,就需要基于MQTT協議的服務端實現,哎,這個服務端的實現,就類似消息中心的功能,負責消息的中轉,甚至還能在客戶端崩潰時,緩存接收到的消息(正因為此,它的可靠性是極高的)。
看到中間的 MQTT Broker了嗎,這個就是對MQTT協議服務端的實現,負責處理客戶端請求的關鍵組件,包括建立連接、斷開連接、訂閱和取消訂閱等操作,同時還負責消息的轉發。
2.3如何使用
如果沒有購買云消息隊列MQTT,就需要用第三方實現了MQTT的消息代理
2.3.1 EMQX
EMQX,是一款實現了MQTT協議的,開源的MQTT消息代理軟件
。MQTT定義了消息通訊的規則和流程,而EMQX則是遵循這些規則的軟件,使得設備能夠依據MQTT協議進行有效通信。在新版本的EMQX中同時支持MQTT3.1.1協議和5.0協議
。
下載地址:
官網地址
其他代理軟件
2.3.2 EMQX部署
選擇EMQX企業版進行部署:企業版
購買云服務器ECS(不想買的話可以用虛擬機安裝,也可以去薅免費試用的),安裝Docker:
# 移除舊版本docker
sudo yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine# 配置docker yum源。
sudo yum install -y yum-utils
sudo yum-config-manager \
--add-repo \
http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo# 安裝 最新 docker
sudo yum install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin# 啟動& 開機啟動docker; enable + start 二合一
systemctl enable docker --now# 配置加速
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{"registry-mirrors": ["https://82m9ar63.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker
運行啟動
docker run -d --name emqx-enterprise \-p 1883:1883 -p 8083:8083 \-p 8084:8084 -p 8883:8883 \-p 18083:18083 \-v emqx_data:/opt/emqx/data \-v emqx_log:/opt/emqx/log \-v emqx_etc:/opt/emqx/etc \emqx/emqx-enterprise:5.6.1
常見端口介紹:
端口號 | 說明 |
---|---|
1883 | TCP端口 |
8083 | WebSocket端口 |
8084 | WebSocket Secure 端口 |
8883 | SSL/TLS 端口 |
18083 | Broker的Dashboard訪問端口號 |
下邊就可以使用了,至于這個軟件的功能,這里不贅述了,有興趣的可以找我要筆記。
2.4 客戶端
2.4.1 運行docker容器模擬消息接收客戶端
docker run -e TOPIC="xxx" \-e INSTANCE_ID="mqtt-cn-xx" \-e ENDPOINT="xx" \-e DEVICE_ID="xx" \-e GROUP_ID="您在 Group 管理頁面中創建的 Group 的 ID" \-e AK="您訪問阿里云的 AccessKey" \-e SK="您訪問阿里云的 SecretKey" \registry.cn-hangzhou.aliyuncs.com/aliyun-mq/mqtt
# TOPIC 你創建的Topic
# -e TOPIC="cgm_monitor"
# 設置 MQTT 的主題(Topic)為 cgm_monitor。MQTT 客戶端將訂閱或發布到這個主題。# -e INSTANCE_ID="mqtt-cn-hic4av7iy01"
# 設置阿里云 MQTT 實例的 ID。這是您在阿里云上創建的 MQTT 實例的唯一標識。# -e ENDPOINT="mqtt-cn-hic4av7iy01.mqtt.aliyuncs.com"
# 設置 MQTT 服務的接入點(Endpoint),即 MQTT 服務器的地址。# -e DEVICE_ID="i3c7bfbe"
# 設置設備 ID。MQTT 客戶端會以這個設備身份連接到阿里云 MQTT 服務。# -e GROUP_ID="您在 Group 管理頁面中創建的 Group 的 ID"
# 設置設備所屬的 Group ID。Group 是阿里云 MQTT 服務中用于管理一組設備的邏輯單元。
不懂看下圖
先建立topic才能有公網訪問地址,也要建Group
建立topic
建立Group
簽名校驗得到,Client ID/用戶名/密碼
這樣 去云服務運行上面這段docker命令,就可以得到一個連接云MQTT的客戶端,也就是消費者。
2.4.2 使用MQTTX客戶端
MQTTX 簡化了使用 MQTT broker 的過程,包括連接,發布與訂閱消息主題。無論你使用桌面版,命令行,或是網頁版,MQTTX 使每個關鍵步驟都更加順滑。
官網地址:MQTTX
下載后傻瓜式安裝就行,下一步直到完成。
可以設置下中文
點擊新建連接
可以參考我的配置:
如果用的不是云MQTT,本地自己docker安裝的 其實訪問更簡單,直接界面化Broker那里自己設置用戶名/密碼。
下邊就可以連接了
如果你都配置正確,點擊連接會出現已連接
下邊演示下在阿里云MQTT控制臺,快速體驗消息收發:
可以看到,已經發送成功了!那么客戶端MQTTX有沒有收到消息呢?
可以看到,確實是收到了!!!如此便成功了!
2.4.3SpringBoot集成MQTT
這里把源碼貼在這里,有興趣的也可以去阿里云官網看,一樣的
package com.kiki.app.mqtt.demo;import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;import com.kiki.app.util.ConnectionOptionWrapper;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;/*** 本代碼提供簽名鑒權模式下 MQ4IOT 客戶端發送消息到 MQ4IOT 客戶端的示例,其中初始化參數請根據實際情況修改* 簽名模式即使用阿里云賬號系統提供的 AccessKey 和 SecretKey 對每個客戶端計算出一個獨立的簽名供客戶端識別使用。* 對于實際業務場景使用過程中,考慮到私鑰 SecretKey 的隱私性,可以將簽名過程放在受信任的環境完成。** 完整 demo 工程,參考https://github.com/AliwareMQ/lmq-demo*/
public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {public static void main(String[] args) throws Exception {/*** MQ4IOT 實例 ID,購買后控制臺獲取*/String instanceId = "test001";/*** 接入點地址,購買 MQ4IOT 實例,且配置完成后即可獲取,接入點地址必須填寫分配的域名,不得使用 IP 地址直接連接,否則可能會導致客戶端異常。* */String endPoint = "xxx";/*** 賬號 accesskey,從賬號系統控制臺獲取* 阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。* 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。* 本示例以把AccessKey ID和AccessKey Secret保存在環境變量為例說明。運行本代碼示例之前,請先配置環境變量MQTT_AK_ENV和MQTT_SK_ENV* 例如:export MQTT_AK_ENV=<access_key_id>* export MQTT_SK_ENV=<access_key_secret>* 需要將<access_key_id>替換為已準備好的AccessKey ID,<access_key_secret>替換為AccessKey Secret。*/String accessKey = "xxx";/*** 賬號 secretKey,從賬號系統控制臺獲取,僅在Signature鑒權模式下需要設置*/String secretKey = "xxx";/*** MQ4IOT clientId,由業務系統分配,需要保證每個 tcp 連接都不一樣,保證全局唯一,如果不同的客戶端對象(tcp 連接)使用了相同的 clientId 會導致連接異常斷開。* clientId 由兩部分組成,格式為 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制臺申請,DeviceId 由業務方自己設置,clientId 總長度不得超過64個字符。*/String clientId = "GID_test01@@@pub001";/*** MQ4IOT 消息的一級 topic,需要在控制臺申請才能使用。* 如果使用了沒有申請或者沒有被授權的 topic 會導致鑒權失敗,服務端會斷開客戶端連接。*/final String parentTopic = "xxx";/*** MQ4IOT支持子級 topic,用來做自定義的過濾,此處為示意,可以填寫任何字符串,具體參考https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3* 需要注意的是,完整的 topic 參考 https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.554.21a37f05ynxokW。*/final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";/*** QoS參數代表傳輸質量,可選0,1,2,根據實際需求合理設置,具體參考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3*/final int qosLevel = 0;ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);final MemoryPersistence memoryPersistence = new MemoryPersistence();/*** 客戶端使用的協議和端口必須匹配,具體參考文檔 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB* 如果是 SSL 加密則設置ssl://endpoint:8883*/final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);/*** 客戶端設置好發送超時時間,防止無限阻塞*/mqttClient.setTimeToWait(5000);final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());mqttClient.setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {/*** 客戶端連接成功后就需要盡快訂閱需要的 topic*/System.out.println("connect success---------------------------------------------------");executorService.submit(new Runnable() {@Overridepublic void run() {try {final String topicFilter[] = {mq4IotTopic};final int[] qos = {qosLevel};mqttClient.subscribe(topicFilter, qos);} catch (MqttException e) {e.printStackTrace();}}});}@Overridepublic void connectionLost(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {/*** 消費消息的回調接口,需要確保該接口不拋異常,該接口運行返回即代表消息消費成功。* 消費消息需要保證在規定時間內完成,如果消費耗時超過服務端約定的超時時間,對于可靠傳輸的模式,服務端可能會重試推送,業務需要做好冪等去重處理。超時時間約定參考限制* https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj*/System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);}});mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());for (int i = 0; i < 10; i++) {MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());message.setQos(qosLevel);/*** 發送普通消息時,topic 必須和接收方訂閱的 topic 一致,或者符合通配符匹配規則*/mqttClient.publish(mq4IotTopic, message);/*** MQ4IoT支持點對點消息,即如果發送方明確知道該消息只需要給特定的一個設備接收,且知道對端的 clientId,則可以直接發送點對點消息。* 點對點消息不需要經過訂閱關系匹配,可以簡化訂閱方的邏輯。點對點消息的 topic 格式規范是 {{parentTopic}}/p2p/{{targetClientId}}*/final String p2pSendTopic = parentTopic + "/p2p/" + clientId;message = new MqttMessage("hello mq4Iot p2p msg".getBytes());message.setQos(qosLevel);mqttClient.publish(p2pSendTopic, message);}Thread.sleep(Long.MAX_VALUE);}
}
有需要的同學,可以私聊找我要源碼!!!