場景介紹
實現測試客戶端與 MQTT 服務器的連接、訂閱、取消訂閱、收發消息等功能。
MQTT發布消息到代理服務器
安裝paho-mqtt
使用pip工具安裝paho-mqtt,輸入以下指令即可:
sudo pip install paho-mqtt
安裝 MQTT 客戶端庫
為了方便連接到 MQTT 服務器,我們需要安裝 paho-mqtt 庫。可以選擇以下兩種方法之一進行安裝。
1、使用源碼安裝
git clone https://github.com/eclipse/paho.mqtt.python
cd paho.mqtt.python
python3 setup.py install
2、使用 pip3 安裝
pip3 install paho-mqtt
MQTT 的使用
連接 MQTT 服務器
可以使用 EMQX 提供的 免費公共 MQTT 服務器,該服務基于 EMQX 的 MQTT 物聯網云平臺 創建。服務器接入信息如下:
● Broker: broker.emqx.io
● TCP Port: 1883
● Websocket Port: 8083
如果有需要,也可以使用 docker 在本地快速安裝 EMQX 服務器。
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 18083:18083 emqx/emqx
連接測試代碼
est_connect.py
import paho.mqtt.client as mqtt 回調函數。當嘗試與 MQTT broker 建立連接時,觸發該函數。
client 是本次連接的客戶端實例。
userdata 是用戶的信息,一般為空。但如果有需要,也可以通過 user_data_set 函數設置。
flags 保存服務器響應標志的字典。
rc 是響應碼。一般情況下,我們只需要關注 rc 響應碼是否為 0 就可以了。
def on_connect(client, userdata, flags, rc):if rc == 0:print("Connected success")else:print(f"Connected fail with code {rc}")client = mqtt.Client()
client.on_connect = on_connect
client.connect("broker.emqx.io", 1883, 60)
client.loop_forever()
將上面的代碼保存為 test_connect.py 文件,并運行
python3 test_connect.py
我們在 on_connect 函數里對響應碼進行了判斷,為 0 則輸出 Connected success 表示連接成功。如果返回的是其它數字,我們就需要對照下面的響應碼進行判斷。
在 MQTT 協議的概念中,消息是通過主題傳遞的,比如設備 A 向主題 T 發送消息,那么只有訂閱了主題 T 的設備才能接收到。所以僅僅接入 MQTT 服務器并沒有太大意議,要完整地使用 MQTT 服務,我們還需要知道如何訂閱和發布消息。
訂閱消息
打開任意編輯器,輸入下面的代碼,并保存為 subscriber.py 文件:
#subscriber.py
import paho.mqtt.client as mqttdef on_connect(client, userdata, flags, rc):print(f"Connected with result code {rc}")# 訂閱,需要放在 on_connect 里# 如果與 broker 失去連接后重連,仍然會繼續訂閱 raspberry/topic 主題client.subscribe("raspberry/topic")# 回調函數,當收到消息時,觸發該函數
def on_message(client, userdata, msg):print(f"{msg.topic} {msg.payload}")client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message# 設置遺囑消息,當樹莓派斷電,或者網絡出現異常中斷時,發送遺囑消息給其他客戶端
client.will_set('raspberry/status', b'{"status": "Off"}')# 創建連接,三個參數分別為 broker 地址,broker 端口號,保活時間
client.connect("broker.emqx.io", 1883, 60)# 設置網絡循環堵塞,在調用 disconnect() 或程序崩潰前,不會主動結束程序
client.loop_forever()
調用 subscribe() 函數,可以讓樹莓派訂閱一個主題。在上面的代碼中,我們使用它訂閱了 raspberry/topic 主題,并監聽消息。
另外,我們還使用 will_set() 設置了遺囑消息。 遺囑消息是 MQTT 的一個特性,當設備在意外斷開網絡連接后,會向某個特定的主題發送消息。通過這個特性,可以得知樹莓派是否斷電,或者出現網絡異常。
發布消息
打開任意編輯器,輸入下面的代碼,并保存為 publisher.py 文件:
import paho.mqtt.client as mqtt
import timedef on_connect(client, userdata, flags, rc):print(f"Connected with result code {rc}")client = mqtt.Client()
client.on_connect = on_connect
client.connect("broker.emqx.io", 1883, 60)
# 每間隔 1 秒鐘向 raspberry/topic 發送一個消息,連續發送 5 次
for i in range(5):# 四個參數分別為:主題,發送內容,QoS, 是否保留消息client.publish('raspberry/topic', payload=i, qos=0, retain=False)print(f"send {i} to raspberry/topic")time.sleep(1)client.loop_forever()
調用 publish() 函數,可以向一個主題發送消息。在上面的代碼中,我們使用了它向主題 raspberry/topic 發送消息。其中參數 QoS 是另一個 MQTT 特性,這里我們暫且設為 0。
MQTT客戶端下載安裝
瀏覽器輸入網址:https://mqttx.app/zh下載MQTTX客戶端工具進行測試。
測試訂閱消息
運行 Python 代碼,并主動發送一個消息。
-
打開終端,運行 Python 代碼,監聽消息 。
python3 subscriber.py -
使用 MQTTX 客戶端與 MQTT 服務器建立連接,并向主題 raspberry/topic 發送消息 。
-
查看樹莓派終端信息,將會看到已成功接收到 MQTTX 發布的消息。
測試發布消息
-
在 MQTTX 客戶端中訂閱 raspberry/topic 主題 。
-
在終端運行 Python 代碼。
-
在 MQTTX 客戶端中,查看樹莓派發送的消息。
總結
我們完成了在樹莓派上使用 Python MQTT 客戶端庫 paho-mqtt 編寫測試客戶端, 并實現了測試客戶端與 MQTT 服務器的連接、訂閱、取消訂閱、收發消息等功能。