dameng-mcp-server達夢MCP服務

達夢數據庫手寫MCP服務

文件名稱

server.py

源代碼

參考mysql-mcp-server寫的dameng數據庫版本的
點擊訪問mysql-mcp-server的github倉庫
在這里插入圖片描述

mcp服務端

import asyncio
import logging
import os
import sys
from dmPython import connect
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from pydantic import AnyUrl# Configure logging
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("dameng_mcp_server")def get_db_config():"""Get database configuration from environment variables."""config = {"user": os.getenv("DAMENG_USER","SYSDBA"),"password": os.getenv("DAMENG_PASSWORD","SYSDBA"),"server": os.getenv("DAMENG_HOST", "localhost"),"port": 5236,"schema": os.getenv("DAMENG_DATABASE","")}if not all([config["user"], config["password"]]):logger.error("Missing required database configuration. Please check environment variables:")logger.error("DAMENG_USER and DAMENG_PASSWORD are required")raise ValueError("Missing required database configuration")return config# Initialize server
app = Server("dameng_mcp_server")@app.list_resources()
async def list_resources() -> list[Resource]:"""List Dameng tables as resources."""config = get_db_config()try:with connect(**config) as conn:with conn.cursor() as cursor:cursor.execute("SELECT TABLE_NAME FROM ALL_TABLES")tables = cursor.fetchall()logger.info(f"Found tables: {tables}")resources = []for table in tables:resources.append(Resource(uri=f"dameng://{table[0]}/data",name=f"Table: {table[0]}",mimeType="text/plain",description=f"Data in table: {table[0]}"))return resourcesexcept Exception as e:logger.error(f"Failed to list resources: {str(e)}")return []@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:"""Read table contents."""config = get_db_config()uri_str = str(uri)logger.info(f"Reading resource: {uri_str}")if not uri_str.startswith("dm://"):raise ValueError(f"Invalid URI scheme: {uri_str}")parts = uri_str[9:].split('/')table = parts[0]try:with connect(**config) as conn:with conn.cursor() as cursor:cursor.execute(f"SELECT * FROM {table} LIMIT 100")columns = [desc[0] for desc in cursor.description]rows = cursor.fetchall()result = [",".join(map(str, row)) for row in rows]return "\n".join([",".join(columns)] + result)except Exception as e:logger.error(f"Schema error reading resource {uri}: {str(e)}")raise RuntimeError(f"Schema error: {str(e)}")@app.list_tools()
async def list_tools() -> list[Tool]:"""List available Dameng tools."""logger.info("Listing tools...")return [Tool(name="execute_sql",description="Execute an SQL query on the Dameng server",inputSchema={"type": "object","properties": {"query": {"type": "string","description": "The SQL query to execute"}},"required": ["query"]})]@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:"""Execute SQL commands."""config = get_db_config()logger.info(f"Calling tool: {name} with arguments: {arguments}")if name != "execute_sql":raise ValueError(f"Unknown tool: {name}")query = arguments.get("query")if not query:raise ValueError("Query is required")try:with connect(**config) as conn:with conn.cursor() as cursor:cursor.execute(query)# Handle all other queries that return result sets (SELECT, SHOW, DESCRIBE etc.)if cursor.description is not None:columns = [desc[0] for desc in cursor.description]try:rows = cursor.fetchall()result = [",".join(map(str, row)) for row in rows]return [TextContent(type="text", text="\n".join([",".join(columns)] + result))]except Exception as e:logger.warning(f"Error fetching results: {str(e)}")return [TextContent(type="text", text=f"Query executed but error fetching results: {str(e)}")]# Non-SELECT querieselse:conn.commit()return [TextContent(type="text", text=f"Query executed successfully. Rows affected: {cursor.rowcount}")]except Exception as e:logger.error(f"Error executing SQL '{query}': {e}")return [TextContent(type="text", text=f"Error executing query: {str(e)}")]async def main():"""Main entry point to run the MCP server."""from mcp.server.stdio import stdio_server# Add additional debug outputprint("Starting Dameng MCP server with config:", file=sys.stderr)config = get_db_config()print(f"Server: {config['server']}", file=sys.stderr)print(f"Port: {config['port']}", file=sys.stderr)print(f"User: {config['user']}", file=sys.stderr)print(f"Schema: {config['schema']}", file=sys.stderr)logger.info("Starting Dameng MCP server...")logger.info(f"Schema config: {config['server']}/{config['schema']} as {config['user']}")async with stdio_server() as (read_stream, write_stream):try:await app.run(read_stream,write_stream,app.create_initialization_options())except Exception as e:logger.error(f"Server error: {str(e)}", exc_info=True)raiseif __name__ == "__main__":asyncio.run(main())

客戶端

這段引用尚硅谷宋紅康老師手寫的客戶端

import asyncio
import os
import json
from typing import Optional, List
from contextlib import AsyncExitStack
from datetime import datetime
import re
from openai import OpenAI
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_clientload_dotenv()class MCPClient:def __init__(self):self.exit_stack = AsyncExitStack()self.openai_api_key = os.getenv("DASHSCOPE_API_KEY")self.base_url = os.getenv("BASE_URL")self.model = os.getenv("MODEL")if not self.openai_api_key:raise ValueError("? 未找到 OpenAI API Key,請在 .env 文件中設置 DASHSCOPE_API_KEY")self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)self.session: Optional[ClientSession] = Noneasync def connect_to_server(self, server_script_path: str):# 對服務器腳本進行判斷,只允許是 .py 或 .jsis_python = server_script_path.endswith('.py')is_js = server_script_path.endswith('.js')if not (is_python or is_js):raise ValueError("服務器腳本必須是 .py 或 .js 文件")# 確定啟動命令,.py 用 python,.js 用 nodecommand = "python" if is_python else "node"# 構造 MCP 所需的服務器參數,包含啟動命令、腳本路徑參數、環境變量(為 None 表示默認)server_params = StdioServerParameters(command=command, args=[server_script_path], env=None)# 啟動 MCP 工具服務進程(并建立 stdio 通信)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))# 拆包通信通道,讀取服務端返回的數據,并向服務端發送請求self.stdio, self.write = stdio_transport# 創建 MCP 客戶端會話對象self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))# 初始化會話await self.session.initialize()# 獲取工具列表并打印response = await self.session.list_tools()tools = response.toolsprint("\n已連接到服務器,支持以下工具:", [tool.name for tool in tools])async def process_query(self, query: str) -> str:# 準備初始消息和獲取工具列表messages = [{"role": "user", "content": query}]response = await self.session.list_tools()available_tools = [{"type": "function","function": {"name": tool.name,"description": tool.description,"input_schema": tool.inputSchema}} for tool in response.tools]# 提取問題的關鍵詞,對文件名進行生成。# 在接收到用戶提問后就應該生成出最后輸出的 md 文檔的文件名,# 因為導出時若再生成文件名會導致部分組件無法識別該名稱。keyword_match = re.search(r'(關于|分析|查詢|搜索|查看)([^的\s,。、?\n]+)', query)keyword = keyword_match.group(2) if keyword_match else "分析對象"safe_keyword = re.sub(r'[\\/:*?"<>|]', '', keyword)[:20]timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')md_filename = f"sentiment_{safe_keyword}_{timestamp}.md"md_path = os.path.join("./sentiment_reports", md_filename)# 更新查詢,將文件名添加到原始查詢中,使大模型在調用工具鏈時可以識別到該信息# 然后調用 plan_tool_usage 獲取工具調用計劃query = query.strip() + f" [md_filename={md_filename}] [md_path={md_path}]"messages = [{"role": "user", "content": query}]tool_plan = await self.plan_tool_usage(query, available_tools)tool_outputs = {}messages = [{"role": "user", "content": query}]# 依次執行工具調用,并收集結果for step in tool_plan:tool_name = step["name"]tool_args = step["arguments"]for key, val in tool_args.items():if isinstance(val, str) and val.startswith("{{") and val.endswith("}}"):ref_key = val.strip("{} ")resolved_val = tool_outputs.get(ref_key, val)tool_args[key] = resolved_val# 注入統一的文件名或路徑(用于分析和郵件)if tool_name == "analyze_sentiment" and "filename" not in tool_args:tool_args["filename"] = md_filenameif tool_name == "send_email_with_attachment" and "attachment_path" not in tool_args:tool_args["attachment_path"] = md_pathresult = await self.session.call_tool(tool_name, tool_args)tool_outputs[tool_name] = result.content[0].textmessages.append({"role": "tool","tool_call_id": tool_name,"content": result.content[0].text})# 調用大模型生成回復信息,并輸出保存結果final_response = self.client.chat.completions.create(model=self.model,messages=messages)final_output = final_response.choices[0].message.content# 對輔助函數進行定義,目的是把文本清理成合法的文件名def clean_filename(text: str) -> str:text = text.strip()text = re.sub(r'[\\/:*?\"<>|]', '', text)return text[:50]# 使用清理函數處理用戶查詢,生成用于文件命名的前綴,并添加時間戳、設置輸出目錄# 最后構建出完整的文件路徑用于保存記錄safe_filename = clean_filename(query)timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')filename = f"{safe_filename}_{timestamp}.txt"output_dir = "./llm_outputs"os.makedirs(output_dir, exist_ok=True)file_path = os.path.join(output_dir, filename)# 將對話內容寫入 md 文檔,其中包含用戶的原始提問以及模型的最終回復結果with open(file_path, "w", encoding="utf-8") as f:f.write(f"🗣 用戶提問:{query}\n\n")f.write(f"🤖 模型回復:\n{final_output}\n")print(f"📄 對話記錄已保存為:{file_path}")return final_outputasync def chat_loop(self):# 初始化提示信息print("\n🤖 MCP 客戶端已啟動!輸入 'quit' 退出")# 進入主循環中等待用戶輸入while True:try:query = input("\n你: ").strip()if query.lower() == 'quit':break# 處理用戶的提問,并返回結果response = await self.process_query(query)print(f"\n🤖 AI: {response}")except Exception as e:print(f"\n?? 發生錯誤: {str(e)}")async def plan_tool_usage(self, query: str, tools: List[dict]) -> List[dict]:# 構造系統提示詞 system_prompt。# 將所有可用工具組織為文本列表插入提示中,并明確指出工具名,# 限定返回格式是 JSON,防止其輸出錯誤格式的數據。print("\n📤 提交給大模型的工具定義:")print(json.dumps(tools, ensure_ascii=False, indent=2))tool_list_text = "\n".join([f"- {tool['function']['name']}: {tool['function']['description']}"for tool in tools])system_prompt = {"role": "system","content": ("你是一個智能任務規劃助手,用戶會給出一句自然語言請求。\n""你只能從以下工具中選擇(嚴格使用工具名稱):\n"f"{tool_list_text}\n""如果多個工具需要串聯,后續步驟中可以使用 {{上一步工具名}} 占位。\n""返回格式:JSON 數組,每個對象包含 name 和 arguments 字段。\n""不要返回自然語言,不要使用未列出的工具名。")}# 構造對話上下文并調用模型。# 將系統提示和用戶的自然語言一起作為消息輸入,并選用當前的模型。planning_messages = [system_prompt,{"role": "user", "content": query}]response = self.client.chat.completions.create(model=self.model,messages=planning_messages,tools=tools,tool_choice="none")# 提取出模型返回的 JSON 內容content = response.choices[0].message.content.strip()match = re.search(r"```(?:json)?\\s*([\s\S]+?)\\s*```", content)if match:json_text = match.group(1)else:json_text = content# 在解析 JSON 之后返回調用計劃try:plan = json.loads(json_text)return plan if isinstance(plan, list) else []except Exception as e:print(f"? 工具調用鏈規劃失敗: {e}\n原始返回: {content}")return []async def cleanup(self):await self.exit_stack.aclose()async def main():server_script_path = "D:\\soft\\PythonProject\\mcp-project\\dmserver.py"# server_script_path = "D:\\soft\\SoftProject\\mysql_mcp_server\\src\mysql_mcp_server\\server.py"# server_script_path = "D:\\soft\\SoftProject\\mysql_mcp_server\\src\\dameng\\server.py"client = MCPClient()try:await client.connect_to_server(server_script_path)await client.chat_loop()finally:await client.cleanup()if __name__ == "__main__":asyncio.run(main())

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/77964.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/77964.shtml
英文地址,請注明出處:http://en.pswp.cn/web/77964.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

IntelliJ IDEA 內存優化

優化插件使用 1&#xff09;卸載不必要插件&#xff1a;進入 “設置”→“插件”→“已安裝”&#xff0c;查看并卸載不常用的插件&#xff0c;如代碼分析、代碼覆蓋率等不常用的插件&#xff0c;只保留必要的插件。2&#xff09;定期清理與更新插件&#xff1a;定期檢查插件更…

TCL中環深化全球布局,技術迭代應對行業調整

在全球能源轉型加速與光伏行業深度調整的雙重背景下,TCL中環憑借前瞻性的全球化布局與核心技術突破,持續鞏固行業領先地位。2024年年報顯示,報告期內實現營業收入284.19億元,凈利潤為-108.06億元。面對行業周期性虧損與產業鏈價格非理性競爭帶來的業績壓力,公司自2024年下半年起…

dubbo 異步化實踐

DubboService public class AsyncOrderFacadeImpl implements AsyncOrderFacade {private Logger logger LoggerFactory.getLogger(AsyncOrderFacadeImpl.class);// 構建線程池ThreadPoolExecutor threadPoolExecutor new ThreadPoolExecutor(1000, 1000, 10, TimeUnit.SECOND…

CSS3布局方式介紹

CSS3布局方式介紹 CSS3布局(Layout)系統是現代網頁設計中用于構建頁面結構和控制元素排列的一組強大工具。CSS3提供了多種布局方式,每種方式都有其適用場景,其中最常用的是Flexbox和CSS Grid。 先看傳統上幾種布局方式,再較詳細的介紹現代布局方式Flexbox和CSS Grid。 傳…

MoonBit支持國產芯片開發--性能媲美C

MoonBit支持國產芯片開發–性能媲美C 在 ESP32-C3 上實現生命游戲 過去&#xff0c;我們曾在文章《硬件實現&#xff1a;在ESP32-C6單片機上運行MoonBit WASM-4小游戲》中&#xff0c;展示了如何通過 WebAssembly (WASM) 將 MoonBit 程序移植到物理硬件&#xff0c;初步探索其…

【RAG 框架部署】LangChain-Chatchat (原 Langchain-ChatGLM) + Ollama

目錄 前言 一、什么是RAG&#xff1f; 二、環境準備和Ollama搭建 1、conda虛擬環境配置 2、Ollama搭建 三、LangChain-Chatchat搭建 1、框架安裝 2、文件配置 3、初始化知識庫 4、啟動Langchan-Chatchat 前言 由于LangChain-Chatchat的 0.3.0 版本已修改為支持不同模…

python對接馬來西亞股票完整代碼

StockTV全球股票數據API對接實戰&#xff1a;構建智能金融分析系統 一、StockTV API核心功能解析 StockTV作為覆蓋200國家證券市場的數據平臺&#xff0c;其API提供三大核心模塊的對接能力&#xff1a; 市場列表查詢 - 獲取指定國家的股票基礎數據個股詳情檢索 - 查詢實時行情…

普通IT的股票交易成長史--20250430晚

聲明&#xff1a;本文章的內容只是自己學習的總結&#xff0c;不構成投資建議。文中觀點基本來自yt站Andylee&#xff0c;美股Alpha姐&#xff0c;綜合自己的觀點得出。感謝他們的無私分享。 送給自己的話&#xff1a; 倉位就是生命&#xff0c;絕對不能滿倉&#xff01;&…

windows 下 oracle 數據庫的備份與還原

1、備份 創建備份出來的文件存放的位置。 創建目錄對象&#xff0c;在數據庫中創建一個目錄對象&#xff0c;該對象指向文件系統中用于存儲導出文件的實際目錄&#xff08; sql 命令&#xff0c;可以在 plsql 中執行&#xff09;。 -- 創建目錄對象&#xff0c;\D:\Oracle19c\…

基于單片機的智能藥盒系統

標題:基于單片機的智能藥盒系統 內容:1.摘要 本文聚焦于基于單片機的智能藥盒系統。背景方面&#xff0c;隨著人口老齡化加劇&#xff0c;老年人按時準確服藥問題愈發凸顯&#xff0c;同時現代快節奏生活也使人們容易遺忘服藥時間。目的是設計并實現一個能幫助人們按時、按量服…

“100% 成功的 PyTorch CUDA GPU 支持” 安裝攻略

#工作記錄 一、總述 在深度學習領域&#xff0c;PyTorch 憑借其靈活性和強大的功能&#xff0c;成為了眾多開發者和研究者的首選框架。而 CUDA GPU 支持能夠顯著加速 PyTorch 的計算過程&#xff0c;大幅提升訓練和推理效率。然而&#xff0c;安裝帶有 CUDA GPU 支持的 PyTor…

圖數據庫榜單網站

圖數據庫榜單 https://db-engines.com/en/ranking/graphdbms點擊跳轉

Android Jetpack Compose 面試題大全(2025最新整理)

基礎概念 什么是 Jetpack Compose&#xff1f;它與傳統 Android UI 開發有何不同&#xff1f; Compose 是 Android 的現代聲明式 UI 工具包&#xff0c;使用 Kotlin 編寫不同于傳統的基于 View 和 XML 的 imperative 方式&#xff0c;Compose 使用聲明式范式主要區別&#xff1…

添加了addResourceHandlers 但沒用

B站黑馬的視頻 public class WebMvcConfig extends WebMvcConfigurationSupport { /** * 設置靜態資源映射 * param registry */ Override protected void addResourceHandlers(ResourceHandlerRegistry registry) { log.info("開始進…

STM32實現simpleFOC控制無刷電機

一、FOC基礎知識學習 使用simpleFOC控制無刷電機前&#xff0c;需要大概了解一下相關知識&#xff0c;包括力矩控制、速度控制、位置控制的原理和它們之間的聯系。 推薦學習資料&#xff1a; 教你寫一個比SimpleFOC更好的電機庫_嗶哩嗶哩_bilibili 《燈哥手把手教你寫FOC算…

【數據結構】快慢指針

一、快慢指針的原理 定義&#xff1a; 快指針&#xff1a;每次移動兩步 慢指針&#xff1a;每次移動一步 終止條件&#xff1a; 當快指針到達鏈表末尾時停止 事件復雜度&#xff1a; 始終為O(n),僅需依次遍歷 空間復雜度&#xff1a; …

畢業論文 | 基于STM32的自動煙霧報警系統設計

基于STM32的煙霧報警系統 一、系統設計原理1. **系統架構**2. **工作原理**二、核心公式與算法1. **MQ-2傳感器濃度計算**2. **溫度傳感器數據處理**3. **校準與濾波**三、關鍵代碼實現1. **ADC初始化與數據讀取(以MQ-2為例)**2. **報警邏輯與閾值設置**3. **EEPROM存儲閾值*…

Android Gradle插件開發

文章目錄 1. Gradle插件是什么2. 為什么需要插件3. 編寫插件位置4. 編寫插件5. 自定義插件擴展5.1 訂閱擴展對象5.2 把擴展添加給Plugin并使用5.3 配置參數5.4 嵌套擴展5.4.1 定義擴展5.4.2 獲取擴展屬性5.4.3 使用5.4.4 執行5.4.5 輸出 6. 編寫在單獨項目里6.1 新建Module6.2 …

PPIO X OWL:一鍵開啟任務自動化的高效革命

2024年&#xff0c;僅憑一PPIO X OWL&#xff1a;一鍵開啟任務自動化的高效革命篇技術論文&#xff0c;OWL的Github倉庫便在24小時斬獲了15k Star&#xff0c;成為2024年增速最快的多智能體協作框架&#xff0c;重新定義了任務自動化的效率邊界。Camel AI團隊開源全棧方案&…

分布式事務,事務失效,TC事務協調者

1. 概述 本方案書旨在解決分布式系統中事務一致性問題&#xff0c;重點闡述全局事務標識&#xff08;XID&#xff09;的傳遞與存儲機制、事務協調者&#xff08;TC&#xff09;的設計與部署&#xff0c;以及分布式事務失效場景的應對策略。基于業界成熟框架&#xff08;如Seat…