前提條件:
1、吳恩達MCP課程(5):research_server_prompt_resource.py
2、server_config_prompt_resource.json文件
{"mcpServers": {"filesystem": {"command": "npx","args": ["-y","@modelcontextprotocol/server-filesystem","."]},"research": {"command": "uv","args": ["run", "research_server_prompt_resource.py"]},"fetch": {"command": "uvx","args": ["mcp-server-fetch"]}}
}
代碼
原課程用的anthropic的,下面改成openai,并用千問模型做測試
from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from contextlib import AsyncExitStack
import json
import asyncio
import os

load_dotenv()


class MCP_ChatBot:
    """MCP chatbot that sends queries to an OpenAI-compatible API
    (tested with the Qwen model) and dispatches the model's tool calls
    to the MCP servers it is connected to."""

    def __init__(self):
        # Manages the lifetime of every stdio transport / client session we open.
        self.exit_stack = AsyncExitStack()
        self.client = openai.OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_API_BASE"),
        )
        # Tool schemas in the OpenAI "function" tool format.
        self.available_tools = []
        # Prompt metadata for quick display via /prompts.
        self.available_prompts = []
        # Maps tool names, prompt names and resource URIs to their MCP session.
        self.sessions = {}

    async def connect_to_server(self, server_name, server_config):
        """Connect to one MCP server over stdio and register its tools,
        prompts and resources into the lookup tables above.

        Errors are printed, not raised: one broken server must not stop
        the others from connecting."""
        try:
            server_params = StdioServerParameters(**server_config)
            read, write = await self.exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            session = await self.exit_stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()

            try:
                # Tools -> OpenAI function-call schema.
                response = await session.list_tools()
                for tool in response.tools:
                    self.sessions[tool.name] = session
                    self.available_tools.append({
                        "type": "function",
                        "function": {
                            "name": tool.name,
                            "description": tool.description,
                            "parameters": tool.inputSchema,
                        },
                    })

                # Prompts: a server may not implement these; the broad
                # except below keeps a partial server usable.
                prompts_response = await session.list_prompts()
                if prompts_response and prompts_response.prompts:
                    for prompt in prompts_response.prompts:
                        self.sessions[prompt.name] = session
                        self.available_prompts.append({
                            "name": prompt.name,
                            "description": prompt.description,
                            "arguments": prompt.arguments,
                        })

                # Resources are keyed by their URI string.
                resources_response = await session.list_resources()
                if resources_response and resources_response.resources:
                    for resource in resources_response.resources:
                        self.sessions[str(resource.uri)] = session
            except Exception as e:
                print(f"Error {e}")
        except Exception as e:
            print(f"Error connecting to {server_name}: {e}")

    async def connect_to_servers(self):
        """Load server_config_prompt_resource.json and connect to every
        server listed under "mcpServers". Re-raises on config errors so
        startup fails loudly."""
        try:
            with open("server_config_prompt_resource.json", "r") as file:
                data = json.load(file)
            servers = data.get("mcpServers", {})
            for server_name, server_config in servers.items():
                await self.connect_to_server(server_name, server_config)
        except Exception as e:
            print(f"Error loading server config: {e}")
            raise

    async def process_query(self, query):
        """Run one user query, looping while the model keeps requesting
        tool calls; stops when the model returns a plain text answer."""
        messages = [{"role": "user", "content": query}]
        while True:
            response = self.client.chat.completions.create(
                model="qwen-turbo",
                tools=self.available_tools,
                messages=messages,
            )
            message = response.choices[0].message

            # BUG FIX: the original checked `content` first and broke out of
            # the loop, so a reply carrying BOTH text and tool_calls never
            # executed its tools. Handle tool_calls first.
            if message.tool_calls:
                if message.content:
                    print(message.content)
                # BUG FIX: serialize SDK tool_call objects to plain dicts so
                # the message history round-trips through the API cleanly.
                messages.append({
                    "role": "assistant",
                    "content": message.content,
                    "tool_calls": [
                        {
                            "id": tc.id,
                            "type": "function",
                            "function": {
                                "name": tc.function.name,
                                "arguments": tc.function.arguments,
                            },
                        }
                        for tc in message.tool_calls
                    ],
                })
                for tool_call in message.tool_calls:
                    tool_name = tool_call.function.name
                    tool_args = json.loads(tool_call.function.arguments)
                    print(f"Calling tool {tool_name} with args {tool_args}")

                    session = self.sessions.get(tool_name)
                    if not session:
                        # BUG FIX: the original `break` left the assistant's
                        # tool_calls without matching "tool" results, which
                        # the API rejects on the next turn. Report the error
                        # back to the model as the tool result instead.
                        print(f"Tool '{tool_name}' not found.")
                        content = f"Tool '{tool_name}' not found."
                    else:
                        result = await session.call_tool(tool_name, arguments=tool_args)
                        # BUG FIX: MCP returns a list of content parts, not a
                        # string; flatten the text parts for the API message.
                        content = "\n".join(
                            part.text if hasattr(part, "text") else str(part)
                            for part in result.content
                        )
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": content,
                    })
            elif message.content:
                # Plain text answer: print it, record it, and stop looping.
                print(message.content)
                messages.append({"role": "assistant", "content": message.content})
                break
            else:
                break

    async def get_resource(self, resource_uri):
        """Fetch a resource by URI and print its first text content."""
        session = self.sessions.get(resource_uri)

        # Fallback: any session that serves papers:// URIs can handle topic
        # URIs that were not listed at connect time.
        if not session and resource_uri.startswith("papers://"):
            for uri, sess in self.sessions.items():
                if uri.startswith("papers://"):
                    session = sess
                    break

        if not session:
            print(f"Resource '{resource_uri}' not found.")
            return

        try:
            result = await session.read_resource(uri=resource_uri)
            if result and result.contents:
                print(f"\nResource: {resource_uri}")
                print("Content:")
                print(result.contents[0].text)
            else:
                print("No content available.")
        except Exception as e:
            print(f"Error: {e}")

    async def list_prompts(self):
        """List all available prompts with their descriptions and arguments."""
        if not self.available_prompts:
            print("No prompts available.")
            return

        print("\nAvailable prompts:")
        for prompt in self.available_prompts:
            print(f"- {prompt['name']}: {prompt['description']}")
            if prompt['arguments']:
                print(" Arguments:")
                for arg in prompt['arguments']:
                    # Arguments may be SDK objects (with .name) or dicts.
                    arg_name = arg.name if hasattr(arg, 'name') else arg.get('name', '')
                    print(f" - {arg_name}")

    async def execute_prompt(self, prompt_name, args):
        """Execute a prompt with the given arguments and feed the resulting
        text through process_query."""
        session = self.sessions.get(prompt_name)
        if not session:
            print(f"Prompt '{prompt_name}' not found.")
            return

        try:
            result = await session.get_prompt(prompt_name, arguments=args)
            if result and result.messages:
                prompt_content = result.messages[0].content

                # Extract text from content (handles different formats:
                # plain string, object with .text, or a list of parts).
                if isinstance(prompt_content, str):
                    text = prompt_content
                elif hasattr(prompt_content, 'text'):
                    text = prompt_content.text
                else:
                    text = " ".join(
                        item.text if hasattr(item, 'text') else str(item)
                        for item in prompt_content
                    )

                print(f"\nExecuting prompt '{prompt_name}'...")
                await self.process_query(text)
        except Exception as e:
            print(f"Error: {e}")

    async def chat_loop(self):
        """Interactive REPL: @<resource>, /commands, or free-form queries."""
        print("\nMCP Chatbot Started!")
        print("Type your queries or 'quit' to exit.")
        print("Use @folders to see available topics")
        print("Use @<topic> to search papers in that topic")
        print("Use /prompts to list available prompts")
        print("Use /prompt <name> <arg1=value1> to execute a prompt")

        while True:
            try:
                query = input("\nQuery: ").strip()
                if not query:
                    continue
                if query.lower() == 'quit':
                    break

                # @folders / @<topic> -> read a papers:// resource.
                if query.startswith('@'):
                    topic = query[1:]
                    if topic == "folders":
                        resource_uri = "papers://folders"
                    else:
                        resource_uri = f"papers://{topic}"
                    await self.get_resource(resource_uri)
                    continue

                # /prompts and /prompt <name> key=value ...
                if query.startswith('/'):
                    parts = query.split()
                    command = parts[0].lower()

                    if command == '/prompts':
                        await self.list_prompts()
                    elif command == '/prompt':
                        if len(parts) < 2:
                            print("Usage: /prompt <name> <arg1=value1> <arg2=value2>")
                            continue
                        prompt_name = parts[1]
                        args = {}
                        for arg in parts[2:]:
                            if '=' in arg:
                                key, value = arg.split('=', 1)
                                args[key] = value
                        await self.execute_prompt(prompt_name, args)
                    else:
                        print(f"Unknown command: {command}")
                    continue

                await self.process_query(query)
            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Close every session and transport opened through the exit stack."""
        await self.exit_stack.aclose()


async def main():
    chatbot = MCP_ChatBot()
    try:
        await chatbot.connect_to_servers()
        await chatbot.chat_loop()
    finally:
        await chatbot.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
代碼解釋
這段代碼實現了一個基于MCP(Model Context Protocol)的聊天機器人,能夠連接到多個MCP服務器,使用它們提供的工具、提示和資源。下面是對代碼的詳細解釋:
類結構與初始化
class MCP_ChatBot:def __init__(self):self.exit_stack = AsyncExitStack()self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))# Tools list required for OpenAI APIself.available_tools = []# Prompts list for quick display self.available_prompts = []# Sessions dict maps tool/prompt names or resource URIs to MCP client sessionsself.sessions = {}
AsyncExitStack
:用于管理多個異步上下文管理器,確保資源正確釋放openai.OpenAI
:創建OpenAI客戶端,使用環境變量中的API密鑰和基礎URLavailable_tools
:存儲可用工具列表,用于OpenAI API的工具調用功能available_prompts
:存儲可用提示列表,用于快速顯示sessions
:字典,將工具名稱、提示名稱或資源URI映射到對應的MCP客戶端會話
服務器連接
async def connect_to_server(self, server_name, server_config):# 創建服務器參數、建立連接并初始化會話# 獲取可用工具、提示和資源
這個方法負責:
- 使用提供的配置創建
StdioServerParameters
- 通過
stdio_client
建立與服務器的連接 - 創建并初始化
ClientSession
- 獲取服務器提供的工具、提示和資源
- 將它們添加到相應的列表和字典中
async def connect_to_servers(self):# 從配置文件加載服務器信息并連接到每個服務器
這個方法從server_config_prompt_resource.json
文件加載服務器配置,并為每個服務器調用connect_to_server
方法。
查詢處理
async def process_query(self, query):# 處理用戶查詢,使用OpenAI API和可用工具
這個方法是聊天機器人的核心,它:
- 創建包含用戶查詢的消息列表
- 調用OpenAI API,傳遞消息和可用工具
- 處理API的響應:
- 如果有普通文本內容,打印并添加到消息歷史
- 如果有工具調用,執行每個工具調用并將結果添加到消息歷史
工具調用的處理流程:
- 從響應中提取工具ID、名稱和參數
- 從
sessions
字典中獲取對應的會話 - 調用工具并獲取結果
- 將結果添加到消息歷史中
資源和提示管理
async def get_resource(self, resource_uri):# 獲取并顯示指定URI的資源內容
這個方法用于獲取和顯示資源內容,特別是論文資源。它會:
- 從
sessions
字典中獲取對應的會話 - 對于以
papers://
開頭的URI,如果找不到對應會話,會嘗試使用任何處理papers的會話 - 調用
read_resource
方法獲取資源內容 - 打印資源內容
async def list_prompts(self):# 列出所有可用的提示
這個方法列出所有可用的提示及其描述和參數。
async def execute_prompt(self, prompt_name, args):# 執行指定的提示,并處理結果
這個方法執行指定的提示,并將結果傳遞給process_query
方法進行處理。它處理不同格式的提示內容,確保能夠正確提取文本。
聊天循環
async def chat_loop(self):# 主聊天循環,處理用戶輸入
這個方法是聊天機器人的主循環,它:
- 打印歡迎信息和使用說明
- 循環接收用戶輸入
- 根據輸入的不同格式執行不同操作:
- 如果輸入是
quit
,退出循環 - 如果輸入以
@
開頭,調用get_resource
方法獲取資源 - 如果輸入以
/
開頭,執行命令(如列出提示或執行提示) - 否則,調用
process_query
方法處理普通查詢
- 如果輸入為空字符串,跳過并等待下一次輸入
資源清理
async def cleanup(self):# 清理資源
這個方法調用exit_stack.aclose()
來清理所有資源。
主函數
async def main():chatbot = MCP_ChatBot()try:await chatbot.connect_to_servers()await chatbot.chat_loop()finally:await chatbot.cleanup()
主函數創建MCP_ChatBot
實例,連接到服務器,運行聊天循環,并確保在結束時清理資源。
代碼運行結果
uv run mcp_chatbot_prompt_resource.py