說在前面
上一篇文章發布了一個mcp-server
,具體的server
是否能被正確的訪問到?是否能夠得到正常的返回? 在github
上找到一個客戶端的代碼實現,我把里面的大模型調用換成了支持國內大模型的方式,一起來驗證一下吧~
主要功能
- 連接
mcp-server
- 獲取
mcp
工具列表
- 調用大模型明確需要調用的方法以及參數
- 執行工具獲取返回值
- 調用大模型進行問題總結
- 詳盡的日志信息,幫助你更好的了解整個過程
一些說明
關于大模型的選擇
文章里用的是open-ai sdk
,但是因為阿里云百煉提供了兼容 OpenAI 協議的接口,所以阿里百煉的api-key
也是通用的。百煉給新用戶都是有免費的大模型調用額度的,放心使用。
大模型的使用
可以參考阿里百煉的api說明,里面有詳細的參數,有興趣的可以自行拼接嘗試。
代碼
import asyncio
import json
import os
import sys
from typing import Optional, List, Dict
from contextlib import AsyncExitStack

from mcp import ClientSession
from mcp.client.sse import sse_client
from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()  # load environment variables from .env


class MCPClient:
    """Minimal MCP client: connects to an SSE MCP server, lists its tools,
    and lets an OpenAI-compatible LLM (Aliyun DashScope / qwen) decide which
    tools to call to answer user queries."""

    def __init__(self):
        # Initialize session and client objects.
        # Optional[...] documents that this stays None until
        # connect_to_sse_server() succeeds.
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # Initialise the context-manager holders here so cleanup() is safe
        # even when connect_to_sse_server() was never called (the original
        # raised AttributeError in that case).
        self._streams_context = None
        self._session_context = None
        # Read the key from the environment (.env is loaded above) instead of
        # hard-coding it; the original placeholder remains as the fallback so
        # behaviour is unchanged when the variable is unset.
        self.openai = OpenAI(
            api_key=os.getenv("DASHSCOPE_API_KEY", "your key your key your key"),
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        )

    async def connect_to_sse_server(self, server_url: str):
        """Connect to an MCP server running with SSE transport.

        Stores the context managers on self so the streams stay alive for the
        lifetime of the client; cleanup() exits them in reverse order.
        """
        self._streams_context = sse_client(url=server_url)
        streams = await self._streams_context.__aenter__()
        # *streams unpacks the stream tuple as positional arguments, i.e.
        # ClientSession(read_stream, write_stream).
        self._session_context = ClientSession(*streams)
        self.session: ClientSession = await self._session_context.__aenter__()

        # Perform the MCP initialization handshake.
        await self.session.initialize()

        # List available tools to verify the connection works.
        print("Initialized SSE client...")
        print("Listing tools...")
        response = await self.session.list_tools()
        tools = response.tools
        print("工具列表:", json.dumps(response.model_dump(), ensure_ascii=False, indent=2))
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    async def cleanup(self):
        """Properly clean up the session and streams.

        Safe to call multiple times and safe to call even if the connection
        was never established.
        """
        if self._session_context:
            await self._session_context.__aexit__(None, None, None)
            self._session_context = None
        if self._streams_context:
            await self._streams_context.__aexit__(None, None, None)
            self._streams_context = None

    async def process_query(self, query: str) -> str:
        """Process a query using OpenAI and available tools.

        Flow: ask the model with the MCP tool list attached; if it requests
        tool calls, execute each via the MCP session, feed the results back,
        and ask the model for a final summary. Returns the combined text.
        """
        messages = [{"role": "user", "content": query}]

        response = await self.session.list_tools()
        # Convert MCP tool metadata into OpenAI function-calling format.
        # NOTE(review): assumes each tool.inputSchema is a valid JSON Schema
        # object as OpenAI expects — confirm against the server.
        available_tools: List[Dict] = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema,
                },
            }
            for tool in response.tools
        ]

        print(f"大模型調用詳情: {json.dumps(messages, ensure_ascii=False, indent=2)}")
        # Initial LLM call: the model decides whether to use a tool.
        response = self.openai.chat.completions.create(
            model="qwen3-32b",
            max_tokens=1000,
            messages=messages,
            tools=available_tools,
            tool_choice="auto",  # let the model decide whether to call a tool
            extra_body={
                "enable_thinking": False,
            },
        )

        # Log the model's tool-calling decision for transparency.
        print("\n===== 大模型工具調用決策 =====")
        print(f"原始響應: {json.dumps(response.model_dump(), ensure_ascii=False, indent=2)}")

        tool_results = []
        final_text = []
        response_message = response.choices[0].message
        tool_calls = response_message.tool_calls
        print(f"是否調用工具: {'是' if tool_calls else '否'}")

        if tool_calls:
            print(f"調用工具數量: {len(tool_calls)}")
            for i, call in enumerate(tool_calls):
                print(f"工具 {i+1}: {call.function.name}, 參數: {call.function.arguments}")

        if tool_calls:
            for tool_call in tool_calls:
                tool_name = tool_call.function.name
                tool_args = json.loads(tool_call.function.arguments)

                # Execute the tool via the MCP session.
                print(f"\n===== 調用工具 '{tool_name}' =====")
                print(f"參數: {json.dumps(tool_args, ensure_ascii=False, indent=2)}")
                result = await self.session.call_tool(tool_name, tool_args)
                print(f"返回結果: {json.dumps(str(result.content), ensure_ascii=False, indent=2)}")
                tool_results.append({"call": tool_name, "result": result})
                final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")

                # Append the assistant's tool request and the tool result to
                # the conversation history so the model can summarise.
                messages.append({
                    "role": "assistant",
                    "content": None,
                    "tool_calls": [tool_call.model_dump()],
                })
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": str(result.content),
                })

            print(f"大模型調用詳情: {json.dumps(messages, ensure_ascii=False, indent=2)}")
            # Second LLM call: summarise the tool results into an answer.
            response = self.openai.chat.completions.create(
                model="qwen3-32b",
                max_tokens=1000,
                messages=messages,
                extra_body={
                    "enable_thinking": False,
                },
            )
            final_response = response.choices[0].message.content
            if final_response:
                final_text.append(final_response)
        else:
            # No tool call was requested: use the model's direct answer.
            if response_message.content:
                final_text.append(response_message.content)

        return "\n".join(final_text)

    async def chat_loop(self):
        """Run an interactive chat loop until the user types 'quit'."""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")

        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == 'quit':
                    break
                response = await self.process_query(query)
                print("\n" + response)
            except Exception as e:
                # Keep the loop alive on per-query failures; report and retry.
                print(f"\nError: {str(e)}")


async def main():
    # if len(sys.argv) < 2:
    #     print("Usage: uv run client.py <URL of SSE MCP server (i.e. http://localhost:8080/sse)>")
    #     sys.exit(1)
    client = MCPClient()
    try:
        # await client.connect_to_sse_server(server_url=sys.argv[1])
        await client.connect_to_sse_server(server_url="http://localhost:8080/sse")
        await client.chat_loop()
    finally:
        print("Cleaning up...")
        # cleanup() is now safe even if the connection was never established,
        # so the streams are actually released (the original leaked them).
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
說到最后
以上。