一、系統概述
該系統基于計算機視覺技術,實現對視頻或攝像頭畫面中的人員進行檢測、跟蹤,并生成軌跡數據。支持透視變換校準(鳥瞰圖顯示)、多目標跟蹤、軌跡存儲及視頻錄制功能,適用于安防監控、行為分析等場景。
二、依賴庫
python
運行
import cv2 # 計算機視覺處理(OpenCV庫)
import numpy as np # 數值計算
import time # 時間處理
import os # 文件與目錄操作
from datetime import datetime # 日期時間處理
三、類定義:PersonTracker
3.1 構造函數?__init__
功能
初始化人員跟蹤器,配置視頻源、輸出參數、背景減除器及跟蹤參數。
參數說明
參數名 | 類型 | 默認值 | 描述 |
---|---|---|---|
video_source | int/str | 0 | 視頻源(0 為默認攝像頭,或指定視頻文件路徑) |
save_video | bool | False | 是否保存處理后的視頻 |
show_warped | bool | True | 是否顯示透視變換后的鳥瞰圖 |
內部屬性
- 視頻源與基礎參數:
cap
:視頻捕獲對象(cv2.VideoCapture
實例)frame_width
/frame_height
:視頻幀寬高fps
:幀率
- 輸出配置:
output_folder
:輸出文件夾(默認output
)out
:視頻寫入對象(cv2.VideoWriter
實例,僅當save_video=True
時創建)
- 背景減除:
fgbg
:使用MOG2
算法的背景減除器,支持陰影檢測
- 跟蹤參數:
min_contour_area
/max_contour_area
:過濾輪廓的面積閾值(單位像素)trajectories
:存儲軌跡的字典(鍵為人員 ID,值為軌跡信息)max_disappeared_frames
:允許目標消失的最大幀數(超過則刪除軌跡)max_distance
:軌跡匹配的最大距離(像素)
- 透視變換:
perspective_transform
:透視變換矩陣(校準后生成)warped_width
/warped_height
:鳥瞰圖尺寸(寬度固定 500,高度與原始幀一致)
3.2 方法列表
3.2.1?calibrate_perspective()
- 功能:通過鼠標點擊選擇 4 個點,校準透視變換矩陣,生成鳥瞰圖。
- 操作說明:
- 顯示視頻第一幀,按順序點擊左上、右上、右下、左下四個點,形成矩形區域。
- 按
q
鍵退出校準。
- 返回值:
bool
(True
為校準成功,False
為取消或失敗)
3.2.2?detect_persons(frame)
- 功能:在輸入幀中檢測人員,返回檢測結果和二值化掩碼。
- 輸入:
frame
(BGR 格式圖像) - 處理流程:
- 應用背景減除,生成前景掩碼。
- 形態學操作(開運算 + 閉運算)去除噪聲。
- 查找輪廓,過濾面積不符合閾值的輪廓。
- 計算每個輪廓的中心點和邊界框。
- 返回值:
(persons, thresh)
,其中:persons
:檢測到的人員列表(每個元素為字典,包含bbox
、center
、contour
、area
)thresh
:二值化掩碼圖像
3.2.3?track_persons(detected_persons)
- 功能:根據檢測結果更新人員軌跡。
- 輸入:
detected_persons
(detect_persons
返回的人員列表) - 算法邏輯:
- 計算現有軌跡與新檢測的匹配距離(歐氏距離),優先匹配近距離目標。
- 未匹配的軌跡:若連續消失超過
max_disappeared_frames
,則刪除。 - 未匹配的檢測:創建新軌跡,分配唯一 ID。
3.2.4?draw_results(frame, persons, thresh)
- 功能:在圖像上繪制檢測框、軌跡、ID 及統計信息,支持鳥瞰圖顯示。
- 輸入:
frame
:原始幀persons
:檢測到的人員列表thresh
:二值化掩碼(未使用,僅保留接口)
- 輸出:繪制后的結果圖像(若
show_warped=True
,則為原始幀與鳥瞰圖的橫向拼接圖)
3.2.5?save_trajectories()
- 功能:將當前所有軌跡數據保存到文本文件,包含 ID、起始時間、軌跡點坐標等。
- 存儲路徑:
output_folder/trajectories_時間戳.txt
3.2.6?run()
- 功能:運行跟蹤主循環,處理視頻流并實時顯示結果。
- 操作說明:
- 按
q
鍵退出程序。 - 按
s
鍵保存當前軌跡數據。
- 按
- 流程:
- 調用
calibrate_perspective()
進行透視校準(可選)。 - 逐幀讀取視頻,檢測、跟蹤人員,繪制結果。
- 釋放資源并關閉窗口。
- 調用
四、主程序入口
python
運行
if __name__ == "__main__":tracker = PersonTracker(video_source=0, # 0為攝像頭,或指定視頻文件路徑(如"video.mp4")save_video=True, # 啟用視頻錄制show_warped=True # 顯示鳥瞰圖)tracker.run()
五、使用說明
5.1 環境配置
- 安裝依賴庫:
bash
pip install opencv-python numpy
- 確保攝像頭或視頻文件可用。
5.2 透視校準操作
- 運行程序后,會彈出窗口提示選擇 4 個點。
- 按順序點擊視頻中的矩形區域四角(如地面區域),生成鳥瞰圖。
- 校準完成后,右側會顯示鳥瞰圖中的軌跡。
5.3 輸出文件
- 視頻文件:若
save_video=True
,生成output/tracking_時間戳.avi
。 - 軌跡文件:按
s
鍵生成output/trajectories_時間戳.txt
,包含各 ID 的坐標序列。
六、參數調整建議
參數名 | 作用 | 調整場景 |
---|---|---|
min_contour_area | 過濾小目標(如噪聲) | 目標較小時調小,反之調大 |
max_contour_area | 過濾大目標(如多人重疊) | 目標較大時調大,反之調小 |
max_disappeared_frames | 目標消失后保留軌跡的幀數 | 目標運動間隔較長時調大 |
max_distance | 軌跡匹配的最大允許距離 | 目標運動速度快時調大 |
warped_width | 鳥瞰圖寬度 | 顯示區域寬窄調整 |
七、注意事項
- 背景減除器
MOG2
需要一定時間學習背景(前幾秒可能檢測不穩定)。 - 透視校準的四點應選擇實際場景中的矩形區域(如地面邊框),以確保鳥瞰圖坐標準確。
- 若視頻幀率較低,可嘗試降低
warped_width
或關閉show_warped
以減少計算量。
完成代碼
import cv2
import numpy as np
import time
import os
from datetime import datetimeclass PersonTracker:def __init__(self, video_source=0, save_video=False, show_warped=True):"""初始化人員跟蹤器"""# 視頻源設置self.video_source = video_sourceself.cap = cv2.VideoCapture(video_source)if not self.cap.isOpened():raise ValueError("無法打開視頻源", video_source)# 獲取視頻的寬度、高度和幀率self.frame_width = int(self.cap.get(3))self.frame_height = int(self.cap.get(4))self.fps = self.cap.get(cv2.CAP_PROP_FPS)# 輸出設置self.save_video = save_videoself.output_folder = "output"self.show_warped = show_warped# 創建輸出文件夾if not os.path.exists(self.output_folder):os.makedirs(self.output_folder)# 背景減除器self.fgbg = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=100, detectShadows=True)# 人員檢測參數self.min_contour_area = 1000 # 最小輪廓面積self.max_contour_area = 50000 # 最大輪廓面積# 軌跡存儲self.trajectories = {} # 存儲每個人的軌跡self.next_person_id = 1 # 下一個可用的人員IDself.max_disappeared_frames = 10 # 最大消失幀數self.max_distance = 100 # 最大匹配距離# 透視變換參數self.perspective_transform = Noneself.warped_width = 500self.warped_height = self.frame_height # 與原始幀高度一致# 錄制設置self.out = Noneif save_video:timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")output_path = os.path.join(self.output_folder, f"tracking_{timestamp}.avi")fourcc = cv2.VideoWriter_fourcc(*'XVID')self.out = cv2.VideoWriter(output_path, fourcc, self.fps, (self.frame_width, self.frame_height))def calibrate_perspective(self):"""校準透視變換,創建鳥瞰圖"""print("請在圖像中選擇4個點,形成一個矩形區域,用于透視變換")print("按順序點擊:左上、右上、右下、左下")# 讀取一幀用于選擇點ret, frame = self.cap.read()if not ret:print("無法讀取視頻幀")return False# 創建窗口并設置鼠標回調cv2.namedWindow("選擇透視變換點 (按 'q' 退出)")points = []def click_event(event, x, y, flags, param):if event == cv2.EVENT_LBUTTONDOWN:points.append((x, y))cv2.circle(frame, (x, y), 5, (0, 255, 0), -1)cv2.imshow("選擇透視變換點 (按 'q' 退出)", frame)cv2.setMouseCallback("選擇透視變換點 (按 'q' 退出)", click_event)# 顯示圖像并等待點擊cv2.imshow("選擇透視變換點 (按 'q' 退出)", frame)while len(points) < 4:key = cv2.waitKey(1) & 0xFFif key == ord('q'):cv2.destroyAllWindows()return Falsecv2.destroyAllWindows()# 定義目標矩形src = np.float32(points)dst = np.float32([[0, 0],[self.warped_width, 0],[self.warped_width, self.warped_height],[0, self.warped_height]])# 計算透視變換矩陣self.perspective_transform = cv2.getPerspectiveTransform(src, dst)return Truedef detect_persons(self, frame):"""檢測圖像中的人物"""# 應用背景減除fgmask = self.fgbg.apply(frame)# 圖像預處理_, thresh = cv2.threshold(fgmask, 127, 255, cv2.THRESH_BINARY)kernel = np.ones((5, 5), np.uint8)thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel, iterations=2)thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel, iterations=3)# 查找輪廓contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)persons = []for contour in contours:area = cv2.contourArea(contour)if area < self.min_contour_area or area > self.max_contour_area:continue# 計算邊界框x, y, w, h = cv2.boundingRect(contour)center = (int(x + w/2), int(y + h/2))# 計算輪廓的中心點M = cv2.moments(contour)if M["m00"] != 0:cX = int(M["m10"] / M["m00"])cY = int(M["m01"] / M["m00"])center = (cX, cY)persons.append({'bbox': (x, y, w, h),'center': center,'contour': contour,'area': area})return persons, threshdef track_persons(self, detected_persons):"""跟蹤檢測到的人員"""# 計算當前檢測點與現有軌跡的距離unmatched_tracks = list(self.trajectories.keys())unmatched_detections = list(range(len(detected_persons)))matches = []# 計算所有可能的匹配for track_id in self.trajectories:trajectory = self.trajectories[track_id]last_position = trajectory['positions'][-1]min_distance = float('inf')min_index = -1for i, person in enumerate(detected_persons):if i in unmatched_detections:distance = np.sqrt((last_position[0] - person['center'][0])**2 + (last_position[1] - person['center'][1])**2)if distance < min_distance and distance < self.max_distance:min_distance = distancemin_index = i# 如果找到匹配if min_index != -1:matches.append((track_id, min_index, min_distance))# 按距離排序,優先處理距離近的匹配matches.sort(key=lambda x: x[2])# 應用匹配for match in matches:track_id, detection_index, _ = matchif track_id in unmatched_tracks and detection_index in unmatched_detections:# 更新軌跡self.trajectories[track_id]['positions'].append(detected_persons[detection_index]['center'])self.trajectories[track_id]['last_seen'] = 0self.trajectories[track_id]['bbox'] = detected_persons[detection_index]['bbox']# 從待匹配列表中移除unmatched_tracks.remove(track_id)unmatched_detections.remove(detection_index)# 處理未匹配的軌跡for track_id in unmatched_tracks:self.trajectories[track_id]['last_seen'] += 1if self.trajectories[track_id]['last_seen'] > self.max_disappeared_frames:del self.trajectories[track_id]# 處理未匹配的檢測結果for detection_index in unmatched_detections:# 創建新軌跡self.trajectories[self.next_person_id] = {'positions': [detected_persons[detection_index]['center']],'last_seen': 0,'bbox': detected_persons[detection_index]['bbox'],'start_time': time.time()}self.next_person_id += 1def draw_results(self, frame, persons, thresh):"""在圖像上繪制檢測和跟蹤結果"""output = frame.copy()# 繪制檢測到的人物for person in persons:x, y, w, h = person['bbox']cv2.rectangle(output, (x, y), (x + w, y + h), (0, 255, 0), 2)cv2.circle(output, person['center'], 5, (0, 0, 255), -1)# 繪制軌跡for track_id, trajectory in self.trajectories.items():positions = trajectory['positions']# 繪制軌跡線for i in range(1, len(positions)):cv2.line(output, positions[i-1], positions[i], (255, 0, 0), 2)# 繪制軌跡點for pos in positions:cv2.circle(output, pos, 3, (255, 0, 0), -1)# 繪制ID和軌跡長度if len(positions) > 0:last_pos = positions[-1]cv2.putText(output, f"ID: {track_id}", (last_pos[0] + 10, last_pos[1] - 20),cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)cv2.putText(output, f"Points: {len(positions)}", (last_pos[0] + 10, last_pos[1]),cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)# 顯示統計信息cv2.putText(output, f"Persons: {len(self.trajectories)}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)cv2.putText(output, f"FPS: {int(self.fps)}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)# 創建結果顯示窗口if self.show_warped and self.perspective_transform is not None:# 創建鳥瞰圖warped = cv2.warpPerspective(output, self.perspective_transform, (self.warped_width, self.warped_height))# 在鳥瞰圖上繪制軌跡for track_id, trajectory in self.trajectories.items():positions = trajectory['positions']for i in range(1, len(positions)):# 將原始坐標轉換為鳥瞰圖坐標pos1 = np.array([[positions[i-1][0], positions[i-1][1]]], dtype=np.float32).reshape(-1, 1, 2)pos2 = np.array([[positions[i][0], positions[i][1]]], dtype=np.float32).reshape(-1, 1, 2)warped_pos1 = cv2.perspectiveTransform(pos1, self.perspective_transform)[0][0]warped_pos2 = cv2.perspectiveTransform(pos2, self.perspective_transform)[0][0]cv2.line(warped, (int(warped_pos1[0]), int(warped_pos1[1])),(int(warped_pos2[0]), int(warped_pos2[1])), (255, 0, 0), 2)# 合并顯示combined = np.hstack((output, warped))return combinedreturn outputdef save_trajectories(self):"""保存軌跡數據到文件"""timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")output_path = os.path.join(self.output_folder, f"trajectories_{timestamp}.txt")with open(output_path, 'w') as f:f.write("Person Trajectories\n")f.write(f"Recorded on: {datetime.now()}\n\n")for track_id, trajectory in self.trajectories.items():f.write(f"Person ID: {track_id}\n")f.write(f"Start Time: {time.ctime(trajectory['start_time'])}\n")f.write(f"Duration: {time.time() - trajectory['start_time']:.2f} seconds\n")f.write(f"Trajectory Points: {len(trajectory['positions'])}\n")f.write("Positions:\n")for pos in trajectory['positions']:f.write(f" ({pos[0]}, {pos[1]})\n")f.write("\n")print(f"軌跡數據已保存到: {output_path}")def run(self):"""運行人員跟蹤系統"""# 首先進行透視校準if not self.calibrate_perspective():print("透視校準失敗,使用原始視角")print("開始人員跟蹤...")print("按 'q' 退出,按 's' 保存軌跡數據")frame_count = 0start_time = time.time()while True:ret, frame = self.cap.read()if not ret:break# 計算實際幀率frame_count += 1if frame_count % 10 == 0:elapsed_time = time.time() - start_timeself.fps = frame_count / elapsed_time# 檢測人員persons, thresh = self.detect_persons(frame)# 跟蹤人員self.track_persons(persons)# 繪制結果result = self.draw_results(frame, persons, thresh)# 保存視頻if self.save_video:self.out.write(result)# 顯示結果cv2.imshow("人員軌跡跟蹤系統 (按 'q' 退出,按 's' 保存軌跡)", result)# 按鍵處理key = cv2.waitKey(1) & 0xFFif key == ord('q'):breakelif key == ord('s'):self.save_trajectories()# 釋放資源self.cap.release()if self.out:self.out.release()cv2.destroyAllWindows()print("人員跟蹤系統已關閉")# 主程序入口
if __name__ == "__main__":# 創建人員跟蹤器實例tracker = PersonTracker(video_source=0, # 0表示默認攝像頭,也可以指定視頻文件路徑save_video=True, # 是否保存視頻show_warped=True # 是否顯示鳥瞰圖)# 運行跟蹤器tracker.run()