第一節:
首先是環境的搭建。
環境的搭建。
root@topeet:/home/topeet/source_code/wang_onvif_python# python -m venv venv
Command 'python' not found, did you mean:command 'python3' from deb python3command 'python' from deb python-is-python3
root@topeet:/home/topeet/source_code/wang_onvif_python# python^Cm venv venv
root@topeet:/home/topeet/source_code/wang_onvif_python# ^C
root@topeet:/home/topeet/source_code/wang_onvif_python# ^C
root@topeet:/home/topeet/source_code/wang_onvif_python# python3
python3 python3.10 python3.10-config python3-config
root@topeet:/home/topeet/source_code/wang_onvif_python# python3
python3 python3.10 python3.10-config python3-config
root@topeet:/home/topeet/source_code/wang_onvif_python# python3 -m venv myenv
root@topeet:/home/topeet/source_code/wang_onvif_python# source myenv/bin/activate
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python# pip3 install WSDiscovery
Collecting WSDiscoveryDownloading WSDiscovery-2.1.2.tar.gz (23 kB)Preparing metadata (setup.py) ... done
Collecting clickDownloading click-8.2.1-py3-none-any.whl (102 kB)━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 102.2/102.2 KB 618.4 kB/s eta 0:00:00
Collecting ifaddrDownloading ifaddr-0.2.0-py3-none-any.whl (12 kB)
Using legacy 'setup.py install' for WSDiscovery, since package 'wheel' is not installed.
Installing collected packages: ifaddr, click, WSDiscoveryRunning setup.py install for WSDiscovery ... done
Successfully installed WSDiscovery-2.1.2 click-8.2.1 ifaddr-0.2.0
這里沒有使用 requirement.txt 。
然后是一些 基本的介紹。?
主要是 profile S
這是 接口文檔。
這個實際上是 HTTP 協議。
這是基本的功能。
這兩個是 必選的。
第二節:
然后是發現設備。
然后是基本的使用。
deepseek :
from wsdiscovery import WSDiscovery
from wsdiscovery import QNamedef discover_onvif_cameras():wsd = WSDiscovery()try:wsd.start()# ONVIF 設備類型onvif_type = QName("http://www.onvif.org/ver10/network/wsdl", "NetworkVideoTransmitter")# 搜索 ONVIF 設備services = wsd.searchServices(types=[onvif_type])print(f"發現 {len(services)} 個 ONVIF 攝像頭:")for service in services:print(f"\n攝像頭地址: {service.getXAddrs()[0]}")print(f"設備類型: {service.getTypes()}")finally:wsd.stop()if __name__ == "__main__":discover_onvif_cameras()
基本上 使用的話, 這個 庫就夠用了。
我自己 測試使用的代碼
43 from wsdiscovery import WSDiscovery2 def test_find():1 # http://www.onvif.org/onvif/ver10/network/wsdl/remotediscovery.wsdl5 # from wsdiscovery.threaded import NetworkingThread, MULTICAST_PORT12 clients = []34 wsd = WSDiscovery()5 wsd.start()6 services = wsd.searchServices()78 for service in services:9 get_ip = str(service.getXAddrs())10 get_types = str(service.getTypes())11 clients.append(get_ip)121314 wsd.stop()15 clients.sort()1617 print("------------------start------------------------")18 for client in clients:19 print(client)2021 if __name__ == '__main__':22 print("main.py")23 print("ONVIFClientManager")2425 test_find()26
執行 編譯+執行 命令。
這是返回的結果。
我的 局域網內 是有一個大華? 攝像頭的。
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python/my_code# python3 client_find.py
main.py
ONVIFClientManager
------------------start------------------------
['http://192.168.1.114:5357/e70e9f7e-9aa8-42c7-ab3d-10537725dc6d/']
['http://192.168.1.46/onvif/device_service']
['http://TOPEET_FILE:5357/c44da3fc-8dad-480c-9463-c3e887093657']
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python/my_code#
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python/my_code#
然后是一些基本的了解。
多播可以在 廣域網上傳播,但是廣播不行。
D類 地址 專門用于 廣播。
這里為什么是? 239.255.255.250 呢? 這個是固定的嗎?
第三節:
然后是 截屏,分辨率的配置。
一些基本的了解:?
它的流程是這樣的。
客戶端探測--->WSDiscovery ---->返回devcemgt 地址------>devicemgt 地址+ getservice----->得到其他模塊的地址。
這兩個函數是一樣的。
然后是我自己的代碼。
運行的有個報錯。
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python/my_code# pip3 uninstall suds-passworddigest
WARNING: Skipping suds-passworddigest as it is not installed.
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python/my_code# pip3 install --upgrade onvif_zeep
Collecting onvif_zeepDownloading onvif_zeep-0.2.12.tar.gz (163 kB)━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 163.1/163.1 KB 685.2 kB/s eta 0:00:00Preparing metadata (setup.py) ... done
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python/my_code# pip3 install pillow
Collecting pillowDownloading pillow-11.3.0-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (6.6 MB)━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 6.6/6.6 MB 4.6 MB/s eta 0:00:00
Installing collected packages: pillow
Successfully installed pillow-11.3.0
(myenv) root@topeet:/home/topeet/source_code/wang_onvif_python/my_code#
驗證一下截圖的代碼:?
13 from mypackage import client12 import os1110 def test_client():98 #密碼+賬戶 任意7 wang_client = client.Client('192.168.1.46', 'admin', 'Admin123')65 # rtsp://admin:Admin123@192.168.1.145/live/0/MAIN4 # rtsp://admin:Admin123@192.168.1.145/live/0/SUB321 if not wang_client.connect():14 exit(0)12 root_dir = os.path.dirname(os.path.abspath(__file__))3 wang_client.Snapshot(file_dir=os.path.join(root_dir,"data"))45 # streamUri = wang_client.GetStreamUri()6 # print(streamUri)7 # profiles = client.GetProfiles()8 # osds = client.GetOSDs()9 # info = client.GetDeviceInformation()10 # videoSourceConfig = client.GetVideoSourceConfigurations()11 #12 # encoderConfig1 = client.GetVideoEncoderConfigurations()13 # client.SetVideoEncoderConfiguration()14 # encoderConfig2 = client.GetVideoEncoderConfigurations()15 #16 # test_client(client)1718 print("end")192021 if __name__ == '__main__':22 print("main.py")23 print("ONVIFClientManager")2425 test_client()26
前提是 已經把 client 這個 包 移植好。
import osfrom onvif import ONVIFCamera,ONVIFError
import zeep
import time
from datetime import datetime
import requests
from requests.auth import HTTPDigestAuth
from PIL import Imagedef zeep_pythonvalue(self, xmlvalue):return xmlvalueclass Client(object):def __init__(self, ip: str, username: str, password: str):self.ip = ipself.username = usernameself.password = passwordzeep.xsd.simple.AnySimpleType.pythonvalue = zeep_pythonvalue# zeep.xsd.simple.AnySimpleType.pythonvalue = lambda x:xdef connect(self):"""連接相機:return:"""try:self.camera = ONVIFCamera(self.ip, 80, self.username, self.password)self.media = self.camera.create_media_service()profiles = self.GetProfiles()self.media_profile = profiles[0] # 獲取配置信息print("連接成功")return Trueexcept Exception as e:print("連接失敗")print(e)return Falsedef Snapshot(self,file_dir='data'):"""截圖:return:"""if not os.path.exists(file_dir):os.makedirs(file_dir)file_path = os.path.join(file_dir,str(datetime.now().strftime("%Y%m%d_%H_%M_%S"))+".jpg")res = self.media.GetSnapshotUri({'ProfileToken': self.media_profile.token})# response = requests.get(res.Uri, auth=HTTPDigestAuth(self.username, self.password))response = requests.get(res.Uri)with open(file_path, 'wb') as f: # 保存截圖f.write(response.content)print("保存截圖成功:%s"%file_path)def Snapshot_resize(self, file_dir='data', size=None):"""截圖并調整尺寸:param picname: 保存截圖的文件名:param new_size: 調整后的尺寸,格式為(width, height)"""if not os.path.exists(file_dir):os.makedirs(file_dir)file_path = os.path.join(file_dir,str(datetime.now().strftime("%Y%m%d_%H_%M_%S"))+".jpg")res = self.media.GetSnapshotUri({'ProfileToken': self.media_profile.token})response = requests.get(res.Uri, auth=HTTPDigestAuth(self.username, self.password))with open(file_path, 'wb') as f:f.write(response.content)if size:with Image.open(file_path) as img:img = img.resize(size)img.save(file_path)print("保存截圖成功:%s"%file_path)def GetStreamUri(self):obj = self.media.create_type('GetStreamUri')obj.StreamSetup = {'Stream': 'rtp-unicast', 'Transport': {'Protocol': 'RTSP'}}obj.ProfileToken = self.media_profile.tokenres = self.media.GetStreamUri(obj)url = res['Uri']# url示例 rtsp://192.168.1.176:554/cam/realmonitor?channel=1&subtype=0&unicast=true&proto=Onvifif url.startswith("rtsp://"):url_suffix = url[7:]url = "rtsp://%s:%s@%s"%(self.username,self.password,url_suffix)return urldef GetDeviceInformation(self):# https://www.onvif.org/onvif/ver10/device/wsdl/devicemgmt.wsdldevicemgmt = self.camera.devicemgmtcapabilities = devicemgmt.GetCapabilities()services = devicemgmt.GetServices(True)hostname1 = devicemgmt.GetHostname()devicemgmt.SetHostname("hh")hostname2 = devicemgmt.GetHostname()# 查看設備支持的范圍scopes1 = devicemgmt.GetScopes()devicemgmt.SetScopes(["onvif://www.onvif.org/name/qj92122","onvif://www.onvif.org/location/city/hs22"])scopes2 = devicemgmt.GetScopes()info = devicemgmt.GetDeviceInformation()interfaces = devicemgmt.GetNetworkInterfaces()return infodef GetVideoSourceConfigurations(self):return self.media.GetVideoSourceConfigurations()def GetVideoEncoderConfigurations(self):return self.media.GetVideoEncoderConfigurations()def GetProfiles(self):return self.media.GetProfiles()def GetOSDs(self):return self.media.GetOSDs()def SetVideoEncoderConfiguration(self, ratio=(1920, 1080), Encoding='H264', bitrate=2044, fps=30, gop=50):videoSourceConfig = self.GetVideoSourceConfigurations()encoderConfig = self.GetVideoEncoderConfigurations()if ratio[0] > videoSourceConfig[0]['Bounds']['width']:raise ONVIFError(f'ratio Error: max width should less then {videoSourceConfig[0]["Bounds"]["width"]}')if ratio[1] > videoSourceConfig[0]['Bounds']['height']:raise ONVIFError(f'ratio Error: max height should less then {videoSourceConfig[0]["Bounds"]["height"]}')if Encoding.upper() not in ('H265', 'H264'):raise ONVIFError(f'encoding Error: enconding format should be "H265" or "H264"')encoderConfig[0]['Resolution']['Width'] = ratio[0]encoderConfig[0]['Resolution']['Height'] = ratio[1]encoderConfig[0]['Encoding'] = Encoding.upper()encoderConfig[0]['RateControl']['FrameRateLimit'] =fpsencoderConfig[0]['RateControl']['BitrateLimit'] = bitrate# encoderConfig[0]['GovLength'] = gopself.media.SetVideoEncoderConfiguration({'Configuration':encoderConfig[0],'ForcePersistence':True})
最后的結果也是 生成了 圖片。
然后測試一下 關于 rtsp 的功能。
目前測試了 rtsp 的功能。
osd 的返回值。
deviceinfo 的返回值。
videosource?
videoencoder?
設置 videoencoder 的功能。
4 from mypackage import client3 import os21 def test_client():51 #密碼+賬戶 任意2 wang_client = client.Client('192.168.1.46', 'admin', 'Admin123')34 # rtsp://admin:Admin123@192.168.1.145/live/0/MAIN5 # rtsp://admin:Admin123@192.168.1.145/live/0/SUB678 if not wang_client.connect():9 exit(0)10 #11 # root_dir = os.path.dirname(os.path.abspath(__file__))12 # wang_client.Snapshot(file_dir=os.path.join(root_dir,"data"))1314 streamUri = wang_client.wang_GetStreamUri()15 print(streamUri)16 profiles = wang_client.GetProfiles()17 # print(profiles)18 osds = wang_client.GetOSDs()19 # print(osds)20 info = wang_client.GetDeviceInformation()21 # print(info)22232425 videoSourceConfig = wang_client.GetVideoSourceConfigurations()26 # print(videoSourceConfig)27 encoderConfig1 = wang_client.GetVideoEncoderConfigurations()28 print(encoderConfig1)29 wang_client.SetVideoEncoderConfiguration()30 encoderConfig2 = wang_client.GetVideoEncoderConfigurations()31 print(encoderConfig2)3233 print("end")343536 if __name__ == '__main__':37 print("main.py")38 print("ONVIFClientManager")3940 test_client()41
第四節:
然后是Ptz 的配置。、
首先是 移植一下 Pzt 的模塊
這里我有一個疑問,就是
為什么 ptz 的控制 ,在代碼中要通過 media 模塊呢?
from time import sleep1 from mypackage import client23 class PTZ():4 def __init__(self, client: client.Client):56 self.media = client.media78 self.media_profile = self.media.GetProfiles()[0]9 token = self.media_profile.token10 self.ptz = self.media.create_ptz_service()1112 #Get available PTZ services13 request = self.ptz.create_type('GetServiceCapabilities')14 Service_Capabilities = self.ptz.GetServiceCapabilities(request)1516 #Get PTZ status17 status = self.ptz.GetStatus({'ProfileToken':token})1819 # Get PTZ configuration options for getting option ranges20 request = self.ptz.create_type('GetConfigurationOptions')21 request.ConfigurationToken = self.media_profile.PTZConfiguration.token22 ptz_configuration_options = self.ptz.GetConfigurationOptions(request)2324 self.requestc = self.ptz.create_type('ContinuousMove')25 self.requestc.ProfileToken = self.media_profile.token2627 self.requesta = self.ptz.create_type('AbsoluteMove')28 self.requesta.ProfileToken = self.media_profile.token2930 self.requestr = self.ptz.create_type('RelativeMove')31 self.requestr.ProfileToken = self.media_profile.token323334 self.requests = self.ptz.create_type('Stop')35 self.requests.ProfileToken = self.media_profile.token3637 self.requestp = self.ptz.create_type('SetPreset')38 self.requestp.ProfileToken = self.media_profile.token3940 self.requestg = self.ptz.create_type('GotoPreset')41 self.requestg.ProfileToken = self.media_profile.token4243 # self.stop()4445 def __del__(self):46 self.stop()4748 def stop(self):49 self.requests.PanTilt = True50 self.requests.Zoom = True5152 self.ptz.Stop(self.requests)53
#Continuous move functions44 def perform_move(self,timeout,x,y):43 # Start continuous move42 # ret = self.ptz.ContinuousMove(self.requestc)4140 self.ptz.ContinuousMove({39 'ProfileToken': self.media_profile.token,38 'Velocity': {37 'PanTilt': {'x': x, 'y': y}36 # 'Zoom': {'x': 1.0}35 }34 })3332 sleep(timeout)31 self.stop()3029 def move_absolute(self, x, y, velocity):28 # self.requesta.Position.PanTilt._x = pan27 # self.requesta.Position.PanTilt._y = tilt26 # self.requesta.Speed.PanTilt._x = velocity25 # self.requesta.Speed.PanTilt._y = velocity24 # ret = self.ptz.AbsoluteMove(self.requesta)2322 self.ptz.AbsoluteMove({21 'ProfileToken': self.media_profile.token,20 'Position': {19 'PanTilt': {'x': x, 'y': y}18 },17 'Speed': {16 'PanTilt': {'x': velocity, 'y': velocity}15 }14 })1312 # ret = self.ptz.AbsoluteMove(self.requesta, Position={11 # 'PanTilt': {'x': pan, 'y': tilt}10 # })98 sleep(2)76 # Relative move functions --NO ERRORS BUT CAMERA DOES NOT MOVE5 def move_relative(self, pan, tilt, velocity):4 # self.requestr.Translation.PanTilt._x = pan3 # self.requestr.Translation.PanTilt._y = tilt2 # self.requestr.Speed.PanTilt._x = velocity1 # ret = self.requestr.Speed.PanTilt._y = velocity101 # self.ptz.RelativeMove(self.requestr)12 self.ptz.RelativeMove({3 'ProfileToken': self.media_profile.token,4 'Translation': {'PanTilt': {'x': pan, 'y': tilt}}5 })6 sleep(2.0)46 def zoom(self, zoom, timeout):45 pass44 # self.requestc.Velocity.Zoom._x = velocity43 # self.perform_move(timeout)42 self.ptz.ContinuousMove({41 'ProfileToken': self.media_profile.token,40 'Velocity': {39 'PanTilt': {'x': 0, 'y': 0},38 'Zoom': {'x': zoom}37 }36 })3534 sleep(timeout)33 self.stop()32313029 def move_y(self, val, timeout):28 # self.requestc.Velocity.PanTilt._x = 0.027 # self.requestc.Velocity.PanTilt._y = velocity26 self.perform_move(timeout,x=0.0,y=val)2524 def move_x(self, val, timeout):23 # self.requestc.Velocity.PanTilt._x = velocity22 # self.requestc.Velocity.PanTilt._y = 0.021 self.perform_move(timeout,x=val,y=0.0)20191817 #Sets preset set, query and and go to16 def set_preset(self, name):15 self.requestp.PresetName = name14 self.requestp.PresetToken = '1'13 self.preset = self.ptz.SetPreset(self.requestp) #returns the PresetToken1211 def get_preset(self):10 presets = self.ptz.GetPresets({'ProfileToken': self.media_profile.token})9 print("presets:",presets)87 def goto_preset(self, name):6 try:5 self.ptz.GotoPreset(4 {'ProfileToken': self.media_profile.token, "PresetToken": name}) # 移動到指定預置點位置3 except Exception as e:2 print("云臺控制失敗:%s"%str(e))11551
應用就是:?
from mypackage import client1 from mypackage import ptz2 import os34 def test_ptz(client : client.Client):5 wang_ptz = ptz.PTZ(client)67 while True:8 # zoom in9 # ptz.zoom(1.0, 2)10 # zoom out11 print("開始縮小")12 wang_ptz.zoom(-1.0, 2)13 wang_ptz.zoom(-1.0, 2)14 wang_ptz.zoom(-1.0, 2)1516 print("開始放大")17 wang_ptz.zoom(1.0, 2)1819 # # move down20 # ptz.move_x(val=-1.0, timeout=2)21 #22 # time.sleep(10)23 # ptz.move_x(val=1.0, timeout=2)24 # time.sleep(10)25 #26 # exit(0)27 # # Set preset28 # # ptz.move_x(x=1.0, timeout=1)29 # # ptz.set_preset('home')30 #31 # # move right -- (velocity, duration of move)32 # ptz.move_x(val=1.0, timeout=2)33 #34 # # move left35 # ptz.move_x(val=-1.0, timeout=2)36 #37 # # move down38 # ptz.move_y(val=-1.0, timeout=2)39 #40 # # Move up41 # ptz.move_y(val=1.0, timeout=2)42 #43 #44 #45 # # Absolute pan-tilt (pan position, tilt position, velocity)46 # # DOES NOT RESULT IN CAMERA MOVEMENT47 # ptz.move_absolute(x=-1.0, y=1.0, velocity=1.0)48 # ptz.move_absolute(x=1.0, y=-1.0, velocity=1.0)49 #50 # # Relative move (pan increment, tilt increment, velocity)51 # # DOES NOT RESULT IN CAMERA MOVEMENT52 # # ptz.move_relative(0.5, 0.5, 8.0)
#47 # # Get presets46 # ptz.get_preset()45 # # Go back to preset44 # ptz.goto_preset('home')434241 def test_client():4039 #密碼+賬戶 任意38 wang_client = client.Client('192.168.1.46', 'admin', 'Admin123')3736 # rtsp://admin:Admin123@192.168.1.145/live/0/MAIN35 # rtsp://admin:Admin123@192.168.1.145/live/0/SUB343332 if not wang_client.connect():31 exit(0)30 #29 # root_dir = os.path.dirname(os.path.abspath(__file__))28 # wang_client.Snapshot(file_dir=os.path.join(root_dir,"data"))2726 streamUri = wang_client.wang_GetStreamUri()25 print(streamUri)24 profiles = wang_client.GetProfiles()23 # print(profiles)22 osds = wang_client.GetOSDs()21 # print(osds)20 info = wang_client.GetDeviceInformation()19 # print(info)18171615 videoSourceConfig = wang_client.GetVideoSourceConfigurations()14 # print(videoSourceConfig)13 encoderConfig1 = wang_client.GetVideoEncoderConfigurations()12 #print(encoderConfig1)11 wang_client.SetVideoEncoderConfiguration()10 encoderConfig2 = wang_client.GetVideoEncoderConfigurations()9 #print(encoderConfig2)87 #print("end")654 if __name__ == '__main__':3 print("main.py")2 print("ONVIFClientManager")1102 #test_client()12 wang_client = client.Client('192.168.1.46', 'admin', 'Admin123')3 if not wang_client.connect():4 exit(0)5 test_ptz(wang_client)
但是目前沒有測試。因為我的攝像頭不支持 ptz ,運行的話會報錯。
暫時先擱置。
然后就是將代碼 上傳gitee.