使用 YOLOv8 的 results.xyxy
結構,下面是一個完整的 MQTT 推送腳本,用于把識別到的目標(比如突涌水、水漬、障礙物等)發送到 Frigate 的 MQTT 接口。
? 前提假設
- YOLOv8 推理代碼已經運行并生成
results.xyxy
。 - 每一行是
[x1, y1, x2, y2, conf, class_id]
。 - 類別標簽由你自己定義,如
model.names[class_id]
。 - MQTT 服務運行在本地(或局域網),默認端口 1883。
- Frigate 配置正確,監聽
frigate/events
主題。
📜 完整 Python 腳本(YOLOv8 + MQTT 發事件)
import paho.mqtt.publish as publish
import uuid
import json
from ultralytics import YOLO
import cv2# ---------- 你可以自定義的變量 ----------
MQTT_HOST = '127.0.0.1' # Mosquitto服務器地址
MQTT_PORT = 1883 # MQTT端口
FRIGATE_TOPIC = 'frigate/events'
CAMERA_NAME = 'usb_cam' # Frigate中配置的攝像頭名
DETECTION_CONFIDENCE_THRESHOLD = 0.5
# ----------------------------------------# 加載你訓練好的YOLOv8模型
model = YOLO("best.pt") # 改成你的模型路徑# 讀取圖片/視頻幀(你可以用cv2.VideoCapture循環處理流)
frame = cv2.imread("test.jpg") # 或者循環處理幀# 推理
results = model(frame)# 遍歷檢測結果(默認只有一張圖)
for r in results:boxes = r.boxes.xyxy.cpu().numpy() # shape: (n, 4)scores = r.boxes.conf.cpu().numpy() # shape: (n,)classes = r.boxes.cls.cpu().numpy().astype(int)for i in range(len(boxes)):conf = scores[i]if conf < DETECTION_CONFIDENCE_THRESHOLD:continue # 忽略低置信度結果box = boxes[i]class_id = classes[i]label = model.names[class_id] # 類別名payload = {"type": "new","after": {"id": str(uuid.uuid4()),"camera": CAMERA_NAME,"label": label,"top_score": float(conf),"box": {"x_min": int(box[0]),"y_min": int(box[1]),"x_max": int(box[2]),"y_max": int(box[3])}}}# 發布到 MQTTpublish.single(topic=FRIGATE_TOPIC,payload=json.dumps(payload),hostname=MQTT_HOST,port=MQTT_PORT)print(f"已發送檢測事件: {label} ({conf:.2f})")
🧪 示例運行流程
- 替換你的模型路徑和攝像頭名稱(與你 Frigate 中的一致);
- 放一張測試圖片
test.jpg
; - 運行腳本,Frigate 界面就能看到事件了。
📌 可選增強
目標 | 實現方式 |
---|---|
處理實時攝像頭流 | cv2.VideoCapture() 讀取幀循環處理 |
支持多幀批量推送 | 放在循環中,設置時間間隔 |
顯示檢測圖像 | 用 cv2.rectangle() 可視化框 |
🚨 注意事項
- MQTT 消息中
camera
字段必須是 Frigate 配置文件中的某個攝像頭名。 box
坐標建議使用整數。- YOLO 推理較慢時注意控制幀率,不要每幀都發(可 1~2 秒一次)。
- 如果你在 Windows 下網絡受限,確保 Frigate、MQTT、這個腳本三者之間是能連通的(用
127.0.0.1
通常沒問題)。