吳恩達MCP課程(4):connect_server_mcp_chatbot

目錄

    • 完整代碼
    • 代碼解釋
      • 1. 導入和初始化
      • 2. 類型定義
      • 3. MCP_ChatBot 類初始化
      • 4. 查詢處理 (process_query)
      • 5. 服務器連接管理
      • 6. 核心特性總結
    • 示例

完整代碼

原課程代碼是用Anthropic寫的,下面代碼是用OpenAI改寫的,模型則用阿里巴巴的模型做測試
.env 文件為:

OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
OPENAI_API_BASE=https://dashscope.aliyuncs.com/compatible-mode/v1

另外,課程代碼只是單輪對話,下面代碼修改為多輪對話,更適合千問模型的調用方式

from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from typing import List,TypedDict
from contextlib import AsyncExitStack
from typing import Dict
import asyncio
import json
import osload_dotenv()class ToolDefinition(TypedDict):name: strdescription: strinput_schema: dictclass MCP_ChatBot:def __init__(self):# Initialize session and client objectsself.sessions: List[ClientSession] = []  # newself.exit_stack = AsyncExitStack()self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))self.available_tools: List[ToolDefinition] = []  # newself.tool_to_session: Dict[str, ClientSession] = {}self.messages = []async def process_query(self, query):self.messages.append({'role':'user', 'content':query})response = self.client.chat.completions.create(model='qwen-turbo',# max_tokens=2024,tools=self.available_tools,messages=self.messages )process_query = Truewhile process_query:# 獲取助手的回復message = response.choices[0].message# 檢查是否有普通文本內容if message.content:print(message.content)process_query = False# 檢查是否有工具調用elif message.tool_calls:# 添加助手消息到歷史self.messages.append({"role": "assistant", "content": None,"tool_calls": message.tool_calls})# 處理每個工具調用for tool_call in message.tool_calls:tool_id = tool_call.idtool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)print(f"Calling tool {tool_name} with args {tool_args}")# 執行工具調用session = self.tool_to_session[tool_name]result = await session.call_tool(tool_name, arguments=tool_args)# 添加工具結果到消息歷史self.messages.append({"role": "tool","tool_call_id": tool_id,"content": result.content})# 獲取下一個回復response = self.client.chat.completions.create(model='qwen-turbo',# max_tokens=2024,tools=self.available_tools,messages=self.messages )self.messages.append({"role": "assistant", "content": response.choices[0].message.content})# 如果只有文本回復,則結束處理if response.choices[0].message.content and not response.choices[0].message.tool_calls:print(response.choices[0].message.content)process_query = Falseasync def chat_loop(self):"""Run an interactive chat loop"""print("\nMCP Chatbot Started!")print("Type your queries or 'quit' to exit.")while True:try:query = input("\nQuery: ").strip()if query.lower() == 'quit':breakawait self.process_query(query)print("\n")except Exception as e:print(f"\nError: {str(e)}")async def connect_to_server(self, server_name: str, server_config: dict) -> None:"""Connect to a single MCP server."""try:server_params = StdioServerParameters(**server_config)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))read, write = stdio_transportsession = await self.exit_stack.enter_async_context(ClientSession(read, write))await session.initialize()self.sessions.append(session)# List available tools for this sessionresponse = await session.list_tools()tools = response.toolsprint(f"\nConnected to {server_name} with tools:", [t.name for t in tools])for tool in tools:  # newself.tool_to_session[tool.name] = sessionself.available_tools.append({"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema}})except Exception as e:print(f"Failed to connect to {server_name}: {e}")async def connect_to_servers(self):  # new"""Connect to all configured MCP servers."""try:with open("server_config.json", "r") as file:data = json.load(file)servers = data.get("mcpServers", {})for server_name, server_config in servers.items():await self.connect_to_server(server_name, server_config)except Exception as e:print(f"Error loading server configuration: {e}")raise           async def clenup(self):await self.exit_stack.aclose()async def main():chatbot = MCP_ChatBot()try:await chatbot.connect_to_servers()await chatbot.chat_loop()finally:await chatbot.clenup()if __name__ == "__main__":asyncio.run(main())"""
1、Fetch the content of this website: https://modelcontextprotocol.io/docs/concepts/architecture. 
2、save the content in the file "mcp_summary.md"
"""

代碼解釋

1. 導入和初始化

from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from typing import List,TypedDict
from contextlib import AsyncExitStack
from typing import Dict
import asyncio
import json
import osload_dotenv()
  • 導入必要的庫,包括OpenAI客戶端、MCP協議相關模塊、異步處理模塊等
  • load_dotenv() 加載環境變量配置

2. 類型定義

class ToolDefinition(TypedDict):name: strdescription: strinput_schema: dict

定義工具的類型結構,用于類型提示。

3. MCP_ChatBot 類初始化

class MCP_ChatBot:def __init__(self):self.sessions: List[ClientSession] = []  # 存儲多個MCP會話self.exit_stack = AsyncExitStack()  # 管理異步資源self.client = openai.OpenAI(  # OpenAI客戶端api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))self.available_tools: List[ToolDefinition] = []  # 可用工具列表self.tool_to_session: Dict[str, ClientSession] = {}  # 工具名到會話的映射self.messages = []  # 對話歷史

關鍵特性:

  • 多會話支持sessions 列表存儲多個MCP服務器會話
  • 工具映射tool_to_session 將工具名映射到對應的會話,實現工具路由
  • 資源管理:使用 AsyncExitStack 管理異步資源的生命周期

4. 查詢處理 (process_query)

async def process_query(self, query):self.messages.append({'role':'user', 'content':query})response = self.client.chat.completions.create(model='qwen-turbo',tools=self.available_tools,messages=self.messages )

核心處理邏輯:

  1. 消息循環處理:使用 while process_query 循環處理多輪對話
  2. 工具調用處理:檢測并執行工具調用,通過 tool_to_session 路由到正確的MCP服務器
  3. 結果整合:將工具執行結果添加到對話歷史中

5. 服務器連接管理

async def connect_to_server(self, server_name: str, server_config: dict) -> None:# 建立單個服務器連接server_params = StdioServerParameters(**server_config)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))# ... 獲取工具并建立映射for tool in tools:self.tool_to_session[tool.name] = sessionself.available_tools.append({"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema}})
async def connect_to_servers(self):# 連接所有配置的服務器with open("server_config.json", "r") as file:data = json.load(file)servers = data.get("mcpServers", {})for server_name, server_config in servers.items():await self.connect_to_server(server_name, server_config)

6. 核心特性總結

多服務器支持

  • 可以同時連接多個MCP服務器
  • 每個服務器的工具都被統一管理
  • 通過工具名自動路由到正確的服務器

OpenAI格式兼容

  • 工具定義使用OpenAI的函數調用格式
  • 支持完整的工具調用流程

異步處理

  • 全異步設計,支持并發處理
  • 使用 AsyncExitStack 管理資源生命周期

配置化管理

  • 通過 server_config.json 配置多個服務器
  • 支持動態加載服務器配置

這個實現相比單服務器版本的主要優勢是可以整合多個不同功能的MCP服務器,為用戶提供更豐富的工具集合。

示例

uv run connect_server_map_chatbot.py

請添加圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/907751.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/907751.shtml
英文地址,請注明出處:http://en.pswp.cn/news/907751.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C++內存學習

引入 在實例化對象時,不管是編譯器還是我們自己,會使用構造函數給成員變量一個合適的初始值。 但是經過構造函數之后,我們還不能將其稱為成員變量的初始化: 構造函數中的語句只能稱為賦初值,而不能稱作初始化 因為初…

MySQL 大戰 PostgreSQL

一、底層架構對比 ??維度????MySQL????PostgreSQL????存儲引擎??多引擎支持(InnoDB、MyISAM等)單一存儲引擎(支持擴展如Zheap、Zedstore)??事務實現??基于UNDO日志的MVCC基于堆表(Heap)的MVCC??鎖機制??…

基于FPGA的二叉決策樹cart算法verilog實現,訓練環節采用MATLAB仿真

目錄 1.算法運行效果圖預覽 2.算法運行軟件版本 3.部分核心程序 4.算法理論概述 5.算法完整程序工程 1.算法運行效果圖預覽 (完整程序運行后無水印) MATLAB訓練結果 上述決策樹判決條件&#xff1a; 分類的決策樹1 if x21<17191.5 then node 2 elseif x21>17191…

【RAG】RAG綜述|一文了解RAG|從零開始(下)

文章目錄 5. RAG的架構5.1 Naive RAG5.2 Advanced RAG5.2.1 檢索前處理和數據索引技術5.2.2 知識分片技術5.2.3 分層索引5.2.4 檢索技術5.2.4.1 優化用戶查詢5.2.4.2 通過假想文檔嵌入修復查詢和文檔不對稱5.2.4.3 Routing5.2.4.5 自查詢檢索5.2.4.6 混合搜索5.2.4.7 圖檢索5.2…

山東大學軟件學院項目實訓-基于大模型的模擬面試系統-面試官和面試記錄的分享功能(2)

本文記錄在發布文章時&#xff0c;可以添加自己創建的面試官和面試記錄到文章中這一功能的實現。 前端 首先是在原本的界面的底部添加了兩個多選框&#xff08;后期需要美化調整&#xff09; 實現的代碼&#xff1a; <el-col style"margin-top: 1rem;"><e…

FPGA純verilog實現MIPI-DSI視頻編碼輸出,提供工程源碼和技術支持

目錄 1、前言工程概述免責聲明 2、相關方案推薦我已有的所有工程源碼總目錄----方便你快速找到自己喜歡的項目我這里已有的 MIPI 編解碼方案 3、設計思路框架工程設計原理框圖FPGA內部彩條RGB數據位寬轉換RGB數據緩存MIPI-DSI協議層編碼MIPI-DPHY物理層串化MIPI-LVDS顯示屏工程…

LXQt修改開始菜單高亮

開始菜單紅色高亮很難看 mkdir -p ~/.local/share/lxqt/palettes/ mkdir -p ~/.local/share/lxqt/themes/ cp /usr/share/lxqt/palettes/Dark ~/.local/share/lxqt/palettes/Darker cp -p /usr/share/lxqt/themes/dark ~/.local/share/lxqt/themes/darker lxqt-panel.qss L…

DeepSeek-R1-0528-Qwen3-8B 本地ollama離線運行使用和llamafactory lora微調

參考: https://huggingface.co/deepseek-ai/DeepSeek-R1-0528-Qwen3-8B 量化版本: https://huggingface.co/unsloth/DeepSeek-R1-0528-Qwen3-8B-GGUF https://docs.unsloth.ai/basics/deepseek-r1-0528-how-to-run-locally 1、ollama運行 升級ollama版本到0.9.0 支持直接…

vue3 + WebSocket + Node 搭建前后端分離項目 開箱即用

[TOC](vue3 WebSocket Node 搭建前后端分離項目) 開箱即用 前言 top1&#xff1a;vue3.5搭建前端H5 top2&#xff1a;Node.js koa搭建后端服務接口 top3&#xff1a;WebSocket 長連接實現用戶在線聊天 top4&#xff1a;接口實現模塊化 Mysql 自定義 top5&#xff1a;文件上…

Vue 前端代碼規范實戰:ESLint v9、Prettier 與 Stylelint 集成指南與最佳實踐

&#x1f680; 作者主頁&#xff1a; 有來技術 &#x1f525; 開源項目&#xff1a; youlai-mall ︱vue3-element-admin︱youlai-boot︱vue-uniapp-template &#x1f33a; 倉庫主頁&#xff1a; GitCode︱ Gitee ︱ Github &#x1f496; 歡迎點贊 &#x1f44d; 收藏 ?評論 …

docker docker-ce docker.io

Ubuntu安裝 ??更新軟件包列表?? 首先確保軟件包列表是最新的&#xff1a; sudo apt-get update 使用正確的卸載命令?? 替換 docker-engine 為 docker-ce 或 docker.io&#xff1a; sudo apt-get remove docker docker-ce docker.io containerd runc ??檢查已安裝的 Do…

C++ 初階 | 類和對象易錯知識點(下)

目錄 0.引言 1.初始化列表 2.static 靜態成員變量&#xff1a; 靜態成員函數&#xff1a; 3.友元函數 4.內部類 定義&#xff1a; 特點&#xff1a; 應用&#xff1a; 5.優化寫法 6.例題 求和12...n (不能用for/while/if/else等關鍵字) 7.總結 0.引言 今天&…

使用yocto搭建qemuarm64環境

環境 yocto下載 # 源碼下載 git clone git://git.yoctoproject.org/poky git reset --hard b223b6d533a6d617134c1c5bec8ed31657dd1268 構建 # 編譯鏡像 export MACHINE"qemuarm64" . oe-init-build-env bitbake core-image-full-cmdline 運行 # 跑虛擬機 export …

AWS WebRTC:獲取ICE服務地址(part 3):STUN服務和TURN服務的作用

STUN服務和TURN服務的作用&#xff1a; 服務全稱作用是否中繼流量適用場景STUNSession Traversal Utilities for NAT 協助設備發現自己的公網地址&#xff08;srflx candidate&#xff09; ? 不中繼&#xff0c;僅輔助NAT 穿透成功時使用TURNTraversal Using Relays around N…

分析XSSstrike源碼

#用于學習web安全自動化工具# 我能收獲什么&#xff1f; 1.XSS漏洞檢測機制 學習如何構造和發送XSS payload如何識別響應中的回顯&#xff0c;WAF&#xff0c;過濾規則等如何使用詞典&#xff0c;編碼策略&#xff0c;上下文探測等繞過過濾器 2.Python安全工具開發技巧 使…

npm run build 報錯:Some chunks are larger than 500 KB after minification

當我們的 Vue 項目太大&#xff0c;使用 npm run build 打包項目的時候&#xff0c;就有可能會遇到以下報錯&#xff1a; (!) Some chunks are larger than 500 kB after minification. Consider: - Using dynamic import() to code-split the application - Use build.rollup…

【LLM相關知識點】關于LLM項目實施流程的簡單整理(一)

【LLM相關知識點】關于LLM項目實施流程的簡單整理&#xff08;一&#xff09; 文章目錄 【LLM相關知識點】關于LLM項目實施流程的簡單整理&#xff08;一&#xff09;零、學習計劃梳理&#xff1a;結合ChatGPT從零開始學習LLM & 多模態大模型一、大模型相關應用場景和頭部企…

海上石油鉆井平臺人員安全管控解決方案

一、行業挑戰與需求分析 海上鉆井平臺面臨復雜環境風險&#xff08;如易燃易爆、金屬干擾、極端氣象&#xff09;和人員管理難題&#xff08;如定位模糊、應急響應延遲&#xff09;。傳統RFID或藍牙定位技術存在精度不足&#xff08;1-5米&#xff09;、抗干擾能力差等問題&am…

@Docker Compose 部署 Pushgateway

文章目錄 Docker Compose 部署 Pushgateway1. 目的2. 適用范圍3. 先決條件4. 部署步驟4.1 創建項目目錄4.2 創建 docker-compose.yml 文件4.3 啟動 Pushgateway 服務4.4 驗證服務運行狀態4.5 測試 Pushgateway 訪問 5. 配置 Prometheus 采集 Pushgateway 數據6. 日常維護6.1 查…

項目 react+taro 編寫的微信 小程序,什么命令,可以減少console的顯示

在 Taro 項目中&#xff0c;為了減少 console 的顯示&#xff08;例如 console.log、console.info 等&#xff09;&#xff0c;可以通過配置 terser-webpack-plugin 來移除生產環境中的 console 調用。 配置步驟&#xff1a; 修改 index.js 文件 在 mini.webpackChain 中添加 …