吳恩達MCP課程(5):mcp_chatbot_prompt_resource.py

前提條件:
1、吳恩達MCP課程(5):research_server_prompt_resource.py
2、server_config_prompt_resource.json文件

{"mcpServers": {"filesystem": {"command": "npx","args": ["--y","@modelcontextprotocol/server-filesystem","."]},"research": {"command": "uv","args": ["run", "research_server_prompt_resource.py"]},"fetch": {"command": "uvx","args": ["mcp-server-fetch"]}}
}

代碼

原課程用的anthropic的,下面改成openai,并用千問模型做測試

from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from contextlib import AsyncExitStack
import json
import asyncio
import osload_dotenv()class MCP_ChatBot:def __init__(self):self.exit_stack = AsyncExitStack()self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))# Tools list required for OpenAI APIself.available_tools = []# Prompts list for quick display self.available_prompts = []# Sessions dict maps tool/prompt names or resource URIs to MCP client sessionsself.sessions = {}async def connect_to_server(self, server_name, server_config):try:server_params = StdioServerParameters(**server_config)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))read, write = stdio_transportsession = await self.exit_stack.enter_async_context(ClientSession(read, write))await session.initialize()try:# List available toolsresponse = await session.list_tools()for tool in response.tools:self.sessions[tool.name] = sessionself.available_tools.append({"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema}})# List available promptsprompts_response = await session.list_prompts()if prompts_response and prompts_response.prompts:for prompt in prompts_response.prompts:self.sessions[prompt.name] = sessionself.available_prompts.append({"name": prompt.name,"description": prompt.description,"arguments": prompt.arguments})# List available resourcesresources_response = await session.list_resources()if resources_response and resources_response.resources:for resource in resources_response.resources:resource_uri = str(resource.uri)self.sessions[resource_uri] = sessionexcept Exception as e:print(f"Error {e}")except Exception as e:print(f"Error connecting to {server_name}: {e}")async def connect_to_servers(self):try:with open("server_config_prompt_resource.json", "r") as file:data = json.load(file)servers = data.get("mcpServers", {})for server_name, server_config in servers.items():await self.connect_to_server(server_name, server_config)except Exception as e:print(f"Error loading server config: {e}")raiseasync def process_query(self, query):messages = [{"role":"user", "content":query}]while True:response = self.client.chat.completions.create(model="qwen-turbo",tools=self.available_tools,messages=messages)message = response.choices[0].message# 檢查是否有普通文本內容if message.content:print(message.content)messages.append({"role": "assistant", "content": message.content})break# 檢查是否有工具調用elif message.tool_calls:# 添加助手消息到歷史messages.append({"role": "assistant", "content": None,"tool_calls": message.tool_calls})# 處理每個工具調用for tool_call in message.tool_calls:tool_id = tool_call.idtool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)print(f"Calling tool {tool_name} with args {tool_args}")# 獲取session并調用工具session = self.sessions.get(tool_name)if not session:print(f"Tool '{tool_name}' not found.")breakresult = await session.call_tool(tool_name, arguments=tool_args)# 添加工具結果到消息歷史messages.append({"role": "tool","tool_call_id": tool_id,"content": result.content})else:breakasync def get_resource(self, resource_uri):session = self.sessions.get(resource_uri)# Fallback for papers URIs - try any papers resource sessionif not session and resource_uri.startswith("papers://"):for uri, sess in self.sessions.items():if uri.startswith("papers://"):session = sessbreakif not session:print(f"Resource '{resource_uri}' not found.")returntry:result = await session.read_resource(uri=resource_uri)if result and result.contents:print(f"\nResource: {resource_uri}")print("Content:")print(result.contents[0].text)else:print("No content available.")except Exception as e:print(f"Error: {e}")async def list_prompts(self):"""List all available prompts."""if not self.available_prompts:print("No prompts available.")returnprint("\nAvailable prompts:")for prompt in self.available_prompts:print(f"- {prompt['name']}: {prompt['description']}")if prompt['arguments']:print(f"  Arguments:")for arg in prompt['arguments']:arg_name = arg.name if hasattr(arg, 'name') else arg.get('name', '')print(f"    - {arg_name}")async def execute_prompt(self, prompt_name, args):"""Execute a prompt with the given arguments."""session = self.sessions.get(prompt_name)if not session:print(f"Prompt '{prompt_name}' not found.")returntry:result = await session.get_prompt(prompt_name, arguments=args)if result and result.messages:prompt_content = result.messages[0].content# Extract text from content (handles different formats)if isinstance(prompt_content, str):text = prompt_contentelif hasattr(prompt_content, 'text'):text = prompt_content.textelse:# Handle list of content itemstext = " ".join(item.text if hasattr(item, 'text') else str(item) for item in prompt_content)print(f"\nExecuting prompt '{prompt_name}'...")await self.process_query(text)except Exception as e:print(f"Error: {e}")async def chat_loop(self):print("\nMCP Chatbot Started!")print("Type your queries or 'quit' to exit.")print("Use @folders to see available topics")print("Use @<topic> to search papers in that topic")print("Use /prompts to list available prompts")print("Use /prompt <name> <arg1=value1> to execute a prompt")while True:try:query = input("\nQuery: ").strip()if not query:continueif query.lower() == 'quit':break# Check for @resource syntax firstif query.startswith('@'):# Remove @ sign  topic = query[1:]if topic == "folders":resource_uri = "papers://folders"else:resource_uri = f"papers://{topic}"await self.get_resource(resource_uri)continue# Check for /command syntaxif query.startswith('/'):parts = query.split()command = parts[0].lower()if command == '/prompts':await self.list_prompts()elif command == '/prompt':if len(parts) < 2:print("Usage: /prompt <name> <arg1=value1> <arg2=value2>")continueprompt_name = parts[1]args = {}# Parse argumentsfor arg in parts[2:]:if '=' in arg:key, value = arg.split('=', 1)args[key] = valueawait self.execute_prompt(prompt_name, args)else:print(f"Unknown command: {command}")continueawait self.process_query(query)except Exception as e:print(f"\nError: {str(e)}")async def cleanup(self):await self.exit_stack.aclose()async def main():chatbot = MCP_ChatBot()try:await chatbot.connect_to_servers()await chatbot.chat_loop()finally:await chatbot.cleanup()if __name__ == "__main__":asyncio.run(main())

代碼解釋

這段代碼實現了一個基于MCP(Model Context Protocol)的聊天機器人,能夠連接到多個MCP服務器,使用它們提供的工具、提示和資源。下面是對代碼的詳細解釋:

類結構與初始化

class MCP_ChatBot:def __init__(self):self.exit_stack = AsyncExitStack()self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))# Tools list required for OpenAI APIself.available_tools = []# Prompts list for quick display self.available_prompts = []# Sessions dict maps tool/prompt names or resource URIs to MCP client sessionsself.sessions = {}
  • AsyncExitStack:用于管理多個異步上下文管理器,確保資源正確釋放
  • openai.OpenAI:創建OpenAI客戶端,使用環境變量中的API密鑰和基礎URL
  • available_tools:存儲可用工具列表,用于OpenAI API的工具調用功能
  • available_prompts:存儲可用提示列表,用于快速顯示
  • sessions:字典,將工具名稱、提示名稱或資源URI映射到對應的MCP客戶端會話

服務器連接

async def connect_to_server(self, server_name, server_config):# 創建服務器參數、建立連接并初始化會話# 獲取可用工具、提示和資源

這個方法負責:

  1. 使用提供的配置創建StdioServerParameters
  2. 通過stdio_client建立與服務器的連接
  3. 創建并初始化ClientSession
  4. 獲取服務器提供的工具、提示和資源
  5. 將它們添加到相應的列表和字典中
async def connect_to_servers(self):# 從配置文件加載服務器信息并連接到每個服務器

這個方法從server_config_prompt_resource.json文件加載服務器配置,并為每個服務器調用connect_to_server方法。

查詢處理

async def process_query(self, query):# 處理用戶查詢,使用OpenAI API和可用工具

這個方法是聊天機器人的核心,它:

  1. 創建包含用戶查詢的消息列表
  2. 調用OpenAI API,傳遞消息和可用工具
  3. 處理API的響應:
    • 如果有普通文本內容,打印并添加到消息歷史
    • 如果有工具調用,執行每個工具調用并將結果添加到消息歷史

工具調用的處理流程:

  1. 從響應中提取工具ID、名稱和參數
  2. sessions字典中獲取對應的會話
  3. 調用工具并獲取結果
  4. 將結果添加到消息歷史中

資源和提示管理

async def get_resource(self, resource_uri):# 獲取并顯示指定URI的資源內容

這個方法用于獲取和顯示資源內容,特別是論文資源。它會:

  1. sessions字典中獲取對應的會話
  2. 對于以papers://開頭的URI,如果找不到對應會話,會嘗試使用任何處理papers的會話
  3. 調用read_resource方法獲取資源內容
  4. 打印資源內容
async def list_prompts(self):# 列出所有可用的提示

這個方法列出所有可用的提示及其描述和參數。

async def execute_prompt(self, prompt_name, args):# 執行指定的提示,并處理結果

這個方法執行指定的提示,并將結果傳遞給process_query方法進行處理。它處理不同格式的提示內容,確保能夠正確提取文本。

聊天循環

async def chat_loop(self):# 主聊天循環,處理用戶輸入

這個方法是聊天機器人的主循環,它:

  1. 打印歡迎信息和使用說明
  2. 循環接收用戶輸入
  3. 根據輸入的不同格式執行不同操作:
    • 如果輸入是quit,退出循環
    • 如果輸入以@開頭,調用get_resource方法獲取資源
    • 如果輸入以/開頭,執行命令(如列出提示或執行提示)
    • 否則,調用process_query方法處理普通查詢

資源清理

async def cleanup(self):# 清理資源

這個方法調用exit_stack.aclose()來清理所有資源。

主函數

async def main():chatbot = MCP_ChatBot()try:await chatbot.connect_to_servers()await chatbot.chat_loop()finally:await chatbot.cleanup()

主函數創建MCP_ChatBot實例,連接到服務器,運行聊天循環,并確保在結束時清理資源。

代碼運行結果

uv run mcp_chatbot_prompt_resource.py

在這里插入圖片描述

在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/82767.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/82767.shtml
英文地址,請注明出處:http://en.pswp.cn/web/82767.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Linux】Linux基礎指令3

1. which指令 功能&#xff1a;搜索系統指定的命令 2. whereis指令 功能&#xff1a;?于找到程序的源、?進制?件或?冊 3. grep指令 語法&#xff1a; grep [ 選項 ] 搜尋字符串 ?件 功能&#xff1a;在?件中搜索字符串&#xff0c;將找到的?打印出來 常?選項&…

李沐《動手學深度學習》d2l安裝教程

文章目錄 最新回答報錯提醒安裝對應版本安裝C工具和Windows SDK 最新回答 安裝舊版本即可 pip install d2l0.17.0 WARNING: Ignoring invalid distribution -pencv-python (e:\python3.10\lib\site-packages) Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple C…

CMake 為 Debug 版本的庫或可執行文件添加 d 后綴

在使用 CMake 構建項目時,我們經常需要區分 Debug 和 Release 構建版本。一個常見的做法是為 Debug 版本的庫或可執行文件添加后綴(如 d),例如 libmylibd.so 或 myappd.exe。 本文將介紹幾種在 CMake 中實現為 Debug 版本自動添加 d 后綴的方法。 方法一:使用 CMAKE_DEBU…

echarts樹狀圖與vue3

父組件 - 使用RadialTreeView <template><div class"page-container"><div class"header-title">美國產品圖譜 (ECharts Radial Tree)</div><RadialTreeView :chart-data"radialData" background-color"#E6E6F…

C# 日志管理功能代碼

一、功能概述 本應用通過 AsyncFileLogger 類提供了靈活的日志控制功能&#xff0c;可在運行時通過 UI 界面啟用或禁用日志記錄。日志系統具有以下特點&#xff1a; 可控制開關&#xff1a;通過按鈕隨時啟用或禁用日志&#xff0c;無需重啟應用異步寫入&#xff1a;日志記錄采…

CSS 性能優化

目錄 CSS 性能優化CSS 提高性能的方法1. 選擇器優化1.1 選擇器性能原則1.2 選擇器優化示例 2. 重排&#xff08;Reflow&#xff09;和重繪&#xff08;Repaint&#xff09;優化2.1 重排和重繪的概念2.2 觸發重排的操作2.3 觸發重繪的操作2.4 優化重排和重繪的方法 3. 資源優化3…

【JJ斗地主-注冊安全分析報告】

前言 由于網站注冊入口容易被黑客攻擊&#xff0c;存在如下安全問題&#xff1a; 暴力破解密碼&#xff0c;造成用戶信息泄露短信盜刷的安全問題&#xff0c;影響業務及導致用戶投訴帶來經濟損失&#xff0c;尤其是后付費客戶&#xff0c;風險巨大&#xff0c;造成虧損無底洞 …

SON.stringify()和JSON.parse()之間的轉換

1.JSON.stringify() 作用&#xff1a;將對象、數組轉換成字符串 const obj {code: "500",message: "出錯了", }; const jsonString JSON.stringify(obj); console.log(jsonString);//"{"code":"Mark Lee","message"…

MongoDB $type 操作符詳解

MongoDB $type 操作符詳解 引言 MongoDB 是一款流行的開源文檔型數據庫,它提供了豐富的查詢操作符來滿足不同的數據查詢需求。在 MongoDB 中,$type 操作符是一個非常有用的查詢操作符,它允許用戶根據文檔中字段的類型來查詢文檔。本文將詳細介紹 MongoDB 的 $type 操作符,…

RagFlow優化代碼解析(一)

引子 前文寫到RagFlow的環境搭建&推理測試&#xff0c;感興趣的童鞋可以移步&#xff08;RagFlow環境搭建&推理測試-CSDN博客&#xff09;。前文也寫過RagFLow參數配置&測試的文檔&#xff0c;詳見&#xff08;RagFlow環境搭建&推理測試-CSDN博客&#xff09;…

永磁同步電機控制算法--模糊PI轉速控制器

一、原理介紹 在常規的PID控制系統的基礎上提出了一種模糊PID以及矢量變換方法相結合的控制系統&#xff0c;經過仿真分析對比證明&#xff1a; 模糊PID控制系統能夠有效的提高永磁同步電機的轉速響應速度&#xff0c;降低轉矩脈動&#xff0c;增強了整體控制系統的抗干擾能力…

MySQL基本操作(續)

第3章&#xff1a;MySQL基本操作&#xff08;續&#xff09; 3.3 表操作 表是關系型數據庫中存儲數據的基本結構&#xff0c;由行和列組成。在MySQL中&#xff0c;表操作包括創建表、查看表結構、修改表和刪除表等。本節將詳細介紹這些操作。 3.3.1 創建表 在MySQL中&#…

探索未知驚喜,盲盒抽卡機小程序系統開發新啟航

在消費市場不斷追求新鮮感與驚喜體驗的當下&#xff0c;盲盒抽卡機以其獨特的魅力&#xff0c;迅速成為眾多消費者熱衷的娛樂與消費方式。我們緊跟這一潮流趨勢&#xff0c;專注于盲盒抽卡機小程序系統的開發&#xff0c;致力于為商家和用戶打造一個充滿趣味與驚喜的數字化平臺…

89.實現添加收藏的功能的后端實現

實現完查看收藏列表之后&#xff0c;實現的是添加收藏的功能 我的設想是&#xff1a;在對話界面中&#xff0c;如果用戶認為AI的回答非常好&#xff0c;可以通過點擊該回答對應的氣泡中的圖標&#xff0c;對該內容進行添加 所以后端實現為&#xff1a; service類中添加&…

OD 算法題 B卷【猴子吃桃】

文章目錄 猴子吃桃 猴子吃桃 猴子喜歡吃桃&#xff0c;桃園有N棵桃樹&#xff0c;第i棵桃樹上有Ni個桃&#xff0c;看守將在H(>N)小時后回來&#xff1b;猴子可以決定吃桃的速度K(個/小時)&#xff0c;每個小時他會選擇一棵桃樹&#xff0c;從中吃掉K個桃&#xff0c;如果這…

ubuntu 端口復用

需求描述&#xff1a;復用服務器的 80端口&#xff0c;同時處理 ssh 和 http 請求&#xff0c;也就是 ssh 連接和 http 訪問服務器的時候都可以指定 80 端口&#xff0c;然后服務器可以正確分發請求給 ssh 或者 http。 此時&#xff0c;ssh 監聽的端口為 22&#xff0c;而 htt…

Hive中ORC存儲格式的優化方法

優化Hive中的ORC(Optimized Row Columnar)存儲格式可顯著提升查詢性能、降低存儲成本。以下是詳細的優化方法,涵蓋參數配置、數據組織、寫入優化及監控調優等維度: 一、ORC核心參數優化 1. 存儲與壓縮參數 SET orc.block.size=268435456; -- 塊大小(默認256MB)…

Vim 設置搜索高亮底色

在 Vim 中&#xff0c;默認搜索命中會高亮顯示&#xff0c;方便用戶快速定位關鍵字。但有些用戶希望自定義搜索匹配的底色或前景色&#xff0c;以適應不同的配色方案或提高可讀性。本文將詳細介紹如何修改 Vim 的搜索高亮顏色。 一、Vim 搜索高亮機制 Vim 用內置的高亮組&…

【計算機網絡】非阻塞IO——poll實現多路轉接

&#x1f525;個人主頁&#x1f525;&#xff1a;孤寂大仙V &#x1f308;收錄專欄&#x1f308;&#xff1a;計算機網絡 &#x1f339;往期回顧&#x1f339;&#xff1a;【計算機網絡】非阻塞IO——select實現多路轉接 &#x1f516;流水不爭&#xff0c;爭的是滔滔不息 一、…

vscode使用系列之快速生成html模板

一.歡迎來到我的酒館 vscode&#xff0c;yyds! 目錄 一.歡迎來到我的酒館二.vscode下載安裝1.關于vscode你需要知道2.開始下載安裝 三.vscode快速創建html模板 二.vscode下載安裝 1.關于vscode你需要知道 Q&#xff1a;為什么使用vscode? A&#xff1a;使用vscode寫…