開發一個智能的問答系統,該系統支持用戶聊天與傳輸文件。通過自然語言處理技術,機器人能夠理解用戶的意圖;同時機器人將利用互聯網搜索引擎來補充信息,確保用戶能夠獲得全面且準確的回答。
一、web ui界面
我們采用gradio來編寫的ui界面
Gradio 是一個簡單易用的 Python 庫,能夠幫助開發者快速搭建用戶友好的 Web 應用,特別適合用于機器學習模型的展示。這里使用 Gradio 來搭建一個可以與 FastAPI 后端交互的對話機器人。
Gradio Blocks:用于組織界面布局的容器。
Slider:用于調整生成參數,如 temperature 和 top_p。
Textbox:用戶輸入對話的地方。
Button:發送用戶輸入或清空歷史記錄。
Chatbot:用于顯示對話歷史的組件。
安裝:
pip install gradio==5.0.2
import uuid
import gradio as gr
from webui.chat_with_agent_api import chat_with_agent# 定義一個生成唯一會話 ID 的函數
def generate_session_id():
    """Return a newly generated UUID4 rendered as a string."""
    session_uuid = uuid.uuid4()
    return str(session_uuid)


# Generate a new session ID when the Gradio app loads
def on_app_load():
    """Return the session ID to use for a newly loaded app page."""
    return generate_session_id()


with gr.Blocks(fill_width=True, fill_height=True) as demo:
    # The callable itself (not its result) is handed to gr.State as the
    # initial value; the original note says this is how a session ID is
    # produced when the app loads.
    session_state = gr.State(value=on_app_load)

    with gr.Tab("🤖 聊天機器人"):
        gr.Markdown("## 🤖 聊天機器人")
        with gr.Row():
            # Left panel: generation settings forwarded to the chat function
            with gr.Column(scale=1, variant="panel") as sidebar_left:
                sys_prompt = gr.Textbox(
                    label="系統提示語",
                    value="You are a helpful assistant. Answer questions in chinese !",
                )
                history_len = gr.Slider(
                    minimum=-1,
                    maximum=10,
                    value=-1,
                    label="保留歷史消息的數量",
                )
                temperature = gr.Slider(
                    minimum=0.01,
                    maximum=2.0,
                    value=0.5,
                    step=0.01,
                    label="temperature",
                )
                max_tokens = gr.Slider(
                    minimum=64,
                    maximum=1024,
                    value=512,
                    step=8,
                    label="max_length",
                )
                stream = gr.Checkbox(label="stream", value=True)

            # Main area: the conversation itself
            with gr.Column(scale=10) as main:
                chatbot = gr.Chatbot(height=600, type="messages")
                gr.ChatInterface(
                    fn=chat_with_agent,
                    multimodal=True,
                    type="messages",
                    theme="soft",
                    chatbot=chatbot,
                    additional_inputs=[
                        sys_prompt,
                        history_len,
                        temperature,
                        max_tokens,
                        stream,
                        session_state,  # carries the session ID to the chat function
                    ],
                )

# Launch the app
demo.launch()
前端與后端的交互由下面的 chat_with_agent 函數完成,它就是 Web UI 中 gr.ChatInterface(fn=chat_with_agent) 所指向的函數。
# 定義后臺 API 的 URL
import json
import requests

# URLs of the FastAPI backend endpoints
chat_with_agent_url = "http://127.0.0.1:6605/chat/agent_chat"
chat_url = "http://127.0.0.1:6605/chat/chat"


def _iter_answers(response):
    """Yield the ``answer`` field of every JSON object in a streamed response.

    The body is treated as a sequence of JSON objects with no delimiter
    between them, so one network chunk may hold several objects or only part
    of one. Text is buffered and decoded object by object instead of calling
    ``json.loads`` on each raw chunk.

    Raises:
        ValueError: if the stream ends with data that is not valid JSON.
    """
    decoder = json.JSONDecoder()
    pending = ""
    for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
        if not chunk:
            continue
        if isinstance(chunk, bytes):
            # iter_content yields raw bytes when the response has no encoding.
            chunk = chunk.decode("utf-8", errors="replace")
        pending = (pending + chunk).lstrip()
        while pending:
            try:
                payload, end = decoder.raw_decode(pending)
            except json.JSONDecodeError:
                break  # incomplete object so far: wait for the next chunk
            pending = pending[end:].lstrip()
            yield payload.get('answer', '')
    if pending:
        raise ValueError(f"unparseable response data: {pending[:80]!r}")


def chat_with_agent(prompt, history, sys_prompt, history_len, temperature, max_tokens, stream, session_id):
    """Send one chat turn to the agent backend and yield the growing answer.

    Args:
        prompt: Mapping with a ``"text"`` entry and a ``"files"`` list of
            local file paths.
        history: Previous messages; each one is sent as ``str(message)``.
        sys_prompt: System prompt forwarded to the backend.
        history_len: Number of history messages the backend should keep.
        temperature: Sampling temperature forwarded to the backend.
        max_tokens: Token limit forwarded to the backend.
        stream: When true, yield the accumulated answer after every received
            piece; otherwise yield the complete answer once.
        session_id: Session identifier forwarded to the backend.

    Yields:
        str: The answer accumulated so far, or an error message when the
        request fails.
    """
    file_paths = prompt["files"]

    # The path of the first attached file is appended to the query text.
    query = f'{prompt["text"]}\n'
    if file_paths:
        query += "".join(file_paths[0])

    data = {
        "query": query,
        "sys_prompt": sys_prompt,
        "history_len": history_len,
        "history": [str(h) for h in history],
        "temperature": temperature,
        "max_tokens": max_tokens,
        "session_id": session_id,
    }

    opened_files = []
    try:
        for file_path in file_paths:
            opened_files.append(open(file_path, 'rb'))
        files = [('files', handle) for handle in opened_files]

        # The context manager releases the connection even if the consumer
        # stops iterating early.
        with requests.post(chat_with_agent_url, files=files, data=data, stream=True) as response:
            if response.status_code != 200:
                yield "請求失敗,請檢查后臺服務器是否正常運行。"
                return

            chunks = ""
            for piece in _iter_answers(response):
                chunks += piece
                if stream:
                    yield chunks
            if not stream:
                yield chunks
    except Exception as e:
        yield f"發生錯誤:{e}"
    finally:
        # The upload handles were previously never closed.
        for handle in opened_files:
            handle.close()
二、文本操作
2.1、Rag文本操作讀取文本
import os
from langchain.document_loaders import (
    TextLoader,  # plain-text files such as txt/csv/md/py
    PDFPlumberLoader,  # PDF files
    UnstructuredWordDocumentLoader,  # docx files
    JSONLoader,  # JSON files
)


def get_file_content(file_path):
    """Read a file through the LangChain document loader matching its type.

    Args:
        file_path (str): Path of the file to read.

    Returns:
        str: The ``page_content`` of all loaded documents joined by newlines,
        a fixed message when the extension is not supported, or an error
        message when loading raises.
    """
    suffix = os.path.splitext(os.path.basename(file_path))[1].lower()

    # One loader factory per supported extension.
    factories = dict.fromkeys(
        ('.txt', '.csv', '.md', '.py'),
        lambda: TextLoader(file_path, encoding='utf-8'),
    )
    factories['.json'] = lambda: JSONLoader(file_path, jq_schema=".", text_content=False)
    factories['.pdf'] = lambda: PDFPlumberLoader(file_path)
    factories['.docx'] = lambda: UnstructuredWordDocumentLoader(file_path)

    try:
        make_loader = factories.get(suffix)
        if make_loader is None:
            return "格式不支持,請更換文件!"

        # Load the documents and merge their text.
        pages = make_loader().load()
        return "\n".join([page.page_content for page in pages])
    except Exception as e:
        return f"文件讀取錯誤: {str(e)}"


# Usage example
if __name__ == "__main__":
    res = get_file_content('/home/use_Agent_project/test.csv')
    print(res)
2.2、with open操作讀取文本
import json
import os
import fitz
def get_file_content(file_path):
    """Return the textual content of a file, chosen by its extension.

    Supported extensions (case-insensitive): ``.txt``, ``.csv``, ``.md`` and
    ``.py`` are read as UTF-8 text; ``.json`` is parsed and re-serialized
    with a 4-space indent; ``.pdf`` is read page by page through ``fitz``;
    ``.docx`` is read paragraph by paragraph through ``python-docx``.

    Args:
        file_path (str): Path of the file to read.

    Returns:
        str: The file content, or a fixed message when the extension is not
        supported.
    """
    extension = os.path.splitext(os.path.basename(file_path))[1].lower()

    if extension in ('.txt', '.csv', '.md', '.py'):
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()

    if extension == '.json':
        with open(file_path, "r", encoding="utf-8") as f:
            return json.dumps(json.load(f), indent=4, ensure_ascii=False)

    if extension == '.pdf':
        with fitz.open(file_path) as pdf_document:
            return "".join(
                pdf_document[page_num].get_text() + "\n"
                for page_num in range(pdf_document.page_count)
            )

    if extension == '.docx':
        # Imported lazily so the other formats work without python-docx.
        from docx import Document

        document = Document(file_path)
        return "".join(paragraph.text + "\n" for paragraph in document.paragraphs)

    return "格式不支持,請更換文件!"


# Usage example. Guarded so that importing this module does not try to read
# a hard-coded path.
if __name__ == "__main__":
    res = get_file_content('/home/use_Agent_project/黑神話悟空.txt')
    print(res)
2.3、uuid+文本名稱+文本內容
import os
from typing import List, Union
from fastapi import UploadFile


def files_rag(files: Union[List[Union[str, UploadFile]], None], uuid: str) -> str:
    """Store the given files in a per-session directory and return their text.

    Each file is copied into ``./temp/data/<uuid>``. Afterwards every file in
    that directory (including files stored by earlier calls with the same
    ``uuid``) is read with ``get_file_content`` and formatted.

    Args:
        files: File paths and/or ``UploadFile`` objects. ``None`` or an empty
            list stores nothing and only re-reads the directory.
        uuid: Identifier used as the name of the storage directory.

    Returns:
        str: One ``document: <path>\\ncontent: <text>\\n`` entry per stored
        file, joined by newlines.
    """
    kb_file_storage_path = os.path.join("./temp/data", uuid)
    os.makedirs(kb_file_storage_path, exist_ok=True)

    for file in files or []:
        if isinstance(file, UploadFile):
            # The file name is supplied by the client. Keep only its last
            # path component so it cannot point outside the storage directory.
            filename = os.path.basename(file.filename or "")
            if not filename:
                print("上傳文件缺少文件名,已跳過")
                continue
            file_path = os.path.join(kb_file_storage_path, filename)
            with open(file_path, "wb") as f:
                f.write(file.file.read())
        elif isinstance(file, str):
            if not os.path.isfile(file):
                print(f"文件不存在: {file}")
                continue
            file_path = os.path.join(kb_file_storage_path, os.path.basename(file))
            # Opening the destination for writing truncates it, so a file
            # that already lives in the storage directory must not be copied
            # onto itself.
            if os.path.abspath(file) != os.path.abspath(file_path):
                with open(file, "rb") as f_in, open(file_path, "wb") as f_out:
                    f_out.write(f_in.read())
        else:
            print(f"不支持的文件類型: {type(file)}")
            continue

    # Read back every file stored for this session.
    result = []
    for filename in os.listdir(kb_file_storage_path):
        file_path = os.path.join(kb_file_storage_path, filename)
        if not os.path.isfile(file_path):
            continue
        try:
            file_content = get_file_content(file_path)
        except Exception as e:
            print(f"處理文件 {filename} 時出錯: {str(e)}")
            continue
        result.append(f"document: {file_path}\ncontent: {file_content}\n")

    return "\n".join(result)


# Usage example
if __name__ == "__main__":
    # Using a file path
    rest = files_rag(['/home/use_Agent_project/黑神話悟空.txt'], 'uuid123')
    print(rest)

    # Using an UploadFile object (requires a running FastAPI application):
    # with open('/home/use_Agent_project/黑神話悟空.txt', 'rb') as f:
    #     upload_file = UploadFile(filename="黑神話悟空.txt", file=f)
    #     rest = files_rag([upload_file], 'uuid456')
    #     print(rest)
三、Agent工具
在 LangChain 中,Agent(代理)是一種智能化的計算機制,它能夠根據輸入的指令或環境上下文,動態選擇和調用特定的工具(如搜索引擎、數據庫、API等)來完成任務。這種代理通過預先定義的邏輯流程或者學習到的策略,幫助開發者實現自動化、動態化和上下文敏感的計算與決策。
3.1、天氣查詢
心知天氣:心知天氣 - 高精度氣象數據 - 天氣數據API接口 - 行業氣象解決方案
?
# 心知天氣API工具類
import requests
from pydantic import Field
TIME_OUT = 60
class WeatherCheck:
    """Tool that queries the Seniverse "weather now" API for one city."""

    city: str = Field(description="City name,include city and county")

    def __init__(self, api_key):
        """Store the Seniverse API key used for every request."""
        self.api_key = api_key

    def run(self, city):
        """Return a one-sentence description of the current weather.

        Args:
            city (str): City name. Only the first line is used, because extra
                line breaks make the API return an error.

        Returns:
            str: The weather sentence, or an error message when the request
            fails or the response lacks the expected fields.
        """
        city = city.split('\n')[0].strip()
        url = "https://api.seniverse.com/v3/weather/now.json"
        # Passing the values as params lets requests URL-encode them; the
        # city was previously interpolated into the URL as-is.
        params = {
            "key": self.api_key,
            "location": city,
            "language": "zh-Hans",
            "unit": "c",
        }

        try:
            response = requests.get(url, params=params, timeout=TIME_OUT)
        except requests.RequestException as e:
            return f"請求失敗: {e}, 無法獲取{city}的天氣信息"

        if response.status_code != 200:
            return f"錯誤碼: {response.status_code}, 無法獲取{city}的天氣信息"

        try:
            now = response.json()['results'][0]['now']
            weather = now['text']
            temperature = now['temperature']
        except (ValueError, KeyError, IndexError, TypeError):
            # The body was not JSON or lacked the expected fields.
            return f"無法獲取{city}的天氣信息"

        return f"{city}的天氣是{weather},溫度為{temperature}°C"


# NOTE(review): the API key is hard-coded in source; consider loading it from
# configuration or the environment instead.
weather_check = WeatherCheck("SfYdoqNuCW39UQUvb")

# Guarded so that importing the tool does not trigger a network request.
if __name__ == "__main__":
    print(weather_check.run('北京'))
運行結果:北京的天氣是霧,溫度為30°C
3.2、聯網搜索
地址:https://serpapi.com/
?
?
# 網絡搜索API工具類
import requests
from pydantic import Field
TIME_OUT=60
class WebSearch:
    """Tool that runs a Baidu search through the SerpApi search endpoint."""

    query: str = Field(description="需要網上查找的內容")

    def __init__(self, api_key):
        """Store the SerpApi key used for every request."""
        self.api_key = api_key

    def run(self, query):
        """Search the web and return the results as formatted text.

        Args:
            query (str): Text to search for.

        Returns:
            str: Title, link and snippet of each result, or an error message
            when the request fails or yields no results.
        """
        base_url = "https://serpapi.com/search"
        params = {
            "q": query,
            "api_key": self.api_key,
            "engine": "baidu",
            "rn": 5,  # request the first 5 results
            # Proxy; per the original author's note, using it apparently does
            # not reduce the remaining quota (unverified).
            "proxy": "http://api.wlai.vip",
        }
        failure_message = f"無法獲取{query}的信息"

        try:
            response = requests.get(base_url, params=params, timeout=TIME_OUT)
        except requests.RequestException:
            return failure_message
        if response.status_code != 200:
            return failure_message

        try:
            data = response.json()
        except ValueError:
            return failure_message

        # A response without results used to raise KeyError/IndexError here.
        organic_results = data.get('organic_results') or []
        if not organic_results:
            return failure_message

        # Prefer the related news of the first hit when it has any.
        related_news = organic_results[0].get('related_news')
        hits = related_news if related_news else organic_results

        return "".join(
            f"titles:\n{result.get('title', '')} \nlinks:\n{result.get('link', '')}\nsnippets:\n{result.get('snippet', '')}\n"
            for result in hits
        )


# NOTE(review): the API key is hard-coded in source; consider loading it from
# configuration or the environment instead.
web_search = WebSearch("0c38eac10d9b5c03b6007ece56f6c84372cdf86358e5c780fc4b87ae471ff582")

# Guarded so that importing the tool does not trigger a network request.
if __name__ == "__main__":
    print(web_search.run('王者榮耀韓信出裝'))
3.3、時間服務
from datetime import datetime


class Get_time:
    """Tool that reports the current date and time."""

    def run(self, text: str):
        """Return the current time as ``YYYY-MM-DD HH:MM:SS``.

        Args:
            text: Ignored; present so the method can be called with one
                argument.

        Returns:
            str: ``datetime.now()`` formatted with ``%Y-%m-%d %H:%M:%S``.
        """
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


get_time = Get_time()

# Guarded so that importing the tool does not print anything.
if __name__ == "__main__":
    print(get_time.run(text=''))
3.4、代碼執行
# 導入必要的庫
import base64 # 用于Base64編解碼(處理圖像數據)
import os # 操作系統接口(文件路徑處理)
import re # 正則表達式(清理代碼)
from uuid import uuid4 # 生成唯一文件名
from codeboxapi import CodeBox  # sandboxed code execution environment

# Directory where generated images are written. It was previously the number
# 60, which produced paths such as "60/image-<uuid>.png".
MEDIA_DIR = "media"


class CodeInterpreter:
    """Run Python code in a CodeBox sandbox and post-process its output."""

    def __init__(self):
        """Create and start a local CodeBox and reset the output records."""
        self.output_files = ""  # URL path of the last generated image
        self.output_codes = ""  # source code that produced that image
        self.codebox = CodeBox(api_key="local")
        self.codebox.start()

    def get_outputs(self):
        """Return the recorded outputs.

        Returns:
            tuple: ``(output_files, output_codes)`` as set by the last
            ``run`` call that produced an image.
        """
        return self.output_files, self.output_codes

    def run(self, code: str):
        """Execute ``code`` in the sandbox.

        Args:
            code (str): Python source, possibly wrapped in Markdown code
                fences.

        Returns:
            str: The output content for non-image output; a confirmation
            message when a PNG image was produced and saved; or an error
            message when the image could not be saved.
        """
        # Drop Markdown fences (```python, ```py, ```) and surrounding blanks.
        clean_code = re.sub(r'(```python|```py|```)\s*', '', code, flags=re.IGNORECASE)
        clean_code = clean_code.strip()

        # Drop empty lines and re-join.
        lines = [line for line in clean_code.split('\n') if line.strip()]
        cleaned_code = '\n'.join(lines)

        output = self.codebox.run(cleaned_code)

        if output.type != "image/png":
            return output.content

        # Image output: decode the Base64 payload into a uniquely named file.
        filename = f"{MEDIA_DIR}/image-{uuid4()}.png"
        decoded_image = base64.b64decode(output.content)
        try:
            # The media directory may not exist yet on the first run.
            os.makedirs(MEDIA_DIR, exist_ok=True)
            with open(filename, 'wb') as file:
                file.write(decoded_image)
        except Exception as e:
            return f"文件保存失敗: {str(e)}"

        # Record the URL path and the original code for the caller.
        image_url = f"/media/{os.path.basename(filename)}"
        self.output_files = image_url
        self.output_codes = code
        return "已生成圖像并發送給用戶"


# Shared instance. Other modules import this name, so it must exist at module
# level and not only under the __main__ guard.
code_interpreter = CodeInterpreter()

# Usage example
if __name__ == "__main__":
    text = '''print(1+1)'''
    res = code_interpreter.run(text)
    print(res)  # print the execution result
| 組件 | 作用 |
|---|---|
| CodeBox | 提供安全的代碼執行沙箱(類似Docker容器) |
| 正則清理 clean_code | 去除Markdown代碼塊標記(```python 等),確保純代碼執行 |
| base64 處理 | 解碼圖像輸出(常見于matplotlib等庫的圖形渲染場景) |
| 文件管理 | 自動保存圖像到 MEDIA_DIR 并生成可訪問URL |
?
3.5、定義工具
from langchain_core.tools import Tool
from tools.code_interpreter import code_interpreter
from tools.weather_check import weather_check
from tools.web_search import web_search
from tools.get_time import get_time# 將API工具封裝成Langchain的Tool對象
# (name, callable, description) for every tool offered to the agent
_TOOL_SPECS = (
    (
        "weather check",
        weather_check.run,
        "獲取當前地點或指定城市的實時天氣信息,包括溫度、天氣狀況等。",
    ),
    (
        "web search",
        web_search.run,
        "通過網絡搜索獲取最新資訊、回答問題或查找特定主題的相關內容。",
    ),
    (
        "get time",
        get_time.run,
        "獲取當前時間。輸入應始終為空字符串",
    ),
    (
        "code interpreter",
        code_interpreter.run,
        "一個Python shell。使用它來執行python代碼。輸入應該是一個有效的python代碼字符串。在與開頭引號相同的行開始編寫代碼。不要以換行開始你的代碼。",
    ),
)

tools = [
    Tool(name=tool_name, func=tool_func, description=tool_description)
    for tool_name, tool_func, tool_description in _TOOL_SPECS
]
四、提示模板
# ReAct-style agent prompt. Placeholders: {tools}, {knowledgebases},
# {documents}, {tool_names}, {history}, {input}, {agent_scratchpad}.
_AGENT_PROMPT = "".join(
    (
        "Answer the following questions as best you can. If it is in order, you can use some tools and knowledgebases ",
        "appropriately. ",
        "You have access to the following tools:\n\n",
        "{tools}\n\n",
        "You have access to the following knowledge bases:\n\n",
        "{knowledgebases}\n\n",
        "You have the following documents:\n\n",
        "{documents}\n\n",
        "Use the following format:\n",
        "Question: the input question you must answer\n",
        "Thought: you should always think about what to do and what tools to use.\n",
        "Action: the action to take, should be one of [{tool_names}]\n",
        "Action Input: the input to the action\n",
        "Observation: the result of the action\n",
        "... (this Thought/Action/Action Input/Observation can be repeated zero or more times)\n",
        "Thought: I now know the final answer\n",
        "Final Answer: the final answer to the original input question\n",
        "Begin!\n\n",
        "History: {history}\n\n",
        "Question: {input}\n\n",
        "Thought: {agent_scratchpad}\n",
    )
)

PROMPT_TEMPLATES = {
    "agent": _AGENT_PROMPT,
}
五、fastapi后端響應
5.1、agent_chat后端構建
import json
import asyncio
import os
from ast import literal_eval
from uuid import uuid4
from fastapi import APIRouter, UploadFile, Form
from fastapi.responses import StreamingResponse
from tools.tools_select import tools
from typing import List, AsyncIterable
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain.agents import create_react_agent, AgentExecutor
from langchain.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate
from configs.prompt import PROMPT_TEMPLATES
from configs.setting import chat_model_name, api_key, base_url, TIME_OUT, TEMP_FILE_STORAGE_DIR
from utils.callback import CustomAsyncIteratorCallbackHandler
from tools.code_interpreter import code_interpreter
from utils.load_docs import files_rag# 初始化FastAPI路由
chat_router = APIRouter(prefix="/chat", tags=["Chat 對話"])


def _format_history(history, history_len):
    """Render the serialized chat history as the text block for the prompt.

    Args:
        history: Strings that each hold the ``repr`` of a message dict.
        history_len: Negative keeps everything, ``0`` keeps nothing, and a
            positive value keeps the last ``2 * history_len`` messages
            (presumably one user and one assistant message per exchange).

    Returns:
        str: ``role:content`` entries, each followed by a blank line.
    """
    try:
        messages = [literal_eval(item) for item in history]
    except (ValueError, SyntaxError):
        messages = []

    if history_len == 0:
        # A slice starting at -0 would keep the whole list, so "keep no
        # history" needs its own branch.
        messages = []
    elif history_len > 0:
        messages = messages[-2 * history_len:]

    return "".join(
        f"{msg.get('role', 'unknown')}:{msg.get('content', '')}\n\n"
        for msg in messages
        if isinstance(msg, dict)  # skip items that did not evaluate to a dict
    )


@chat_router.post("/agent_chat")
async def agent_chat(
    files: List[UploadFile] = None,
    query: str = Form(..., description="用戶輸入"),
    sys_prompt: str = Form("You are a helpful assistant.", description="系統提示"),
    history_len: int = Form(-1, description="保留歷史消息的數量"),
    history: List[str] = Form([], description="歷史對話"),
    temperature: float = Form(0.5, description="LLM采樣溫度"),
    max_tokens: int = Form(1024, description="LLM最大token數配置"),
    session_id: str = Form(None, description="會話標識"),
):
    """Agent chat endpoint.

    Steps:
        1. Store the uploaded files and extract their text.
        2. Parse and trim the chat history.
        3. Build the LLM, the prompt and the ReAct agent.
        4. Stream the answer as a sequence of JSON objects of the form
           ``{"answer": ...}``.

    Returns:
        StreamingResponse: The streamed answer.
    """
    # 1. File handling. A failure becomes the document text instead of
    # aborting the request.
    try:
        documents = files_rag(files, session_id if session_id else str(uuid4()))
    except Exception as e:
        documents = f"文件處理錯誤: {str(e)}"

    # 2. History handling
    histories = _format_history(history, history_len)

    async def agent_chat_iterator() -> AsyncIterable[str]:
        """Run the agent and yield its output piece by piece."""
        callback = CustomAsyncIteratorCallbackHandler()
        callbacks = [callback]

        # 3. LLM
        chat_model = ChatOpenAI(
            model=chat_model_name,
            api_key=api_key,
            base_url=base_url,
            temperature=temperature,
            max_tokens=max_tokens,
            streaming=True,
            callbacks=callbacks,
        )

        # 4. Prompt: the caller's system prompt plus the ReAct template
        system_prompt = SystemMessagePromptTemplate.from_template(sys_prompt)
        human_prompt = HumanMessagePromptTemplate.from_template(PROMPT_TEMPLATES["agent"])
        chat_prompt = ChatPromptTemplate.from_messages([system_prompt, human_prompt])

        # 5. Agent
        agent = create_react_agent(
            chat_model,
            tools,
            chat_prompt,
            stop_sequence=["\nObservation:"],
        )
        agent_executor = AgentExecutor.from_agent_and_tools(
            agent=agent,
            tools=tools,
            verbose=True,
        )

        # 6. Run the agent in the background.
        # NOTE(review): code_interpreter is a module-level object whose
        # output fields are reset here, so concurrent requests share them.
        code_interpreter.output_files, code_interpreter.output_codes = "", ""
        task = asyncio.create_task(
            agent_executor.acall(
                inputs={
                    "input": query,
                    "history": histories,
                    "documents": documents,
                    # The agent template has a {knowledgebases} placeholder;
                    # no knowledge base is wired in here, so it gets an
                    # empty value instead of being left unfilled.
                    "knowledgebases": "",
                }
            )
        )

        # 7. Stream tokens as they arrive
        async for token in callback.aiter():
            yield json.dumps({"answer": token}).encode('utf-8')

        # 8. Wait for the agent, then append the code interpreter outputs
        await asyncio.wait_for(task, TIME_OUT)
        output_files, output_codes = code_interpreter.get_outputs()
        if output_files:
            # Include the image URL, which was previously dropped even though
            # the tool reports that the image was sent to the user.
            yield json.dumps(
                {"answer": f'\n\n![image]({output_files})\n\n{output_codes}\n\n'}
            ).encode('utf-8')
        elif output_codes:
            yield json.dumps({"answer": f'\n\n{output_codes}'}).encode('utf-8')

    return StreamingResponse(agent_chat_iterator(), media_type="application/json")
5.2、主程序
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from chat.chat_routes import chat_router
from fastapi.staticfiles import StaticFiles
from configs.setting import MEDIA_DIR# 創建FastAPI實例
app = FastAPI()

# Allow cross-origin requests
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register the chat routes
app.include_router(chat_router)

# Serve the media directory as static files under /media
app.mount("/media", StaticFiles(directory=MEDIA_DIR), name="media")

# Program entry point
if __name__ == "__main__":
    # Import the uvicorn server package
    import uvicorn

    # Run the server
    uvicorn.run(app, host="127.0.0.1", port=6605, log_level="info")
?