Enterprise-Grade Distributed MCP Solution

Original Feishu document: https://ik3te1knhq.feishu.cn/wiki/D8kSwC9tFi61CMkRdd8cMxNTnpg

Background: At present, an MCP Client and an MCP Server are connected one-to-one. If that MCP Server goes down, the MCP Client can no longer use the tools the server provides, so tool availability cannot be guaranteed.

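Solution: I explored a distributed client connection scheme in which one MCP Client can connect to multiple MCP Server nodes (deployed in a distributed fashion). The current approach is as follows:

  1. Create a class that holds a service name and its corresponding connections
  2. Implement a listener mechanism that reacts to MCP Server nodes going online and offline and adjusts mcpAsyncClientList dynamically
  3. (Read operations) To fetch information from the MCP Server, such as the tool list, pick one client from mcpAsyncClientList and send the request through it (the LoadbalancedMcpAsyncClient code later in this article selects clients round-robin)
  4. (Write operations) Changes to MCP Server state are initiated by the MCP Client and must be applied to every MCP Server node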

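public class LoadbalancedMcpAsyncClient implements EventListener {

    // Service name under which the MCP Server nodes are registered
    private String serviceName;

    // One client per MCP Server node of that service
    private List<McpAsyncClient> mcpAsyncClientList;

}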
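The read/write split in steps 3 and 4 can be sketched in plain Java, independent of the MCP SDK. This is a minimal illustration under stated assumptions, not the code from the pull request: NodeClient and ReadWriteSplitClient are hypothetical names, and the node list is fixed rather than updated by a listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical stand-in for a client connected to one MCP Server node. */
final class NodeClient {
    final String node;
    final List<String> roots = new ArrayList<>();

    NodeClient(String node) {
        this.node = node;
    }

    // A "read": returns information owned by the server node
    String listTools() {
        return "tools@" + node;
    }

    // A "write": changes state that every node must agree on
    void addRoot(String uri) {
        roots.add(uri);
    }
}

/** Reads go to a single node chosen round-robin; writes are applied to every node. */
final class ReadWriteSplitClient {
    private final List<NodeClient> clients;
    private final AtomicInteger next = new AtomicInteger();

    ReadWriteSplitClient(List<NodeClient> clients) {
        this.clients = List.copyOf(clients);
    }

    <R> R read(Function<NodeClient, R> op) {
        if (clients.isEmpty()) {
            throw new IllegalStateException("No client available");
        }
        int index = Math.floorMod(next.getAndIncrement(), clients.size());
        return op.apply(clients.get(index));
    }

    void write(Consumer<NodeClient> op) {
        clients.forEach(op);
    }
}
```

With two nodes, successive read(NodeClient::listTools) calls alternate between them, while write(n -> n.addRoot(...)) reaches both.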
Code contributed to the community: https://github.com/alibaba/spring-ai-alibaba/pull/755

Module Code Walkthrough

YAML file

spring:
  ai:
    mcp:
      client:
        enabled: true
        name: mcp-client-webflux
        version: 1.0.0
        type: SYNC
        nacos-enabled: true # enable the Nacos client configuration to turn on distributed mode
    alibaba:
      mcp:
        nacos: # basic Nacos configuration
          enabled: true
          server-addr: <nacos-server-addr>
          service-namespace: <nacos-namespace>
          service-group: <nacos-group>
        client:
          sse:
            connections: # MCP Server services registered in Nacos; mcp-server1 stands for a multi-node service
              nacos-server1: mcp-server1
              nacos-server2: mcp-server2

Auto-configuration

NacosMcpSseClientProperties (configuration class)
@ConfigurationProperties("spring.ai.alibaba.mcp.client.sse")
public class NacosMcpSseClientProperties {

    public static final String CONFIG_PREFIX = "spring.ai.alibaba.mcp.client.sse";

    // Connection key -> service name registered in Nacos
    private final Map<String, String> connections = new HashMap<>();

    public Map<String, String> getConnections() {
        return connections;
    }

}
NacosMcpSseClientAutoConfiguration

Provides a bean of type Map<String, List<NamedClientMcpTransport>>

  • The key is the service name
  • The value is the list of NamedClientMcpTransport objects (each wrapping a WebFluxSseClientTransport) used for the subsequent connections
@AutoConfiguration
@EnableConfigurationProperties({ NacosMcpSseClientProperties.class, NacosMcpRegistryProperties.class })
public class NacosMcpSseClientAutoConfiguration {

    private static final Logger logger = LoggerFactory.getLogger(NacosMcpSseClientAutoConfiguration.class);

    public NacosMcpSseClientAutoConfiguration() {
    }

    @Bean
    public NamingService nacosNamingService(NacosMcpRegistryProperties nacosMcpRegistryProperties) {
        Properties nacosProperties = nacosMcpRegistryProperties.getNacosProperties();
        try {
            return NamingFactory.createNamingService(nacosProperties);
        }
        catch (NacosException e) {
            throw new RuntimeException(e);
        }
    }

    @Bean(name = "server2NamedTransport")
    public Map<String, List<NamedClientMcpTransport>> server2NamedTransport(
            NacosMcpSseClientProperties nacosMcpSseClientProperties, NamingService namingService,
            ObjectProvider<WebClient.Builder> webClientBuilderProvider,
            ObjectProvider<ObjectMapper> objectMapperProvider) {
        Map<String, List<NamedClientMcpTransport>> server2NamedTransport = new HashMap<>();
        WebClient.Builder webClientBuilderTemplate = webClientBuilderProvider.getIfAvailable(WebClient::builder);
        ObjectMapper objectMapper = objectMapperProvider.getIfAvailable(ObjectMapper::new);

        Map<String, String> connections = nacosMcpSseClientProperties.getConnections();
        connections.forEach((serviceKey, serviceName) -> {
            try {
                // Query only the healthy instances of the service
                List<Instance> instances = namingService.selectInstances(serviceName, true);
                List<NamedClientMcpTransport> namedTransports = new ArrayList<>();
                for (Instance instance : instances) {
                    // The scheme comes from instance metadata and defaults to http
                    String url = instance.getMetadata().getOrDefault("scheme", "http") + "://" + instance.getIp()
                            + ":" + instance.getPort();
                    WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(url);
                    WebFluxSseClientTransport transport = new WebFluxSseClientTransport(webClientBuilder,
                            objectMapper);
                    // One named transport per instance: <serviceName>-<instanceId>
                    namedTransports
                        .add(new NamedClientMcpTransport(serviceName + "-" + instance.getInstanceId(), transport));
                }
                server2NamedTransport.put(serviceName, namedTransports);
            }
            catch (NacosException e) {
                logger.error("nacos naming service: {} error", serviceName, e);
            }
        });
        return server2NamedTransport;
    }

}
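The base URL of each node is derived from its registry entry: the scheme is read from the instance metadata (falling back to http) and combined with the IP and port. A minimal plain-Java sketch of that rule follows; InstanceUrls is a hypothetical helper, and the method takes the three values directly instead of a Nacos Instance so that it runs without the SDK.

```java
import java.util.Map;

final class InstanceUrls {

    /** Builds "<scheme>://<ip>:<port>", using http when the metadata has no "scheme" entry. */
    static String baseUrl(String ip, int port, Map<String, String> metadata) {
        return metadata.getOrDefault("scheme", "http") + "://" + ip + ":" + port;
    }

    public static void main(String[] args) {
        // No scheme in metadata: falls back to http
        System.out.println(baseUrl("127.0.0.1", 19000, Map.of()));
        // Scheme supplied by the server when it registered itself
        System.out.println(baseUrl("10.0.0.5", 8443, Map.of("scheme", "https")));
    }
}
```

A server that must be reached over TLS therefore has to publish scheme=https in its metadata when it registers; otherwise the client connects over plain http.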
NacosMcpClientAutoConfiguration

Provides the clients that interact with the MCP Server

  • List<LoadbalancedMcpSyncClient>
  • List<LoadbalancedMcpAsyncClient>
@AutoConfiguration(after = { NacosMcpSseClientAutoConfiguration.class, McpClientAutoConfiguration.class })
@ConditionalOnClass({ McpSchema.class })
@EnableConfigurationProperties({ McpClientCommonProperties.class })
@ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "nacos-enabled" }, havingValue = "true",
        matchIfMissing = false)
public class NacosMcpClientAutoConfiguration {

    public NacosMcpClientAutoConfiguration() {
    }

    private String connectedClientName(String clientName, String serverConnectionName) {
        return clientName + " - " + serverConnectionName;
    }

    @Bean
    @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "SYNC",
            matchIfMissing = true)
    public List<LoadbalancedMcpSyncClient> loadbalancedMcpSyncClientList(
            ObjectProvider<McpSyncClientConfigurer> mcpSyncClientConfigurerProvider,
            McpClientCommonProperties commonProperties,
            @Qualifier("server2NamedTransport") ObjectProvider<Map<String, List<NamedClientMcpTransport>>> server2NamedTransportProvider,
            ObjectProvider<NamingService> namingServiceProvider) {
        NamingService namingService = namingServiceProvider.getObject();
        McpSyncClientConfigurer mcpSyncClientConfigurer = mcpSyncClientConfigurerProvider.getObject();

        List<LoadbalancedMcpSyncClient> loadbalancedMcpSyncClients = new ArrayList<>();
        Map<String, List<NamedClientMcpTransport>> server2NamedTransport = server2NamedTransportProvider.getObject();
        for (Map.Entry<String, List<NamedClientMcpTransport>> entry : server2NamedTransport.entrySet()) {
            String serviceName = entry.getKey();
            List<NamedClientMcpTransport> namedTransports = entry.getValue();
            List<McpSyncClient> mcpSyncClients = new ArrayList<>();

            // Build one McpSyncClient per server instance
            for (NamedClientMcpTransport namedTransport : namedTransports) {
                McpSchema.Implementation clientInfo = new McpSchema.Implementation(
                        this.connectedClientName(commonProperties.getName(), namedTransport.name()),
                        commonProperties.getVersion());
                McpClient.SyncSpec syncSpec = McpClient.sync(namedTransport.transport())
                    .clientInfo(clientInfo)
                    .requestTimeout(commonProperties.getRequestTimeout());
                syncSpec = mcpSyncClientConfigurer.configure(namedTransport.name(), syncSpec);
                McpSyncClient syncClient = syncSpec.build();
                if (commonProperties.isInitialized()) {
                    syncClient.initialize();
                }
                mcpSyncClients.add(syncClient);
            }

            // Wrap the per-instance clients and subscribe to instance changes in Nacos
            LoadbalancedMcpSyncClient loadbalancedMcpSyncClient = LoadbalancedMcpSyncClient.builder()
                .serviceName(serviceName)
                .mcpSyncClientList(mcpSyncClients)
                .namingService(namingService)
                .build();
            loadbalancedMcpSyncClient.subscribe();
            loadbalancedMcpSyncClients.add(loadbalancedMcpSyncClient);
        }
        return loadbalancedMcpSyncClients;
    }

    @Bean
    @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "ASYNC")
    public List<LoadbalancedMcpAsyncClient> loadbalancedMcpAsyncClientList(
            ObjectProvider<McpAsyncClientConfigurer> mcpAsyncClientConfigurerProvider,
            McpClientCommonProperties commonProperties,
            @Qualifier("server2NamedTransport") ObjectProvider<Map<String, List<NamedClientMcpTransport>>> server2NamedTransportProvider,
            ObjectProvider<NamingService> namingServiceProvider) {
        NamingService namingService = namingServiceProvider.getObject();
        McpAsyncClientConfigurer mcpAsyncClientConfigurer = mcpAsyncClientConfigurerProvider.getObject();

        List<LoadbalancedMcpAsyncClient> loadbalancedMcpAsyncClients = new ArrayList<>();
        Map<String, List<NamedClientMcpTransport>> server2NamedTransport = server2NamedTransportProvider.getObject();
        for (Map.Entry<String, List<NamedClientMcpTransport>> entry : server2NamedTransport.entrySet()) {
            String serviceName = entry.getKey();
            List<NamedClientMcpTransport> namedTransports = entry.getValue();
            List<McpAsyncClient> mcpAsyncClients = new ArrayList<>();

            // Build one McpAsyncClient per server instance
            for (NamedClientMcpTransport namedTransport : namedTransports) {
                McpSchema.Implementation clientInfo = new McpSchema.Implementation(
                        this.connectedClientName(commonProperties.getName(), namedTransport.name()),
                        commonProperties.getVersion());
                McpClient.AsyncSpec asyncSpec = McpClient.async(namedTransport.transport())
                    .clientInfo(clientInfo)
                    .requestTimeout(commonProperties.getRequestTimeout());
                asyncSpec = mcpAsyncClientConfigurer.configure(namedTransport.name(), asyncSpec);
                McpAsyncClient asyncClient = asyncSpec.build();
                if (commonProperties.isInitialized()) {
                    asyncClient.initialize().block();
                }
                mcpAsyncClients.add(asyncClient);
            }

            // Wrap the per-instance clients and subscribe to instance changes in Nacos
            LoadbalancedMcpAsyncClient loadbalancedMcpAsyncClient = LoadbalancedMcpAsyncClient.builder()
                .serviceName(serviceName)
                .mcpAsyncClientList(mcpAsyncClients)
                .namingService(namingService)
                .build();
            loadbalancedMcpAsyncClient.subscribe();
            loadbalancedMcpAsyncClients.add(loadbalancedMcpAsyncClient);
        }
        return loadbalancedMcpAsyncClients;
    }

}
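The client name built here matters beyond logging: when a node later goes offline, the load-balanced client finds the client to drop by recomputing this same name from the instance. The convention can be sketched as two small functions. ClientNames is a hypothetical helper, and the instance id "node-1" is made up for illustration (real Nacos instance ids look different).

```java
final class ClientNames {

    /** Name of the transport for one server instance: "<serviceName>-<instanceId>". */
    static String transportName(String serviceName, String instanceId) {
        return serviceName + "-" + instanceId;
    }

    /** Name reported in the client info: "<clientName> - <transportName>". */
    static String connectedClientName(String clientName, String serverConnectionName) {
        return clientName + " - " + serverConnectionName;
    }

    public static void main(String[] args) {
        String transport = transportName("mcp-server-provider", "node-1");
        System.out.println(connectedClientName("mcp-client-webflux", transport));
    }
}
```

Because removal is keyed on this string, the name must be computed identically at startup and inside the change listener; any drift between the two would leave stale clients in the list.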

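Client side

LoadbalancedMcpAsyncClient

Field meanings:

  • String serviceName: the service name under which the MCP Server is registered
  • List<McpAsyncClient> mcpAsyncClientList: the clients for the individual server nodes
  • NamingService namingService: the Nacos naming service
  • List<Instance> instances: the list of MCP Server instances in Nacos

All other methods are used exactly as on McpAsyncClient; they are fully wrapped

  1. Read operations: getMcpAsyncClient() picks one McpAsyncClient from the list round-robin
  2. Write operations: applied to every client in List<McpAsyncClient>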
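Round-robin selection is usually a counter taken modulo the list size. One detail worth sketching: an int counter eventually overflows to a negative value, and in Java the % operator then returns a negative remainder, which is not a valid list index. Math.floorMod avoids this. RoundRobinIndex below is a hypothetical helper, not part of the MCP SDK.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinIndex {
    private final AtomicInteger counter;

    RoundRobinIndex(int start) {
        this.counter = new AtomicInteger(start);
    }

    /** Returns the next index in [0, size), also after the counter wraps past Integer.MAX_VALUE. */
    int next(int size) {
        if (size <= 0) {
            throw new IllegalStateException("No client available");
        }
        return Math.floorMod(counter.getAndIncrement(), size);
    }

    public static void main(String[] args) {
        // Start right before the overflow point to show the wrap-around
        RoundRobinIndex index = new RoundRobinIndex(Integer.MAX_VALUE);
        System.out.println(index.next(3)); // counter value is Integer.MAX_VALUE
        System.out.println(index.next(3)); // counter value has wrapped to Integer.MIN_VALUE
    }
}
```

The overflow only occurs after about two billion calls, so it is easy to miss in testing, but a long-running client can reach it.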

By implementing the EventListener interface, the class adds or removes McpAsyncClient instances dynamically

public class LoadbalancedMcpAsyncClient implements EventListener {

    private static final Logger logger = LoggerFactory.getLogger(LoadbalancedMcpAsyncClient.class);

    private final String serviceName;

    private final List<McpAsyncClient> mcpAsyncClientList;

    private final AtomicInteger currentIndex = new AtomicInteger(0);

    private final NamingService namingService;

    private List<Instance> instances;

    public LoadbalancedMcpAsyncClient(String serviceName, List<McpAsyncClient> mcpAsyncClientList,
            NamingService namingService) {
        Assert.notNull(serviceName, "serviceName cannot be null");
        Assert.notNull(mcpAsyncClientList, "mcpAsyncClientList cannot be null");
        Assert.notNull(namingService, "namingService cannot be null");

        this.serviceName = serviceName;
        this.mcpAsyncClientList = mcpAsyncClientList;
        this.namingService = namingService;
        try {
            this.instances = namingService.selectInstances(serviceName, true);
        }
        catch (NacosException e) {
            throw new RuntimeException(String.format("Failed to get instances for service: %s", serviceName), e);
        }
    }

    // Register this object as a Nacos listener for instance changes of the service
    public void subscribe() {
        try {
            this.namingService.subscribe(this.serviceName, this);
        }
        catch (NacosException e) {
            throw new RuntimeException(String.format("Failed to subscribe to service: %s", this.serviceName), e);
        }
    }

    public String getServiceName() {
        return serviceName;
    }

    public List<McpAsyncClient> getMcpAsyncClientList() {
        return mcpAsyncClientList;
    }

    public NamingService getNamingService() {
        return this.namingService;
    }

    public List<Instance> getInstances() {
        return this.instances;
    }

    // Round-robin selection; floorMod keeps the index non-negative after the counter overflows
    private McpAsyncClient getMcpAsyncClient() {
        if (mcpAsyncClientList.isEmpty()) {
            throw new IllegalStateException("No McpAsyncClient available");
        }
        int index = Math.floorMod(currentIndex.getAndIncrement(), mcpAsyncClientList.size());
        return mcpAsyncClientList.get(index);
    }

    // ---------------------------------------------------------------------
    // Delegating methods: reads use one client, writes go to all clients
    // ---------------------------------------------------------------------

    public McpSchema.ServerCapabilities getServerCapabilities() {
        return getMcpAsyncClient().getServerCapabilities();
    }

    public McpSchema.Implementation getServerInfo() {
        return getMcpAsyncClient().getServerInfo();
    }

    public boolean isInitialized() {
        return getMcpAsyncClient().isInitialized();
    }

    public McpSchema.ClientCapabilities getClientCapabilities() {
        return getMcpAsyncClient().getClientCapabilities();
    }

    public McpSchema.Implementation getClientInfo() {
        return getMcpAsyncClient().getClientInfo();
    }

    public void close() {
        Iterator<McpAsyncClient> iterator = mcpAsyncClientList.iterator();
        while (iterator.hasNext()) {
            McpAsyncClient mcpAsyncClient = iterator.next();
            mcpAsyncClient.close();
            iterator.remove();
            logger.info("Closed and removed McpAsyncClient: {}", mcpAsyncClient.getClientInfo().name());
        }
    }

    public Mono<Void> closeGracefully() {
        List<Mono<Void>> closeMonos = new ArrayList<>();
        for (McpAsyncClient mcpAsyncClient : mcpAsyncClientList) {
            closeMonos.add(mcpAsyncClient.closeGracefully()
                .doOnSuccess(v -> logger.info("Closed McpAsyncClient: {}", mcpAsyncClient.getClientInfo().name())));
        }
        // Clear the list once every close has completed; the iterator used to build
        // closeMonos must not be touched from the asynchronous callbacks
        return Mono.when(closeMonos).doOnSuccess(v -> mcpAsyncClientList.clear());
    }

    public Mono<Object> ping() {
        return getMcpAsyncClient().ping();
    }

    public Mono<Void> addRoot(McpSchema.Root root) {
        return Mono.when(mcpAsyncClientList.stream()
            .map(mcpAsyncClient -> mcpAsyncClient.addRoot(root))
            .collect(Collectors.toList()));
    }

    public Mono<Void> removeRoot(String rootUri) {
        return Mono.when(mcpAsyncClientList.stream()
            .map(mcpAsyncClient -> mcpAsyncClient.removeRoot(rootUri))
            .collect(Collectors.toList()));
    }

    public Mono<Void> rootsListChangedNotification() {
        return Mono.when(mcpAsyncClientList.stream()
            .map(McpAsyncClient::rootsListChangedNotification)
            .collect(Collectors.toList()));
    }

    public Mono<McpSchema.CallToolResult> callTool(McpSchema.CallToolRequest callToolRequest) {
        return getMcpAsyncClient().callTool(callToolRequest);
    }

    public Mono<McpSchema.ListToolsResult> listTools() {
        return getMcpAsyncClient().listTools();
    }

    public Mono<McpSchema.ListToolsResult> listTools(String cursor) {
        return getMcpAsyncClient().listTools(cursor);
    }

    public Mono<McpSchema.ListResourcesResult> listResources() {
        return getMcpAsyncClient().listResources();
    }

    public Mono<McpSchema.ListResourcesResult> listResources(String cursor) {
        return getMcpAsyncClient().listResources(cursor);
    }

    public Mono<McpSchema.ReadResourceResult> readResource(McpSchema.Resource resource) {
        return getMcpAsyncClient().readResource(resource);
    }

    public Mono<McpSchema.ReadResourceResult> readResource(McpSchema.ReadResourceRequest readResourceRequest) {
        return getMcpAsyncClient().readResource(readResourceRequest);
    }

    public Mono<McpSchema.ListResourceTemplatesResult> listResourceTemplates() {
        return getMcpAsyncClient().listResourceTemplates();
    }

    public Mono<McpSchema.ListResourceTemplatesResult> listResourceTemplates(String cursor) {
        return getMcpAsyncClient().listResourceTemplates(cursor);
    }

    public Mono<Void> subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
        return Mono.when(mcpAsyncClientList.stream()
            .map(mcpAsyncClient -> mcpAsyncClient.subscribeResource(subscribeRequest))
            .collect(Collectors.toList()));
    }

    public Mono<Void> unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
        return Mono.when(mcpAsyncClientList.stream()
            .map(mcpAsyncClient -> mcpAsyncClient.unsubscribeResource(unsubscribeRequest))
            .collect(Collectors.toList()));
    }

    public Mono<McpSchema.ListPromptsResult> listPrompts() {
        return getMcpAsyncClient().listPrompts();
    }

    public Mono<McpSchema.ListPromptsResult> listPrompts(String cursor) {
        return getMcpAsyncClient().listPrompts(cursor);
    }

    public Mono<McpSchema.GetPromptResult> getPrompt(McpSchema.GetPromptRequest getPromptRequest) {
        return getMcpAsyncClient().getPrompt(getPromptRequest);
    }

    public Mono<Void> setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {
        return Mono.when(mcpAsyncClientList.stream()
            .map(mcpAsyncClient -> mcpAsyncClient.setLoggingLevel(loggingLevel))
            .collect(Collectors.toList()));
    }

    // ---------------------------------------------------------------------
    // Nacos listener: keep the client list in sync with the instance list
    // ---------------------------------------------------------------------

    @Override
    public void onEvent(Event event) {
        if (event instanceof NamingEvent namingEvent) {
            if (this.serviceName.equals(namingEvent.getServiceName())) {
                logger.info("Received service instance change event for service: {}",
                        namingEvent.getServiceName());
                List<Instance> instances = namingEvent.getInstances();
                logger.info("Updated instances count: {}", instances.size());
                // Log the details of each instance
                instances.forEach(instance -> {
                    logger.info("Instance: {}:{} (Healthy: {}, Enabled: {}, Metadata: {})", instance.getIp(),
                            instance.getPort(), instance.isHealthy(), instance.isEnabled(),
                            JacksonUtils.toJson(instance.getMetadata()));
                });
                updateClientList(instances);
            }
        }
    }

    private void updateClientList(List<Instance> currentInstances) {
        McpClientCommonProperties commonProperties = ApplicationContextHolder
            .getBean(McpClientCommonProperties.class);
        McpAsyncClientConfigurer mcpAsyncClientConfigurer = ApplicationContextHolder
            .getBean(McpAsyncClientConfigurer.class);
        ObjectMapper objectMapper = ApplicationContextHolder.getBean(ObjectMapper.class);
        WebClient.Builder webClientBuilderTemplate = ApplicationContextHolder.getBean(WebClient.Builder.class);

        // Instances that disappeared since the last event
        List<Instance> removeInstances = instances.stream()
            .filter(instance -> !currentInstances.contains(instance))
            .collect(Collectors.toList());

        // Instances that appeared since the last event
        List<Instance> addInstances = currentInstances.stream()
            .filter(instance -> !instances.contains(instance))
            .collect(Collectors.toList());

        // Remove the McpAsyncClient of every removed instance, matched by client-info name
        List<String> clientInfoNames = removeInstances.stream()
            .map(instance -> connectedClientName(commonProperties.getName(),
                    this.serviceName + "-" + instance.getInstanceId()))
            .toList();
        Iterator<McpAsyncClient> iterator = mcpAsyncClientList.iterator();
        while (iterator.hasNext()) {
            McpAsyncClient mcpAsyncClient = iterator.next();
            McpSchema.Implementation clientInfo = mcpAsyncClient.getClientInfo();
            if (clientInfoNames.contains(clientInfo.name())) {
                logger.info("Removing McpAsyncClient: {}", clientInfo.name());
                // Remove synchronously while iterating: a Mono<Void> never emits a value,
                // so a removal placed in the value callback of subscribe() would never run
                iterator.remove();
                mcpAsyncClient.closeGracefully()
                    .subscribe(v -> {
                    }, e -> logger.error("Failed to close McpAsyncClient: {}", clientInfo.name(), e));
            }
        }

        // Create a McpAsyncClient for every added instance
        for (Instance instance : addInstances) {
            String baseUrl = instance.getMetadata().getOrDefault("scheme", "http") + "://" + instance.getIp() + ":"
                    + instance.getPort();
            WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(baseUrl);
            WebFluxSseClientTransport transport = new WebFluxSseClientTransport(webClientBuilder, objectMapper);
            NamedClientMcpTransport namedTransport = new NamedClientMcpTransport(
                    serviceName + "-" + instance.getInstanceId(), transport);

            McpSchema.Implementation clientInfo = new McpSchema.Implementation(
                    this.connectedClientName(commonProperties.getName(), namedTransport.name()),
                    commonProperties.getVersion());
            McpClient.AsyncSpec asyncSpec = McpClient.async(namedTransport.transport())
                .clientInfo(clientInfo)
                .requestTimeout(commonProperties.getRequestTimeout());
            asyncSpec = mcpAsyncClientConfigurer.configure(namedTransport.name(), asyncSpec);
            McpAsyncClient asyncClient = asyncSpec.build();
            if (commonProperties.isInitialized()) {
                asyncClient.initialize().block();
            }

            logger.info("Added McpAsyncClient: {}", clientInfo.name());
            mcpAsyncClientList.add(asyncClient);
        }

        // Remember the new instance list for the next diff
        this.instances = currentInstances;
    }

    private String connectedClientName(String clientName, String serverConnectionName) {
        return clientName + " - " + serverConnectionName;
    }

    public static Builder builder() {
        return new Builder();
    }

    public static class Builder {

        private String serviceName;

        private List<McpAsyncClient> mcpAsyncClientList;

        private NamingService namingService;

        public Builder serviceName(String serviceName) {
            this.serviceName = serviceName;
            return this;
        }

        public Builder mcpAsyncClientList(List<McpAsyncClient> mcpAsyncClientList) {
            this.mcpAsyncClientList = mcpAsyncClientList;
            return this;
        }

        public Builder namingService(NamingService namingService) {
            this.namingService = namingService;
            return this;
        }

        public LoadbalancedMcpAsyncClient build() {
            return new LoadbalancedMcpAsyncClient(this.serviceName, this.mcpAsyncClientList, this.namingService);
        }

    }

}
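The core of updateClientList is a two-way list difference between the previously known instances and the instances delivered by the event. A self-contained sketch of that step follows, assuming a simplified Node record in place of the Nacos Instance class; InstanceDiff, Node and Change are hypothetical names.

```java
import java.util.List;
import java.util.stream.Collectors;

final class InstanceDiff {

    /** Simplified stand-in for a registry instance; equality is by ip and port. */
    record Node(String ip, int port) {
    }

    /** Result of comparing two instance lists. */
    record Change(List<Node> removed, List<Node> added) {
    }

    static Change diff(List<Node> previous, List<Node> current) {
        // In the old list but not in the new one: the node went offline
        List<Node> removed = previous.stream()
            .filter(node -> !current.contains(node))
            .collect(Collectors.toList());
        // In the new list but not in the old one: the node came online
        List<Node> added = current.stream()
            .filter(node -> !previous.contains(node))
            .collect(Collectors.toList());
        return new Change(removed, added);
    }

    public static void main(String[] args) {
        Node n19000 = new Node("127.0.0.1", 19000);
        Node n19001 = new Node("127.0.0.1", 19001);
        // Node 19001 is stopped
        System.out.println(diff(List.of(n19000, n19001), List.of(n19000)));
        // Node 19001 is started again
        System.out.println(diff(List.of(n19000), List.of(n19000, n19001)));
    }
}
```

The diff is only correct if the stored "previous" list is replaced with the current list after every event; otherwise the same node would be reported as added or removed again on the next event.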
LoadbalancedMcpSyncClient

Same structure as LoadbalancedMcpAsyncClient, built on McpSyncClient

public class LoadbalancedMcpSyncClient implements EventListener {

    private static final Logger logger = LoggerFactory.getLogger(LoadbalancedMcpSyncClient.class);

    private final String serviceName;

    private final List<McpSyncClient> mcpSyncClientList;

    private final AtomicInteger currentIndex = new AtomicInteger(0);

    private final NamingService namingService;

    private List<Instance> instances;

    public LoadbalancedMcpSyncClient(String serviceName, List<McpSyncClient> mcpSyncClientList,
            NamingService namingService) {
        Assert.notNull(serviceName, "Service name must not be null");
        Assert.notNull(mcpSyncClientList, "McpSyncClient list must not be null");
        Assert.notNull(namingService, "NamingService must not be null");

        this.serviceName = serviceName;
        this.mcpSyncClientList = mcpSyncClientList;
        this.namingService = namingService;
        try {
            this.instances = namingService.selectInstances(serviceName, true);
        }
        catch (NacosException e) {
            throw new RuntimeException(String.format("Failed to get instances for service: %s", serviceName), e);
        }
    }

    // Register this object as a Nacos listener for instance changes of the service
    public void subscribe() {
        try {
            this.namingService.subscribe(this.serviceName, this);
        }
        catch (NacosException e) {
            throw new RuntimeException(String.format("Failed to subscribe to service: %s", this.serviceName), e);
        }
    }

    public String getServiceName() {
        return this.serviceName;
    }

    public List<McpSyncClient> getMcpSyncClientList() {
        return this.mcpSyncClientList;
    }

    public NamingService getNamingService() {
        return this.namingService;
    }

    public List<Instance> getInstances() {
        return this.instances;
    }

    // Round-robin selection; floorMod keeps the index non-negative after the counter overflows
    private McpSyncClient getMcpSyncClient() {
        if (mcpSyncClientList.isEmpty()) {
            throw new IllegalStateException("No McpSyncClient available");
        }
        int index = Math.floorMod(currentIndex.getAndIncrement(), mcpSyncClientList.size());
        return mcpSyncClientList.get(index);
    }

    // ---------------------------------------------------------------------
    // Delegating methods: reads use one client, writes go to all clients
    // ---------------------------------------------------------------------

    public McpSchema.ServerCapabilities getServerCapabilities() {
        return getMcpSyncClient().getServerCapabilities();
    }

    public McpSchema.Implementation getServerInfo() {
        return getMcpSyncClient().getServerInfo();
    }

    public McpSchema.ClientCapabilities getClientCapabilities() {
        return getMcpSyncClient().getClientCapabilities();
    }

    public McpSchema.Implementation getClientInfo() {
        return getMcpSyncClient().getClientInfo();
    }

    public void close() {
        Iterator<McpSyncClient> iterator = mcpSyncClientList.iterator();
        while (iterator.hasNext()) {
            McpSyncClient mcpSyncClient = iterator.next();
            mcpSyncClient.close();
            iterator.remove();
            logger.info("Closed and removed McpSyncClient: {}", mcpSyncClient.getClientInfo().name());
        }
    }

    public boolean closeGracefully() {
        List<Boolean> flagList = new ArrayList<>();
        Iterator<McpSyncClient> iterator = mcpSyncClientList.iterator();
        while (iterator.hasNext()) {
            McpSyncClient mcpSyncClient = iterator.next();
            boolean flag = mcpSyncClient.closeGracefully();
            flagList.add(flag);
            if (flag) {
                iterator.remove();
                logger.info("Closed and removed McpSyncClient: {}", mcpSyncClient.getClientInfo().name());
            }
        }
        // True only if every client closed gracefully
        return flagList.stream().allMatch(flag -> flag);
    }

    public Object ping() {
        return getMcpSyncClient().ping();
    }

    public void addRoot(McpSchema.Root root) {
        for (McpSyncClient mcpSyncClient : mcpSyncClientList) {
            mcpSyncClient.addRoot(root);
        }
    }

    public void removeRoot(String rootUri) {
        for (McpSyncClient mcpSyncClient : mcpSyncClientList) {
            mcpSyncClient.removeRoot(rootUri);
        }
    }

    public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) {
        return getMcpSyncClient().callTool(callToolRequest);
    }

    public McpSchema.ListToolsResult listTools() {
        return getMcpSyncClient().listTools();
    }

    public McpSchema.ListToolsResult listTools(String cursor) {
        return getMcpSyncClient().listTools(cursor);
    }

    public McpSchema.ListResourcesResult listResources(String cursor) {
        return getMcpSyncClient().listResources(cursor);
    }

    public McpSchema.ListResourcesResult listResources() {
        return getMcpSyncClient().listResources();
    }

    public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) {
        return getMcpSyncClient().readResource(resource);
    }

    public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest readResourceRequest) {
        return getMcpSyncClient().readResource(readResourceRequest);
    }

    public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor) {
        return getMcpSyncClient().listResourceTemplates(cursor);
    }

    public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
        return getMcpSyncClient().listResourceTemplates();
    }

    public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
        for (McpSyncClient mcpSyncClient : mcpSyncClientList) {
            mcpSyncClient.subscribeResource(subscribeRequest);
        }
    }

    public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
        for (McpSyncClient mcpSyncClient : mcpSyncClientList) {
            mcpSyncClient.unsubscribeResource(unsubscribeRequest);
        }
    }

    public McpSchema.ListPromptsResult listPrompts(String cursor) {
        return getMcpSyncClient().listPrompts(cursor);
    }

    public McpSchema.ListPromptsResult listPrompts() {
        return getMcpSyncClient().listPrompts();
    }

    public McpSchema.GetPromptResult getPrompt(McpSchema.GetPromptRequest getPromptRequest) {
        return getMcpSyncClient().getPrompt(getPromptRequest);
    }

    public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {
        for (McpSyncClient mcpSyncClient : mcpSyncClientList) {
            mcpSyncClient.setLoggingLevel(loggingLevel);
        }
    }

    // ---------------------------------------------------------------------
    // Nacos listener: keep the client list in sync with the instance list
    // ---------------------------------------------------------------------

    @Override
    public void onEvent(Event event) {
        if (event instanceof NamingEvent namingEvent) {
            if (this.serviceName.equals(namingEvent.getServiceName())) {
                logger.info("Received service instance change event for service: {}",
                        namingEvent.getServiceName());
                List<Instance> instances = namingEvent.getInstances();
                logger.info("Updated instances count: {}", instances.size());
                // Log the details of each instance
                instances.forEach(instance -> {
                    logger.info("Instance: {}:{} (Healthy: {}, Enabled: {}, Metadata: {})", instance.getIp(),
                            instance.getPort(), instance.isHealthy(), instance.isEnabled(),
                            JacksonUtils.toJson(instance.getMetadata()));
                });
                updateClientList(instances);
            }
        }
    }

    private void updateClientList(List<Instance> currentInstances) {
        McpClientCommonProperties commonProperties = ApplicationContextHolder
            .getBean(McpClientCommonProperties.class);
        McpSyncClientConfigurer mcpSyncClientConfigurer = ApplicationContextHolder
            .getBean(McpSyncClientConfigurer.class);
        ObjectMapper objectMapper = ApplicationContextHolder.getBean(ObjectMapper.class);
        WebClient.Builder webClientBuilderTemplate = ApplicationContextHolder.getBean(WebClient.Builder.class);

        // Instances that disappeared since the last event
        List<Instance> removeInstances = instances.stream()
            .filter(instance -> !currentInstances.contains(instance))
            .collect(Collectors.toList());

        // Instances that appeared since the last event
        List<Instance> addInstances = currentInstances.stream()
            .filter(instance -> !instances.contains(instance))
            .collect(Collectors.toList());

        // Remove the McpSyncClient of every removed instance, matched by client-info name
        List<String> clientInfoNames = removeInstances.stream()
            .map(instance -> connectedClientName(commonProperties.getName(),
                    this.serviceName + "-" + instance.getInstanceId()))
            .toList();
        Iterator<McpSyncClient> iterator = mcpSyncClientList.iterator();
        while (iterator.hasNext()) {
            McpSyncClient mcpSyncClient = iterator.next();
            McpSchema.Implementation clientInfo = mcpSyncClient.getClientInfo();
            if (clientInfoNames.contains(clientInfo.name())) {
                logger.info("Removing McpSyncClient: {}", clientInfo.name());
                if (mcpSyncClient.closeGracefully()) {
                    iterator.remove();
                }
                else {
                    logger.warn("Failed to remove McpSyncClient: {}", clientInfo.name());
                }
            }
        }

        // Create a McpSyncClient for every added instance
        for (Instance instance : addInstances) {
            String baseUrl = instance.getMetadata().getOrDefault("scheme", "http") + "://" + instance.getIp() + ":"
                    + instance.getPort();
            WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(baseUrl);
            WebFluxSseClientTransport transport = new WebFluxSseClientTransport(webClientBuilder, objectMapper);
            NamedClientMcpTransport namedTransport = new NamedClientMcpTransport(
                    serviceName + "-" + instance.getInstanceId(), transport);

            McpSchema.Implementation clientInfo = new McpSchema.Implementation(
                    this.connectedClientName(commonProperties.getName(), namedTransport.name()),
                    commonProperties.getVersion());
            McpClient.SyncSpec syncSpec = McpClient.sync(namedTransport.transport())
                .clientInfo(clientInfo)
                .requestTimeout(commonProperties.getRequestTimeout());
            syncSpec = mcpSyncClientConfigurer.configure(namedTransport.name(), syncSpec);
            McpSyncClient syncClient = syncSpec.build();
            if (commonProperties.isInitialized()) {
                syncClient.initialize();
            }

            logger.info("Added McpSyncClient: {}", clientInfo.name());
            mcpSyncClientList.add(syncClient);
        }

        // Remember the new instance list for the next diff
        this.instances = currentInstances;
    }

    private String connectedClientName(String clientName, String serverConnectionName) {
        return clientName + " - " + serverConnectionName;
    }

    public static Builder builder() {
        return new Builder();
    }

    public static class Builder {

        private String serviceName;

        private List<McpSyncClient> mcpSyncClientList;

        private NamingService namingService;

        public Builder serviceName(String serviceName) {
            this.serviceName = serviceName;
            return this;
        }

        public Builder mcpSyncClientList(List<McpSyncClient> mcpSyncClientList) {
            this.mcpSyncClientList = mcpSyncClientList;
            return this;
        }

        public Builder namingService(NamingService namingService) {
            this.namingService = namingService;
            return this;
        }

        public LoadbalancedMcpSyncClient build() {
            return new LoadbalancedMcpSyncClient(this.serviceName, this.mcpSyncClientList, this.namingService);
        }

    }

}

Utility class

ApplicationContextHolder
@Component
public class ApplicationContextHolder implements ApplicationContextAware {

    // Set once by Spring at startup; gives non-bean code static access to the context
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ApplicationContextHolder.applicationContext = applicationContext;
    }

    public static <T> T getBean(Class<T> clazz) {
        return applicationContext.getBean(clazz);
    }

}

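Demo

I registered an MCP Server service in Nacos and deployed two nodes

  • The MCP Server is started twice on the same machine on different ports, 19000 and 19001, and registered in Nacos under the service name mcp-server-provider

The yml configuration is as follows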

server:
  port: 8080
spring:
  application:
    name: mcp-client-webflux
  ai:
    alibaba:
      mcp:
        nacos:
          enabled: true
          server-addr: 127.0.0.1:8848
          username: nacos
          password: nacos
        client:
          sse:
            connections:
              nacos-server1: mcp-server-provider
    mcp:
      client:
        enabled: true
        name: mcp-client-webflux
        version: 0.0.1
        initialized: true
        request-timeout: 600s
        nacos-enabled: true

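We can see that LoadbalancedMcpSyncClient has been injected successfully; mcp-server-provider has two instances, with two corresponding McpSyncClient objects

We stop the MCP Server on port 19001. The removed instance shows up in removeInstances, and the corresponding McpSyncClient is removed from mcpSyncClientList

We then start the MCP Server on port 19001 again. The new instance shows up in addInstances, and a corresponding McpSyncClient is added to mcpSyncClientList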
