需求背景
給一個客戶對接人臉識別的設備,最后需要通知服務端進行一些消息推送。
簡單例子
# 作者 陳老師
# https://v.iiar.cn
import json
import paho.mqtt.client as mqtt
import requests
from flask import Flask, requestapp = Flask(__name__)# MQTT配置
mq_broker = "127.0.0.1" # MQTT測試服務器
mq_port = 1883 # 非加密端口
zt = 'ddddd'def http_post(data):url = "http://127.0.0.1:6688/api/user/mqtt_msg"try:res = requests.post(url, json=data)print(res.text)except Exception as e:print(e)# MQTT連接回調函數
def on_connect(client, userdata, flags, rc):if rc == 0:client.subscribe(zt)print("MQTT連接成功")else:print(f"MQTT連接失敗,返回碼:{rc}")# 處理接收到的MQTT消息
def on_message(client, userdata, msg):mqtt_message = msg.payload.decode() # 解析MQTT消息print('收到消息', mqtt_message)http_post(json.loads(mqtt_message))# 連接MQTT服務器
mq = mqtt.Client()
mq.on_connect = on_connect
mq.on_message = on_message
mq.connect(mq_broker, mq_port, 60)
# 啟動MQTT客戶端線程
mq.loop_start()def send_mqtt_msg(text):mq.publish(zt, text)@app.route('/', methods=['POST'])
def index():data = request.jsonprint('給機器發消息', data)t = json.dumps(data)t = json.loads(t)send_mqtt_msg(t)return 'ok'# 調試模式運行
if __name__ == '__main__':# debug=True,app.run(port=6699, host='0.0.0.0')# 安裝依賴
# pip install -r requirements.txt# 導出依賴庫
# pip freeze > requirements.txt
所需安裝拓展
requirements.txt
blinker==1.9.0
certifi==2024.8.30
charset-normalizer==3.4.0
click==8.1.7
Flask==3.1.0
idna==3.10
importlib_metadata==8.5.0
itsdangerous==2.2.0
Jinja2==3.1.4
MarkupSafe==3.0.2
paho-mqtt==2.1.0
requests==2.32.3
urllib3==2.2.3
Werkzeug==3.1.3
zipp==3.21.0
實際應用例子
# 作者 陳老師
# https://v.iiar.cn
import json
from datetime import datetime
import paho.mqtt.client as mqtt
import requests
from flask import Flask, requestapp = Flask(__name__)# MQTT配置
mq_broker = "127.0.0.1" # MQTT測試服務器
mq_port = 1883 # 非加密端口
sb = '123456789' # 我的設備號
zt = f'fungxi_{sb}_downLink' # 發給設備
zt2 = f'fungxi_{sb}_upLink' # 設備上報def http_post(data): # 接收mqtt回調的apiurl = "http://127.0.0.1:6688/api/user/mqtt_msg"try:res = requests.post(url, json=data)print(res.text)except Exception as e:print(e)# MQTT連接回調函數
def on_connect(client, userdata, flags, rc):if rc == 0:client.subscribe(zt)client.subscribe(zt2)print("MQTT連接成功")else:print(f"MQTT連接失敗,返回碼:{rc}")# 處理接收到的MQTT消息
def on_message(client, userdata, msg):mqtt_message = msg.payload.decode() # 解析MQTT消息try:data = json.loads(mqtt_message)except json.JSONDecodeError:print("無效的JSON數據")returnif msg.topic == zt:if data['cmd'] == 'heartD':returnprint('發給設備', mqtt_message)elif msg.topic == zt2:if data['cmd'] == 'heartU':bot_heartbeat()returnelif data['cmd'] == 'strangerRecordU': # 陌生人returnelif data['cmd'] == 'verifiedRecordU': # 人臉庫的# http_post(['data'])check_successfully(data['data'])print('設備上報', mqtt_message)# 連接MQTT服務器
mq = mqtt.Client()
mq.on_connect = on_connect
mq.on_message = on_message
mq.connect(mq_broker, mq_port, 60)
# 啟動MQTT客戶端線程
mq.loop_start()def send_mqtt_msg(text):mq.publish(zt, text)# 機器返回心跳
def bot_heartbeat():r = {"cmd": "heartD","time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}t = json.dumps(r)send_mqtt_msg(t)def check_successfully(data):data = data[0]user_id = data['num'] # 用戶idname = data['name'] # 用戶名img = data['liveImageBase64'] # 掃臉圖片verifiedCode = data['verifiedCode'] # 識別狀態bot_time = data['time'] # 機器里識別時間personType = data['personType']if personType == '1' and verifiedCode == 0: # 通過passsend_data = {'user_id': user_id,'name': name,'img': img,'verifiedCode': verifiedCode,'bot_time': bot_time,'personType': personType}# print(bot_time)http_post(send_data)@app.route('/', methods=['POST'])
def index():data = request.jsonprint('給機器發消息', data)t = json.dumps(data)t = json.loads(t)send_mqtt_msg(t)return 'ok'# 調試模式運行
if __name__ == '__main__':# debug=True,app.run(port=6699, host='0.0.0.0')# 安裝依賴
# pip install -r requirements.txt# 導出依賴庫
# pip freeze > requirements.txt
詳細說明文檔
由chatGPT整理
中文解釋文檔
1. 項目簡介
該項目實現了一個基于 MQTT 協議和 Flask 框架的應用。它的功能主要包括:
- 接收來自設備的上報消息(如人臉識別結果、心跳等)。
- 向設備發送下行指令(如心跳響應等)。
- 通過HTTP請求將接收到的數據轉發到本地服務器進行處理。
該應用集成了 MQTT 客戶端庫 paho.mqtt.client
,用于與設備進行通信,并使用 Flask 框架提供一個HTTP接口,接收外部發送的消息并轉發給設備。
2. 主要功能
-
MQTT通信:
- 設備通過MQTT協議發送上報消息(如人臉識別結果、心跳等),服務器接收并進行處理。
- 服務器可以向設備發送下行消息(如心跳響應、命令等)。
-
HTTP接口:
- 服務器提供一個HTTP接口(
POST /
)接收外部請求,通過MQTT協議向設備發送消息。
- 服務器提供一個HTTP接口(
-
人臉識別處理:
- 設備通過MQTT上報人臉識別結果,服務器將識別結果轉發給本地API進行進一步的處理(如存儲、通知等)。
3. 主要模塊
3.1 MQTT配置
mq_broker = "127.0.0.1" # MQTT測試服務器
mq_port = 1883 # 非加密端口
sb = '123456789' # 我的設備號
zt = f'fungxi_{sb}_downLink' # 發給設備
zt2 = f'fungxi_{sb}_upLink' # 設備上報
- mq_broker:MQTT服務器的IP地址,用于連接設備。
- mq_port:MQTT服務器的端口號,使用非加密連接(默認為1883端口)。
- sb:設備的唯一序列號。
- zt:發送給設備的消息主題,用于控制設備或向設備發送指令。
- zt2:設備上報消息的主題,用于接收設備的上報數據(如識別結果)。
3.2 HTTP POST請求
def http_post(data):url = "http://127.0.0.1:6688/api/user/mqtt_msg" # 本地API接口地址try:res = requests.post(url, json=data) # 發送POST請求print(res.text) # 打印服務器響應內容except Exception as e:print(e) # 打印異常信息
- http_post(data):這個函數用于將數據通過HTTP POST請求發送到本地的API接口。一般用于將設備上報的數據轉發到其他服務進行處理。
3.3 MQTT消息處理
def on_connect(client, userdata, flags, rc):if rc == 0: # 連接成功client.subscribe(zt) # 訂閱設備下行消息client.subscribe(zt2) # 訂閱設備上行消息print("MQTT連接成功")else: # 連接失敗print(f"MQTT連接失敗,返回碼:{rc}")
- on_connect:當MQTT客戶端連接成功時,會訂閱兩個主題:
zt
(發送給設備)和zt2
(設備上報的消息)。 - on_message:處理接收到的MQTT消息,根據主題分發不同的處理邏輯。
3.4 處理接收到的MQTT消息
def on_message(client, userdata, msg):mqtt_message = msg.payload.decode() # 解析MQTT消息try:data = json.loads(mqtt_message) # 將消息轉為JSON格式except json.JSONDecodeError: # 如果無法解析為JSONprint("無效的JSON數據")returnif msg.topic == zt: # 如果是下行消息if data['cmd'] == 'heartD': # 如果是心跳消息,直接返回returnprint('發給設備', mqtt_message)elif msg.topic == zt2: # 如果是設備上報消息if data['cmd'] == 'heartU': # 如果是設備上報的心跳消息bot_heartbeat() # 發送機器心跳returnelif data['cmd'] == 'strangerRecordU': # 陌生人記錄returnelif data['cmd'] == 'verifiedRecordU': # 人臉識別記錄check_successfully(data['data']) # 檢查是否識別成功print('設備上報', mqtt_message)
- on_message:處理接收到的消息,并根據消息的主題和命令類型(如心跳、陌生人記錄、人臉識別記錄等)執行不同的操作。
3.5 發送MQTT消息
def send_mqtt_msg(text):mq.publish(zt, text) # 向設備下行主題發送消息
- send_mqtt_msg:將文本消息通過MQTT協議發送到設備,通常用于向設備發送指令或控制消息。
3.6 心跳響應
def bot_heartbeat():r = {"cmd": "heartD", # 命令類型:心跳"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 當前時間}t = json.dumps(r) # 將數據轉換為JSON字符串send_mqtt_msg(t) # 發送心跳消息
- bot_heartbeat:當設備向服務器發送心跳時,服務器通過MQTT向設備響應心跳消息,表明設備仍然在線。
3.7 人臉識別結果處理
def check_successfully(data):data = data[0] # 獲取識別結果中的第一條數據user_id = data['num'] # 用戶IDname = data['name'] # 用戶名img = data['liveImageBase64'] # 掃臉圖片的Base64編碼verifiedCode = data['verifiedCode'] # 識別狀態(0為通過)bot_time = data['time'] # 機器識別的時間personType = data['personType'] # 人員類型(1為已登記用戶)if personType == '1' and verifiedCode == 0: # 如果是已登記用戶且識別通過pass # 這里可以處理已通過的情況# 構造發送數據send_data = {'user_id': user_id,'name': name,'img': img,'verifiedCode': verifiedCode,'bot_time': bot_time,'personType': personType}# 打印識別時間(可用于調試)# print(bot_time)# 發送數據到本地APIhttp_post(send_data)
- check_successfully:該函數處理設備上報的人臉識別數據,將其解析并將成功的識別結果轉發到本地API進行進一步處理。
3.8 Flask HTTP接口
@app.route('/', methods=['POST'])
def index():data = request.json # 獲取請求中的JSON數據print('給機器發消息', data) # 打印發送給機器的消息t = json.dumps(data) # 將數據轉換為JSON字符串t = json.loads(t) # 重新解析JSON(這里可以進行進一步的數據處理)send_mqtt_msg(t) # 發送消息給設備return 'ok' # 返回成功響應
- index:這是Flask應用提供的HTTP接口,接收外部的POST請求。請求中的數據會通過MQTT協議轉發給設備。
4. 運行說明
-
安裝依賴:
在項目根目錄下創建requirements.txt
文件,列出項目的依賴庫:pip install -r requirements.txt
-
啟動服務:
運行Flask應用:python app.py
服務器將會監聽在
6699
端口上。 -
導出依賴:
如果需要將當前環境的依賴導出為requirements.txt
文件,可以運行:pip freeze > requirements.txt
5. 總結
該項目主要通過MQTT協議與設備進行實時通信,通過Flask框架提供一個HTTP接口接收外部請求并轉發給設備。它主要應用于設備控制、實時數據處理和系統集成等場景。
延伸了解
這種基于 MQTT 協議和 Flask 框架的系統,通常會應用于需要實時設備控制、數據收集和狀態反饋的場景。以下是一些典型的應用場景:
1. 智能門禁系統
在智能門禁系統中,設備(如門禁機、門鎖等)可能需要通過人臉識別來判斷是否允許某人進入。設備通過MQTT協議將人臉識別結果(例如是否識別成功、人員身份等)上報到服務器,服務器根據識別結果進行處理,并反饋給設備是否允許打開門鎖。
- 設備角色:門禁設備、門鎖、門禁卡讀卡器、人臉識別攝像頭等。
- 系統需求:
- 設備上報用戶識別數據(如人臉識別結果、時間戳等)。
- 系統根據識別結果決定是否打開門鎖。
- 系統向設備發送狀態更新(如心跳包、設備狀態監控等)。
使用場景:
- 辦公樓、住宅小區、公共場所等的智能門禁控制。
- 企業考勤系統,自動記錄員工進出。
2. 智能監控與安防
在智能監控系統中,攝像頭、傳感器等設備可能會使用人臉識別、動作檢測等技術監控環境。當發現可疑人物或異常行為時,設備會通過MQTT協議將監控數據上傳至服務器,服務器進行處理(如報警、記錄日志等),并根據規則發送指令給設備(如開啟警報、錄像等)。
- 設備角色:攝像頭、傳感器、警報器、錄像設備等。
- 系統需求:
- 實時接收設備的監控數據或報警信號。
- 根據設備上報的結果,向設備發送指令(如觸發警報、錄像等)。
- 與外部系統(如安防公司、移動端APP等)集成,進行信息轉發或展示。
使用場景:
- 智能家居安防系統,監控家庭安全。
- 企業或公共場所的智能安防系統,防盜、監控等。
3. 智能家居系統
在智能家居場景中,設備之間的交互和控制通常通過MQTT協議實現。例如,用戶通過手機APP控制家里的智能燈泡、空調、門鎖等設備,設備將狀態更新上報到服務器,服務器再根據控制指令下發新的狀態更新給設備。
- 設備角色:智能燈泡、空調、窗簾、智能插座、門鎖等。
- 系統需求:
- 設備實時上報狀態(如溫度、濕度、是否開鎖、是否開燈等)。
- 用戶通過手機APP發送控制指令(如開關燈、調節空調溫度等)。
- 系統根據用戶指令向設備發送MQTT消息進行控制。
使用場景:
- 智能家居控制系統,用戶通過手機控制家居設備。
- 智能辦公環境,自動調節溫度、照明等。
4. 遠程醫療與健康監測
在醫療設備或健康監測系統中,傳感器(如心率監測儀、血壓計、體溫計等)通過MQTT將患者的健康數據實時上傳到服務器,服務器將數據存儲并進行分析,同時向設備發送指令進行實時干預(例如在心率異常時提醒設備報警)。
- 設備角色:健康監測設備、傳感器、智能手表等。
- 系統需求:
- 設備上報患者的實時健康數據(如心率、血壓、體溫等)。
- 系統根據數據分析,給設備發送指令(如報警、記錄日志等)。
- 數據存儲與遠程監控,醫生或護理人員可以實時查看患者狀態。
使用場景:
- 老年人健康監護,實時監測老年人的身體狀況。
- 慢性病患者的遠程健康管理。
5. 工業物聯網(IIoT)
在工業物聯網(IIoT)應用中,傳感器和設備需要實時監控生產線或設備的狀態,例如溫度、壓力、濕度、運行速度等數據。這些設備通過MQTT協議上傳狀態數據,服務器根據數據進行故障預警、生產調度等。
- 設備角色:工業傳感器、監控設備、生產機器、自動化設備等。
- 系統需求:
- 設備實時上報生產數據(如設備運行狀態、溫度、濕度等)。
- 系統根據設備數據進行狀態監控,發現異常時發出報警指令。
- 向設備發送控制指令(如調整溫度、改變生產參數等)。
使用場景:
- 工廠自動化,實時監控生產線設備的運行狀態。
- 智能制造,優化生產流程,減少故障率。
6. 智慧停車系統
在智慧停車系統中,停車場的入口、出口、車位傳感器等設備通過MQTT協議上傳實時的停車數據。例如,設備上報車輛進出情況、空閑車位數等,系統根據這些信息提供停車導航、收費結算等服務。
- 設備角色:停車場攝像頭、車位傳感器、停車收費機等。
- 系統需求:
- 設備上報實時停車數據(如車位是否空閑、車輛進出時間等)。
- 系統根據數據向用戶提供停車位信息、導航指引。
- 向設備發送指令進行車位管理或收費操作。
使用場景:
- 智慧停車場,實時監控停車位使用情況。
- 城市交通管理,優化停車資源利用。
7. 智能農業與環境監測
在智能農業和環境監測系統中,設備(如土壤濕度傳感器、氣象站、灌溉系統等)通過MQTT協議實時上傳環境數據,系統根據數據分析向設備發送指令(如啟動灌溉、調整溫室溫度等)。
- 設備角色:土壤濕度傳感器、溫度傳感器、氣象站、自動化灌溉系統等。
- 系統需求:
- 設備上報環境數據(如溫度、濕度、土壤濕度等)。
- 系統根據數據自動調整設備參數(如啟動灌溉、調節溫濕度等)。
- 數據存儲與遠程監控,農業管理人員可以隨時查看環境數據。
使用場景:
- 智能溫室,自動調節溫濕度,控制灌溉系統。
- 智能農場,精準農業,減少水資源浪費。
總結
這種需求主要出現在那些 需要實時監控、數據收集、設備控制與反饋的場景。利用MQTT協議和Flask框架實現的系統能夠高效地處理設備之間的消息傳遞和數據流轉。典型的應用場景包括:
- 智能門禁與考勤系統
- 智能安防與監控
- 智能家居控制
- 遠程醫療健康監測
- 工業物聯網(IIoT)
- 智慧停車與交通管理
- 智能農業與環境監測
這些場景都涉及到設備與服務器之間的高效通信、狀態反饋和遠程控制,且對于實時性、穩定性和可擴展性有較高的要求。