場景介紹
今天程序貓帶領大家如何獲取樹莓派傳感器溫濕度數據,并用MQTT進行廣播。
實現過程
啟動MQTT服務
1、終端啟動Mosquitto服務
sudo systemctl start mosquitto
2、設置服務開機自動啟動
sudo systemctl enable mosquitto
硬件連接
樹莓派4b連接GPIO引腳與DHT11傳感器;
硬件只涉及樹莓派、DHT11傳感器。
DHT11的信號引腳連接樹莓派的GPIO17, DHT11的Vdd(+)和GND引腳(-)分別連接樹莓派的電源正極和GND引腳。
實現思路
1、通過樹莓派GPIO來連接并控制DHT11傳感器,獲得溫度,濕度數據;
2、使用MQTT將樹莓派所獲取的溫度與濕度數據廣播;
3、訂閱MQTT服務器的廣播獲取濕度與溫度數據;
鏈接MQTTX客戶端
配置MQTT基礎信息,填寫ip地址,默認端口1883
如果連接成功,則會彈出已連接
添加訂閱主題
配置主題名Topic
讀取傳感器數據并發送到MQTT
編寫腳本代碼sensor_demo.py
1、配置MQTT :服務器地址 ,端口號,主題
2、傳感器引腳配置
3、讀取傳感器數據
4、向MQTT發送消息
import time
import board
import adafruit_dht
from datetime import datetime
import paho.mqtt.client as mqtt
import json# MQTT配置
broker = "192.168.137.31"
port = 1883
topic = "DHT11_message"# 傳感器引腳配置
SENSOR_PIN = board.D17 # 使用CircuitPython標準命名(物理引腳11)# SENSOR_PIN = board.GPIO17 # 替代:樹莓派專用GPIO命名# 初始化MQTT客戶端
client = mqtt.Client()
client.connect(broker, port)try:# 添加 use_pulseio=False 參數dhtDevice = adafruit_dht.DHT11(SENSOR_PIN, use_pulseio=False)print("傳感器初始化成功,開始數據采集......")while True:try:# 讀取溫濕度數據temperature = dhtDevice.temperaturehumidity = dhtDevice.humidity# 檢查有效讀數if temperature is None or humidity is None:raise RuntimeError("讀取到無效值")# 準備MQTT消息recDate = datetime.now().strftime('%Y-%m-%d %H:%M:%S')msg = json.dumps({'Tem': temperature,'Hum': humidity,'Date': recDate})# 發送消息result = client.publish(topic, msg)if result[0] == mqtt.MQTT_ERR_SUCCESS:print(f"? [{recDate}] 發送成功: {msg}")else:print(f"? 發送失敗,錯誤碼: {result[0]}")except RuntimeError as e:print(f"傳感器讀取錯誤: {e}")except Exception as e:print(f"意外錯誤: {e}")time.sleep(10.0) # 采集間隔10sexcept KeyboardInterrupt:print("程序被用戶終止")
except RuntimeError as e:print(f"初始化失敗: {e}")exit(1)
finally:# 清理資源if 'dhtDevice' in locals():dhtDevice.exit()client.disconnect()print("資源已釋放")
進入虛擬環境
樹莓派終端輸入命令:
source adafruit_env/bin/activate
運行腳本
控制臺輸入命令
python3 sensor_demo.py
控制臺可以看到數據采集OK
訂閱MQTT服務器的廣播獲取
打開MQTT客戶端,看到接收消息成功。