1、sse、stdio、streamable-http使用
參考:https://gofastmcp.com/deployment/running-server#the-run-method
stdio本地使用;sse、streamable-http遠程調用(
Streamable HTTP—New in version: 2.3.0)
調用:
stdio、sse
streamable-http
2、 MCPConfig多服務器使用案例
參考:
https://gofastmcp.com/clients/client
代碼
config如果只有一個服務,那call_tool函數不用前綴直接工具函數名,如果多個服務,需要添加前綴
from fastmcp import Client# Standard MCP configuration with multiple servers
config = {"mcpServers": {"trip": {"url": "http://localhost:8000/sse"},"math":{"command": "python","args": ["./api_mcp_server_math.py"],"env": {"DEBUG": "true"}}}
}async def main():async with Client(config) as client:tools = await client.list_tools()print("Available tools:")for tool in tools: ###打印結果系統也自動加了前綴print(f" - {tool.name}: {tool.description}")hotel_response = await client.call_tool("trip_check_hotel", {"cityname": "廣州","date": "后天"})math_response = await client.call_tool("math_add", {"a": 1, "b": 2})print("Math response:", math_response)print("Hotel response:", hotel_response)if __name__ == "__main__":import asyncioasyncio.run(main())
for tool in tools: ###打印結果系統也自動加了前綴
print(f" - {tool.name}: {tool.description}")
大模型自動調用工具執行
模型自動調用工具,LLM 根據用戶輸入自動選擇合適的工具;使用百煉qwen3思考模型流式輸出
#!/usr/bin/python3
# -*- coding: utf-8 -*-import json
import asyncio
from typing import List, Dict, Optionalfrom openai import OpenAI
from fastmcp import Clientclass IntegratedMCPClient:def __init__(self, config: Dict, model="qwen3-235b-a22b"):"""初始化集成的 MCP 客戶端Args:config: MCP 服務器配置字典model: 使用的模型名稱"""self.config = configself.model = model# 初始化 OpenAI 客戶端self.client = OpenAI(api_key="sk-424971",base_url="https://dashscope.ale-mode/v1")# 初始化 MCP 客戶端self.session = Client(config)self.tools = []self.server_tool_mapping = {} # 記錄工具來自哪個服務器async def prepare_tools(self):"""準備所有可用的工具"""try:# 獲取所有工具tools = await self.session.list_tools()print(f"發現 {len(tools)} 個可用工具:")self.tools = []self.server_tool_mapping = {}for tool in tools:print(f" - {tool.name}: {tool.description}")# 構建工具描述tool_def = {"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema,}}self.tools.append(tool_def)# 記錄工具映射(如果需要區分來源)# 這里可以通過工具名前綴或其他方式來識別來源服務器if tool.name.startswith("trip_"):self.server_tool_mapping[tool.name] = "trip"elif tool.name.startswith("math_"):self.server_tool_mapping[tool.name] = "math"else:self.server_tool_mapping[tool.name] = "unknown"except Exception as e:print(f"準備工具時出錯: {e}")self.tools = []async def chat(self, messages: List[Dict]) -> Optional[object]:"""處理聊天對話,支持工具調用"""if not self.tools:await self.prepare_tools()try:# 使用流式響應,啟用思考過程response = self.client.chat.completions.create(model=self.model,messages=messages,tools=self.tools if self.tools else None,temperature=0,max_tokens=16000,stream=True,logprobs=False,stream_options={"include_usage": True},extra_body={"enable_thinking": True # 啟用思考過程})# 收集流式響應collected_content = ""collected_tool_calls = []finish_reason = Noneprint("AI: ", end="", flush=True)for chunk in response:# 安全檢查:防止空的 choicesif not chunk.choices:continuedelta = chunk.choices[0].deltafinish_reason = chunk.choices[0].finish_reason# 打印思考過程(Qwen 特有字段)reasoning = getattr(delta, "reasoning_content", None)if reasoning:print(f"{reasoning}", end="", flush=True)# 打印常規內容if delta.content:print(delta.content, end="", flush=True)collected_content += delta.content# 收集工具調用if hasattr(delta, 'tool_calls') and delta.tool_calls:if not collected_tool_calls:collected_tool_calls = [{"id": "", "function": {"name": "", "arguments": ""}} for _ in delta.tool_calls]for i, tool_call_delta in enumerate(delta.tool_calls):if tool_call_delta.id:collected_tool_calls[i]["id"] = tool_call_delta.idif tool_call_delta.function:if tool_call_delta.function.name:collected_tool_calls[i]["function"]["name"] = tool_call_delta.function.nameif tool_call_delta.function.arguments:collected_tool_calls[i]["function"]["arguments"] += tool_call_delta.function.argumentsprint() # 換行# 如果沒有工具調用,返回收集的內容if finish_reason != 'tool_calls' or not collected_tool_calls:class SimpleMessage:def __init__(self, content):self.content = contentreturn SimpleMessage(collected_content)# 處理工具調用print("\n[正在調用工具...]")assistant_message = {'role': 'assistant','content': collected_content,'tool_calls': [{"id": tc["id"],"type": "function","function": {"name": tc["function"]["name"],"arguments": tc["function"]["arguments"]}}for tc in collected_tool_calls]}print(f"[助手消息]: 調用了 {len(collected_tool_calls)} 個工具")messages.append(assistant_message)# 執行每個工具調用for tool_call in collected_tool_calls:try:tool_name = tool_call['function']['name']tool_args = json.loads(tool_call['function']['arguments'])server_name = self.server_tool_mapping.get(tool_name, "unknown")print(f"\n[執行工具]: {tool_name} (來自服務器: {server_name})")print(f"[工具參數]: {tool_args}")# 調用工具result = await self.session.call_tool(tool_name, tool_args)# 處理結果result_content = ""if result:if isinstance(result, list) and len(result) > 0:if hasattr(result[0], 'text'):result_content = result[0].textelse:result_content = str(result[0])else:result_content = str(result)else:result_content = "No result"# 添加工具結果到消息中messages.append({'role': 'tool','tool_call_id': tool_call['id'],'content': result_content})print(f"[工具結果]: {result_content}")except Exception as e:print(f"[工具調用錯誤]: {e}")messages.append({'role': 'tool','tool_call_id': tool_call['id'],'content': f"Error executing tool: {str(e)}"})# 使用工具結果獲取最終響應print("\n[生成最終回答...]")return await self.chat(messages)except Exception as e:print(f"聊天過程中出錯: {e}")class ErrorMessage:def __init__(self, content):self.content = contentreturn ErrorMessage(f"抱歉,處理您的請求時出現錯誤: {str(e)}")async def loop(self):"""交互式聊天循環"""print("=== 集成 MCP 客戶端啟動 ===")print("支持的命令:")print(" - quit/exit/bye: 退出程序")print(" - tools: 顯示可用工具")print(" - clear: 清除對話歷史")print("=====================================\n")conversation_history = []while True:try:async with self.session:# 如果是第一次進入,準備工具if not self.tools:await self.prepare_tools()question = input("\nUser: ").strip()# 處理特殊命令if question.lower() in ['quit', 'exit', 'bye']:print("再見!")breakelif question.lower() == 'tools':print(f"\n可用工具 ({len(self.tools)} 個):")for tool in self.tools:func = tool['function']server = self.server_tool_mapping.get(func['name'], 'unknown')print(f" - {func['name']} (服務器: {server})")print(f" 描述: {func['description']}")continueelif question.lower() == 'clear':conversation_history = []print("對話歷史已清除")continueelif not question:continue# 添加用戶消息到歷史user_message = {"role": "user", "content": question}current_messages = conversation_history + [user_message]# 獲取響應response = await self.chat(current_messages.copy())if response and hasattr(response, 'content'):print(f"\n回答: {response.content}")# 更新對話歷史conversation_history.append(user_message)conversation_history.append({"role": "assistant", "content": response.content})# 限制歷史長度,避免上下文過長if len(conversation_history) > 20: # 保留最近10輪對話conversation_history = conversation_history[-20:]except KeyboardInterrupt:print("\n\n程序被用戶中斷")breakexcept Exception as e:print(f"循環過程中出錯: {e}")continueasync def single_query(self, query: str) -> str:"""單次查詢接口,適用于非交互式使用"""try:async with self.session:if not self.tools:await self.prepare_tools()messages = [{"role": "user", "content": query}]response = await self.chat(messages)return response.content if response and hasattr(response, 'content') else "無響應"except Exception as e:return f"查詢失敗: {str(e)}"async def main():"""主函數"""# MCP 配置config = {"mcpServers": {"trip": {"url": "http://localhost:8000/sse"},"math": {"command": "python","args": ["./api_mcp_server_math.py"],"env": {"DEBUG": "true"}}}}print("正在初始化集成 MCP 客戶端...")mcp_client = IntegratedMCPClient(config)# 啟動交互式循環await mcp_client.loop()async def test_single_queries():"""測試單次查詢的示例"""config = {"mcpServers": {"trip": {"url": "http://localhost:8000/sse"},"math": {"command": "python", "args": ["./api_mcp_server_math.py"],"env": {"DEBUG": "true"}}}}mcp_client = IntegratedMCPClient(config)# 測試查詢test_queries = ["幫我查詢廣州后天的酒店信息","計算 15 加 27 等于多少","我想知道北京明天有什么好的酒店推薦","計算 100 乘以 50","你好,請介紹一下你的功能"]for query in test_queries:print(f"\n{'='*50}")print(f"測試查詢: {query}")print(f"{'='*50}")result = await mcp_client.single_query(query)print(f"結果: {result}")if __name__ == '__main__':# 運行交互式模式asyncio.run(main())# 或者運行測試模式# asyncio.run(test_single_queries())