This article builds on two earlier posts in this series, 《無人機實戰系列(一)在局域網內傳輸數據》 and 《無人機實戰系列(二)本地攝像頭 + Depth-Anything V2》, and implements the following:
- Local laptop camera publishes frames + remote GPU processes them in real time (no return stream);
- [Asynchronous] local laptop camera publishes frames + remote GPU processes them in real time (results returned to the laptop and displayed);
- [Synchronous] local laptop camera publishes frames + remote GPU processes them in real time (results returned to the laptop and displayed);
Before running these demos, it is recommended to read the two earlier posts first to get familiar with the zmq library and the Depth-Anything V2 model.
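If you only need a quick refresher on the zmq publish/subscribe pattern used throughout this post, here is a minimal, self-contained sketch (not one of the demo scripts; both sockets run in a single process purely for illustration):

```python
import time
import zmq

# Minimal PUB/SUB round trip. In the real demos the two sockets live on
# different machines (laptop and GPU server); here they share one process.
context = zmq.Context()

pub = context.socket(zmq.PUB)
pub.bind("tcp://*:5557")            # any free port works for this test

sub = context.socket(zmq.SUB)
sub.connect("tcp://127.0.0.1:5557")
sub.setsockopt(zmq.SUBSCRIBE, b"")  # subscribe to all messages

time.sleep(0.5)                     # give the subscription time to propagate
pub.send(b"hello")
print(sub.recv())                   # b'hello'
```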
Demos both with and without a return stream are provided because the right choice depends on your actual setup. Although returning the depth map and displaying it makes the results much easier to inspect, you still have to balance the following two costs (the sketch after this list gives a quick way to estimate the first one):
- the depth image itself is fairly large, so returning it consumes network bandwidth;
- decoding and displaying the returned images consumes compute on the laptop;
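To put rough numbers on the bandwidth side of that trade-off, a quick standalone sketch like the one below (assuming a webcam at index 0 and the same `send_fps = 30` used in the demos) prints the JPEG size of a single frame and the bitrate a stream of such frames would need:

```python
import cv2

# Rough estimate of how much bandwidth a JPEG stream consumes.
cap = cv2.VideoCapture(0)
ret, frame = cap.read()
cap.release()

if ret:
    _, buffer = cv2.imencode('.jpg', frame)
    size_kb = buffer.nbytes / 1024
    send_fps = 30
    print(f"One JPEG frame: {size_kb:.1f} KB")
    print(f"Approx. bandwidth at {send_fps} fps: {size_kb * send_fps * 8 / 1024:.1f} Mbit/s")
    # Note: the combined frame returned by the GPU (raw + margin + depth map)
    # is roughly twice as wide, so the return stream costs about twice this.
```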
[Note]: the code in this article must be run from the project folder set up in 《無人機實戰系列(二)本地攝像頭 + Depth-Anything V2》; otherwise it will fail to find the corresponding files and model.
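For reference, the layout those scripts assume looks roughly like this (the exact `.pth` filename depends on the `--encoder` you pass; `vits` is the default):

```
Depth-Anything-V2/
├── depth_anything_v2/
│   └── dpt.py                        # provides the DepthAnythingV2 class
├── models/
│   └── depth_anything_v2_vits.pth    # model weights
├── camera_pub.py
└── camera_recv.py
```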
Local laptop camera publishing + remote GPU real-time processing (no return stream)
This demo opens the laptop's camera and publishes each frame; the remote GPU server receives the frames, runs Depth-Anything V2 on them, and displays the result.
Publishing camera frames from the laptop
In the code below, note the following points:
- Set the publishing rate `send_fps`; a lower rate reduces the load on the GPU side.
- Set the send queue size with `socket.setsockopt(zmq.SNDHWM, 1)` so the queue only ever holds the current frame, reducing bandwidth pressure.
- Keep only the latest message with `socket.setsockopt(zmq.CONFLATE, 1)` so the queue stores just the newest frame, reducing display latency on the receiving side.
```python
import zmq
import cv2
import time

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.setsockopt(zmq.SNDHWM, 1)    # send queue holds a single frame
socket.setsockopt(zmq.CONFLATE, 1)  # keep only the latest message
socket.bind("tcp://*:5555")         # bind a local port (options must be set before bind)

cap = cv2.VideoCapture(0)           # open the camera
send_fps = 30                       # cap the publish rate to ease the receiver's load

while True:
    start_time = time.time()
    ret, frame = cap.read()
    if not ret:
        continue
    _, buffer = cv2.imencode('.jpg', frame)  # encode the frame as JPEG
    socket.send(buffer.tobytes())            # publish the image data
    cv2.imshow("Origin image", frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    time.sleep(max(1 / send_fps - (time.time() - start_time), 0))

cap.release()
cv2.destroyAllWindows()
```
Run:
$ python camera_pub.py
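Before starting the GPU side, you can verify that frames are actually arriving with a bare-bones subscriber such as the hypothetical `camera_check.py` below (replace the IP with your laptop's address):

```python
import cv2
import numpy as np
import zmq

# Bare-bones subscriber: decode and display whatever the publisher sends.
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.setsockopt(zmq.CONFLATE, 1)           # keep only the latest frame
socket.connect("tcp://192.168.75.201:5555")  # the laptop's address

while True:
    frame = cv2.imdecode(np.frombuffer(socket.recv(), dtype=np.uint8), 1)
    cv2.imshow("Stream check", frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cv2.destroyAllWindows()
```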
GPU server receiver
In the code below, note the following points:
- Connect to the publisher's address with `socket.connect("tcp://192.168.75.201:5555")`, replacing the IP with your laptop's address.
- Accept only the latest message: `socket.setsockopt(zmq.CONFLATE, 1)`.
- Drop stale frames: `socket.setsockopt(zmq.RCVHWM, 1)` together with `socket.poll(1)`.
```python
import argparse
import cv2
import numpy as np
import torch
import time
import zmq

from depth_anything_v2.dpt import DepthAnythingV2

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.setsockopt(zmq.CONFLATE, 1)           # accept only the latest message
socket.setsockopt(zmq.RCVHWM, 1)             # drop stale frames
socket.connect("tcp://192.168.75.201:5555")  # publisher's address (the laptop)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Depth Anything V2')
    parser.add_argument('--input-size', type=int, default=518)
    parser.add_argument('--encoder', type=str, default='vits', choices=['vits', 'vitb', 'vitl', 'vitg'])
    parser.add_argument('--pred-only', action='store_true', help='only display the prediction')
    parser.add_argument('--grayscale', action='store_true', help='do not apply colorful palette')
    args = parser.parse_args()

    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

    model_configs = {
        'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]},
        'vitb': {'encoder': 'vitb', 'features': 128, 'out_channels': [96, 192, 384, 768]},
        'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]},
        'vitg': {'encoder': 'vitg', 'features': 384, 'out_channels': [1536, 1536, 1536, 1536]}
    }

    depth_anything = DepthAnythingV2(**model_configs[args.encoder])
    depth_anything.load_state_dict(torch.load(f'./models/depth_anything_v2_{args.encoder}.pth', map_location='cpu'))
    depth_anything = depth_anything.to(DEVICE).eval()

    margin_width = 50

    while True:
        start_time = time.time()
        # Optimization 1: ZMQ receive -- grab a frame, then drain any stale ones
        try:
            msg = socket.recv(zmq.NOBLOCK)
            while socket.poll(1):
                msg = socket.recv(zmq.NOBLOCK)
            zmq_time = time.time()

            # Optimization 2: OpenCV decode
            raw_frame = cv2.imdecode(np.frombuffer(msg, dtype=np.uint8), 1)
            decode_time = time.time()

            # Optimization 3: model inference
            with torch.no_grad():
                depth = depth_anything.infer_image(raw_frame, args.input_size)
            infer_time = time.time()

            # Optimization 4: normalization + OpenCV pseudo-color mapping
            depth = ((depth - depth.min()) / (depth.max() - depth.min()) * 255).astype(np.uint8)
            if args.grayscale:
                depth = np.repeat(depth[..., np.newaxis], 3, axis=-1)
            else:
                depth = cv2.applyColorMap(depth, cv2.COLORMAP_JET)
            process_time = time.time()

            # Optimization 5: concatenate raw frame and depth map
            split_region = np.ones((raw_frame.shape[0], margin_width, 3), dtype=np.uint8) * 255
            combined_frame = cv2.hconcat([raw_frame, split_region, depth])
            cv2.imshow('Raw Frame and Depth Prediction', combined_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            print(f"[{args.encoder}] Frame cost time: {time.time() - start_time:.4f} s")
            print(f"    ZMQ receive: {zmq_time - start_time:.4f} s")
            print(f"    Decode: {decode_time - zmq_time:.4f} s")
            print(f"    Inference: {infer_time - decode_time:.4f} s")
            print(f"    Processing: {process_time - infer_time:.4f} s")
        except zmq.Again:
            print("No msg received, skip...")
            continue  # no message yet, skip this iteration

    cv2.destroyAllWindows()
```
Run:
$ python camera_recv.py
[Asynchronous] Local laptop camera publishing + remote GPU real-time processing (results returned to the laptop and displayed)
This is essentially the same as the code above, except that each end gains one extra socket for the return path: the server pushes the processed frame back to the laptop over a PUSH/PULL pair on port 5556. The exchange is handled asynchronously, which is usually preferable because it keeps one end from blocking whenever the other cannot keep up.
Publishing camera frames from the laptop
```python
import zmq
import cv2
import numpy as np
import time

context = zmq.Context()

# Publish the raw frames
pub_socket = context.socket(zmq.PUB)
pub_socket.bind("tcp://*:5555")

# Receive the processed results
pull_socket = context.socket(zmq.PULL)
pull_socket.bind("tcp://*:5556")  # listen for data pushed back by the server

send_fps = 30
cap = cv2.VideoCapture(0)

while True:
    start_time = time.time()
    ret, frame = cap.read()
    if not ret:
        continue

    # [Optional] downsample the image
    frame = cv2.pyrDown(frame)
    frame = cv2.pyrDown(frame)

    _, buffer = cv2.imencode('.jpg', frame)  # compress the frame
    pub_socket.send(buffer.tobytes())        # publish it

    # Non-blocking receive of the processed result
    try:
        processed_data = pull_socket.recv(zmq.NOBLOCK)
        processed_frame = cv2.imdecode(np.frombuffer(processed_data, dtype=np.uint8), 1)
    except zmq.Again:
        print("No image received, continue...")
        continue

    cv2.imshow("Processed Frame", processed_frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    time.sleep(max(1 / send_fps - (time.time() - start_time), 0))

cv2.destroyAllWindows()
```
Run:
$ python camera_pub_async.py
GPU server receiver (asynchronous)
```python
import argparse
import cv2
import numpy as np
import torch
import time
import zmq

from depth_anything_v2.dpt import DepthAnythingV2

context = zmq.Context()
sub_socket = context.socket(zmq.SUB)
sub_socket.setsockopt(zmq.SUBSCRIBE, b"")
sub_socket.setsockopt(zmq.CONFLATE, 1)  # accept only the latest message
sub_socket.setsockopt(zmq.RCVHWM, 1)    # drop stale frames
sub_socket.connect("tcp://192.168.75.201:5555")

# Send the processed results back to the laptop
push_socket = context.socket(zmq.PUSH)
push_socket.connect("tcp://192.168.75.201:5556")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Depth Anything V2')
    parser.add_argument('--input-size', type=int, default=518)
    parser.add_argument('--encoder', type=str, default='vits', choices=['vits', 'vitb', 'vitl', 'vitg'])
    parser.add_argument('--pred-only', action='store_true', help='only display the prediction')
    parser.add_argument('--grayscale', action='store_true', help='do not apply colorful palette')
    args = parser.parse_args()

    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

    model_configs = {
        'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]},
        'vitb': {'encoder': 'vitb', 'features': 128, 'out_channels': [96, 192, 384, 768]},
        'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]},
        'vitg': {'encoder': 'vitg', 'features': 384, 'out_channels': [1536, 1536, 1536, 1536]}
    }

    depth_anything = DepthAnythingV2(**model_configs[args.encoder])
    depth_anything.load_state_dict(torch.load(f'./models/depth_anything_v2_{args.encoder}.pth', map_location='cpu'))
    depth_anything = depth_anything.to(DEVICE).eval()

    margin_width = 50

    while True:
        start_time = time.time()
        # Optimization 1: ZMQ receive -- grab a frame, then drain any stale ones
        try:
            msg = sub_socket.recv(zmq.NOBLOCK)
            while sub_socket.poll(1):
                msg = sub_socket.recv(zmq.NOBLOCK)
            zmq_time = time.time()

            # Optimization 2: OpenCV decode
            raw_frame = cv2.imdecode(np.frombuffer(msg, dtype=np.uint8), 1)
            decode_time = time.time()

            # Optimization 3: model inference
            with torch.no_grad():
                depth = depth_anything.infer_image(raw_frame, args.input_size)
            infer_time = time.time()

            # Optimization 4: normalization + OpenCV pseudo-color mapping
            depth = ((depth - depth.min()) / (depth.max() - depth.min()) * 255).astype(np.uint8)
            if args.grayscale:
                depth = np.repeat(depth[..., np.newaxis], 3, axis=-1)
            else:
                depth = cv2.applyColorMap(depth, cv2.COLORMAP_JET)
            process_time = time.time()

            # Optimization 5: concatenate raw frame and depth map
            split_region = np.ones((raw_frame.shape[0], margin_width, 3), dtype=np.uint8) * 255
            combined_frame = cv2.hconcat([raw_frame, split_region, depth])
            cv2.imshow('Raw Frame and Depth Prediction', combined_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            print(f"[{args.encoder}] Frame cost time: {time.time() - start_time:.4f} s")
            print(f"    ZMQ receive: {zmq_time - start_time:.4f} s")
            print(f"    Decode: {decode_time - zmq_time:.4f} s")
            print(f"    Inference: {infer_time - decode_time:.4f} s")
            print(f"    Processing: {process_time - infer_time:.4f} s")

            _, buffer = cv2.imencode('.jpg', combined_frame)
            push_socket.send(buffer.tobytes())  # push the processed result back
        except zmq.Again:
            print("No msg received, skip...")
            continue  # no message yet, skip this iteration

    cv2.destroyAllWindows()
```
Run:
$ python camera_recv_async.py
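If you want to measure what the return stream actually costs in latency, one option (not part of the original scripts) is to prefix every JPEG with a send timestamp and have the server copy that prefix onto its reply. A plain byte prefix is used rather than a zmq multipart message because `zmq.CONFLATE` does not support multipart messages:

```python
import struct
import time

# Hypothetical helpers for round-trip timing; the 8-byte prefix is one double.

def pack_frame(jpeg_bytes: bytes) -> bytes:
    # Laptop side, before pub_socket.send(): prepend the current time.
    return struct.pack('d', time.time()) + jpeg_bytes

def echo_prefix(request: bytes, processed_jpeg: bytes) -> bytes:
    # Server side, before push_socket.send(): copy the prefix through.
    return request[:8] + processed_jpeg

def unpack_frame(reply: bytes) -> bytes:
    # Laptop side, after pull_socket.recv(): report the round trip, return the JPEG.
    sent_at, = struct.unpack('d', reply[:8])
    print(f"Round trip: {(time.time() - sent_at) * 1000:.1f} ms")
    return reply[8:]
```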
[Synchronous] Local laptop camera publishing + remote GPU real-time processing (results returned to the laptop and displayed)
Synchronous transfer is rarely used for this kind of video stream: the publisher and the receiver must stay in lockstep, with each frame waiting for its reply, so it places high demands on network stability.
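If you do go the synchronous route, it is worth protecting the sender against a stalled or crashed server, because a plain `recv()` on a REQ socket otherwise blocks forever. A minimal sketch (the timeout value is an assumption; tune it for your network) uses `zmq.RCVTIMEO` and rebuilds the socket after a timeout, since a REQ socket that missed its reply is stuck in the wrong state:

```python
import zmq

REQUEST_TIMEOUT_MS = 1000  # assumed value; tune for your network

context = zmq.Context()

def make_req_socket() -> zmq.Socket:
    sock = context.socket(zmq.REQ)
    sock.setsockopt(zmq.RCVTIMEO, REQUEST_TIMEOUT_MS)  # recv() raises zmq.Again on timeout
    sock.setsockopt(zmq.LINGER, 0)                     # don't hang on close
    sock.bind("tcp://*:5555")
    return sock

req_socket = make_req_socket()

def send_and_wait(payload: bytes):
    """Send one request; return the reply, or None if the server timed out."""
    global req_socket
    req_socket.send(payload)
    try:
        return req_socket.recv()
    except zmq.Again:
        # No reply arrived in time; recreate the socket before the next attempt.
        req_socket.close()
        req_socket = make_req_socket()
        return None
```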
Publishing camera frames from the laptop
This demo needs attention to the following points:
- Put the sender in request-reply mode: `context.socket(zmq.REQ)`.
- Block until the server returns the processed data: `pub_socket.recv()`.
```python
import zmq
import cv2
import numpy as np
import time

context = zmq.Context()

# Publish the raw frames
pub_socket = context.socket(zmq.REQ)  # request-reply mode
pub_socket.bind("tcp://*:5555")

send_fps = 30
cap = cv2.VideoCapture(0)

while True:
    start_time = time.time()
    ret, frame = cap.read()
    if not ret:
        continue

    # [Optional] downsample the image
    frame = cv2.pyrDown(frame)
    frame = cv2.pyrDown(frame)

    _, buffer = cv2.imencode('.jpg', frame)  # compress the frame
    pub_socket.send(buffer.tobytes())        # send the request
    print("Waiting for the server to process.")
    processed_data = pub_socket.recv()       # block until the reply arrives
    processed_frame = cv2.imdecode(np.frombuffer(processed_data, dtype=np.uint8), 1)

    cv2.imshow("Processed Frame", processed_frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    time.sleep(max(1 / send_fps - (time.time() - start_time), 0))

cv2.destroyAllWindows()
```
Run:
$ python camera_pub_sync.py
GPU server receiver
This demo needs attention to the following points:
- Put the receiver in request-reply mode: `context.socket(zmq.REP)`.
- Block until the publisher's data arrives: `sub_socket.recv()`.
- Return the processed result synchronously: `sub_socket.send(buffer.tobytes())`.
```python
import argparse
import cv2
import numpy as np
import torch
import time
import zmq

from depth_anything_v2.dpt import DepthAnythingV2

context = zmq.Context()
sub_socket = context.socket(zmq.REP)  # request-reply mode
sub_socket.connect("tcp://192.168.75.201:5555")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Depth Anything V2')
    parser.add_argument('--input-size', type=int, default=518)
    parser.add_argument('--encoder', type=str, default='vits', choices=['vits', 'vitb', 'vitl', 'vitg'])
    parser.add_argument('--pred-only', action='store_true', help='only display the prediction')
    parser.add_argument('--grayscale', action='store_true', help='do not apply colorful palette')
    args = parser.parse_args()

    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

    model_configs = {
        'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]},
        'vitb': {'encoder': 'vitb', 'features': 128, 'out_channels': [96, 192, 384, 768]},
        'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]},
        'vitg': {'encoder': 'vitg', 'features': 384, 'out_channels': [1536, 1536, 1536, 1536]}
    }

    depth_anything = DepthAnythingV2(**model_configs[args.encoder])
    depth_anything.load_state_dict(torch.load(f'./models/depth_anything_v2_{args.encoder}.pth', map_location='cpu'))
    depth_anything = depth_anything.to(DEVICE).eval()

    margin_width = 50

    while True:
        start_time = time.time()
        # Blocking receive: REQ/REP keeps both ends in lockstep
        msg = sub_socket.recv()
        zmq_time = time.time()

        # OpenCV decode
        raw_frame = cv2.imdecode(np.frombuffer(msg, dtype=np.uint8), 1)
        decode_time = time.time()

        # Model inference
        with torch.no_grad():
            depth = depth_anything.infer_image(raw_frame, args.input_size)
        infer_time = time.time()

        # Normalization + OpenCV pseudo-color mapping
        depth = ((depth - depth.min()) / (depth.max() - depth.min()) * 255).astype(np.uint8)
        if args.grayscale:
            depth = np.repeat(depth[..., np.newaxis], 3, axis=-1)
        else:
            depth = cv2.applyColorMap(depth, cv2.COLORMAP_JET)
        process_time = time.time()

        # Concatenate raw frame and depth map
        split_region = np.ones((raw_frame.shape[0], margin_width, 3), dtype=np.uint8) * 255
        combined_frame = cv2.hconcat([raw_frame, split_region, depth])

        # Reply before the display/quit check so the REQ side is never left waiting
        _, buffer = cv2.imencode('.jpg', combined_frame)
        sub_socket.send(buffer.tobytes())  # return the processed result

        cv2.imshow('Raw Frame and Depth Prediction', combined_frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        print(f"[{args.encoder}] Frame cost time: {time.time() - start_time:.4f} s")
        print(f"    ZMQ receive: {zmq_time - start_time:.4f} s")
        print(f"    Decode: {decode_time - zmq_time:.4f} s")
        print(f"    Inference: {infer_time - decode_time:.4f} s")
        print(f"    Processing: {process_time - infer_time:.4f} s")

    cv2.destroyAllWindows()
```
Run:
$ python camera_recv_sync.py