多智能體協作的自動化視頻輿情分析報告生成器
1. 項目的意義與價值
從“非結構化視頻”中挖掘“結構化洞察”的通用挑戰
在當今的數字生態中,視頻已成為信息傳播、知識分享和消費者意見表達的核心媒介。從企業內部的會議錄屏、技術培訓,到外部的市場宣傳、用戶評測,海量視頻內容正以前所未有的速度被創造出來。然而,對于絕大多數組織而言,這個龐大的視頻庫仍然是“暗數據”——蘊含著無盡的價值,卻因其非結構化的特性,而難以被高效地檢索、分析和利用。
人工觀看并提煉視頻信息的方法,成本高昂、效率低下且無法規模化,這構成了企業在智能化轉型中面臨的普遍瓶頸。
本項目旨在解決這一通用挑戰。構建了一個由多個AI智能體(Agent)協作的自動化工作流,該系統如同一支永不疲倦的虛擬分析師團隊,能夠:
- 規模化處理: 自動消化并理解海量的視頻源。
- 深度多模態理解: 同時“看懂”視頻畫面并“聽懂”語音內容,實現跨模態的信息融合。
- 智能提煉: 將線性的、非結構化的視頻信息,轉化為結構化的、可量化的數據資產。
以汽車行業為例:一個高價值的垂域應用場景
上述挑戰在那些產品復雜、競爭激烈、且高度依賴市場反饋的行業中尤為突出。為了具體展示本系統的強大能力,我們將聚焦于一個典型的高價值應用場景——新款汽車的市場輿情分析。
當一家車企發布一款新車后,其成敗往往在最初的“黃金72小時”就已初見端倪。市場的真實聲音,就分散在YouTube、B站等平臺上成百上千個KOL(關鍵意見領袖)的深度評-
測視頻中。車企高層迫切需要知道:
- 市場最關注我們新車的哪些核心特性?
- 在這些特性上,主流觀點是正面還是負面?
- 評測中,大家都在拿我們的車和哪些競品做對比?
- 我們真正的優勢和短板是什么?
本Jupyter Notebook將完整地、端到端地實現一個智能體系統來回答這些問題。我們將演示,這個系統如何自動處理多個關于同一款新車的評測視頻,并通過“探索-分析-策略”的多階段智能體協作,最終生成一份包含量化數據和深度洞察的專業級Markdown輿情洞察報告。
這不僅是一個技術演示,更是一個未來商業智能工作模式的縮影——將AI智能體應用于垂直領域,從而在海量信息中贏得決策先機。
2. 環境準備:安裝所有必需的庫
# 運行此單元格以安裝所有依賴項
!pip install agno baidu-aip openai opencv-python-headless moviepy pydub yt-dlp
3. 導入核心庫
import os
import json
import base64
import re
import logging
from typing import Dict, List, Iterator
from concurrent.futures import ThreadPoolExecutor, as_completed# 框架與核心工具
from agno.agent import Agent, RunResponse
from agno.workflow import Workflow
from agno.models.openai import OpenAIChat
from aip import AipSpeech
import cv2
from moviepy.editor import VideoFileClip
from pydub import AudioSegment
import yt_dlp
from openai import OpenAI
from tqdm.notebook import tqdm
from IPython.display import display, Markdown
4. 🚀 配置中心
百度智能云的語音識別相關配置需登錄/申請使用,文心多模態大模型在星河社區首頁點擊即送~
class Config:# --- 百度智能云 API 配置 ---BAIDU_APP_ID = "YOUR_BAIDU_APP_ID"BAIDU_API_KEY = "YOUR_BAIDU_API_KEY"BAIDU_SECRET_KEY = "YOUR_BAIDU_SECRET_KEY"# --- 文心多模態大模型 API 配置 ---# ERNIE_MODEL_NAME可選ernie-4.5-turbo-vl或ernie-4.5-vl-28b-a3bERNIE_API_KEY = "YOUR_ERNIE_API_KEY"ERNIE_BASE_URL = "https://aistudio.baidu.com/llm/lmapi/v3" ERNIE_MODEL_NAME = "ernie-4.5-turbo-vl" # --- 視頻源配置 (請提供1個或多個YouTube評測視頻鏈接) ---VIDEO_URLS = ["https://www.youtube.com/watch?v=l-SSk-fuNl8", "https://www.youtube.com/watch?v=YJ_Qs-zziMc"]# --- 工作目錄與分析參數 ---WORKING_DIRECTORY = "./auto_market_research"CHUNK_DURATION_SECONDS = 30LANGUAGE_MAP = {"普通話": 1537, "英語": 1737, "粵語": 1637, "四川話": 1837}SELECTED_LANGUAGE = "普通話"# --- 并發與重試 ---MAX_CONCURRENCY = 4MAX_RETRIES = 3# --- 日志與環境設置 ---
os.makedirs(Config.WORKING_DIRECTORY, exist_ok=True)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("VideoAnalysisSystem")
5. 輔助工具與預處理模塊
包含為Agent工作流準備數據的所有底層函數。它是一個確定性的數據處理管道,負責將視頻URL轉換為結構化的分析日志。
def extract_json_from_response(text: str) -> dict or list:"""從LLM的返回文本中穩健地提取JSON對象或列表。"""if not text or not isinstance(text, str): return {}match = re.search(r'```(?:json)?\s*(\{.*\}|\[.*\])\s*```', text, re.DOTALL)if match: json_str = match.group(1)else:match = re.search(r'(\{.*\}|\[.*\])', text, re.DOTALL)if not match: return {}json_str = match.group(0)try:return json.loads(json_str)except json.JSONDecodeError: return {}def download_video_with_yt_dlp(video_url: str, output_dir: str, video_id: str) -> str:"""使用 yt-dlp 下載視頻,限制分辨率為最高720p以加快處理速度。"""full_video_path = os.path.join(output_dir, f"{video_id}.mp4")ydl_opts = {'format': 'bestvideo[ext=mp4][height<=720]+bestaudio[ext=m4a]/best[ext=mp4][height<=720]/best','outtmpl': full_video_path, 'quiet': True, 'noplaylist': True, 'retries': 3,}try:with yt_dlp.YoutubeDL(ydl_opts) as ydl:ydl.download([video_url])return full_video_path if os.path.exists(full_video_path) else Noneexcept Exception as e:logger.error(f"yt-dlp 下載出錯 for {video_url}: {e}")return Nonedef run_audio_transcription(video_path: str, config: Config) -> List[Dict]:"""從視頻中提取音頻并進行轉錄。"""temp_audio_path = os.path.join(config.WORKING_DIRECTORY, f"temp_audio_{os.path.basename(video_path)}.wav")try:with VideoFileClip(video_path) as video:video.audio.write_audiofile(temp_audio_path, codec='pcm_s16le', fps=16000, logger=None)full_audio = AudioSegment.from_wav(temp_audio_path).set_channels(1)asr_client = AipSpeech(config.BAIDU_APP_ID, config.BAIDU_API_KEY, config.BAIDU_SECRET_KEY)dev_pid = config.LANGUAGE_MAP.get(config.SELECTED_LANGUAGE, 1537)tasks = [{"start_time": i / 1000.0, "data": full_audio[i:i + config.CHUNK_DURATION_SECONDS * 1000].raw_data}for i in range(0, len(full_audio), config.CHUNK_DURATION_SECONDS * 1000)if len(full_audio[i:i + config.CHUNK_DURATION_SECONDS * 1000].raw_data) > 1000]transcripts = []with ThreadPoolExecutor(max_workers=config.MAX_CONCURRENCY) as executor:future_to_task = {executor.submit(asr_client.asr, task["data"], 'pcm', 16000, {'dev_pid': dev_pid}): task for task in tasks}for future in tqdm(as_completed(future_to_task), total=len(tasks), desc=f"音頻轉錄 ({os.path.basename(video_path)})"):result = future.result()if result and result.get("err_no") == 0 and result.get("result"):task_info = future_to_task[future]transcripts.append({"start_time": task_info["start_time"], "end_time": task_info["start_time"] + config.CHUNK_DURATION_SECONDS, "text": "".join(result["result"])})return sorted(transcripts, key=lambda x: x['start_time'])finally:if os.path.exists(temp_audio_path): os.remove(temp_audio_path)def run_vision_analysis(video_path: str, transcripts: List[Dict], config: Config) -> List[Dict]:"""對視頻幀進行分塊分析。"""vision_client = OpenAI(api_key=config.ERNIE_API_KEY, base_url=config.ERNIE_BASE_URL)cap = cv2.VideoCapture(video_path)duration = cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS)tasks = []for start_s in range(0, int(duration), config.CHUNK_DURATION_SECONDS):end_s = min(start_s + config.CHUNK_DURATION_SECONDS, duration)frames_b64 = []for i in range(int(end_s - start_s)):cap.set(cv2.CAP_PROP_POS_MSEC, (start_s + i) * 1000)ret, frame = cap.read()if ret:_, buffer = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])frames_b64.append(base64.b64encode(buffer).decode('utf-8'))if frames_b64:context_transcript = " ".join([t['text'] for t in transcripts if max(start_s, t['start_time']) < min(end_s, t['end_time'])])tasks.append({"start_time": start_s, "end_time": end_s, "frames": frames_b64, "transcript": context_transcript})cap.release()vision_analysis = []with ThreadPoolExecutor(max_workers=config.MAX_CONCURRENCY) as executor:future_to_task = {}for task in tasks:prompt = f"你是一個視頻分析AI。請結合語音內容“{task['transcript']}”和以下圖像幀,用一段話詳細描述從{int(task['start_time'])}秒到{int(task['end_time'])}秒的視頻場景和核心事件。"content_list = [{"type": "text", "text": prompt}] + [{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{f}"}} for f in task['frames']]messages = [{"role": "user", "content": content_list}]future = executor.submit(vision_client.chat.completions.create, model=config.ERNIE_MODEL_NAME, messages=messages, temperature=0.1, max_tokens=2048)future_to_task[future] = taskfor future in tqdm(as_completed(future_to_task), total=len(tasks), desc=f"視覺分析 ({os.path.basename(video_path)})"):response = future.result()task_info = future_to_task[future]vision_analysis.append({"start_time": task_info['start_time'], "end_time": task_info['end_time'], "description": response.choices[0].message.content.strip()})return sorted(vision_analysis, key=lambda x: x['start_time'])def batch_preprocess_videos(config: Config) -> Dict[str, str]:"""批量預處理視頻,并增加緩存機制,跳過已處理的視頻。"""video_log_paths = {}for url in config.VIDEO_URLS:try:video_id = url.split("=")[-1].split("&")[0]log_path = os.path.join(config.WORKING_DIRECTORY, f"log_{video_id}.json")if os.path.exists(log_path):logger.info(f"? [緩存] 發現已處理的日志,跳過預處理: {video_id}")video_log_paths[video_id] = log_pathcontinuelogger.info(f"--- 開始預處理新視頻: {video_id} ---")video_path = download_video_with_yt_dlp(url, config.WORKING_DIRECTORY, video_id)if not video_path: continuetranscripts = run_audio_transcription(video_path, config)vision_analysis = run_vision_analysis(video_path, transcripts, config)analysis_log = [{"start_time": v['start_time'], "end_time": v['end_time'], "transcript": " ".join([t['text'] for t in transcripts if max(v['start_time'], t['start_time']) < min(v['end_time'], t['end_time'])]), "vision_description": v['description']} for v in vision_analysis]with open(log_path, 'w', encoding='utf-8') as f:json.dump(analysis_log, f, indent=2, ensure_ascii=False)video_log_paths[video_id] = log_pathlogger.info(f"? 視頻 {video_id} 預處理完成 -> {log_path}")except Exception as e:logger.error(f"? 視頻 {url} 預處理失敗: {e}", exc_info=True)return video_log_paths
6. 🤖 Agent 工作流定義
這是項目的核心大腦,定義了三個智能體角色(探索者、分析師、策略師)以及驅動它們的agno工作流。
def create_llm(config: Config, temperature: float = 0.2):"""創建配置好的LLM實例"""return OpenAIChat(id=config.ERNIE_MODEL_NAME, api_key=config.ERNIE_API_KEY, base_url=config.ERNIE_BASE_URL,temperature=temperature)class AutoMarketResearchWorkflow(Workflow):def __init__(self, config: Config, video_log_paths: Dict[str, str]):super().__init__()self.config = configself.video_logs = {vid: json.load(open(path, 'r', encoding='utf-8')) for vid, path in video_log_paths.items()}def run(self) -> Iterator[RunResponse]:# --- 階段1: 探索 Agent (自動發現關鍵維度) ---yield RunResponse(content="--- **階段1: 探索Agent** 正在自動發現核心分析維度 ---\n")explorer_agent = Agent(model=create_llm(self.config, temperature=0.0),instructions=["你是一個市場洞察專家,任務是從多個視頻的分析日志中,自動歸納出被反復討論的【產品核心特性】和被明確提及的【競爭對手】。你的輸出必須是一個格式正確的JSON對象,包含`key_features`和`competitors`兩個列表。"])all_logs_text = json.dumps(self.video_logs, ensure_ascii=False, indent=2)response = explorer_agent.run(f"請分析以下來自多個視頻的日志,歸納出核心特性和競爭對手。\n```json\n{all_logs_text}\n```")discovered_dims = extract_json_from_response(response.content)self.session_state['key_features'] = discovered_dims.get('key_features', [])self.session_state['competitors'] = discovered_dims.get('competitors', [])yield RunResponse(content=f"? **探索完成!**\n - **發現特性**: {self.session_state['key_features']}\n - **發現競品**: {self.session_state['competitors']}\n")# --- 階段2: 分析師 Agent (基于發現的維度進行標注) ---yield RunResponse(content="\n--- **階段2: 分析師Agent** 正在對視頻進行聚焦分析和情感標注 ---\n")analyst_agent = Agent(model=create_llm(self.config),instructions=["你是一位嚴謹的分析師,任務是精讀一份視頻日志,抽取出所有關于指定【分析維度】的觀點,并標注情感。你的輸出必須是一個格式正確的JSON列表,每個對象包含`dimension`, `sentiment` ('positive', 'negative', 'neutral'), `quote`, 和 `timestamp`。"])tagged_reports = {}analysis_dims = self.session_state['key_features'] + self.session_state['competitors']for video_id, log_data in self.video_logs.items():prompt = f"分析以下視頻日志,抽取出所有關于【分析維度: {analysis_dims}】的觀點。\n日志:\n```json\n{json.dumps(log_data, ensure_ascii=False, indent=2)}\n```"response = analyst_agent.run(prompt)tagged_reports[video_id] = extract_json_from_response(response.content)self.session_state['tagged_reports'] = tagged_reportsyield RunResponse(content=f"? **觀點標注完成**,已處理所有視頻。\n")# --- 階段3: 策略師 Agent (整合數據并撰寫報告) ---yield RunResponse(content="\n--- **階段3: 市場策略師Agent** 正在撰寫最終洞察報告 ---\n")consolidated_db = {}for video_id, report in tagged_reports.items():if isinstance(report, list):for item in report:if isinstance(item, dict) and 'dimension' in item:dim = item['dimension']if dim not in consolidated_db: consolidated_db[dim] = []consolidated_db[dim].append({**item, "source_video": video_id})strategist_agent = Agent(model=create_llm(self.config, temperature=0.4),instructions=["你是一位高級市場策略師,任務是基于一份聚合后的輿情數據庫,撰寫一份給公司高層看的、專業的Markdown市場洞察報告。報告必須包含:1. 核心特性正負面評價統計分析。2. 主要優缺點總結(引用原話)。3. 競品對比分析。4. 結論與建議。"])prompt = f"這是關于新款汽車的聚合輿情數據。請基于此數據撰寫一份深入的Markdown洞察報告。\n聚合輿情數據庫:\n```json\n{json.dumps(consolidated_db, ensure_ascii=False, indent=2)}\n```"response = strategist_agent.run(prompt)final_report = response.contentself.session_state['final_report'] = final_reportyield RunResponse(content="\n--- **工作流成功結束** ---")yield RunResponse(content="\n\n" + "="*80 + "\n 最終輿情洞察報告\n" + "="*80 + "\n\n")yield RunResponse(content=final_report)
7. 🎬 主程序執行
運行此單元格將啟動整個流程,并最終在下方渲染出完整的Markdown報告。
def main():"""主執行入口"""# 檢查配置是否填寫if not Config.BAIDU_APP_ID or "YOUR_" in Config.BAIDU_APP_ID or \not Config.ERNIE_API_KEY or "YOUR_" in Config.ERNIE_API_KEY:logger.error("? 致命錯誤: 請在第4步的 Config 類中填入您真實的百度智能云和文心模型的 API Keys。")return# 步驟1: 批量預處理,帶有緩存機制logger.info(">>> 開始批量視頻預處理...")video_log_paths = batch_preprocess_videos(Config)if not video_log_paths:logger.error("所有視頻均預處理失敗,工作流終止。請檢查視頻鏈接或網絡連接。")return# 步驟2: 初始化并運行Agent工作流logger.info("\n>>> 預處理完成,開始運行Agent工作流...")workflow = AutoMarketResearchWorkflow(Config, video_log_paths)final_report_content = ""# 流式打印工作流的每一步輸出for response in workflow.run():print(response.content, end="")# 持續捕獲最終報告的完整內容if "最終輿情洞察報告" in response.content:final_report_content = "" # 報告開始,清空內容else:final_report_content += response.content# 步驟3: 使用Markdown格式優雅地展示最終報告logger.info("\n\n>>> 工作流執行完畢,正在渲染最終報告...")final_report_from_state = workflow.session_state.get('final_report')if final_report_from_state:# 使用Markdown格式優雅地展示最終報告display(Markdown(final_report_from_state))else:# 如果因為某些原因 state 中沒有報告,再給出一個明確的失敗提示error_message = "# 未能生成最終報告\n工作流已結束,但在 session_state 中未找到 'final_report'。請檢查策略師Agent的運行日志。"display(Markdown(error_message))# --- 運行! ---
if __name__ == "__main__":main()
最終輸出:
新款汽車市場洞察報告
1. 核心特性正負面評價統計分析
根據聚合輿情數據庫,我們對新款汽車的核心特性進行了正負面評價的統計分析。以下是主要特性的情感分布:
- 3.0T直列六缸發動機: 正面評價: 100%
- 48伏輕混系統: 正面評價: 100%
- 采埃孚變速箱: 正面評價: 100%
- 前置適時四驅和多片離合器: 正面評價: 100%
- 華為ADS 2.0智駕系統: 正面評價: 100%
- CDC可變阻尼減震器和單腔空氣懸掛: 正面評價: 100%
- 競品對比(奔馳、寶馬、奧迪): 負面評價較多
2. 主要優缺點總結
優點
-
3.0T直列六缸發動機:
- 用戶評價:“3.0t,直接最大馬力381匹,最大扭矩520牛米,百公里加速僅需4.8秒。”
-
48伏輕混系統:
- 用戶評價:“車加入了48伏輕混系統,當速度起來之后,發動機介入,動力采用了直列六缸布局,可以做到動力輸出更強勁,更平穩。”
-
采埃孚變速箱:
- 用戶評價:“變速箱的換擋邏輯清晰,仿佛它能讀懂你的心思,迅速降低到合適的檔位,既滿足了動力需求,同時也盡量減少了更多的頓挫感。”
-
前置適時四驅和多片離合器:
- 用戶評價:“在非鋪裝路面上,它才會自動切換到四驅,可以提供更好的駕駛體驗和穩定性。”
-
華為ADS 2.0智駕系統:
- 用戶評價:“它搭載了華為研發的ADS 2.0智駕系統,基本上哪里都能開。”
-
CDC可變阻尼減震器和單腔空氣懸掛:
- 用戶評價:“底盤采用了電動四驅,領先全系標配的CDC可變阻尼減震器和單腔空氣懸掛,不論是配置還是用料都屬于上乘。”
缺點
- 競品對比中的負面評價:
- 用戶評價(奔馳、寶馬、奧迪): “呆個兩三年哪有BBA(奔馳、寶馬、奧迪)有面子,甚至不如奧迪A6L有面子,一個小康出品的車吧,就是個雜牌貨被你吹得像個豪車。”
3. 競品對比分析
-
奔馳:
- 負面評價集中在品牌價值方面,認為新款汽車不如奔馳有面子。
- 用戶評價:“你你卻還停留在70、80后的BBA(奔馳、寶馬、奧迪)時代,問界M9提鞋都不配。”
-
寶馬:
- 正面評價:寶馬的駕駛體驗仍然受到認可。
- 負面評價:認為新款汽車在某些方面已經超越寶馬。
- 用戶評價:“論舒適豪華都說奔馳舒適豪華,但問界M9比奔馳還要舒適,內飾要比奔馳E還豪華,高檔,加速也比寶馬快。”
-
奧迪:
- 負面評價:與奔馳、寶馬類似,認為新款汽車在某些方面已經超越奧迪。
- 用戶評價:“滿大街的BBA,眼下國內很多款車型已經超越了我們一代人過時的夢想了。”
-
問界M9:
- 正面評價:在多個方面(舒適性、內飾豪華度、加速性能、智能化和輔助駕駛)都得到了高度評價。
- 用戶評價:“買問界M9的車主,那都是真大哥,家里都是已經有兩部三部車子的了,都是家里已經有奔馳、寶馬、奧迪的了,再買問界M9是為了提高享受生活,提升出行的品質。”
4. 結論與建議
結論
- 新款汽車在動力性能、駕駛體驗、智能化和輔助駕駛方面得到了用戶的高度評價。
- 競品對比中,奔馳、寶馬和奧迪在品牌價值方面仍然有一定優勢,但新款汽車在多個方面已經表現出超越競品的潛力。
- 問界M9作為新款汽車的競品,在舒適性、內飾豪華度、加速性能、智能化和輔助駕駛方面都得到了用戶的認可。
建議
- 品牌建設:加強品牌宣傳,提升品牌價值和知名度,以更好地與奔馳、寶馬和奧迪等競品競爭。
- 產品優化:繼續優化產品性能,特別是在智能化和輔助駕駛方面,以保持領先地位。
- 市場定位:明確市場定位,針對已有豪車(如奔馳、寶馬、奧迪)的車主進行精準營銷,強調新款汽車在提升生活品質和出行體驗方面的優勢。
通過以上分析和建議,我們相信新款汽車在市場上將有更大的發展潛力和競爭力。