隨著微服務和模塊化架構的發展,越來越多的系統傾向于采用可插拔、高內聚的設計模式。
MCP(Modular, Collaborative,Pluggable)
架構正是這樣一種強調模塊化、協作性和擴展性的設計思想。它允許開發者以“組件”方式組合功能,提升系統的靈活性與可維護性。本項目的目標是:
? 使用OpenWeather API
實現天氣數據獲取
? 搭建一個基于MCP
架構的天氣查詢服務器
? 通過stdio
方式實現客戶端與服務器的交互
? 展示MCP
協議下服務端與客戶端的標準通信流程
1. Server搭建流程
這里嘗試一個入門級的示例,那就是創建一個天氣查詢的服務器。通過使用OpenWeather API
,創建一個能夠實時查詢天氣的服務器(server),并使用stdio
方式進行通信。?測試查詢效果:
curl -s "https://api.openweathermap.org/data/2.5/weather?q=Beijing&appid='YOUR_API_KEY'&units=metric&lang=zh_cn"
成功通過測試后,即可開始創建server。
2. 天氣查詢Server創建流程
2.1 Server依賴安裝
為了通過 HTTP 請求查詢天氣數據,請在當前虛擬環境中添加以下依賴項:
uv add mcp httpx
代碼編寫 : MCP基本執行流程如下
# -*- coding: utf-8 -*-
"""
天氣查詢服務端(基于 MCP 架構)
功能:通過 OpenWeather API 獲取指定城市的實時天氣,并格式化返回給人類可讀文本。
通信方式:使用 MCP 協議,通過標準輸入輸出(stdio)與客戶端交互。
"""import json
import logging
import os
import httpx
from typing import Any, Dict, Optional
from mcp.server.fastmcp import FastMCP # 導入 MCP 框架中的服務器核心類# ===========================================
# 1. 日志配置
# ===========================================# 配置基礎日志系統,便于調試和運行時監控
logging.basicConfig(level=logging.INFO, # 日志級別為 INFO,顯示信息、警告和錯誤format="%(asctime)s [%(levelname)s] %(message)s",handlers=[logging.StreamHandler() # 輸出到控制臺]
)
logger = logging.getLogger(__name__) # 創建一個獨立的日志記錄器# ===========================================
# 2. 初始化 MCP 服務器
# ===========================================# 創建一個名為 "WeatherServer" 的 MCP 服務實例
# 該實例將注冊工具函數(tool),供外部客戶端調用
mcp = FastMCP("WeatherServer")# ===========================================
# 3. OpenWeather API 配置
# ===========================================# OpenWeather API 的基礎 URL(獲取當前天氣數據)
OPENWEATHER_API_BASE = "https://api.openweathermap.org/data/2.5/weather"# 從環境變量中讀取 API Key,避免硬編碼敏感信息
API_KEY = os.getenv("OPENWEATHER_API_KEY")
if not API_KEY:raise ValueError("OpenWeather API Key 未設置!\n""請運行以下命令設置環境變量:\n""export OPENWEATHER_API_KEY=your_actual_api_key")# 設置請求頭中的 User-Agent,部分 API 會檢查此字段
USER_AGENT = "weather-client/1.0"# ===========================================
# 4. 異步函數:從 OpenWeather API 獲取天氣數據
# ===========================================async def fetch_weather(city: str) -> Dict[str, Any]:"""向 OpenWeather API 發起異步 HTTP 請求,獲取指定城市的天氣信息。參數:city (str): 城市名稱(英文,如 Beijing、Shanghai)返回:dict: 成功時返回天氣數據字典;失敗時返回包含 'error' 鍵的錯誤信息字典。"""# 構造請求參數params = {"q": city, # 查詢城市名"appid": API_KEY, # 認證密鑰"units": "metric", # 使用攝氏度(公制單位)"lang": "zh_cn" # 返回中文(簡體)描述}# 設置請求頭headers = {"User-Agent": USER_AGENT}# 使用 httpx 的異步客戶端發起請求async with httpx.AsyncClient() as client:try:logger.info(f"正在請求 OpenWeather API,城市: {city}")response = await client.get(url=OPENWEATHER_API_BASE,params=params,headers=headers,timeout=30.0 # 設置 30 秒超時,防止請求掛起)response.raise_for_status() # 如果狀態碼不是 2xx,拋出異常data = response.json() # 解析 JSON 響應logger.info(f"成功獲取天氣數據: {data.get('name')}, {data.get('sys', {}).get('country')}")return data # 返回原始天氣數據except httpx.HTTPStatusError as e:# HTTP 狀態碼錯誤(如 404 城市不存在,401 密鑰無效)status_code = e.response.status_codeerror_msg = f"HTTP 錯誤: {status_code}"logger.error(f"請求失敗 [{status_code}] 查詢城市: {city}")return {"error": error_msg}except httpx.RequestError as e:# 網絡連接錯誤(如 DNS 失敗、連接超時)logger.error(f"網絡請求失敗: {str(e)}")return {"error": f"網絡錯誤: {str(e)}"}except Exception as e:# 其他未預期的異常(如 JSON 解析失敗等)logger.error(f"未知異常: {str(e)}")return {"error": f"系統錯誤: {str(e)}"}# ===========================================
# 5. 函數:將天氣數據格式化為易讀的文本
# ===========================================def format_weather(data: Dict[str, Any]) -> str:"""將從 API 獲取的天氣數據字典轉換為人類可讀的格式化字符串。參數:data (dict): 包含天氣信息的字典(來自 fetch_weather 的返回值)返回:str: 格式化后的天氣信息,包含城市、溫度、濕度、風速和天氣狀況。若輸入包含 'error' 鍵,則返回錯誤提示。"""# 檢查是否為錯誤響應if "error" in return f"?? {data['error']}"# 從字典中安全提取各項數據,使用 .get() 提供默認值以防 KeyErrorcity = data.get("name", "未知城市")country = data.get("sys", {}).get("country", "未知國家")temp = data.get("main", {}).get("temp", "N/A") # 溫度humidity = data.get("main", {}).get("humidity", "N/A") # 濕度wind_speed = data.get("wind", {}).get("speed", "N/A") # 風速# 天氣描述可能在 'weather' 列表的第一個元素中weather_list = data.get("weather", [{}]) # 默認為空列表,提供一個空字典description = weather_list[0].get("description", "未知天氣")# 使用 emoji 增強可讀性,組織成多行文本formatted = (f"🌍 {city}, {country}\n"f"🌡 溫度: {temp}°C\n"f"💧 濕度: {humidity}%\n"f"🌬 風速: {wind_speed} m/s\n"f"🌤 天氣: {description}\n")return formatted# ===========================================
# 6. MCP 工具函數:對外暴露的天氣查詢接口
# ===========================================@mcp.tool()
async def query_weather(city: str) -> str:"""MCP 工具函數:供客戶端調用,查詢指定城市的天氣。此函數會被 MCP 框架自動注冊,并通過 stdio 與客戶端通信。參數:city (str): 要查詢的城市名稱(必須為英文,如 'Beijing')返回:str: 格式化后的天氣信息文本,或錯誤提示。示例調用(由 MCP 客戶端發起):{"tool": "query_weather", "arguments": {"city": "Beijing"}}"""logger.info(f"收到 MCP 客戶端請求:查詢城市天氣 -> {city}")# 第一步:獲取原始天氣數據(異步)raw_data = await fetch_weather(city)# 第二步:格式化為人類可讀文本result = format_weather(raw_data)# 記錄結果(僅記錄第一行,避免日志過長)logger.info(f"返回天氣信息 -> {result.splitlines()[0]}")# 返回結果給客戶端return result# ===========================================
# 7. 主程序入口:啟動 MCP 服務器
# ===========================================if __name__ == "__main__":"""當前腳本作為主程序運行時,啟動 MCP 服務器。服務器將通過標準輸入/輸出(stdio)與客戶端通信。"""logger.info("🟩 MCP 天氣查詢服務器已啟動...")logger.info("💡 等待客戶端通過 stdio 發送請求...")# 啟動服務器,監聽標準輸入輸出# transport='stdio' 表示使用標準流進行通信(適用于 MCP 客戶端集成)mcp.run(transport="stdio")logger.info("🛑 MCP 服務器已關閉。")
2.2 Client 創建流程
創建 MCP 客戶端項目
# 創建項目目錄
cd /root/autodl-tmp/MCP
uv init mcp-chatbot
cd mcp-chatbot
創建MCP客戶端虛擬環境
# 創建虛擬環境
uv venv# 激活虛擬環境
source .venv/bin/activate
uv
會自動識別當前項目主目錄并創建虛擬環境。?然后即可通過 add
方法在虛擬環境中安裝相關的庫。
# 安裝 MCP SDK
uv add mcp openai python-dotenv httpx
接下來創建.env
文件,并寫入OpenAI
的API-Key
,以及反向代理地址。借助反向代理,國內可以無門檻直連OpenAI
官方服務器,并調用官方API
。
寫入如下內容:
LLM_API_KEY="your-openai-api-key"
BASE_URL="https://your-reverse-proxy-url.com/v1"
MODEL=gpt-4o
創建servers_config.json:
創建weather_server.py:
import json
import logging
import os
import httpx
from typing import Any, Dict
from mcp.server.fastmcp import FastMCP# ===========================================
# 日志配置
# ===========================================
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)# ===========================================
# 初始化 MCP 服務器
# ===========================================
mcp = FastMCP("WeatherServer")# ===========================================
# OpenWeather API 配置
# ===========================================
OPENWEATHER_API_BASE = "https://api.openweathermap.org/data/2.5/weather"
API_KEY = os.getenv("OPENWEATHER_API_KEY")
if not API_KEY:raise ValueError("請設置環境變量 OPENWEATHER_API_KEY")USER_AGENT = "weather-app/1.0"# ===========================================
# 異步函數:從 OpenWeather API 獲取天氣數據
# ===========================================
async def fetch_weather(city: str) -> Dict[str, Any]:"""從 OpenWeather API 獲取指定城市的天氣信息。:param city: 城市名稱(英文):return: 包含天氣數據或錯誤信息的字典"""params = {"q": city,"appid": API_KEY,"units": "metric","lang": "zh_cn",}headers = {"User-Agent": USER_AGENT}async with httpx.AsyncClient() as client:try:response = await client.get(OPENWEATHER_API_BASE,params=params,headers=headers,timeout=30.0)response.raise_for_status()return response.json()except httpx.HTTPStatusError as e:status = e.response.status_codelogger.error(f"HTTP 錯誤 [{status}] 查詢城市: {city}")return {"error": f"HTTP 錯誤: {status}"}except Exception as e:logger.error(f"請求失敗: {str(e)}")return {"error": f"請求失敗: {str(e)}"}# ===========================================
# 函數:格式化天氣數據為人類可讀文本
# 支持傳入 dict 或 JSON 字符串
# ===========================================
def format_weather(data: Dict[str, Any] | str) -> str:"""將天氣數據格式化為易讀的文本。支持輸入為字典或 JSON 字符串。:param data: 天氣數據(dict 或 JSON 字符串):return: 格式化后的天氣信息字符串"""# 如果傳入的是字符串,嘗試解析為字典if isinstance(data, str):try:data = json.loads(data)except Exception as e:return f"無法解析天氣數據: {e}"# 如果數據中包含錯誤信息,直接返回錯誤提示if "error" inreturn f"?? {data['error']}"# 提取數據時做容錯處理,防止 KeyErrorcity = data.get("name", "未知")country = data.get("sys", {}).get("country", "未知")temp = data.get("main", {}).get("temp", "N/A")humidity = data.get("main", {}).get("humidity", "N/A")wind_speed = data.get("wind", {}).get("speed", "N/A")# weather 字段可能為空列表,提供默認值避免索引錯誤weather_list = data.get("weather", [{}])description = weather_list[0].get("description", "未知")# 使用 emoji 增強可讀性,組織成多行輸出return (f"🌍 {city}, {country}\n"f"🌡 溫度: {temp}°C\n"f"💧 濕度: {humidity}%\n"f"🌬 風速: {wind_speed} m/s\n"f"🌤 天氣: {description}\n")# ===========================================
# MCP 工具函數:查詢天氣
# ===========================================
@mcp.tool()
async def query_weather(city: str) -> str:"""輸入指定城市的英文名稱,返回今日天氣查詢結果。:param city: 城市名稱(需使用英文,如 Beijing):return: 格式化后的天氣信息"""logger.info(f"正在查詢城市天氣: {city}")data = await fetch_weather(city)result = format_weather(data)return result# ===========================================
# 主程序入口
# ===========================================
if __name__ == "__main__":"""啟動 MCP 服務器,通過標準輸入輸出(stdio)與客戶端通信。"""logger.info("🟩 天氣查詢服務器已啟動,等待客戶端請求...")mcp.run(transport="stdio")logger.info("🛑 服務器已關閉。")
然后在config.json
中寫入如下內容:
{"mcpServers": {"weather": {"command": "python","args": ["weather_server.py"]}}
}
創建main.py:
# -*- coding: utf-8 -*-
"""
main.py - 基于 MCP 協議的多服務器 AI 客戶端主程序
實現功能:加載配置、連接多個 MCP 服務器、集成 OpenAI Function Calling,支持大模型動態調用外部工具(如天氣查詢、數據庫操作等)。
"""# ===========================================
# 一、導入所需庫
# ===========================================import asyncio
import json
import logging
import os
import shutil
from contextlib import AsyncExitStack
from typing import Any, Dict, List, Optionalimport httpx
from dotenv import load_dotenv
from openai import OpenAI # OpenAI Python SDK,用于與 OpenAI 兼容 API 交互
from mcp import ClientSession, StdioServerParameters # MCP 協議核心庫
from mcp.client.stdio import stdio_client # 通過 stdio 與 MCP 服務器通信"""
導入的庫說明:- asyncio: Python 中的異步編程庫,用于處理異步任務(如并發連接多個服務器)。
- json: 用于序列化和反序列化 JSON 格式數據,讀取配置文件。
- logging: 配置日志輸出,便于調試和運行時監控。
- os: 與操作系統交互,讀取環境變量(如 API Key)。
- shutil: 提供文件和目錄的高層操作接口(當前未使用,但可用于未來擴展)。
- contextlib: 提供異步上下文管理器 AsyncExitStack,用于資源自動清理。
- httpx: 異步 HTTP 客戶端庫,可用于未來擴展(如健康檢查、Web 請求)。
- dotenv: 從 .env 文件加載環境變量,避免硬編碼敏感信息。
- openai: OpenAI Python SDK,用于調用大模型 API(支持 Function Calling)。
- mcp: MCP(Model Context Protocol)協議客戶端庫,用于與本地工具服務器通信。
"""# ===========================================
# 二、配置加載類 (Configuration)
# ===========================================class Configuration:"""功能: 管理 MCP 客戶端的環境變量和配置文件。方法:__init__: 從 .env 文件加載環境變量,獲取 LLM_API_KEY、BASE_URL 和 MODEL。load_config: 從指定路徑加載 JSON 配置文件,返回配置字典。"""def __init__(self) -> None:"""初始化配置類,加載 .env 文件并讀取關鍵環境變量。"""load_dotenv() # 加載 .env 文件中的環境變量# 從環境變量中讀取 LLM 配置self.api_key = os.getenv("LLM_API_KEY")self.base_url = os.getenv("BASE_URL")self.model = os.getenv("MODEL")# 若未設置 API Key,拋出異常if not self.api_key:raise ValueError("? 未找到 LLM_API_KEY,請在 .env 文件中配置")@staticmethoddef load_config(file_path: str) -> Dict[str, Any]:"""從 JSON 文件加載服務器配置。Args:file_path (str): JSON 配置文件路徑(如 "servers_config.json")Returns:Dict[str, Any]: 解析后的服務器配置字典Raises:FileNotFoundError: 若文件不存在json.JSONDecodeError: 若 JSON 格式錯誤"""with open(file_path, "r", encoding="utf-8") as f:return json.load(f)# ===========================================
# 三、MCP 服務器客戶端類 (Server)
# ===========================================class Server:"""功能: 管理單個 MCP 服務器的連接、工具調用與資源清理。方法:initialize: 初始化與 MCP 服務器的連接,使用 stdio_client 建立通信。list_tools: 獲取該服務器支持的所有工具列表。execute_tool: 執行指定工具,支持重試機制。cleanup: 清理資源,關閉與服務器的連接。"""def __init__(self, name: str, config: Dict[str, Any]) -> None:self.name: str = name # 服務器名稱(如 "weather")self.config: Dict[str, Any] = config # 服務器配置(command, args 等)self.session: Optional[ClientSession] = None # MCP 會話對象self.exit_stack: AsyncExitStack = AsyncExitStack() # 資源管理器self._cleanup_lock = asyncio.Lock() # 防止并發清理async def initialize(self) -> None:"""初始化與 MCP 服務器的連接。使用 stdio_client 啟動服務器進程并通過標準輸入輸出通信。"""command = self.config["command"]if command is None:raise ValueError("command 不能為空")# 構建服務器啟動參數server_params = StdioServerParameters(command=command,args=self.config["args"],env={**os.environ, **self.config["env"]} if self.config.get("env") else None,)try:# 建立 stdio 通信通道stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))read_stream, write_stream = stdio_transport# 創建 MCP 會話session = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream))await session.initialize() # 完成握手協議self.session = sessionlogging.info(f"? 成功連接到 MCP 服務器: {self.name}")except Exception as e:logging.error(f"? 初始化服務器失敗 {self.name}: {e}")await self.cleanup()raiseasync def list_tools(self) -> List[Any]:"""獲取該 MCP 服務器支持的工具列表。Returns:List[Any]: 工具對象列表,包含名稱、描述和輸入 schema。"""if not self.session:raise RuntimeError(f"Server {self.name} not initialized")tools_response = await self.session.list_tools()tools = []for item in tools_response:if isinstance(item, tuple) and item[0] == "tools":for tool in item[1]:tools.append(Tool(tool.name, tool.description, tool.inputSchema))return toolsasync def execute_tool(self,tool_name: str,arguments: Dict[str, Any],retries: int = 2,delay: float = 1.0) -> Any:"""執行指定工具,并支持重試機制。Args:tool_name (str): 工具名稱arguments (Dict[str, Any]): 工具調用參數retries (int): 最大重試次數delay (float): 每次重試之間的延遲(秒)Returns:Any: 工具調用結果"""if not self.session:raise RuntimeError(f"Server {self.name} not initialized")attempt = 0while attempt < retries:try:logging.info(f"🔁 執行工具: {tool_name} (服務器: {self.name})")result = await self.session.call_tool(tool_name, arguments)return resultexcept Exception as e:attempt += 1logging.warning(f"?? 工具執行失敗: {e} (第 {attempt} 次嘗試)")if attempt < retries:await asyncio.sleep(delay)else:logging.error("? 已達到最大重試次數,調用失敗。")raiseasync def cleanup(self) -> None:"""清理服務器資源,關閉通信通道。"""async with self._cleanup_lock:try:await self.exit_stack.aclose()self.session = Nonelogging.info(f"🔌 已斷開服務器: {self.name}")except Exception as e:logging.error(f"? 清理服務器 {self.name} 時出錯: {e}")# ===========================================
# 四、工具封裝類 (Tool)
# ===========================================class Tool:"""功能: 封裝從 MCP 服務器獲取的工具信息,便于傳遞給 LLM。方法:format_for_llm: 將工具信息格式化為適合 LLM 理解的文本描述。"""def __init__(self, name: str, description: str, input_schema: Dict[str, Any]) -> None:self.name: str = nameself.description: str = descriptionself.input_schema: Dict[str, Any] = input_schemadef format_for_llm(self) -> str:"""生成用于 LLM 提示的工具描述文本。Returns:str: 包含工具名、描述、參數及是否必填的格式化字符串。"""args_desc = []if "properties" in self.input_schema:for param_name, param_info in self.input_schema["properties"].items():arg_desc = f"- {param_name}: {param_info.get('description', 'No description')}"if param_name in self.input_schema.get("required", []):arg_desc += " (required)"args_desc.append(arg_desc)return (f"Tool: {self.name}\n"f"Description: {self.description}\n"f"Arguments:\n"f"{chr(10).join(args_desc)}")# ===========================================
# 五、LLM 客戶端封裝類 (LLMClient)
# ===========================================class LLMClient:"""功能: 使用 OpenAI SDK 與大語言模型進行交互,支持 Function Calling。方法:get_response: 向大模型發送消息,并可傳入工具定義(Function Calling 格式)。"""def __init__(self, api_key: str, base_url: Optional[str], model: str) -> None:self.client = OpenAI(api_key=api_key, base_url=base_url)self.model = modeldef get_response(self,messages: List[Dict[str, Any]],tools: Optional[List[Dict[str, Any]]] = None) -> Any:"""發送消息給大模型 API,支持傳入工具參數(即 Function Calling 格式)。Args:messages (List[Dict]): 對話消息列表tools (List[Dict]): 工具定義列表(可選)Returns:Any: API 返回的響應對象"""payload = {"model": self.model,"messages": messages,"tools": tools,}try:response = self.client.chat.completions.create(**payload)return responseexcept Exception as e:logging.error(f"? 調用 LLM 失敗: {e}")raise# ===========================================
# 六、多服務器 MCP 客戶端類 (MultiServerMCPClient)
# ===========================================class MultiServerMCPClient:"""功能: 管理多個 MCP 服務器,并使用 OpenAI Function Calling 機制與大模型交互。方法:connect_to_servers: 根據配置文件啟動多個服務器,獲取并注冊工具。transform_json: 將 MCP 工具 schema 轉換為 OpenAI 所需格式。chat_base: 核心對話邏輯,支持多輪工具調用。create_function_response_messages: 執行工具調用并將結果注入對話。process_query: 處理用戶查詢,返回最終 AI 回答。_call_mcp_tool: 根據 server_tool 格式調用對應 MCP 工具。chat_loop: 主交互循環,接收用戶輸入并輸出 AI 回答。cleanup: 關閉所有服務器連接和資源。"""def __init__(self) -> None:self.exit_stack = AsyncExitStack()config = Configuration()self.openai_api_key = config.api_keyself.base_url = config.base_urlself.model = config.modelself.client = LLMClient(self.openai_api_key, self.base_url, self.model)self.servers: Dict[str, Server] = {} # server_name -> Server 實例self.tools_by_server: Dict[str, List[Any]] = {} # server -> toolsself.all_tools: List[Dict[str, Any]] = [] # 所有工具(OpenAI 格式)async def connect_to_servers(self, servers_config: Dict[str, Any]) -> None:"""根據配置文件同時啟動多個 MCP 服務器并獲取其工具列表。Args:servers_config (Dict): 包含 mcpServers 的配置字典"""mcp_servers = servers_config.get("mcpServers", {})for server_name, srv_config in mcp_servers.items():server = Server(server_name, srv_config)await server.initialize()self.servers[server_name] = servertools = await server.list_tools()self.tools_by_server[server_name] = toolsfor tool in tools:function_name = f"{server_name}_{tool.name}"self.all_tools.append({"type": "function","function": {"name": function_name,"description": tool.description,"input_schema": tool.input_schema}})# 轉換為 OpenAI Function Calling 所需格式self.all_tools = await self.transform_json(self.all_tools)logging.info("\n? 已連接到下列服務器:")for name in self.servers:srv_cfg = mcp_servers[name]logging.info(f" - {name}: command={srv_cfg['command']}, args={srv_cfg['args']}")logging.info("\n匯總的工具:")for t in self.all_tools:logging.info(f" - {t['function']['name']}")async def transform_json(self, json_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:"""將工具的 input_schema 轉換為 OpenAI 所需的 parameters 格式。Args:json_data (List[Dict]): 原始工具列表Returns:List[Dict]: 轉換后的 OpenAI 兼容格式"""result = []for item in json_data:if not isinstance(item, dict) or "type" not in item or "function" not in item:continueold_func = item["function"]if not isinstance(old_func, dict) or "name" not in old_func or "description" not in old_func:continuenew_func = {"name": old_func["name"],"description": old_func["description"],"parameters": {}}if "input_schema" in old_func and isinstance(old_func["input_schema"], dict):old_schema = old_func["input_schema"]new_func["parameters"]["type"] = old_schema.get("type", "object")new_func["parameters"]["properties"] = old_schema.get("properties", {})new_func["parameters"]["required"] = old_schema.get("required", [])new_item = {"type": item["type"],"function": new_func}result.append(new_item)return resultasync def chat_base(self, messages: List[Dict[str, Any]]) -> Any:"""使用 OpenAI 接口進行對話,支持多次工具調用。Args:messages (List[Dict]): 當前對話上下文Returns:Any: 模型最終響應"""response = self.client.get_response(messages, tools=self.all_tools)if response.choices[0].finish_reason == "tool_calls":while True:messages = await self.create_function_response_messages(messages, response)response = self.client.get_response(messages, tools=self.all_tools)if response.choices[0].finish_reason != "tool_calls":breakreturn responseasync def create_function_response_messages(self,messages: List[Dict[str, Any]],response: Any) -> List[Dict[str, Any]]:"""解析模型返回的工具調用,執行并返回結果。Args:messages: 原始消息列表response: 模型返回的包含 tool_calls 的響應Returns:更新后的消息列表,包含工具調用結果"""function_call_messages = response.choices[0].message.tool_callsmessages.append(response.choices[0].message.model_dump())for function_call_message in function_call_messages:tool_name = function_call_message.function.nametool_args = json.loads(function_call_message.function.arguments)function_response = await self._call_mcp_tool(tool_name, tool_args)messages.append({"role": "tool","content": function_response,"tool_call_id": function_call_message.id,})return messagesasync def process_query(self, user_query: str) -> str:"""處理用戶查詢,支持模型調用工具并返回最終回答。Args:user_query (str): 用戶輸入的問題Returns:str: AI 的最終回答"""messages = [{"role": "user", "content": user_query}]response = self.client.get_response(messages, tools=self.all_tools)content = response.choices[0]if content.finish_reason == "tool_calls":tool_call = content.message.tool_calls[0]tool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)logging.info(f"\n[ 調用工具: {tool_name}, 參數: {tool_args} ]\n")result = await self._call_mcp_tool(tool_name, tool_args)messages.append(content.message.model_dump())messages.append({"role": "tool","content": result,"tool_call_id": tool_call.id,})response = self.client.get_response(messages, tools=self.all_tools)return response.choices[0].message.contentreturn content.message.contentasync def _call_mcp_tool(self, tool_full_name: str, tool_args: Dict[str, Any]) -> str:"""調用 MCP 工具,支持 server_tool 格式解析。Args:tool_full_name: 如 "weather_query_weather"tool_args: 工具參數Returns:str: 工具執行結果"""parts = tool_full_name.split("_", 1)if len(parts) != 2:return f"? 無效的工具名稱: {tool_full_name}"server_name, tool_name = partsserver = self.servers.get(server_name)if not server:return f"? 找不到服務器: {server_name}"resp = await server.execute_tool(tool_name, tool_args)return resp.content if resp.content else "工具執行無輸出"async def chat_loop(self) -> None:"""主聊天循環,接收用戶輸入并輸出 AI 回答。輸入 'quit' 可退出。"""logging.info("\n🤖 多服務器 MCP + Function Calling 客戶端已啟動!輸入 'quit' 退出。")messages: List[Dict[str, Any]] = []while True:query = input("\n你: ").strip()if query.lower() == "quit":breaktry:messages.append({"role": "user", "content": query})messages = messages[-20:] # 保留最近 20 條response = await self.chat_base(messages)messages.append(response.choices[0].message.model_dump())result = response.choices[0].message.contentprint(f"\nAI: {result}")except Exception as e:print(f"\n?? 調用過程出錯: {e}")async def cleanup(self) -> None:"""關閉所有資源。"""await self.exit_stack.aclose()# ===========================================
# 七、主函數 (main)
# ===========================================async def main() -> None:"""主程序入口點。流程:1. 加載 .env 環境變量和 JSON 配置文件。2. 連接所有 MCP 服務器并獲取工具。3. 啟動交互式聊天循環。4. 程序結束時清理資源。異常處理:- 捕獲配置文件錯誤、連接失敗、調用異常等。- 輸出詳細日志,確保程序優雅退出。"""config = Configuration()servers_config = config.load_config("servers_config.json")client = MultiServerMCPClient()try:await client.connect_to_servers(servers_config)await client.chat_loop()finally:try:await asyncio.sleep(0.1)await client.cleanup()except RuntimeError as e:if "Attempted to exit cancel scope" in str(e):logging.info("退出時檢測到 cancel scope 異常,已忽略。")else:raise# ===========================================
# 八、程序啟動入口
# ===========================================if __name__ == "__main__":"""程序啟動入口,運行主異步函數。"""asyncio.run(main())
3. 運行測試
# 當前項目的主目錄下輸入uv run進行運行
uv run main.py
可以進行多輪對話并進行天氣查詢:
并支持多工具并行調用:
4. 拓展工具集成
在當前主目錄下創建 write_server.py
服務器:
# write_server.py
from typing import Any
from mcp.server.fastmcp import FastMCP
import os# 初始化 MCP 服務器
mcp = FastMCP("WriteServer")@mcp.tool()
async def write_file(content: str) -> str:"""將內容寫入 output.txt 文件。"""try:# 確保目錄存在directory = os.path.dirname("output.txt")if directory:os.makedirs(directory, exist_ok=True)# 寫入文件with open("output.txt", "w", encoding="utf-8") as f:f.write(content)return "? 已成功寫入本地文件。"except PermissionError:return "? 無寫入權限。"except Exception as e:return f"? 寫入失敗: {str(e)}"if __name__ == "__main__":mcp.run(transport="stdio")
同時寫入配置文件:
{"mcpServers": {"weather": {"command": "python","args": ["weather_server.py"]},"write": {"command": "python","args": ["write_server.py"]}}
}
重啟對話:
5. MPC服務器在線管理與實時下載
5.1 npm registry 簡介介紹
npm registry(Node Package Manager Registry)
是一個 開源的JavaScript
包管理平臺,它存儲著成千上萬的JavaScript
和Node.js
庫、工具和框架。開發者可以將自己的代碼庫作為包發布到npm registry,
供其他開發者使用。它是npm (Node Package Manager)
工具的核心組件,npm
是當前最流行的JavaScript
包管理工具,廣泛應用于前端和后端開發中。
npm registry
的作用是為JavaScript/Node.js
開發者 提供一個集中的資源庫,用戶可以通過npm
或npx
等工具來安裝、更新和使用這些包。除此之外,npm registry
還支持其他語言的工具和腳本,比如通過uvx,Python
工具也能方便地通過npm registry
進行下載和管理。
優點:
-
無需手動下載和安裝依賴
通過
npm
或npx
,開發者可以輕松地 實時下載并運行 所需的包,無需手動下載、解壓和安裝依賴項。npx
甚至支持臨時下載并執行工具,而不必安裝到本地環境中,減少了不必要的手動操作。 -
集中管理和共享
npm registry
提供了一個集中管理和分發代碼的場所,開發者可以方便地發布自己的工具、庫,并與全球其他開發者共享。這促進了 開源生態系統 的發展,并且讓其他開發者能夠輕松使用這些工具。 -
跨語言支持(通過 uvx)
npm registry
是以JavaScript/Node.js
為主,但通過uvx
等工具,它也可以方便地管理 Python 包 和其他語言的工具,這使得 跨語言開發 更加簡潔和高效。 -
簡化依賴管理和版本控制
在開發過程中,
npm registry
不僅能幫助開發者快速獲取第三方庫,還能自動處理依賴版本的管理。通過npm
配置文件(如package.json
),開發者可以清晰地查看和管理項目所依賴的所有庫,并且可以隨時更新、安裝或回滾特定版本。 -
跨平臺支持
npm registry
支持的工具和包廣泛適用于不同操作系統(如Windows、macOS、Linux
等)。npm registry
提供了一個集中、開放、實時更新的生態系統,極大地簡化了開發者在項目中使用外部工具和庫的過程。開發者只需要通過簡單的命令(如npm install
或npx
),就能實時下載最新版本的庫、工具和框架,而無需處理繁瑣的版本管理和依賴配置。實時下載和運行工具包的便捷性,使得開發工作更加高效,能夠快速迭代和創新,同時促進了開源社區的蓬勃發展。
5.2 將開發好的庫上傳至npm registry
接下來我們嘗試將一個 Python 編寫的
MCP
服務器 發布為一個npm
包,并能夠通過npx
或uvx
快速運行該服務器。這種方法使得您可以跨平臺發布和使用 Python 腳本,而不需要其他開發者手動安裝和配置 Python 環境。
(1) 準備 Python 代碼
編寫一個Python腳本,也就是一個MCP服務器。以查詢天氣為例!
(2) 創建一個 Node.js 項目
初始化
Node.js
項目:首先,我們需要一個package.json
文件,這是npm
包的核心配置文件。我們可以通過npm init
命令來初始化一個新的Node.js
項目。
# 打開終端,進入到項目文件夾,然后運行以下命令
npm init
創建一個新的 package.json
文件。在提問時,可以按默認值按下 Enter
,或者輸入自定義內容。
安裝 uvx 工具
npm install uvx --save
(3) 配置 package.json 來運行 Python 腳本
-
在 package.json 文件中,添加一個 bin 字段,告訴 npm 包如何啟動我們的 Python 腳本。
-
打開 package.json 文件,并將其修改為類似下面的樣子:
{"name": "weather-server","version": "1.0.0","description": "A weather server that fetches weather data from OpenWeather API","main": "index.js","bin": {"mcp-server-git": "./weather_server.py"},"dependencies": {"uvx": "^latest"},"scripts": {"start": "uvx weather-server"},"author": "","license": "ISC" }
-
bin
字段:將我們的 Python 腳本路徑指定為命令。這里,"mcp-server-git"
將成為用戶運行命令時執行的腳本名稱,"./weather_server.py"
指定 Python 腳本路徑。 -
scripts
字段:指定使用uvx
啟動 Python 腳本。在項目根目錄下創建一個簡單的
index.js
文件來調用 Python 腳本# index.js 文件 const { exec } = require('child_process');exec('python weather_server.py --api_key YOUR_API_KEY', (error, stdout, stderr) => {if (error) {console.error(`exec error: ${error}`);return;}console.log(`stdout: ${stdout}`);console.error(`stderr: ${stderr}`); });
這個腳本將運行我們的 Python 腳本并傳遞 API Key
(4) 創建一個 .npmignore 文件
如果項目包含不需要發布到 npm 的文件(如 Python 環境相關的文件、緩存文件等),可以在項目根目錄創建一個 .npmignore
文件,并列出這些文件。
*.pyc
__pycache__
*.env
(5) 發布包到 npm
-
登錄您的 npm賬號
npm login
注意這里需要訪問npm
官方網站:https://www.npmjs.com/signup進行注冊,并且設置npm為官方鏡像源:npm config set registry https://registry.npmjs.org/
然后才能順利的登錄和發布,發布到 npm:使用以下命令將您的包發布到
npm registry
npm publish
這將把包上傳到
npm registry
,其他用戶就可以通過npx
或uvx
下載并運行您的 Python 服務器了。
(6) 使用 npx 或 uvx 來運行 MCP 服務器
發布成功后,嘗試在Cherry Studio中運行這個天氣查詢服務器。
6. 總結
- MCP 服務端構建
- 使用
mcp.server.fastmcp
搭建了基于FastMCP
的天氣查詢服務器;- 通過
httpx
異步調用 OpenWeather API,獲取實時天氣數據;- 實現了結構化錯誤處理與中文格式化輸出,提升可讀性。
- 客戶端集成與交互
- 基于
MultiServerMCPClient
構建了支持多服務器管理的客戶端;- 實現了 OpenAI Function Calling 風格的工具調用機制;
- 支持多輪對話、上下文維護與異常重試,具備生產級穩定性。
- 多工具擴展能力
- 成功集成
write_file
工具,實現“查詢天氣 + 寫入文件”復合任務;- 在
config.json
中統一管理多個 MCP 服務器,體現模塊化設計優勢;- 展示了 AI Agent 調用多個外部工具完成復雜指令的能力。
- 工具標準化與分發
- 將 Python 編寫的 MCP 服務器封裝為 npm 包;
- 利用
uvx
和npx
實現跨語言運行,無需手動安裝依賴;- 成功發布至 npm registry,實現“一鍵下載、即用即走”的工具分發模式。
- 平臺集成驗證
- 在
Cherry Studio
中成功加載并運行 MCP 服務器;- 驗證了 AI 可自動識別工具、發起調用并整合結果;
- 構建了“用戶提問 → AI 決策 → 工具執行 → 返回結果”的完整閉環。
7. 項目價值與展望
方向 | 當前成果 | 未來展望 |
---|---|---|
模塊化架構 | MCP 實現功能解耦 | 支持熱插拔、動態加載 |
AI 工具生態 | 支持自定義工具 | 建立開源 MCP 工具市場 |
跨平臺分發 | npm + uvx 實現跨語言運行 | 推動 MCP 成為標準協議 |
本地化部署 | 完全本地運行,保護隱私 | 支持邊緣計算與離線模式 |
8. 結語
本項目不僅驗證了 MCP 架構在 AI Agent 中的強大擴展性,更探索了一條 “本地服務 → 標準協議 → 全球共享” 的工具開發與分發路徑。未來,隨著更多開發者加入 MCP 生態,我們將迎來一個真正開放、協作、智能化的 AI 工具時代。