前言
需要準備windows10操作系統,python3.11.9,cuDNN8.9.2.26,CUDA11.8,paddleDetection2.7
流程:
- 準備數據集-澳洲鰲蝦VOC數據集?
- 基于RT-DETR目標檢測模型訓練
- 導出onnx模型進行python部署
- 平滑濾波處理視頻幀保留的物體質心坐標
- 基于pywebview為軟件前端,falsk為軟件后端制作UI
- 使用pyinstaller打包成exe
- 使用into setup生成安裝包
本人代碼禁止任何商業化用途,個人開發者隨意。所有代碼均開源
項目目錄
XXX 項目總目錄static 存放js靜態文件plotly.jstemplates 存放html文件index.htmltemp 用戶上傳文件保存路徑venv 虛擬環境main.py 主程序model.onnx 模型文件1.ico 打包的程序圖標
準備數據集
點擊下載澳洲鰲蝦VOC數據集
下載后解壓,文件目錄為
dataAnnotations0.xml1.xml...imgs0.jpg1.jpg...lables.txt
然后使用如下的腳本把數據集劃分為訓練集和測試集
import os
import random
import shutildef splitDatasets(images_dir,xmls_dir,train_dir,test_dir):if os.path.exists(train_dir):shutil.rmtree(train_dir)os.makedirs(train_dir)os.makedirs(train_dir+'/imgs')os.makedirs(train_dir+'/annotations')if os.path.exists(test_dir):shutil.rmtree(test_dir)os.makedirs(test_dir)os.makedirs(test_dir+'/imgs')os.makedirs(test_dir+'/annotations')images=os.listdir(images_dir)random.shuffle(images)split_index=int(0.9*len(images))train_images=images[:split_index]test_images=images[split_index:]with open(train_dir+'/train.txt','w') as file:for img in train_images:shutil.copy(os.path.join(images_dir,img),os.path.join(train_dir,'imgs',img))ann=img.replace('jpg','xml')shutil.copy(os.path.join(xmls_dir,ann),os.path.join(train_dir,'annotations',ann))line=os.path.join(train_dir,'imgs',img)+' '+os.path.join(train_dir,'annotations',ann)+'\n'file.write(line)with open(test_dir+'/test.txt','w') as file:for img in test_images:shutil.copy(os.path.join(images_dir,img),os.path.join(test_dir,'imgs',img))ann=img.replace('jpg','xml')shutil.copy(os.path.join(xmls_dir,ann),os.path.join(test_dir,'annotations',ann))line=os.path.join(test_dir,'imgs',img)+' '+os.path.join(test_dir,'annotations',ann)+'\n'file.write(line)shutil.rmtree(images_dir)shutil.rmtree(xmls_dir)if __name__=='__main__':# 填寫img文件夾所在絕對路徑images_dir='/home/aistudio/work/voc/imgs'# 填寫Annotations文件夾所在絕對路徑xmls_dir='/home/aistudio/work/voc/Annotations'# 填寫 訓練集 的存放的絕對路徑train_dir='/home/aistudio/work/voc/trains'# 填寫 測試集 的存放的絕對路徑test_dir='/home/aistudio/work/voc/tests'splitDatasets(images_dir,xmls_dir,train_dir,test_dir)
訓練模型
可在aistudio云平臺訓練,我放好了所有的相關文件,點擊進入,里面的說明很詳細
也可在本地進行訓練,下面來配置本地的訓練環境
配置相關文件
下載paddleDetection2.7
原始目錄如下
paddleDetection2.7.github.travisactivitybenchmarkconfigs 模型配置文件dataset 里面有數據集下載的腳本文件demodeploy 推理的相關文件docs 說明文檔industrial_tutorialppdet 模型運行的核心文件scriptstest_pictools 模型訓練入口,測試,驗證,導出等腳本文件.gitignore.pre-commit-config.yaml.style.yapf.travis.ymlLICENSEREADME_cn.md 說明文檔中文版README_en.md 說明文檔英文版requirements.txt 相關依賴庫setup.py 模型編譯的相關腳本
需要刪除一些目錄,把README_en.md改名為README.md,處理過的目錄如下
paddleDetection2.7configsdatasetdeployppdettoolsREADME.mdrequirements.txtsetup.py
把dataset里所有東西都刪除,再將劃分好的數據集放到該文件下,處理好的目錄如下
datasetvoctrainsannotationsimgstrain.txttestsannotationsimgstest.txtlabels.txt
進入tools目錄,只保留如下文件,其余全刪除,處理后的文件目錄如下
toolstrain.pyinfer.pyeval.pyexport_model.py
進入configs目錄,只保留下面三個文件和目錄,處理后的目錄如下
configsdatasetsrtdetrruntime.yml
進入datasets目錄,只保留voc.yml,其余文件全刪除,處理后的目錄如下
datasetsvoc.yml
并用如下內容覆蓋voc.yml
metric: VOC
map_type: 11point
num_classes: 1TrainDataset:name: VOCDataSetdataset_dir: dataset/vocanno_path: trains/train.txtlabel_list: labels.txtdata_fields: ['image', 'gt_bbox', 'gt_class', 'difficult']EvalDataset:name: VOCDataSetdataset_dir: dataset/vocanno_path: tests/test.txtlabel_list: labels.txtdata_fields: ['image', 'gt_bbox', 'gt_class', 'difficult']TestDataset:name: ImageFolderanno_path: dataset/labels.txt
進入rtdetr目錄,只保留如下2個文件和目錄,處理后的目錄如下:
rtdetr_base_rtdetr_hgnetv2_x_6x_coco.yml
進入_base_目錄,找到optimizer_6x.yml,修改第一行為epoch: 200,意思是訓練200輪
找到rtdetr_reader.yml,根據自己的CPU和GPU調整相關參數,如果是4核CPU,worker_num可為8,batch_size根據顯存調整,占用80%到90%的顯存即可
安裝依賴庫
建議在虛擬環境中操作
!pip install -r requirements.txt
!pip install pycocotools
!pip install filterpy
!pip install flask
!pip install pyinstaller
!pip install pywebview
!pip install onnxruntime-gpu
!pip install onnxruntime
!pip install onnx
!pip install paddle2onnx
!python setup.py install
開始訓練
建議命令行輸入,先進入paddleDetection所在位置,再執行以下命令
python tools/train.py -c configs/rtdetr/rtdetr_hgnetv2_x_6x_coco.yml --eval --use_vdl True --vdl_log_dir vdl_log_dir/scalar
然后就是漫長的等待
導出模型
生成的模型在paddleDetection/output/best_model/model.pdparams
先進入paddleDetection所在位置,再執行以下命令
python tools/export_model.py -c configs/rtdetr/rtdetr_hgnetv2_x_6x_coco.yml -o weights=output/best_model/model.pdparams
轉onnx
先進入paddleDetection所在位置,再執行以下命令,可以根據需要選擇保存路徑
paddle2onnx --model_dir=output_inference/rtdetr_hgnetv2_x_6x_coco/ \--model_filename model.pdmodel \--params_filename model.pdiparams \--opset_version 16 \--save_file /home/work/infer/model.onnx
模型部署
導包
import webview
from flask import Flask, request, jsonify,render_template,stream_with_context,Response
import os
import time
import cv2
from onnxruntime import InferenceSession
import numpy as np
from werkzeug.utils import secure_filename
總覽代碼
class TrackShrimp():def __init__(self,video_path,model_path,onnx_threshold=0.7):# 獲取幀數據self.cap=self.init_video(video_path)frame_width=int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))frame_height=int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))# 圖形尺寸im_shape = np.array([[frame_height, frame_width]], dtype='float32')# y軸縮放量self.im_scale_y=640.0/frame_height# x軸縮放量self.im_scale_x=640.0/frame_widthscale_factor = np.array([[self.im_scale_y,self.im_scale_x]]).astype('float32')# 定義模型輸入self.inputs_dict = {'im_shape': im_shape,'image': None,'scale_factor': scale_factor}# 初始化模型self.sess=self.init_session(model_path)# 模型輸出閾值self.onnx_threshold=onnx_thresholddef init_video(self,input_path):cap=cv2.VideoCapture(input_path)if not cap.isOpened():raise ValueError(f'無法打開視頻{input_path}')return capdef init_session(self,model_path):try:return InferenceSession(model_path, providers=['CUDAExecutionProvider']) except:return InferenceSession(model_path, providers=['CPUExecutionProvider'])def precess_img(self,frame):img = cv2.resize(frame, None,None,fx=self.im_scale_x,fy=self.im_scale_y,interpolation=2)img = img.astype(np.float32) / 255.0img = np.transpose(img, [2, 0, 1])img = img[np.newaxis, :, :, :]return imgdef postcess(self,results:np.ndarray,all_centers:list[np.ndarray]):results=results[(results[:, 0] == 0) & (results[:, 1] > self.onnx_threshold)]x_centers = (results[:, 2] + results[:, 4]) / 2y_centers = (results[:, 3] + results[:, 5]) / 2centers = np.column_stack((x_centers, y_centers))all_centers.extend(centers)def by_smoothfilter(self,centers:list[np.ndarray],window_size=24):""":param centers: list[np.ndarray,np.ndarray,...]:param window_size: 平滑窗口大小:return: 平滑后的質心坐標NumPy數組"""centers=np.stack(centers)# 計算滑動窗口的平均值,pad函數在序列前后補零以處理邊界情況padded_centers = np.pad(centers, ((window_size//2, window_size//2), (0, 0)), mode='edge')window_sum = np.cumsum(padded_centers, axis=0)smoothed_centers = (window_sum[window_size:] - window_sum[:-window_size]) / window_sizereturn smoothed_centersdef calculate_distance(self,centers:np.ndarray):'''centers:np.ndarray n*2'''# 計算相鄰點之間的差diffs = centers[1:] - centers[:-1]# 計算每個差值的歐幾里得距離distances = np.linalg.norm(diffs, axis=1)# 計算總路程return int(np.sum(distances))def gain_position(self,centers:np.ndarray):position_list=centers.tolist()return position_listdef run(self):global scheduleglobal run_task# 幀數frame_count=int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))frame_number=0center_list=[]for frame_number in range(frame_count):if not run_task:returnsuccess, frame = self.cap.read()if not success:breakschedule=int(frame_number/frame_count*100)# 打印進度if frame_number%10==0:print('Process: ',schedule)# 圖片預處理img=self.precess_img(frame)self.inputs_dict['image']=imgresults=self.sess.run(None,self.inputs_dict)[0]if results is not None:self.postcess(results,center_list)# 使用平滑濾波filtered_centers = self.by_smoothfilter(center_list)self.cap.release()# 返回路程,軌跡坐標return self.calculate_distance(filtered_centers),self.gain_position(filtered_centers)
由于是對視頻進行推理,所以首先得初始化視頻打開的方法
def init_video(self,input_path):cap=cv2.VideoCapture(input_path)if not cap.isOpened():raise ValueError(f'無法打開視頻{input_path}')return cap
初始化onnx運行引擎,優先使用顯卡,如果CUDA環境有問題,就使用CPU運行
def init_session(self,model_path):try:return InferenceSession(model_path, providers=['CUDAExecutionProvider']) except:return InferenceSession(model_path, providers=['CPUExecutionProvider'])
onnx引擎需要一定的輸入格式,放到類的init里
def __init__(self,video_path,model_path,onnx_threshold=0.7):# 獲取幀數據self.cap=self.init_video(video_path)frame_width=int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))frame_height=int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))# 圖形尺寸im_shape = np.array([[frame_height, frame_width]], dtype='float32')# y軸縮放量self.im_scale_y=640.0/frame_height# x軸縮放量self.im_scale_x=640.0/frame_widthscale_factor = np.array([[self.im_scale_y,self.im_scale_x]]).astype('float32')# 定義模型輸入self.inputs_dict = {'im_shape': im_shape,'image': None,'scale_factor': scale_factor}# 初始化模型self.sess=self.init_session(model_path)# 模型輸出閾值self.onnx_threshold=onnx_threshold
在提取每一幀后需要進行圖像處理,resize圖片為模型輸入的要求,歸一化
def precess_img(self,frame):img = cv2.resize(frame, None,None,fx=self.im_scale_x,fy=self.im_scale_y,interpolation=2)img = img.astype(np.float32) / 255.0img = np.transpose(img, [2, 0, 1])img = img[np.newaxis, :, :, :]return img
在提取到視頻的每一幀中的鰲蝦的質心坐標后,由于每一幀的圖像都不一樣,輸入模型后再輸出的結果就不一樣,會抖動,也就是噪聲,我們需要濾波去噪,這里使用平滑濾波,相比卡爾曼濾波簡單使用快速出結果。
def by_smoothfilter(self,centers:list[np.ndarray],window_size=24):""":param centers: list[np.ndarray,np.ndarray,...]:param window_size: 平滑窗口大小:return: 平滑后的質心坐標NumPy數組"""centers=np.stack(centers)# 計算滑動窗口的平均值,pad函數在序列前后補零以處理邊界情況padded_centers = np.pad(centers, ((window_size//2, window_size//2), (0, 0)), mode='edge')window_sum = np.cumsum(padded_centers, axis=0)smoothed_centers = (window_sum[window_size:] - window_sum[:-window_size]) / window_sizereturn smoothed_centers
我們需要計算鰲蝦的運動總路程,用濾波后的質心坐標計算
def calculate_distance(self,centers:np.ndarray):'''centers:np.ndarray n*2'''# 計算相鄰點之間的差diffs = centers[1:] - centers[:-1]# 計算每個差值的歐幾里得距離distances = np.linalg.norm(diffs, axis=1)# 計算總路程return int(np.sum(distances))
濾波后的質心坐標是numpy數組,需要一定的轉換再發送到前端進行渲染(matplotlib畫的圖太丑了,不如plotly.js)
def gain_position(self,centers:np.ndarray):position_list=centers.tolist()return position_list
在獲取每一幀圖像后,送入模型。模型會輸出一對numpy數組,需要進行一對的后處理,低于閾值的就拋棄,然后取閾值最高的,計算質心坐標并保存
def postcess(self,results:np.ndarray,all_centers:list[np.ndarray]):results=results[(results[:, 0] == 0) & (results[:, 1] > self.onnx_threshold)]x_centers = (results[:, 2] + results[:, 4]) / 2y_centers = (results[:, 3] + results[:, 5]) / 2centers = np.column_stack((x_centers, y_centers))all_centers.extend(centers)
需要在一個主函數里將上述打開視頻,圖像預處理,送入模型,后處理連起來
def run(self):global scheduleglobal run_task# 幀數frame_count=int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))frame_number=0center_list=[]for frame_number in range(frame_count):if not run_task:returnsuccess, frame = self.cap.read()if not success:breakschedule=int(frame_number/frame_count*100)# 打印進度if frame_number%10==0:print('Process: ',schedule)# 圖片預處理img=self.precess_img(frame)self.inputs_dict['image']=imgresults=self.sess.run(None,self.inputs_dict)[0]if results is not None:self.postcess(results,center_list)# 使用平滑濾波filtered_centers = self.by_smoothfilter(center_list)self.cap.release()# 返回路程,軌跡坐標return self.calculate_distance(filtered_centers),self.gain_position(filtered_centers)
前端的設計
以pywebview為平臺,html和css設計前端
?
?
?
代碼總覽
index.html
<!DOCTYPE html>
<html>
<head><title></title><link rel="shortcut icon" href="#" /><script src="https://code.jquery.com/jquery-3.6.0.min.js"></script><script src="../static/plotly.js"></script><style>html,body{width: 100%;height: 100%;margin: 0 auto;}body{display: flex;align-items: center;justify-content: center;height: 100vh;background-color: rgb(6, 32, 80);}main{display: grid;grid-template-columns: 1fr 3fr;column-gap: 2%;width: 98%;height: 98%;}fieldset{border: 2px solid rgb(32, 139, 139);color: rgb(32, 139, 139);margin: 8% 0 8% 0;}#s2{text-align: center;display: flex;justify-content: center;align-items: center;background-color: rgba(32, 139, 139, 0.301);border: 2px solid rgb(32, 139, 139);}#progress-circle{border: 1em solid rgb(32, 139, 139);width: 40vh;height: 40vh;border-radius: 20vh;display: flex; justify-content: center; align-items: center;}#progress-num{font-size: 18vh;color: rgb(32, 139, 139);}</style>
</head>
<body><main><section id="s1"><form id="form" enctype="multipart/form-data"><fieldset><legend>選擇你要檢測的視頻</legend><input type="file" accept=".mp4" id="video" name="vedio"></fieldset><fieldset><legend>功能按鍵</legend><button onclick="submit_to()" id="submit">開始上傳</button><button onclick="stopRun()">終止運行</button></fieldset></form><script>async function stopRun(){try{const response=await fetch('/stopRun',{method:'POST'})if (!response.ok) { throw new Error('Network response was not ok.'); }data=await response.json()alert(data.data)}catch(error){console.log(error)}}async function submit_to(){// 防重復激發const button = document.getElementById('submit'); button.disabled = true;try{// 獲取文件const input=document.getElementById('video')const file=input.files[0]if (!file){throw new Error('未選擇文件')}if(file.type!=='video/mp4'){throw new Error('請選擇MP4文件')}// 刷新界面 const s2=document.getElementById('s2')Plotly.purge(s2)// 初始化進度顯示const progressCircle=document.getElementById('progress-circle')const progressNum=document.getElementById('progress-num')progressCircle.style.display='flex'progressNum.innerHTML='0%'// 更新進度let source = new EventSource("/progress")source.onmessage = function(event) {progressNum.innerHTML = event.data+'%'}// 發送請求const formData=new FormData()formData.append('video', file)const response=await fetch('/shrimp',{method:'POST',body:formData})if (!response.ok) {throw new Error('Network response was not ok.'); }source.close()const data=await response.json()button.disabled=falseif(data.data==='任務被終止'){alert(data.data)}else{progressCircle.style.display='none'$('#distance').text('總路程'+data.distance)// 畫圖var trace=[{x: data.position_data.map(item=>item[0]),y: data.position_data.map(item=>item[1]),mode:"lines",line:{color:'rgb(32, 139, 139)'}}]var layout = {xaxis: {range: [0, 600],title: "x(像素)",titlefont: { color: 'rgb(32, 139, 139)' // 軸標簽顏色 }, linecolor: 'rgb(32, 139, 139)', // 軸線顏色 tickfont: { color: 'lrgb(32, 139, 139)' // 軸刻度標簽顏色 }},yaxis: {range: [0, 600],title: "y(像素)",titlefont: { color: 'rgb(32, 139, 139)' // 軸標簽顏色 }, linecolor: 'rgb(32, 139, 139)', // 軸線顏色 tickfont: { color: 'lrgb(32, 139, 139)' // 軸刻度標簽顏色 }}, title: "鰲蝦運動軌跡",titlefont:{color:'rgb(32, 139, 139)'},plot_bgcolor: 'rgba(0,0,0,0)',paper_bgcolor:'rgba(0,0,0,0)'}Plotly.newPlot("s2", trace, layout,{scrollZoom: true,editable: true }) }}catch(error){button.disabled = falseif(error.message.startsWith('Failed to fetch')){}else{alert(error)}}}</script><fieldset><legend>輸出結果</legend><P id="distance">總路程:</P></fieldset><fieldset><legend>注意事項</legend><p>本程序運行將消耗大量算力和內存,最好使用高配電腦。不支持windows10以下的操作系統。在后臺有任務在跑時,切勿重復上傳視頻,等待后臺跑完出圖時再上傳新的視頻。如果選錯視頻并上傳了,請點擊'終止運行'再重新上傳視頻。有問題聯系wx:m989783106</p></fieldset></section><section id="s2"><div id="progress-circle"><p id="progress-num"></p></div></section></main>
</body>
</html>
plotly.js從官網下載
代碼分覽
總體設計是以<html>和<body>為底,<main>為主容器內使用grid2列布局,2個<section>作為內容器占據左右2個網格。
左邊的<section>容納文件上傳表單,功能按鈕,數據顯示,使用說明
<section id="s1"><form id="form" enctype="multipart/form-data"><fieldset><legend>選擇你要檢測的視頻</legend><input type="file" accept=".mp4" id="video" name="vedio"></fieldset><fieldset><legend>功能按鍵</legend><button onclick="submit_to()" id="submit">開始上傳</button><button onclick="stopRun()">終止運行</button></fieldset></form><fieldset><legend>輸出結果</legend><P id="distance">總路程:</P></fieldset><fieldset><legend>注意事項</legend><p>本程序運行將消耗大量算力和內存,最好使用高配電腦。不支持windows10以下的操作系統。在后臺有任務在跑時,切勿重復上傳視頻,等待后臺跑完出圖時再上傳新的視頻。如果選錯視頻并上傳了,請點擊'終止運行'再重新上傳視頻。有問題聯系wx:m989783106</p></fieldset></section>
之間用<fieldset>做了區域劃分,簡單又美觀。
<button>均使用onclick屬性進行觸發
在上傳前會檢測用戶是否選擇文件,是否選擇的是MP4文件
// 獲取文件
const input=document.getElementById('video')
const file=input.files[0]
if (!file){throw new Error('未選擇文件')
}
if(file.type!=='video/mp4'){throw new Error('請選擇MP4文件')
}
?一共有3個請求:
- 請求上傳文件,將MP4上傳給后端,然后后端運行模型發送質心坐標給前端渲染
- 請求終止程序,當用戶想終止后端運行模型,重新上傳文件時
- 請求獲取模型處理進度,后端返回進度給前端,前端進行渲染展示
畫軌跡圖,前端用plotly.js將質心坐標進行渲染,同時軌跡圖還有一定的交互能力。
// 畫圖
var trace=[{x: data.position_data.map(item=>item[0]),y: data.position_data.map(item=>item[1]),mode:"lines",line:{color:'rgb(32, 139, 139)'}
}]
var layout = {xaxis: {range: [0, 600],title: "x(像素)",titlefont: { color: 'rgb(32, 139, 139)' // 軸標簽顏色 }, linecolor: 'rgb(32, 139, 139)', // 軸線顏色 tickfont: { color: 'lrgb(32, 139, 139)' // 軸刻度標簽顏色 }},yaxis: {range: [0, 600],title: "y(像素)",titlefont: { color: 'rgb(32, 139, 139)' // 軸標簽顏色 }, linecolor: 'rgb(32, 139, 139)', // 軸線顏色 tickfont: { color: 'lrgb(32, 139, 139)' // 軸刻度標簽顏色 }}, title: "鰲蝦運動軌跡",titlefont:{color:'rgb(32, 139, 139)'},plot_bgcolor: 'rgba(0,0,0,0)',paper_bgcolor:'rgba(0,0,0,0)'}
Plotly.newPlot("s2", trace, layout,{scrollZoom: true,editable: true })
其余的就是代碼的排布順序,異步執行調度,錯誤處理能力,系統穩定性,用戶交互能力的提升,細節很多,均包含在代碼中
右邊的<section>容納進度圈,軌跡圖
<section id="s2"><div id="progress-circle"><p id="progress-num"></p></div></section>
在文件上傳時,就初始化渲染進度條,然后異步請求獲取進度,渲染到頁面;當進度到達一定值,比如99%,就關閉獲取進度的請求,同時設置進度條的display=none。當用戶打斷程序執行或者重新運行程序,就清理軌跡圖,初始化進度條,循環往復。
后端設計
后端整體使用flask,jinjia模板,將flask與pywebview結合。把模型檢測代碼封裝到一個類TrackShrimp,其余的就是各種請求函數。
代碼總覽
import webview
from flask import Flask, request, jsonify,render_template,stream_with_context,Response
import os
import time
import cv2
from onnxruntime import InferenceSession
import numpy as np
from werkzeug.utils import secure_filenameclass TrackShrimp():def __init__(self,video_path,model_path,onnx_threshold=0.7):# 獲取幀數據self.cap=self.init_video(video_path)frame_width=int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))frame_height=int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))# 圖形尺寸im_shape = np.array([[frame_height, frame_width]], dtype='float32')# y軸縮放量self.im_scale_y=640.0/frame_height# x軸縮放量self.im_scale_x=640.0/frame_widthscale_factor = np.array([[self.im_scale_y,self.im_scale_x]]).astype('float32')# 定義模型輸入self.inputs_dict = {'im_shape': im_shape,'image': None,'scale_factor': scale_factor}# 初始化模型self.sess=self.init_session(model_path)# 模型輸出閾值self.onnx_threshold=onnx_thresholddef init_video(self,input_path):cap=cv2.VideoCapture(input_path)if not cap.isOpened():raise ValueError(f'無法打開視頻{input_path}')return capdef init_session(self,model_path):try:return InferenceSession(model_path, providers=['CUDAExecutionProvider']) except:return InferenceSession(model_path, providers=['CPUExecutionProvider'])def precess_img(self,frame):img = cv2.resize(frame, None,None,fx=self.im_scale_x,fy=self.im_scale_y,interpolation=2)img = img.astype(np.float32) / 255.0img = np.transpose(img, [2, 0, 1])img = img[np.newaxis, :, :, :]return imgdef postcess(self,results:np.ndarray,all_centers:list[np.ndarray]):results=results[(results[:, 0] == 0) & (results[:, 1] > self.onnx_threshold)]x_centers = (results[:, 2] + results[:, 4]) / 2y_centers = (results[:, 3] + results[:, 5]) / 2centers = np.column_stack((x_centers, y_centers))all_centers.extend(centers)def by_smoothfilter(self,centers:list[np.ndarray],window_size=24):""":param centers: list[np.ndarray,np.ndarray,...]:param window_size: 平滑窗口大小:return: 平滑后的質心坐標NumPy數組"""centers=np.stack(centers)# 計算滑動窗口的平均值,pad函數在序列前后補零以處理邊界情況padded_centers = np.pad(centers, ((window_size//2, window_size//2), (0, 0)), mode='edge')window_sum = np.cumsum(padded_centers, axis=0)smoothed_centers = (window_sum[window_size:] - window_sum[:-window_size]) / window_sizereturn smoothed_centersdef calculate_distance(self,centers:np.ndarray):'''centers:np.ndarray n*2'''# 計算相鄰點之間的差diffs = centers[1:] - centers[:-1]# 計算每個差值的歐幾里得距離distances = np.linalg.norm(diffs, axis=1)# 計算總路程return int(np.sum(distances))def gain_position(self,centers:np.ndarray):position_list=centers.tolist()return position_listdef run(self):global scheduleglobal run_task# 幀數frame_count=int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))frame_number=0center_list=[]for frame_number in range(frame_count):if not run_task:returnsuccess, frame = self.cap.read()if not success:breakschedule=int(frame_number/frame_count*100)# 打印進度if frame_number%10==0:print('Process: ',schedule)# 圖片預處理img=self.precess_img(frame)self.inputs_dict['image']=imgresults=self.sess.run(None,self.inputs_dict)[0]if results is not None:self.postcess(results,center_list)# 使用平滑濾波filtered_centers = self.by_smoothfilter(center_list)self.cap.release()# 返回路程,軌跡坐標return self.calculate_distance(filtered_centers),self.gain_position(filtered_centers)app = Flask(__name__)
UPLOAD_FOLDER = './temp'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
schedule=0
run_task=Falsedef run_flask():app.run(debug=False, threaded=True,host='127.0.0.1',port=5000)def video_process(video_path):return TrackShrimp(video_path,'./model.onnx').run()# 主頁面
@app.route('/',methods=['POST','GET'])
def return_main_page():return render_template('index.html')# 檢測視頻頁面
@app.route('/shrimp',methods=['POST'])
def shrimp_track():global run_taskrun_task=Truefile=request.files.get('video')filename = secure_filename(file.filename)video_path=os.path.join(app.config['UPLOAD_FOLDER'],filename)file.save(video_path)try:results=video_process(video_path)if results is not None:distance,position_data=resultsdata = {'distance': distance,'position_data': position_data}run_task=Falsereturn jsonify(data)else:return jsonify({'data':'任務被終止'})except Exception as e:print('error:',e)return jsonify({'data':'任務被終止'})finally:if os.path.exists(video_path):os.remove(video_path)@app.route('/stopRun',methods=['GET','POST'])
def stopRun():global run_taskglobal scheduleif run_task:run_task=Falseschedule=0return jsonify({'data':'正在停止任務'})else:return jsonify({'data':'當前沒有任務運行'})# 進度查詢路由
@app.route('/progress',methods=['GET'])
def progress():@stream_with_contextdef generate():global run_taskratio = schedulewhile ratio < 95 and run_task:yield "data:" + str(ratio) + "\n\n"ratio = scheduletime.sleep(5)return Response(generate(), mimetype='text/event-stream')if __name__=='__main__':# 啟動后端 # flask_thread = threading.Thread(target=run_flask) # flask_thread.start()# time.sleep(1)# 啟動前端webview.create_window('鰲蝦軌跡偵測',url=app,width=900,height=600)# webview.create_window('鰲蝦軌跡偵測',url=f'http://127.0.0.1:5000',width=900,height=600)webview.start()
代碼分覽
一個onnx部署的類TrackShrimp,詳細見前面。
一些常量的定義
app = Flask(__name__)
UPLOAD_FOLDER = './temp' # 文件的上傳路徑,后端需要該路徑保留用戶上傳的文件
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
schedule=0 # 實時進度,初始化進度為0
run_task=False # 一個onnx模型是否在運行的標志,用于接收用戶中斷信號從而終止模型運行
定義一個flask的·啟動函數,用于web調試,瀏覽器F12啟動調試窗口
def run_flask():app.run(debug=False, threaded=True,host='127.0.0.1',port=5000)
主頁面的請求函數,該頁面為主要的UI
# 主頁面
@app.route('/',methods=['POST','GET'])
def return_main_page():return render_template('index.html')
用戶請求中斷的請求函數
首先通過標志位(run_task)檢測模型是否在跑,如果檢測到模型正在運行,就把標志位設為False,然后把進度歸0
@app.route('/stopRun',methods=['GET','POST'])
def stopRun():global run_taskglobal scheduleif run_task:run_task=Falseschedule=0return jsonify({'data':'正在停止任務'})else:return jsonify({'data':'當前沒有任務運行'})
進度查詢
這里設置當進度為95%時,就停止查詢。
@app.route('/progress',methods=['GET'])
def progress():@stream_with_contextdef generate():global run_taskratio = schedulewhile ratio < 95 and run_task:yield "data:" + str(ratio) + "\n\n"ratio = scheduletime.sleep(5)return Response(generate(), mimetype='text/event-stream')
一個檢測的入口函數
def video_process(video_path):return TrackShrimp(video_path,'./model.onnx').run()
接收用戶上傳文件的函數
一旦用戶上傳文件,就設置運行標志位為True,然后將文件保存,再送入模型運行接口函數,當用戶請求終止時,results為None,所以使用if else進行區分。模型結果出來后就把標志位設為False,同時將數據傳到前端
@app.route('/shrimp',methods=['POST'])
def shrimp_track():global run_taskrun_task=Truefile=request.files.get('video')filename = secure_filename(file.filename)video_path=os.path.join(app.config['UPLOAD_FOLDER'],filename)file.save(video_path)try:results=video_process(video_path)if results is not None:distance,position_data=resultsdata = {'distance': distance,'position_data': position_data}run_task=Falsereturn jsonify(data)else:return jsonify({'data':'任務被終止'})except Exception as e:print('error:',e)return jsonify({'data':'任務被終止'})finally:if os.path.exists(video_path):os.remove(video_path)
接著就是啟動所有代碼了,為了調試方便,我寫了2份代碼,一份用于調試,一份用于成品
if __name__=='__main__':# 啟動前端webview.create_window('鰲蝦軌跡偵測',url=app,width=900,height=600)webview.start()
if __name__=='__main__':# 啟動后端 flask_thread = threading.Thread(target=run_flask) flask_thread.start()time.sleep(1)# 啟動前端webview.create_window('鰲蝦軌跡偵測',url=f'http://127.0.0.1:5000',width=900,height=600)webview.start()
pyinstaller打包
進入項目目錄,命令行輸入
piinstaller -D -w main.py
找到生成的main.spec文件,按如下修改
# -*- mode: python ; coding: utf-8 -*-a = Analysis(['main.py'],pathex=[],binaries=[],datas=[('templates/','templates/'),('static/','static/'),('venv/Lib/site-packages/onnxruntime/capi/onnxruntime_providers_shared.dll','onnxruntime/capi/'),('venv/Lib/site-packages/onnxruntime/capi/onnxruntime_providers_cuda.dll','onnxruntime/capi/')],hiddenimports=[],hookspath=[],hooksconfig={},runtime_hooks=[],excludes=[],noarchive=False,optimize=0,
)
pyz = PYZ(a.pure)exe = EXE(pyz,a.scripts,[],exclude_binaries=True,name='main',debug=False,bootloader_ignore_signals=False,strip=False,upx=True,console=False,disable_windowed_traceback=False,argv_emulation=False,target_arch=None,codesign_identity=None,entitlements_file=None,icon='1.ico'
)
coll = COLLECT(exe,a.binaries,a.datas,strip=False,upx=True,upx_exclude=[],name='main',
)
在項目目錄下放置一個圖標命名為1.ico,最好是48*48像素
然后命令行運行
pyinstaller main.spec
然后在venv中找到 onnxruntime_gpu-1.18.1.dist-info 文件夾,復制到 dist/main/_internal 中
同時在cuDNN中找到如下幾個動態鏈接庫,復制到 dist/main/_internal 中
cudnn_ops_infer64_8.dll
cudnn_cnn_infer64_8.dll
cudnn_adv_infer64_8.dll
cudnn64_8.dll
cudart64_110.dll
cublasLt64_11.dll
cublas64_11.dll
cufft64_10.dll
然后將model.onnx放到 dist/main/ ,并在該目錄創建一個目錄temp
最后處理的結果如下
XXXdistmain_internalmain.exemodel.onnxtemp
生成安裝包
使用into setup軟件,并在網站找到中文的語言包下載為 Chinese.isl 文件,放到intosetup軟件安裝目錄的 Languages 文件夾下
接著如圖所示
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
取消立即編譯,先進入文件里修改一些東西
修改成下面這樣?
點擊編譯
然后就生成了安裝包,就可以在任何win10,win11電腦里用CPU跑了,如果安裝的電腦?有顯卡和CUDA并把CUDA添加到了環境變量,就可以用GPU跑了