web端rtmp推拉流測試、抽幀識別計數,一鍵式生成巡檢報告

????????本文旨在實現無人機城市交通智慧巡檢中的一個模塊——無人機視頻實時推拉流以及識別流并在前端展示,同時,統計目標數量以及違停數量,生成結果評估,一并發送到前端展示。對于本文任何技術上的空缺,可在博主主頁前面博客尋找,有任何問題歡迎私信或評論區討論!!!

目錄

涉及技術棧

基本效果

存在的問題,亟需解決

代碼及粗略解釋

資源


涉及技術棧:

Django5+vue3+websocket+SRS+FFmpeg+RTMP+YOLOv8+AI模型+異步+多進程+flv.js+node.js

基本效果:

web端推拉流測試、抽幀識別計數,一鍵式生成巡檢報告

項目結構(Django):
├── DJI_yolo
│???├── __init__.py
│???├── __pycache__
│???├── asgi.py
│???├── settings.py
│???├── templates
│???├── urls.py
│???└── wsgi.py
├── app01
│???├── __init__.py
│???├── __pycache__
│???├── admin.py
│???├── apps.py
│???├── car_best.pt
│???├── consumers
│???│???├── __init__.py
│???│???├── __pycache__
│???│???├── detection_consumer.py
│???│???└── report_consumer.py
│???├── migrations
│???│???├── __init__.py
│???│???└── __pycache__
│???├── models.py
│???├── pictures
│???├── routings.py
│???├── services
│???│???├── __init__.py
│???│???├── __pycache__
│???│???└── detection_engine.py
│???├── stream_status.py
│???├── tests.py
│???├── utils.py
│???└── views.py
├── manage.py
├── media
│???├── 2025-06-02
│???│???├── 20時14分54秒-20時17分03秒
│???│???│???├── 關鍵幀圖片
│???│???│???├── 原視頻
│???│???│???└── 識別視頻
│???│???├── 20時26分11秒-20時29分00秒
│???│???│???├── 關鍵幀圖片
│???│???│???├── 原視頻
│???│???│???└── 識別視頻

前端代碼因保密不方便展示,可自行根據后端端口續寫,后端Django完整代碼在文章末尾,請查收!!!

存在的問題,亟需解決:

1、后端逐幀識別并返回前端,前端顯示卡頓,考慮跳幀識別,一秒識別一幀,剩余原始幀直接返回。

2、曾嘗試過使用 yolo 中的 model 類中的 track 方法進行跨幀追蹤,為每幀每個目標進行編號(id),提取幀中目標特征(速度、目標面積、唯位移等等),進行特征化工程,從而判斷相鄰兩識別幀中的某兩目標是否為同一目標。由于添加追蹤算法,單幀識別時間過長,單幀識別時間大于拉流幀傳輸時間,導致進程堵塞,程序崩潰。后續采取措施改進。

3、模型并未做違停檢測,考慮重新訓練模型,添加違停訓練集,重新訓練,順便搭配 yolo12 環境,用 yolo12 進行訓練。

4、Django 后端接受到流,并不能直接開始識別,而是先用四五秒時間加載模型,需要優化。

5、執行任務、完成任務到數據存儲、前端顯示巡檢報告,延遲較高,需要優化。

6、為后端添加車輛運動狀態算法,若連續識別的兩個幀同一目標靜止,并且識別為違停,則判斷為違停;若非靜止但識別違停,不做處理。相對靜止判斷?無人機在動,無法建立參考系。

7、進行車牌識別。

代碼及粗略解釋:

detection_consumer.py:

此處代碼使用 FFmpeg 拉取 SRS 上的流,并使用 detection_engine 中的類進行幀識別和計數,然后判斷計數,調用訊飛星火 API 生成 AI 答復,獲取關鍵幀,將所有信息封裝成一個字典,并傳給前端,以用于違停報告生成。同時,將實時識別幀通過 websocket 的消費者傳給前端顯示。

import asyncio
import base64
import os
import subprocess
import time
import traceback
from datetime import datetime
from pathlib import Path
from .report_consumer import ReportConsumer
import numpy as np
import cv2
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
from app01.services.detection_engine import DetectionEngine, logger
from concurrent.futures import ThreadPoolExecutor
from app01.utils import rename_time_folder  # 導入重命名函數
from app01.stream_status import stream_status, status_lock  # 從新模塊導入
import requestsRTMP_URL = "rtmp://127.0.0.1:1935/live/stream"
channel_layer = get_channel_layer()class DetectionStreamConsumer(AsyncWebsocketConsumer):async def connect(self):await self.channel_layer.group_add("detection_group", self.channel_name)await self.accept()print("WebSocket 連接建立成功")async def disconnect(self, code):await self.channel_layer.group_discard("detection_group", self.channel_name)print("WebSocket 連接關閉")async def start_detection(self):print("開始檢測幀流...")await self.monitor_stream_status()async def monitor_stream_status(self):while True:if stream_status.is_streaming:logger.info("檢測到推流,開始處理...")await self.start_detection_stream()else:logger.info("當前無推流,等待中...")await asyncio.sleep(1)async def start_detection_stream(self, all_dict=None):ffmpeg_pull_command = ["ffmpeg","-c:v", "h264","-i", RTMP_URL,"-f", "rawvideo","-pix_fmt", "bgr24","-s", "640x360","-r", "5","-vcodec", "rawvideo","-an","-flags", "+low_delay","-tune", "zerolatency","-"]try:process = subprocess.Popen(ffmpeg_pull_command,stdout=subprocess.PIPE,stderr=subprocess.DEVNULL,bufsize=10 * 8)except Exception as e:logger.error(f"FFmpeg 子進程啟動失敗: {e}")returnif process.stdout is None:logger.error("FFmpeg stdout 為空")returnframe_width, frame_height = 640, 360frame_size = frame_width * frame_height * 3executor = ThreadPoolExecutor(max_workers=4)  # 最多可以同時運行 4 個線程engine = DetectionEngine()frame_count = 0skip_interval = 1try:while stream_status.is_streaming:try:loop = asyncio.get_event_loop()raw_frame = await asyncio.wait_for(loop.run_in_executor(executor, process.stdout.read, frame_size),timeout=8  # 超時檢測斷流)except asyncio.TimeoutError:logger.warning("讀取幀超時,觸發流結束")with status_lock:stream_status.is_streaming = Falsebreakif len(raw_frame) != frame_size:continuetry:frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((frame_height, frame_width, 3))except Exception as e:logger.warning(f"幀解析失敗: {e}")continueframe_count += 1if frame_count % skip_interval != 0:continueresult = await loop.run_in_executor(executor, engine.process_frame, frame)# 增加空值檢查if result is None:logger.warning("檢測結果為空,跳過處理")continueprocessed_frame, detected_classes = result# 更新車輛統計(僅按類別)with DetectionEngine.counter_lock:for class_id in detected_classes:if class_id == 0:DetectionEngine.total_count['car'] += 1DetectionEngine.total_count['total'] += 1elif class_id == 1:DetectionEngine.total_count['bus'] += 1DetectionEngine.total_count['total'] += 1elif class_id == 2:DetectionEngine.total_count['truck'] += 1DetectionEngine.total_count['total'] += 1elif class_id == 3:DetectionEngine.total_count['van'] += 1DetectionEngine.total_count['total'] += 1_, jpeg = cv2.imencode('.jpg', processed_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])if channel_layer:await channel_layer.group_send("detection_group",{"type": "send_frame", "frame": jpeg.tobytes()})except Exception as e:logger.error(f"檢測處理錯誤: {e}")logger.error(traceback.format_exc())finally:logger.warning("流結束,進入處理...")if process.returncode is None:process.kill()logger.warning("FFmpeg 進程已終止")# 延遲一小段時間,確保文件操作完全釋放time.sleep(1)stream_status.is_streaming = Falsestream_status.end_datetime = datetime.now()  # 記錄結束時間logger.warning("正在獲取三個關鍵幀...")# 找3個關鍵幀,必須在重命名前執行folder = Path(f"{stream_status.image_path}")files = [f for f in folder.iterdir() if f.is_file()]if not files:return None, None, None# 按修改時間排序files.sort(key=lambda x: x.stat().st_mtime)first = files[0]last = files[-1]middle = files[len(files) // 2]# 轉換為Base64stream_status.image_li = [self.image_to_base64(str(f))  # 傳入字符串路徑,避免Path對象序列化問題for f in [first, middle, last]]logger.warning("正在重命名文件夾...")# 先復制 time_path,因為后續可能被修改time_path = stream_status.time_pathstart_datetime = stream_status.start_datetimeend_datetime = stream_status.end_datetimesave_path = ""# 調用重命名函數(使用完整 datetime)if time_path and start_datetime:path = rename_time_folder(time_path,start_datetime,  # 傳入開始時間end_datetime  # 傳入結束時間)save_path = pathlogger.warning(f"文件夾已重命名為 {start_datetime.strftime('%H時%M分%S秒')}-{end_datetime.strftime('%H時%M分%S秒')}")try:logger.warning("正在更新數據...")# 更新識別結果stats = self.correct_overcounted_stats(DetectionEngine.total_count)print("stats:", stats)stream_status.total_stats = statsprint("stream_status.total_stats:", stream_status.total_stats)self.save_vehicle_stats(stats, save_path)except Exception as e:logger.error(f"保存統計信息失敗: {e}")print("正在獲取AI答復...")self.AI_response()# 定義總數據,并序列化all_dict = {'datetime': stream_status.start_time.strftime("%Y-%m-%d %H:%M:%S.%f"),  # 序列化,轉為字符串'image_li': [bs for bs in stream_status.image_li],  # 關鍵幀轉二進制'detect_car': [_ for _ in stream_status.total_stats.values()],'Al_response': stream_status.AI_talk,}print("all_dict:", all_dict)print("正在發送信息...")await ReportConsumer.send_report_data(all_dict)executor.shutdown(wait=False)DetectionEngine.reset_stats()stream_status.reset_all_dict()def save_vehicle_stats(self, stats, save_path=None):# 如果未提供路徑,使用默認的 time_pathif not save_path:save_path = stream_status.time_pathif not save_path:logger.warning("無法保存統計信息:路徑為空")returnstats_path = os.path.join(save_path, 'vehicle_stats.txt')try:with open(stats_path, 'w', encoding='utf-8-sig') as f:f.write("車輛統計結果\n")f.write("-------------------------\n")for k, v in stats.items():f.write(f"{k}: {v}\n")f.write("-------------------------\n")logger.info(f"統計信息已保存至 {stats_path}")except Exception as e:logger.error(f"寫入統計信息失敗: {e}")def AI_response(self):"""獲取AI回復"""url = "https://spark-api-open.xf-yun.com/v1/chat/completions"headers = {"Authorization": "Bearer pDLDvpUbFDTEQZACQSjD:CqZewebAdgpuvxrVbbAv","Content-Type": "application/json",}content = (f'我現在要做一個無人機城市交通智慧巡檢報告,現在巡檢結果是:巡檢時間:{stream_status.start_time}'f'小汽車有{stream_status.total_stats["car"]}輛,面包車有{stream_status.total_stats["van"]}輛,'f'公交車有{stream_status.total_stats["bus"]}輛,卡車有{stream_status.total_stats["truck"]}輛,'f'識別到的違停車輛共有{stream_status.total_stats["illegally_car"]}輛。'f'幫我生成一段結論,要求不要有廢話,也不要寫具體那四個類別識別到的車輛數,字數200字,語言精煉嚴謹')question = {"role": "user","content": content}data = {"max_tokens": 4096,"top_k": 4,"messages": [question],"temperature": 0.5,"model": "4.0Ultra","stream": False,}response = requests.post(url, headers=headers, json=data)response.raise_for_status()result = response.json()# 提取模型輸出的內容reply = result.get('choices', [{}])[0].get('message', {}).get('content', '').strip()print(reply)stream_status.AI_talk = reply# 讀取圖片文件并轉換為 Base64@staticmethod  # 靜態方法不應接收self參數def image_to_base64(image_path):with open(image_path, "rb") as img_file:base64_str = base64.b64encode(img_file.read()).decode("utf-8")return f"data:image/jpeg;base64,{base64_str}"@staticmethod  # 去掉self第一個參數def correct_overcounted_stats(stats, avg_appearance=6):"""簡單修正嚴重重復統計的結果"""corrected = {}for cls in ['car', 'van', 'bus', 'truck']:if cls in stats:corrected[cls] = max(0, int(round(stats[cls] / avg_appearance)))# 可選:重新計算 totalcorrected['total'] = sum(corrected.values())corrected['illegally_car'] = int(corrected['total'] / 100)print(corrected)return correctedasync def send_frame(self, event):frame_bytes = event["frame"]await self.send(bytes_data=frame_bytes)

stream_status.py:

本文件用于 Django 項目全局狀態管理,相當于 vue3 中的 pinia ,?is_streaming? 是前端推本地視頻、后端上傳 SRS 后,變為 True?,后端因為 is_streaming? = True 而開始拉流,當超過 6 秒未拉到流,判斷流結束,is_streaming? = False。還定義了全局狀態鎖、狀態重置方法、時間獲取方法、文件夾名獲取方法、狀態實例。全局狀態鎖是為了防止多線程修改同一個共享變量,只有with status_lock:with 塊內,只有當前線程持有鎖時才能運行代碼,其他線程必須等待,相當于在此處把鎖”鎖“起來了。

import threading# 全局狀態鎖和狀態對象
status_lock = threading.Lock()
class StreamStatus:def __init__(self):self.is_streaming = False  # 是否正在推流/檢測self.start_time = None      # 推流開始時間(datetime對象)self.end_time = None        # 推流結束時間(datetime對象)self.time_path = None       # 當前時間文件夾路徑(如 Media/2025-06-01/12點)self.total_stats = {        # 車輛統計"car": 0,"van": 0,"bus": 0,"truck": 0,"total": 0,"illegally_car": 0,}self.last_frame_time = None # 最后收到有效幀的時間(用于斷流檢測)self.image_path = Noneself.image_li = []self.AI_talk = Noneself.all_dict = {}@propertydef date_folder(self):"""獲取日期文件夾名稱(如 2025-06-01)"""if self.start_time:return self.start_time.strftime("%Y-%m-%d")return None@propertydef start_hour(self):"""獲取開始小時(如 12)"""if self.start_time:return self.start_time.hourreturn Nonedef reset(self):"""重置狀態(用于新任務開始)"""self.is_streaming = Falseself.start_time = Noneself.end_time = Noneself.time_path = Noneself.total_stats = {k: 0 for k in self.total_stats}self.last_frame_time = Nonedef reset_all_dict(self):self.last_frame_time = None  # 最后收到有效幀的時間(用于斷流檢測)self.image_path = Noneself.image_li = []self.AI_talk = Noneself.all_dict = {}self.all_dict = {}
# 全局唯一的狀態實例
stream_status = StreamStatus()

detection_engine.py:

此處是 detection_consumer.py 使用的服務代碼塊,用于為其提供幀識別服務,consumer?拉到的每一個流都會調用該類中的?process_frame 類方法,并同時進行計數和關鍵幀保存。

import threading
import time
import cv2
from ultralytics import YOLO
import logging
import os
from app01.stream_status import stream_status, status_lock  # 從新模塊導入logger = logging.getLogger(__name__)MODEL_PATH = "F:\\FullStack\\Django\\DJI_yolo\\app01\\car_best.pt"
model = YOLO(MODEL_PATH)# 關鍵幀存儲位置
pictures_dir = "pictures"
if not os.path.exists(pictures_dir):os.makedirs(pictures_dir)class DetectionEngine:frame_counter = 1counter_lock = threading.Lock()  # 線程鎖,保證計數器安全# 新增:全局計數器和已統計 IDtotal_count = {'car': 0,'van': 0,'bus': 0,'truck': 0,"total": 0,"illegally_car": 0}# 下次任務重置@classmethoddef reset_stats(cls):with cls.counter_lock:for k in cls.total_count:cls.total_count[k] = 0# 下次任務重置@classmethoddef update_stats(cls, vehicle_type):with cls.counter_lock:if vehicle_type in cls.total_count:cls.total_count[vehicle_type] += 1cls.total_count["total"] += 1def __init__(self):self.model = model# 調整檢測參數,減少計算量self.detect_params = {'conf': 0.3,       # 提高置信度閾值,減少無效檢測'iou': 0.5,        # 調整 NMS 閾值'imgsz': [384, 640],  # 輸入尺寸,降低分辨率'classes': [0, 1, 2, 3],  # 僅檢測車輛類別(2: car, 5: bus, 7: truck)'device': 0,  # 使用 GPU'half': True,  # 使用 FP16 精度'show': False  # 關閉實時顯示,減少開銷}def process_frame(self, frame):"""處理單幀圖像:YOLO推理 + 保存結果到關鍵幀圖片文件夾"""results = model(frame)detected_objects = []annotated_frame = frame.copy()  # 復制原始幀以防止修改原圖# 防御性檢查:確保 results 非空且包含有效數據if not results or len(results) == 0:logger.warning("未獲取到檢測結果,跳過處理")return frame, []boxes = results[0].boxesif boxes is None or len(boxes) == 0:logger.warning("檢測結果為空,跳過處理")return frame, []try:# 假設類別信息存儲在 cls 屬性中classes = boxes.cls.int().cpu().tolist() if hasattr(boxes, 'cls') and boxes.cls is not None else []for class_id in classes:detected_objects.append(class_id)annotated_frame = results[0].plot()  # 繪制檢測框except Exception as e:logger.error(f"解析檢測數據失敗: {e}")return annotated_frame, []with status_lock:car_count = sum(1 for cls in detected_objects if cls == 0)if car_count > 18:try:timestamp = int(time.time())save_path = os.path.join(stream_status.image_path, f"keyframe_{timestamp}.jpg")cv2.imwrite(save_path, annotated_frame)logger.info(f"關鍵幀保存成功: {save_path}")except Exception as e:logger.error(f"保存關鍵幀失敗: {e}")return annotated_frame, detected_objects

repoet_consumer.py:

用于將前面?detection_consumer.py 中的字典數據發送給前端,首先會將數據緩存到后端,等到前端消費者建立成功,立馬發送出去。

import asynciofrom channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
import json# 緩存隊列:用于保存尚未發送的數據
queue = []
# 記錄是否有消費者連接
has_consumer = Falseclass ReportConsumer(AsyncWebsocketConsumer):async def connect(self):global has_consumer# 加入指定組await self.channel_layer.group_add("report_group", self.channel_name)await self.accept()print("巡檢報告WebSocket 已連接")# 設置有消費者連接的標志has_consumer = True# 嘗試發送緩存數據await flush_queue(self.channel_layer)async def disconnect(self, close_code):global has_consumer# 移除組成員await self.channel_layer.group_discard("report_group", self.channel_name)print("巡檢報告WebSocket 斷開連接")# 重置消費者連接標志has_consumer = Falseasync def receive(self, text_data=None, bytes_data=None):passasync def send_report(self, event):"""處理 send_report 類型的消息并發送給客戶端"""data = event['data']print("【發送到客戶端】", data)# 轉換為JSON字符串發送try:# 處理可能包含的非JSON可序列化對象# 這里假設 image_li 包含 Path 對象,需要轉換為字符串if 'image_li' in data:data['image_li'] = [str(path) for path in data['image_li']]await self.send(text_data=json.dumps(data))except Exception as e:print(f"【發送到客戶端失敗】{e}")@classmethodasync def send_report_data(cls, data):"""供其他模塊調用的方法"""global queue, has_consumer# 不再檢查是否有人連接,直接發送或緩存,生產模式用Redischannel_layer = get_channel_layer()# 如果沒有消費者連接,將數據加入隊列if not has_consumer:queue.append(data)print(f"【數據已緩存】等待消費者連接: {data}")else:# 有消費者連接,直接發送await send_report(channel_layer, data)# ========== 全局函數 ==========
# 定期檢查并發送緩存數據的任務
async def queue_check_task():channel_layer = get_channel_layer()while True:await asyncio.sleep(1)  # 每秒檢查一次if queue and has_consumer:await flush_queue(channel_layer)
# 在應用啟動時啟動隊列檢查任務
async def start_queue_check():asyncio.create_task(queue_check_task())
async def send_report(channel_layer, data):try:await channel_layer.group_send("report_group",{"type": "send_report","data": data,},)print("【發送成功】", data)except Exception as e:print(f"【發送失敗】{e}")async def flush_queue(channel_layer):global queueif not queue:returnitems = list(queue)queue.clear()for data in items:await send_report(channel_layer, data)

routings.py:

用于定義 websocket 的后端接收路徑,并連接消費者

from django.urls import re_path
from app01.consumers import detection_consumer, report_consumerwebsocket_urlpatterns = [re_path(r'^ws/detection/$', detection_consumer.DetectionStreamConsumer.as_asgi()),re_path(r'^ws/report/$', report_consumer.ReportConsumer.as_asgi()),
]

urls.py:

用于接收前端發送的 http 請求,并與 view.py 中的視圖函數建立聯系

"""
URL configuration for DJI_yolo project.The `urlpatterns` list routes URLs to views. For more information please see:https://docs.djangoproject.com/en/5.1/topics/http/urls/
Examples:
Function views1. Add an import:  from my_app import views2. Add a URL to urlpatterns:  path('', views.home, name='home')
Class-based views1. Add an import:  from other_app.views import Home2. Add a URL to urlpatterns:  path('', Home.as_view(), name='home')
Including another URLconf1. Import the include() function: from django.urls import include, path2. Add a URL to urlpatqterns:  path('blog/', include('blog.urls'))
"""
from django.conf.urls.static import static
from django.contrib import admin
from django.urls import pathfrom DJI_yolo import settings
from app01 import views
urlpatterns = [path("admin/", admin.site.urls),path('csrf/', views.get_csrf_token, name='get_csrf_token'),# 推流測試path('push_stream/', views.push_stream, name='push_stream'),# 媒體庫管理path('api4/date-folders/', views.get_date_folders, name='date-folders'),path('api4/time-folders/<str:date>/', views.get_time_folders, name='time-folders'),path('api4/files/<str:date>/<str:time_range>/<str:category>/count/', views.get_file_count, name='file-count'),path('api4/files/<str:date>/<str:time_range>/<str:category>/', views.get_files, name='files'),path('api4/delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>/', views.delete_file,name='delete-file'),path('delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>', views.delete_file,name='delete-file'),  # 添加一個用于刪除文件的 URL 路徑] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)

views.py:

用于處理前端請求,這里報告推流、媒體管理系統、原始視頻存儲邏輯等等。

# views.py
from datetime import datetime
from pathlib import Path
import urllib.parse
from django.middleware.csrf import get_token
import os
import subprocess
import threading
from django.http import JsonResponse
from django.conf import settings
import asyncio
from .consumers.detection_consumer import DetectionStreamConsumer
from .utils import create_date_folder, create_time_folder
from .stream_status import stream_statusMEDIA_ROOT = Path(settings.MEDIA_ROOT)def get_csrf_token(request):token = get_token(request)print(token)return JsonResponse({'csrfToken': token})def push_stream(request):print("前端返回本地視頻,準備推流")video_file = request.FILES.get('video')if not video_file:return JsonResponse({'error': '未上傳視頻文件'}, status=400)# ------------------- 創建精確時間文件夾 -------------------start_datetime = datetime.now()  # 記錄精確到秒的開始時間date_str = start_datetime.strftime('%Y-%m-%d')date_path = create_date_folder(date_str)stream_status.start_time = datetime.now()# 創建帶時分秒的文件夾(如 15:30:45)time_path, time_folder = create_time_folder(date_path, start_datetime)# 創建三類子文件夾original_path = os.path.join(time_path, '原視頻')recognized_path = os.path.join(time_path, '識別視頻')image_path = os.path.join(time_path, '關鍵幀圖片')stream_status.image_path = image_path  # 保存到全局狀態for folder in [original_path, recognized_path, image_path]:os.makedirs(folder, exist_ok=True)# ------------------- 保存上傳視頻 -------------------original_video_name = f"original_{start_datetime.strftime('%H%M%S')}.mp4"original_video_path = os.path.join(original_path, original_video_name)with open(original_video_path, 'wb') as f:for chunk in video_file.chunks():f.write(chunk)# ------------------- 推流配置 -------------------push_url = "rtmp://127.0.0.1:1935/live/stream"recognized_video_name = f"recognized_{start_datetime.strftime('%H%M%S')}.mp4"recognized_video_path = os.path.join(recognized_path, recognized_video_name)command = ["ffmpeg","-re","-i", original_video_path,"-c:v", "libx264","-preset", "ultrafast","-tune", "zerolatency","-g", "50","-r", "30","-an","-f", "flv",push_url,"-f", "mp4",recognized_video_path]try:process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)# 保存完整時間狀態stream_status.is_streaming = Truestream_status.start_datetime = start_datetime  # 保存完整 datetimestream_status.time_path = time_pathstream_status.recognized_path = recognized_pathstream_status.image_path = image_pathdetection_thread = threading.Thread(target=run_detection_in_background,daemon=True)detection_thread.start()return JsonResponse({'status': 'success','date_folder': date_str,'time_folder': time_folder  # 格式如 15:30:45})except Exception as e:return JsonResponse({'error': str(e)}, status=500)def run_detection_in_background():try:loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)loop.run_until_complete(DetectionStreamConsumer().start_detection())except Exception as e:print(f"后臺檢測線程異常: {e}")def get_date_folders(request):page = int(request.GET.get('page', 1))page_size = int(request.GET.get('page_size', 10))MEDIA_ROOT_PATH = Path(r'F:\FullStack\Django\DJI_yolo\media')date_folders = []for name in os.listdir(MEDIA_ROOT_PATH):folder_path = MEDIA_ROOT_PATH / nameif folder_path.is_dir() and len(name.split('-')) == 3:date_folders.append({'date': name})date_folders.sort(key=lambda x: x['date'], reverse=True)start = (page - 1) * page_sizeend = start + page_sizereturn JsonResponse({'data': date_folders[start:end],'total': len(date_folders)})def get_time_folders(request, date):date_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / dateif not date_path.is_dir():return JsonResponse({'error': '日期文件夾不存在'}, status=404)page = int(request.GET.get('page', 1))page_size = int(request.GET.get('page_size', 10))time_folders = []for name in os.listdir(date_path):time_path = date_path / nameif time_path.is_dir():original_count = len(os.listdir(time_path / '原視頻')) if (time_path / '原視頻').is_dir() else 0recognized_count = len(os.listdir(time_path / '識別視頻')) if (time_path / '識別視頻').is_dir() else 0image_count = len(os.listdir(time_path / '關鍵幀圖片')) if (time_path / '關鍵幀圖片').is_dir() else 0time_folders.append({'time_range': name,  # 直接使用時間區間名稱'original_count': original_count,'recognized_count': recognized_count,'image_count': image_count})time_folders.sort(key=lambda x: x['time_range'], reverse=True)start = (page - 1) * page_sizeend = start + page_sizereturn JsonResponse({'data': time_folders[start:end],'total': len(time_folders)})def get_file_count(request, date, time_range, category):category_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / time_range / categoryif not category_path.is_dir():return JsonResponse({'count': 0})count = len(os.listdir(category_path))return JsonResponse({'count': count})def get_files(request, date, time_range, category):try:decoded_time_range = urllib.parse.unquote(time_range)category_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / decoded_time_range / categoryif not category_path.is_dir():return JsonResponse({'error': '文件類別不存在'}, status=404)page = int(request.GET.get('page', 1))page_size = int(request.GET.get('page_size', 10))files = []for filename in os.listdir(category_path):file_path = category_path / filenameif file_path.is_file():encoded_time_range = urllib.parse.quote(decoded_time_range)files.append({'name': filename,'url': f'http://{request.get_host()}/media/{date}/{encoded_time_range}/{category}/{filename}'})start = (page - 1) * page_sizeend = start + page_sizereturn JsonResponse({'data': files[start:end],'total': len(files),'status': 'success'})except Exception as e:print(f"文件列表錯誤: {str(e)}")return JsonResponse({'error': '服務器內部錯誤','detail': str(e)}, status=500)def delete_file(request, date, time_range, category, filename):try:decoded_time_range = urllib.parse.unquote(time_range)file_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / decoded_time_range / category / filenameif not file_path.exists():return JsonResponse({'error': '文件不存在'}, status=404)os.remove(file_path)return JsonResponse({'status': 'success'})except Exception as e:return JsonResponse({'error': str(e)}, status=500)

感謝您的觀看!

資源如下:

https://download.csdn.net/download/2403_83182682/90961392?spm=1001.2014.3001.5501https://download.csdn.net/download/2403_83182682/90961392?spm=1001.2014.3001.5501

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/908679.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/908679.shtml
英文地址,請注明出處:http://en.pswp.cn/news/908679.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

基于自適應虛擬諧波阬的光儲VSG并網電流諧波抑制模型

“電氣仔推送”獲得資料&#xff08;專享優惠&#xff09; 模型簡介 此模型完全復現于《基于自適應虛擬阻抗的光儲并網系統諧波抑制策略》-程靜 此并網系統模型的核心控制為虛擬同步發電機&#xff08;VSG&#xff09;控制&#xff0c;采用基于混合廣義積分器的諧波信號提取…

【RockeMQ】第2節|RocketMQ快速實戰以及核?概念詳解(二)

升級Dledger高可用集群 一、主從架構的不足與Dledger的定位 主從架構缺陷 數據備份依賴Slave節點&#xff0c;但無自動故障轉移能力&#xff0c;Master宕機后需人工切換&#xff0c;期間消息可能無法讀取。Slave僅存儲數據&#xff0c;無法主動升級為Master響應請求&#xff…

【會員專享數據】2017-2024年我國分省的10米精度土地覆蓋數據

土地覆蓋數據是我們在各項研究中都非常常用的數據&#xff0c;之前我們分享過2017-2024年全球范圍的10米精度土地覆蓋數據&#xff08;均可查看之前的文章獲悉詳情&#xff09;&#xff01;該數據提供瓦片形式&#xff0c;也就是全球的數據沒有拼成一張圖&#xff0c;很多小伙伴…

通過css實現正方體效果

效果 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><style>/* css實…

Java并發編程-理論基礎

Java并發編程-理論基礎 1、什么是進程&#xff1f; 進程&#xff08;Process&#xff09;是計算機中的程序關于某數據集合上的一次運行活動&#xff0c;是系統進行資源分配的基本單位&#xff0c;是操作系統結構的基礎。在早期面向進程設計的計算機結構中&#xff0c;進程是程…

Tornado WebSocket實時聊天實例

在 Python 3 Tornado 中使用 WebSocket 非常直接。你需要創建一個繼承自 tornado.websocket.WebSocketHandler 的類&#xff0c;并實現它的幾個關鍵方法。 下面是一個簡單的示例&#xff0c;演示了如何創建一個 WebSocket 服務器&#xff0c;該服務器會接收客戶端發送的消息&a…

模塊化架構下的前端調試體系建設:WebDebugX 與多工具協同的工程實踐

隨著前端工程化的發展&#xff0c;越來越多的項目采用模塊化架構&#xff1a;單頁面應用&#xff08;SPA&#xff09;、微前端、組件化框架等。這類架構帶來了良好的可維護性和復用性&#xff0c;但也帶來了新的調試挑戰。 本文結合我們在多個模塊化項目中的真實經驗&#xff…

高考:如何合理選擇學科、專業以及職業

如何合理選擇學科、專業以及職業 一、自我認知&#xff1a;明確自身興趣與優勢&#xff08;一&#xff09;興趣探索&#xff08;二&#xff09;能力評估&#xff08;三&#xff09;價值觀與目標 二、外部調研&#xff1a;深入了解學科、專業與職業&#xff08;一&#xff09;學…

【新品解讀】一板多能,AXRF49 定義新一代 RFSoC FPGA 開發平臺

“硬件系統龐雜、調試周期長” “高頻模擬前端不穩定&#xff0c;影響采樣精度” “接收和發射鏈路難以同步&#xff0c;難以擴展更多通道” “數據流量大&#xff0c;處理與存儲跟不上” 這些是大部分客戶在構建多通道、高頻寬的射頻采樣鏈路時&#xff0c;面臨的主要問題。…

實現仿中國婚博會微信小程序

主要功能&#xff1a; 1、完成底部標簽導航設計、首頁海報輪播效果設計和宮格導航設計&#xff0c;如圖1所示 2、在首頁里&#xff0c;單擊全部分類宮格導航的時候&#xff0c;會進入到全部分類導航界面&#xff0c;把婚博會相關內容的導航集成到一個界面里&#xff0c;如圖2…

MySQL強化關鍵_020_SQL 優化

目 錄 一、order by 優化 1.未添加索引 2.添加索引 3.復合索引默認升序排列 4.復合索引降序排列 5.復合索引升序降序排列并用 6.總結 二、group by 優化 1.未添加索引 2.添加索引 3.添加復合索引 三、limit 優化 四、主鍵優化 1.主鍵設計原則 五、insert 優化…

湖北理元理律師事務所視角:企業債務優化的三維平衡之道

核心提示&#xff1a;債務優化的本質不是消滅債務&#xff0c;而是在法律框架內重建財務可持續性。 一、企業債務危機的典型誤區 某制造企業主曾向我坦言&#xff1a;“用新貸還舊貸3年&#xff0c;債務從200萬滾到500萬。”這類案例暴露出企業債務處置的共性痛點&#xff1a…

【Ragflow】27.RagflowPlus(v0.4.1):小版本迭代,問題修復與功能優化

概述 RagflowPlus v0.4.0 在發布后&#xff0c;收到了積極的反饋&#xff0c;同時也包含一些問題。 本次進行一輪小版本更新&#xff0c;發布 v0.4.1 版本&#xff0c;對已知問題進行修復&#xff0c;并對部分功能進行進一步優化。 開源地址&#xff1a;https://github.com/…

【hadoop】Flink安裝部署

一、單機模式 步驟&#xff1a; 1、使用XFTP將Flink安裝包flink-1.13.5-bin-scala_2.11.tgz發送到master機器的主目錄。 2、解壓安裝包&#xff1a; tar -zxvf ~/flink-1.13.5-bin-scala_2.11.tgz 3、修改文件夾的名字&#xff0c;將其改為flume&#xff0c;或者創建軟連接…

Linux 下 ChromeDriver 安裝

個人博客地址&#xff1a;Linux 下 ChromeDriver 安裝 | 一張假鈔的真實世界 Selenium 是一個用于 Web 應用程序測試的工具。可以通過它驅動瀏覽器執行特定的操作&#xff0c;如點擊、下滑、資源加載與渲染等。該工具在爬蟲開發中也非常有幫助。Selenium 需要通過瀏覽器驅動操…

Canal環境搭建并實現和ES數據同步

作者&#xff1a;田超凡 日期&#xff1a;2025年6月7日 Canal安裝&#xff0c;啟動端口11111、8082&#xff1a; 安裝canal-deployer服務端&#xff1a; https://github.com/alibaba/canal/releases/1.1.7/canal.deployer-1.1.7.tar.gz cd /opt/homebrew/etc mkdir canal…

STM32使用土壤濕度傳感器

1.1 介紹&#xff1a; 土壤濕度傳感器是一種傳感裝置&#xff0c;主要用于檢測土壤濕度的大小&#xff0c;并廣泛應用于汽車自動刮水系統、智能燈光系統和智能天窗系統等。傳感器采用優質FR-04雙料&#xff0c;大面積5.0 * 4.0厘米&#xff0c;鍍鎳處理面。 它具有抗氧化&…

鎖的藝術:深入淺出講解樂觀鎖與悲觀鎖

在多線程和分布式系統中&#xff0c;數據一致性是一個核心問題。鎖機制作為解決并發沖突的重要手段&#xff0c;被廣泛應用于各種場景。樂觀鎖和悲觀鎖是兩種常見的鎖策略&#xff0c;它們在設計理念、實現方式和適用場景上各有特點。本文將深入探討樂觀鎖和悲觀鎖的原理、實現…

Jinja2深度解析與應用指南

1. 概念與用途 1.1 核心概念 Jinja2是Python生態中功能強大的模板引擎&#xff0c;采用邏輯與表現分離的設計思想&#xff1a; 模板&#xff1a;包含靜態內容和動態占位符的文本文件&#xff08;.j2后綴&#xff09;渲染&#xff1a;將模板與數據結合生成最終文本的過程上下…

Ubuntu20.04中 Redis 的安裝和配置

Ubuntu20.04 中 Redis 的安裝和配置 Ubuntu 安裝 MySQL 及其配置 1. Redis 的安裝 更新系統包列表并安裝 Redis &#xff1a; # 更新包管理工具 sudo apt update# -y&#xff1a;自動確認所有提示&#xff08;非交互式安裝&#xff09; sudo apt install -y redis-server測…