Table of Contents
- Full Code
- Code Explanation
- 1. Imports and Initialization
- 2. Type Definitions
- 3. MCP_ChatBot Class Initialization
- 4. Query Processing (process_query)
- 5. Server Connection Management
- 6. Summary of Core Features
- Example
Full Code
The original course code was written with the Anthropic SDK; the version below is rewritten with the OpenAI SDK and tested with Alibaba's Qwen models.
The .env file is:
```
OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
OPENAI_API_BASE=https://dashscope.aliyuncs.com/compatible-mode/v1
```
In addition, the course code only handles single-turn conversation; the code below is modified for multi-turn conversation, which better fits how the Qwen models are called.
```python
from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from typing import List, TypedDict
from contextlib import AsyncExitStack
from typing import Dict
import asyncio
import json
import os

load_dotenv()


class ToolDefinition(TypedDict):
    name: str
    description: str
    input_schema: dict


class MCP_ChatBot:

    def __init__(self):
        # Initialize session and client objects
        self.sessions: List[ClientSession] = []  # new
        self.exit_stack = AsyncExitStack()
        self.client = openai.OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_API_BASE")
        )
        self.available_tools: List[ToolDefinition] = []  # new
        self.tool_to_session: Dict[str, ClientSession] = {}
        self.messages = []

    async def process_query(self, query):
        self.messages.append({'role': 'user', 'content': query})
        response = self.client.chat.completions.create(
            model='qwen-turbo',
            # max_tokens=2024,
            tools=self.available_tools,
            messages=self.messages
        )
        process_query = True
        while process_query:
            # Get the assistant's reply
            message = response.choices[0].message
            # Plain text content: print it and stop
            if message.content:
                print(message.content)
                # Keep the reply in the history so later turns have full context
                self.messages.append({"role": "assistant", "content": message.content})
                process_query = False
            # Tool calls requested by the model
            elif message.tool_calls:
                # Append the assistant message to the history
                self.messages.append({
                    "role": "assistant",
                    "content": None,
                    "tool_calls": message.tool_calls
                })
                # Handle every tool call in this turn
                for tool_call in message.tool_calls:
                    tool_id = tool_call.id
                    tool_name = tool_call.function.name
                    tool_args = json.loads(tool_call.function.arguments)
                    print(f"Calling tool {tool_name} with args {tool_args}")

                    # Execute the tool call on the session that owns this tool
                    session = self.tool_to_session[tool_name]
                    result = await session.call_tool(tool_name, arguments=tool_args)

                    # Append the tool result to the message history
                    self.messages.append({
                        "role": "tool",
                        "tool_call_id": tool_id,
                        "content": result.content
                    })

                # Ask the model for the next reply
                response = self.client.chat.completions.create(
                    model='qwen-turbo',
                    # max_tokens=2024,
                    tools=self.available_tools,
                    messages=self.messages
                )
                self.messages.append({
                    "role": "assistant",
                    "content": response.choices[0].message.content
                })

                # If the reply is text only, finish processing
                if response.choices[0].message.content and not response.choices[0].message.tool_calls:
                    print(response.choices[0].message.content)
                    process_query = False

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Chatbot Started!")
        print("Type your queries or 'quit' to exit.")
        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == 'quit':
                    break
                await self.process_query(query)
                print("\n")
            except Exception as e:
                print(f"\nError: {str(e)}")

    async def connect_to_server(self, server_name: str, server_config: dict) -> None:
        """Connect to a single MCP server."""
        try:
            server_params = StdioServerParameters(**server_config)
            stdio_transport = await self.exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            read, write = stdio_transport
            session = await self.exit_stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()
            self.sessions.append(session)

            # List available tools for this session
            response = await session.list_tools()
            tools = response.tools
            print(f"\nConnected to {server_name} with tools:", [t.name for t in tools])

            for tool in tools:  # new
                self.tool_to_session[tool.name] = session
                self.available_tools.append({
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.inputSchema
                    }
                })
        except Exception as e:
            print(f"Failed to connect to {server_name}: {e}")

    async def connect_to_servers(self):  # new
        """Connect to all configured MCP servers."""
        try:
            with open("server_config.json", "r") as file:
                data = json.load(file)
            servers = data.get("mcpServers", {})
            for server_name, server_config in servers.items():
                await self.connect_to_server(server_name, server_config)
        except Exception as e:
            print(f"Error loading server configuration: {e}")
            raise

    async def cleanup(self):
        await self.exit_stack.aclose()


async def main():
    chatbot = MCP_ChatBot()
    try:
        await chatbot.connect_to_servers()
        await chatbot.chat_loop()
    finally:
        await chatbot.cleanup()


if __name__ == "__main__":
    asyncio.run(main())

"""
Example queries:
1. Fetch the content of this website: https://modelcontextprotocol.io/docs/concepts/architecture.
2. Save the content in the file "mcp_summary.md"
"""
```
Code Explanation
1. Imports and Initialization
```python
from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from typing import List, TypedDict
from contextlib import AsyncExitStack
from typing import Dict
import asyncio
import json
import os

load_dotenv()
```
- Imports the required libraries, including the OpenAI client, the MCP protocol modules, and the async utilities
- `load_dotenv()` loads the environment variable configuration from the .env file
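As a minimal sanity check (assuming the .env file shown earlier sits next to the script), the variables can be read back right after loading:
```python
from dotenv import load_dotenv
import os

load_dotenv()  # reads .env from the current working directory by default
# Should print the DashScope compatible-mode endpoint configured above
print(os.getenv("OPENAI_API_BASE"))
```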
2. Type Definitions
```python
class ToolDefinition(TypedDict):
    name: str
    description: str
    input_schema: dict
```
Defines the structure of a tool, used for type hints.
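For illustration only, a value matching this shape might look like the following (the `read_file` tool and its schema are invented, not taken from the original post):
```python
# Hypothetical dict matching the ToolDefinition fields (not from the original code)
example_tool = {
    "name": "read_file",
    "description": "Read a file from disk and return its text",
    "input_schema": {
        "type": "object",
        "properties": {"path": {"type": "string"}},
        "required": ["path"],
    },
}
```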
3. MCP_ChatBot Class Initialization
```python
class MCP_ChatBot:
    def __init__(self):
        self.sessions: List[ClientSession] = []   # stores multiple MCP sessions
        self.exit_stack = AsyncExitStack()         # manages async resources
        self.client = openai.OpenAI(               # OpenAI client
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_API_BASE")
        )
        self.available_tools: List[ToolDefinition] = []       # list of available tools
        self.tool_to_session: Dict[str, ClientSession] = {}   # maps tool name to session
        self.messages = []                         # conversation history
```
Key features:
- Multi-session support: the `sessions` list stores sessions for multiple MCP servers
- Tool mapping: `tool_to_session` maps each tool name to its session, enabling tool routing
- Resource management: `AsyncExitStack` manages the lifecycle of async resources
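To show the resource-management idea in isolation, here is a minimal, self-contained sketch (the `fake_connection` context manager is a stand-in for `stdio_client` / `ClientSession`, not part of the chatbot code): `AsyncExitStack` remembers every context entered on it and unwinds them all, in reverse order, with a single `aclose()` call, which is what `cleanup()` relies on.
```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

# Toy async context manager standing in for stdio_client / ClientSession
@asynccontextmanager
async def fake_connection(name: str):
    print(f"open {name}")
    try:
        yield name
    finally:
        print(f"close {name}")

async def demo():
    stack = AsyncExitStack()
    # Enter several contexts; the stack keeps track of them in order
    for name in ["server-a", "server-b"]:
        await stack.enter_async_context(fake_connection(name))
    # One call unwinds everything in reverse order, like chatbot.cleanup()
    await stack.aclose()

asyncio.run(demo())
```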
4. Query Processing (process_query)
```python
async def process_query(self, query):
    self.messages.append({'role': 'user', 'content': query})
    response = self.client.chat.completions.create(
        model='qwen-turbo',
        tools=self.available_tools,
        messages=self.messages
    )
```
Core processing logic:
- Message loop: a `while process_query` loop handles multi-round exchanges within a single query
- Tool-call handling: detects and executes tool calls, routing each one to the correct MCP server via `tool_to_session`
- Result integration: appends tool execution results to the conversation history, as shown in the sketch below
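To make one round of the loop concrete, here is a hypothetical snapshot of `self.messages` after a single tool call (tool name, arguments, and contents are invented for illustration; the structure follows the OpenAI tool-calling message format used above):
```python
# Hypothetical message history after one tool-call round (values are made up)
messages = [
    {"role": "user", "content": "Fetch https://example.com and summarize it"},
    # The assistant asks for a tool call instead of answering directly
    {"role": "assistant", "content": None, "tool_calls": [
        {"id": "call_1", "type": "function",
         "function": {"name": "fetch", "arguments": '{"url": "https://example.com"}'}},
    ]},
    # The tool result is fed back, keyed by tool_call_id
    {"role": "tool", "tool_call_id": "call_1", "content": "<html>...</html>"},
    # The next model call sees the result and can answer in plain text
    {"role": "assistant", "content": "The page is a placeholder domain ..."},
]
```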
5. Server Connection Management
```python
async def connect_to_server(self, server_name: str, server_config: dict) -> None:
    # Establish a connection to a single server
    server_params = StdioServerParameters(**server_config)
    stdio_transport = await self.exit_stack.enter_async_context(
        stdio_client(server_params)
    )
    # ... list the tools and build the mapping
    for tool in tools:
        self.tool_to_session[tool.name] = session
        self.available_tools.append({
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema
            }
        })
```
```python
async def connect_to_servers(self):
    # Connect to all configured servers
    with open("server_config.json", "r") as file:
        data = json.load(file)
    servers = data.get("mcpServers", {})
    for server_name, server_config in servers.items():
        await self.connect_to_server(server_name, server_config)
```
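After a server is connected, each of its MCP tools becomes one entry in `available_tools`, with the MCP `inputSchema` slotted into the OpenAI `parameters` field. A hypothetical entry for a `fetch` tool (name and schema invented for illustration) would look like this:
```python
# Hypothetical entry appended to self.available_tools (values are made up)
example_entry = {
    "type": "function",
    "function": {
        "name": "fetch",
        "description": "Fetch a URL and return its contents as text",
        "parameters": {
            "type": "object",
            "properties": {"url": {"type": "string"}},
            "required": ["url"],
        },
    },
}
```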
6. Summary of Core Features
Multi-server support:
- Connects to multiple MCP servers at the same time
- The tools of every server are managed in one place
- Calls are routed to the correct server automatically by tool name
OpenAI format compatibility:
- Tool definitions use OpenAI's function-calling format
- The full tool-calling flow is supported
Asynchronous processing:
- Fully asynchronous design that supports concurrent work
- `AsyncExitStack` manages resource lifecycles
Configuration-driven management:
- Multiple servers are configured through `server_config.json`
- Server configurations are loaded dynamically
Compared with the single-server version, the main advantage of this implementation is that it integrates multiple MCP servers with different capabilities, giving the user a much richer set of tools.
Example
uv run connect_server_map_chatbot.py