達夢數據庫手寫MCP服務
文件名稱
server.py
源代碼
參考 mysql-mcp-server 編寫的達夢(Dameng)數據庫版本。
點擊訪問mysql-mcp-server的github倉庫
mcp服務端
import asyncio
import logging
import os
import sys
from dmPython import connect
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from pydantic import AnyUrl# Configure logging
# Root logging setup: INFO and above, each record prefixed with its
# timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("dameng_mcp_server")


def get_db_config():
    """Build the Dameng connection settings from environment variables.

    Variables read (with their defaults):
        DAMENG_USER      -> "user"      (default "SYSDBA")
        DAMENG_PASSWORD  -> "password"  (default "SYSDBA")
        DAMENG_HOST      -> "server"    (default "localhost")
        DAMENG_PORT      -> "port"      (default 5236)
        DAMENG_DATABASE  -> "schema"    (default "")

    Returns:
        dict: the settings keyed as listed above. The request handlers in
        this file unpack it directly into ``connect(**config)``.

    Raises:
        ValueError: if the user or password resolves to an empty string, or
            if DAMENG_PORT is set to something that is not an integer.
    """
    # An unset or empty DAMENG_PORT falls back to the previously hard-coded
    # port, so existing deployments keep working unchanged.
    raw_port = os.getenv("DAMENG_PORT") or "5236"
    try:
        port = int(raw_port)
    except ValueError:
        logger.error("DAMENG_PORT must be an integer, got %r", raw_port)
        raise ValueError(f"Invalid DAMENG_PORT: {raw_port!r}") from None

    config = {
        "user": os.getenv("DAMENG_USER", "SYSDBA"),
        "password": os.getenv("DAMENG_PASSWORD", "SYSDBA"),
        "server": os.getenv("DAMENG_HOST", "localhost"),
        "port": port,
        "schema": os.getenv("DAMENG_DATABASE", ""),
    }

    # The defaults only apply when a variable is unset; a variable explicitly
    # set to "" still ends up empty here and is rejected.
    if not (config["user"] and config["password"]):
        logger.error("Missing required database configuration. Please check environment variables:")
        logger.error("DAMENG_USER and DAMENG_PASSWORD are required")
        raise ValueError("Missing required database configuration")

    return config


# Initialize server
app = Server("dameng_mcp_server")

# URI prefix of table resources. list_resources() emits it and read_resource()
# has to accept exactly the same prefix, so both use this one constant.
_RESOURCE_URI_PREFIX = "dameng://"


def _is_plain_table_name(name: str) -> bool:
    """Return True if *name* is non-empty and only letters, digits, '_', '$', '#' or '.'."""
    return bool(name) and all(ch.isalnum() or ch in "_$#." for ch in name)


@app.list_resources()
async def list_resources() -> list[Resource]:
    """List Dameng tables as resources.

    Runs ``SELECT TABLE_NAME FROM ALL_TABLES`` and wraps every returned name
    in a ``dameng://<table>/data`` resource.

    Returns:
        One Resource per table, or an empty list if the query fails (the
        failure is logged, not raised).

    Raises:
        ValueError: propagated from get_db_config() on invalid configuration.
    """
    config = get_db_config()
    try:
        with connect(**config) as conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT TABLE_NAME FROM ALL_TABLES")
                tables = cursor.fetchall()
                logger.info(f"Found tables: {tables}")
                return [
                    Resource(
                        uri=f"{_RESOURCE_URI_PREFIX}{row[0]}/data",
                        name=f"Table: {row[0]}",
                        mimeType="text/plain",
                        description=f"Data in table: {row[0]}",
                    )
                    for row in tables
                ]
    except Exception as e:
        logger.error(f"Failed to list resources: {str(e)}")
        return []


@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:
    """Read table contents.

    Args:
        uri: a resource URI of the form ``dameng://<table>/data``, as produced
            by list_resources().

    Returns:
        Up to 100 rows as comma-joined text: a header line with the column
        names followed by one line per row.

    Raises:
        ValueError: if the URI does not start with ``dameng://`` or the table
            segment contains characters outside letters, digits, ``_ $ # .``.
        RuntimeError: if connecting or querying fails.
    """
    config = get_db_config()
    uri_str = str(uri)
    logger.info(f"Reading resource: {uri_str}")

    # The previous check looked for "dm://", which never matches the
    # "dameng://" URIs handed out by list_resources().
    if not uri_str.startswith(_RESOURCE_URI_PREFIX):
        raise ValueError(f"Invalid URI scheme: {uri_str}")

    table = uri_str[len(_RESOURCE_URI_PREFIX):].split('/')[0]
    # The table name is interpolated into the SQL text below, so refuse
    # anything that is not a plain (optionally schema-qualified) identifier.
    if not _is_plain_table_name(table):
        raise ValueError(f"Invalid table name in URI: {uri_str}")

    try:
        with connect(**config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(f"SELECT * FROM {table} LIMIT 100")
                columns = [desc[0] for desc in cursor.description]
                rows = cursor.fetchall()
                lines = [",".join(columns)]
                lines.extend(",".join(map(str, row)) for row in rows)
                return "\n".join(lines)
    except Exception as e:
        logger.error(f"Schema error reading resource {uri}: {str(e)}")
        raise RuntimeError(f"Schema error: {str(e)}")


@app.list_tools()
async def list_tools() -> list[Tool]:
    """List available Dameng tools.

    Returns:
        A single-element list describing ``execute_sql``, which takes one
        required string argument, ``query``.
    """
    logger.info("Listing tools...")
    return [
        Tool(
            name="execute_sql",
            description="Execute an SQL query on the Dameng server",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The SQL query to execute",
                    }
                },
                "required": ["query"],
            },
        )
    ]


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute SQL commands.

    Args:
        name: tool name; only ``execute_sql`` is supported.
        arguments: must contain a non-empty ``query`` string.

    Returns:
        One TextContent holding either the result set as comma-joined text
        (header line plus one line per row), the affected-row count for
        statements without a result set, or an error message if execution
        failed.

    Raises:
        ValueError: for an unknown tool name or a missing/empty query.
    """
    config = get_db_config()
    logger.info(f"Calling tool: {name} with arguments: {arguments}")

    if name != "execute_sql":
        raise ValueError(f"Unknown tool: {name}")

    query = arguments.get("query")
    if not query:
        raise ValueError("Query is required")

    try:
        with connect(**config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(query)

                # No description means the statement produced no result set:
                # commit it and report the row count.
                if cursor.description is None:
                    conn.commit()
                    return [TextContent(type="text", text=f"Query executed successfully. Rows affected: {cursor.rowcount}")]

                # Statements that return a result set.
                columns = [desc[0] for desc in cursor.description]
                try:
                    rows = cursor.fetchall()
                    result = [",".join(map(str, row)) for row in rows]
                    return [TextContent(type="text", text="\n".join([",".join(columns)] + result))]
                except Exception as e:
                    logger.warning(f"Error fetching results: {str(e)}")
                    return [TextContent(type="text", text=f"Query executed but error fetching results: {str(e)}")]
    except Exception as e:
        # Execution errors are returned as text to the caller, not raised.
        logger.error(f"Error executing SQL '{query}': {e}")
        return [TextContent(type="text", text=f"Error executing query: {str(e)}")]


async def main():
    """Main entry point to run the MCP server.

    Prints the non-secret connection settings to stderr, then serves the MCP
    protocol over stdio until the run call returns or raises. Server errors
    are logged with a traceback and re-raised.
    """
    from mcp.server.stdio import stdio_server

    # Debug output goes to stderr so it stays out of the stdio transport.
    print("Starting Dameng MCP server with config:", file=sys.stderr)
    config = get_db_config()
    print(f"Server: {config['server']}", file=sys.stderr)
    print(f"Port: {config['port']}", file=sys.stderr)
    print(f"User: {config['user']}", file=sys.stderr)
    print(f"Schema: {config['schema']}", file=sys.stderr)

    logger.info("Starting Dameng MCP server...")
    logger.info(f"Schema config: {config['server']}/{config['schema']} as {config['user']}")

    async with stdio_server() as (read_stream, write_stream):
        try:
            await app.run(
                read_stream,
                write_stream,
                app.create_initialization_options(),
            )
        except Exception as e:
            logger.error(f"Server error: {str(e)}", exc_info=True)
            raise


if __name__ == "__main__":
    asyncio.run(main())
客戶端
這段引用尚硅谷宋紅康老師手寫的客戶端
import asyncio
import os
import json
from typing import Optional, List
from contextlib import AsyncExitStack
from datetime import datetime
import re
from openai import OpenAI
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

load_dotenv()


class MCPClient:
    """Interactive MCP client driven by an OpenAI-compatible chat model.

    The model first produces a JSON plan of tool calls, the client executes
    that plan against the connected MCP server, and the model then writes the
    final answer from the collected tool outputs.
    """

    def __init__(self):
        """Read API settings from the environment and create the chat client.

        Reads DASHSCOPE_API_KEY, BASE_URL and MODEL.

        Raises:
            ValueError: if DASHSCOPE_API_KEY is unset or empty.
        """
        self.exit_stack = AsyncExitStack()
        self.openai_api_key = os.getenv("DASHSCOPE_API_KEY")
        self.base_url = os.getenv("BASE_URL")
        self.model = os.getenv("MODEL")
        if not self.openai_api_key:
            raise ValueError("? 未找到 OpenAI API Key,請在 .env 文件中設置 DASHSCOPE_API_KEY")
        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)
        self.session: Optional[ClientSession] = None

    async def connect_to_server(self, server_script_path: str):
        """Start the MCP server script as a subprocess and open a session.

        Args:
            server_script_path: path to a ``.py`` script (run with ``python``)
                or a ``.js`` script (run with ``node``).

        Raises:
            ValueError: if the path ends with neither ``.py`` nor ``.js``.
        """
        is_python = server_script_path.endswith('.py')
        is_js = server_script_path.endswith('.js')
        if not (is_python or is_js):
            raise ValueError("服務器腳本必須是 .py 或 .js 文件")

        command = "python" if is_python else "node"
        # env=None leaves the subprocess environment at the library default.
        server_params = StdioServerParameters(command=command, args=[server_script_path], env=None)

        # Both the transport and the session are registered on the exit stack
        # so cleanup() closes them together.
        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
        await self.session.initialize()

        response = await self.session.list_tools()
        tools = response.tools
        print("\n已連接到服務器,支持以下工具:", [tool.name for tool in tools])

    @staticmethod
    def _first_text(result) -> str:
        """Return the text of the first content part of a tool result, or "" if there is none."""
        parts = getattr(result, "content", None) or []
        if not parts:
            return ""
        text = getattr(parts[0], "text", None)
        return text if text is not None else ""

    @staticmethod
    def _resolve_placeholder(value, tool_outputs: dict):
        """Replace a ``{{tool_name}}`` placeholder with that tool's earlier output.

        Values that are not placeholders, or that name a tool with no recorded
        output, are returned unchanged.
        """
        if isinstance(value, str) and value.startswith("{{") and value.endswith("}}"):
            return tool_outputs.get(value.strip("{} "), value)
        return value

    async def process_query(self, query: str) -> str:
        """Plan and run tool calls for *query*, then return the model's answer.

        Side effects: writes the (augmented) question and the answer to a
        timestamped ``.txt`` file under ``./llm_outputs``.

        Args:
            query: the user's natural-language request.

        Returns:
            The content of the model's final reply.
        """
        response = await self.session.list_tools()
        # OpenAI-style function definitions carry the JSON Schema under
        # "parameters"; the previous "input_schema" key is not part of that
        # format, so the argument schema never reached the model.
        available_tools = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema,
                },
            }
            for tool in response.tools
        ]

        # Derive the report file name up front, from a keyword in the
        # question, so every tool in the chain sees the same name.
        keyword_match = re.search(r'(關于|分析|查詢|搜索|查看)([^的\s,。、?\n]+)', query)
        keyword = keyword_match.group(2) if keyword_match else "分析對象"
        safe_keyword = re.sub(r'[\\/:*?"<>|]', '', keyword)[:20]
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        md_filename = f"sentiment_{safe_keyword}_{timestamp}.md"
        md_path = os.path.join("./sentiment_reports", md_filename)

        # Append the file name and path to the query so the planning model
        # can reference them.
        query = query.strip() + f" [md_filename={md_filename}] [md_path={md_path}]"
        tool_plan = await self.plan_tool_usage(query, available_tools)

        tool_outputs = {}
        messages = [{"role": "user", "content": query}]

        # Execute the planned steps in order, collecting each tool's output.
        for step in tool_plan:
            tool_name = step["name"]
            tool_args = {
                key: self._resolve_placeholder(val, tool_outputs)
                for key, val in (step.get("arguments") or {}).items()
            }

            # Inject the shared file name/path for the tools that use them.
            if tool_name == "analyze_sentiment" and "filename" not in tool_args:
                tool_args["filename"] = md_filename
            if tool_name == "send_email_with_attachment" and "attachment_path" not in tool_args:
                tool_args["attachment_path"] = md_path

            result = await self.session.call_tool(tool_name, tool_args)
            output_text = self._first_text(result)
            tool_outputs[tool_name] = output_text
            # NOTE(review): a "tool" message is sent here without a preceding
            # assistant message carrying matching tool_calls; confirm the
            # configured endpoint accepts that.
            messages.append({
                "role": "tool",
                "tool_call_id": tool_name,
                "content": output_text,
            })

        # Ask the model for the final answer based on the tool outputs.
        final_response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
        )
        final_output = final_response.choices[0].message.content

        def clean_filename(text: str) -> str:
            """Strip characters that are invalid in file names and cap the length at 50."""
            text = text.strip()
            text = re.sub(r'[\\/:*?\"<>|]', '', text)
            return text[:50]

        # Save the exchange to a timestamped text file named after the query.
        safe_filename = clean_filename(query)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"{safe_filename}_{timestamp}.txt"
        output_dir = "./llm_outputs"
        os.makedirs(output_dir, exist_ok=True)
        file_path = os.path.join(output_dir, filename)

        with open(file_path, "w", encoding="utf-8") as f:
            f.write(f"🗣 用戶提問:{query}\n\n")
            f.write(f"🤖 模型回復:\n{final_output}\n")

        print(f"📄 對話記錄已保存為:{file_path}")
        return final_output

    async def chat_loop(self):
        """Read questions from stdin until the user types 'quit' or stdin closes.

        Errors raised while processing a question are printed and the loop
        continues with the next prompt.
        """
        print("\n🤖 MCP 客戶端已啟動!輸入 'quit' 退出")

        while True:
            try:
                query = input("\n你: ").strip()
            except EOFError:
                # stdin is closed: input() would raise again on every
                # iteration, so leave the loop instead of spinning.
                break

            if query.lower() == 'quit':
                break

            try:
                response = await self.process_query(query)
                print(f"\n🤖 AI: {response}")
            except Exception as e:
                print(f"\n?? 發生錯誤: {str(e)}")

    async def plan_tool_usage(self, query: str, tools: List[dict]) -> List[dict]:
        """Ask the model for a JSON plan of tool calls for *query*.

        Args:
            query: the user request.
            tools: OpenAI-style function definitions of the available tools.

        Returns:
            The plan as a list of dicts that each have a ``name`` key (and
            normally ``arguments``). Returns an empty list when the reply is
            not valid JSON or is not a JSON array.
        """
        print("\n📤 提交給大模型的工具定義:")
        print(json.dumps(tools, ensure_ascii=False, indent=2))

        # The system prompt lists the tools by name and pins the reply to a
        # JSON array so the result can be parsed mechanically.
        tool_list_text = "\n".join([
            f"- {tool['function']['name']}: {tool['function']['description']}"
            for tool in tools
        ])
        system_prompt = {
            "role": "system",
            "content": (
                "你是一個智能任務規劃助手,用戶會給出一句自然語言請求。\n"
                "你只能從以下工具中選擇(嚴格使用工具名稱):\n"
                f"{tool_list_text}\n"
                "如果多個工具需要串聯,后續步驟中可以使用 {{上一步工具名}} 占位。\n"
                "返回格式:JSON 數組,每個對象包含 name 和 arguments 字段。\n"
                "不要返回自然語言,不要使用未列出的工具名。"
            ),
        }

        planning_messages = [
            system_prompt,
            {"role": "user", "content": query},
        ]
        # tool_choice="none": the model only describes the plan as text and
        # does not issue tool calls itself.
        response = self.client.chat.completions.create(
            model=self.model,
            messages=planning_messages,
            tools=tools,
            tool_choice="none",
        )

        # The reply content can be None; treat that as an empty reply.
        content = (response.choices[0].message.content or "").strip()

        # Unwrap an optional Markdown code fence around the JSON. The old
        # pattern used "\\s" inside a raw string, which requires a literal
        # backslash and therefore never matched a fenced reply.
        match = re.search(r"```(?:json)?\s*([\s\S]+?)\s*```", content)
        json_text = match.group(1) if match else content

        try:
            plan = json.loads(json_text)
        except json.JSONDecodeError as e:
            print(f"? 工具調用鏈規劃失敗: {e}\n原始返回: {content}")
            return []

        if not isinstance(plan, list):
            return []
        # Drop entries the executor could not run (non-objects or no name).
        return [step for step in plan if isinstance(step, dict) and "name" in step]

    async def cleanup(self):
        """Close the session and the server subprocess transport."""
        await self.exit_stack.aclose()


async def main():
    """Connect to the MCP server script and run the interactive chat loop.

    The script path is taken from the first command-line argument when one is
    given; otherwise the built-in default path is used.
    """
    import sys  # local import: only needed for the optional CLI override

    default_script_path = "D:\\soft\\PythonProject\\mcp-project\\dmserver.py"
    server_script_path = sys.argv[1] if len(sys.argv) > 1 else default_script_path

    client = MCPClient()
    try:
        await client.connect_to_server(server_script_path)
        await client.chat_loop()
    finally:
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())