????????本文旨在實現無人機城市交通智慧巡檢中的一個模塊——無人機視頻實時推拉流以及識別流并在前端展示,同時,統計目標數量以及違停數量,生成結果評估,一并發送到前端展示。對于本文任何技術上的空缺,可在博主主頁前面博客尋找,有任何問題歡迎私信或評論區討論!!!
目錄
涉及技術棧
基本效果
存在的問題,亟需解決
代碼及粗略解釋
資源
涉及技術棧:
Django5+vue3+websocket+SRS+FFmpeg+RTMP+YOLOv8+AI模型+異步+多進程+flv.js+node.js
基本效果:
web端推拉流測試、抽幀識別計數,一鍵式生成巡檢報告
項目結構(Django):
├── DJI_yolo
│???├── __init__.py
│???├── __pycache__
│???├── asgi.py
│???├── settings.py
│???├── templates
│???├── urls.py
│???└── wsgi.py
├── app01
│???├── __init__.py
│???├── __pycache__
│???├── admin.py
│???├── apps.py
│???├── car_best.pt
│???├── consumers
│???│???├── __init__.py
│???│???├── __pycache__
│???│???├── detection_consumer.py
│???│???└── report_consumer.py
│???├── migrations
│???│???├── __init__.py
│???│???└── __pycache__
│???├── models.py
│???├── pictures
│???├── routings.py
│???├── services
│???│???├── __init__.py
│???│???├── __pycache__
│???│???└── detection_engine.py
│???├── stream_status.py
│???├── tests.py
│???├── utils.py
│???└── views.py
├── manage.py
├── media
│???├── 2025-06-02
│???│???├── 20時14分54秒-20時17分03秒
│???│???│???├── 關鍵幀圖片
│???│???│???├── 原視頻
│???│???│???└── 識別視頻
│???│???├── 20時26分11秒-20時29分00秒
│???│???│???├── 關鍵幀圖片
│???│???│???├── 原視頻
│???│???│???└── 識別視頻
前端代碼因保密不方便展示,可自行根據后端端口續寫,后端Django完整代碼在文章末尾,請查收!!!
存在的問題,亟需解決:
1、后端逐幀識別并返回前端,前端顯示卡頓,考慮跳幀識別,一秒識別一幀,剩余原始幀直接返回。
2、曾嘗試過使用 yolo 中的 model 類中的 track 方法進行跨幀追蹤,為每幀每個目標進行編號(id),提取幀中目標特征(速度、目標面積、唯位移等等),進行特征化工程,從而判斷相鄰兩識別幀中的某兩目標是否為同一目標。由于添加追蹤算法,單幀識別時間過長,單幀識別時間大于拉流幀傳輸時間,導致進程堵塞,程序崩潰。后續采取措施改進。
3、模型并未做違停檢測,考慮重新訓練模型,添加違停訓練集,重新訓練,順便搭配 yolo12 環境,用 yolo12 進行訓練。
4、Django 后端接受到流,并不能直接開始識別,而是先用四五秒時間加載模型,需要優化。
5、執行任務、完成任務到數據存儲、前端顯示巡檢報告,延遲較高,需要優化。
6、為后端添加車輛運動狀態算法,若連續識別的兩個幀同一目標靜止,并且識別為違停,則判斷為違停;若非靜止但識別違停,不做處理。相對靜止判斷?無人機在動,無法建立參考系。
7、進行車牌識別。
代碼及粗略解釋:
detection_consumer.py:
此處代碼使用 FFmpeg 拉取 SRS 上的流,并使用 detection_engine 中的類進行幀識別和計數,然后判斷計數,調用訊飛星火 API 生成 AI 答復,獲取關鍵幀,將所有信息封裝成一個字典,并傳給前端,以用于違停報告生成。同時,將實時識別幀通過 websocket 的消費者傳給前端顯示。
import asyncio
import base64
import os
import subprocess
import time
import traceback
from datetime import datetime
from pathlib import Path
from .report_consumer import ReportConsumer
import numpy as np
import cv2
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
from app01.services.detection_engine import DetectionEngine, logger
from concurrent.futures import ThreadPoolExecutor
from app01.utils import rename_time_folder # 導入重命名函數
from app01.stream_status import stream_status, status_lock # 從新模塊導入
import requestsRTMP_URL = "rtmp://127.0.0.1:1935/live/stream"
channel_layer = get_channel_layer()class DetectionStreamConsumer(AsyncWebsocketConsumer):async def connect(self):await self.channel_layer.group_add("detection_group", self.channel_name)await self.accept()print("WebSocket 連接建立成功")async def disconnect(self, code):await self.channel_layer.group_discard("detection_group", self.channel_name)print("WebSocket 連接關閉")async def start_detection(self):print("開始檢測幀流...")await self.monitor_stream_status()async def monitor_stream_status(self):while True:if stream_status.is_streaming:logger.info("檢測到推流,開始處理...")await self.start_detection_stream()else:logger.info("當前無推流,等待中...")await asyncio.sleep(1)async def start_detection_stream(self, all_dict=None):ffmpeg_pull_command = ["ffmpeg","-c:v", "h264","-i", RTMP_URL,"-f", "rawvideo","-pix_fmt", "bgr24","-s", "640x360","-r", "5","-vcodec", "rawvideo","-an","-flags", "+low_delay","-tune", "zerolatency","-"]try:process = subprocess.Popen(ffmpeg_pull_command,stdout=subprocess.PIPE,stderr=subprocess.DEVNULL,bufsize=10 * 8)except Exception as e:logger.error(f"FFmpeg 子進程啟動失敗: {e}")returnif process.stdout is None:logger.error("FFmpeg stdout 為空")returnframe_width, frame_height = 640, 360frame_size = frame_width * frame_height * 3executor = ThreadPoolExecutor(max_workers=4) # 最多可以同時運行 4 個線程engine = DetectionEngine()frame_count = 0skip_interval = 1try:while stream_status.is_streaming:try:loop = asyncio.get_event_loop()raw_frame = await asyncio.wait_for(loop.run_in_executor(executor, process.stdout.read, frame_size),timeout=8 # 超時檢測斷流)except asyncio.TimeoutError:logger.warning("讀取幀超時,觸發流結束")with status_lock:stream_status.is_streaming = Falsebreakif len(raw_frame) != frame_size:continuetry:frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((frame_height, frame_width, 3))except Exception as e:logger.warning(f"幀解析失敗: {e}")continueframe_count += 1if frame_count % skip_interval != 0:continueresult = await loop.run_in_executor(executor, engine.process_frame, frame)# 增加空值檢查if result is None:logger.warning("檢測結果為空,跳過處理")continueprocessed_frame, detected_classes = result# 更新車輛統計(僅按類別)with DetectionEngine.counter_lock:for class_id in detected_classes:if class_id == 0:DetectionEngine.total_count['car'] += 1DetectionEngine.total_count['total'] += 1elif class_id == 1:DetectionEngine.total_count['bus'] += 1DetectionEngine.total_count['total'] += 1elif class_id == 2:DetectionEngine.total_count['truck'] += 1DetectionEngine.total_count['total'] += 1elif class_id == 3:DetectionEngine.total_count['van'] += 1DetectionEngine.total_count['total'] += 1_, jpeg = cv2.imencode('.jpg', processed_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])if channel_layer:await channel_layer.group_send("detection_group",{"type": "send_frame", "frame": jpeg.tobytes()})except Exception as e:logger.error(f"檢測處理錯誤: {e}")logger.error(traceback.format_exc())finally:logger.warning("流結束,進入處理...")if process.returncode is None:process.kill()logger.warning("FFmpeg 進程已終止")# 延遲一小段時間,確保文件操作完全釋放time.sleep(1)stream_status.is_streaming = Falsestream_status.end_datetime = datetime.now() # 記錄結束時間logger.warning("正在獲取三個關鍵幀...")# 找3個關鍵幀,必須在重命名前執行folder = Path(f"{stream_status.image_path}")files = [f for f in folder.iterdir() if f.is_file()]if not files:return None, None, None# 按修改時間排序files.sort(key=lambda x: x.stat().st_mtime)first = files[0]last = files[-1]middle = files[len(files) // 2]# 轉換為Base64stream_status.image_li = [self.image_to_base64(str(f)) # 傳入字符串路徑,避免Path對象序列化問題for f in [first, middle, last]]logger.warning("正在重命名文件夾...")# 先復制 time_path,因為后續可能被修改time_path = stream_status.time_pathstart_datetime = stream_status.start_datetimeend_datetime = stream_status.end_datetimesave_path = ""# 調用重命名函數(使用完整 datetime)if time_path and start_datetime:path = rename_time_folder(time_path,start_datetime, # 傳入開始時間end_datetime # 傳入結束時間)save_path = pathlogger.warning(f"文件夾已重命名為 {start_datetime.strftime('%H時%M分%S秒')}-{end_datetime.strftime('%H時%M分%S秒')}")try:logger.warning("正在更新數據...")# 更新識別結果stats = self.correct_overcounted_stats(DetectionEngine.total_count)print("stats:", stats)stream_status.total_stats = statsprint("stream_status.total_stats:", stream_status.total_stats)self.save_vehicle_stats(stats, save_path)except Exception as e:logger.error(f"保存統計信息失敗: {e}")print("正在獲取AI答復...")self.AI_response()# 定義總數據,并序列化all_dict = {'datetime': stream_status.start_time.strftime("%Y-%m-%d %H:%M:%S.%f"), # 序列化,轉為字符串'image_li': [bs for bs in stream_status.image_li], # 關鍵幀轉二進制'detect_car': [_ for _ in stream_status.total_stats.values()],'Al_response': stream_status.AI_talk,}print("all_dict:", all_dict)print("正在發送信息...")await ReportConsumer.send_report_data(all_dict)executor.shutdown(wait=False)DetectionEngine.reset_stats()stream_status.reset_all_dict()def save_vehicle_stats(self, stats, save_path=None):# 如果未提供路徑,使用默認的 time_pathif not save_path:save_path = stream_status.time_pathif not save_path:logger.warning("無法保存統計信息:路徑為空")returnstats_path = os.path.join(save_path, 'vehicle_stats.txt')try:with open(stats_path, 'w', encoding='utf-8-sig') as f:f.write("車輛統計結果\n")f.write("-------------------------\n")for k, v in stats.items():f.write(f"{k}: {v}\n")f.write("-------------------------\n")logger.info(f"統計信息已保存至 {stats_path}")except Exception as e:logger.error(f"寫入統計信息失敗: {e}")def AI_response(self):"""獲取AI回復"""url = "https://spark-api-open.xf-yun.com/v1/chat/completions"headers = {"Authorization": "Bearer pDLDvpUbFDTEQZACQSjD:CqZewebAdgpuvxrVbbAv","Content-Type": "application/json",}content = (f'我現在要做一個無人機城市交通智慧巡檢報告,現在巡檢結果是:巡檢時間:{stream_status.start_time}'f'小汽車有{stream_status.total_stats["car"]}輛,面包車有{stream_status.total_stats["van"]}輛,'f'公交車有{stream_status.total_stats["bus"]}輛,卡車有{stream_status.total_stats["truck"]}輛,'f'識別到的違停車輛共有{stream_status.total_stats["illegally_car"]}輛。'f'幫我生成一段結論,要求不要有廢話,也不要寫具體那四個類別識別到的車輛數,字數200字,語言精煉嚴謹')question = {"role": "user","content": content}data = {"max_tokens": 4096,"top_k": 4,"messages": [question],"temperature": 0.5,"model": "4.0Ultra","stream": False,}response = requests.post(url, headers=headers, json=data)response.raise_for_status()result = response.json()# 提取模型輸出的內容reply = result.get('choices', [{}])[0].get('message', {}).get('content', '').strip()print(reply)stream_status.AI_talk = reply# 讀取圖片文件并轉換為 Base64@staticmethod # 靜態方法不應接收self參數def image_to_base64(image_path):with open(image_path, "rb") as img_file:base64_str = base64.b64encode(img_file.read()).decode("utf-8")return f"data:image/jpeg;base64,{base64_str}"@staticmethod # 去掉self第一個參數def correct_overcounted_stats(stats, avg_appearance=6):"""簡單修正嚴重重復統計的結果"""corrected = {}for cls in ['car', 'van', 'bus', 'truck']:if cls in stats:corrected[cls] = max(0, int(round(stats[cls] / avg_appearance)))# 可選:重新計算 totalcorrected['total'] = sum(corrected.values())corrected['illegally_car'] = int(corrected['total'] / 100)print(corrected)return correctedasync def send_frame(self, event):frame_bytes = event["frame"]await self.send(bytes_data=frame_bytes)
stream_status.py:
本文件用于 Django 項目全局狀態管理,相當于 vue3 中的 pinia ,?is_streaming? 是前端推本地視頻、后端上傳 SRS 后,變為 True?,后端因為 is_streaming? = True 而開始拉流,當超過 6 秒未拉到流,判斷流結束,is_streaming? = False。還定義了全局狀態鎖、狀態重置方法、時間獲取方法、文件夾名獲取方法、狀態實例。全局狀態鎖是為了防止多線程修改同一個共享變量,只有with status_lock:在 with
塊內,只有當前線程持有鎖時才能運行代碼,其他線程必須等待,相當于在此處把鎖”鎖“起來了。
import threading# 全局狀態鎖和狀態對象
status_lock = threading.Lock()
class StreamStatus:def __init__(self):self.is_streaming = False # 是否正在推流/檢測self.start_time = None # 推流開始時間(datetime對象)self.end_time = None # 推流結束時間(datetime對象)self.time_path = None # 當前時間文件夾路徑(如 Media/2025-06-01/12點)self.total_stats = { # 車輛統計"car": 0,"van": 0,"bus": 0,"truck": 0,"total": 0,"illegally_car": 0,}self.last_frame_time = None # 最后收到有效幀的時間(用于斷流檢測)self.image_path = Noneself.image_li = []self.AI_talk = Noneself.all_dict = {}@propertydef date_folder(self):"""獲取日期文件夾名稱(如 2025-06-01)"""if self.start_time:return self.start_time.strftime("%Y-%m-%d")return None@propertydef start_hour(self):"""獲取開始小時(如 12)"""if self.start_time:return self.start_time.hourreturn Nonedef reset(self):"""重置狀態(用于新任務開始)"""self.is_streaming = Falseself.start_time = Noneself.end_time = Noneself.time_path = Noneself.total_stats = {k: 0 for k in self.total_stats}self.last_frame_time = Nonedef reset_all_dict(self):self.last_frame_time = None # 最后收到有效幀的時間(用于斷流檢測)self.image_path = Noneself.image_li = []self.AI_talk = Noneself.all_dict = {}self.all_dict = {}
# 全局唯一的狀態實例
stream_status = StreamStatus()
detection_engine.py:
此處是 detection_consumer.py 使用的服務代碼塊,用于為其提供幀識別服務,consumer?拉到的每一個流都會調用該類中的?process_frame 類方法,并同時進行計數和關鍵幀保存。
import threading
import time
import cv2
from ultralytics import YOLO
import logging
import os
from app01.stream_status import stream_status, status_lock # 從新模塊導入logger = logging.getLogger(__name__)MODEL_PATH = "F:\\FullStack\\Django\\DJI_yolo\\app01\\car_best.pt"
model = YOLO(MODEL_PATH)# 關鍵幀存儲位置
pictures_dir = "pictures"
if not os.path.exists(pictures_dir):os.makedirs(pictures_dir)class DetectionEngine:frame_counter = 1counter_lock = threading.Lock() # 線程鎖,保證計數器安全# 新增:全局計數器和已統計 IDtotal_count = {'car': 0,'van': 0,'bus': 0,'truck': 0,"total": 0,"illegally_car": 0}# 下次任務重置@classmethoddef reset_stats(cls):with cls.counter_lock:for k in cls.total_count:cls.total_count[k] = 0# 下次任務重置@classmethoddef update_stats(cls, vehicle_type):with cls.counter_lock:if vehicle_type in cls.total_count:cls.total_count[vehicle_type] += 1cls.total_count["total"] += 1def __init__(self):self.model = model# 調整檢測參數,減少計算量self.detect_params = {'conf': 0.3, # 提高置信度閾值,減少無效檢測'iou': 0.5, # 調整 NMS 閾值'imgsz': [384, 640], # 輸入尺寸,降低分辨率'classes': [0, 1, 2, 3], # 僅檢測車輛類別(2: car, 5: bus, 7: truck)'device': 0, # 使用 GPU'half': True, # 使用 FP16 精度'show': False # 關閉實時顯示,減少開銷}def process_frame(self, frame):"""處理單幀圖像:YOLO推理 + 保存結果到關鍵幀圖片文件夾"""results = model(frame)detected_objects = []annotated_frame = frame.copy() # 復制原始幀以防止修改原圖# 防御性檢查:確保 results 非空且包含有效數據if not results or len(results) == 0:logger.warning("未獲取到檢測結果,跳過處理")return frame, []boxes = results[0].boxesif boxes is None or len(boxes) == 0:logger.warning("檢測結果為空,跳過處理")return frame, []try:# 假設類別信息存儲在 cls 屬性中classes = boxes.cls.int().cpu().tolist() if hasattr(boxes, 'cls') and boxes.cls is not None else []for class_id in classes:detected_objects.append(class_id)annotated_frame = results[0].plot() # 繪制檢測框except Exception as e:logger.error(f"解析檢測數據失敗: {e}")return annotated_frame, []with status_lock:car_count = sum(1 for cls in detected_objects if cls == 0)if car_count > 18:try:timestamp = int(time.time())save_path = os.path.join(stream_status.image_path, f"keyframe_{timestamp}.jpg")cv2.imwrite(save_path, annotated_frame)logger.info(f"關鍵幀保存成功: {save_path}")except Exception as e:logger.error(f"保存關鍵幀失敗: {e}")return annotated_frame, detected_objects
repoet_consumer.py:
用于將前面?detection_consumer.py 中的字典數據發送給前端,首先會將數據緩存到后端,等到前端消費者建立成功,立馬發送出去。
import asynciofrom channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
import json# 緩存隊列:用于保存尚未發送的數據
queue = []
# 記錄是否有消費者連接
has_consumer = Falseclass ReportConsumer(AsyncWebsocketConsumer):async def connect(self):global has_consumer# 加入指定組await self.channel_layer.group_add("report_group", self.channel_name)await self.accept()print("巡檢報告WebSocket 已連接")# 設置有消費者連接的標志has_consumer = True# 嘗試發送緩存數據await flush_queue(self.channel_layer)async def disconnect(self, close_code):global has_consumer# 移除組成員await self.channel_layer.group_discard("report_group", self.channel_name)print("巡檢報告WebSocket 斷開連接")# 重置消費者連接標志has_consumer = Falseasync def receive(self, text_data=None, bytes_data=None):passasync def send_report(self, event):"""處理 send_report 類型的消息并發送給客戶端"""data = event['data']print("【發送到客戶端】", data)# 轉換為JSON字符串發送try:# 處理可能包含的非JSON可序列化對象# 這里假設 image_li 包含 Path 對象,需要轉換為字符串if 'image_li' in data:data['image_li'] = [str(path) for path in data['image_li']]await self.send(text_data=json.dumps(data))except Exception as e:print(f"【發送到客戶端失敗】{e}")@classmethodasync def send_report_data(cls, data):"""供其他模塊調用的方法"""global queue, has_consumer# 不再檢查是否有人連接,直接發送或緩存,生產模式用Redischannel_layer = get_channel_layer()# 如果沒有消費者連接,將數據加入隊列if not has_consumer:queue.append(data)print(f"【數據已緩存】等待消費者連接: {data}")else:# 有消費者連接,直接發送await send_report(channel_layer, data)# ========== 全局函數 ==========
# 定期檢查并發送緩存數據的任務
async def queue_check_task():channel_layer = get_channel_layer()while True:await asyncio.sleep(1) # 每秒檢查一次if queue and has_consumer:await flush_queue(channel_layer)
# 在應用啟動時啟動隊列檢查任務
async def start_queue_check():asyncio.create_task(queue_check_task())
async def send_report(channel_layer, data):try:await channel_layer.group_send("report_group",{"type": "send_report","data": data,},)print("【發送成功】", data)except Exception as e:print(f"【發送失敗】{e}")async def flush_queue(channel_layer):global queueif not queue:returnitems = list(queue)queue.clear()for data in items:await send_report(channel_layer, data)
routings.py:
用于定義 websocket 的后端接收路徑,并連接消費者
from django.urls import re_path
from app01.consumers import detection_consumer, report_consumerwebsocket_urlpatterns = [re_path(r'^ws/detection/$', detection_consumer.DetectionStreamConsumer.as_asgi()),re_path(r'^ws/report/$', report_consumer.ReportConsumer.as_asgi()),
]
urls.py:
用于接收前端發送的 http 請求,并與 view.py 中的視圖函數建立聯系
"""
URL configuration for DJI_yolo project.The `urlpatterns` list routes URLs to views. For more information please see:https://docs.djangoproject.com/en/5.1/topics/http/urls/
Examples:
Function views1. Add an import: from my_app import views2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views1. Add an import: from other_app.views import Home2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf1. Import the include() function: from django.urls import include, path2. Add a URL to urlpatqterns: path('blog/', include('blog.urls'))
"""
from django.conf.urls.static import static
from django.contrib import admin
from django.urls import pathfrom DJI_yolo import settings
from app01 import views
urlpatterns = [path("admin/", admin.site.urls),path('csrf/', views.get_csrf_token, name='get_csrf_token'),# 推流測試path('push_stream/', views.push_stream, name='push_stream'),# 媒體庫管理path('api4/date-folders/', views.get_date_folders, name='date-folders'),path('api4/time-folders/<str:date>/', views.get_time_folders, name='time-folders'),path('api4/files/<str:date>/<str:time_range>/<str:category>/count/', views.get_file_count, name='file-count'),path('api4/files/<str:date>/<str:time_range>/<str:category>/', views.get_files, name='files'),path('api4/delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>/', views.delete_file,name='delete-file'),path('delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>', views.delete_file,name='delete-file'), # 添加一個用于刪除文件的 URL 路徑] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
views.py:
用于處理前端請求,這里報告推流、媒體管理系統、原始視頻存儲邏輯等等。
# views.py
from datetime import datetime
from pathlib import Path
import urllib.parse
from django.middleware.csrf import get_token
import os
import subprocess
import threading
from django.http import JsonResponse
from django.conf import settings
import asyncio
from .consumers.detection_consumer import DetectionStreamConsumer
from .utils import create_date_folder, create_time_folder
from .stream_status import stream_statusMEDIA_ROOT = Path(settings.MEDIA_ROOT)def get_csrf_token(request):token = get_token(request)print(token)return JsonResponse({'csrfToken': token})def push_stream(request):print("前端返回本地視頻,準備推流")video_file = request.FILES.get('video')if not video_file:return JsonResponse({'error': '未上傳視頻文件'}, status=400)# ------------------- 創建精確時間文件夾 -------------------start_datetime = datetime.now() # 記錄精確到秒的開始時間date_str = start_datetime.strftime('%Y-%m-%d')date_path = create_date_folder(date_str)stream_status.start_time = datetime.now()# 創建帶時分秒的文件夾(如 15:30:45)time_path, time_folder = create_time_folder(date_path, start_datetime)# 創建三類子文件夾original_path = os.path.join(time_path, '原視頻')recognized_path = os.path.join(time_path, '識別視頻')image_path = os.path.join(time_path, '關鍵幀圖片')stream_status.image_path = image_path # 保存到全局狀態for folder in [original_path, recognized_path, image_path]:os.makedirs(folder, exist_ok=True)# ------------------- 保存上傳視頻 -------------------original_video_name = f"original_{start_datetime.strftime('%H%M%S')}.mp4"original_video_path = os.path.join(original_path, original_video_name)with open(original_video_path, 'wb') as f:for chunk in video_file.chunks():f.write(chunk)# ------------------- 推流配置 -------------------push_url = "rtmp://127.0.0.1:1935/live/stream"recognized_video_name = f"recognized_{start_datetime.strftime('%H%M%S')}.mp4"recognized_video_path = os.path.join(recognized_path, recognized_video_name)command = ["ffmpeg","-re","-i", original_video_path,"-c:v", "libx264","-preset", "ultrafast","-tune", "zerolatency","-g", "50","-r", "30","-an","-f", "flv",push_url,"-f", "mp4",recognized_video_path]try:process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)# 保存完整時間狀態stream_status.is_streaming = Truestream_status.start_datetime = start_datetime # 保存完整 datetimestream_status.time_path = time_pathstream_status.recognized_path = recognized_pathstream_status.image_path = image_pathdetection_thread = threading.Thread(target=run_detection_in_background,daemon=True)detection_thread.start()return JsonResponse({'status': 'success','date_folder': date_str,'time_folder': time_folder # 格式如 15:30:45})except Exception as e:return JsonResponse({'error': str(e)}, status=500)def run_detection_in_background():try:loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)loop.run_until_complete(DetectionStreamConsumer().start_detection())except Exception as e:print(f"后臺檢測線程異常: {e}")def get_date_folders(request):page = int(request.GET.get('page', 1))page_size = int(request.GET.get('page_size', 10))MEDIA_ROOT_PATH = Path(r'F:\FullStack\Django\DJI_yolo\media')date_folders = []for name in os.listdir(MEDIA_ROOT_PATH):folder_path = MEDIA_ROOT_PATH / nameif folder_path.is_dir() and len(name.split('-')) == 3:date_folders.append({'date': name})date_folders.sort(key=lambda x: x['date'], reverse=True)start = (page - 1) * page_sizeend = start + page_sizereturn JsonResponse({'data': date_folders[start:end],'total': len(date_folders)})def get_time_folders(request, date):date_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / dateif not date_path.is_dir():return JsonResponse({'error': '日期文件夾不存在'}, status=404)page = int(request.GET.get('page', 1))page_size = int(request.GET.get('page_size', 10))time_folders = []for name in os.listdir(date_path):time_path = date_path / nameif time_path.is_dir():original_count = len(os.listdir(time_path / '原視頻')) if (time_path / '原視頻').is_dir() else 0recognized_count = len(os.listdir(time_path / '識別視頻')) if (time_path / '識別視頻').is_dir() else 0image_count = len(os.listdir(time_path / '關鍵幀圖片')) if (time_path / '關鍵幀圖片').is_dir() else 0time_folders.append({'time_range': name, # 直接使用時間區間名稱'original_count': original_count,'recognized_count': recognized_count,'image_count': image_count})time_folders.sort(key=lambda x: x['time_range'], reverse=True)start = (page - 1) * page_sizeend = start + page_sizereturn JsonResponse({'data': time_folders[start:end],'total': len(time_folders)})def get_file_count(request, date, time_range, category):category_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / time_range / categoryif not category_path.is_dir():return JsonResponse({'count': 0})count = len(os.listdir(category_path))return JsonResponse({'count': count})def get_files(request, date, time_range, category):try:decoded_time_range = urllib.parse.unquote(time_range)category_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / decoded_time_range / categoryif not category_path.is_dir():return JsonResponse({'error': '文件類別不存在'}, status=404)page = int(request.GET.get('page', 1))page_size = int(request.GET.get('page_size', 10))files = []for filename in os.listdir(category_path):file_path = category_path / filenameif file_path.is_file():encoded_time_range = urllib.parse.quote(decoded_time_range)files.append({'name': filename,'url': f'http://{request.get_host()}/media/{date}/{encoded_time_range}/{category}/{filename}'})start = (page - 1) * page_sizeend = start + page_sizereturn JsonResponse({'data': files[start:end],'total': len(files),'status': 'success'})except Exception as e:print(f"文件列表錯誤: {str(e)}")return JsonResponse({'error': '服務器內部錯誤','detail': str(e)}, status=500)def delete_file(request, date, time_range, category, filename):try:decoded_time_range = urllib.parse.unquote(time_range)file_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / decoded_time_range / category / filenameif not file_path.exists():return JsonResponse({'error': '文件不存在'}, status=404)os.remove(file_path)return JsonResponse({'status': 'success'})except Exception as e:return JsonResponse({'error': str(e)}, status=500)
感謝您的觀看!
資源如下:
https://download.csdn.net/download/2403_83182682/90961392?spm=1001.2014.3001.5501https://download.csdn.net/download/2403_83182682/90961392?spm=1001.2014.3001.5501