【python】基于pycharm的海康相機SDK二次開發

海康威視二次開發相機管理

在這里插入圖片描述

這段代碼基于python開發的,用了opencv的一些庫函數。實現了一個完整的海康機器人相機管理工具,支持多相機連接、參數配置、圖像采集和實時顯示功能。目前USB相機測試無誤,除了丟一些包。

1. 主要類結構

HKCameraManager

這是整個系統的核心類,負責管理所有相機的生命周期和操作。
全局可調參數

# 相機參數配置
EXPOSURE_MODE = 0  # 曝光模式:0:關閉;1:一次;2:自動曝光
EXPOSURE_TIME = 40000 # 曝光時間
GAIN_VALUE = 10 #增益值
ReverseX_enable = True # 水平翻轉
ReverseY_enable = True # 垂直翻轉
#圖像顯示大小
scale_width = 0.2  # 寬度縮放因子
scale_height = 0.2  # 高度縮放因子
PacketSizeLog = True  # 啟用丟包信息檢測
主要屬性:
  • cameras: 字典,存儲所有已連接相機的信息和句柄
  • _last_error: 記錄最后一次錯誤信息
  • _running: 字典,記錄每個相機的運行狀態
  • _lock: 線程鎖,保證線程安全
  • _display_threads: 字典,存儲每個相機的顯示線程
  • _fps: 字典,記錄每個相機的幀率
 def __init__(self):"""初始化相機管理器"""self.cameras: Dict[int, Dict] = {}  # 存儲所有相機實例和信息self._last_error: str = ""self._running = {}  # 每個相機的運行狀態self._lock = threading.Lock()self._display_threads = {}  # 每個相機的顯示線程self._fps = {}  # 每個相機的FPS

2. 核心功能流程

2.1 設備枚舉

  • 通過enumerate_devices()方法枚舉所有可用設備
  • 支持GigE和USB兩種接口類型的相機
  • 返回設備列表,包含型號、序列號、IP地址等信息
    def enumerate_devices(self) -> Optional[List[dict]]:"""枚舉所有可用設備"""try:# 設置要枚舉的設備類型tlayer_type = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE# 初始化設備列表結構體device_list = MV_CC_DEVICE_INFO_LIST()memset(byref(device_list), 0, sizeof(device_list))# 創建臨時相機實例用于枚舉temp_cam = MvCamera()# 枚舉設備ret = temp_cam.MV_CC_EnumDevices(tlayer_type, device_list)if ret != 0:self._log_error("枚舉設備", ret)return None# 檢查找到的設備數量if device_list.nDeviceNum == 0:print("未檢測到任何相機設備")return []devices = []for i in range(device_list.nDeviceNum):# 獲取設備信息指針device_info = cast(device_list.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents# 根據傳輸層類型處理設備信息if device_info.nTLayerType == MV_GIGE_DEVICE:# GigE設備device_data = {'model': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chModelName).decode('utf-8'),'serial': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8'),'ip': ".".join(map(str, device_info.SpecialInfo.stGigEInfo.nCurrentIp)),'type': 'GigE','index': i}elif device_info.nTLayerType == MV_USB_DEVICE:# USB設備# 修正USB設備信息獲取方式usb_info = device_info.SpecialInfo.stUsb3VInfo# 使用ctypes的string_at函數獲取字符串model_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')device_data = {'model': model_name.strip('\x00'),'serial': serial_num.strip('\x00'),'type': 'USB','index': i}else:continuedevices.append(device_data)return devicesexcept Exception as e:self._last_error = f"枚舉設備時發生異常: {str(e)}"print(self._last_error)import tracebacktraceback.print_exc()  # 打印完整的錯誤堆棧return None

2.2 相機連接

  • connect_camera()方法連接指定索引的相機
  • 步驟:
    1. 檢查相機是否已連接
    2. 枚舉設備并選擇指定索引的設備
    3. 創建相機句柄
    4. 打開設備
    5. 配置相機參數(曝光、增益等)
    6. 開始采集圖像
    7. 存儲相機信息到字典中
    def connect_camera(self, device_index: int) -> bool:"""連接指定索引的相機設備"""try:with self._lock:if device_index in self.cameras and self.cameras[device_index]['connected']:print(f"相機 {device_index} 已連接")return True# 枚舉設備tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICEdeviceList = MV_CC_DEVICE_INFO_LIST()memset(byref(deviceList), 0, sizeof(deviceList))# 實例化相機cam = MvCamera()# 枚舉設備ret = cam.MV_CC_EnumDevices(tlayerType, deviceList)if ret != 0:self._log_error("枚舉設備", ret)return Falseif deviceList.nDeviceNum == 0:self._last_error = "未找到任何設備"print(self._last_error)return Falseif device_index >= deviceList.nDeviceNum:self._last_error = f"設備索引超出范圍,最大可用索引: {deviceList.nDeviceNum - 1}"print(self._last_error)return False# 選擇指定設備stDeviceList = cast(deviceList.pDeviceInfo[device_index], POINTER(MV_CC_DEVICE_INFO)).contents# 創建句柄ret = cam.MV_CC_CreateHandleWithoutLog(stDeviceList)if ret != MV_OK:self._log_error("創建句柄", ret)return False# 獲取設備信息if stDeviceList.nTLayerType == MV_GIGE_DEVICE:model_name = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chModelName).decode('utf-8')serial_num = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8')ip_addr = ".".join(map(str, stDeviceList.SpecialInfo.stGigEInfo.nCurrentIp))device_type = 'GigE'print(f"正在連接設備 {device_index}: {model_name} (SN: {serial_num}, IP: {ip_addr}。GiGe)")else:usb_info = stDeviceList.SpecialInfo.stUsb3VInfomodel_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')ip_addr = Nonedevice_type = 'USB'print(f"正在連接設備 {device_index}: {model_name} (SN: {serial_num}, USB-3.0)")# 打開相機ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != MV_OK:# 特別處理USB相機連接問題if stDeviceList.nTLayerType == MV_USB_DEVICE:# 嘗試設置USB傳輸大小(海康USB相機常見問題)ret = cam.MV_CC_SetIntValue("TransferSize", 0x100000)if ret == MV_OK:ret = cam.MV_CC_SetIntValue("NumTransferBuffers", 8)if ret == MV_OK:ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != 0:self._log_error("打開設備", ret)return False# 配置相機參數if not self._configure_camera(cam):cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 開始取流ret = cam.MV_CC_StartGrabbing()if ret != 0:self._log_error("開始取流", ret)cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 存儲相機信息 - 確保所有必要字段都正確設置self.cameras[device_index] = {'handle': cam,'model': model_name.strip('\x00') if isinstance(model_name, str) else model_name,'serial': serial_num.strip('\x00') if isinstance(serial_num, str) else serial_num,'type': device_type,'ip': ip_addr,'connected': True,  # 確保連接狀態正確設置為True'frame_count': 0,'last_frame_time': time.time()}# 初始化FPS計數器self._fps[device_index] = 0print(f"相機 {device_index} 連接成功: {model_name} (SN: {serial_num})")return Trueexcept Exception as e:self._last_error = f"連接相機時發生異常: {str(e)}"print(self._last_error)if 'cam' in locals():cam.MV_CC_DestroyHandle()return False

2.3 相機參數配置

  • _configure_camera()私有方法處理相機參數配置
  • 可配置項:
    • 觸發模式(連續采集)
    • 曝光模式(手動/自動)
    • 增益設置
    • 圖像翻轉(水平/垂直)
    def _configure_camera(self, cam: MvCamera) -> bool:"""配置相機參數"""try:# 設置觸發方式為連續采集ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)if ret != 0:self._log_error("設置觸發模式", ret)return False# 設置曝光模式match EXPOSURE_MODE:case 0:  # 手動設置參數ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_OFF)if ret != 0:print("警告: 關閉自動曝光設置失敗,將采用自動曝光")# 設置曝光時間exposure = float(EXPOSURE_TIME)ret = cam.MV_CC_SetFloatValue("ExposureTime", exposure)if ret != 0:raise RuntimeError(f"Set ExposureTime failed with error {ret}")case 1:  # 一次曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_ONCE)if ret != 0:print("警告: 一次曝光設置失敗,將繼續使用手動曝光")case 2:  # 自動曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_CONTINUOUS)if ret != 0:print("警告: 自動曝光設置失敗,將繼續使用手動曝光")# 設置增益ret = cam.MV_CC_SetEnumValue("GainAuto", MV_GAIN_MODE_OFF)if ret != 0:print("警告: 手動增益設置失敗,將采用自動增益")gain_val = float(GAIN_VALUE)ret = cam.MV_CC_SetFloatValue("Gain", gain_val)if ret != 0:raise RuntimeError(f"Set gain failed with error {ret}")# 設置水平翻轉flip = c_int(1 if ReverseX_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseX", flip)if ret != 0:raise RuntimeError(f"Set horizontal flip failed with error {ret}")print(f"Horizontal flip {'enabled' if ReverseX_enable else 'disabled'}")# 設置垂直翻轉flip = c_int(1 if ReverseY_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseY", flip)if ret != 0:raise RuntimeError(f"Set vertical flip failed with error {ret}")print(f"Vertical flip {'enabled' if ReverseY_enable else 'disabled'}")return Trueexcept Exception as e:self._last_error = f"配置相機時發生異常: {str(e)}"print(self._last_error)return False

2.4 圖像獲取

  • get_image()方法獲取指定相機的圖像
  • 步驟:
    1. 獲取圖像緩沖區
    2. 復制圖像數據
    3. 根據像素類型處理圖像數據
    4. 轉換為灰度圖像
    5. 釋放圖像緩沖區
    6. 更新幀統計信息
    def get_image(self, device_index: int, timeout: int = 300) -> Optional[Tuple[np.ndarray, np.ndarray]]:"""獲取指定相機的圖像并返回原始圖像和灰度圖像"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:self._last_error = f"相機 {device_index} 未連接"print(self._last_error)return Nonecam = self.cameras[device_index]['handle']try:# 初始化幀輸出結構stOutFrame = MV_FRAME_OUT()memset(byref(stOutFrame), 0, sizeof(stOutFrame))# 獲取圖像ret = cam.MV_CC_GetImageBuffer(stOutFrame, timeout)if ret != 0:self._log_error(f"相機 {device_index} 獲取圖像", ret)return None# 獲取圖像信息frame_info = stOutFrame.stFrameInfonPayloadSize = frame_info.nFrameLenpData = stOutFrame.pBufAddr# 打印調試信息# print(f"相機 {device_index} 圖像信息: "#       f"Width={frame_info.nWidth}, Height={frame_info.nHeight}, "#       f"PixelType={frame_info.enPixelType}, Size={nPayloadSize}")# 復制圖像數據data_buf = (c_ubyte * nPayloadSize)()cdll.msvcrt.memcpy(byref(data_buf), pData, nPayloadSize)# 轉換為numpy數組temp = np.frombuffer(data_buf, dtype=np.uint8)# 獲取圖像參數width = frame_info.nWidthheight = frame_info.nHeightpixel_type = frame_info.enPixelType# 根據像素類型處理圖像img = self._process_image_data(temp, width, height, pixel_type)if img is None:if PacketSizeLog:print(f"相機 {device_index} 圖像處理失敗 - 數據大小: {len(temp)}, "f"預期大小: {width * height * (3 if pixel_type in [PixelType_Gvsp_RGB8_Packed, PixelType_Gvsp_BGR8_Packed] else 1)}")cam.MV_CC_FreeImageBuffer(stOutFrame)return None# 轉換為灰度圖像if len(img.shape) == 2:  # 已經是灰度圖像gray = img.copy()else:gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)# 釋放圖像緩存cam.MV_CC_FreeImageBuffer(stOutFrame)# 更新幀統計信息self.cameras[device_index]['frame_count'] += 1self.cameras[device_index]['last_frame_time'] = time.time()return img, grayexcept Exception as e:self._last_error = f"相機 {device_index} 獲取圖像時發生異常: {str(e)}"print(self._last_error)if 'stOutFrame' in locals():cam.MV_CC_FreeImageBuffer(stOutFrame)return None

2.5 圖像顯示

  • start_display()啟動相機實時顯示
  • 為每個相機創建獨立的顯示線程
  • 顯示線程中:
    • 循環獲取圖像
    • 計算并顯示FPS
    • 顯示圖像到窗口
    • 處理用戶按鍵(ESC退出)
    def start_display(self,device_index: int) -> bool:"""啟動所有已連接相機的實時顯示"""with self._lock:  # 添加線程鎖# 檢查相機是否已連接if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相機 {device_index} 未連接,無法啟動顯示")return Falseif device_index in self._running and self._running[device_index]:print(f"相機 {device_index} 顯示已啟動")return True# 設置運行標志self._running[device_index] = True# 創建并啟動顯示線程display_thread = threading.Thread(target=self._display_thread,args=(device_index,),daemon=True)self._display_threads[device_index] = display_threaddisplay_thread.start()print(f"相機 {device_index} 顯示線程已啟動")return True

2.6 斷開連接

  • disconnect_camera()斷開單個相機連接
  • disconnect_all()斷開所有相機連接
  • 釋放所有資源
    def disconnect_camera(self, device_index: int) -> bool:"""斷開指定相機的連接"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相機 {device_index} 未連接")return Truecam = self.cameras[device_index]['handle']try:success = True# 停止取流ret = cam.MV_CC_StopGrabbing()if ret != 0:self._log_error(f"相機 {device_index} 停止取流", ret)success = False# 關閉設備ret = cam.MV_CC_CloseDevice()if ret != 0:self._log_error(f"相機 {device_index} 關閉設備", ret)success = False# 銷毀句柄ret = cam.MV_CC_DestroyHandle()if ret != 0:self._log_error(f"相機 {device_index} 銷毀句柄", ret)success = Falseif success:print(f"相機 {device_index} 已成功斷開連接")self.cameras[device_index]['connected'] = False# 從字典中移除相機del self.cameras[device_index]# 停止顯示線程if device_index in self._running:self._running[device_index] = Falsereturn successexcept Exception as e:self._last_error = f"斷開相機 {device_index} 連接時發生異常: {str(e)}"print(self._last_error)return Falsedef disconnect_all(self) -> None:"""斷開所有相機的連接"""self.stop_display()  # 先停止所有顯示for device_index in list(self.cameras.keys()):if self.cameras[device_index]['connected']:self.disconnect_camera(device_index)

3. 目前程序的一些功能和特點如下

  1. 多線程支持

    • 每個相機的顯示使用獨立線程
    • 使用線程鎖保證線程安全
  2. 錯誤處理

    • 詳細的錯誤日志記錄
    • 異常捕獲和處理
      在這里插入圖片描述
  3. 相機參數配置

    • 支持多種曝光模式
    • 可配置增益、翻轉等參數
  4. 圖像處理

    • 支持多種像素格式(Mono8, RGB8, BGR8等)
    • 自動處理數據大小不匹配的情況
    • 圖像縮放顯示
  5. 性能監控

    • 實時計算和顯示FPS
    • 幀計數統計

4. 完整代碼如下

from HK_Camera.MvCameraControl_class import *
from ctypes import *
from typing import Optional, Tuple, List, Dict
import time
import cv2
import numpy as np
import threading# 相機參數配置
EXPOSURE_MODE = 0  # 曝光模式:0:關閉;1:一次;2:自動曝光
EXPOSURE_TIME = 40000 # 曝光時間
GAIN_VALUE = 10 #增益值
ReverseX_enable = True # 水平翻轉
ReverseY_enable = True # 垂直翻轉
#圖像顯示大小
scale_width = 0.2  # 寬度縮放因子
scale_height = 0.2  # 高度縮放因子
PacketSizeLog = True  # 啟用丟包信息檢測class HKCameraManager:def __init__(self):"""初始化相機管理器"""self.cameras: Dict[int, Dict] = {}  # 存儲所有相機實例和信息self._last_error: str = ""self._running = {}  # 每個相機的運行狀態self._lock = threading.Lock()self._display_threads = {}  # 每個相機的顯示線程self._fps = {}  # 每個相機的FPS@propertydef last_error(self) -> str:"""獲取最后一次錯誤信息"""return self._last_errordef _log_error(self, operation: str, ret: int) -> None:"""記錄錯誤日志"""self._last_error = f"{operation}失敗,錯誤碼: 0x{ret:x}"print(self._last_error)def enumerate_devices(self) -> Optional[List[dict]]:"""枚舉所有可用設備"""try:# 設置要枚舉的設備類型tlayer_type = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE# 初始化設備列表結構體device_list = MV_CC_DEVICE_INFO_LIST()memset(byref(device_list), 0, sizeof(device_list))# 創建臨時相機實例用于枚舉temp_cam = MvCamera()# 枚舉設備ret = temp_cam.MV_CC_EnumDevices(tlayer_type, device_list)if ret != 0:self._log_error("枚舉設備", ret)return None# 檢查找到的設備數量if device_list.nDeviceNum == 0:print("未檢測到任何相機設備")return []devices = []for i in range(device_list.nDeviceNum):# 獲取設備信息指針device_info = cast(device_list.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents# 根據傳輸層類型處理設備信息if device_info.nTLayerType == MV_GIGE_DEVICE:# GigE設備device_data = {'model': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chModelName).decode('utf-8'),'serial': ctypes.string_at(device_info.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8'),'ip': ".".join(map(str, device_info.SpecialInfo.stGigEInfo.nCurrentIp)),'type': 'GigE','index': i}elif device_info.nTLayerType == MV_USB_DEVICE:# USB設備# 修正USB設備信息獲取方式usb_info = device_info.SpecialInfo.stUsb3VInfo# 使用ctypes的string_at函數獲取字符串model_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')device_data = {'model': model_name.strip('\x00'),'serial': serial_num.strip('\x00'),'type': 'USB','index': i}else:continuedevices.append(device_data)return devicesexcept Exception as e:self._last_error = f"枚舉設備時發生異常: {str(e)}"print(self._last_error)import tracebacktraceback.print_exc()  # 打印完整的錯誤堆棧return Nonedef connect_camera(self, device_index: int) -> bool:"""連接指定索引的相機設備"""try:with self._lock:if device_index in self.cameras and self.cameras[device_index]['connected']:print(f"相機 {device_index} 已連接")return True# 枚舉設備tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICEdeviceList = MV_CC_DEVICE_INFO_LIST()memset(byref(deviceList), 0, sizeof(deviceList))# 實例化相機cam = MvCamera()# 枚舉設備ret = cam.MV_CC_EnumDevices(tlayerType, deviceList)if ret != 0:self._log_error("枚舉設備", ret)return Falseif deviceList.nDeviceNum == 0:self._last_error = "未找到任何設備"print(self._last_error)return Falseif device_index >= deviceList.nDeviceNum:self._last_error = f"設備索引超出范圍,最大可用索引: {deviceList.nDeviceNum - 1}"print(self._last_error)return False# 選擇指定設備stDeviceList = cast(deviceList.pDeviceInfo[device_index], POINTER(MV_CC_DEVICE_INFO)).contents# 創建句柄ret = cam.MV_CC_CreateHandleWithoutLog(stDeviceList)if ret != MV_OK:self._log_error("創建句柄", ret)return False# 獲取設備信息if stDeviceList.nTLayerType == MV_GIGE_DEVICE:model_name = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chModelName).decode('utf-8')serial_num = ctypes.string_at(stDeviceList.SpecialInfo.stGigEInfo.chSerialNumber).decode('utf-8')ip_addr = ".".join(map(str, stDeviceList.SpecialInfo.stGigEInfo.nCurrentIp))device_type = 'GigE'print(f"正在連接設備 {device_index}: {model_name} (SN: {serial_num}, IP: {ip_addr}。GiGe)")else:usb_info = stDeviceList.SpecialInfo.stUsb3VInfomodel_name = string_at(usb_info.chModelName).decode('utf-8', errors='ignore')serial_num = string_at(usb_info.chSerialNumber).decode('utf-8', errors='ignore')ip_addr = Nonedevice_type = 'USB'print(f"正在連接設備 {device_index}: {model_name} (SN: {serial_num}, USB-3.0)")# 打開相機ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != MV_OK:# 特別處理USB相機連接問題if stDeviceList.nTLayerType == MV_USB_DEVICE:# 嘗試設置USB傳輸大小(海康USB相機常見問題)ret = cam.MV_CC_SetIntValue("TransferSize", 0x100000)if ret == MV_OK:ret = cam.MV_CC_SetIntValue("NumTransferBuffers", 8)if ret == MV_OK:ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != 0:self._log_error("打開設備", ret)return False# 配置相機參數if not self._configure_camera(cam):cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 開始取流ret = cam.MV_CC_StartGrabbing()if ret != 0:self._log_error("開始取流", ret)cam.MV_CC_CloseDevice()cam.MV_CC_DestroyHandle()return False# 存儲相機信息 - 確保所有必要字段都正確設置self.cameras[device_index] = {'handle': cam,'model': model_name.strip('\x00') if isinstance(model_name, str) else model_name,'serial': serial_num.strip('\x00') if isinstance(serial_num, str) else serial_num,'type': device_type,'ip': ip_addr,'connected': True,  # 確保連接狀態正確設置為True'frame_count': 0,'last_frame_time': time.time()}# 初始化FPS計數器self._fps[device_index] = 0print(f"相機 {device_index} 連接成功: {model_name} (SN: {serial_num})")return Trueexcept Exception as e:self._last_error = f"連接相機時發生異常: {str(e)}"print(self._last_error)if 'cam' in locals():cam.MV_CC_DestroyHandle()return Falsedef _configure_camera(self, cam: MvCamera) -> bool:"""配置相機參數"""try:# 設置觸發方式為連續采集ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)if ret != 0:self._log_error("設置觸發模式", ret)return False# 設置曝光模式match EXPOSURE_MODE:case 0:  # 手動設置參數ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_OFF)if ret != 0:print("警告: 關閉自動曝光設置失敗,將采用自動曝光")# 設置曝光時間exposure = float(EXPOSURE_TIME)ret = cam.MV_CC_SetFloatValue("ExposureTime", exposure)if ret != 0:raise RuntimeError(f"Set ExposureTime failed with error {ret}")case 1:  # 一次曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_ONCE)if ret != 0:print("警告: 一次曝光設置失敗,將繼續使用手動曝光")case 2:  # 自動曝光ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_CONTINUOUS)if ret != 0:print("警告: 自動曝光設置失敗,將繼續使用手動曝光")# 設置增益ret = cam.MV_CC_SetEnumValue("GainAuto", MV_GAIN_MODE_OFF)if ret != 0:print("警告: 手動增益設置失敗,將采用自動增益")gain_val = float(GAIN_VALUE)ret = cam.MV_CC_SetFloatValue("Gain", gain_val)if ret != 0:raise RuntimeError(f"Set gain failed with error {ret}")# 設置水平翻轉flip = c_int(1 if ReverseX_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseX", flip)if ret != 0:raise RuntimeError(f"Set horizontal flip failed with error {ret}")print(f"Horizontal flip {'enabled' if ReverseX_enable else 'disabled'}")# 設置垂直翻轉flip = c_int(1 if ReverseY_enable else 0)ret = cam.MV_CC_SetBoolValue("ReverseY", flip)if ret != 0:raise RuntimeError(f"Set vertical flip failed with error {ret}")print(f"Vertical flip {'enabled' if ReverseY_enable else 'disabled'}")return Trueexcept Exception as e:self._last_error = f"配置相機時發生異常: {str(e)}"print(self._last_error)return Falsedef get_image(self, device_index: int, timeout: int = 300) -> Optional[Tuple[np.ndarray, np.ndarray]]:"""獲取指定相機的圖像并返回原始圖像和灰度圖像"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:self._last_error = f"相機 {device_index} 未連接"print(self._last_error)return Nonecam = self.cameras[device_index]['handle']try:# 初始化幀輸出結構stOutFrame = MV_FRAME_OUT()memset(byref(stOutFrame), 0, sizeof(stOutFrame))# 獲取圖像ret = cam.MV_CC_GetImageBuffer(stOutFrame, timeout)if ret != 0:self._log_error(f"相機 {device_index} 獲取圖像", ret)return None# 獲取圖像信息frame_info = stOutFrame.stFrameInfonPayloadSize = frame_info.nFrameLenpData = stOutFrame.pBufAddr# 打印調試信息# print(f"相機 {device_index} 圖像信息: "#       f"Width={frame_info.nWidth}, Height={frame_info.nHeight}, "#       f"PixelType={frame_info.enPixelType}, Size={nPayloadSize}")# 復制圖像數據data_buf = (c_ubyte * nPayloadSize)()cdll.msvcrt.memcpy(byref(data_buf), pData, nPayloadSize)# 轉換為numpy數組temp = np.frombuffer(data_buf, dtype=np.uint8)# 獲取圖像參數width = frame_info.nWidthheight = frame_info.nHeightpixel_type = frame_info.enPixelType# 根據像素類型處理圖像img = self._process_image_data(temp, width, height, pixel_type)if img is None:if PacketSizeLog:print(f"相機 {device_index} 圖像處理失敗 - 數據大小: {len(temp)}, "f"預期大小: {width * height * (3 if pixel_type in [PixelType_Gvsp_RGB8_Packed, PixelType_Gvsp_BGR8_Packed] else 1)}")cam.MV_CC_FreeImageBuffer(stOutFrame)return None# 轉換為灰度圖像if len(img.shape) == 2:  # 已經是灰度圖像gray = img.copy()else:gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)# 釋放圖像緩存cam.MV_CC_FreeImageBuffer(stOutFrame)# 更新幀統計信息self.cameras[device_index]['frame_count'] += 1self.cameras[device_index]['last_frame_time'] = time.time()return img, grayexcept Exception as e:self._last_error = f"相機 {device_index} 獲取圖像時發生異常: {str(e)}"print(self._last_error)if 'stOutFrame' in locals():cam.MV_CC_FreeImageBuffer(stOutFrame)return Nonedef _process_image_data(self, data: np.ndarray, width: int, height: int, pixel_type: int) -> Optional[np.ndarray]:"""根據像素類型處理原始圖像數據"""try:if PacketSizeLog:# 首先檢查數據大小是否匹配預期expected_size = width * heightif pixel_type in [PixelType_Gvsp_Mono8, PixelType_Gvsp_RGB8_Packed, PixelType_Gvsp_BGR8_Packed]:if pixel_type == PixelType_Gvsp_Mono8:expected_size = width * heightelse:expected_size = width * height * 3if len(data) != expected_size:print(f"警告: 數據大小不匹配 (預期: {expected_size}, 實際: {len(data)}), 嘗試自動處理")# 嘗試自動計算正確的高度if pixel_type == PixelType_Gvsp_Mono8:actual_height = len(data) // widthif actual_height * width == len(data):return data.reshape((actual_height, width))else:actual_height = len(data) // (width * 3)if actual_height * width * 3 == len(data):img = data.reshape((actual_height, width, 3))if pixel_type == PixelType_Gvsp_RGB8_Packed:return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)return imgreturn None# 正常處理流程if pixel_type == PixelType_Gvsp_Mono8:return data.reshape((height, width))elif pixel_type == PixelType_Gvsp_RGB8_Packed:img = data.reshape((height, width, 3))return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)elif pixel_type == PixelType_Gvsp_BGR8_Packed:return data.reshape((height, width, 3))elif pixel_type in [PixelType_Gvsp_Mono10, PixelType_Gvsp_Mono12]:# 對于10位或12位圖像,需要進行位轉換img = data.view(np.uint16)img = (img >> (pixel_type - PixelType_Gvsp_Mono8)).astype(np.uint8)return img.reshape((height, width))else:self._last_error = f"不支持的像素格式: {pixel_type}"print(self._last_error)return Noneexcept Exception as e:self._last_error = f"圖像處理錯誤: {str(e)}"if PacketSizeLog:print(self._last_error)return Nonedef disconnect_camera(self, device_index: int) -> bool:"""斷開指定相機的連接"""with self._lock:if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相機 {device_index} 未連接")return Truecam = self.cameras[device_index]['handle']try:success = True# 停止取流ret = cam.MV_CC_StopGrabbing()if ret != 0:self._log_error(f"相機 {device_index} 停止取流", ret)success = False# 關閉設備ret = cam.MV_CC_CloseDevice()if ret != 0:self._log_error(f"相機 {device_index} 關閉設備", ret)success = False# 銷毀句柄ret = cam.MV_CC_DestroyHandle()if ret != 0:self._log_error(f"相機 {device_index} 銷毀句柄", ret)success = Falseif success:print(f"相機 {device_index} 已成功斷開連接")self.cameras[device_index]['connected'] = False# 從字典中移除相機del self.cameras[device_index]# 停止顯示線程if device_index in self._running:self._running[device_index] = Falsereturn successexcept Exception as e:self._last_error = f"斷開相機 {device_index} 連接時發生異常: {str(e)}"print(self._last_error)return False
###########################圖像視頻流顯示部分################################################def start_display(self,device_index: int) -> bool:"""啟動所有已連接相機的實時顯示"""with self._lock:  # 添加線程鎖# 檢查相機是否已連接if device_index not in self.cameras or not self.cameras[device_index]['connected']:print(f"相機 {device_index} 未連接,無法啟動顯示")return Falseif device_index in self._running and self._running[device_index]:print(f"相機 {device_index} 顯示已啟動")return True# 設置運行標志self._running[device_index] = True# 創建并啟動顯示線程display_thread = threading.Thread(target=self._display_thread,args=(device_index,),daemon=True)self._display_threads[device_index] = display_threaddisplay_thread.start()print(f"相機 {device_index} 顯示線程已啟動")return Truedef stop_display(self, device_index: int = None) -> None:"""停止指定相機的顯示或停止所有相機顯示"""if device_index is None:# 停止所有顯示for idx in list(self._running.keys()):self._running[idx] = Falsefor idx, thread in self._display_threads.items():if thread.is_alive():thread.join()self._display_threads.clear()cv2.destroyAllWindows()else:# 停止指定相機顯示if device_index in self._running:self._running[device_index] = Falseif device_index in self._display_threads:if self._display_threads[device_index].is_alive():self._display_threads[device_index].join()del self._display_threads[device_index]cv2.destroyWindow(f"Camera {device_index}")def _display_thread(self, device_index: int) -> None:"""單個相機的顯示線程"""frame_count = 0last_time = time.time()window_name = f"Camera {device_index}"def _window_exists(window_name):"""檢查OpenCV窗口是否存在"""try:return cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) >= 0except:return Falsetry:while self._running.get(device_index, False):try:# 獲取圖像result = self.get_image(device_index)if result is None:if PacketSizeLog:print(f"相機 {device_index} 獲取圖像超時")time.sleep(0.1)continueimg, _ = result# 計算FPSframe_count += 1current_time = time.time()if current_time - last_time >= 1.0:self._fps[device_index] = frame_count / (current_time - last_time)frame_count = 0last_time = current_time# 在圖像上顯示信息info = f"Cam {device_index} | {self.cameras[device_index]['model']} | FPS: {self._fps[device_index]:.1f}"cv2.putText(img, info, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)resized_image_by_scale = cv2.resize(img, None, fx=scale_width, fy=scale_height)# 顯示圖像cv2.imshow(window_name, resized_image_by_scale)# 檢查按鍵key = cv2.waitKey(1) & 0xFFif key == 27:  # ESC鍵退出self._running[device_index] = Falsebreakexcept Exception as e:print(f"相機 {device_index} 顯示線程異常: {str(e)}")time.sleep(0.1)finally:# 線程結束時清理if _window_exists(window_name):cv2.destroyWindow(window_name)print(f"相機 {device_index} 顯示線程已停止")def disconnect_all(self) -> None:"""斷開所有相機的連接"""self.stop_display()  # 先停止所有顯示for device_index in list(self.cameras.keys()):if self.cameras[device_index]['connected']:self.disconnect_camera(device_index)def __del__(self):"""析構函數,確保資源釋放"""self.disconnect_all()def main():# 創建相機管理器cam_manager = HKCameraManager()# 枚舉設備devices = cam_manager.enumerate_devices()if not devices:print("未找到任何相機設備")returnprint("找到以下相機設備:")for i, dev in enumerate(devices):# 根據設備類型顯示不同信息if dev['type'] == 'GigE':print(f"{i}: {dev['model']} (SN: {dev['serial']}, IP: {dev['ip']})")else:  # USB設備print(f"{i}: {dev['model']} (SN: {dev['serial']}, Type: USB)")# 先連接所有相機for i in range(len(devices)):if not cam_manager.connect_camera(i):print(f"無法連接相機 {i}")continue  # 即使一個相機連接失敗,也繼續嘗試其他相機# 確認連接狀態后再啟動顯示for i in range(len(devices)):if i in cam_manager.cameras and cam_manager.cameras[i]['connected']:cam_manager.start_display(i)try:# 主線程等待while any(cam_manager._running.values()):time.sleep(0.1)except KeyboardInterrupt:print("用戶中斷...")finally:# 清理資源cam_manager.disconnect_all()print("程序退出")if __name__ == "__main__":main()

5. 寫在最后

目前程序還有一些未增加的功能,后續會增加補充

  1. 相機參數動態調整功能
  2. 圖像保存功能
  3. 支持更多像素格式
  4. 網絡相機重連機制
  5. 日志系統替代print輸出

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/87160.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/87160.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/87160.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

HTTP 協議各個主要版本的功能特點、核心原理、使用場景總結

我們來系統總結一下 HTTP 協議各個主要版本的功能特點、核心原理(用圖示輔助說明)以及典型使用場景。 核心演進目標: 提升性能、安全性、效率和靈活性。 1. HTTP/0.9 (1991) - 遠古雛形 功能特點: 極其簡單: 只支持 GET 方法。無…

Linux編程:3、進程通信-信號

一、進程通信概述 (一)進程通信的目的 在企業開發中,一個項目常常需要多個進程共同協作,而這些進程之間需要進行通信(交換信息)以便協作。本章內容主要圍繞信號講解,其它進程通信的常用方式請…

深度解析Vue.js組件開發與實戰案例

一、Vue.js組件化思想 Vue.js的核心思想之一就是組件化開發。組件系統是Vue的一個重要概念,它允許我們使用小型、獨立和通常可復用的組件構建大型應用。在Vue中,組件本質上是一個擁有預定義選項的Vue實例。 1.1 為什么需要組件化 代碼復用:避免重復造輪子,提高開發效率可…

TensorFlow 2.0 與 Python 3.11 兼容性

TensorFlow 2.0 與 Python 3.11 兼容性 TensorFlow 2.0 官方版本對 Python 3.11 的支持有限,可能出現兼容性問題。建議使用 TensorFlow 2.10 或更高版本,這些版本已適配 Python 3.11。若需強制運行,可通過以下方式解決依賴沖突: …

MyBatisPlus 全面學習路徑

MyBatisPlus 全面學習路徑 學習目錄 第一部分:MyBatisPlus 基礎 MyBatisPlus 簡介與核心特性快速入門與環境搭建核心功能:BaseMapper 與 CRUD 接口條件構造器(Wrapper)詳解ActiveRecord 模式主鍵策略與通用枚舉 第二部分&…

React16,17,18,19更新對比

文章目錄 前言一、16更新二、17更新三、18更新四、19更新總結 前言 總結react 16,17,18,19所更新的內容,并且部分會涉及到原理講解。 一、16更新 1、在16.8之前更新,還是基于class組件的升級和維護更新。并且更新了一…

【git】有兩個遠程倉庫時的推送、覆蓋、合并問題

當你執行 git pull origin develop(或默認的 git pull)時,Git 會把遠端 origin/develop 的提交合并到你本地的 develop,如果遠端已經丟掉(或從未包含)你之前在私庫(priv)里提交過的改動,那這些改動就會被「覆蓋」,看起來就像「本地修改沒了」。 要解決這個問題,分…

Spring Boot 集成國內AI,包含文心一言、通義千問和訊飛星火平臺實戰教程

Spring Boot 集成國內AI,包含文心一言、通義千問和訊飛星火平臺實戰教程 一、項目結構二、添加Maven依賴三、配置API密鑰 (application.yml)四、配置類1. AI配置類 (AiProperties.java)2. 啟用配置類 (AiConfig.java) 五、服務層實現1. 文心一言服務 (WenxinService…

Elastic Search 學習筆記

1. Elasticsearch 是什么?有哪些應用場景? Elasticsearch 整體原理流程? Elasticsearch 是一個為海量數據提供近實時搜索和分析能力的分布式搜索引擎,廣泛應用于全文檢索、日志分析和大數據處理場景中。 Elasticsearch 整體原理…

動態規劃之斐波那契數(一)

解法一&#xff1a;遞歸 class Solution { public:int fib(int n) {if(n<2) return n;return fib(n-1)fib(n-2);} }; 解法二&#xff1a;dp class Solution { public:int fib(int N) {if (N < 1) return N;int dp[2];dp[0] 0;dp[1] 1;for (int i 2; i < N; i) {…

如何設置爬蟲的訪問頻率?

設置爬蟲的訪問頻率是爬蟲開發中的一個重要環節&#xff0c;尤其是在爬取大型網站&#xff08;如1688&#xff09;時&#xff0c;合理的訪問頻率可以避免對目標網站造成過大負擔&#xff0c;同時也能降低被封禁的風險。以下是一些常見的方法和建議&#xff0c;幫助你合理設置爬…

前端面試六之axios

一、axios簡介 Axios 是一個基于 Promise 的 HTTP 客戶端&#xff0c;用于瀏覽器和 Node.js 環境。在瀏覽器端&#xff0c;Axios 的底層實現是基于原生的 XMLHttpRequest&#xff08;XHR&#xff09;。它對 XHR 進行了封裝&#xff0c;增加了 Promise 支持、自動轉換 JSON 數據…

模板方法模式Template Method Pattern

模式定義 定義一個操作中算法的骨架&#xff0c;而將一些步驟延遲到子類中&#xff0c;模板方法使得子類可以不改變一個算法的結構即可重定義該算法的某些特定步驟 類行為型模式 模式結構 AbstractClass&#xff1a;抽象類ConcreteClass&#xff1a;具體子類 只有類之間的繼…

【行云流水AI筆記】游戲里面的強化學習使用場景

強化學習在游戲中的應用已從早期的棋類博弈擴展到現代復雜游戲的全流程優化&#xff0c;以下是結合最新技術進展的核心應用場景及典型案例&#xff1a; 一、競技游戲的策略突破 1. 策略博弈類游戲 代表案例&#xff1a;AlphaGo/AlphaZero&#xff08;圍棋&#xff09;、Alph…

使用Python和PyTorch框架,基于RetinaNet模型進行目標檢測,包含數據準備、模型訓練、評估指標計算和可視化

下面是一個完整的實現方案,使用Python和PyTorch框架,基于RetinaNet模型進行目標檢測,包含數據準備、模型訓練、評估指標計算和可視化。 import os import numpy as np import matplotlib.pyplot as plt import torch import torchvision from torchvision.models.detection…

springboot服務如何獲取pod當前ip方案及示例

在 Kubernetes 集群中&#xff0c;Spring Boot 服務獲取 Pod 當前 IP 的方案主要有兩種&#xff1a;通過環境變量注入 或 通過 Java 代碼動態獲取網絡接口 IP。以下是兩種方案的詳細說明及示例&#xff1a; 方案一&#xff1a;通過 Kubernetes Downward API 注入環境變量 原理…

1.MySQL三層結構

1.所謂安裝的Mysql數據庫&#xff0c;就是在電腦上安裝了一個數據庫管理系統&#xff08;【DBMS】database manage system&#xff09;&#xff0c;這個管理程序可以管理多個數據庫。 2.一個數據庫中可以創建多個表&#xff0c;以保存數據&#xff08;信息&#xff09;。【數據…

[深度學習]目標檢測基礎

目錄 一、實驗目的 二、實驗環境 三、實驗內容 3.1 LM_BoundBox 3.1.1 實驗代碼 3.1.2 實驗結果 3.2 LM_Anchor 3.2.1 實驗代碼 3.2.2 實驗結果 3.3 LM_Multiscale-object-detection 3.3.1 實驗代碼 3.3.2 實驗結果 四、實驗小結 一、實驗目的 了解python語…

ALOHA機器人平臺:低成本、高精度雙臂操作及其進展深度解析

原創1從感知決策到具身智能的技術躍遷與挑戰(基座模型與VLA模型)2ALOHA機器人平臺&#xff1a;低成本、高精度雙臂操作及其進展深度解析3(上)通用智能體與機器人Transformer&#xff1a;Gato和RT-1技術解析及與LLM Transformer的異同4(下)通用智能體與機器人Transformer&#x…

C++: 類 Class 的基礎用法

&#x1f3f7;? 標簽&#xff1a;C、面向對象、類、構造函數、成員函數、封裝、繼承、多態 &#x1f4c5; 更新時間&#xff1a;2025年6月15日 &#x1f4ac; 歡迎在評論區留言交流你的理解與疑問&#xff01; 文章目錄 前言一、什么是類&#xff1f;二、類的定義1.基本語法2.…