前言
官方示例的代碼中,mcp一般是配置到yml中或者json文件中,使用自動裝配的方式注入服務,這種方式不方便在程序啟動后添加新的服務,這里參考cherry studio的方式動態添加mcp服務
1.確定方案
-
mcp服務的維護放到mysql業務數據庫維護,前端通過json來添加mcp服務,為什么是json?因為開源的mcp服務都提供json示例,拿來即用
-
后端輪詢mcp服務確保服務可用狀態
-
前端動態切換模型時,根據模型是否支持工具調和是否啟用mcp來控制是否使用工具
-
一個mcp服務下可能有一系列的工具,提供一個查看mcp服務工具的頁面
2. pom依賴
<!-- Spring AI MCP 核心包 (包含ToolCallbackProvider) --><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-starter-mcp-client</artifactId></dependency><!-- Spring AI Model (ToolCallback接口等) --><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-model</artifactId><version>${spring-ai.version}</version></dependency>
其他的依賴見前面幾篇文章
3. 新增mcp表
CREATE TABLE `ai_mcp` (`id` bigint NOT NULL AUTO_INCREMENT,`content_type` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_german2_ci NOT NULL COMMENT '連接類型,sse,stdio',`name` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_german2_ci NOT NULL COMMENT 'mcp名稱',`description` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_german2_ci DEFAULT NULL COMMENT '功能描述',`config_json` json NOT NULL COMMENT '配置參數,json類型',`status` tinyint(1) DEFAULT NULL COMMENT '可用狀態,1可用,0不可用',`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_german2_ci;
4.補充增刪改查
使用的mybatis-plus,省略mapper層
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.lanyu.springainovel.entity.AiMcp;
import org.lanyu.springainovel.entity.ServerConfig;
import org.lanyu.springainovel.mapper.AiMcpMapper;
import org.lanyu.springainovel.util.ConversionUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.sql.Timestamp;
import java.util.List;@Service
public class AiMcpService {public AiMcpService() {super();}@Autowiredprivate AiMcpMapper aiMcpMapper;public List<AiMcp> getAllEnabledMcp() {QueryWrapper<AiMcp> query = new QueryWrapper<>();query.eq("status", 1);return aiMcpMapper.selectList(query);}public void addMcp(AiMcp mcp, DynamicMcpClientManager dynamicMcpClientManager) {mcp.setUpdateTime(new Timestamp(System.currentTimeMillis()));// 從configJson中提取mcpServers作為mcp名稱和類型try {ObjectMapper objectMapper = new ObjectMapper();JsonNode root = objectMapper.readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isObject() && serversNode.size() > 0) {// 獲取第一個服務器名稱作為mcp名稱String serverName = serversNode.fieldNames().next();mcp.setName(serverName);// 獲取服務器類型JsonNode serverNode = serversNode.path(serverName);String type = serverNode.path("type").asText("");if ("sse".equalsIgnoreCase(type) || "stdio".equalsIgnoreCase(type)) {mcp.setContentType(type);}}} catch (Exception e) {// 日志輸出}aiMcpMapper.insert(mcp);if (mcp.getStatus() != null && mcp.getStatus() == 1) {// 解析 configJson,提取所有 servertry {ObjectMapper objectMapper = new ObjectMapper();JsonNode root = objectMapper.readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isObject()) {serversNode.fields().forEachRemaining(entry -> {String serverName = entry.getKey();JsonNode serverNode = entry.getValue();String type = serverNode.path("type").asText("");if ("sse".equalsIgnoreCase(type) || "stdio".equalsIgnoreCase(type)) {ServerConfig config = ConversionUtil.parseServerConfigFromJson(serverName, serverNode, type);if (config != null) {dynamicMcpClientManager.addOrUpdateServerConfig(serverName, config);}}});}} catch (Exception e) {// 日志輸出}}}public void updateMcp(AiMcp mcp) {mcp.setUpdateTime(new Timestamp(System.currentTimeMillis()));// 從configJson中提取mcpServers作為mcp名稱try {ObjectMapper objectMapper = new ObjectMapper();JsonNode root = objectMapper.readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isObject() && serversNode.size() > 0) {// 獲取第一個服務器名稱作為mcp名稱String serverName = serversNode.fieldNames().next();mcp.setName(serverName);// 獲取服務器類型JsonNode serverNode = serversNode.path(serverName);String type = serverNode.path("type").asText("");if ("sse".equalsIgnoreCase(type) || "stdio".equalsIgnoreCase(type)) {mcp.setContentType(type);}}} catch (Exception e) {// 日志輸出}aiMcpMapper.updateById(mcp);}public void deleteMcp(Long id, DynamicMcpClientManager dynamicMcpClientManager) {AiMcp mcp = aiMcpMapper.selectById(id);if (mcp != null) {// 解析 configJson,提取所有 servertry {ObjectMapper objectMapper = new ObjectMapper();JsonNode root = objectMapper.readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isObject()) {serversNode.fields().forEachRemaining(entry -> {String serverName = entry.getKey();dynamicMcpClientManager.removeServerConfig(serverName);});}} catch (Exception e) {// 日志輸出}aiMcpMapper.deleteById(id);}}public AiMcp getById(Long id) {return aiMcpMapper.selectById(id);}public AiMcp getByName(String name) {QueryWrapper<AiMcp> query = new QueryWrapper<>();query.eq("name", name);return aiMcpMapper.selectOne(query);}public Page<AiMcp> pageQuery(String name, int page, int size) {QueryWrapper<AiMcp> query = new QueryWrapper<>();if (name != null && !name.isEmpty()) {query.like("name", name);}query.orderByDesc("update_time");return aiMcpMapper.selectPage(new Page<>(page, size), query);}
}
5.創建mcp管理類(核心)
/*** 動態MCP客戶端管理器,支持熱加載和配置變更。** <p>此服務提供以下功能:* <ul>* <li>動態讀取MCP服務器配置</li>* <li>自動連接和重連MCP服務器</li>* <li>實時發現新工具</li>* <li>配置變更時自動更新連接</li>* <li>健康檢查和故障恢復</li>* </ul>** @author Spring AI Team* @since 1.1.0*/
@Service
public class DynamicMcpClientManager implements DisposableBean {private static final Logger logger = LoggerFactory.getLogger(DynamicMcpClientManager.class);/*** 活躍的MCP客戶端,key為服務器名,value為McpSyncClient實例*/private final Map<String, McpSyncClient> activeClients = new ConcurrentHashMap<>();/*** 當前連接狀態的配置,key為服務器名,value為ServerConfig*/private final Map<String, ServerConfig> currentConfigs = new ConcurrentHashMap<>();/*** 定時任務調度器*/private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);/*** AiMcpService用于數據庫操作*/private final AiMcpService aiMcpService;/*** 緩存的工具回調列表,定期刷新*/private volatile List<SyncMcpToolCallback> cachedToolCallbacks = new ArrayList<>();/*** 上次工具回調刷新的時間戳*/private volatile long lastToolRefresh = 0;/*** 工具回調緩存的有效期(毫秒)*/private static final long TOOL_CACHE_TTL = 30000; // 30秒緩存/*** 最大重連次數,所有重連邏輯統一引用該常量*/private static final int MAX_RECONNECT_ATTEMPTS = 3;public DynamicMcpClientManager(AiMcpService aiMcpService) {this.aiMcpService = aiMcpService;}@PostConstructpublic void init() {loadSpringAiMcpConfiguration();
// scheduler.scheduleWithFixedDelay(this::loadSpringAiMcpConfiguration, 30, 30, TimeUnit.SECONDS);scheduler.scheduleWithFixedDelay(this::performHealthCheck, 30, 30, TimeUnit.SECONDS);logger.info("動態MCP客戶端管理器已啟動");}/*** 從數據庫加載MCP配置,只有配置實際變更時才調用updateServerConfigs*/public void loadSpringAiMcpConfiguration() {logger.info("從數據庫加載MCP配置");Map<String, ServerConfig> newConfigs = new ConcurrentHashMap<>();try {for (AiMcp mcp : aiMcpService.getAllEnabledMcp()) {if (mcp.getConfigJson() == null || mcp.getConfigJson().isEmpty()) {logger.warn("MCP配置[{}]缺少configJson,跳過", mcp.getName());continue;}try {JsonNode root = new ObjectMapper().readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isMissingNode() || !serversNode.isObject()) {logger.warn("MCP配置[{}]的mcpServers字段缺失或格式錯誤,跳過", mcp.getName());continue;}serversNode.fields().forEachRemaining(entry -> {String serverName = entry.getKey();JsonNode serverNode = entry.getValue();String type = serverNode.path("type").asText("");ServerConfig config = ConversionUtil.parseServerConfigFromJson(serverName, serverNode, type);if (config != null) {newConfigs.put(serverName, config);}});} catch (Exception e) {logger.warn("MCP配置[{}]解析configJson失敗: {},跳過", mcp.getName(), e.getMessage());}}} catch (Exception e) {logger.error("從數據庫加載MCP配置失敗", e);}// 只有配置實際變更時才更新if (!newConfigs.equals(currentConfigs)) {updateServerConfigs(newConfigs);} else {logger.debug("MCP配置無變更,無需更新");}}/*** 更新服務器配置并重新連接。重連次數由MAX_RECONNECT_ATTEMPTS控制。*/public synchronized void updateServerConfigs(Map<String, ServerConfig> newConfigs) {logger.info("更新MCP服務器配置,共 {} 個服務器", newConfigs.size());// 移除不再存在的服務器currentConfigs.keySet().removeIf(serverName -> {if (!newConfigs.containsKey(serverName)) {disconnectServer(serverName);return true;}return false;});// 添加或更新服務器配置for (Map.Entry<String, ServerConfig> entry : newConfigs.entrySet()) {String serverName = entry.getKey();ServerConfig newConfig = entry.getValue();ServerConfig oldConfig = currentConfigs.get(serverName);// 如果配置發生變化或新加,嘗試連接if (oldConfig == null || !configEquals(oldConfig, newConfig)) {currentConfigs.put(serverName, newConfig);if (newConfig.isEnabled()) {boolean connected = false;int attempts = 0;while (!connected && attempts < MAX_RECONNECT_ATTEMPTS) {attempts++;logger.info("[重連策略] 連接/重連MCP服務器[{}],第{}次嘗試...", serverName, attempts);connectServer(serverName, newConfig);if (activeClients.containsKey(serverName)) {logger.info("[重連策略] 連接/重連MCP服務器[{}]成功", serverName);connected = true;} else {logger.warn("[重連策略] 連接/重連MCP服務器[{}]失敗,等待2分鐘后重試...", serverName);try {Thread.sleep(120_000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}if (!connected) {logger.error("[重連策略] MCP服務器[{}]重連已達最大次數({}),將自動禁用該服務", serverName, MAX_RECONNECT_ATTEMPTS);disableMcpInDatabase(serverName);}} else {disconnectServer(serverName);}}}// 清除工具緩存,強制重新加載invalidateToolCache();}/*** 連接到MCP服務器(重連和首次連接參數完全一致,重連前徹底釋放資源)*/private void connectServer(String serverName, ServerConfig config) {logger.info("連接到MCP服務器: {} ({})", serverName, config.getUrl());// 1. 徹底釋放舊資源disconnectServer(serverName);try {// 2. 構建全新 McpSyncClient,參數與首次一致io.modelcontextprotocol.client.McpSyncClient client;if (config.getUrl().startsWith("stdio://")) {logger.info("使用STDIO傳輸連接服務器: {}", serverName);// TODO: STDIO傳輸實現logger.warn("STDIO傳輸暫未實現,服務器: {}", serverName);return;} else {logger.info("使用SSE傳輸連接服務器: {} -> {} (endpoint: {})", serverName, config.getUrl(), config.getSseEndpoint());HttpClientSseClientTransport transport = HttpClientSseClientTransport.builder(config.getUrl()).clientBuilder(HttpClient.newBuilder()).sseEndpoint(config.getSseEndpoint()).build();client = McpClient.sync(transport).requestTimeout(config.getTimeout()).build();}// 3. 初始化客戶端client.initialize();activeClients.put(serverName, client);logger.info("成功連接到MCP服務器: {}", serverName);} catch (Exception e) {logger.error("連接MCP服務器失敗: {} - {} (url: {}, sseEndpoint: {})", serverName, e.getMessage(), config.getUrl(), config.getSseEndpoint(), e);}}/*** 徹底斷開MCP服務器連接,釋放所有資源*/private void disconnectServer(String serverName) {io.modelcontextprotocol.client.McpSyncClient client = activeClients.remove(serverName);if (client != null) {try {client.close();logger.info("已斷開MCP服務器連接: {}", serverName);} catch (Exception e) {logger.warn("斷開MCP服務器連接時出錯: {} - {}", serverName, e.getMessage());}}// 清理相關緩存// cachedToolCallbacks = new ArrayList<>(); // 若有必要可清理}/*** 獲取所有活躍的客戶端*/public List<McpSyncClient> getActiveClients() {return new ArrayList<>(activeClients.values());}/*** 獲取所有可用的工具回調(帶緩存)*/public List<ToolCallback> getAvailableToolCallbacks() {long now = System.currentTimeMillis();if (now - lastToolRefresh > TOOL_CACHE_TTL || cachedToolCallbacks.isEmpty()) {refreshToolCallbacks();lastToolRefresh = now;}return new ArrayList<>(cachedToolCallbacks);}/*** 強制刷新工具回調,重新從所有活躍客戶端獲取工具*/public synchronized void refreshToolCallbacks() {logger.info("刷新MCP工具回調");List<McpSyncClient> clients = getActiveClients();if (clients.isEmpty()) {cachedToolCallbacks = new ArrayList<>();return;}try {logger.info("準備調用createToolCallbacks");List<SyncMcpToolCallback> newCallbacks = SimpleMcpToolCallbackProvider.createToolCallbacks(clients);logger.info("createToolCallbacks調用工具結果:{}", newCallbacks.size());cachedToolCallbacks = newCallbacks;logger.info("刷新{}個工具", newCallbacks.size());} catch (Exception e) {logger.error("刷新工具回調失敗", e);}}/*** 獲取指定Mcp服務的工具列表*/public List<SyncMcpToolCallback> getToolByMcpName(String mcpName) {McpSyncClient client = activeClients.get(mcpName);if (client == null) {return new ArrayList<>();}try {return SimpleMcpToolCallbackProvider.getToolCallbacksByClientName(client);} catch (Exception e) {return new ArrayList<>();}}/*** 清除工具緩存*/public void invalidateToolCache() {lastToolRefresh = 0;logger.debug("工具緩存已清除");}/*** 執行健康檢查*/private void performHealthCheck() {logger.info("執行MCP客戶端健康檢查");List<String> unhealthyServers = new ArrayList<>();for (Map.Entry<String, McpSyncClient> entry : activeClients.entrySet()) {String serverName = entry.getKey();McpSyncClient client = entry.getValue();try {List<McpSchema.Tool> tools = client.listTools().tools();//logger.info("檢查工具:{}", tools.toString());} catch (Exception e) {logger.warn("MCP服務器 {} 健康檢查失敗: {},將嘗試重連", serverName, e.getMessage());unhealthyServers.add(serverName);}}// 只負責發現斷開,重連交給 reconnectWithLimitfor (String serverName : unhealthyServers) {ServerConfig config = currentConfigs.get(serverName);if (config != null && config.isEnabled()) {logger.info("MCP服務器 {} 已斷開,將嘗試重連", serverName);reconnectWithLimit(serverName, config);}}}/*** 比較兩個配置是否相等*/private boolean configEquals(ServerConfig config1, ServerConfig config2) {if (config1 == config2) return true;if (config1 == null || config2 == null) return false;return java.util.Objects.equals(config1.getUrl(), config2.getUrl()) &&java.util.Objects.equals(config1.getSseEndpoint(), config2.getSseEndpoint()) &&java.util.Objects.equals(config1.getHeaders(), config2.getHeaders()) &&java.util.Objects.equals(config1.getTimeout(), config2.getTimeout()) &&config1.isEnabled() == config2.isEnabled();}private void reconnectWithLimit(String serverName, ServerConfig config) {int attempts = 0;boolean connected = false;while (attempts < MAX_RECONNECT_ATTEMPTS && !connected) {attempts++;logger.warn("重連MCP服務器[{}],第{}次嘗試...,最大{}次", serverName, attempts, MAX_RECONNECT_ATTEMPTS);try {connectServer(serverName, config);if (activeClients.containsKey(serverName)) {logger.info("重連MCP服務器[{}]成功", serverName);connected = true;}} catch (Exception e) {logger.error("重連MCP服務器[{}]失敗: {}", serverName, e.getMessage());}}if (!connected) {logger.error("MCP服務器[{}]重連已達最大次數({}),將自動禁用該服務", serverName, MAX_RECONNECT_ATTEMPTS);disableMcpInDatabase(serverName);}}private void disableMcpInDatabase(String serverName) {try {org.lanyu.springainovel.entity.AiMcp mcp = aiMcpService.getByName(serverName);if (mcp != null && mcp.getStatus() != null && mcp.getStatus() != 0) {mcp.setStatus(0);aiMcpService.updateMcp(mcp);logger.warn("已將MCP服務[{}]在數據庫中禁用(status=0)", serverName);}// 同步禁用緩存ServerConfigServerConfig config = currentConfigs.get(serverName);if (config != null && config.isEnabled()) {config.setEnabled(false);logger.warn("已將MCP服務[{}]在緩存中禁用(enable=false)", serverName);}// 同步禁用McpConfigurationService的lastKnownConfigstry {// 反射獲取AiMcpService中的McpConfigurationServicejava.lang.reflect.Field field = aiMcpService.getClass().getDeclaredField("mcpConfigurationService");field.setAccessible(true);Object mcpConfigServiceObj = field.get(aiMcpService);if (mcpConfigServiceObj != null) {java.lang.reflect.Field lastKnownConfigsField = mcpConfigServiceObj.getClass().getDeclaredField("lastKnownConfigs");lastKnownConfigsField.setAccessible(true);@SuppressWarnings("unchecked")Map<String, ServerConfig> lastKnownConfigs = (Map<String, ServerConfig>) lastKnownConfigsField.get(mcpConfigServiceObj);ServerConfig lastConfig = lastKnownConfigs.get(serverName);if (lastConfig != null && lastConfig.isEnabled()) {lastConfig.setEnabled(false);logger.warn("已將MCP服務[{}]在lastKnownConfigs中禁用(enable=false)", serverName);}}} catch (Exception e) {logger.error("同步禁用lastKnownConfigs緩存失敗: {}", e.getMessage());}} catch (Exception e) {logger.error("禁用MCP服務[{}]數據庫操作失敗: {}", serverName, e.getMessage());}}/*** 新增或更新單個MCP服務器配置*/public synchronized void addOrUpdateServerConfig(String serverName, ServerConfig config) {Map<String, ServerConfig> newConfigs = new ConcurrentHashMap<>(currentConfigs);newConfigs.put(serverName, config);updateServerConfigs(newConfigs);}/*** 移除單個MCP服務器配置*/public synchronized void removeServerConfig(String serverName) {Map<String, ServerConfig> newConfigs = new ConcurrentHashMap<>(currentConfigs);newConfigs.remove(serverName);updateServerConfigs(newConfigs);}@Overridepublic void destroy() throws Exception {logger.info("關閉動態MCP客戶端管理器");// 關閉調度器scheduler.shutdown();try {if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {scheduler.shutdownNow();}} catch (InterruptedException e) {scheduler.shutdownNow();Thread.currentThread().interrupt();}// 關閉所有客戶端連接for (String serverName : new ArrayList<>(activeClients.keySet())) {disconnectServer(serverName);}}
}
/*** 簡化的MCP工具回調提供者,用于演示目的。** <p>此類提供了從MCP客戶端創建工具回調的基本功能。** @author Spring AI Team* @since 1.1.0*/
public class SimpleMcpToolCallbackProvider {private static final Logger logger = LoggerFactory.getLogger(SimpleMcpToolCallbackProvider.class);private static final ObjectMapper objectMapper = new ObjectMapper();/*** 從MCP客戶端列表創建工具回調列表*/public static List<SyncMcpToolCallback> createToolCallbacks(List<McpSyncClient> clients) {List<SyncMcpToolCallback> callbacks = new ArrayList<>();for (McpSyncClient client : clients) {try {// 獲取工具列表McpSchema.ListToolsResult toolsResult = client.listTools();if (toolsResult != null && toolsResult.tools() != null) {for (McpSchema.Tool tool : toolsResult.tools()) {callbacks.add(new SyncMcpToolCallback(client, tool));}}} catch (Exception e) {logger.error("獲取客戶端工具失敗", e);}}return callbacks;}/*** 從MCP客戶端列表創建工具回調列表*/public static List<SyncMcpToolCallback> getToolCallbacksByClientName(McpSyncClient client) {List<SyncMcpToolCallback> callbacks = new ArrayList<>();try {// 獲取工具列表McpSchema.ListToolsResult toolsResult = client.listTools();if (toolsResult != null && toolsResult.tools() != null) {for (McpSchema.Tool tool : toolsResult.tools()) {callbacks.add(new SyncMcpToolCallback(client, tool));}}} catch (Exception e) {logger.error("獲取客戶端工具失敗", e);}return callbacks;}/*** 簡化的MCP工具回調實現*/public static class SimpleMcpToolCallback implements ToolCallback {private final McpSyncClient client;private final McpSchema.Tool tool;private final ToolDefinition toolDefinition;public SimpleMcpToolCallback(McpSyncClient client, McpSchema.Tool tool) {this.client = client;this.tool = tool;this.toolDefinition = ToolDefinition.builder().name(tool.name()).description(tool.description()).inputSchema(tool.inputSchema() != null ? tool.inputSchema().toString() : "{}").build();}@Overridepublic ToolDefinition getToolDefinition() {return toolDefinition;}@Overridepublic String call(String arguments) {try {// 解析參數@SuppressWarnings("unchecked")Map<String, Object> args = objectMapper.readValue(arguments, Map.class);// 調用MCP工具McpSchema.CallToolRequest request = new McpSchema.CallToolRequest(tool.name(), args);McpSchema.CallToolResult result = client.callTool(request);if (result != null && result.content() != null && !result.content().isEmpty()) {// 提取文本內容StringBuilder response = new StringBuilder();for (Object content : result.content()) {if (content instanceof McpSchema.TextContent) {McpSchema.TextContent textContent = (McpSchema.TextContent) content;response.append(textContent.text());} else {response.append(content.toString());}}return response.toString();} else {return "工具執行完成,無返回內容";}} catch (Exception e) {logger.error("調用MCP工具失敗: {}", tool.name(), e);return "錯誤:" + e.getMessage();}}}
}
/*** 服務器配置類*/
public class ServerConfig {public ServerConfig() { super(); }private String name;private String url;private String sseEndpoint; // SSE端點,默認為/sseprivate Map<String, String> headers;private Duration timeout = Duration.ofSeconds(30);private boolean enabled = true;// Getters and Setterspublic String getName() { return name; }public void setName(String name) { this.name = name; }public String getUrl() { return url; }public void setUrl(String url) { this.url = url; }public String getSseEndpoint() { return sseEndpoint; }public void setSseEndpoint(String sseEndpoint) { this.sseEndpoint = sseEndpoint; }public Map<String, String> getHeaders() { return headers; }public void setHeaders(Map<String, String> headers) { this.headers = headers; }public Duration getTimeout() { return timeout; }public void setTimeout(Duration timeout) { this.timeout = timeout; }public boolean isEnabled() { return enabled; }public void setEnabled(boolean enabled) { this.enabled = enabled; }/*** 獲取完整的SSE URL(基礎URL + SSE端點)*/public String getFullSseUrl() {if (url == null) return null;if (url.startsWith("stdio://")) return url;String baseUrl = url.endsWith("/") ? url.substring(0, url.length() - 1) : url;String endpoint = sseEndpoint.startsWith("/") ? sseEndpoint : "/" + sseEndpoint;return baseUrl + endpoint;}
}
6.Mcp的相關接口
/*** MCP工具控制器*/
@Tag(name = "mcp工具", description = "供統一的REST API來調用MCP工具")
@RestController
@RequestMapping("/mcp")
public class McpToolController {private static final Logger logger = LoggerFactory.getLogger(McpToolController.class);/*** 動態MCP客戶端管理器,負責動態連接和管理MCP服務器*/@Autowired(required = false)private DynamicMcpClientManager dynamicClientManager;/*** AiMcpService,負責MCP工具的數據庫操作*/@Autowiredprivate AiMcpService aiMcpService;public McpToolController() {super(); // 默認無參構造,調用父類構造函數}/*** 獲取當前有效的工具回調數組,優先使用動態管理器** @return ToolCallback[] 當前可用的工具回調*/private ToolCallback[] getCurrentToolCallbacks() {List<ToolCallback> dynamicTools = dynamicClientManager.getAvailableToolCallbacks();if (!dynamicTools.isEmpty()) {return dynamicTools.toArray(new ToolCallback[0]);}return new ToolCallback[0];}/*** 獲取MCP配置狀態** @return 配置狀態信息*/@Operation(summary = "獲取MCP配置狀態", description = "返回當前MCP工具的配置模式、工具數量、可用工具列表等狀態信息。")@GetMapping("/status")public Map<String, Object> getStatus() {Map<String, Object> status = new HashMap<>();ToolCallback[] currentCallbacks = getCurrentToolCallbacks();status.put("toolCount", currentCallbacks.length);status.put("hasDynamicManager", dynamicClientManager != null);// 添加工具列表List<Map<String, String>> toolList = new ArrayList<>();for (ToolCallback callback : currentCallbacks) {Map<String, String> tool = new HashMap<>();tool.put("name", callback.getToolDefinition().name());tool.put("description", callback.getToolDefinition().description());toolList.add(tool);}status.put("tools", toolList);logger.debug("📊 狀態查詢 - 工具數: {}", currentCallbacks.length);return status;}/*** 獲取可用工具列表** @return 工具列表*/@Operation(summary = "獲取可用MCP工具列表", description = "返回當前可用的MCP工具及其描述、輸入參數等信息。")@GetMapping("/tools")public Map<String, Object> getTools() {Map<String, Object> result = new HashMap<>();try {ToolCallback[] currentCallbacks = getCurrentToolCallbacks();List<Map<String, Object>> toolList = new ArrayList<>();for (ToolCallback callback : currentCallbacks) {Map<String, Object> tool = new HashMap<>();tool.put("name", callback.getToolDefinition().name());tool.put("description", callback.getToolDefinition().description());tool.put("inputSchema", callback.getToolDefinition().inputSchema());toolList.add(tool);}result.put("success", true);result.put("toolCount", currentCallbacks.length);result.put("tools", toolList);logger.debug("🔧 工具列表查詢 - 返回 {} 個工具", currentCallbacks.length);} catch (Exception e) {logger.error("? 獲取工具列表失敗", e);result.put("success", false);result.put("message", "獲取工具列表失敗: " + e.getMessage());result.put("toolCount", 0);result.put("tools", new ArrayList<>());}return result;}/*** 獲取可用工具列表** @return 工具列表*/@Operation(summary = "獲取指定MCP工具列表", description = "返回當前MCP服務的工具及其描述、輸入參數等信息。")@GetMapping("/toolsByName")public Map<String, Object> toolsByName(@RequestParam String mcpName) {Map<String, Object> result = new HashMap<>();List<SyncMcpToolCallback> tools = dynamicClientManager.getToolByMcpName(mcpName);List<Map<String, Object>> toolList = new ArrayList<>();for (ToolCallback callback : tools) {Map<String, Object> tool = new HashMap<>();tool.put("name", callback.getToolDefinition().name());tool.put("description", callback.getToolDefinition().description());tool.put("inputSchema", callback.getToolDefinition().inputSchema());toolList.add(tool);}result.put("success", true);result.put("toolCount", tools.size());result.put("tools", toolList);logger.debug("🔧 工具列表查詢 - 返回 {} 個工具", tools.size());return result;}/*** 調用MCP工具** @param toolName 工具名稱* @param arguments 工具參數(JSON格式)* @return 調用結果*/@Operation(summary = "調用指定MCP工具", description = "根據工具名稱和參數調用MCP工具,返回調用結果。參數為JSON字符串。")@PostMapping("/call/{toolName}")public Map<String, Object> callTool(@PathVariable String toolName, @RequestBody String arguments) {Map<String, Object> result = new HashMap<>();try {ToolCallback[] currentCallbacks = getCurrentToolCallbacks();if (currentCallbacks.length == 0) {result.put("success", false);result.put("message", "當前沒有可用的MCP工具");return result;}// 查找指定的工具ToolCallback targetTool = null;for (ToolCallback callback : currentCallbacks) {if (callback.getToolDefinition().name().equals(toolName)) {targetTool = callback;break;}}if (targetTool == null) {result.put("success", false);result.put("message", "工具不存在: " + toolName);result.put("availableTools", Arrays.stream(currentCallbacks).map(cb -> cb.getToolDefinition().name()).toArray());return result;}logger.debug("🔧 調用工具: {}, 參數: {}", toolName, arguments);// 調用工具String toolResult = targetTool.call(arguments);result.put("success", true);result.put("toolName", toolName);result.put("result", toolResult);logger.info("? 工具調用成功: {} -> {}", toolName, toolResult);} catch (Exception e) {logger.error("? 工具調用失敗: {}", toolName, e);result.put("success", false);result.put("message", "調用失敗: " + e.getMessage());result.put("toolName", toolName);}return result;}/*** 刷新工具列表(重新從MCP服務器獲取)** @return 刷新結果*/@Operation(summary = "刷新MCP工具列表", description = "重新從MCP服務器獲取工具列表并刷新緩存。")@PostMapping("/refresh")public Map<String, Object> refreshTools() {Map<String, Object> result = new HashMap<>();try {if (dynamicClientManager != null) {logger.info("🔄 刷新MCP工具列表");dynamicClientManager.refreshToolCallbacks();List<ToolCallback> tools = dynamicClientManager.getAvailableToolCallbacks();result.put("success", true);result.put("message", "工具列表已刷新");result.put("toolCount", tools.size());logger.info("? 工具列表刷新完成,發現 {} 個工具", tools.size());} else {result.put("success", false);result.put("message", "動態管理器不可用");}} catch (Exception e) {logger.error("? 刷新工具列表失敗", e);result.put("success", false);result.put("message", "刷新失敗: " + e.getMessage());}return result;}/*** 新增MCP工具*/@Operation(summary = "新增MCP工具", description = "新增一條MCP工具記錄")@PostMapping("/entity")public RestVO<String> addMcp(@RequestBody AiMcp mcp) {try {aiMcpService.addMcp(mcp, dynamicClientManager);return RestVO.success("新增成功");} catch (Exception e) {return RestVO.fail("新增失敗: " + e.getMessage());}}/*** 修改MCP工具*/@Operation(summary = "修改MCP工具", description = "根據ID修改MCP工具記錄")@PutMapping("/entity/{id}")public RestVO<String> updateMcp(@PathVariable Long id, @RequestBody AiMcp mcp) {try {mcp.setId(id);aiMcpService.updateMcp(mcp);return RestVO.success("修改成功");} catch (Exception e) {return RestVO.fail("修改失敗: " + e.getMessage());}}/*** 刪除MCP工具*/@Operation(summary = "刪除MCP工具", description = "根據ID刪除MCP工具記錄")@DeleteMapping("/entity/{id}")public RestVO<String> deleteMcp(@PathVariable Long id) {try {aiMcpService.deleteMcp(id, dynamicClientManager);return RestVO.success("刪除成功");} catch (Exception e) {return RestVO.fail("刪除失敗: " + e.getMessage());}}/*** 分頁查詢MCP工具,支持名稱模糊搜索,按添加日期倒序*/@Operation(summary = "分頁查詢MCP工具", description = "分頁查詢MCP工具,支持名稱模糊搜索,按添加日期倒序排列")@GetMapping("/entity/page")public RestVO<Map<String, Object>> pageMcp(@RequestParam(defaultValue = "") String name,@RequestParam(defaultValue = "1") int page,@RequestParam(defaultValue = "10") int size) {try {Page<AiMcp> resultPage = aiMcpService.pageQuery(name, page, size);Map<String, Object> result = new HashMap<>();result.put("total", resultPage.getTotal());result.put("pages", resultPage.getPages());result.put("current", resultPage.getCurrent());result.put("size", resultPage.getSize());result.put("records", resultPage.getRecords());return RestVO.success(result);} catch (Exception e) {return RestVO.fail("分頁查詢失敗: " + e.getMessage());}}
}
7.測試mcp工具
添加一個高德地圖的mcp ,key從高德開放平臺獲取
{"mcpServers": {"amap-amap-sse": {"url": "https://mcp.amap.com","type": "sse","sseEndpoint": "/sse?key=xxxxxxxxxxxxxxxxxx"}}
}
查詢可用工具
8.ChatClient接入toolCallbacks
private Flux<FluxVO> getFluxVOFlux(List<Message> messageList, AiModel myModel, QuestionVO body) {Prompt prompt = new Prompt(messageList);AtomicBoolean inThinking = new AtomicBoolean(false);StringBuffer outputText = body.getMemory() ? new StringBuffer() : null;ChatClient chatModel = myModel.getChatClient();// 1. 先構造 Publisher<ChatResponse>Flux<ChatResponse> publisher;//判斷是否需要啟用mcp工具if (body.getUseTools()) {List<ToolCallback> toolCallbacks = dynamicMcpClientManager.getAvailableToolCallbacks();publisher = chatModel.prompt(prompt).toolCallbacks(toolCallbacks).stream().chatResponse();} else {publisher = chatModel.prompt(prompt).stream().chatResponse();}// 主動推送一條“處理中”消息Flux<FluxVO> proactiveMsg = Flux.just(FluxVO.builder().text("").status("before").build());Flux<FluxVO> resp = Flux.from(publisher).doFirst(() -> {System.out.println("-------------開始輸出");if (body.getMemory()) {chatMemoryService.saveMessage(body);}}).doFinally(signalType -> {System.out.println("-------------流式處理結束");if (body.getMemory() && outputText != null) {chatMemoryService.saveMessage(body.getSessionId(), "ASSISTANT", outputText.toString(), body.getModel());}});return Flux.concat(proactiveMsg, resp);
getFluxVOFlux 之前的代碼可以參考同系列前幾章節