目錄
1、什么是問答系統?
2、問答系統的核心工作流程
2.1、理解問題:把問題 “翻譯” 成機器能懂的形式
2.2、 尋找答案:從信息中定位答案
2.3、生成答案:整理并輸出結果
2.4、優化迭代:讓系統更 “聰明”
3、主流技術:讓系統 “變聰明” 的關鍵
4、問答系統的常見類型
5、現實挑戰:問答系統還需要突破什么?
6、問答系統的應用場景
7、英文版回答系統(顯示上下文)
8、中文版回答系統(顯示上下文)
9、中文版回答系統(隱藏上下文)
10、中英文版回答系統(隱藏上下文)
1、什么是問答系統?
簡單來說,問答系統是一種能 “聽懂” 人類問題,并給出準確答案的智能系統。比如我們平時用的智能助手(如 Siri、小愛同學),輸入 “今天天氣怎么樣?” 就能得到答案,這就是最常見的問答系統應用。
從技術角度看,它的核心任務是:接收自然語言問題,結合已有信息(如上下文、知識庫),返回簡潔準確的答案。
2、問答系統的核心工作流程
一個完整的問答系統,無論復雜與否,都離不開以下 4 個關鍵步驟:
2.1、理解問題:把問題 “翻譯” 成機器能懂的形式
人類用自然語言提問(比如 “法國的首都是什么?”),機器首先需要理解問題的核心含義:
- 識別關鍵詞:提取問題中的關鍵信息(如 “法國”“首都”)。
- 判斷問題類型:是問事實(“是什么”)、原因(“為什么”),還是方法(“怎么做”)?不同類型的問題,尋找答案的方式不同。
- 轉換為機器語言:通過技術手段(如詞向量、預訓練模型)將文字轉為機器能處理的數字形式(向量)。
比如 “法國的首都是什么?”,機器會識別出 “法國” 是主體,“首都” 是要查詢的屬性,核心是尋找 “法國” 對應的 “首都” 名稱。
2.2、 尋找答案:從信息中定位答案
理解問題后,系統需要從 “信息源” 中找答案。常見的信息源有兩種:
- 給定的上下文:比如閱讀一篇文章后回答相關問題(如考試中的閱讀理解),答案一定在文章里。
- 外部知識庫:比如回答 “地球自轉一周需要多久?”,系統需要調用已有的知識儲備(如百科全書、數據庫)。
以 “從上下文找答案” 為例,機器需要完成:
- 定位相關段落:在上下文里篩選出可能包含答案的部分(比如問題問 “法國首都”,就重點看提到 “法國” 的段落)。
- 精確匹配答案:在相關段落中,找到與問題關鍵詞匹配的內容(比如 “巴黎是法國的首都”,這里 “巴黎” 就是答案)。
2.3、生成答案:整理并輸出結果
找到答案后,系統需要把它整理成人類能理解的自然語言:
- 提取核心信息:如果答案藏在長句子里(如 “法國的首都是巴黎,它是歐洲的重要城市”),需要提煉出 “巴黎” 這個核心答案。
- 確保答案準確:檢查答案是否完全匹配問題(比如問題問 “首都”,就不能返回 “法國的貨幣是歐元”)。
- 格式化輸出:用簡潔的語言呈現(比如直接說 “巴黎”,而不是重復整句話)。
2.4、優化迭代:讓系統更 “聰明”
問答系統不是一成不變的,需要通過不斷優化提升效果:
- 學習新數據:用更多的問答樣本(如 “問題 - 答案 - 上下文” 組合)訓練系統,讓它見過更多場景。
- 修正錯誤:如果系統答錯了(比如把 “法國首都” 說成 “倫敦”),通過人工標注或算法調整糾正錯誤。
- 適應復雜場景:逐步處理更難的問題(如需要推理的問題 “小明有 3 個蘋果,吃了 1 個,又買了 2 個,現在有幾個?”)。
3、主流技術:讓系統 “變聰明” 的關鍵
目前,問答系統的核心技術依賴自然語言處理(NLP)?和機器學習,其中最具代表性的是 “預訓練語言模型”(如 BERT、GPT 等):
- BERT 模型:擅長從上下文中找答案。它能像人類閱讀一樣,理解句子中每個詞的含義,以及詞與詞之間的關系(比如 “它” 指的是前文的 “巴黎”)。因此,在 “給定上下文找答案” 的場景中表現突出。
- GPT 模型:擅長生成答案。它不僅能理解問題,還能結合自己 “學過” 的海量知識,生成全新的答案(即使上下文里沒有直接提到),更接近人類的 “思考” 過程。
4、問答系統的常見類型
根據應用場景和技術特點,問答系統可以分為幾類:
- 抽取式問答:答案一定來自給定的上下文(如閱讀理解),系統的任務是 “抽取” 出其中的片段。
- 生成式問答:答案可以是全新的句子(不一定來自上下文),系統需要 “創造” 答案(如智能助手回答常識問題)。
- 知識庫問答:專門從結構化的知識庫(如百科數據庫)中查詢答案,適合精準的事實性問題(如 “李白是哪一年出生的?”)。
- 對話式問答:能進行多輪對話(比如 “明天會下雨嗎?”→“會下小雨。”→“那需要帶傘嗎?”→“需要。”),需要記住之前的對話內容。
5、現實挑戰:問答系統還需要突破什么?
盡管現在的問答系統已經很實用,但仍有不少難題:
- 復雜問題推理:對于需要多步思考的問題(如 “小明的媽媽是小紅的姑姑,小紅和小明是什么關系?”),系統往往難以處理。
- 歧義問題理解:同一個問題可能有多種含義(如 “蘋果多少錢?” 可能指水果,也可能指手機),系統需要結合語境判斷。
- 常識與經驗依賴:很多問題需要生活常識(如 “為什么天熱會出汗?”),而機器缺乏人類的生活經驗,容易答錯。
- 對抗性問題干擾:遇到故意設計的 “陷阱問題”(如語法混亂、包含錯誤信息的問題),系統可能被誤導。
6、問答系統的應用場景
除了我們熟悉的智能助手,問答系統還廣泛應用在:
- 客服領域:自動回答用戶關于產品的常見問題(如 “退貨流程是什么?”),減少人工成本。
- 教育領域:作為 “智能輔導老師”,解答學生的學科問題(如 “數學公式怎么推導?”)。
- 醫療領域:輔助醫生查詢醫學知識(如 “這個癥狀可能是什么病?”),但需嚴格驗證準確性。
- 搜索引擎:比如百度、谷歌的 “直接回答” 功能,在搜索結果頂部直接顯示問題答案(如搜索 “地球半徑”,結果頂部會直接給出數字)。
7、英文版回答系統(顯示上下文)
# 導入PyTorch深度學習框架,用于張量運算和模型訓練
import torch
# 導入datasets庫,用于加載和處理數據集;Dataset類用于構建自定義數據集
from datasets import load_dataset, Dataset
# 導入os庫用于文件路徑操作,shutil庫用于文件刪除/移動等操作
import os
import shutil
# 從transformers庫導入所需組件:
# BertForQuestionAnswering:用于問答任務的BERT模型
# BertTokenizerFast:快速BERT分詞器(支持offset_mapping等功能)
# TrainingArguments:訓練參數配置類
# Trainer:模型訓練器(封裝了訓練循環)
# DataCollatorWithPadding:用于批量數據填充的工具
from transformers import (BertForQuestionAnswering,BertTokenizerFast,TrainingArguments,Trainer,DataCollatorWithPadding
)# 設置環境變量:解決protobuf庫的潛在兼容性問題(部分環境下可能出現導入錯誤)
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"# 檢查是否有可用GPU,優先使用GPU加速訓練/推理,否則使用CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"使用設備: {device}")# 1. 加載本地JSON格式的SQuAD風格數據集,并展開嵌套結構為單個問答樣本
def load_and_expand_squad(json_path):"""加載SQuAD格式的JSON數據集,并將嵌套結構展開為扁平的問答樣本列表參數:json_path: 本地JSON文件路徑(SQuAD 1.1/2.0格式)返回:Dataset對象:包含"question"(問題)、"context"(上下文)、"answers"(答案)的樣本集合"""# 加載原始JSON數據(SQuAD格式的JSON通常包含"data"字段,內嵌套篇章、段落、問答對)try:# 使用load_dataset加載JSON文件,指定field="data"提取數據核心字段,取"train"拆分(實際可根據文件內容調整)raw_dataset = load_dataset("json", data_files=json_path, field="data")["train"]except Exception as e:print(f"加載數據集失敗: {e}")print(f"請確保 {json_path} 文件存在且格式正確(SQuAD風格)")exit(1) # 加載失敗則退出程序# 展開嵌套結構:SQuAD格式為 篇章(article)→段落(paragraphs)→問答對(qas),需展開為單個樣本expanded = [] # 存儲展開后的樣本invalid_count = 0 # 統計無效樣本數量for article in raw_dataset: # 遍歷每個篇章try:for paragraph in article["paragraphs"]: # 遍歷篇章中的每個段落context = paragraph["context"] # 提取段落文本作為上下文for qa in paragraph["qas"]: # 遍歷段落中的每個問答對# 構建單個樣本:包含問題、上下文、答案(確保答案格式為{"text": [答案文本], "answer_start": [起始位置]})expanded.append({"question": qa["question"], # 問題文本"context": context, # 上下文文本"answers": {# 取第一個答案(SQuAD可能有多個答案,此處簡化為單答案)"text": [qa["answers"][0]["text"]],"answer_start": [qa["answers"][0]["answer_start"]]}})except Exception as e:print(f"解析樣本時出錯: {e},跳過該樣本")invalid_count += 1continue # 跳過無效樣本print(f"加載完成,有效樣本數: {len(expanded)},無效樣本數: {invalid_count}")return Dataset.from_list(expanded) # 轉換為Dataset對象返回# 2. 加載并準備訓練集和驗證集
print("加載訓練集...")
# 加載訓練集(假設本地有SQuAD格式的train-v2.0.json)
dataset = load_and_expand_squad("train-v2.0.json")
print("加載驗證集...")
# 加載驗證集(假設本地有SQuAD格式的dev-v2.0.json)
val_dataset = load_and_expand_squad("dev-v2.0.json")# 取1%樣本用于快速測試(僅為調試用,實際訓練可注釋掉以使用全量數據)
# train_test_split(test_size=0.01)表示取1%數據作為測試集(此處用作快速訓練樣本)
dataset = dataset.train_test_split(test_size=0.01)["test"]
val_dataset = val_dataset.train_test_split(test_size=0.01)["test"]print(f"訓練集大小: {len(dataset)},驗證集大小: {len(val_dataset)}")# 3. 加載預訓練模型和分詞器(基于BERT-base-uncased)
model_name = "bert-base-uncased" # 模型名稱:BERT-base無大小寫區分版本(適合英文任務)
local_model_dir = "./bert-base-uncased" # 本地緩存模型的路徑# 清理舊緩存(可選,僅在需要強制重新下載模型時啟用)
# if os.path.exists(local_model_dir):
# print(f"清理模型緩存: {local_model_dir}")
# shutil.rmtree(local_model_dir, ignore_errors=True) # 刪除舊緩存目錄# 加載分詞器和模型
try:# 加載快速分詞器(BertTokenizerFast支持return_offsets_mapping等功能,是QA任務必需的)tokenizer = BertTokenizerFast.from_pretrained(model_name,cache_dir=local_model_dir, # 緩存到本地目錄,避免重復下載use_fast=True # 強制使用快速分詞器)# 加載用于問答任務的BERT模型(輸出start_logits和end_logits,用于預測答案位置)model = BertForQuestionAnswering.from_pretrained(model_name,cache_dir=local_model_dir).to(device) # 將模型移動到指定設備(GPU/CPU)
except Exception as e:print(f"加載模型失敗: {e}")print(f"請確保 {local_model_dir} 目錄存在或網絡連接正常(首次運行需下載模型)")exit(1)# 驗證分詞器類型(快速分詞器是QA任務必需的,否則無法獲取offset_mapping)
print(f"加載的分詞器類型: {type(tokenizer).__name__}")
if type(tokenizer).__name__ != "BertTokenizerFast":print("警告: 當前使用的不是快速分詞器,可能導致性能問題或功能缺失(QA任務需要快速分詞器)")# 4. 數據預處理函數(將文本轉換為模型可接受的輸入格式)
def preprocess_function(examples):"""預處理函數:將原始文本(問題、上下文、答案)轉換為模型輸入格式參數:examples: 包含"question"、"context"、"answers"的批量樣本返回:處理后的字典:包含input_ids、attention_mask、start_positions、end_positions"""# 編碼問題和上下文:使用分詞器將文本轉換為token IDinputs = tokenizer(examples["question"], # 問題文本(列表)examples["context"], # 上下文文本(列表)max_length=512, # 最大序列長度(BERT-base支持最長512token)truncation="only_second", # 僅截斷上下文(question在前,context在后,優先保留問題)return_offsets_mapping=True, # 返回offset_mapping(token與原始文本字符的映射關系,用于定位答案)padding="max_length" # 填充到max_length長度)# 提取offset_mapping(每個token對應的原始文本起始/結束字符位置)offset_mapping = inputs.pop("offset_mapping") # 移除offset_mapping(不輸入模型,僅用于計算答案位置)start_positions = [] # 存儲每個樣本的答案起始token位置end_positions = [] # 存儲每個樣本的答案結束token位置# 遍歷每個樣本的offset_mapping,計算答案的start和end token位置for i, offsets in enumerate(offset_mapping):answer = examples["answers"][i] # 當前樣本的答案# 處理無答案情況(SQuAD 2.0支持無答案樣本,用(0,0)表示)if len(answer["text"]) == 0:start_positions.append(0) # 無答案時起始位置設為0([CLS] token)end_positions.append(0)continue# 獲取答案在原始上下文的字符位置(start_char:起始字符索引;end_char:結束字符索引)start_char = answer["answer_start"][0] # 答案起始字符位置end_char = start_char + len(answer["text"][0]) # 答案結束字符位置(起始+長度)# 定位上下文在token序列中的范圍(通過sequence_ids區分問題和上下文)# sequence_ids:每個token對應的序列類型(0=問題,1=上下文,None=特殊token如[CLS]、[SEP])sequence_ids = inputs.sequence_ids(i) # 獲取第i個樣本的sequence_ids# 找到上下文的起始token索引(第一個sequence_id=1的位置)context_start = 0while sequence_ids[context_start] != 1:context_start += 1# 找到上下文的結束token索引(最后一個sequence_id=1的位置)context_end = len(sequence_ids) - 1while sequence_ids[context_end] != 1:context_end -= 1# 查找答案在token序列中的起始和結束位置start_token = None # 答案起始token索引end_token = None # 答案結束token索引# 檢查答案是否超出上下文范圍(若答案不在上下文中,視為無答案)if offsets[context_start][0] > end_char or offsets[context_end][1] < start_char:start_positions.append(0)end_positions.append(0)else:# 尋找答案的起始token:找到第一個包含start_char的tokenfor idx in range(context_start, context_end + 1):token_start, token_end = offsets[idx] # 當前token的字符范圍if token_start <= start_char <= token_end:start_token = idx # 記錄起始token索引break# 尋找答案的結束token:找到最后一個包含end_char的tokenfor idx in range(context_end, context_start - 1, -1):token_start, token_end = offsets[idx] # 當前token的字符范圍if token_start <= end_char <= token_end:end_token = idx # 記錄結束token索引break# 若找不到答案位置(如tokenization導致的偏移),設為[CLS] token(0)start_positions.append(start_token if start_token is not None else 0)end_positions.append(end_token if end_token is not None else 0)# 將計算好的start和end位置添加到輸入中(作為模型訓練的標簽)inputs["start_positions"] = start_positionsinputs["end_positions"] = end_positionsreturn inputs# 5. 應用預處理函數到訓練集和驗證集
print("預處理訓練數據...")
# 對訓練集應用預處理:batched=True表示批量處理,remove_columns移除原始文本列(模型不需要)
tokenized_dataset = dataset.map(preprocess_function, batched=True, remove_columns=dataset.column_names)
print("預處理驗證數據...")
# 對驗證集應用同樣的預處理
tokenized_val_dataset = val_dataset.map(preprocess_function, batched=True, remove_columns=val_dataset.column_names)# 將數據集轉換為PyTorch張量格式(模型需要張量輸入)
# 只保留模型需要的列:input_ids(token ID)、attention_mask(掩碼,區分有效token和填充)、start/end_positions(標簽)
tokenized_dataset.set_format("torch", columns=["input_ids", "attention_mask", "start_positions", "end_positions"])
tokenized_val_dataset.set_format("torch", columns=["input_ids", "attention_mask", "start_positions", "end_positions"])# 6. 配置訓練參數
training_args = TrainingArguments(output_dir="./qa_model", # 模型和日志輸出目錄evaluation_strategy="epoch", # 每輪結束后進行驗證learning_rate=3e-5, # 學習率(BERT微調常用3e-5)per_device_train_batch_size=4, # 每個設備的訓練批大小(GPU內存小則設小些)per_device_eval_batch_size=4, # 每個設備的驗證批大小num_train_epochs=2, # 訓練輪次(小樣本快速測試用2輪,實際訓練可增至3-5輪)weight_decay=0.01, # 權重衰減(L2正則化,防止過擬合)logging_dir="./logs", # 日志目錄(可通過tensorboard查看)logging_steps=10, # 每10步記錄一次日志save_strategy="epoch", # 每輪結束后保存模型fp16=True if device == "cuda" else False, # 若使用GPU,啟用混合精度訓練(加速訓練并減少內存占用)
)# 數據整理器:用于批量處理時自動填充(確保同批次樣本長度一致)
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)# 7. 初始化Trainer并開始訓練
print("開始訓練模型...")
trainer = Trainer(model=model, # 待訓練的模型args=training_args, # 訓練參數train_dataset=tokenized_dataset, # 訓練集eval_dataset=tokenized_val_dataset, # 驗證集tokenizer=tokenizer, # 分詞器(用于保存模型時一同保存)data_collator=data_collator, # 數據整理器
)# 開始訓練(封裝了完整的訓練循環:前向傳播、損失計算、反向傳播、參數更新)
trainer.train()# 訓練完成后保存模型(Trainer會自動保存,這里再次確認)
print(f"訓練完成,模型已保存到: {training_args.output_dir}")
trainer.save_model(training_args.output_dir) # 手動保存模型(包含配置、分詞器、權重)# 8. 答案生成函數(使用訓練好的模型進行問答推理)
def generate_answer(question, context):"""生成答案:根據問題和上下文,使用模型預測答案參數:question: 輸入的問題文本context: 相關的上下文文本返回:模型預測的答案文本"""# 確保模型在正確的設備上,并設置為評估模式(關閉dropout等訓練特有的層)model.to(device)model.eval()# 編碼輸入:將問題和上下文轉換為模型輸入格式(返回張量)inputs = tokenizer(question, # 單個問題context, # 單個上下文max_length=512,truncation="only_second", # 僅截斷上下文return_tensors="pt", # 返回PyTorch張量padding="max_length" # 填充到max_length).to(device) # 移動到指定設備# 模型推理(關閉梯度計算,節省內存)with torch.no_grad():outputs = model(** inputs) # 輸入解包(input_ids、attention_mask等)# 從輸出中獲取預測的答案起始和結束token索引(取概率最大的位置)start_idx = torch.argmax(outputs.start_logits).item() # 起始token索引end_idx = torch.argmax(outputs.end_logits).item() # 結束token索引# 將token ID轉換為原始token(用于拼接答案)tokens = tokenizer.convert_ids_to_tokens(inputs["input_ids"][0]) # 取第一個樣本(僅輸入一個樣本)answer_tokens = tokens[start_idx:end_idx + 1] # 截取答案對應的token序列#答案對應的token序列# 處理無答案情況(若start和end都為0,對應[CLS] token,視為無答案)if start_idx == 0 and end_idx == 0:return "No answer found" # 無答案時返回提示# 將token轉換為文本,并清理特殊token(如[CLS]、[SEP])answer = tokenizer.convert_tokens_to_string(answer_tokens)# 移除可能殘留的特殊token,并strip去空格return answer.replace("[CLS]", "").replace("[SEP]", "").strip()# 9. 交互式問答測試(方便用戶手動輸入問題和上下文進行測試)
if __name__ == "__main__":print("\n=== 問答測試 ===")# 示例問題(用于快速驗證模型功能)test_questions = [{"question": "What is the capital of France?", # 問題:法國的首都是什么?"context": "France is a country in Europe. Its capital is Paris, which is also a major cultural center." # 上下文},{"question": "Who developed BERT?", # 問題:誰開發了BERT?"context": "BERT is a language model developed by Google in 2018. It uses a Transformer architecture." # 上下文}]# 運行示例問題測試for i, test in enumerate(test_questions):print(f"\n問題 {i + 1}: {test['question']}")print(f"答案: {generate_answer(test['question'], test['context'])}")# 交互式問答(允許用戶自定義輸入)while True:print("\n=== 交互式問答 ===")question = input("請輸入問題 (輸入q退出): ")if question.lower() == "q": # 輸入q則退出breakcontext = input("請輸入相關上下文: ")if not context: # 上下文不能為空print("上下文不能為空!")continue# 生成并打印答案answer = generate_answer(question, context)print(f"模型回答: {answer}")
8、中文版回答系統(顯示上下文)
# 導入JSON模塊用于處理JSON格式數據(數據集通常為JSON格式)
import json
# 導入os模塊用于文件路徑操作(創建目錄、檢查文件是否存在等)
import os
# 導入PyTorch深度學習框架,用于張量運算和模型訓練
import torch
# 從torch.utils.data導入Dataset(自定義數據集基類)和DataLoader(數據加載器)
from torch.utils.data import Dataset, DataLoader
# 從transformers庫導入中文問答任務所需組件:
# BertTokenizerFast:快速BERT分詞器(支持offset_mapping,用于定位答案在原始文本中的位置)
# BertForQuestionAnswering:用于問答任務的BERT模型(輸出start_logits和end_logits,預測答案起止位置)
# AdamW:適用于Transformer模型的優化器
# get_linear_schedule_with_warmup:學習率調度器(控制訓練過程中學習率的變化)
from transformers import (BertTokenizerFast,BertForQuestionAnswering,AdamW,get_linear_schedule_with_warmup
)
# 導入tqdm用于顯示訓練進度條
from tqdm import tqdm# 設置計算設備:優先使用GPU(cuda)加速訓練/推理,若無則使用CPU
# 說明:GPU的并行計算能力可大幅縮短訓練時間,尤其適合BERT等大型模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用設備: {device}")# 自定義數據集類:將預處理后的編碼數據轉換為PyTorch可識別的數據集格式
# 作用:統一數據接口,方便DataLoader按批次加載數據
class QADataset(Dataset):def __init__(self, encodings):# encodings:包含input_ids、attention_mask、start_positions、end_positions的字典self.encodings = encodings# 按索引獲取單個樣本(PyTorch數據集必需實現的方法)def __getitem__(self, idx):# 將每個字段轉換為PyTorch張量(模型輸入需為張量格式)return {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}# 返回數據集總樣本數(PyTorch數據集必需實現的方法)def __len__(self):# 以input_ids的長度為準(所有字段長度相同)return len(self.encodings.input_ids)# 加載并驗證數據集:確保數據格式正確,為訓練提供可靠輸入
def load_existing_dataset(train_path='data/train_dataset.json', val_path='data/val_dataset.json'):print(f"加載數據集: {train_path} 和 {val_path}")# 內部函數:加載單個數據集并驗證格式def validate_dataset(path):# 檢查文件是否存在if not os.path.exists(path):raise FileNotFoundError(f"數據集文件不存在: {path}")# 加載JSON數據并解析with open(path, 'r', encoding='utf-8') as f:try:dataset = json.load(f) # 加載為列表格式(每個元素是一個樣本)except json.JSONDecodeError as e:raise ValueError(f"JSON解析錯誤: {path}, 錯誤: {e}")# 驗證數據集是否為列表(JSON數組)if not isinstance(dataset, list):raise ValueError(f"數據集格式錯誤: {path} 應為JSON數組(樣本列表)")# 驗證數據集不為空if not dataset:raise ValueError(f"數據集為空: {path}(至少需要一個樣本)")# 驗證每個樣本的結構是否符合要求required_fields = ['question', 'context', 'answers'] # 每個樣本必須包含的字段for i, sample in enumerate(dataset):# 檢查是否包含必要字段for field in required_fields:if field not in sample:raise ValueError(f"樣本 {i} 缺少必要字段: {field}(必須包含問題、上下文、答案)")# 驗證answers字段的結構(必須是字典)answers = sample['answers']if not isinstance(answers, dict):raise TypeError(f"樣本 {i} 的answers字段應為字典,實際為: {type(answers).__name__}")# 檢查answers是否包含text和answer_start子字段(抽取式問答必需)if 'text' not in answers or 'answer_start' not in answers:raise ValueError(f"樣本 {i} 的answers字段缺少必要子字段(需包含text和answer_start)")# 驗證text和answer_start是否為列表(允許空列表表示無答案)if not isinstance(answers['text'], list) or not isinstance(answers['answer_start'], list):raise TypeError(f"樣本 {i} 的answers.text或answers.answer_start應為列表(存儲答案文本和起始位置)")# 關鍵校驗:無答案樣本需text和answer_start同時為空,有答案則同時非空if (not answers['text'] and answers['answer_start']) or (answers['text'] and not answers['answer_start']):raise ValueError(f"樣本 {i} 的answers.text和answer_start需同時為空(無答案)或同時非空(有答案)")# 對有答案的樣本,進一步驗證answer_start的類型和位置if answers['answer_start']: # 僅處理有答案的樣本# 驗證answer_start中的元素是否為整數(位置必須是整數索引)if not all(isinstance(pos, int) for pos in answers['answer_start']):raise TypeError(f"樣本 {i} 的answers.answer_start包含非整數類型(必須為字符起始位置)")# 驗證答案文本與上下文的匹配性(確保答案確實在上下文中)context = sample['context']for j, (text, pos) in enumerate(zip(answers['text'], answers['answer_start'])):if not isinstance(text, str):raise TypeError(f"樣本 {i} 的answers.text[{j}]不是字符串(答案必須是文本)")# 檢查答案起始位置是否超出上下文范圍if pos < 0 or pos >= len(context):raise ValueError(f"樣本 {i} 的answers.answer_start[{j}]={pos}超出上下文長度(上下文長度為{len(context)})")# 檢查答案文本是否與上下文對應位置的內容一致(防止標注錯誤)if context[pos:pos + len(text)] != text:print(f"警告: 樣本 {i} 的答案文本與上下文不匹配")print(f" 上下文: {context}")print(f" 答案文本: '{text}', 起始位置: {pos}(上下文對應位置為'{context[pos:pos + len(text)]}')")print(f"已驗證數據集: {path}, 樣本數: {len(dataset)}")return dataset, len(dataset) # 返回驗證后的數據集和樣本數# 驗證訓練集和驗證集train_data, train_size = validate_dataset(train_path)val_data, val_size = validate_dataset(val_path)# 保存驗證后的數據集(確保后續使用的是格式正確的數據)with open(train_path, 'w', encoding='utf-8') as f:json.dump(train_data, f, ensure_ascii=False, indent=2)with open(val_path, 'w', encoding='utf-8') as f:json.dump(val_data, f, ensure_ascii=False, indent=2)print(f"訓練集規模: {train_size}, 驗證集規模: {val_size}")return train_path, val_path # 返回驗證后的數據集路徑# 數據預處理:將原始文本轉換為BERT可接受的輸入格式(核心步驟)
def prepare_data(dataset_path, tokenizer, max_length=512):# 加載數據集with open(dataset_path, 'r', encoding='utf-8') as f:dataset = json.load(f)# 打印第一個樣本的結構(方便調試和理解數據格式)if dataset:print(f"\n樣本結構示例:")for key, value in dataset[0].items():if key == 'answers':print(f" {key}: {type(value).__name__}({list(value.keys())})") # 顯示answers的子字段else:print(f" {key}: {type(value).__name__}") # 顯示問題和上下文的類型# 過濾無效樣本(保留格式正確的樣本用于訓練)valid_samples = []for i, item in enumerate(dataset):try:# 基本字段類型檢查(問題和上下文必須是字符串)if not isinstance(item['question'], str) or not isinstance(item['context'], str):raise TypeError(f"問題或上下文不是字符串類型(必須為文本)")ans = item['answers']if not isinstance(ans, dict):raise TypeError(f"answers字段不是字典類型(格式錯誤)")# 驗證答案字段的格式(必須是列表)if not isinstance(ans.get('text', []), list) or not isinstance(ans.get('answer_start', []), list):raise ValueError(f"answers.text或answer_start格式錯誤(必須為列表)")# 處理無答案樣本(text和answer_start均為空)if not ans['text'] and not ans['answer_start']:valid_samples.append({'question': item['question'],'context': item['context'],'answers': {'text': [], 'answer_start': []} # 保留無答案樣本})continue# 有答案樣本必須同時包含text和answer_start(排除格式矛盾的樣本)if not ans['text'] or not ans['answer_start']:raise ValueError(f"answers.text和answer_start需同時為空或非空(格式矛盾)")# 取第一個答案(通常抽取式問答每個問題對應一個答案)answer_text = ans['text'][0]answer_start = ans['answer_start'][0]# 確保answer_start是整數(若為字符串則嘗試轉換)if not isinstance(answer_start, int):try:answer_start = int(answer_start)except:raise TypeError(f"answer_start不是整數(無法轉換為位置索引)")# 驗證答案起始位置是否在上下文范圍內context = item['context']if answer_start < 0 or answer_start >= len(context):raise ValueError(f"answer_start超出上下文范圍(上下文長度為{len(context)})")# 修正答案文本與上下文的匹配性(若標注錯誤,以上下文實際內容為準)actual_text = context[answer_start:answer_start + len(answer_text)]if actual_text != answer_text:print(f"警告: 樣本 {i} 的答案文本不匹配")print(f" 預期: '{answer_text}', 實際: '{actual_text}'")answer_text = actual_text # 修正為上下文實際內容# 將驗證通過的樣本加入有效列表valid_samples.append({'question': item['question'],'context': context,'answers': {'text': [answer_text],'answer_start': [answer_start]}})except Exception as e:print(f"跳過無效樣本 {i}: {e}") # 打印錯誤信息并跳過無效樣本print(f"有效樣本數: {len(valid_samples)}/{len(dataset)}") # 統計有效樣本比例# 提取問題、上下文、答案列表(用于批量處理)questions = [item['question'] for item in valid_samples]contexts = [item['context'] for item in valid_samples]answers = [item['answers'] for item in valid_samples]# 用BERT分詞器編碼文本:將問題和上下文轉換為模型可識別的token ID# 關鍵參數說明:# - truncation="only_first": 僅截斷上下文(保留問題完整,因問題更關鍵)# - return_offsets_mapping: 返回token與原始文本字符的映射關系(用于定位答案)# - return_token_type_ids: 返回token類型(0表示問題,1表示上下文,BERT需要區分)encodings = tokenizer(contexts, # 第一個參數:上下文列表questions, # 第二個參數:問題列表(BERT要求上下文在前,問題在后)truncation="only_first",padding='max_length', # 填充至max_length(512)max_length=max_length, # BERT-base模型最大支持512個tokenreturn_tensors='pt', # 返回PyTorch張量return_offsets_mapping=True, # 核心:token與原始字符的映射(抽取答案必需)return_token_type_ids=True)# 存儲每個樣本的答案在token序列中的起始和結束位置(模型訓練的標簽)start_positions = []end_positions = []for i in range(len(answers)):answer = answers[i]# 無答案樣本:將start和end位置設為0(對應[CLS] token,BERT中常用0表示無答案)if not answer['text'] or not answer['answer_start']:start_positions.append(0)end_positions.append(0)continue# 有答案樣本:計算答案在token序列中的位置start_char = answer['answer_start'][0] # 答案在上下文的起始字符索引end_char = start_char + len(answer['text'][0]) - 1 # 答案在上下文的結束字符索引(減1避免超出)# 定位上下文在token序列中的范圍(通過token_type_ids區分問題和上下文)sequence_ids = encodings.sequence_ids(i) # 第i個樣本的token類型(0=問題,1=上下文,None=特殊token)context_start = sequence_ids.index(1) # 上下文的第一個token索引context_end = len(sequence_ids) - 1 - sequence_ids[::-1].index(1) # 上下文的最后一個token索引# 查找答案對應的token起始和結束位置start_token = None # 答案起始token索引end_token = None # 答案結束token索引offsets = encodings.offset_mapping[i] # 第i個樣本的token-字符映射(每個token對應(起始字符, 結束字符))# 尋找答案起始token:找到第一個包含start_char的tokenfor idx in range(context_start, context_end + 1):token_start, token_end = offsets[idx] # 當前token的字符范圍if token_start <= start_char < token_end: # start_char在當前token范圍內start_token = idxbreak# 尋找答案結束token:找到最后一個包含end_char的tokenfor idx in range(context_end, context_start - 1, -1):token_start, token_end = offsets[idx] # 當前token的字符范圍if token_start <= end_char < token_end: # end_char在當前token范圍內end_token = idxbreak# 兜底處理:若未找到答案位置(如tokenization導致偏移),視為無答案if start_token is None or end_token is None:print(f"警告:樣本 {i} 的答案無法映射到tokens,設為無答案")start_positions.append(0)end_positions.append(0)else:start_positions.append(start_token)end_positions.append(end_token)# 移除offset_mapping(模型不需要該字段,僅預處理時用)encodings.pop('offset_mapping')# 將start_positions和end_positions加入編碼結果(作為訓練標簽)encodings.update({'start_positions': start_positions, # 答案起始token索引'end_positions': end_positions # 答案結束token索引})# 返回自定義數據集對象(封裝編碼后的數據)return QADataset(encodings)# 模型訓練函數:定義訓練流程,優化模型參數
def train_model(model, tokenizer, train_loader, val_loader, optimizer, scheduler, epochs, model_save_path):# 將模型設為訓練模式(啟用dropout等訓練特有的層)model.train()# 創建模型保存目錄(若不存在)os.makedirs(model_save_path, exist_ok=True)# 遍歷訓練輪次for epoch in range(epochs):total_loss = 0 # 累計訓練損失# 用tqdm顯示訓練進度progress_bar = tqdm(enumerate(train_loader), total=len(train_loader))# 遍歷每個批次的訓練數據for step, batch in progress_bar:# 將批次數據移至指定設備(GPU/CPU)input_ids = batch['input_ids'].to(device) # token ID序列attention_mask = batch['attention_mask'].to(device) # 注意力掩碼(區分有效token和填充)start_positions = batch['start_positions'].to(device) # 答案起始位置標簽end_positions = batch['end_positions'].to(device) # 答案結束位置標簽# 獲取token_type_ids(區分問題和上下文,BERT需要)token_type_ids = batch.get('token_type_ids', None)if token_type_ids is not None:token_type_ids = token_type_ids.to(device)# 前向傳播:將輸入傳入模型,得到輸出outputs = model(input_ids=input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids,start_positions=start_positions, # 傳入標簽用于計算損失end_positions=end_positions)# 提取損失值(BertForQuestionAnswering會自動計算start和end位置的聯合損失)loss = outputs.losstotal_loss += loss.item() # 累計損失(轉換為Python數值)# 反向傳播:計算梯度optimizer.zero_grad() # 清空之前的梯度(避免累積)loss.backward() # 計算當前損失對參數的梯度optimizer.step() # 根據梯度更新參數scheduler.step() # 調整學習率(按預設策略)# 更新進度條顯示當前輪次和損失progress_bar.set_description(f'Epoch {epoch + 1}/{epochs}, Loss: {loss.item():.4f}')# 計算本輪平均訓練損失avg_train_loss = total_loss / len(train_loader)# 在驗證集上評估模型性能(計算驗證損失)val_loss = evaluate_model(model, val_loader)# 打印本輪訓練和驗證損失(用于判斷模型是否過擬合)print(f'Epoch {epoch + 1}, Train Loss: {avg_train_loss:.4f}, Val Loss: {val_loss:.4f}')# 訓練完成后保存模型和分詞器(模型權重、配置、分詞器詞匯表)model.save_pretrained(model_save_path)tokenizer.save_pretrained(model_save_path)print(f"模型已保存至: {model_save_path}")# 模型評估函數:在驗證集上計算損失,評估模型泛化能力
def evaluate_model(model, val_loader):# 將模型設為評估模式(關閉dropout等層,確保結果穩定)model.eval()total_loss = 0 # 累計驗證損失# 關閉梯度計算(評估時無需更新參數,節省內存)with torch.no_grad():# 遍歷驗證集批次for batch in val_loader:# 數據移至設備input_ids = batch['input_ids'].to(device)attention_mask = batch['attention_mask'].to(device)start_positions = batch['start_positions'].to(device)end_positions = batch['end_positions'].to(device)token_type_ids = batch.get('token_type_ids', None)if token_type_ids is not None:token_type_ids = token_type_ids.to(device)# 前向傳播(僅計算輸出,不更新參數)outputs = model(input_ids=input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids,start_positions=start_positions,end_positions=end_positions)# 累計驗證損失total_loss += outputs.loss.item()# 返回平均驗證損失(越低表示模型在未見過的數據上表現越好)return total_loss / len(val_loader)# 答案預測函數:根據問題和上下文,用訓練好的模型生成答案
def predict_answer(question, context, model, tokenizer, confidence_threshold=0.1, debug=True):# 將模型設為評估模式model.eval()# 用分詞器編碼輸入(問題和上下文)inputs = tokenizer(context, # 上下文在前question, # 問題在后(符合BERT輸入格式)truncation=True, # 超過最大長度則截斷max_length=512,return_offsets_mapping=True, # 保留token與字符的映射,用于提取答案return_tensors='pt' # 返回PyTorch張量)# 提取offset_mapping并轉換為numpy數組(用于后續映射)offset_mapping = inputs.pop('offset_mapping').numpy()[0] # 形狀:(token數, 2)# 將輸入移至模型所在設備inputs = {k: v.to(device) for k, v in inputs.items()}# 關閉梯度計算,進行推理with torch.no_grad():outputs = model(** inputs) # 傳入所有輸入字段# 提取模型輸出的logits(未歸一化的概率)start_logits = outputs.start_logits[0].cpu().numpy() # 每個token作為答案起始的分數end_logits = outputs.end_logits[0].cpu().numpy() # 每個token作為答案結束的分數# 找到分數最高的起始和結束token索引start_idx = start_logits.argmax()end_idx = end_logits.argmax()# 調試信息:打印中間結果(幫助分析預測錯誤原因)if debug:print(f"\n===== 預測調試信息 =====")print(f"問題: {question}")print(f"上下文: {context}")# 打印預測的token索引和對應分數(分數越高表示模型越有把握)# print(f"start_idx={start_idx}, end_idx={end_idx}")# print(f"start_logits[start_idx]={start_logits[start_idx]:.4f}, end_logits[end_idx]={end_logits[end_idx]:.4f}")# 將token ID轉換為文字(查看模型關注的token)tokens = tokenizer.convert_ids_to_tokens(inputs['input_ids'][0].cpu().numpy())# print(f"預測的起始token: {tokens[start_idx]}")# print(f"預測的結束token: {tokens[end_idx]}")# print(f"start_idx的offset: {offset_mapping[start_idx]}")# print(f"end_idx的offset: {offset_mapping[end_idx]}")# 過濾低置信度或無效的答案(無答案邏輯)if (start_logits[start_idx] < confidence_threshold or # 起始位置分數過低end_logits[end_idx] < confidence_threshold or # 結束位置分數過低start_idx > end_idx or # 起始位置在結束位置之后(無效)(start_idx == 0 and end_idx == 0)): # 起始和結束均為0(對應[CLS],視為無答案)if debug:print(f"答案過濾: 置信度不足或無答案")return "未找到答案"# 邊界擴展:適當延長答案結束位置(若下一個token的分數較高)max_end_idx = len(end_logits) - 1while end_idx < max_end_idx and end_logits[end_idx + 1] > end_logits[end_idx] * 0.8:end_idx += 1if debug:print(f"擴展end_idx至: {end_idx}, end_logits={end_logits[end_idx]:.4f}")# 根據offset_mapping將token位置映射回原始文本的字符位置start_char = offset_mapping[start_idx][0] # 答案起始字符索引end_char = offset_mapping[end_idx][1] # 答案結束字符索引(取token的結束位置)# 調試:打印提取的字符范圍和文本if debug:# print(f"提取字符范圍: [{start_char}, {end_char})")print(f"提取的答案文本: '{context[start_char:end_char]}'")# 從上下文提取答案文本answer = context[start_char:end_char]return answer if answer else "未找到答案"# 主函數:整合數據加載、模型訓練和交互式問答流程
def main():try:# 加載并驗證訓練集和驗證集train_path, val_path = load_existing_dataset()# 加載中文BERT分詞器和模型(專為中文優化的抽取式問答模型)# BertTokenizerFast:快速分詞器,支持offset_mapping(抽取式任務必需)# BertForQuestionAnswering:在BERT基礎上添加了start/end logits輸出層,用于預測答案位置tokenizer = BertTokenizerFast.from_pretrained('bert-base-chinese')model = BertForQuestionAnswering.from_pretrained('bert-base-chinese').to(device)# 準備訓練數據(轉換為模型輸入格式)print("\n準備訓練數據...")train_dataset = prepare_data(train_path, tokenizer)# 準備驗證數據print("\n準備驗證數據...")val_dataset = prepare_data(val_path, tokenizer)# 創建數據加載器(按批次加載數據,支持打亂和并行加載)train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True) # 訓練集打亂順序val_loader = DataLoader(val_dataset, batch_size=8) # 驗證集無需打亂# 配置優化器和學習率調度器optimizer = AdamW(model.parameters(), lr=1e-5) # 學習率1e-5(BERT微調常用)epochs = 20 # 訓練輪次(根據數據量調整,20輪適合中小規模數據集)total_steps = len(train_loader) * epochs # 總訓練步數# 線性學習率調度器:從初始學習率線性衰減到0(避免后期波動)scheduler = get_linear_schedule_with_warmup(optimizer, 0, total_steps)# 開始訓練模型print("\n開始訓練...")train_model(model, tokenizer, train_loader, val_loader, optimizer, scheduler, epochs, './chinese_qa_model')# 在驗證集上評估最終模型性能val_loss = evaluate_model(model, val_loader)print(f"\n評估結果: eval_loss={val_loss:.4f}(損失越低越好)")# 交互式問答測試(允許用戶輸入問題和上下文,查看模型預測結果)print("\n=== 交互式問答 ===")print("提示: 輸入q退出,輸入d切換調試模式,輸入r重新加載模型")debug_mode = True # 默認開啟調試模式(顯示中間過程)while True:question = input("\n問題 (輸入q退出, d切換調試模式, r重新加載模型): ").strip()# 處理用戶指令if question.lower() == 'q':break # 退出程序if question.lower() == 'd':debug_mode = not debug_mode # 切換調試模式print(f"調試模式已切換為: {'開啟' if debug_mode else '關閉'}")continueif question.lower() == 'r':print("重新加載模型...") # 重新加載訓練好的模型model = BertForQuestionAnswering.from_pretrained('./chinese_qa_model').to(device)print("模型已重新加載")continue# 獲取用戶輸入的上下文context = input("上下文: ").strip()if not context:print("上下文不能為空!")continue# 調用預測函數生成答案answer = predict_answer(question, context, model, tokenizer, debug=debug_mode)print(f"\n答案: {answer}")except Exception as e:print(f"\n錯誤: {e}")import tracebacktraceback.print_exc() # 打印完整錯誤堆棧,方便調試# 程序入口:當腳本直接運行時執行main函數
if __name__ == "__main__":main()
9、中文版回答系統(隱藏上下文)
模型和分詞器
# 導入JSON處理模塊(用于加載和解析數據集)
import json
# 導入OS模塊(用于文件路徑操作和目錄創建)
import os
# 導入PyTorch深度學習框架
import torch
# 導入數據集和數據加載器(用于批量處理訓練數據)
from torch.utils.data import Dataset, DataLoader
# 導入transformers庫中的自動加載器和模型組件
# AutoTokenizer:自動加載與模型匹配的分詞器
# AutoModelForSeq2SeqLM:自動加載適合序列到序列任務的語言模型(如T5)
# AdamW:優化器(帶權重衰減的Adam)
# get_linear_schedule_with_warmup:學習率調度器
from transformers import (AutoTokenizer,AutoModelForSeq2SeqLM,AdamW,get_linear_schedule_with_warmup
)
# 導入進度條庫(用于顯示訓練進度)
from tqdm import tqdm# 設置計算設備(優先使用GPU,否則使用CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用設備: {device}")# 自定義數據集類(處理問答對,生成模型可接受的輸入格式)
class QADataset(Dataset):def __init__(self, data, tokenizer, max_length=128):# 原始問答數據self.data = data# 分詞器(將文本轉換為模型可接受的token IDs)self.tokenizer = tokenizer# 最大序列長度(超出部分將被截斷)self.max_length = max_lengthdef __len__(self):# 返回數據集大小return len(self.data)def __getitem__(self, idx):# 獲取單個樣本item = self.data[idx]# 提取問題文本question = item['question']# 提取答案文本(取第一個答案,若無答案則為空字符串)answer = item['answers']['text'][0] if item['answers']['text'] else ""# 編碼問題(添加"question: "前綴,明確任務類型)# 注:T5模型通過前綴提示任務類型(如"translate English to German: ...")inputs = self.tokenizer(f"question: {question}", # 添加任務前綴max_length=self.max_length,truncation=True, # 截斷超出長度的文本padding='max_length', # 填充至最大長度return_tensors='pt' # 返回PyTorch張量)# 編碼答案(作為模型的目標輸出)labels = self.tokenizer(answer,max_length=self.max_length,truncation=True,padding='max_length',return_tensors='pt')# 將padding位置的標簽設為-100(PyTorch交叉熵損失會忽略-100位置)# 這樣在計算損失時會忽略填充部分,只關注有效tokenlabels = labels['input_ids'].squeeze()labels[labels == self.tokenizer.pad_token_id] = -100return {'input_ids': inputs['input_ids'].squeeze(), # 問題的token IDs'attention_mask': inputs['attention_mask'].squeeze(), # 注意力掩碼(標識有效token)'labels': labels # 答案的token IDs(padding位置為-100)}# 加載并過濾數據集(僅保留有答案的樣本)
def load_data(train_path='data/train_dataset.json', val_path='data/val_dataset.json'):def load_and_filter(path):# 讀取JSON格式的數據集with open(path, 'r', encoding='utf-8') as f:data = json.load(f)# 過濾無答案的樣本(生成式模型需要明確的答案)filtered = [item for item in data if item['answers']['text']]print(f"加載{path},有效樣本數: {len(filtered)}/{len(data)}")return filtered# 加載訓練集和驗證集train_data = load_and_filter(train_path)val_data = load_and_filter(val_path)return train_data, val_data# 模型訓練函數
def train_model(model, tokenizer, train_loader, val_loader, optimizer, scheduler, epochs, save_path):# 設置模型為訓練模式(啟用dropout等訓練特有的層)model.train()# 創建保存模型的目錄os.makedirs(save_path, exist_ok=True)# 訓練主循環for epoch in range(epochs):total_loss = 0 # 累計訓練損失# 使用tqdm顯示訓練進度progress_bar = tqdm(enumerate(train_loader), total=len(train_loader))for step, batch in progress_bar:# 將數據移至指定設備(GPU/CPU)input_ids = batch['input_ids'].to(device)attention_mask = batch['attention_mask'].to(device)labels = batch['labels'].to(device)# 前向傳播(計算模型輸出和損失)outputs = model(input_ids=input_ids,attention_mask=attention_mask,labels=labels # 傳入目標答案,模型會自動計算生成損失)loss = outputs.losstotal_loss += loss.item() # 累計損失值# 反向傳播(計算梯度并更新模型參數)optimizer.zero_grad() # 清空梯度loss.backward() # 計算梯度optimizer.step() # 更新參數scheduler.step() # 更新學習率# 更新進度條顯示當前損失progress_bar.set_description(f'Epoch {epoch + 1}/{epochs}, Loss: {loss.item():.4f}')# 計算平均訓練損失avg_train_loss = total_loss / len(train_loader)# 在驗證集上評估模型val_loss = evaluate_model(model, val_loader)print(f'Epoch {epoch + 1}, 訓練損失: {avg_train_loss:.4f}, 驗證損失: {val_loss:.4f}')# 保存訓練好的模型和分詞器model.save_pretrained(save_path)tokenizer.save_pretrained(save_path)print(f"模型已保存至: {save_path}")# 模型評估函數(計算驗證集上的平均損失)
def evaluate_model(model, val_loader):# 設置模型為評估模式(關閉dropout等)model.eval()total_loss = 0with torch.no_grad(): # 不計算梯度(節省內存和計算資源)for batch in val_loader:# 將數據移至指定設備input_ids = batch['input_ids'].to(device)attention_mask = batch['attention_mask'].to(device)labels = batch['labels'].to(device)# 前向傳播計算損失outputs = model(input_ids=input_ids,attention_mask=attention_mask,labels=labels)total_loss += outputs.loss.item()# 返回平均驗證損失return total_loss / len(val_loader)# 答案預測函數(給定問題,生成答案)
def predict_answer(question, model, tokenizer, max_length=128, temperature=0.7):# 設置模型為評估模式model.eval()# 準備輸入文本(添加任務前綴)input_text = f"question: {question}"# 編碼輸入文本inputs = tokenizer(input_text,max_length=max_length,truncation=True,padding='max_length',return_tensors='pt').to(device)# 生成答案(使用beam search生成更連貫的文本)with torch.no_grad():outputs = model.generate(**inputs, # 傳入編碼后的輸入max_length=max_length, # 最大生成長度num_beams=5, # beam search的寬度(生成質量和速度的權衡)temperature=temperature, # 控制生成的隨機性(值越小越確定性)no_repeat_ngram_size=2, # 避免生成重復的2-gram短語early_stopping=True # 當所有beam都生成結束標記時停止)# 解碼生成的答案(將token IDs轉換為文本)answer = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()return answer if answer else "無法生成答案"# 主函數(程序入口點)
def main():# 加載預訓練的中文T5模型和分詞器# Langboat/mengzi-t5-base:專為中文優化的T5模型,適合生成任務model_name = "Langboat/mengzi-t5-base"tokenizer = AutoTokenizer.from_pretrained(model_name)model = AutoModelForSeq2SeqLM.from_pretrained(model_name).to(device)# 加載訓練數據和驗證數據train_data, val_data = load_data(train_path='data/train_dataset.json',val_path='data/val_dataset.json')# 創建數據集和數據加載器train_dataset = QADataset(train_data, tokenizer)val_dataset = QADataset(val_data, tokenizer)train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True) # 訓練集打亂順序val_loader = DataLoader(val_dataset, batch_size=8) # 驗證集不打亂# 配置優化器和學習率調度器optimizer = AdamW(model.parameters(), lr=5e-5) # 學習率設置為5e-5epochs = 15 # 訓練15個輪次total_steps = len(train_loader) * epochs # 計算總訓練步數scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)# 開始訓練模型print("開始訓練生成式問答模型...")train_model(model, tokenizer, train_loader, val_loader, optimizer, scheduler, epochs, './chinese_qa_generator')# 交互式問答測試(無需提供上下文,模型直接生成答案)print("\n=== 交互式問答(無需上下文) ===")print("提示: 輸入問題(輸入q退出)")while True:question = input("\n問題: ").strip()if question.lower() == 'q':break # 退出程序# 調用預測函數生成答案answer = predict_answer(question, model, tokenizer)print(f"答案: {answer}")# 程序入口點(當腳本直接運行時執行main函數)
if __name__ == "__main__":main()
10、中英文版回答系統(隱藏上下文)(需要調試)
"""
多語言生成式問答模型(修復數據集合并與索引錯誤)
"""
import torch
import os
import random
import json
from datasets import load_dataset, Dataset, concatenate_datasets # 新增合并工具
from transformers import (AutoModelForSeq2SeqLM,AutoTokenizer,TrainingArguments,Trainer
)device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"使用設備: {device}")# 1. 數據加載函數(修復索引錯誤)
def load_qa_data(json_path, sample_ratio=0.01, is_chinese=False):"""加載問答數據,修復:- 中文樣本的list index out of range錯誤- 數據集合并方法"""try:if is_chinese:# 中文數據:手動加載扁平JSONwith open(json_path, 'r', encoding='utf-8') as f:raw_data = json.load(f)if not isinstance(raw_data, list):print(f"中文數據錯誤:{json_path} 必須是JSON列表")exit(1)dataset = Dataset.from_list(raw_data)else:# 英文數據:SQuAD格式dataset = load_dataset("json", data_files=json_path, field="data")["train"]except Exception as e:print(f"加載數據失敗: {e}")exit(1)# 提取問題和答案(修復索引錯誤)samples = []for idx, item in enumerate(dataset):try:if is_chinese: # 中文數據處理(核心修復)question = item.get("question", "").strip()answers = item.get("answers", {}) # 默認為字典,避免列表索引錯誤# 修復:先判斷answers類型,再提取textif isinstance(answers, list):# 處理列表格式:answers = [{"text": "..."}]if len(answers) == 0:print(f"中文樣本{idx}錯誤:answers為空列表")continueanswer_text = answers[0].get("text", "").strip()elif isinstance(answers, dict):# 處理字典格式:answers = {"text": ["..."]}text_list = answers.get("text", [])if not isinstance(text_list, list) or len(text_list) == 0:print(f"中文樣本{idx}錯誤:answers.text為空列表")continueanswer_text = text_list[0].strip()else:print(f"中文樣本{idx}錯誤:answers格式錯誤(非列表/字典)")continue# 驗證有效樣本if question and answer_text:samples.append({"question": question, "answer": answer_text})else:print(f"中文樣本{idx}無效:問題或答案為空")else: # 英文數據處理for para in item.get("paragraphs", []):for qa in para.get("qas", []):question = qa.get("question", "").strip()answers = qa.get("answers", [{"text": ""}])if len(answers) == 0:continueanswer_text = answers[0]["text"].strip()if question and answer_text:samples.append({"question": question, "answer": answer_text})except Exception as e:print(f"樣本{idx}處理錯誤: {e},已跳過")continue# 抽樣邏輯(安全處理)if len(samples) == 0:print(f"錯誤:{json_path} 未提取到有效樣本")exit(1)sample_size = min(int(len(samples) * sample_ratio), len(samples))sample_size = max(1, sample_size)sampled = samples if len(samples) <= sample_size else random.sample(samples, sample_size)print(f"{ '中文' if is_chinese else '英文' }數據:原始{len(samples)}條,抽樣后{len(sampled)}條")# 打印示例if sampled:print(f"樣本示例 - 問題: {sampled[0]['question'][:50]}...")print(f"樣本示例 - 答案: {sampled[0]['answer'][:50]}...")return Dataset.from_list(sampled)# 2. 預處理函數(保持不變)
def preprocess_data(examples, tokenizer, max_length=128):inputs = tokenizer(examples["question"],max_length=max_length,truncation=True,padding="max_length",return_tensors="pt")labels = tokenizer(examples["answer"],max_length=max_length,truncation=True,padding="max_length",return_tensors="pt")["input_ids"]labels[labels == tokenizer.pad_token_id] = -100return {"input_ids": inputs["input_ids"].squeeze(),"attention_mask": inputs["attention_mask"].squeeze(),"labels": labels.squeeze()}# 3. 訓練函數(保持不變)
def train_model(model, tokenizer, train_data, val_data, output_dir):print("預處理訓練數據...")tokenized_train = train_data.map(lambda x: preprocess_data(x, tokenizer),batched=True,remove_columns=train_data.column_names,desc="預處理訓練集")print("預處理驗證數據...")tokenized_val = val_data.map(lambda x: preprocess_data(x, tokenizer),batched=True,remove_columns=val_data.column_names,desc="預處理驗證集")training_args = TrainingArguments(output_dir=output_dir,evaluation_strategy="epoch",learning_rate=3e-5,per_device_train_batch_size=4,per_device_eval_batch_size=4,num_train_epochs=3,weight_decay=0.01,logging_steps=5,save_strategy="epoch",fp16=device == "cuda",load_best_model_at_end=True)trainer = Trainer(model=model,args=training_args,train_dataset=tokenized_train,eval_dataset=tokenized_val,tokenizer=tokenizer)print("開始訓練模型...")trainer.train()trainer.save_model(output_dir)print(f"模型已保存至: {output_dir}")return model# 4. 預測函數(保持不變)
def predict_answer(question, model, tokenizer):model.eval()input_text = f"answer the question: {question}"inputs = tokenizer(input_text,max_length=128,truncation=True,return_tensors="pt").to(device)with torch.no_grad():outputs = model.generate(**inputs,max_length=128,num_beams=3,temperature=0.7,no_repeat_ngram_size=2,early_stopping=True)return tokenizer.decode(outputs[0].cpu(), skip_special_tokens=True).strip() or "無法生成答案"# 主函數(修復數據集合并)
def main():model_config = {"model_name": "google/mt5-base","train_data": {"english": "train-v2.0.json","chinese": "train_dataset.json"},"val_data": {"english": "dev-v2.0.json","chinese": "val_dataset.json"},"output_dir": "./multilingual_qa_model"}print(f"加載模型: {model_config['model_name']}")try:tokenizer = AutoTokenizer.from_pretrained(model_config["model_name"])model = AutoModelForSeq2SeqLM.from_pretrained(model_config["model_name"]).to(device)except Exception as e:print(f"加載模型失敗: {e}")return# 加載英文數據print("\n加載英文訓練數據...")english_train = load_qa_data(model_config["train_data"]["english"], sample_ratio=0.05, is_chinese=False)print("加載英文驗證數據...")english_val = load_qa_data(model_config["val_data"]["english"], sample_ratio=0.05, is_chinese=False)# 加載中文數據print("\n加載中文訓練數據...")chinese_train = load_qa_data(model_config["train_data"]["chinese"], sample_ratio=1.0, is_chinese=True)print("加載中文驗證數據...")chinese_val = load_qa_data(model_config["val_data"]["chinese"], sample_ratio=1.0, is_chinese=True)# 修復:使用concatenate_datasets合并數據集(兼容所有版本)print("\n合并中英文數據集...")train_data = concatenate_datasets([english_train, chinese_train])val_data = concatenate_datasets([english_val, chinese_val])print(f"最終訓練樣本數: {len(train_data)},驗證樣本數: {len(val_data)}")# 訓練模型model = train_model(model, tokenizer, train_data, val_data, model_config["output_dir"])# 交互式測試print("\n=== 雙語問答測試 ===")while True:question = input("\n請輸入問題 (輸入q退出): ").strip()if question.lower() == "q":breakprint(f"答案: {predict_answer(question, model, tokenizer)}")if __name__ == "__main__":main()