【SpringAI】8.通過json動態添加mcp服務

前言

官方示例的代碼中,mcp一般是配置到yml中或者json文件中,使用自動裝配的方式注入服務,這種方式不方便在程序啟動后添加新的服務,這里參考cherry studio的方式動態添加mcp服務

1.確定方案

  • mcp服務的維護放到mysql業務數據庫維護,前端通過json來添加mcp服務,為什么是json?因為開源的mcp服務都提供json示例,拿來即用

  • 后端輪詢mcp服務確保服務可用狀態

  • 前端動態切換模型時,根據模型是否支持工具調和是否啟用mcp來控制是否使用工具

  • 一個mcp服務下可能有一系列的工具,提供一個查看mcp服務工具的頁面

2. pom依賴

 <!-- Spring AI MCP 核心包 (包含ToolCallbackProvider) --><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-starter-mcp-client</artifactId></dependency><!-- Spring AI Model (ToolCallback接口等) --><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-model</artifactId><version>${spring-ai.version}</version></dependency>

其他的依賴見前面幾篇文章

3. 新增mcp表

CREATE TABLE `ai_mcp` (`id` bigint NOT NULL AUTO_INCREMENT,`content_type` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_german2_ci NOT NULL COMMENT '連接類型,sse,stdio',`name` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_german2_ci NOT NULL COMMENT 'mcp名稱',`description` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_german2_ci DEFAULT NULL COMMENT '功能描述',`config_json` json NOT NULL COMMENT '配置參數,json類型',`status` tinyint(1) DEFAULT NULL COMMENT '可用狀態,1可用,0不可用',`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_german2_ci;

4.補充增刪改查

使用的mybatis-plus,省略mapper層


import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.lanyu.springainovel.entity.AiMcp;
import org.lanyu.springainovel.entity.ServerConfig;
import org.lanyu.springainovel.mapper.AiMcpMapper;
import org.lanyu.springainovel.util.ConversionUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.sql.Timestamp;
import java.util.List;@Service
public class AiMcpService {public AiMcpService() {super();}@Autowiredprivate AiMcpMapper aiMcpMapper;public List<AiMcp> getAllEnabledMcp() {QueryWrapper<AiMcp> query = new QueryWrapper<>();query.eq("status", 1);return aiMcpMapper.selectList(query);}public void addMcp(AiMcp mcp, DynamicMcpClientManager dynamicMcpClientManager) {mcp.setUpdateTime(new Timestamp(System.currentTimeMillis()));// 從configJson中提取mcpServers作為mcp名稱和類型try {ObjectMapper objectMapper = new ObjectMapper();JsonNode root = objectMapper.readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isObject() && serversNode.size() > 0) {// 獲取第一個服務器名稱作為mcp名稱String serverName = serversNode.fieldNames().next();mcp.setName(serverName);// 獲取服務器類型JsonNode serverNode = serversNode.path(serverName);String type = serverNode.path("type").asText("");if ("sse".equalsIgnoreCase(type) || "stdio".equalsIgnoreCase(type)) {mcp.setContentType(type);}}} catch (Exception e) {// 日志輸出}aiMcpMapper.insert(mcp);if (mcp.getStatus() != null && mcp.getStatus() == 1) {// 解析 configJson,提取所有 servertry {ObjectMapper objectMapper = new ObjectMapper();JsonNode root = objectMapper.readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isObject()) {serversNode.fields().forEachRemaining(entry -> {String serverName = entry.getKey();JsonNode serverNode = entry.getValue();String type = serverNode.path("type").asText("");if ("sse".equalsIgnoreCase(type) || "stdio".equalsIgnoreCase(type)) {ServerConfig config = ConversionUtil.parseServerConfigFromJson(serverName, serverNode, type);if (config != null) {dynamicMcpClientManager.addOrUpdateServerConfig(serverName, config);}}});}} catch (Exception e) {// 日志輸出}}}public void updateMcp(AiMcp mcp) {mcp.setUpdateTime(new Timestamp(System.currentTimeMillis()));// 從configJson中提取mcpServers作為mcp名稱try {ObjectMapper objectMapper = new ObjectMapper();JsonNode root = objectMapper.readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isObject() && serversNode.size() > 0) {// 獲取第一個服務器名稱作為mcp名稱String serverName = serversNode.fieldNames().next();mcp.setName(serverName);// 獲取服務器類型JsonNode serverNode = serversNode.path(serverName);String type = serverNode.path("type").asText("");if ("sse".equalsIgnoreCase(type) || "stdio".equalsIgnoreCase(type)) {mcp.setContentType(type);}}} catch (Exception e) {// 日志輸出}aiMcpMapper.updateById(mcp);}public void deleteMcp(Long id, DynamicMcpClientManager dynamicMcpClientManager) {AiMcp mcp = aiMcpMapper.selectById(id);if (mcp != null) {// 解析 configJson,提取所有 servertry {ObjectMapper objectMapper = new ObjectMapper();JsonNode root = objectMapper.readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isObject()) {serversNode.fields().forEachRemaining(entry -> {String serverName = entry.getKey();dynamicMcpClientManager.removeServerConfig(serverName);});}} catch (Exception e) {// 日志輸出}aiMcpMapper.deleteById(id);}}public AiMcp getById(Long id) {return aiMcpMapper.selectById(id);}public AiMcp getByName(String name) {QueryWrapper<AiMcp> query = new QueryWrapper<>();query.eq("name", name);return aiMcpMapper.selectOne(query);}public Page<AiMcp> pageQuery(String name, int page, int size) {QueryWrapper<AiMcp> query = new QueryWrapper<>();if (name != null && !name.isEmpty()) {query.like("name", name);}query.orderByDesc("update_time");return aiMcpMapper.selectPage(new Page<>(page, size), query);}
}

5.創建mcp管理類(核心)

/*** 動態MCP客戶端管理器,支持熱加載和配置變更。** <p>此服務提供以下功能:* <ul>* <li>動態讀取MCP服務器配置</li>* <li>自動連接和重連MCP服務器</li>* <li>實時發現新工具</li>* <li>配置變更時自動更新連接</li>* <li>健康檢查和故障恢復</li>* </ul>** @author Spring AI Team* @since 1.1.0*/
@Service
public class DynamicMcpClientManager implements DisposableBean {private static final Logger logger = LoggerFactory.getLogger(DynamicMcpClientManager.class);/*** 活躍的MCP客戶端,key為服務器名,value為McpSyncClient實例*/private final Map<String, McpSyncClient> activeClients = new ConcurrentHashMap<>();/*** 當前連接狀態的配置,key為服務器名,value為ServerConfig*/private final Map<String, ServerConfig> currentConfigs = new ConcurrentHashMap<>();/*** 定時任務調度器*/private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);/*** AiMcpService用于數據庫操作*/private final AiMcpService aiMcpService;/*** 緩存的工具回調列表,定期刷新*/private volatile List<SyncMcpToolCallback> cachedToolCallbacks = new ArrayList<>();/*** 上次工具回調刷新的時間戳*/private volatile long lastToolRefresh = 0;/*** 工具回調緩存的有效期(毫秒)*/private static final long TOOL_CACHE_TTL = 30000; // 30秒緩存/*** 最大重連次數,所有重連邏輯統一引用該常量*/private static final int MAX_RECONNECT_ATTEMPTS = 3;public DynamicMcpClientManager(AiMcpService aiMcpService) {this.aiMcpService = aiMcpService;}@PostConstructpublic void init() {loadSpringAiMcpConfiguration();
//        scheduler.scheduleWithFixedDelay(this::loadSpringAiMcpConfiguration, 30, 30, TimeUnit.SECONDS);scheduler.scheduleWithFixedDelay(this::performHealthCheck, 30, 30, TimeUnit.SECONDS);logger.info("動態MCP客戶端管理器已啟動");}/*** 從數據庫加載MCP配置,只有配置實際變更時才調用updateServerConfigs*/public void loadSpringAiMcpConfiguration() {logger.info("從數據庫加載MCP配置");Map<String, ServerConfig> newConfigs = new ConcurrentHashMap<>();try {for (AiMcp mcp : aiMcpService.getAllEnabledMcp()) {if (mcp.getConfigJson() == null || mcp.getConfigJson().isEmpty()) {logger.warn("MCP配置[{}]缺少configJson,跳過", mcp.getName());continue;}try {JsonNode root = new ObjectMapper().readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isMissingNode() || !serversNode.isObject()) {logger.warn("MCP配置[{}]的mcpServers字段缺失或格式錯誤,跳過", mcp.getName());continue;}serversNode.fields().forEachRemaining(entry -> {String serverName = entry.getKey();JsonNode serverNode = entry.getValue();String type = serverNode.path("type").asText("");ServerConfig config = ConversionUtil.parseServerConfigFromJson(serverName, serverNode, type);if (config != null) {newConfigs.put(serverName, config);}});} catch (Exception e) {logger.warn("MCP配置[{}]解析configJson失敗: {},跳過", mcp.getName(), e.getMessage());}}} catch (Exception e) {logger.error("從數據庫加載MCP配置失敗", e);}// 只有配置實際變更時才更新if (!newConfigs.equals(currentConfigs)) {updateServerConfigs(newConfigs);} else {logger.debug("MCP配置無變更,無需更新");}}/*** 更新服務器配置并重新連接。重連次數由MAX_RECONNECT_ATTEMPTS控制。*/public synchronized void updateServerConfigs(Map<String, ServerConfig> newConfigs) {logger.info("更新MCP服務器配置,共 {} 個服務器", newConfigs.size());// 移除不再存在的服務器currentConfigs.keySet().removeIf(serverName -> {if (!newConfigs.containsKey(serverName)) {disconnectServer(serverName);return true;}return false;});// 添加或更新服務器配置for (Map.Entry<String, ServerConfig> entry : newConfigs.entrySet()) {String serverName = entry.getKey();ServerConfig newConfig = entry.getValue();ServerConfig oldConfig = currentConfigs.get(serverName);// 如果配置發生變化或新加,嘗試連接if (oldConfig == null || !configEquals(oldConfig, newConfig)) {currentConfigs.put(serverName, newConfig);if (newConfig.isEnabled()) {boolean connected = false;int attempts = 0;while (!connected && attempts < MAX_RECONNECT_ATTEMPTS) {attempts++;logger.info("[重連策略] 連接/重連MCP服務器[{}],第{}次嘗試...", serverName, attempts);connectServer(serverName, newConfig);if (activeClients.containsKey(serverName)) {logger.info("[重連策略] 連接/重連MCP服務器[{}]成功", serverName);connected = true;} else {logger.warn("[重連策略] 連接/重連MCP服務器[{}]失敗,等待2分鐘后重試...", serverName);try {Thread.sleep(120_000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}if (!connected) {logger.error("[重連策略] MCP服務器[{}]重連已達最大次數({}),將自動禁用該服務", serverName, MAX_RECONNECT_ATTEMPTS);disableMcpInDatabase(serverName);}} else {disconnectServer(serverName);}}}// 清除工具緩存,強制重新加載invalidateToolCache();}/*** 連接到MCP服務器(重連和首次連接參數完全一致,重連前徹底釋放資源)*/private void connectServer(String serverName, ServerConfig config) {logger.info("連接到MCP服務器: {} ({})", serverName, config.getUrl());// 1. 徹底釋放舊資源disconnectServer(serverName);try {// 2. 構建全新 McpSyncClient,參數與首次一致io.modelcontextprotocol.client.McpSyncClient client;if (config.getUrl().startsWith("stdio://")) {logger.info("使用STDIO傳輸連接服務器: {}", serverName);// TODO: STDIO傳輸實現logger.warn("STDIO傳輸暫未實現,服務器: {}", serverName);return;} else {logger.info("使用SSE傳輸連接服務器: {} -> {} (endpoint: {})", serverName, config.getUrl(), config.getSseEndpoint());HttpClientSseClientTransport transport = HttpClientSseClientTransport.builder(config.getUrl()).clientBuilder(HttpClient.newBuilder()).sseEndpoint(config.getSseEndpoint()).build();client = McpClient.sync(transport).requestTimeout(config.getTimeout()).build();}// 3. 初始化客戶端client.initialize();activeClients.put(serverName, client);logger.info("成功連接到MCP服務器: {}", serverName);} catch (Exception e) {logger.error("連接MCP服務器失敗: {} - {} (url: {}, sseEndpoint: {})", serverName, e.getMessage(), config.getUrl(), config.getSseEndpoint(), e);}}/*** 徹底斷開MCP服務器連接,釋放所有資源*/private void disconnectServer(String serverName) {io.modelcontextprotocol.client.McpSyncClient client = activeClients.remove(serverName);if (client != null) {try {client.close();logger.info("已斷開MCP服務器連接: {}", serverName);} catch (Exception e) {logger.warn("斷開MCP服務器連接時出錯: {} - {}", serverName, e.getMessage());}}// 清理相關緩存// cachedToolCallbacks = new ArrayList<>(); // 若有必要可清理}/*** 獲取所有活躍的客戶端*/public List<McpSyncClient> getActiveClients() {return new ArrayList<>(activeClients.values());}/*** 獲取所有可用的工具回調(帶緩存)*/public List<ToolCallback> getAvailableToolCallbacks() {long now = System.currentTimeMillis();if (now - lastToolRefresh > TOOL_CACHE_TTL || cachedToolCallbacks.isEmpty()) {refreshToolCallbacks();lastToolRefresh = now;}return new ArrayList<>(cachedToolCallbacks);}/*** 強制刷新工具回調,重新從所有活躍客戶端獲取工具*/public synchronized void refreshToolCallbacks() {logger.info("刷新MCP工具回調");List<McpSyncClient> clients = getActiveClients();if (clients.isEmpty()) {cachedToolCallbacks = new ArrayList<>();return;}try {logger.info("準備調用createToolCallbacks");List<SyncMcpToolCallback> newCallbacks = SimpleMcpToolCallbackProvider.createToolCallbacks(clients);logger.info("createToolCallbacks調用工具結果:{}", newCallbacks.size());cachedToolCallbacks = newCallbacks;logger.info("刷新{}個工具", newCallbacks.size());} catch (Exception e) {logger.error("刷新工具回調失敗", e);}}/*** 獲取指定Mcp服務的工具列表*/public List<SyncMcpToolCallback> getToolByMcpName(String mcpName) {McpSyncClient client = activeClients.get(mcpName);if (client == null) {return new ArrayList<>();}try {return SimpleMcpToolCallbackProvider.getToolCallbacksByClientName(client);} catch (Exception e) {return new ArrayList<>();}}/*** 清除工具緩存*/public void invalidateToolCache() {lastToolRefresh = 0;logger.debug("工具緩存已清除");}/*** 執行健康檢查*/private void performHealthCheck() {logger.info("執行MCP客戶端健康檢查");List<String> unhealthyServers = new ArrayList<>();for (Map.Entry<String, McpSyncClient> entry : activeClients.entrySet()) {String serverName = entry.getKey();McpSyncClient client = entry.getValue();try {List<McpSchema.Tool> tools = client.listTools().tools();//logger.info("檢查工具:{}", tools.toString());} catch (Exception e) {logger.warn("MCP服務器 {} 健康檢查失敗: {},將嘗試重連", serverName, e.getMessage());unhealthyServers.add(serverName);}}// 只負責發現斷開,重連交給 reconnectWithLimitfor (String serverName : unhealthyServers) {ServerConfig config = currentConfigs.get(serverName);if (config != null && config.isEnabled()) {logger.info("MCP服務器 {} 已斷開,將嘗試重連", serverName);reconnectWithLimit(serverName, config);}}}/*** 比較兩個配置是否相等*/private boolean configEquals(ServerConfig config1, ServerConfig config2) {if (config1 == config2) return true;if (config1 == null || config2 == null) return false;return java.util.Objects.equals(config1.getUrl(), config2.getUrl()) &&java.util.Objects.equals(config1.getSseEndpoint(), config2.getSseEndpoint()) &&java.util.Objects.equals(config1.getHeaders(), config2.getHeaders()) &&java.util.Objects.equals(config1.getTimeout(), config2.getTimeout()) &&config1.isEnabled() == config2.isEnabled();}private void reconnectWithLimit(String serverName, ServerConfig config) {int attempts = 0;boolean connected = false;while (attempts < MAX_RECONNECT_ATTEMPTS && !connected) {attempts++;logger.warn("重連MCP服務器[{}],第{}次嘗試...,最大{}次", serverName, attempts, MAX_RECONNECT_ATTEMPTS);try {connectServer(serverName, config);if (activeClients.containsKey(serverName)) {logger.info("重連MCP服務器[{}]成功", serverName);connected = true;}} catch (Exception e) {logger.error("重連MCP服務器[{}]失敗: {}", serverName, e.getMessage());}}if (!connected) {logger.error("MCP服務器[{}]重連已達最大次數({}),將自動禁用該服務", serverName, MAX_RECONNECT_ATTEMPTS);disableMcpInDatabase(serverName);}}private void disableMcpInDatabase(String serverName) {try {org.lanyu.springainovel.entity.AiMcp mcp = aiMcpService.getByName(serverName);if (mcp != null && mcp.getStatus() != null && mcp.getStatus() != 0) {mcp.setStatus(0);aiMcpService.updateMcp(mcp);logger.warn("已將MCP服務[{}]在數據庫中禁用(status=0)", serverName);}// 同步禁用緩存ServerConfigServerConfig config = currentConfigs.get(serverName);if (config != null && config.isEnabled()) {config.setEnabled(false);logger.warn("已將MCP服務[{}]在緩存中禁用(enable=false)", serverName);}// 同步禁用McpConfigurationService的lastKnownConfigstry {// 反射獲取AiMcpService中的McpConfigurationServicejava.lang.reflect.Field field = aiMcpService.getClass().getDeclaredField("mcpConfigurationService");field.setAccessible(true);Object mcpConfigServiceObj = field.get(aiMcpService);if (mcpConfigServiceObj != null) {java.lang.reflect.Field lastKnownConfigsField = mcpConfigServiceObj.getClass().getDeclaredField("lastKnownConfigs");lastKnownConfigsField.setAccessible(true);@SuppressWarnings("unchecked")Map<String, ServerConfig> lastKnownConfigs = (Map<String, ServerConfig>) lastKnownConfigsField.get(mcpConfigServiceObj);ServerConfig lastConfig = lastKnownConfigs.get(serverName);if (lastConfig != null && lastConfig.isEnabled()) {lastConfig.setEnabled(false);logger.warn("已將MCP服務[{}]在lastKnownConfigs中禁用(enable=false)", serverName);}}} catch (Exception e) {logger.error("同步禁用lastKnownConfigs緩存失敗: {}", e.getMessage());}} catch (Exception e) {logger.error("禁用MCP服務[{}]數據庫操作失敗: {}", serverName, e.getMessage());}}/*** 新增或更新單個MCP服務器配置*/public synchronized void addOrUpdateServerConfig(String serverName, ServerConfig config) {Map<String, ServerConfig> newConfigs = new ConcurrentHashMap<>(currentConfigs);newConfigs.put(serverName, config);updateServerConfigs(newConfigs);}/*** 移除單個MCP服務器配置*/public synchronized void removeServerConfig(String serverName) {Map<String, ServerConfig> newConfigs = new ConcurrentHashMap<>(currentConfigs);newConfigs.remove(serverName);updateServerConfigs(newConfigs);}@Overridepublic void destroy() throws Exception {logger.info("關閉動態MCP客戶端管理器");// 關閉調度器scheduler.shutdown();try {if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {scheduler.shutdownNow();}} catch (InterruptedException e) {scheduler.shutdownNow();Thread.currentThread().interrupt();}// 關閉所有客戶端連接for (String serverName : new ArrayList<>(activeClients.keySet())) {disconnectServer(serverName);}}
}
/*** 簡化的MCP工具回調提供者,用于演示目的。** <p>此類提供了從MCP客戶端創建工具回調的基本功能。** @author Spring AI Team* @since 1.1.0*/
public class SimpleMcpToolCallbackProvider {private static final Logger logger = LoggerFactory.getLogger(SimpleMcpToolCallbackProvider.class);private static final ObjectMapper objectMapper = new ObjectMapper();/*** 從MCP客戶端列表創建工具回調列表*/public static List<SyncMcpToolCallback> createToolCallbacks(List<McpSyncClient> clients) {List<SyncMcpToolCallback> callbacks = new ArrayList<>();for (McpSyncClient client : clients) {try {// 獲取工具列表McpSchema.ListToolsResult toolsResult = client.listTools();if (toolsResult != null && toolsResult.tools() != null) {for (McpSchema.Tool tool : toolsResult.tools()) {callbacks.add(new SyncMcpToolCallback(client, tool));}}} catch (Exception e) {logger.error("獲取客戶端工具失敗", e);}}return callbacks;}/*** 從MCP客戶端列表創建工具回調列表*/public static List<SyncMcpToolCallback> getToolCallbacksByClientName(McpSyncClient client) {List<SyncMcpToolCallback> callbacks = new ArrayList<>();try {// 獲取工具列表McpSchema.ListToolsResult toolsResult = client.listTools();if (toolsResult != null && toolsResult.tools() != null) {for (McpSchema.Tool tool : toolsResult.tools()) {callbacks.add(new SyncMcpToolCallback(client, tool));}}} catch (Exception e) {logger.error("獲取客戶端工具失敗", e);}return callbacks;}/*** 簡化的MCP工具回調實現*/public static class SimpleMcpToolCallback implements ToolCallback {private final McpSyncClient client;private final McpSchema.Tool tool;private final ToolDefinition toolDefinition;public SimpleMcpToolCallback(McpSyncClient client, McpSchema.Tool tool) {this.client = client;this.tool = tool;this.toolDefinition = ToolDefinition.builder().name(tool.name()).description(tool.description()).inputSchema(tool.inputSchema() != null ? tool.inputSchema().toString() : "{}").build();}@Overridepublic ToolDefinition getToolDefinition() {return toolDefinition;}@Overridepublic String call(String arguments) {try {// 解析參數@SuppressWarnings("unchecked")Map<String, Object> args = objectMapper.readValue(arguments, Map.class);// 調用MCP工具McpSchema.CallToolRequest request = new McpSchema.CallToolRequest(tool.name(), args);McpSchema.CallToolResult result = client.callTool(request);if (result != null && result.content() != null && !result.content().isEmpty()) {// 提取文本內容StringBuilder response = new StringBuilder();for (Object content : result.content()) {if (content instanceof McpSchema.TextContent) {McpSchema.TextContent textContent = (McpSchema.TextContent) content;response.append(textContent.text());} else {response.append(content.toString());}}return response.toString();} else {return "工具執行完成,無返回內容";}} catch (Exception e) {logger.error("調用MCP工具失敗: {}", tool.name(), e);return "錯誤:" + e.getMessage();}}}
}
/*** 服務器配置類*/
public class ServerConfig {public ServerConfig() { super(); }private String name;private String url;private String sseEndpoint; // SSE端點,默認為/sseprivate Map<String, String> headers;private Duration timeout = Duration.ofSeconds(30);private boolean enabled = true;// Getters and Setterspublic String getName() { return name; }public void setName(String name) { this.name = name; }public String getUrl() { return url; }public void setUrl(String url) { this.url = url; }public String getSseEndpoint() { return sseEndpoint; }public void setSseEndpoint(String sseEndpoint) { this.sseEndpoint = sseEndpoint; }public Map<String, String> getHeaders() { return headers; }public void setHeaders(Map<String, String> headers) { this.headers = headers; }public Duration getTimeout() { return timeout; }public void setTimeout(Duration timeout) { this.timeout = timeout; }public boolean isEnabled() { return enabled; }public void setEnabled(boolean enabled) { this.enabled = enabled; }/*** 獲取完整的SSE URL(基礎URL + SSE端點)*/public String getFullSseUrl() {if (url == null) return null;if (url.startsWith("stdio://")) return url;String baseUrl = url.endsWith("/") ? url.substring(0, url.length() - 1) : url;String endpoint = sseEndpoint.startsWith("/") ? sseEndpoint : "/" + sseEndpoint;return baseUrl + endpoint;}
}

6.Mcp的相關接口


/*** MCP工具控制器*/
@Tag(name = "mcp工具", description = "供統一的REST API來調用MCP工具")
@RestController
@RequestMapping("/mcp")
public class McpToolController {private static final Logger logger = LoggerFactory.getLogger(McpToolController.class);/*** 動態MCP客戶端管理器,負責動態連接和管理MCP服務器*/@Autowired(required = false)private DynamicMcpClientManager dynamicClientManager;/*** AiMcpService,負責MCP工具的數據庫操作*/@Autowiredprivate AiMcpService aiMcpService;public McpToolController() {super(); // 默認無參構造,調用父類構造函數}/*** 獲取當前有效的工具回調數組,優先使用動態管理器** @return ToolCallback[] 當前可用的工具回調*/private ToolCallback[] getCurrentToolCallbacks() {List<ToolCallback> dynamicTools = dynamicClientManager.getAvailableToolCallbacks();if (!dynamicTools.isEmpty()) {return dynamicTools.toArray(new ToolCallback[0]);}return new ToolCallback[0];}/*** 獲取MCP配置狀態** @return 配置狀態信息*/@Operation(summary = "獲取MCP配置狀態", description = "返回當前MCP工具的配置模式、工具數量、可用工具列表等狀態信息。")@GetMapping("/status")public Map<String, Object> getStatus() {Map<String, Object> status = new HashMap<>();ToolCallback[] currentCallbacks = getCurrentToolCallbacks();status.put("toolCount", currentCallbacks.length);status.put("hasDynamicManager", dynamicClientManager != null);// 添加工具列表List<Map<String, String>> toolList = new ArrayList<>();for (ToolCallback callback : currentCallbacks) {Map<String, String> tool = new HashMap<>();tool.put("name", callback.getToolDefinition().name());tool.put("description", callback.getToolDefinition().description());toolList.add(tool);}status.put("tools", toolList);logger.debug("📊 狀態查詢 - 工具數: {}", currentCallbacks.length);return status;}/*** 獲取可用工具列表** @return 工具列表*/@Operation(summary = "獲取可用MCP工具列表", description = "返回當前可用的MCP工具及其描述、輸入參數等信息。")@GetMapping("/tools")public Map<String, Object> getTools() {Map<String, Object> result = new HashMap<>();try {ToolCallback[] currentCallbacks = getCurrentToolCallbacks();List<Map<String, Object>> toolList = new ArrayList<>();for (ToolCallback callback : currentCallbacks) {Map<String, Object> tool = new HashMap<>();tool.put("name", callback.getToolDefinition().name());tool.put("description", callback.getToolDefinition().description());tool.put("inputSchema", callback.getToolDefinition().inputSchema());toolList.add(tool);}result.put("success", true);result.put("toolCount", currentCallbacks.length);result.put("tools", toolList);logger.debug("🔧 工具列表查詢 - 返回 {} 個工具", currentCallbacks.length);} catch (Exception e) {logger.error("? 獲取工具列表失敗", e);result.put("success", false);result.put("message", "獲取工具列表失敗: " + e.getMessage());result.put("toolCount", 0);result.put("tools", new ArrayList<>());}return result;}/*** 獲取可用工具列表** @return 工具列表*/@Operation(summary = "獲取指定MCP工具列表", description = "返回當前MCP服務的工具及其描述、輸入參數等信息。")@GetMapping("/toolsByName")public Map<String, Object> toolsByName(@RequestParam String mcpName) {Map<String, Object> result = new HashMap<>();List<SyncMcpToolCallback> tools = dynamicClientManager.getToolByMcpName(mcpName);List<Map<String, Object>> toolList = new ArrayList<>();for (ToolCallback callback : tools) {Map<String, Object> tool = new HashMap<>();tool.put("name", callback.getToolDefinition().name());tool.put("description", callback.getToolDefinition().description());tool.put("inputSchema", callback.getToolDefinition().inputSchema());toolList.add(tool);}result.put("success", true);result.put("toolCount", tools.size());result.put("tools", toolList);logger.debug("🔧 工具列表查詢 - 返回 {} 個工具", tools.size());return result;}/*** 調用MCP工具** @param toolName  工具名稱* @param arguments 工具參數(JSON格式)* @return 調用結果*/@Operation(summary = "調用指定MCP工具", description = "根據工具名稱和參數調用MCP工具,返回調用結果。參數為JSON字符串。")@PostMapping("/call/{toolName}")public Map<String, Object> callTool(@PathVariable String toolName, @RequestBody String arguments) {Map<String, Object> result = new HashMap<>();try {ToolCallback[] currentCallbacks = getCurrentToolCallbacks();if (currentCallbacks.length == 0) {result.put("success", false);result.put("message", "當前沒有可用的MCP工具");return result;}// 查找指定的工具ToolCallback targetTool = null;for (ToolCallback callback : currentCallbacks) {if (callback.getToolDefinition().name().equals(toolName)) {targetTool = callback;break;}}if (targetTool == null) {result.put("success", false);result.put("message", "工具不存在: " + toolName);result.put("availableTools", Arrays.stream(currentCallbacks).map(cb -> cb.getToolDefinition().name()).toArray());return result;}logger.debug("🔧 調用工具: {}, 參數: {}", toolName, arguments);// 調用工具String toolResult = targetTool.call(arguments);result.put("success", true);result.put("toolName", toolName);result.put("result", toolResult);logger.info("? 工具調用成功: {} -> {}", toolName, toolResult);} catch (Exception e) {logger.error("? 工具調用失敗: {}", toolName, e);result.put("success", false);result.put("message", "調用失敗: " + e.getMessage());result.put("toolName", toolName);}return result;}/*** 刷新工具列表(重新從MCP服務器獲取)** @return 刷新結果*/@Operation(summary = "刷新MCP工具列表", description = "重新從MCP服務器獲取工具列表并刷新緩存。")@PostMapping("/refresh")public Map<String, Object> refreshTools() {Map<String, Object> result = new HashMap<>();try {if (dynamicClientManager != null) {logger.info("🔄 刷新MCP工具列表");dynamicClientManager.refreshToolCallbacks();List<ToolCallback> tools = dynamicClientManager.getAvailableToolCallbacks();result.put("success", true);result.put("message", "工具列表已刷新");result.put("toolCount", tools.size());logger.info("? 工具列表刷新完成,發現 {} 個工具", tools.size());} else {result.put("success", false);result.put("message", "動態管理器不可用");}} catch (Exception e) {logger.error("? 刷新工具列表失敗", e);result.put("success", false);result.put("message", "刷新失敗: " + e.getMessage());}return result;}/*** 新增MCP工具*/@Operation(summary = "新增MCP工具", description = "新增一條MCP工具記錄")@PostMapping("/entity")public RestVO<String> addMcp(@RequestBody AiMcp mcp) {try {aiMcpService.addMcp(mcp, dynamicClientManager);return RestVO.success("新增成功");} catch (Exception e) {return RestVO.fail("新增失敗: " + e.getMessage());}}/*** 修改MCP工具*/@Operation(summary = "修改MCP工具", description = "根據ID修改MCP工具記錄")@PutMapping("/entity/{id}")public RestVO<String> updateMcp(@PathVariable Long id, @RequestBody AiMcp mcp) {try {mcp.setId(id);aiMcpService.updateMcp(mcp);return RestVO.success("修改成功");} catch (Exception e) {return RestVO.fail("修改失敗: " + e.getMessage());}}/*** 刪除MCP工具*/@Operation(summary = "刪除MCP工具", description = "根據ID刪除MCP工具記錄")@DeleteMapping("/entity/{id}")public RestVO<String> deleteMcp(@PathVariable Long id) {try {aiMcpService.deleteMcp(id, dynamicClientManager);return RestVO.success("刪除成功");} catch (Exception e) {return RestVO.fail("刪除失敗: " + e.getMessage());}}/*** 分頁查詢MCP工具,支持名稱模糊搜索,按添加日期倒序*/@Operation(summary = "分頁查詢MCP工具", description = "分頁查詢MCP工具,支持名稱模糊搜索,按添加日期倒序排列")@GetMapping("/entity/page")public RestVO<Map<String, Object>> pageMcp(@RequestParam(defaultValue = "") String name,@RequestParam(defaultValue = "1") int page,@RequestParam(defaultValue = "10") int size) {try {Page<AiMcp> resultPage = aiMcpService.pageQuery(name, page, size);Map<String, Object> result = new HashMap<>();result.put("total", resultPage.getTotal());result.put("pages", resultPage.getPages());result.put("current", resultPage.getCurrent());result.put("size", resultPage.getSize());result.put("records", resultPage.getRecords());return RestVO.success(result);} catch (Exception e) {return RestVO.fail("分頁查詢失敗: " + e.getMessage());}}
}

7.測試mcp工具

添加一個高德地圖的mcp ,key從高德開放平臺獲取

{"mcpServers": {"amap-amap-sse": {"url": "https://mcp.amap.com","type": "sse","sseEndpoint": "/sse?key=xxxxxxxxxxxxxxxxxx"}}
}

查詢可用工具
在這里插入圖片描述

8.ChatClient接入toolCallbacks

private Flux<FluxVO> getFluxVOFlux(List<Message> messageList, AiModel myModel, QuestionVO body) {Prompt prompt = new Prompt(messageList);AtomicBoolean inThinking = new AtomicBoolean(false);StringBuffer outputText = body.getMemory() ? new StringBuffer() : null;ChatClient chatModel = myModel.getChatClient();// 1. 先構造 Publisher<ChatResponse>Flux<ChatResponse> publisher;//判斷是否需要啟用mcp工具if (body.getUseTools()) {List<ToolCallback> toolCallbacks = dynamicMcpClientManager.getAvailableToolCallbacks();publisher = chatModel.prompt(prompt).toolCallbacks(toolCallbacks).stream().chatResponse();} else {publisher = chatModel.prompt(prompt).stream().chatResponse();}// 主動推送一條“處理中”消息Flux<FluxVO> proactiveMsg = Flux.just(FluxVO.builder().text("").status("before").build());Flux<FluxVO> resp = Flux.from(publisher).doFirst(() -> {System.out.println("-------------開始輸出");if (body.getMemory()) {chatMemoryService.saveMessage(body);}}).doFinally(signalType -> {System.out.println("-------------流式處理結束");if (body.getMemory() && outputText != null) {chatMemoryService.saveMessage(body.getSessionId(), "ASSISTANT", outputText.toString(), body.getModel());}});return Flux.concat(proactiveMsg, resp);

getFluxVOFlux 之前的代碼可以參考同系列前幾章節

9.測試最終效果

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/91748.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/91748.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/91748.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【PDF + ZIP 合并器:把ZIP文件打包至PDF文件中】

B站鏈接 PDF ZIP 合并器&#xff1a;把ZIP文件打包至PDF文件中_嗶哩嗶哩_bilibiliz 加強作者的工具 https://wwgw.lanzn.com/i8h1C32k9bef 密碼:30cv 新增c框架&#xff0c;加快運行速度

阿里云部署微調chatglm3

git Ifs install Git lfs 主要用于管理大型文件。在傳統的Git倉庫中&#xff0c;所有文件內容都會被完整記錄在每一次提交中&#xff0c;這會導致倉庫體積增大&#xff0c;克隆、拉取和推送操作變慢&#xff0c;甚至可能超出存儲限額。Git LFS通過將大文件替換成文本指針&#…

Linux網絡編程 ---五種IO模型

五種IO模型一、IO慢的原因二、五種IO模型三、如何設置非阻塞式IO&#xff1f;一、IO慢的原因 二、五種IO模型 阻塞式IO 非阻塞式IO 信號驅動IO 多路轉接 異步IO 三、如何設置非阻塞式IO&#xff1f; &#xff08;一&#xff09;用法說明 &#xff08;二&#xff0…

Obsidian結合CI/CD實現自動發布

CI/CDQuickAddJS腳本bat腳本sh腳本實現自動發版Hugo文章 需求來源 每次手動執行Hugo的命令&#xff0c;手動把public文件夾上傳到自己的服務器可以完成發版需求。 但是&#xff0c;作為一個內容創作者&#xff0c;我更希望的關注于自己的內容&#xff0c;而不是關注整個發版…

[硬件電路-141]:模擬電路 - 源電路,信號源與電源,能自己產生確定性波形的電路。

源電路&#xff08;Source Circuit&#xff09;是電子系統中為其他電路或負載提供特定信號或能量的基礎電路模塊&#xff0c;其核心功能是生成、調節或轉換所需的物理量&#xff08;如電壓、電流、波形、頻率等&#xff09;。以下是源電路的詳細解析&#xff1a;一、源電路的核…

Unity_數據持久化_PlayerPrefs基礎

Unity數據持久化 一、數據持久化基礎概念 1.1 什么是數據持久化 定義&#xff1a; 數據持久化就是將內存中的數據模型轉換為存儲模型&#xff0c;以及將存儲模型轉換為內存中的數據模型的統稱。 通俗解釋&#xff1a; 將游戲數據存儲到硬盤&#xff0c;硬盤中數據讀取到游戲中&…

什么是列存儲(Columnar Storage)?深度解析其原理與應用場景

列存儲的基本概念&#xff1a;顛覆傳統的數據組織方式列存儲&#xff08;Column Storage&#xff09;是一種革命性的數據庫存儲技術&#xff0c;它通過按列而非按行組織數據&#xff0c;從根本上改變了數據的物理存儲結構。與傳統行存儲數據庫不同&#xff0c;列式數據庫將每一…

機器人抓取流程介紹與實現——機器人抓取系統基礎系列(七)

機器人抓取系統基礎系列文章目錄 1. UR機械臂的ROS驅動安裝官方教程詳解——機器人抓取系統基礎系列&#xff08;一&#xff09; 2. MoveIt控制機械臂的運動實現——機器人抓取系統基礎系列&#xff08;二&#xff09; 3. 機器人&#xff08;機械臂&#xff09;的相機選型與安裝…

【Qt】QObject::startTimer: Timers cannot be started from another thread

QTimer對象的 start 函數調用必須和創建QTimer對象是同一個線程。 #include "QtTimerTest.h" #include <QDebug>QtTimerTest::QtTimerTest(QWidget *parent): QMainWindow(parent),m_timer(nullptr),m_timerThread(nullptr), m_workingThread(nullptr) {ui.set…

社會治安滿意度調查:為城市安全治理提供精準參考(滿意度調查公司)

在社會治理不斷深化的背景下&#xff0c;公眾對社會治安的感知與評價已成為衡量城市治理水平的重要維度&#xff08;社會治安滿意度調查&#xff09;&#xff08;公眾滿意度調查&#xff09;&#xff08;滿意度調查&#xff09;。為全面掌握市民對治安狀況的真實反饋&#xff0…

Python篇--- Python 的加載、緩存、覆蓋機制

要理解 import 與 if __name__ "__main__": 的關系&#xff0c;以及 Python 的加載、緩存、覆蓋機制&#xff0c;我們可以從 “模塊的兩種身份” 和 “導入的全過程” 入手&#xff0c;用通俗的例子一步步拆解。一、核心&#xff1a;模塊的 “雙重身份” 與 __name_…

Java設計模式之行為型模式(訪問者模式)應用場景分析

訪問者模式&#xff08;Visitor Pattern&#xff09;作為Java設計模式中的“隱形冠軍”&#xff0c;常被開發者低估其價值。這一模式通過“雙分派”機制巧妙解耦數據結構與操作&#xff0c;為復雜系統的擴展提供了強大武器。在大廠項目中&#xff0c;訪問者模式往往出現在業務邏…

【IDEA】JavaWeb自定義servlet模板

方法一&#xff1a;&#xff08;推薦去使用方法二&#xff0c;還能創建其它代碼模板&#xff09;使用servlet模板創建Servlet類如果創建時找不到servlet模板&#xff1a;File -> Project Structure然后應用 -> OK&#xff0c;如果還是找不到Servlet模板&#xff0c;看看項…

Linux選擇

在內存中運行著的進程稱為&#xff08; 服務 &#xff09;。負責控制systemd系統和服務管理器的工具為&#xff08; systemctl &#xff09;命令。systemd管理系統服務的基本單位是&#xff08; unit &#xff09;。分配和管理資源的基本單位是&#xff08; 進程 &#xf…

【Redis學習路|第一篇】初步認識Redis

概要: 深入探討NoSQL數據庫的核心特性&#xff0c;對比傳統關系型數據庫的差異&#xff0c;重點介紹Redis作為內存數據庫的優勢與應用場景。 文章目錄認識 NoSQLNoSQL vs SQL 對比1?? 結構化 vs 非結構化2?? 關聯 vs 非關聯3?? 查詢方式對比4?? 事務特性5?? 存儲方式…

java局域網聊天室小項目架構思路

java局域網聊天室小項目架構思路 項目需求 創建一個局域網聊天系統&#xff0c;要求&#xff1a;用戶在登錄界面登錄后進入聊天窗口界面&#xff0c;能實現多用戶同時在線聊天&#xff0c;并且用戶之間可以進行私聊 項目用到的技術棧 java網絡編程java多線程java面向對象編…

vulhub-corrosion2靶機

1.安裝靶機 https://download.vulnhub.com/corrosion/Corrosion2.ovahttps://download.vulnhub.com/corrosion/Corrosion2.ova 2.掃描IP 3.掃描端口 4.訪問端口 首先訪問一下80端口 訪問一個8080端口發現是一個apache的頁面 5.掃描目錄與漏洞探測 那么我們掃描一下目錄 80…

Mysql深入學習:慢sql執行

目錄 慢查詢日志 慢查詢主要步驟 11種慢查詢的場景分析 場景一&#xff1a;SQL 沒有建立索引 場景二&#xff1a;索引未生效的典型原因 場景三&#xff1a;LIMIT 深分頁導致性能下降 場景四&#xff1a;單表數據量過大導致 SQL 性能下降 場景五&#xff1a;ORDER BY 出現…

李宏毅深度學習教程 第8-9章 生成模型+擴散模型

【2025版】12 生成式對抗網絡GAN 一 – 基本概念介紹_嗶哩嗶哩_bilibili 目錄 1. GAN生成式對抗網絡 2. GAN的訓練 散度差異 3.WGAN 4.訓練GAN 5. 如何客觀評估GAN 6. 條件型生成&#xff08;按照要求&#xff09; 7. Cycle GAN&#xff08;互轉配對&#xff09; 8. d…

1.8 axios詳解

Axios的定義與核心特性Axios是一個基于Promise的現代化HTTP客戶端庫&#xff0c;主要用于在瀏覽器和Node.js 環境中發送HTTP請求&#xff0c;旨在簡化異步數據交互流程。其核心特性如下&#xff1a;跨平臺支持&#xff1a;在瀏覽器中通過XMLHttpRequest對象發送請求&#xff0c…