目錄
qwen 視頻理解能力
messages 構建 demo
qwen 抽幀代碼分析
驗證兩個實際 case
官網介紹圖
性能對比:ffmpeg 抽幀、decord 庫抽幀
介紹
聯系
對比
測試結果
測試明細
ffmpeg
100 qps 測試(CPU)
decord
100 qps 測試(CPU)
100 qps 測試(GPU)
本文也錄制了詳細的視頻講解:
[IT超016] 大模型:源碼分析Qwen2.5VL視頻抽幀模塊(附加FFmpeg性能對比測試)_嗶哩嗶哩_bilibili
qwen 視頻理解能力
Qwen2.5-VL 是由阿里云 Qwen 團隊開發的多模態大型語言模型系列,倉庫地址:https://github.com/QwenLM/Qwen2.5-VL
messages 構建 demo
# 方式一:輸入視頻文件(這種才會走抽幀邏輯)
messages = [{"role": "user","content": [{"type": "video","video": "file:///path/to/video1.mp4","max_pixels": 360 * 420,"fps": 1.0,},{"type": "text", "text": "Describe this video."},],}
]# 方式二:直接輸入多圖
messages = [{"role": "user","content": [{"type": "video","video": ["file:///path/to/frame1.jpg","file:///path/to/frame2.jpg","file:///path/to/frame3.jpg","file:///path/to/frame4.jpg",],},{"type": "text", "text": "Describe this video."},],}
]
qwen 抽幀代碼分析
視頻 message 處理的核心代碼:https://github.com/QwenLM/Qwen2.5-VL/blob/main/qwen-vl-utils/src/qwen_vl_utils/vision_process.py
視頻解析抽幀能力依賴——decord 庫:https://github.com/dmlc/decord?tab=readme-ov-file#install-from-source
Decord 是一個專門為視頻數據處理和深度學習設計的輕量級、高性能的視頻解碼庫,擅長處理幀的隨機訪問模式,避免了像 FFmpeg 那樣從頭開始逐幀解碼——Qwen 抽幀模塊用的這個庫
?
1、vision_process.py # process_vision_info:處理 messages 中的 圖像 / 視頻 輸入
?
2、vision_process.py # fetch_video:處理 messages 中的 視頻 輸入
?
🚩 核心抽幀邏輯(簡單來說:0.5 秒抽一張)(30 秒視頻,最終抽幀是 60 張)
根據 FPS 常數(默認 2)、視頻總幀數(秒時長 * 原始 fps)計算抽幀數量 nframes,之后等距離/步長抽取 nframes 張幀圖
- 可解析總幀數 = 秒時長 * 原始 fps ———— 例如 30s 視頻,24 fps,值為 720
- 抽幀數 = FPS 常數(默認 2)* 秒時長 ————例如 30s 視頻,值為 60張
- 抽幀步長:默認 0.5 秒————(等距步長平均分割,和 FPS 常數有關)
?
?
驗證兩個實際 case
錄屏設置:24 fps
方案 | 30秒 的視頻 | 1分30秒 的視頻 |
qwen 抽幀 | 加入日志
... | 加入日志
... |
?
摘出 qwen 2.5-vl 抽幀模塊代碼(增加自定義日志)
import torch
import time
import math
from typing import Tuple
from torchvision.utils import save_imageIMAGE_FACTOR = 28
MIN_PIXELS = 4 * 28 * 28
MAX_PIXELS = 16384 * 28 * 28
MAX_RATIO = 200VIDEO_MIN_PIXELS = 128 * 28 * 28
VIDEO_MAX_PIXELS = 768 * 28 * 28
FRAME_FACTOR = 2
FPS = 2.0
FPS_MIN_FRAMES = 4
FPS_MAX_FRAMES = 768def _read_video_decord(ele: dict,
) -> Tuple[torch.Tensor, float]:"""read video using decord.VideoReaderArgs:ele (dict): a dict contains the configuration of video.support keys:- video: the path of video. support "file://", "http://", "https://" and local path.- video_start: the start time of video.- video_end: the end time of video.Returns:torch.Tensor: the video tensor with shape (T, C, H, W)."""import decordvideo_path = ele["video"]st = time.time()vr = decord.VideoReader(video_path)# TODO: support start_pts and end_ptsif 'video_start' in ele or 'video_end' in ele:raise NotImplementedError("not support start_pts and end_pts in decord for now.")total_frames, video_fps = len(vr), vr.get_avg_fps()print(f"==11: {video_path=}, {total_frames=}, {video_fps=}, time={time.time() - st:.3f}s\n")nframes = smart_nframes(ele, total_frames=total_frames, video_fps=video_fps)idx = torch.linspace(0, total_frames - 1, nframes).round().long().tolist()video = vr.get_batch(idx).asnumpy()print(f"==22: {nframes=}, {idx=}, {len(idx)=}, video: ")print("Type:", type(video))print("Shape:", video.shape)print("Data Type:", video.dtype)print("Number of dimensions:", video.ndim)print("Number of elements:", video.size)print("Size in bytes:", video.nbytes)print('\n')video = torch.tensor(video).permute(0, 3, 1, 2) # Convert to TCHW formatsample_fps = nframes / max(total_frames, 1e-6) * video_fpsreturn video, sample_fpsdef smart_nframes(ele: dict,total_frames: int,video_fps: float,
) -> int:"""calculate the number of frames for video used for model inputs.Args:ele (dict): a dict contains the configuration of video.support either `fps` or `nframes`:- nframes: the number of frames to extract for model inputs.- fps: the fps to extract frames for model inputs.- min_frames: the minimum number of frames of the video, only used when fps is provided.- max_frames: the maximum number of frames of the video, only used when fps is provided.total_frames (int): the original total number of frames of the video.video_fps (int | float): the original fps of the video.Raises:ValueError: nframes should in interval [FRAME_FACTOR, total_frames].Returns:int: the number of frames for video used for model inputs."""assert not ("fps" in ele and "nframes" in ele), "Only accept either `fps` or `nframes`"if "nframes" in ele:nframes = round_by_factor(ele["nframes"], FRAME_FACTOR)else:fps = ele.get("fps", FPS)min_frames = ceil_by_factor(ele.get("min_frames", FPS_MIN_FRAMES), FRAME_FACTOR)max_frames = floor_by_factor(ele.get("max_frames", min(FPS_MAX_FRAMES, total_frames)), FRAME_FACTOR)nframes = total_frames / video_fps * fpsif nframes > total_frames:print(f"smart_nframes: nframes[{nframes}] > total_frames[{total_frames}]")nframes = min(min(max(nframes, min_frames), max_frames), total_frames)nframes = floor_by_factor(nframes, FRAME_FACTOR)if not (FRAME_FACTOR <= nframes and nframes <= total_frames):raise ValueError(f"nframes should in interval [{FRAME_FACTOR}, {total_frames}], but got {nframes}.")return nframesdef round_by_factor(number: int, factor: int) -> int:"""Returns the closest integer to 'number' that is divisible by 'factor'."""return round(number / factor) * factordef ceil_by_factor(number: int, factor: int) -> int:"""Returns the smallest integer greater than or equal to 'number' that is divisible by 'factor'."""return math.ceil(number / factor) * factordef floor_by_factor(number: int, factor: int) -> int:"""Returns the largest integer less than or equal to 'number' that is divisible by 'factor'."""return math.floor(number / factor) * factorvideo, sample_fps = _read_video_decord(ele={"video": "150108580006469_30s.mp4"})
print(f"read_video_decord result: {video=}, {sample_fps=}\n")# 將視頻幀保存為本地圖片
for i, frame in enumerate(video):# 將像素值縮放到 [0, 1] 范圍frame = frame.float() / 255.0save_image(frame, f'./video-frame-30s/frame_{i}.png')# save_image(frame, f'./video-frame-1m30s/frame_{i}.png')print(f"Frame {i} saved as frame_{i}.png")
官網介紹圖
?
性能對比:ffmpeg 抽幀、decord 庫抽幀
介紹
ffmpeg 是一個強大且廣泛使用的開源多媒體框架,它由多個庫和工具組成,例如libavformat(處理音視頻封裝格式)、libavcodec(進行音視頻編解碼)、libavutil(提供通用的工具函數)等。在抽幀時,FFmpeg 會利用這些庫協同工作。
?
Decord 是一個專門為視頻數據處理和深度學習設計的輕量級、高性能的視頻解碼庫,擅長處理幀的隨機訪問模式,避免了像 FFmpeg 那樣從頭開始逐幀解碼
?
聯系
decord 依賴 ffmpeg 的核心庫
?
對比
- 速度:ffmpeg 更優,比 decord 快 16%
- 資源消耗:ffmpeg 更優,比 decord 節省資源 25%(只測了下 cpu 抽幀)
- 官方維護:
-
- ffmpeg 一直在更新,社區也更活躍,48.9k star
- decord 庫已經 3 年未更新,2.1k star
?
測試結果
測試指標:
- 100 batch 視頻文件測試
- 開發機:train-A100-6 配置為 94核 - 860GiB - 4卡A100
| 抽幀方案 | 測試樣本(37秒視頻) video_150108527915923_37s.mp4 | 測試樣本(1分21秒視頻)video_150108525914511_1m21s.mp4 |
現狀 | ffmpeg - cpu | 總耗時: 4.18 秒 平均耗時: 2829.07 毫秒 成功數: 100 失敗數: 0
抽出 37 張 + 寫入磁盤
CPU核數換算——需 7 核(94*6%*120%) | 總耗時: 12.09 秒 平均耗時: 9383.33 毫秒 成功數: 100 失敗數: 0
抽出 81 張 + 寫入磁盤
CPU核數換算——需 23 核(94*22%*120%) |
新方案 | decord - cpu | 總耗時: 5.33 秒 平均耗時: 3352.33 毫秒 成功數: 100 失敗數: 0
抽出 38 張 + 寫入磁盤
CPU核數換算——需 7 核(94*7%*120%) | 總耗時: 15.89 秒 平均耗時: 12617.40 毫秒 成功數: 100 失敗數: 0
抽出 85 張 + 寫入磁盤
CPU核數換算——需 24 核(94*25%*120%) |
| decord - gpu | 環境不兼容 | 環境不兼容 |
?
測試明細
ffmpeg
# 登陸開發機
ffmpeg -i /root/zhoulongchao/script/resource/video_150108527915923_37s.mp4 -map 0:v -q:v 3 -vsync 0 -f image2 -vf fps=1 -y ./%d.jpg
?
100 qps 測試(CPU)
cd /root/zhoulongchao/script/ffmpeg
vim demo-ffmpeg.py
python demo-ffmpeg.py
import subprocess
import time
from concurrent.futures import ThreadPoolExecutordef run_ffmpeg_command():# FFmpeg 命令command = ['ffmpeg', '-i', '/root/zhoulongchao/script/resource/video_150108525914511_1m21s.mp4','-map', '0:v', '-q:v', '3', '-vsync', '0', '-f', 'image2', '-vf', 'fps=1', '-y', './%d.jpg']start_time = time.time()# 運行 FFmpeg 命令result = subprocess.run(command, capture_output=True, text=True)end_time = time.time()# 計算耗時(毫秒)elapsed_time_ms = (end_time - start_time) * 1000print("FFmpeg 輸出:")print(result.stdout)print("錯誤信息:")print(result.stderr)print(f"耗時: {elapsed_time_ms:.2f} 毫秒")# 根據返回碼判斷是否成功success = result.returncode == 0return elapsed_time_ms, successif __name__ == "__main__":# 目標 QPStarget_qps = 100# 每個請求的間隔時間(秒)interval = 1 / target_qpstotal_elapsed_time = 0all_elapsed_times = []success_count = 0failure_count = 0with ThreadPoolExecutor(max_workers=target_qps) as executor:start_time = time.time()futures = []for _ in range(target_qps):future = executor.submit(run_ffmpeg_command)futures.append(future)time.sleep(interval)for future in futures:elapsed_time, success = future.result()all_elapsed_times.append(elapsed_time)total_elapsed_time += elapsed_timeif success:success_count += 1else:failure_count += 1end_time = time.time()average_elapsed_time = total_elapsed_time / target_qps if target_qps > 0 else 0print(f"總耗時: {end_time - start_time:.2f} 秒")print(f"平均耗時: {average_elapsed_time:.2f} 毫秒")print(f"成功數: {success_count}")print(f"失敗數: {failure_count}")
?
decord
cd /root/zhoulongchao/script/decord
vim main.py
python main.py
?
100 qps 測試(CPU)
import cv2
import time
from decord import VideoReader
from decord import cpu
from concurrent.futures import ProcessPoolExecutor
import multiprocessingdef process_video(video_path):start_time = time.time()try:vr = VideoReader(video_path, ctx=cpu(0))fps = vr.get_avg_fps()interval = int(fps)for i in range(0, len(vr), interval):frame = vr[i].asnumpy()frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)cv2.imwrite(f'frame_{i}.jpg', frame_bgr)end_time = time.time()elapsed_time = (end_time - start_time) * 1000return elapsed_time, Trueexcept Exception:end_time = time.time()elapsed_time = (end_time - start_time) * 1000return elapsed_time, Falseif __name__ == "__main__":video_path = '/root/zhoulongchao/script/resource/video_150108527915923_37s.mp4'target_qps = 100interval = 1 / target_qpstotal_elapsed_time = 0success_count = 0failure_count = 0# 獲取系統的 CPU 核心數量cpu_count = multiprocessing.cpu_count()with ProcessPoolExecutor(max_workers=cpu_count) as executor:start_time = time.time()futures = []for _ in range(target_qps):future = executor.submit(process_video, video_path)futures.append(future)time.sleep(interval)for future in futures:elapsed_time, success = future.result()total_elapsed_time += elapsed_timeif success:success_count += 1else:failure_count += 1end_time = time.time()average_elapsed_time = total_elapsed_time / target_qps if target_qps > 0 else 0print(f"總耗時: {end_time - start_time:.2f} 秒")print(f"平均耗時: {average_elapsed_time:.2f} 毫秒")print(f"成功數: {success_count}")print(f"失敗數: {failure_count}")
?
100 qps 測試(GPU)
?》這種方式環境配置失敗了,暫時只寫了一半
1、安裝用于構建共享庫的系統包 Ubuntu 運行:
# official PPA comes with ffmpeg 2.8, which lacks tons of features, we use ffmpeg 4.0 here
add-apt-repository ppa:jonathonf/ffmpeg-4 # for ubuntu20.04 official PPA is already version 4.2, you may skip this step
apt-get update
apt-get install -y build-essential python3-dev python3-setuptools make cmake
apt-get install -y ffmpeg libavcodec-dev libavfilter-dev libavformat-dev libavutil-dev
# note: make sure you have cmake 3.8 or later, you can install from cmake official website if it's too old
?
2、遞歸克隆 repo(重要)
git clone --recursivehttps://github.com/dmlc/decord
?
3、在源根目錄中構建共享庫(指定-DUSE_CUDA=ON或-DUSE_CUDA=/path/to/cuda或-DUSE_CUDA=ON-DCMAKE_CUDA_COMPILER=/path/to/cuda/nvcc啟用 NVDEC 硬件加速解碼)(要指定自定義的 FFMPEG 庫路徑,請使用“-DFFMPEG_DIR=/path/to/ffmpeg”):
cd /root/zhoulongchao/script/decord
cd decord
mkdir build && cd build
cmake .. -DUSE_CUDA=ON -DCMAKE_BUILD_TYPE=Release
make
請注意,如果您遇到了問題libnvcuvid.so,可能是由于缺少鏈接 libnvcuvid.so,可以手動找到它(ldconfig -p | grep libnvcuvid)并將庫鏈接到,CUDA_TOOLKIT_ROOT_DIR\lib64以便decord順利檢測并鏈接正確的庫。或者——
Video Codec SDK - Get Started | NVIDIA Developer
mv libnvcuvid.so /usr/local/cuda/lib64/
?
4、安裝python綁定:
cd ../python
# option 1: add python path to $PYTHONPATH, you will need to install numpy separately
pwd=$PWD
echo "PYTHONPATH=$PYTHONPATH:$pwd" >> ~/.bashrc
source ~/.bashrc
# option 2: install with setuptools
python3 setup.py install --user
?
make
?
import cv2
import time
from decord import VideoReader
from decord import gpu
from concurrent.futures import ProcessPoolExecutor
import multiprocessingdef process_video(video_path):start_time = time.time()try:# 修改為使用 GPU 進行解碼vr = VideoReader(video_path, ctx=gpu(0))fps = vr.get_avg_fps()interval = int(fps)for i in range(0, len(vr), interval):frame = vr[i].asnumpy()frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)cv2.imwrite(f'frame_{i}.jpg', frame_bgr)end_time = time.time()elapsed_time = (end_time - start_time) * 1000return elapsed_time, Trueexcept Exception:end_time = time.time()elapsed_time = (end_time - start_time) * 1000return elapsed_time, Falseif __name__ == "__main__":video_path = '/root/zhoulongchao/script/resource/video_150108527915923_37s.mp4'target_qps = 100interval = 1 / target_qpstotal_elapsed_time = 0success_count = 0failure_count = 0# 獲取系統的 CPU 核心數量cpu_count = multiprocessing.cpu_count()with ProcessPoolExecutor(max_workers=cpu_count) as executor:start_time = time.time()futures = []for _ in range(target_qps):future = executor.submit(process_video, video_path)futures.append(future)time.sleep(interval)for future in futures:elapsed_time, success = future.result()total_elapsed_time += elapsed_timeif success:success_count += 1else:failure_count += 1end_time = time.time()average_elapsed_time = total_elapsed_time / target_qps if target_qps > 0 else 0print(f"總耗時: {end_time - start_time:.2f} 秒")print(f"平均耗時: {average_elapsed_time:.2f} 毫秒")print(f"成功數: {success_count}")print(f"失敗數: {failure_count}")
?