海康威視二次開發相機管理
這段代碼基于python開發的,用了opencv的一些庫函數。實現了一個完整的海康機器人相機管理工具,支持多相機連接、參數配置、圖像采集和實時顯示功能。目前USB相機測試無誤,除了丟一些包。
1. 主要類結構
HKCameraManager
類
這是整個系統的核心類,負責管理所有相機的生命周期和操作。
全局可調參數
# 相機參數配置
EXPOSURE_MODE = 0 # 曝光模式:0:關閉;1:一次;2:自動曝光
EXPOSURE_TIME = 40000 # 曝光時間
GAIN_VALUE = 10 #增益值
ReverseX_enable = True # 水平翻轉
ReverseY_enable = True # 垂直翻轉
#圖像顯示大小
scale_width = 0.2 # 寬度縮放因子
scale_height = 0.2 # 高度縮放因子
PacketSizeLog = True # 啟用丟包信息檢測
主要屬性:
cameras
: 字典,存儲所有已連接相機的信息和句柄_last_error
: 記錄最后一次錯誤信息_running
: 字典,記錄每個相機的運行狀態_lock
: 線程鎖,保證線程安全_display_threads
: 字典,存儲每個相機的顯示線程_fps
: 字典,記錄每個相機的幀率
def __init__(self):"""初始化相機管理器"""self.cameras: Dict[int, Dict] = {} # 存儲所有相機實例和信息self._last_error: str = ""self._running = {} # 每個相機的運行狀態self._lock = threading.Lock()self._display_threads = {} # 每個相機的顯示線程self._fps = {} # 每個相機的FPS
2. 核心功能流程
2.1 設備枚舉
- 通過
enumerate_devices()
方法枚舉所有可用設備 - 支持GigE和USB兩種接口類型的相機
- 返回設備列表,包含型號、序列號、IP地址等信息
def enumerate_devices(self) -> Optional[List[dict]]:"""枚舉所有可用設備"""try:# 設置要枚舉的設備類型tlayer_type = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE# 初始化設備列表結構體device_list = MV_CC_DEVICE_INFO_LIST()memset(byref(device_list), 0, sizeof(device_list))# 創建臨時相機實例用于枚舉temp_cam = MvCamera()# 枚舉設備ret = temp_cam.MV_CC_EnumDevices(tlayer_type, device_list)if ret != 0:self._log_error("枚舉設備", ret)return None# 檢查找到的設備數量if device_list.nDeviceNum == 0:print("未檢測到任何相機設備")return []devices = []for i in range(device_list.nDeviceNum):# 獲取設備信息指針device_info = cast(device_list.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents# 根據傳輸層類型處理設備信息if device_info.nTLayerType == MV_GIGE_DEVICE:# GigE設備device_data = {'model': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chModelName).decode('utf-8'),'serial': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8'),'ip': ".".join(map(str, device_info.SpecialInfo.stGigEInfo.nCurrentIp)),'type': 'GigE','index': i}elif device_info.nTLayerType == MV_USB_DEVICE:# USB設備# 修正USB設備信息獲取方式usb_info = device_info.SpecialInfo.stUsb3VInfo# 使用ctypes的string_at函數獲取字符串model_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')device_data = {'model': model_name.strip('\x00'),'serial': serial_num.strip('\x00'),'type': 'USB','index': i}else:continuedevices.append(device_data)return devicesexcept Exception as e:self._last_error = f"枚舉設備時發生異常: {str(e)}"print(self._last_error)import tracebacktraceback.print_exc() # 打印完整的錯誤堆棧return None
2.2 相機連接
connect_camera()
方法連接指定索引的相機- 步驟:
- 檢查相機是否已連接
- 枚舉設備并選擇指定索引的設備
- 創建相機句柄
- 打開設備
- 配置相機參數(曝光、增益等)
- 開始采集圖像
- 存儲相機信息到字典中
def connect_camera(self, device_index: int) -> bool:"""連接指定索引的相機設備"""try:with self._lock:if device_index in self.cameras and self.cameras[device_index]['connected']:print(f"相機 {device_index} 已連接")return True# 枚舉設備tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICEdeviceList = MV_CC_DEVICE_INFO_LIST()memset(byref(deviceList), 0, sizeof(deviceList))# 實例化相機cam = MvCamera()# 枚舉設備ret = cam.MV_CC_EnumDevices(tlayerType, deviceList)if ret != 0:self._log_error("枚舉設備", ret)return Falseif deviceList.nDeviceNum == 0:self._last_error = "未找到任何設備"print(self._last_error)return Falseif device_index >= deviceList.nDeviceNum:self._last_error = f"設備索引超出范圍,最大可用索引: {deviceList.nDeviceNum - 1}"print(self._last_error)return False# 選擇指定設備stDeviceList = cast(deviceList.pDeviceInfo[device_index], POINTER(MV_CC_DEVICE_INFO)).contents# 創建句柄ret = cam.MV_CC_CreateHandleWithoutLog(stDeviceList)if ret != MV_OK:self._log_error("創建句柄", ret)return False# 獲取設備信息if stDeviceList.nTLayerType == MV_GIGE_DEVICE:model_name = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chModelName).decode('utf-8')serial_num = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8')ip_addr = ".".join(map(str, stDeviceList.SpecialInfo.stGigEInfo.nCurrentIp))device_type = 'GigE'print(f"正在連接設備 {device_index}: {model_name} (SN: {serial_num}, IP: {ip_addr}。GiGe)")else:usb_info = stDeviceList.SpecialInfo.stUsb3VInfomodel_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')ip_addr = Nonedevice_type = 'USB'print(f"正在連接設備 {device_index}: {model_name} (SN: {serial_num}, USB-3.0)")# 打開相機ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != MV_OK:# 特別處理USB相機連接問題if stDeviceList.nTLayerType == MV_USB_DEVICE:# 嘗試設置USB傳輸大小(海康USB相機常見問題)ret = cam.MV_CC_SetIntValue("TransferSize", 0x100000)if ret == MV_OK:ret = cam.MV_CC_SetIntValue("NumTransferBuffers", 8)if ret == MV_OK:ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != 0:self._log_error("打開設備", ret)return False# 配置相機參數if not self._configure_camera(cam):cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 開始取流ret = cam.MV_CC_StartGrabbing()if ret != 0:self._log_error("開始取流", ret)cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 存儲相機信息 - 確保所有必要字段都正確設置self.cameras[device_index] = {'handle': cam,'model': model_name.strip('\x00') if isinstance(model_name, str) else model_name,'serial': serial_num.strip('\x00') if isinstance(serial_num, str) else serial_num,'type': device_type,'ip': ip_addr,'connected': True, # 確保連接狀態正確設置為True'frame_count': 0,'last_frame_time': time.time()}# 初始化FPS計數器self._fps[device_index] = 0print(f"相機 {device_index} 連接成功: {model_name} (SN: {serial_num})")return Trueexcept Exception as e:self._last_error = f"連接相機時發生異常: {str(e)}"print(self._last_error)if 'cam' in locals():cam.MV_CC_DestroyHandle()return False
2.3 相機參數配置
_configure_camera()
私有方法處理相機參數配置- 可配置項:
- 觸發模式(連續采集)
- 曝光模式(手動/自動)
- 增益設置
- 圖像翻轉(水平/垂直)
def _configure_camera(self, cam: MvCamera) -> bool:"""配置相機參數"""try:# 設置觸發方式為連續采集ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)if ret != 0:self._log_error("設置觸發模式", ret)return False# 設置曝光模式match EXPOSURE_MODE:case 0: # 手動設置參數ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_OFF)if ret != 0:print("警告: 關閉自動曝光設置失敗,將采用自動曝光")# 設置曝光時間exposure = float(EXPOSURE_TIME)ret = cam.MV_CC_SetFloatValue("ExposureTime", exposure)if ret != 0:raise RuntimeError(f"Set ExposureTime failed with error {ret}")case 1: # 一次曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_ONCE)if ret != 0:print("警告: 一次曝光設置失敗,將繼續使用手動曝光")case 2: # 自動曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_CONTINUOUS)if ret != 0:print("警告: 自動曝光設置失敗,將繼續使用手動曝光")# 設置增益ret = cam.MV_CC_SetEnumValue("GainAuto", MV_GAIN_MODE_OFF)if ret != 0:print("警告: 手動增益設置失敗,將采用自動增益")gain_val = float(GAIN_VALUE)ret = cam.MV_CC_SetFloatValue("Gain", gain_val)if ret != 0:raise RuntimeError(f"Set gain failed with error {ret}")# 設置水平翻轉flip = c_int(1 if ReverseX_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseX", flip)if ret != 0:raise RuntimeError(f"Set horizontal flip failed with error {ret}")print(f"Horizontal flip {'enabled' if ReverseX_enable else 'disabled'}")# 設置垂直翻轉flip = c_int(1 if ReverseY_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseY", flip)if ret != 0:raise RuntimeError(f"Set vertical flip failed with error {ret}")print(f"Vertical flip {'enabled' if ReverseY_enable else 'disabled'}")return Trueexcept Exception as e:self._last_error = f"配置相機時發生異常: {str(e)}"print(self._last_error)return False
2.4 圖像獲取
get_image()
方法獲取指定相機的圖像- 步驟:
- 獲取圖像緩沖區
- 復制圖像數據
- 根據像素類型處理圖像數據
- 轉換為灰度圖像
- 釋放圖像緩沖區
- 更新幀統計信息
def get_image(self, device_index: int, timeout: int = 300) -> Optional[Tuple[np.ndarray, np.ndarray]]:"""獲取指定相機的圖像并返回原始圖像和灰度圖像"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:self._last_error = f"相機 {device_index} 未連接"print(self._last_error)return Nonecam = self.cameras[device_index]['handle']try:# 初始化幀輸出結構stOutFrame = MV_FRAME_OUT()memset(byref(stOutFrame), 0, sizeof(stOutFrame))# 獲取圖像ret = cam.MV_CC_GetImageBuffer(stOutFrame, timeout)if ret != 0:self._log_error(f"相機 {device_index} 獲取圖像", ret)return None# 獲取圖像信息frame_info = stOutFrame.stFrameInfonPayloadSize = frame_info.nFrameLenpData = stOutFrame.pBufAddr# 打印調試信息# print(f"相機 {device_index} 圖像信息: "# f"Width={frame_info.nWidth}, Height={frame_info.nHeight}, "# f"PixelType={frame_info.enPixelType}, Size={nPayloadSize}")# 復制圖像數據data_buf = (c_ubyte * nPayloadSize)()cdll.msvcrt.memcpy(byref(data_buf), pData, nPayloadSize)# 轉換為numpy數組temp = np.frombuffer(data_buf, dtype=np.uint8)# 獲取圖像參數width = frame_info.nWidthheight = frame_info.nHeightpixel_type = frame_info.enPixelType# 根據像素類型處理圖像img = self._process_image_data(temp, width, height, pixel_type)if img is None:if PacketSizeLog:print(f"相機 {device_index} 圖像處理失敗 - 數據大小: {len(temp)}, "f"預期大小: {width * height * (3 if pixel_type in [PixelType_Gvsp_RGB8_Packed, PixelType_Gvsp_BGR8_Packed] else 1)}")cam.MV_CC_FreeImageBuffer(stOutFrame)return None# 轉換為灰度圖像if len(img.shape) == 2: # 已經是灰度圖像gray = img.copy()else:gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)# 釋放圖像緩存cam.MV_CC_FreeImageBuffer(stOutFrame)# 更新幀統計信息self.cameras[device_index]['frame_count'] += 1self.cameras[device_index]['last_frame_time'] = time.time()return img, grayexcept Exception as e:self._last_error = f"相機 {device_index} 獲取圖像時發生異常: {str(e)}"print(self._last_error)if 'stOutFrame' in locals():cam.MV_CC_FreeImageBuffer(stOutFrame)return None
2.5 圖像顯示
start_display()
啟動相機實時顯示- 為每個相機創建獨立的顯示線程
- 顯示線程中:
- 循環獲取圖像
- 計算并顯示FPS
- 顯示圖像到窗口
- 處理用戶按鍵(ESC退出)
def start_display(self,device_index: int) -> bool:"""啟動所有已連接相機的實時顯示"""with self._lock: # 添加線程鎖# 檢查相機是否已連接if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相機 {device_index} 未連接,無法啟動顯示")return Falseif device_index in self._running and self._running[device_index]:print(f"相機 {device_index} 顯示已啟動")return True# 設置運行標志self._running[device_index] = True# 創建并啟動顯示線程display_thread = threading.Thread(target=self._display_thread,args=(device_index,),daemon=True)self._display_threads[device_index] = display_threaddisplay_thread.start()print(f"相機 {device_index} 顯示線程已啟動")return True
2.6 斷開連接
disconnect_camera()
斷開單個相機連接disconnect_all()
斷開所有相機連接- 釋放所有資源
def disconnect_camera(self, device_index: int) -> bool:"""斷開指定相機的連接"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相機 {device_index} 未連接")return Truecam = self.cameras[device_index]['handle']try:success = True# 停止取流ret = cam.MV_CC_StopGrabbing()if ret != 0:self._log_error(f"相機 {device_index} 停止取流", ret)success = False# 關閉設備ret = cam.MV_CC_CloseDevice()if ret != 0:self._log_error(f"相機 {device_index} 關閉設備", ret)success = False# 銷毀句柄ret = cam.MV_CC_DestroyHandle()if ret != 0:self._log_error(f"相機 {device_index} 銷毀句柄", ret)success = Falseif success:print(f"相機 {device_index} 已成功斷開連接")self.cameras[device_index]['connected'] = False# 從字典中移除相機del self.cameras[device_index]# 停止顯示線程if device_index in self._running:self._running[device_index] = Falsereturn successexcept Exception as e:self._last_error = f"斷開相機 {device_index} 連接時發生異常: {str(e)}"print(self._last_error)return Falsedef disconnect_all(self) -> None:"""斷開所有相機的連接"""self.stop_display() # 先停止所有顯示for device_index in list(self.cameras.keys()):if self.cameras[device_index]['connected']:self.disconnect_camera(device_index)
3. 目前程序的一些功能和特點如下
-
多線程支持:
- 每個相機的顯示使用獨立線程
- 使用線程鎖保證線程安全
-
錯誤處理:
- 詳細的錯誤日志記錄
- 異常捕獲和處理
-
相機參數配置:
- 支持多種曝光模式
- 可配置增益、翻轉等參數
-
圖像處理:
- 支持多種像素格式(Mono8, RGB8, BGR8等)
- 自動處理數據大小不匹配的情況
- 圖像縮放顯示
-
性能監控:
- 實時計算和顯示FPS
- 幀計數統計
4. 完整代碼如下
from HK_Camera.MvCameraControl_class import *
from ctypes import *
from typing import Optional, Tuple, List, Dict
import time
import cv2
import numpy as np
import threading# 相機參數配置
EXPOSURE_MODE = 0 # 曝光模式:0:關閉;1:一次;2:自動曝光
EXPOSURE_TIME = 40000 # 曝光時間
GAIN_VALUE = 10 #增益值
ReverseX_enable = True # 水平翻轉
ReverseY_enable = True # 垂直翻轉
#圖像顯示大小
scale_width = 0.2 # 寬度縮放因子
scale_height = 0.2 # 高度縮放因子
PacketSizeLog = True # 啟用丟包信息檢測class HKCameraManager:def __init__(self):"""初始化相機管理器"""self.cameras: Dict[int, Dict] = {} # 存儲所有相機實例和信息self._last_error: str = ""self._running = {} # 每個相機的運行狀態self._lock = threading.Lock()self._display_threads = {} # 每個相機的顯示線程self._fps = {} # 每個相機的FPS@propertydef last_error(self) -> str:"""獲取最后一次錯誤信息"""return self._last_errordef _log_error(self, operation: str, ret: int) -> None:"""記錄錯誤日志"""self._last_error = f"{operation}失敗,錯誤碼: 0x{ret:x}"print(self._last_error)def enumerate_devices(self) -> Optional[List[dict]]:"""枚舉所有可用設備"""try:# 設置要枚舉的設備類型tlayer_type = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE# 初始化設備列表結構體device_list = MV_CC_DEVICE_INFO_LIST()memset(byref(device_list), 0, sizeof(device_list))# 創建臨時相機實例用于枚舉temp_cam = MvCamera()# 枚舉設備ret = temp_cam.MV_CC_EnumDevices(tlayer_type, device_list)if ret != 0:self._log_error("枚舉設備", ret)return None# 檢查找到的設備數量if device_list.nDeviceNum == 0:print("未檢測到任何相機設備")return []devices = []for i in range(device_list.nDeviceNum):# 獲取設備信息指針device_info = cast(device_list.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents# 根據傳輸層類型處理設備信息if device_info.nTLayerType == MV_GIGE_DEVICE:# GigE設備device_data = {'model': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chModelName).decode('utf-8'),'serial': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8'),'ip': ".".join(map(str, device_info.SpecialInfo.stGigEInfo.nCurrentIp)),'type': 'GigE','index': i}elif device_info.nTLayerType == MV_USB_DEVICE:# USB設備# 修正USB設備信息獲取方式usb_info = device_info.SpecialInfo.stUsb3VInfo# 使用ctypes的string_at函數獲取字符串model_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')device_data = {'model': model_name.strip('\x00'),'serial': serial_num.strip('\x00'),'type': 'USB','index': i}else:continuedevices.append(device_data)return devicesexcept Exception as e:self._last_error = f"枚舉設備時發生異常: {str(e)}"print(self._last_error)import tracebacktraceback.print_exc() # 打印完整的錯誤堆棧return Nonedef connect_camera(self, device_index: int) -> bool:"""連接指定索引的相機設備"""try:with self._lock:if device_index in self.cameras and self.cameras[device_index]['connected']:print(f"相機 {device_index} 已連接")return True# 枚舉設備tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICEdeviceList = MV_CC_DEVICE_INFO_LIST()memset(byref(deviceList), 0, sizeof(deviceList))# 實例化相機cam = MvCamera()# 枚舉設備ret = cam.MV_CC_EnumDevices(tlayerType, deviceList)if ret != 0:self._log_error("枚舉設備", ret)return Falseif deviceList.nDeviceNum == 0:self._last_error = "未找到任何設備"print(self._last_error)return Falseif device_index >= deviceList.nDeviceNum:self._last_error = f"設備索引超出范圍,最大可用索引: {deviceList.nDeviceNum - 1}"print(self._last_error)return False# 選擇指定設備stDeviceList = cast(deviceList.pDeviceInfo[device_index], POINTER(MV_CC_DEVICE_INFO)).contents# 創建句柄ret = cam.MV_CC_CreateHandleWithoutLog(stDeviceList)if ret != MV_OK:self._log_error("創建句柄", ret)return False# 獲取設備信息if stDeviceList.nTLayerType == MV_GIGE_DEVICE:model_name = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chModelName).decode('utf-8')serial_num = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8')ip_addr = ".".join(map(str, stDeviceList.SpecialInfo.stGigEInfo.nCurrentIp))device_type = 'GigE'print(f"正在連接設備 {device_index}: {model_name} (SN: {serial_num}, IP: {ip_addr}。GiGe)")else:usb_info = stDeviceList.SpecialInfo.stUsb3VInfomodel_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')ip_addr = Nonedevice_type = 'USB'print(f"正在連接設備 {device_index}: {model_name} (SN: {serial_num}, USB-3.0)")# 打開相機ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != MV_OK:# 特別處理USB相機連接問題if stDeviceList.nTLayerType == MV_USB_DEVICE:# 嘗試設置USB傳輸大小(海康USB相機常見問題)ret = cam.MV_CC_SetIntValue("TransferSize", 0x100000)if ret == MV_OK:ret = cam.MV_CC_SetIntValue("NumTransferBuffers", 8)if ret == MV_OK:ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != 0:self._log_error("打開設備", ret)return False# 配置相機參數if not self._configure_camera(cam):cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 開始取流ret = cam.MV_CC_StartGrabbing()if ret != 0:self._log_error("開始取流", ret)cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 存儲相機信息 - 確保所有必要字段都正確設置self.cameras[device_index] = {'handle': cam,'model': model_name.strip('\x00') if isinstance(model_name, str) else model_name,'serial': serial_num.strip('\x00') if isinstance(serial_num, str) else serial_num,'type': device_type,'ip': ip_addr,'connected': True, # 確保連接狀態正確設置為True'frame_count': 0,'last_frame_time': time.time()}# 初始化FPS計數器self._fps[device_index] = 0print(f"相機 {device_index} 連接成功: {model_name} (SN: {serial_num})")return Trueexcept Exception as e:self._last_error = f"連接相機時發生異常: {str(e)}"print(self._last_error)if 'cam' in locals():cam.MV_CC_DestroyHandle()return Falsedef _configure_camera(self, cam: MvCamera) -> bool:"""配置相機參數"""try:# 設置觸發方式為連續采集ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)if ret != 0:self._log_error("設置觸發模式", ret)return False# 設置曝光模式match EXPOSURE_MODE:case 0: # 手動設置參數ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_OFF)if ret != 0:print("警告: 關閉自動曝光設置失敗,將采用自動曝光")# 設置曝光時間exposure = float(EXPOSURE_TIME)ret = cam.MV_CC_SetFloatValue("ExposureTime", exposure)if ret != 0:raise RuntimeError(f"Set ExposureTime failed with error {ret}")case 1: # 一次曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_ONCE)if ret != 0:print("警告: 一次曝光設置失敗,將繼續使用手動曝光")case 2: # 自動曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_CONTINUOUS)if ret != 0:print("警告: 自動曝光設置失敗,將繼續使用手動曝光")# 設置增益ret = cam.MV_CC_SetEnumValue("GainAuto", MV_GAIN_MODE_OFF)if ret != 0:print("警告: 手動增益設置失敗,將采用自動增益")gain_val = float(GAIN_VALUE)ret = cam.MV_CC_SetFloatValue("Gain", gain_val)if ret != 0:raise RuntimeError(f"Set gain failed with error {ret}")# 設置水平翻轉flip = c_int(1 if ReverseX_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseX", flip)if ret != 0:raise RuntimeError(f"Set horizontal flip failed with error {ret}")print(f"Horizontal flip {'enabled' if ReverseX_enable else 'disabled'}")# 設置垂直翻轉flip = c_int(1 if ReverseY_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseY", flip)if ret != 0:raise RuntimeError(f"Set vertical flip failed with error {ret}")print(f"Vertical flip {'enabled' if ReverseY_enable else 'disabled'}")return Trueexcept Exception as e:self._last_error = f"配置相機時發生異常: {str(e)}"print(self._last_error)return Falsedef get_image(self, device_index: int, timeout: int = 300) -> Optional[Tuple[np.ndarray, np.ndarray]]:"""獲取指定相機的圖像并返回原始圖像和灰度圖像"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:self._last_error = f"相機 {device_index} 未連接"print(self._last_error)return Nonecam = self.cameras[device_index]['handle']try:# 初始化幀輸出結構stOutFrame = MV_FRAME_OUT()memset(byref(stOutFrame), 0, sizeof(stOutFrame))# 獲取圖像ret = cam.MV_CC_GetImageBuffer(stOutFrame, timeout)if ret != 0:self._log_error(f"相機 {device_index} 獲取圖像", ret)return None# 獲取圖像信息frame_info = stOutFrame.stFrameInfonPayloadSize = frame_info.nFrameLenpData = stOutFrame.pBufAddr# 打印調試信息# print(f"相機 {device_index} 圖像信息: "# f"Width={frame_info.nWidth}, Height={frame_info.nHeight}, "# f"PixelType={frame_info.enPixelType}, Size={nPayloadSize}")# 復制圖像數據data_buf = (c_ubyte * nPayloadSize)()cdll.msvcrt.memcpy(byref(data_buf), pData, nPayloadSize)# 轉換為numpy數組temp = np.frombuffer(data_buf, dtype=np.uint8)# 獲取圖像參數width = frame_info.nWidthheight = frame_info.nHeightpixel_type = frame_info.enPixelType# 根據像素類型處理圖像img = self._process_image_data(temp, width, height, pixel_type)if img is None:if PacketSizeLog:print(f"相機 {device_index} 圖像處理失敗 - 數據大小: {len(temp)}, "f"預期大小: {width * height * (3 if pixel_type in [PixelType_Gvsp_RGB8_Packed, PixelType_Gvsp_BGR8_Packed] else 1)}")cam.MV_CC_FreeImageBuffer(stOutFrame)return None# 轉換為灰度圖像if len(img.shape) == 2: # 已經是灰度圖像gray = img.copy()else:gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)# 釋放圖像緩存cam.MV_CC_FreeImageBuffer(stOutFrame)# 更新幀統計信息self.cameras[device_index]['frame_count'] += 1self.cameras[device_index]['last_frame_time'] = time.time()return img, grayexcept Exception as e:self._last_error = f"相機 {device_index} 獲取圖像時發生異常: {str(e)}"print(self._last_error)if 'stOutFrame' in locals():cam.MV_CC_FreeImageBuffer(stOutFrame)return Nonedef _process_image_data(self, data: np.ndarray, width: int, height: int, pixel_type: int) -> Optional[np.ndarray]:"""根據像素類型處理原始圖像數據"""try:if PacketSizeLog:# 首先檢查數據大小是否匹配預期expected_size = width * heightif pixel_type in [PixelType_Gvsp_Mono8, PixelType_Gvsp_RGB8_Packed, PixelType_Gvsp_BGR8_Packed]:if pixel_type == PixelType_Gvsp_Mono8:expected_size = width * heightelse:expected_size = width * height * 3if len(data) != expected_size:print(f"警告: 數據大小不匹配 (預期: {expected_size}, 實際: {len(data)}), 嘗試自動處理")# 嘗試自動計算正確的高度if pixel_type == PixelType_Gvsp_Mono8:actual_height = len(data) // widthif actual_height * width == len(data):return data.reshape((actual_height, width))else:actual_height = len(data) // (width * 3)if actual_height * width * 3 == len(data):img = data.reshape((actual_height, width, 3))if pixel_type == PixelType_Gvsp_RGB8_Packed:return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)return imgreturn None# 正常處理流程if pixel_type == PixelType_Gvsp_Mono8:return data.reshape((height, width))elif pixel_type == PixelType_Gvsp_RGB8_Packed:img = data.reshape((height, width, 3))return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)elif pixel_type == PixelType_Gvsp_BGR8_Packed:return data.reshape((height, width, 3))elif pixel_type in [PixelType_Gvsp_Mono10, PixelType_Gvsp_Mono12]:# 對于10位或12位圖像,需要進行位轉換img = data.view(np.uint16)img = (img >> (pixel_type - PixelType_Gvsp_Mono8)).astype(np.uint8)return img.reshape((height, width))else:self._last_error = f"不支持的像素格式: {pixel_type}"print(self._last_error)return Noneexcept Exception as e:self._last_error = f"圖像處理錯誤: {str(e)}"if PacketSizeLog:print(self._last_error)return Nonedef disconnect_camera(self, device_index: int) -> bool:"""斷開指定相機的連接"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相機 {device_index} 未連接")return Truecam = self.cameras[device_index]['handle']try:success = True# 停止取流ret = cam.MV_CC_StopGrabbing()if ret != 0:self._log_error(f"相機 {device_index} 停止取流", ret)success = False# 關閉設備ret = cam.MV_CC_CloseDevice()if ret != 0:self._log_error(f"相機 {device_index} 關閉設備", ret)success = False# 銷毀句柄ret = cam.MV_CC_DestroyHandle()if ret != 0:self._log_error(f"相機 {device_index} 銷毀句柄", ret)success = Falseif success:print(f"相機 {device_index} 已成功斷開連接")self.cameras[device_index]['connected'] = False# 從字典中移除相機del self.cameras[device_index]# 停止顯示線程if device_index in self._running:self._running[device_index] = Falsereturn successexcept Exception as e:self._last_error = f"斷開相機 {device_index} 連接時發生異常: {str(e)}"print(self._last_error)return False
###########################圖像視頻流顯示部分################################################def start_display(self,device_index: int) -> bool:"""啟動所有已連接相機的實時顯示"""with self._lock: # 添加線程鎖# 檢查相機是否已連接if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相機 {device_index} 未連接,無法啟動顯示")return Falseif device_index in self._running and self._running[device_index]:print(f"相機 {device_index} 顯示已啟動")return True# 設置運行標志self._running[device_index] = True# 創建并啟動顯示線程display_thread = threading.Thread(target=self._display_thread,args=(device_index,),daemon=True)self._display_threads[device_index] = display_threaddisplay_thread.start()print(f"相機 {device_index} 顯示線程已啟動")return Truedef stop_display(self, device_index: int = None) -> None:"""停止指定相機的顯示或停止所有相機顯示"""if device_index is None:# 停止所有顯示for idx in list(self._running.keys()):self._running[idx] = Falsefor idx, thread in self._display_threads.items():if thread.is_alive():thread.join()self._display_threads.clear()cv2.destroyAllWindows()else:# 停止指定相機顯示if device_index in self._running:self._running[device_index] = Falseif device_index in self._display_threads:if self._display_threads[device_index].is_alive():self._display_threads[device_index].join()del self._display_threads[device_index]cv2.destroyWindow(f"Camera {device_index}")def _display_thread(self, device_index: int) -> None:"""單個相機的顯示線程"""frame_count = 0last_time = time.time()window_name = f"Camera {device_index}"def _window_exists(window_name):"""檢查OpenCV窗口是否存在"""try:return cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) >= 0except:return Falsetry:while self._running.get(device_index, False):try:# 獲取圖像result = self.get_image(device_index)if result is None:if PacketSizeLog:print(f"相機 {device_index} 獲取圖像超時")time.sleep(0.1)continueimg, _ = result# 計算FPSframe_count += 1current_time = time.time()if current_time - last_time >= 1.0:self._fps[device_index] = frame_count / (current_time - last_time)frame_count = 0last_time = current_time# 在圖像上顯示信息info = f"Cam {device_index} | {self.cameras[device_index]['model']} | FPS: {self._fps[device_index]:.1f}"cv2.putText(img, info, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)resized_image_by_scale = cv2.resize(img, None, fx=scale_width, fy=scale_height)# 顯示圖像cv2.imshow(window_name, resized_image_by_scale)# 檢查按鍵key = cv2.waitKey(1) & 0xFFif key == 27: # ESC鍵退出self._running[device_index] = Falsebreakexcept Exception as e:print(f"相機 {device_index} 顯示線程異常: {str(e)}")time.sleep(0.1)finally:# 線程結束時清理if _window_exists(window_name):cv2.destroyWindow(window_name)print(f"相機 {device_index} 顯示線程已停止")def disconnect_all(self) -> None:"""斷開所有相機的連接"""self.stop_display() # 先停止所有顯示for device_index in list(self.cameras.keys()):if self.cameras[device_index]['connected']:self.disconnect_camera(device_index)def __del__(self):"""析構函數,確保資源釋放"""self.disconnect_all()def main():# 創建相機管理器cam_manager = HKCameraManager()# 枚舉設備devices = cam_manager.enumerate_devices()if not devices:print("未找到任何相機設備")returnprint("找到以下相機設備:")for i, dev in enumerate(devices):# 根據設備類型顯示不同信息if dev['type'] == 'GigE':print(f"{i}: {dev['model']} (SN: {dev['serial']}, IP: {dev['ip']})")else: # USB設備print(f"{i}: {dev['model']} (SN: {dev['serial']}, Type: USB)")# 先連接所有相機for i in range(len(devices)):if not cam_manager.connect_camera(i):print(f"無法連接相機 {i}")continue # 即使一個相機連接失敗,也繼續嘗試其他相機# 確認連接狀態后再啟動顯示for i in range(len(devices)):if i in cam_manager.cameras and cam_manager.cameras[i]['connected']:cam_manager.start_display(i)try:# 主線程等待while any(cam_manager._running.values()):time.sleep(0.1)except KeyboardInterrupt:print("用戶中斷...")finally:# 清理資源cam_manager.disconnect_all()print("程序退出")if __name__ == "__main__":main()
5. 寫在最后
目前程序還有一些未增加的功能,后續會增加補充
- 相機參數動態調整功能
- 圖像保存功能
- 支持更多像素格式
- 網絡相機重連機制
- 日志系統替代print輸出