【LLM】MCP(Python):實現 stdio 通信的Client與Server

本文將詳細介紹如何使用 Model Context Protocol (MCP) 在 Python 中實現基于 STDIO 通信的 Client 與 Server。MCP 是一個開放協議,它使 LLM 應用與外部數據源和工具之間的無縫集成成為可能。無論你是構建 AI 驅動的 IDE、改善 chat 交互,還是構建自定義的 AI 工作流,MCP 提供了一種標準化的方式,將 LLM 與它們所需的上下文連接起來。部分靈感來源:Se7en。

提示
在開始前,請確保你已經安裝了必要的依賴包:

pip install openai mcp

本文中,我們將介紹如何配置環境、編寫 MCP Server 以及實現 MCP Client。


環境配置

在使用 MCP 之前,需要先配置相關環境變量,以便 Client 與 Server 都能正確加載所需的參數。你可以在項目根目錄下創建一個 .env 文件,并寫入以下內容:

此外,創建一個 .env 文件來存儲您的配置:

MODEL_NAME=deepseek-chat
BASE_URL=https://api.deepseek.com/v1
API_KEY=your_api_key_here

上述配置中,MODEL_NAME 表示使用的 OpenAI 模型名稱(例如 “deepseek-chat”),BASE_URL 指向 OpenAI API 的基礎地址,而 API_KEY 則為訪問 API 所需的密鑰。


書寫 Server 的規范

構建 MCP Server(特別是基于 stdio 通信)時,推薦遵循統一規范,提升可讀性、可維護性與復用性

  • 服務命名統一
    使用 MCP_SERVER_NAME 作為唯一名稱,貫穿日志、初始化等環節。
  • 日志配置清晰
    統一使用 logging 模塊,推薦 INFO 級別,便于調試和追蹤。
  • 工具注冊規范
    通過 @mcp.tool() 裝飾器注冊工具函數,要求:
    • 命名清晰
    • 參數有類型注解
    • 注釋說明參數與返回值(推薦中文)
    • 加入邊界檢查或異常處理
  • 使用標準 stdio 啟動方式
    通過 async with stdio_server() 獲取輸入輸出流,統一調用 _mcp_server.run(...) 啟動服務。
  • 初始化選項規范
    使用 InitializationOptions 設置服務名、版本及能力聲明(通常由 FastMCP 提供)。
  • 通用模板

    import asyncio
    import logging
    from mcp.server.fastmcp import FastMCP
    from mcp.server import InitializationOptions, NotificationOptions
    from mcp.server.stdio import stdio_server  # STDIO 通信方式# 定義唯一服務名稱
    MCP_SERVER_NAME = "your-stdio-server-name"# 配置日志輸出
    logging.basicConfig(level=logging.INFO,format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    logger = logging.getLogger(MCP_SERVER_NAME)# 創建 FastMCP 實例
    mcp = FastMCP(MCP_SERVER_NAME)# 定義工具
    @mcp.tool()
    def your_tool_name(param1: type, param2: type) -> return_type:"""工具描述。參數:- param1 (type): 描述- param2 (type): 描述返回:- return_type: 描述"""# 工具實現pass# 啟動 MCP Server 主函數
    async def main():# 創建 stdio 通信通道async with stdio_server() as (read_stream, write_stream):# 構建初始化選項init_options = InitializationOptions(server_name=MCP_SERVER_NAME,server_version="1.0.0",capabilities=mcp._mcp_server.get_capabilities(notification_options=NotificationOptions(),experimental_capabilities={}))logger.info("MCP Server 以 STDIO 模式啟動中...")# 啟動 Serverawait mcp._mcp_server.run(read_stream, write_stream, init_options)# 主程序入口
    if __name__ == "__main__":asyncio.run(main())
    

編寫 MCP Server

MCP Server 的實現主要基于標準輸入輸出(STDIO)進行通信。服務端通過注冊工具,向外界提供如加法、減法、乘法以及除法等計算功能。下面簡述服務端的主要實現步驟:

  1. 初始化 FastMCP 實例
    服務端首先創建一個 FastMCP 實例,并為其命名(例如 “math-stdio-server”)。
  2. 工具注冊
    使用裝飾器的方式注冊加法、減法、乘法、除法等工具,每個工具均包含詳細的參數說明和返回值說明。
  3. 日志配置
    通過 Python 標準日志模塊對服務端進行日志配置,以便記錄服務運行狀態和錯誤信息。
  4. 建立 STDIO 通信
    使用 stdio_server() 函數建立基于 STDIO 的通信,并構造初始化選項,包含服務器名稱、版本以及能力說明。隨后,調用 MCP 內部的服務啟動函數開始監聽和處理來自 Client 的請求。
import asyncio
import logging
from mcp.server.fastmcp import FastMCP
from mcp.server import InitializationOptions, NotificationOptions
from mcp.server.stdio import stdio_server  # 直接導入 stdio_server 函數# 定義服務器名稱
MCP_SERVER_NAME = "math-stdio-server"# 配置日志
logging.basicConfig(level=logging.INFO,format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)# 初始化 FastMCP 實例
mcp = FastMCP(MCP_SERVER_NAME)# 注冊加法工具
@mcp.tool()
def add(a: float, b: float) -> float:"""加法工具參數:- a (float): 第一個數字(必填)- b (float): 第二個數字(必填)返回:- float: a 與 b 的和"""return a + b# 注冊減法工具
@mcp.tool()
def subtract(a: float, b: float) -> float:"""減法工具參數:- a (float): 被減數(必填)- b (float): 減數(必填)返回:- float: a 與 b 的差"""return a - b# 注冊乘法工具
@mcp.tool()
def multiply(a: float, b: float) -> float:"""乘法工具參數:- a (float): 第一個數字(必填)- b (float): 第二個數字(必填)返回:- float: a 與 b 的積"""return a * b# 注冊除法工具
@mcp.tool()
def divide(a: float, b: float) -> float:"""除法工具參數:- a (float): 分子(必填)- b (float): 分母(必填,且不能為零)返回:- float: a 與 b 的商"""if b == 0:raise ValueError("除數不能為零")return a / basync def main():# 使用 stdio_server 建立 STDIO 通信async with stdio_server() as (read_stream, write_stream):# 構造初始化選項init_options = InitializationOptions(server_name=MCP_SERVER_NAME,server_version="1.0.0",capabilities=mcp._mcp_server.get_capabilities(notification_options=NotificationOptions(),experimental_capabilities={}))logger.info("通過 STDIO 模式啟動 MCP Server ...")# 使用內部的 _mcp_server 運行服務await mcp._mcp_server.run(read_stream, write_stream, init_options)if __name__ == "__main__":asyncio.run(main())

編寫 MCP Client

MCP Client 主要實現與多個基于 STDIO 的服務器建立連接,并通過 OpenAI API 對用戶的自然語言查詢進行處理,調用相應工具獲得最終結果。客戶端的主要邏輯可以分為以下部分:

  1. 初始化客戶端
    在 MCPClient 類的構造函數中,傳入所需的模型名稱、OpenAI API 基礎地址、API 密鑰以及包含服務端腳本路徑的列表。客戶端將使用這些參數初始化 OpenAI 異步客戶端,同時準備一個 AsyncExitStack 來管理所有異步上下文。
  2. 建立多個 STDIO 連接
    通過遍歷服務器腳本列表,為每個腳本生成唯一標識符(如 server0server1 等),然后依次調用 stdio_client 函數建立連接,并通過 ClientSession 完成初始化。在連接成功后,從每個服務器獲取可用工具列表,并將工具名稱加上前綴(例如 server0_add)保存到映射表中,避免工具名稱沖突。
  3. 處理用戶查詢
    process_query 方法中,客戶端首先根據用戶的輸入構造消息,然后匯總所有連接服務器提供的工具,傳遞給 OpenAI API 進行處理。當 API 返回調用工具的請求時,客戶端根據工具名稱找到對應服務器會話,并執行相應的工具調用,收集返回結果后再交由 API 生成后續回復,直至所有工具調用處理完成。
  4. 交互式對話循環
    客戶端提供一個簡單的命令行交互循環,用戶輸入查詢后,調用 process_query 方法獲取最終回復,并打印在終端上。如果用戶輸入 quit 或使用 Ctrl+C 中斷,則客戶端將平滑退出并釋放所有資源。
  5. 資源清理
    最后,在退出前,通過 AsyncExitStack 統一關閉所有連接,確保資源不會泄露。
import asyncio
import json
import os
import sys
from typing import List
from contextlib import AsyncExitStackfrom mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAIclass MCPClient:def __init__(self, model_name: str, base_url: str, api_key: str, server_scripts: List[str]):"""初始化 MCP 客戶端,支持多個 stdio 服務器。:param model_name: OpenAI 模型名稱,例如 "deepseek-chat"。:param base_url: OpenAI API 基礎地址,例如 "https://api.deepseek.com/v1"。:param api_key: OpenAI API 密鑰。:param server_scripts: stdio 服務腳本路徑列表。"""self.model_name = model_nameself.base_url = base_urlself.api_key = api_keyself.server_scripts = server_scriptsself.sessions = {}         # server_id -> (session, session_ctx, stdio_ctx)self.tool_mapping = {}     # 帶前綴的工具名 -> (session, 原始工具名)self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)self.exit_stack = AsyncExitStack()async def initialize_sessions(self):"""初始化所有 stdio 服務器連接,并收集工具映射。"""for i, script in enumerate(self.server_scripts):if not (os.path.exists(script) and script.endswith(".py")):print(f"腳本 {script} 不存在或不是 .py 文件,跳過。")continueserver_id = f"server{i}"params = StdioServerParameters(command="python", args=[script], env=None)try:stdio_ctx = stdio_client(params)stdio = await self.exit_stack.enter_async_context(stdio_ctx)session_ctx = ClientSession(*stdio)session = await self.exit_stack.enter_async_context(session_ctx)await session.initialize()self.sessions[server_id] = (session, session_ctx, stdio_ctx)response = await session.list_tools()for tool in response.tools:self.tool_mapping[f"{server_id}_{tool.name}"] = (session, tool.name)print(f"已連接到 {script},工具:{[tool.name for tool in response.tools]}")except Exception as e:print(f"連接 {script} 失敗:{e}")async def cleanup(self):"""釋放所有資源。"""try:await self.exit_stack.aclose()print("所有連接資源已釋放")except asyncio.CancelledError:passexcept Exception as e:print(f"清理資源時異常:{e}")async def _gather_available_tools(self):"""匯總所有服務器的工具列表。"""tools = []for server_id, (session, _, _) in self.sessions.items():response = await session.list_tools()for tool in response.tools:tools.append({"type": "function","function": {"name": f"{server_id}_{tool.name}","description": tool.description,"parameters": tool.inputSchema,}})return toolsasync def process_query(self, query: str) -> str:"""處理查詢,調用 OpenAI API 和相應工具后返回結果。"""messages = [{"role": "user", "content": query}]available_tools = await self._gather_available_tools()try:response = await self.client.chat.completions.create(model=self.model_name, messages=messages, tools=available_tools)except Exception as e:return f"調用 OpenAI API 失敗:{e}"final_text = [response.choices[0].message.content or ""]message = response.choices[0].message# 當有工具調用時循環處理while message.tool_calls:for call in message.tool_calls:tool_name = call.function.nameif tool_name not in self.tool_mapping:final_text.append(f"未找到工具:{tool_name}")continuesession, original_tool = self.tool_mapping[tool_name]tool_args = json.loads(call.function.arguments)try:result = await session.call_tool(original_tool, tool_args)final_text.append(f"[調用 {tool_name} 參數: {tool_args}]")final_text.append(f"工具結果: {result.content}")except Exception as e:final_text.append(f"調用 {tool_name} 出錯:{e}")continuemessages += [{"role": "assistant", "tool_calls": [{"id": call.id,"type": "function","function": {"name": tool_name, "arguments": json.dumps(tool_args)}}]},{"role": "tool", "tool_call_id": call.id, "content": str(result.content)}]try:response = await self.client.chat.completions.create(model=self.model_name, messages=messages, tools=available_tools)except Exception as e:final_text.append(f"調用 OpenAI API 失敗:{e}")breakmessage = response.choices[0].messageif message.content:final_text.append(message.content)return "\n".join(final_text)async def chat_loop(self):"""交互式對話循環,捕獲中斷平滑退出。"""print("MCP 客戶端已啟動,輸入問題,輸入 'quit' 退出。")while True:try:query = input("問題: ").strip()if query.lower() == "quit":breakresult = await self.process_query(query)print("\n" + result)except KeyboardInterrupt:print("\n檢測到中斷信號,退出。")breakexcept Exception as e:print(f"發生錯誤:{e}")async def main():model_name = os.getenv("MODEL_NAME", "deepseek-chat")base_url = os.getenv("BASE_URL", "https://api.deepseek.com/v1")api_key = os.getenv("API_KEY")if not api_key:print("未設置 API_KEY 環境變量")sys.exit(1)# 示例:使用兩個 stdio 腳本server_scripts = ["server.py"]client = MCPClient(model_name, base_url, api_key, server_scripts)try:await client.initialize_sessions()await client.chat_loop()except KeyboardInterrupt:print("\n收到中斷信號")finally:await client.cleanup()if __name__ == "__main__":try:asyncio.run(main())except KeyboardInterrupt:print("程序已終止。")

總結

通過本文的介紹,我們了解了如何使用 MCP 協議在 Python 中構建基于 STDIO 通信的 Client 與 Server。服務端通過注冊多個工具為外部應用提供計算能力,而客戶端則利用 OpenAI API 和工具調用的方式,將自然語言查詢轉化為對具體工具的調用,最終將結果反饋給用戶。

這種基于 STDIO 的通信方式不僅簡化了服務端與客戶端之間的連接,還能方便地支持多服務器同時運行,為構建靈活高效的 LLM 應用提供了堅實的基礎。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/74693.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/74693.shtml
英文地址,請注明出處:http://en.pswp.cn/web/74693.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Docker 安裝 Elasticsearch 教程

目錄 一、安裝 Elasticsearch 二、安裝 Kibana 三、安裝 IK 分詞器 四、Elasticsearch 常用配置 五、Elasticsearch 常用命令 一、安裝 Elasticsearch (一)創建 Docker 網絡 因為后續還需要部署 Kibana 容器,所以需要讓 Elasticsearch…

Swagger @ApiOperation

ApiOperation 注解并非 Spring Boot 自帶的注解,而是來自 Swagger 框架,Swagger 是一個規范且完整的框架,用于生成、描述、調用和可視化 RESTful 風格的 Web 服務,而 ApiOperation 主要用于為 API 接口的操作添加描述信息。以下為…

【奇點時刻】GPT4o新圖像生成模型底層原理深度洞察報告(篇2)

由于上一篇解析深度不足,經過查看學習相關論文,以下是一份對 GPT-4o 最新的圖像生成模型 的深度梳理與洞察,從模型原理到社區解讀、對比傳統擴散模型,再到對未來趨勢的分析。為了便于閱讀,整理成以下七個部分&#xff…

C# 窗體應用(.FET Framework ) 打開文件操作

一、 打開文件或文件夾加載數據 1. 定義一個列表用來接收路徑 public List<string> paths new List<string>();2. 打開文件選擇一個文件并將文件放入列表中 OpenFileDialog open new OpenFileDialog(); // 過濾 open.Filter "(*.jpg;*.jpge;*.bmp;*.png…

Scala 面向對象編程總結

???抽象屬性和抽象方法 基本語法 定義抽象類&#xff1a;abstract class Person{} //通過 abstract 關鍵字標記抽象類定義抽象屬性&#xff1a;val|var name:String //一個屬性沒有初始化&#xff0c;就是抽象屬性定義抽象方法&#xff1a;def hello():String //只聲明而沒…

人工智能賦能工業制造:智能制造的未來之路

一、引言 隨著人工智能技術的飛速發展&#xff0c;其應用場景不斷拓展&#xff0c;從消費電子到醫療健康&#xff0c;從金融科技到交通運輸&#xff0c;幾乎涵蓋了所有行業。而工業制造作為國民經濟的支柱產業&#xff0c;也在人工智能的浪潮中迎來了深刻的變革。智能制造&…

元宇宙概念下,UI 設計如何打造沉浸式體驗?

一、元宇宙時代UI設計的核心趨勢 在元宇宙概念下&#xff0c;UI設計的核心目標是打造沉浸式體驗&#xff0c;讓用戶在虛擬世界中感受到身臨其境的交互效果。以下是元宇宙時代UI設計的幾個核心趨勢&#xff1a; 沉浸式體驗設計 元宇宙的核心是提供沉浸式體驗&#xff0c;UI設計…

AI 如何幫助我們提升自己,不被替代

在當今快速發展的時代&#xff0c;人工智能&#xff08;AI&#xff09;正逐漸滲透到生活的方方面面。許多人擔心 AI 會取代人類的工作&#xff0c;然而&#xff0c;AI 更多的是作為一種強大的賦能工具&#xff0c;幫助我們提升自身能力&#xff0c;讓我們在工作中更具競爭力。以…

基于SpringBoot+Vue實現的二手交易市場平臺功能一

一、前言介紹&#xff1a; 1.1 項目摘要 隨著社會的發展和人們生活水平的提高&#xff0c;消費者購買能力的提升導致產生了大量的閑置物品&#xff0c;這些閑置物品具有一定的經濟價值。特別是在高校環境中&#xff0c;學生群體作為一個具有一定消費水平的群體&#xff0c;每…

k8s安裝cri驅動創建storageclass動態類

部署nfs服務器 #所有k8s節點安裝nfs客戶端 yum install -y nfs-utils mkdir -p /nfs/share echo "/nfs/share *(rw,sync,no_root_squash)" >> /etc/exports systemctl enable --now nfs-serverhelm部署nfs的provisioner&sc 所有k8s節點安裝客戶端 yu…

SpringBoot + Netty + Vue + WebSocket實現在線聊天

最近想學學WebSocket做一個實時通訊的練手項目 主要用到的技術棧是WebSocket Netty Vue Pinia MySQL SpringBoot&#xff0c;實現一個持久化數據&#xff0c;單一群聊&#xff0c;支持多用戶的聊天界面 下面是實現的過程 后端 SpringBoot啟動的時候會占用一個端口&#xff…

大數據Spark(五十七):Spark運行架構與MapReduce區別

文章目錄 Spark運行架構與MapReduce區別 一、Spark運行架構 二、Spark與MapReduce區別 Spark運行架構與MapReduce區別 一、Spark運行架構 Master:Spark集群中資源管理主節點&#xff0c;負責管理Worker節點。Worker:Spark集群中資源管理的從節點&#xff0c;負責任務的運行…

【爬蟲】網頁抓包工具--Fiddler

網頁抓包工具對比&#xff1a;Fiddler與Sniff Master Fiddler基礎知識 Fiddler是一款強大的抓包工具&#xff0c;它的工作原理是作為web代理服務器運行&#xff0c;默認代理地址是127.0.0.1&#xff0c;端口8888。代理服務器位于客戶端和服務器之間&#xff0c;攔截所有HTTP/…

Redis:集群

為什么要有集群&#xff1f; Redis 集群&#xff08;Redis Cluster&#xff09;是 Redis 官方提供的分布式解決方案&#xff0c;用于解決單機 Redis 在數據容量、并發處理能力和高可用性上的局限。通過 Redis 集群&#xff0c;可以實現數據分片、故障轉移和高可用性&#xff0…

【2012】【論文筆記】太赫茲波在非磁化等離子體——

前言 類型 太赫茲 + 等離子體 太赫茲 + 等離子體 太赫茲+等離子體 期刊 物理學報 物理學報 物理學報 作者

Linux字符驅動設備開發入門之框架搭建

聲明 本博客所記錄的關于正點原子i.MX6ULL開發板的學習筆記&#xff0c;&#xff08;內容參照正點原子I.MX6U嵌入式linux驅動開發指南&#xff0c;可在正點原子官方獲取正點原子Linux開發板 — 正點原子資料下載中心 1.0.0 文檔&#xff09;&#xff0c;旨在如實記錄我在學校學…

小剛說C語言刷題——第15講 多分支結構

1.多分支結構 所謂多分支結構是指在選擇的時候有多種選擇。根據條件滿足哪個分支&#xff0c;就走對應分支的語句。 2.語法格式 if(條件1) 語句1; else if(條件2) 語句2; else if(條件3) 語句3; ....... else 語句n; 3.示例代碼 從鍵盤輸入三條邊的長度&#xff0c;…

Apache httpclient okhttp(1)

學習鏈接 Apache httpclient & okhttp&#xff08;1&#xff09; Apache httpclient & okhttp&#xff08;2&#xff09; httpcomponents-client github apache httpclient文檔 apache httpclient文檔詳細使用 log4j日志官方文檔 【Java基礎】- HttpURLConnection…

洛谷題單3-P1420 最長連號-python-流程圖重構

題目描述 輸入長度為 n n n 的一個正整數序列&#xff0c;要求輸出序列中最長連號的長度。 連號指在序列中&#xff0c;從小到大的連續自然數。 輸入格式 第一行&#xff0c;一個整數 n n n。 第二行&#xff0c; n n n 個整數 a i a_i ai?&#xff0c;之間用空格隔開…

使用binance-connector庫獲取Binance全市場的幣種價格,然后選擇一個幣種進行下單

一個完整的示例,展示如何使用 api 獲取Binance全市場的幣種價格,然后選擇一個最便宜的幣種進行下單操作 代碼經過修改,親測可用,目前只可用于現貨,合約的待開發 獲取市場價格:使用client.ticker_price()獲取所有交易對的當前價格 賬戶檢查:獲取賬戶余額,確保有足夠的資…