參考自科大訊飛AI大賽(多模態RAG方向) - Datawhale
賽題意義:
我們正處在一個信息爆炸的時代,但這些信息并非以整潔的純文本形式存在。它們被封裝在各種各樣的載體中:公司的年度財報、市場研究報告、產品手冊、學術論文以及無數的網頁。這些載體的共同特點是 圖文混排 ——文字、圖表、照片、流程圖等元素交織在一起,共同承載著完整的信息。
傳統的AI技術,如搜索引擎或基于文本的問答系統,在處理這類復雜文檔時顯得力不從心。它們能很好地理解文字,但對于圖表中蘊含的趨勢、數據和關系卻是“視而不見”的。這就造成了一個巨大的信息鴻溝:AI無法回答那些需要結合視覺內容才能解決的問題,例如“根據這張條形圖,哪個產品的市場份額最高?”或“請解釋一下這張流程圖的工作原理”。
多模態技術方案選擇:
基于圖片描述 :對所有圖片生成文本描述,將這些描述與原文的文本塊統一處理。這能將多模態問題簡化為純文本問題,最適合快速構建Baseline。
具體工具棧:
PDF解析 :這個環節我們選擇的mineru,但是task1里面為了降低大家的學習門檻,使用的是pymupdf作為平替方案。
Embedding實現 :我最初考慮使用 sentence-transformer 庫。但在進一步查閱資料時,我發現了 Xinference ?,它能將模型部署為服務,并通過兼容OpenAI的API來調用。我立即決定采用這種方式,因為 服務化能讓我的Embedding模塊與主應用邏輯解耦,更利于調試和未來的擴展。
Baseline執行流程:
時間消耗問題:
解決?: 不要在一個腳本里完成所有事 。強烈建議使用 Jupyter Notebook 進行開發調試,并將流程拆分:
-
第一階段:解析 。在一個Notebook中,專門負責調用pymupdf,將所有PDF解析為JSON并 保存到本地 。這個階段成功運行一次后,就不再需要重復執行。
-
第二階段:預處理與Embedding 。在另一個Notebook中,讀取第一步生成的JSON文件,進行圖片描述生成、數據清洗,并調用Embedding模型。將最終包含向量的知識庫 保存為持久化文件 。
-
第三階段:檢索與生成 。在第三個Notebook中,加載第二步保存好的知識庫,專注于調試檢索邏輯和Prompt工程。
通過這種 分步執行、緩存中間結果 的方式,可以極大地提高調試效率,每次修改只需運行對應的、耗時較短的模塊。
核心問題:
-
LLM需要什么樣的信息? 檢索模塊是LLM的眼睛和耳朵。我們是應該只把和問題最相似的那一小塊知識喂給LLM,還是應該提供更豐富的周邊信息?例如,找到一個關鍵段落后,是否應該把它的上下文(前后段落、所屬章節標題)也一并提供,來幫助LLM更好地理解?
Baseline代碼:
1、將壓縮包解壓到指定目錄:
unzip -n datas/財報數據庫.zip -d datas/
-
-n
→ 不覆蓋已存在的文件(已有文件會跳過解壓) -
datas/財報數據庫.zip
→ 壓縮包位置 -
-d datas/
→ 解壓到datas/
目錄
2、文檔分塊(沒有涉及多模態的理解,對圖表類也僅僅是提取文本)
批量遍歷./datas目錄下(含子目錄,即./datas/財報數據庫)的所有 PDF 文件,然后用 PyMuPDF(fitz
)逐頁提取文本內容,最終把每一頁的文字和相關信息整理成 JSON 文件保存到all_pdf_page_chunks.json中(即按頁的方式來分塊),即我們RAG系統的最終知識庫(而非向量數據庫)。
fitz_pipeline_all.py:
import fitz # PyMuPDF,用于操作 PDF 文件(提取文字、圖片等)
import json
from pathlib import Path # Python 標準庫,用于更方便地處理路徑def process_pdfs_to_chunks(datas_dir: Path, output_json_path: Path):"""掃描指定目錄下的所有 PDF 文件,將每個 PDF 按頁提取文本內容,轉換成“頁面數據塊(chunk)”,最終統一保存到一個 JSON 文件中。Args:datas_dir (Path): 存放 PDF 文件的目錄(可以包含子目錄)。output_json_path (Path): 保存最終 JSON 文件的路徑。"""all_chunks = [] # 用于存儲所有 PDF 頁面的數據塊# 遞歸搜索 datas_dir 目錄下的所有 .pdf 文件pdf_files = list(datas_dir.rglob('*.pdf'))if not pdf_files:print(f"警告:在目錄 '{datas_dir}' 中未找到任何 PDF 文件。")returnprint(f"找到 {len(pdf_files)} 個 PDF 文件,開始處理...")# 遍歷找到的每一個 PDF 文件for pdf_path in pdf_files:file_name_stem = pdf_path.stem # 文件名(不包含擴展名)full_file_name = pdf_path.name # 完整文件名(含擴展名)print(f" - 正在處理: {full_file_name}")try:# 打開 PDF 文件(with 確保用完后自動關閉)with fitz.open(pdf_path) as doc:# 遍歷 PDF 的每一頁for page_idx, page in enumerate(doc):# 提取當前頁面的文字("text" 模式只取文本,不含圖片)content = page.get_text("text")# 如果該頁沒有任何文字,則跳過if not content.strip():continue# 構造當前頁面的“數據塊”字典chunk = {"id": f"{file_name_stem}_page_{page_idx}", # 唯一 ID(文件名 + 頁碼)"content": content, # 當前頁的文字內容"metadata": { # 元信息"page": page_idx, # 頁碼(從 0 開始)"file_name": full_file_name # 源文件名}}all_chunks.append(chunk) # 添加到總列表中except Exception as e:# 如果某個 PDF 處理出錯,則打印錯誤信息并繼續處理下一個print(f"處理文件 '{pdf_path}' 時發生錯誤: {e}")# 確保輸出目錄存在(沒有就創建)output_json_path.parent.mkdir(parents=True, exist_ok=True)# 將收集到的所有數據塊寫入 JSON 文件with open(output_json_path, 'w', encoding='utf-8') as f:json.dump(all_chunks, f, ensure_ascii=False, indent=2)print(f"\n處理完成!所有內容已保存至: {output_json_path}")def main():# 獲取當前腳本所在目錄base_dir = Path(__file__).parent# 假設 PDF 數據在當前目錄的 datas 文件夾中datas_dir = base_dir / 'datas'# 生成的 JSON 存放在當前目錄,文件名為 all_pdf_page_chunks.jsonchunk_json_path = base_dir / 'all_pdf_page_chunks.json'# 執行 PDF 批量處理process_pdfs_to_chunks(datas_dir, chunk_json_path)if __name__ == '__main__':main()
兩層循環,所有pdf文檔進行循環,然后是一個pdf文檔中所有頁進行循環,最終JSON文件以每一頁內容為單位進行記錄。(即從一個pdf文件的第一頁開始,直到最后一個pdf的最后一頁),即按頁的方式來分塊。
每一個chunk(或者說每一頁)字典輸出格式:
? ? ? ? ? ? ? ? ? ? chunk = {
? ? ? ? ? ? ? ? ? ? ? ? "id": f"{file_name_stem}_page_{page_idx}",
? ? ? ? ? ? ? ? ? ? ? ? "content": content,
? ? ? ? ? ? ? ? ? ? ? ? "metadata": {
? ? ? ? ? ? ? ? ? ? ? ? ? ? "page": page_idx, ?# 0-based page index
? ? ? ? ? ? ? ? ? ? ? ? ? ? "file_name": full_file_name
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? }
如下所示:
[{"id": "file1_page_0","content": "第一頁的文字內容...","metadata": {"page": 0,"file_name": "file1.pdf"}},{"id": "file1_page_1","content": "第二頁的文字內容...","metadata": {"page": 1,"file_name": "file1.pdf"}},{"id": "file2_page_0","content": "第一頁的文字內容...","metadata": {"page": 0,"file_name": "file2.pdf"}}
]
3、構建一個簡易的多文檔 RAG系統
步驟如下:
-
讀取 PDF 頁內容的 JSON 文件(之前我們提取的
all_pdf_page_chunks.json
)。 -
調用本地的 embedding 接口,把每個頁面內容(鍵content對應的值)轉成向量:embeddings = self.embedding_model.embed_texts([c['content'] for c in chunks])
-
把向量和對應的chunk(page)一起存進一個簡單的向量庫(
SimpleVectorStore,
本Baseline為了簡化,用一個內存列表(SimpleVectorStore
)模擬了向量數據庫的功能,可以看詳細代碼直接定義了兩個列表去儲存,也就是直接在
內存中存 embeddings + chunks,然后用 numpy 實現余弦相似度搜索):self.vector_store.add_chunks(chunks, embeddings) -
接收一個問題 → 檢索最相似的頁面 → 調用大模型生成結構化回答(JSON 格式 answer / filename / page),檢索步驟如下:
generate_answer()
:先檢索回Top-k個chunk,再把所有召回的chunk依次拼接成上下文(文件名 + 頁碼 + 內容)全部join起來成為一個大的上下文context;生成一個 prompt,要求模型用 JSON 格式返回;調用本地的大模型 API(LOCAL_TEXT_MODEL
);用extract_json_array
解析模型輸出的 JSON,如果解析失敗,就用原始文本+第一條檢索結果的文件名和頁碼;返回結構化字典:question
,answer
,filename
,page
,retrieval_chunks
。 -
prompt如下:
prompt = (f"你是一名專業的金融分析助手,請根據以下檢索到的內容回答用戶問題。\n"f"請嚴格按照如下JSON格式輸出:\n"f'{{"answer": "你的簡潔回答", "filename": "來源文件名", "page": "來源頁碼"}}'"\n"f"檢索內容:\n{context}\n\n問題:{question}\n"f"請確保輸出內容為合法JSON字符串,不要輸出多余內容。")
-
支持批量測試,可從
datas/test.json
讀取一堆問題并并發處理,最后輸出兩個結果文件。rag_top1_pred_raw.json
:原始結果(包含 retrieval_chunks 和原始 idx)rag_top1_pred.json
:過濾結果(去掉 retrieval_chunks,未回答的補空白),用于評測結果的提交,包含test.json里面的全部問題和針對其中10個問題的答案。 -
采用線程池并發調用的方式:with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:,因為在RAG檢索和LLM回答的瓶頸不是 CPU 計算,而是 IO 等待(網絡延遲 + 模型響應時間),而現在一次性有10個問題,因此線程池并發而不是順序執行的方式,把多個題目同時處理,大幅減少總耗時。
rag_from_page_chunks.py:
import json
import osimport hashlib
from typing import List, Dict, Any
from tqdm import tqdm
import sys
import concurrent.futures
import randomfrom get_text_embedding import get_text_embeddingfrom dotenv import load_dotenv
from openai import OpenAI
# 統一加載項目根目錄的.env
load_dotenv()class PageChunkLoader:def __init__(self, json_path: str):self.json_path = json_pathdef load_chunks(self) -> List[Dict[str, Any]]:with open(self.json_path, 'r', encoding='utf-8') as f:return json.load(f)class EmbeddingModel:def __init__(self, batch_size: int = 64):self.api_key = os.getenv('LOCAL_API_KEY')self.base_url = os.getenv('LOCAL_BASE_URL')self.embedding_model = os.getenv('LOCAL_EMBEDDING_MODEL')self.batch_size = batch_sizeif not self.api_key or not self.base_url:raise ValueError('請在.env中配置LOCAL_API_KEY和LOCAL_BASE_URL')def embed_texts(self, texts: List[str]) -> List[List[float]]:return get_text_embedding(texts,api_key=self.api_key,base_url=self.base_url,embedding_model=self.embedding_model,batch_size=self.batch_size)def embed_text(self, text: str) -> List[float]:return self.embed_texts([text])[0]class SimpleVectorStore:def __init__(self):self.embeddings = []self.chunks = []
# 直接內存中存 embeddings + chunksdef add_chunks(self, chunks: List[Dict[str, Any]], embeddings: List[List[float]]):self.chunks.extend(chunks)self.embeddings.extend(embeddings)def search(self, query_embedding: List[float], top_k: int = 3) -> List[Dict[str, Any]]:from numpy import dotfrom numpy.linalg import normimport numpy as npif not self.embeddings:return []emb_matrix = np.array(self.embeddings)query_emb = np.array(query_embedding)sims = emb_matrix @ query_emb / (norm(emb_matrix, axis=1) * norm(query_emb) + 1e-8)idxs = sims.argsort()[::-1][:top_k]return [self.chunks[i] for i in idxs]class SimpleRAG:def __init__(self, chunk_json_path: str, model_path: str = None, batch_size: int = 32):self.loader = PageChunkLoader(chunk_json_path)self.embedding_model = EmbeddingModel(batch_size=batch_size)self.vector_store = SimpleVectorStore()def setup(self):print("加載所有頁chunk...")chunks = self.loader.load_chunks()print(f"共加載 {len(chunks)} 個chunk")print("生成嵌入...")embeddings = self.embedding_model.embed_texts([c['content'] for c in chunks])print("存儲向量...")self.vector_store.add_chunks(chunks, embeddings)print("RAG向量庫構建完成!")def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:q_emb = self.embedding_model.embed_text(question)results = self.vector_store.search(q_emb, top_k)return {"question": question,"chunks": results}def generate_answer(self, question: str, top_k: int = 3) -> Dict[str, Any]:"""檢索+大模型生成式回答,返回結構化結果"""qwen_api_key = os.getenv('LOCAL_API_KEY')qwen_base_url = os.getenv('LOCAL_BASE_URL')qwen_model = os.getenv('LOCAL_TEXT_MODEL')if not qwen_api_key or not qwen_base_url or not qwen_model:raise ValueError('請在.env中配置LOCAL_API_KEY、LOCAL_BASE_URL、LOCAL_TEXT_MODEL')q_emb = self.embedding_model.embed_text(question)chunks = self.vector_store.search(q_emb, top_k)# 拼接檢索內容,帶上元數據context = "\n".join([f"[文件名]{c['metadata']['file_name']} [頁碼]{c['metadata']['page']}\n{c['content']}" for c in chunks])# 明確要求輸出JSON格式 answer/page/filenameprompt = (f"你是一名專業的金融分析助手,請根據以下檢索到的內容回答用戶問題。\n"f"請嚴格按照如下JSON格式輸出:\n"f'{{"answer": "你的簡潔回答", "filename": "來源文件名", "page": "來源頁碼"}}'"\n"f"檢索內容:\n{context}\n\n問題:{question}\n"f"請確保輸出內容為合法JSON字符串,不要輸出多余內容。")client = OpenAI(api_key=qwen_api_key, base_url=qwen_base_url)completion = client.chat.completions.create(model=qwen_model,messages=[{"role": "system", "content": "你是一名專業的金融分析助手。"},{"role": "user", "content": prompt}],temperature=0.2,max_tokens=1024)import json as pyjsonfrom extract_json_array import extract_json_arrayraw = completion.choices[0].message.content.strip()# 用 extract_json_array 提取 JSON 對象json_str = extract_json_array(raw, mode='objects')if json_str:try:arr = pyjson.loads(json_str)# 只取第一個對象if isinstance(arr, list) and arr:j = arr[0]answer = j.get('answer', '')filename = j.get('filename', '')page = j.get('page', '')else:answer = rawfilename = chunks[0]['metadata']['file_name'] if chunks else ''page = chunks[0]['metadata']['page'] if chunks else ''except Exception:answer = rawfilename = chunks[0]['metadata']['file_name'] if chunks else ''page = chunks[0]['metadata']['page'] if chunks else ''else:answer = rawfilename = chunks[0]['metadata']['file_name'] if chunks else ''page = chunks[0]['metadata']['page'] if chunks else ''# 結構化輸出return {"question": question,"answer": answer,"filename": filename,"page": page,"retrieval_chunks": chunks}if __name__ == '__main__':# 路徑可根據實際情況調整chunk_json_path = "./all_pdf_page_chunks.json"rag = SimpleRAG(chunk_json_path)rag.setup()# 控制測試時讀取的題目數量,默認只隨機抽取10個,實際跑全部時設為NoneTEST_SAMPLE_NUM = 10 # 設置為None則全部跑FILL_UNANSWERED = True # 未回答的也輸出默認內容# 批量評測腳本:讀取測試集,檢索+大模型生成,輸出結構化結果test_path = "./datas/test.json"if os.path.exists(test_path):with open(test_path, 'r', encoding='utf-8') as f:test_data = json.load(f)# 記錄所有原始索引all_indices = list(range(len(test_data)))# 隨機抽取部分題目用于測試selected_indices = all_indicesif TEST_SAMPLE_NUM is not None and TEST_SAMPLE_NUM > 0:if len(test_data) > TEST_SAMPLE_NUM:selected_indices = sorted(random.sample(all_indices, TEST_SAMPLE_NUM))def process_one(idx):item = test_data[idx]question = item['question']tqdm.write(f"[{selected_indices.index(idx)+1}/{len(selected_indices)}] 正在處理: {question[:30]}...")result = rag.generate_answer(question, top_k=5)return idx, resultresults = []if selected_indices:with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:results = list(tqdm(executor.map(process_one, selected_indices), total=len(selected_indices), desc='并發批量生成'))# 先輸出一份未過濾的原始結果(含 idx)raw_out_path = "./rag_top1_pred_raw.json"with open(raw_out_path, 'w', encoding='utf-8') as f:json.dump(results, f, ensure_ascii=False, indent=2)print(f'已輸出原始未過濾結果到: {raw_out_path}')# 只保留結果部分,并去除 retrieval_chunks 字段idx2result = {idx: {k: v for k, v in r.items() if k != 'retrieval_chunks'} for idx, r in results}filtered_results = []for idx, item in enumerate(test_data):if idx in idx2result:filtered_results.append(idx2result[idx])elif FILL_UNANSWERED:# 未被回答的,補默認內容filtered_results.append({"question": item.get("question", ""),"answer": "","filename": "","page": "",})# 輸出結構化結果到jsonout_path = "./rag_top1_pred.json"with open(out_path, 'w', encoding='utf-8') as f:json.dump(filtered_results, f, ensure_ascii=False, indent=2)print(f'已輸出結構化檢索+大模型生成結果到: {out_path}')else:print("datas/test.json 不存在")
目前Baseline存在的問題:
1、在第二步的文檔解析(沒有涉及多模態的理解,對圖表類也僅僅是提取文本,即目前采用的是圖像文本化)?這個方法會造成信息損失 :沒有提取圖片里面的內容。
2、Embedding那里針對4263個chunk(page)共耗時3分鐘,在做向量相似度的search的時候耗時11分鐘(隨機抽取 TEST_SAMPLE_NUM=10
個問題,并做了并發調用),每個問題召回的chunk數目TOP-K是5,如何進行加速?
3、將文檔按照頁面切分成獨立的塊進行檢索,可能會破壞原文中段落與段落、段落與圖表之間的上下文關聯。檢索出的知識塊可能是孤立的,缺乏上下文。
4、檢索策略單一 :僅基于語義相似度的檢索,對于一些包含特定關鍵詞或需要大范圍信息整合的問題,可能不是最優解。
提分策略:
-
分塊策略 (Chunking Strategy) :
-
目前是按“頁”分塊,這樣做簡單但粗糙。是否可以嘗試更細粒度的分塊,比如按段落、甚至固定長度的句子分塊?這會如何影響檢索的精度和召回率?
-
如何處理跨越多個塊的表格或段落?是否可以引入重疊(Overlap)分塊的策略?
-
-
檢索優化 (Retrieval Optimization) :
-
當前的Top-K檢索策略很簡單。如果檢索回來的5個塊中,只有1個是真正相關的,其他4個都是噪音,這會嚴重干擾LLM。如何提高檢索結果的信噪比?
-
可以引入重排(Re-ranking)模型嗎?即在初步檢索(召回)出20個候選塊后,用一個更強的模型對這20個塊進行重新排序,選出最相關的5個。
-
-
Prompt工程 (Prompt Engineering) :
-
rag_from_page_chunks.py
中的Prompt是整個生成環節的靈魂。你能設計出更好的Prompt嗎? -
比如,如何更清晰地指示LLM在多個來源中選擇最精確的那一個?如何讓它在信息不足時回答“根據現有信息無法回答”,而不是產生幻覺?
-
-
多模態融合 (Multimodal Fusion) :
-
“圖片->文字描述”的方案有信息損失。有沒有辦法做得更好?
-
可以嘗試 多路召回 嗎?即文本問題同時去檢索文本庫和圖片庫(使用CLIP等多模態向量模型),將檢索到的文本和圖片信息都提供給一個多模態大模型(如Qwen-VL),讓它自己去融合信息并作答。
-
-
升級數據解析方案:從
fitz
到MinerU
:
-
這是至關重要的進階環節。基礎方案所使用的
fitz_pipeline_all.py
僅能提取文本,會遺漏表格、圖片等關鍵信息。
提升攻略1:Prompt工程設計
1、明確的來源選擇指令
告訴 LLM,即使有多個來源,也要進行判斷和篩選,選擇質量最高的那個來作為最終的引用。
prompt = (f"你是一名專業的金融分析助手,請根據以下檢索到的內容回答用戶問題。\n"f"---檢索內容---\n"f"{context}\n"f"---用戶問題---\n"f"{question}\n"f"請嚴格遵循以下規則:\n"f"1. **只使用**上述檢索內容中的信息進行回答,不要使用你的外部知識。\n"f"2. 如果有多個來源(文件名和頁碼)提供了相似或互補的信息,請綜合它們。但在最終回答中,**只引用最直接、最完整的那個來源**的文件名和頁碼。\n"f"3. 你的回答必須是簡潔、精確的,并且直接引用或概括檢索到的內容。\n"f"4. 嚴格按照如下JSON格式輸出,不要輸出多余內容:\n"f'{{"answer": "你的簡潔回答", "filename": "來源文件名", "page": "來源頁碼"}}'
)
2、處理信息不足的策略
防止 LLM 在信息不足時產生幻覺是 RAG 系統設計中一個重要挑戰,與其讓它“編造”一個看似合理的答案,不如明確告訴它在這種情況下該怎么做,比如在 Prompt 中加入無法回答的指令:
prompt = (# ... (前面的 Prompt 內容不變) ...f"請嚴格遵循以下規則:\n"f"1. **只使用**上述檢索內容中的信息進行回答,不要使用你的外部知識。\n"f"2. 如果檢索內容中**沒有足夠的信息來回答問題**,請直接在 'answer' 字段中回答:'根據現有信息無法回答'。此時,`filename` 和 `page` 字段都應為空。\n"f"3. 如果有多個來源... (其余規則不變) ..."
)