隨著業務規模的不斷擴大和系統復雜度的提升,單一服務器架構往往無法滿足高并發、高可用性和彈性擴展的需求。在MCP生態系統中,多服務器協作架構成為構建大規模應用的必然選擇。本文將深入探討MCP TypeScript-SDK在多服務器環境下的部署、協作和管理,以及如何構建高可用、高性能、易擴展的分布式MCP應用。
目錄
-
多服務器架構基礎
1.1 MCP多服務器架構概述
1.2 分布式系統的挑戰與解決方案
1.3 MCP服務器分類與職責劃分
1.4 多服務器架構的常見模式 -
服務發現與路由
2.1 服務注冊與發現機制
2.2 基于DNS的服務發現
2.3 動態路由與負載均衡
2.4 服務健康檢查與自動恢復 -
狀態同步與一致性
3.1 分布式狀態管理策略
3.2 事件驅動的狀態同步
3.3 實現最終一致性
3.4 數據分區與沖突解決 -
負載均衡與容錯設計
4.1 負載均衡策略與實現
4.2 故障檢測與自動恢復
4.3 優雅降級與熔斷機制
4.4 構建高可用MCP集群
1. 多服務器架構基礎
在深入探討MCP多服務器協作之前,我們需要先了解多服務器架構的核心概念、挑戰以及MCP SDK提供的解決方案。
1.1 MCP多服務器架構概述
MCP(Model Context Protocol)作為一種為大型語言模型(LLM)交互設計的協議,其服務架構需要適應復雜多變的業務場景和負載模式。多服務器MCP架構是指將MCP服務分布在多個物理或虛擬服務器上,通過協作方式提供統一的服務能力。
// MCP服務器實例類型
interface McpServerInstance {id: string; // 服務器唯一標識host: string; // 主機地址port: number; // 端口號role: 'master' | 'worker' | 'specialized'; // 服務器角色status: 'online' | 'offline' | 'degraded'; // 服務器狀態capacity: { // 服務器容量信息maxConcurrentRequests: number;currentLoad: number;};features: string[]; // 支持的特性startTime: Date; // 啟動時間
}// 多服務器集群配置
interface McpClusterConfig {serviceName: string; // 服務名稱serviceVersion: string;// 服務版本discoveryMethod: 'static' | 'dns' | 'registry'; // 服務發現方式heartbeatInterval: number; // 心跳間隔syncStrategy: 'event-based' | 'polling' | 'hybrid'; // 同步策略loadBalanceStrategy: 'round-robin' | 'least-connections' | 'consistent-hash'; // 負載均衡策略failoverPolicy: { // 故障轉移策略retryAttempts: number;retryDelay: number;circuitBreakerThreshold: number;};
}
多服務器MCP架構具有以下幾個核心特點:
- 水平擴展性:通過增加服務器數量來線性提升系統整體處理能力
- 高可用性:即使部分服務器故障,系統整體仍可持續提供服務
- 負載分散:將請求負載分散到多個服務器,避免單點壓力過大
- 資源隔離:可按業務功能或客戶需求隔離資源,提高安全性和穩定性
- 靈活部署:支持混合云、多區域、邊緣計算等多樣化部署模式
1.2 分布式系統的挑戰與解決方案
構建分布式MCP服務面臨一系列挑戰,MCP TypeScript-SDK針對這些挑戰提供了相應的解決方案:
挑戰 | 表現 | MCP SDK解決方案 |
---|---|---|
網絡不可靠 | 網絡延遲、分區、丟包 | 重試機制、異步通信、斷線重連 |
一致性問題 | 數據不一致、沖突 | 事件驅動同步、版本控制、沖突解決策略 |
服務協調 | 服務發現、路由 | 服務注冊表、服務健康檢查、動態路由 |
故障處理 | 節點故障、服務降級 | 故障檢測、自動恢復、熔斷機制 |
性能瓶頸 | 請求擁塞、資源競爭 | 負載均衡、請求限流、資源隔離 |
// MCP服務器故障處理配置
interface McpFailoverConfig {detection: {method: 'heartbeat' | 'ping' | 'health-endpoint';interval: number; // 檢測間隔timeout: number; // 超時時間thresholds: {warning: number; // 警告閾值critical: number; // 臨界閾值}};recovery: {strategy: 'restart' | 'replace' | 'redirect';cooldownPeriod: number; // 冷卻時間maxAttempts: number; // 最大嘗試次數};notification: {channels: string[]; // 通知渠道templates: Record<string, string>; // 通知模板}
}
1.3 MCP服務器分類與職責劃分
在多服務器架構中,不同的MCP服務器可以承擔不同的角色和職責,實現功能分離和專業化:
1.3.1 按角色分類
-
主服務器(Master):
- 管理集群狀態和配置
- 協調跨服務器的資源分配
- 監控集群健康狀態
- 通常部署較少數量,配置較高
-
工作服務器(Worker):
- 處理客戶端的實際請求
- 執行MCP資源訪問和工具調用
- 可大規模部署,構成系統處理能力的主體
- 通常是無狀態的,便于橫向擴展
-
專用服務器(Specialized):
- 專注于特定功能或業務場景
- 例如:AI推理服務器、數據處理服務器等
- 可根據需求定制化配置
- 適合資源密集型或安全敏感型業務
// MCP服務器角色定義和職責配置
import { McpServer } from '@modelcontextprotocol/sdk';// 主服務器配置
const masterConfig = {name: "mcp-master-server",description: "MCP集群主服務器",version: "1.0.0",cluster: {role: "master",workers: ["worker-1", "worker-2", "worker-3"],electionStrategy: "fixed", // 固定主服務器stateSync: {method: "push",interval: 5000}}
};// 工作服務器配置
const workerConfig = {name: "mcp-worker-server",description: "MCP集群工作服務器",version: "1.0.0",cluster: {role: "worker",masterId: "master-1",maxConcurrentRequests: 1000,resourceCacheSize: 500,reportInterval: 2000}
};// 專用服務器配置
const specializedConfig = {name: "mcp-specialized-server",description: "MCP圖像處理專用服務器",version: "1.0.0",cluster: {role: "specialized",serviceType: "image-processing",supportedOperations: ["resize", "filter", "recognize"],resourceRequirements: {gpu: true,minMemory: "16G",cpuCores: 8}}
};
1.3.2 按功能分類
-
API網關服務器:
- 請求路由和負載均衡
- 認證和授權處理
- 請求限流和緩存
- 請求/響應轉換
-
資源服務器:
- 管理MCP資源模板
- 處理資源訪問請求
- 資源緩存和優化
- 資源版本控制
-
工具服務器:
- 托管和執行MCP工具
- 工具依賴管理
- 工具執行環境隔離
- 工具性能監控
-
狀態同步服務器:
- 維護全局狀態
- 協調跨服務器狀態同步
- 處理分布式事務
- 解決數據沖突
// 按功能劃分的MCP服務器實現示例
import { McpServer } from '@modelcontextprotocol/sdk';
import express from 'express';
import { createProxyMiddleware } from 'http-proxy-middleware';// API網關服務器
const apiGatewayApp = express();
const apiGatewayServer = new McpServer({name: "mcp-api-gateway",description: "MCP API網關服務器",version: "1.0.0"
});// 路由配置
const routeConfig = {'/api/resources': { target: 'http://resource-server:3001', pathRewrite: {'^/api': ''} },'/api/tools': { target: 'http://tool-server:3002', pathRewrite: {'^/api': ''} }
};// 設置API代理
Object.entries(routeConfig).forEach(([path, config]) => {apiGatewayApp.use(path, createProxyMiddleware(config));
});// 資源服務器
const resourceServer = new McpServer({name: "mcp-resource-server",description: "MCP資源服務器",version: "1.0.0"
});// 注冊資源模板
resourceServer.registerResourceTemplate({name: "users",description: "用戶資源",schema: userSchema
});// 工具服務器
const toolServer = new McpServer({name: "mcp-tool-server",description: "MCP工具服務器",version: "1.0.0"
});// 注冊工具
toolServer.registerTool({name: "calculator",description: "計算工具",parameters: calculatorSchema,execute: async (params) => {// 計算邏輯return { result: calculateExpression(params.expression) };}
});
1.4 多服務器架構的常見模式
在實際應用中,MCP多服務器架構可以采用多種模式,根據業務需求和資源條件靈活選擇:
1.4.1 主從模式(Master-Slave)
最常見的分布式架構模式,一個主服務器協調多個從服務器的工作。
┌────────────┐ ┌────────────┐
│ │ │ │
│ Master │?────?│ Slave 1 │
│ │ │ │
└────────────┘ └────────────┘▲│▼
┌────────────┐ ┌────────────┐
│ │ │ │
│ Slave 2 │?────?│ Slave 3 │
│ │ │ │
└────────────┘ └────────────┘
// 主從模式實現示例
import { McpServer, EventEmitter } from '@modelcontextprotocol/sdk';
import { createClient } from 'redis';// 創建Redis客戶端用于服務器間通信
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();// 主服務器實現
class MasterMcpServer extends McpServer {private slaves: Map<string, SlaveInfo> = new Map();constructor(config) {super(config);// 訂閱從服務器狀態更新subClient.subscribe('slave-status', (message) => {const slaveStatus = JSON.parse(message);this.updateSlaveStatus(slaveStatus);});// 定期檢查從服務器健康狀態setInterval(() => this.checkSlavesHealth(), 10000);}// 注冊從服務器registerSlave(slaveId: string, info: SlaveInfo) {this.slaves.set(slaveId, { ...info, lastHeartbeat: Date.now() });this.emit('slave-registered', { slaveId, info });}// 更新從服務器狀態updateSlaveStatus(status: SlaveStatus) {const slave = this.slaves.get(status.slaveId);if (slave) {this.slaves.set(status.slaveId, { ...slave, ...status, lastHeartbeat: Date.now() });}}// 檢查從服務器健康狀態checkSlavesHealth() {const now = Date.now();for (const [slaveId, info] of this.slaves.entries()) {if (now - info.lastHeartbeat > 30000) { // 30秒無心跳this.emit('slave-offline', { slaveId });this.slaves.delete(slaveId);}}}
}// 從服務器實現
class SlaveMcpServer extends McpServer {private masterId: string;constructor(config) {super(config);this.masterId = config.masterId;// 定期向主服務器發送心跳setInterval(() => this.sendHeartbeat(), 5000);// 訂閱主服務器指令subClient.subscribe(`slave-command:${this.id}`, (message) => {this.handleMasterCommand(JSON.parse(message));});}// 發送心跳sendHeartbeat() {const status = {slaveId: this.id,load: this.getCurrentLoad(),memory: process.memoryUsage(),status: 'online'};pubClient.publish('slave-status', JSON.stringify(status));}// 處理主服務器指令handleMasterCommand(command) {switch (command.type) {case 'restart':this.restart();break;case 'update-config':this.updateConfig(command.config);break;// 其他指令處理...}}
}
1.4.2 去中心化模式(Peer-to-Peer)
所有服務器地位平等,通過協作提供服務,無單點故障風險。
┌────────────┐ ┌────────────┐
│ │?────?│ │
│ Node 1 │ │ Node 2 │
│ │?────?│ │
└────────────┘ └────────────┘▲ ▲ ▲│ └───────┐ ┌───┘│ ▼ ▼
┌────────────┐ ┌────────────┐
│ │?────?│ │
│ Node 3 │ │ Node 4 │
│ │?────?│ │
└────────────┘ └────────────┘
// 去中心化模式實現示例
import { McpServer } from '@modelcontextprotocol/sdk';
import * as IPFS from 'ipfs-core';
import { v4 as uuidv4 } from 'uuid';// P2P MCP服務器
class P2PMcpServer extends McpServer {private ipfs;private peerId: string;private peers: Map<string, PeerInfo> = new Map();private eventTopic = 'mcp-p2p-events';constructor(config) {super(config);this.peerId = uuidv4();this.initializeP2P();}// 初始化P2P網絡async initializeP2P() {// 創建IPFS節點this.ipfs = await IPFS.create();// 獲取本節點IDconst nodeInfo = await this.ipfs.id();console.log(`P2P節點ID: ${nodeInfo.id}`);// 訂閱事件主題await this.ipfs.pubsub.subscribe(this.eventTopic, (msg) => {this.handleP2PEvent(msg);});// 定期發布狀態setInterval(() => this.broadcastStatus(), 5000);// 發布上線事件this.broadcastEvent({type: 'node-online',peerId: this.peerId,timestamp: Date.now(),info: {resources: this.getResourceNames(),tools: this.getToolNames(),capacity: this.getCapacity()}});}// 處理P2P事件handleP2PEvent(message) {try {const event = JSON.parse(message.data.toString());// 忽略自己發出的消息if (event.peerId === this.peerId) return;switch (event.type) {case 'node-online':this.addPeer(event.peerId, event.info);// 向新節點發送歡迎消息this.sendDirectEvent(event.peerId, {type: 'welcome',peerId: this.peerId,timestamp: Date.now()});break;case 'node-offline':this.removePeer(event.peerId);break;case 'status-update':this.updatePeerStatus(event.peerId, event.status);break;case 'resource-request':this.handleResourceRequest(event);break;case 'tool-execution':this.handleToolExecution(event);break;}} catch (error) {console.error('處理P2P事件錯誤:', error);}}// 廣播事件到所有節點async broadcastEvent(event) {await this.ipfs.pubsub.publish(this.eventTopic,Buffer.from(JSON.stringify(event)));}// 發送事件到特定節點async sendDirectEvent(targetPeerId, event) {const peer = this.peers.get(targetPeerId);if (!peer || !peer.directTopic) return;await this.ipfs.pubsub.publish(peer.directTopic,Buffer.from(JSON.stringify(event)));}// 定期廣播狀態broadcastStatus() {this.broadcastEvent({type: 'status-update',peerId: this.peerId,timestamp: Date.now(),status: {load: this.getCurrentLoad(),memory: process.memoryUsage(),uptime: process.uptime()}});}// 添加對等節點addPeer(peerId: string, info: any) {this.peers.set(peerId, {...info,directTopic: `mcp-p2p-direct:${peerId}`,lastSeen: Date.now()});// 訂閱此節點的直接通信主題this.ipfs.pubsub.subscribe(`mcp-p2p-direct:${this.peerId}`, (msg) => {this.handleDirectMessage(msg);});}// 移除對等節點removePeer(peerId: string) {this.peers.delete(peerId);}// 更新對等節點狀態updatePeerStatus(peerId: string, status: any) {const peer = this.peers.get(peerId);if (peer) {this.peers.set(peerId, { ...peer, ...status, lastSeen: Date.now() });}}
}
1.4.3 微服務模式(Microservices)
將MCP功能分解為多個獨立服務,每個服務專注于特定功能域。
┌─────────────────────────────────────────────┐
│ API Gateway │
└─────────────┬─────────────┬─────────────────┘│ │ ┌─────────▼───┐ ┌─────▼───────┐ ┌─────────────┐│ Resource │ │ Tool │ │ Identity ││ Service │ │ Service │ │ Service │└─────────────┘ └─────────────┘ └─────────────┘│ │ │┌─────────▼───┐ ┌─────▼───────┐ ┌─────▼───────┐│ Storage │ │ Execution │ │ Security ││ Service │ │ Service │ │ Service │└─────────────┘ └─────────────┘ └─────────────┘
// 微服務模式實現示例
import { McpServer } from '@modelcontextprotocol/sdk';
import express from 'express';
import axios from 'axios';
import { MongoClient } from 'mongodb';
import { v4 as uuidv4 } from 'uuid';
import amqp from 'amqplib';// 服務發現客戶端
class ServiceDiscovery {private serviceRegistry: Map<string, ServiceInfo[]> = new Map();private consulUrl: string;constructor(consulUrl: string) {this.consulUrl = consulUrl;this.refreshRegistry();setInterval(() => this.refreshRegistry(), 30000);}// 刷新服務注冊表async refreshRegistry() {try {const response = await axios.get(`${this.consulUrl}/v1/catalog/services`);for (const [serviceName, tags] of Object.entries(response.data)) {if (tags.includes('mcp')) {const instances = await this.getServiceInstances(serviceName);this.serviceRegistry.set(serviceName, instances);}}} catch (error) {console.error('刷新服務注冊表失敗:', error);}}// 獲取服務實例列表async getServiceInstances(serviceName: string): Promise<ServiceInfo[]> {try {const response = await axios.get(`${this.consulUrl}/v1/health/service/${serviceName}?passing=true`);return response.data.map(entry => ({id: entry.Service.ID,address: entry.Service.Address,port: entry.Service.Port,tags: entry.Service.Tags,meta: entry.Service.Meta}));} catch (error) {console.error(`獲取服務${serviceName}實例失敗:`, error);return [];}}// 獲取服務實例getService(serviceName: string, tags: string[] = []): ServiceInfo | null {const instances = this.serviceRegistry.get(serviceName) || [];// 篩選滿足標簽要求的實例const eligibleInstances = instances.filter(instance => tags.every(tag => instance.tags.includes(tag)));if (eligibleInstances.length === 0) return null;// 簡單負載均衡:隨機選擇一個實例const randomIndex = Math.floor(Math.random() * eligibleInstances.length);return eligibleInstances[randomIndex];}
}// MCP資源服務
class ResourceMcpService {private server: McpServer;private app = express();private mongoClient: MongoClient;private serviceId: string;private discovery: ServiceDiscovery;private messageBroker: amqp.Connection;private channel: amqp.Channel;constructor(config) {this.server = new McpServer({name: "mcp-resource-service",description: "MCP資源微服務",version: "1.0.0"});this.serviceId = uuidv4();this.discovery = new ServiceDiscovery(config.consulUrl);// 初始化Express中間件this.initializeExpress();// 連接MongoDBthis.connectMongo(config.mongoUrl);// 連接消息代理this.connectMessageBroker(config.rabbitMqUrl);// 注冊服務this.registerService(config.consulUrl);}// 初始化ExpressinitializeExpress() {this.app.use(express.json());// 資源API端點this.app.get('/resources/:name', this.getResource.bind(this));this.app.post('/resources/:name', this.createResource.bind(this));this.app.put('/resources/:name/:id', this.updateResource.bind(this));this.app.delete('/resources/:name/:id', this.deleteResource.bind(this));// 健康檢查端點this.app.get('/health', (req, res) => {res.status(200).json({ status: 'healthy', serviceId: this.serviceId });});// 啟動服務器const port = process.env.PORT || 3000;this.app.listen(port, () => {console.log(`資源服務運行在端口 ${port}`);});}// 連接MongoDBasync connectMongo(mongoUrl: string) {try {this.mongoClient = new MongoClient(mongoUrl);await this.mongoClient.connect();console.log('已連接到MongoDB');} catch (error) {console.error('MongoDB連接失敗:', error);process.exit(1);}}// 連接消息代理async connectMessageBroker(rabbitMqUrl: string) {try {this.messageBroker = await amqp.connect(rabbitMqUrl);this.channel = await this.messageBroker.createChannel();// 聲明交換機和隊列await this.channel.assertExchange('mcp-events', 'topic', { durable: true });const { queue } = await this.channel.assertQueue(`resource-service-${this.serviceId}`,{ exclusive: true });// 綁定感興趣的主題await this.channel.bindQueue(queue, 'mcp-events', 'resource.*');// 消費消息this.channel.consume(queue, (msg) => {if (msg) {this.handleMessage(msg);this.channel.ack(msg);}});console.log('已連接到消息代理');} catch (error) {console.error('消息代理連接失敗:', error);}}// 處理消息handleMessage(msg: amqp.ConsumeMessage) {try {const content = JSON.parse(msg.content.toString());const routingKey = msg.fields.routingKey;console.log(`收到消息: ${routingKey}`, content);// 根據路由鍵處理不同類型的消息switch (routingKey) {case 'resource.created':this.handleResourceCreated(content);break;case 'resource.updated':this.handleResourceUpdated(content);break;case 'resource.deleted':this.handleResourceDeleted(content);break;}} catch (error) {console.error('處理消息錯誤:', error);}}// 發布事件publishEvent(routingKey: string, data: any) {this.channel.publish('mcp-events',routingKey,Buffer.from(JSON.stringify({...data,serviceId: this.serviceId,timestamp: Date.now()})));}// API處理器async getResource(req, res) {try {const { name } = req.params;const filter = req.query.filter ? JSON.parse(req.query.filter) : {};const db = this.mongoClient.db('mcp-resources');const collection = db.collection(name);const resources = await collection.find(filter).toArray();res.json({ success: true, data: resources });} catch (error) {res.status(500).json({success: false,error: error.message});}}async createResource(req, res) {try {const { name } = req.params;const resourceData = req.body;const db = this.mongoClient.db('mcp-resources');const collection = db.collection(name);const result = await collection.insertOne({...resourceData,_id: uuidv4(),createdAt: new Date(),updatedAt: new Date()});// 發布資源創建事件this.publishEvent('resource.created', {resourceName: name,resourceId: result.insertedId,data: resourceData});res.status(201).json({success: true,data: { id: result.insertedId }});} catch (error) {res.status(500).json({success: false,error: error.message});}}// 其他API處理器...// 注冊服務到Consulasync registerService(consulUrl: string) {try {await axios.put(`${consulUrl}/v1/agent/service/register`, {ID: this.serviceId,Name: 'mcp-resource-service',Tags: ['mcp', 'resource'],Address: process.env.SERVICE_HOST || 'localhost',Port: parseInt(process.env.PORT || '3000'),Check: {HTTP: `http://${process.env.SERVICE_HOST || 'localhost'}:${process.env.PORT || '3000'}/health`,Interval: '15s',Timeout: '5s'}});console.log(`服務已注冊,ID: ${this.serviceId}`);// 注冊服務注銷鉤子process.on('SIGINT', () => this.deregisterService(consulUrl));process.on('SIGTERM', () => this.deregisterService(consulUrl));} catch (error) {console.error('服務注冊失敗:', error);}}// 注銷服務async deregisterService(consulUrl: string) {try {await axios.put(`${consulUrl}/v1/agent/service/deregister/${this.serviceId}`);console.log('服務已注銷');process.exit(0);} catch (error) {console.error('服務注銷失敗:', error);process.exit(1);}}
}
多服務器架構為MCP應用提供了更高的可用性、可伸縮性和靈活性,但同時也帶來了系統復雜度的提升。在后續章節中,我們將深入探討如何應對這些挑戰,構建穩健的多服務器MCP系統。
2. 服務發現與路由
在多服務器MCP架構中,服務發現和路由機制是確保系統高效運行的關鍵組件。它們使客戶端能夠動態找到可用的服務實例,并將請求路由到最合適的服務器上。
2.1 服務注冊與發現機制
服務發現的核心是讓服務消費者能夠在不需要硬編碼服務提供者地址的情況下,動態找到并調用所需的服務。在MCP系統中,服務發現機制通常涉及以下幾個關鍵組件:
2.1.1 服務注冊表
服務注冊表是服務發現的核心數據存儲,記錄了所有可用服務實例的關鍵信息:
// 服務注冊表接口
interface ServiceRegistry {// 注冊服務實例register(instance: ServiceInstance): Promise<void>;// 注銷服務實例deregister(instanceId: string): Promise<void>;// 獲取指定服務的所有實例getInstances(serviceName: string): Promise<ServiceInstance[]>;// 查詢符合特定條件的服務實例findServices(query: ServiceQuery): Promise<ServiceInstance[]>;// 監聽服務變更watchService(serviceName: string, callback: (instances: ServiceInstance[]) => void): void;
}// 服務實例數據結構
interface ServiceInstance {id: string; // 實例唯一標識serviceName: string; // 服務名稱host: string; // 主機地址port: number; // 端口號status: 'UP' | 'DOWN' | 'STARTING' | 'OUT_OF_SERVICE'; // 實例狀態metadata: Record<string, string>; // 元數據,如版本、環境等healthCheckUrl?: string; // 健康檢查URLregistrationTime: number; // 注冊時間lastUpdateTime: number; // 最后更新時間
}
2.1.2 基于MCP實現的分布式服務注冊表
以下是一個使用MCP服務器實現的分布式服務注冊表示例:
import { McpServer } from '@modelcontextprotocol/sdk';
import { createClient } from 'redis';
import express from 'express';// 基于Redis的MCP服務注冊表實現
class McpServiceRegistry implements ServiceRegistry {private redisClient;private readonly KEY_PREFIX = 'mcp:registry:';private readonly TTL = 60; // 服務記錄過期時間(秒)constructor(redisUrl: string) {this.redisClient = createClient({ url: redisUrl });this.redisClient.connect();}// 注冊服務實例async register(instance: ServiceInstance): Promise<void> {const key = `${this.KEY_PREFIX}${instance.serviceName}:${instance.id}`;const now = Date.now();// 更新注冊時間或最后更新時間instance.lastUpdateTime = now;if (!instance.registrationTime) {instance.registrationTime = now;}// 將服務實例信息存儲到Redisawait this.redisClient.set(key, JSON.stringify(instance), { EX: this.TTL });// 添加到服務名稱集合中,方便按服務名查詢await this.redisClient.sAdd(`${this.KEY_PREFIX}${instance.serviceName}`, instance.id);}// 注銷服務實例async deregister(instanceId: string): Promise<void> {// 獲取服務實例信息const pattern = `${this.KEY_PREFIX}*:${instanceId}`;const keys = await this.redisClient.keys(pattern);if (keys.length > 0) {const key = keys[0];const serviceName = key.split(':')[2];// 從Redis中刪除服務實例await this.redisClient.del(key);// 從服務名稱集合中移除await this.redisClient.sRem(`${this.KEY_PREFIX}${serviceName}`, instanceId);}}// 獲取指定服務的所有實例async getInstances(serviceName: string): Promise<ServiceInstance[]> {const serviceKey = `${this.KEY_PREFIX}${serviceName}`;const instanceIds = await this.redisClient.sMembers(serviceKey);if (instanceIds.length === 0) {return [];}// 批量獲取所有服務實例信息const keys = instanceIds.map(id => `${this.KEY_PREFIX}${serviceName}:${id}`);const instancesData = await this.redisClient.mGet(keys);// 過濾并解析有效的實例數據return instancesData.filter(Boolean).map(data => JSON.parse(data)).filter(instance => instance.status !== 'DOWN');}// 查詢符合特定條件的服務實例async findServices(query: ServiceQuery): Promise<ServiceInstance[]> {const { serviceName, status, metadata } = query;// 獲取所有匹配服務名的實例const instances = await this.getInstances(serviceName);// 應用過濾條件return instances.filter(instance => {// 狀態過濾if (status && instance.status !== status) {return false;}// 元數據過濾if (metadata) {for (const [key, value] of Object.entries(metadata)) {if (instance.metadata[key] !== value) {return false;}}}return true;});}// 監聽服務變更watchService(serviceName: string, callback: (instances: ServiceInstance[]) => void): void {// 創建訂閱客戶端const subClient = this.redisClient.duplicate();// 訂閱服務變更事件subClient.subscribe(`${this.KEY_PREFIX}${serviceName}:changes`, (message) => {this.getInstances(serviceName).then(callback);});}
}// 服務注冊表API服務器
function createRegistryServer(registry: McpServiceRegistry): express.Express {const app = express();app.use(express.json());// 服務注冊端點app.post('/services', async (req, res) => {try {const instance = req.body as ServiceInstance;await registry.register(instance);res.status(201).json({ success: true, message: '服務實例已注冊' });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 服務注銷端點app.delete('/services/:instanceId', async (req, res) => {try {await registry.deregister(req.params.instanceId);res.status(200).json({ success: true, message: '服務實例已注銷' });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 獲取服務實例端點app.get('/services/:serviceName', async (req, res) => {try {const instances = await registry.getInstances(req.params.serviceName);res.status(200).json({ success: true, data: instances });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 服務查詢端點app.get('/services', async (req, res) => {try {const query = {serviceName: req.query.serviceName as string,status: req.query.status as 'UP' | 'DOWN' | 'STARTING' | 'OUT_OF_SERVICE',metadata: req.query.metadata ? JSON.parse(req.query.metadata as string) : undefined};const instances = await registry.findServices(query);res.status(200).json({ success: true, data: instances });} catch (error) {res.status(500).json({ success: false, error: error.message });}});return app;
}// 使用MCP服務器封裝服務注冊表
class McpRegistryServer extends McpServer {private registry: McpServiceRegistry;private httpServer: express.Express;constructor(config: {name: string;description: string;version: string;redisUrl: string;port: number;}) {super({name: config.name,description: config.description,version: config.version});// 創建服務注冊表this.registry = new McpServiceRegistry(config.redisUrl);// 創建HTTP服務器this.httpServer = createRegistryServer(this.registry);// 啟動HTTP服務器this.httpServer.listen(config.port, () => {console.log(`注冊表服務器運行在端口 ${config.port}`);});// 作為MCP資源暴露服務注冊表this.exposeRegistryAsResource();}// 將注冊表作為MCP資源暴露private exposeRegistryAsResource() {// 定義服務實例資源模板this.registerResourceTemplate({name: "serviceInstances",description: "MCP服務實例資源",schema: {type: "object",properties: {id: { type: "string" },serviceName: { type: "string" },host: { type: "string" },port: { type: "number" },status: { type: "string", enum: ["UP", "DOWN", "STARTING", "OUT_OF_SERVICE"] },metadata: { type: "object" },healthCheckUrl: { type: "string" },registrationTime: { type: "number" },lastUpdateTime: { type: "number" }},required: ["id", "serviceName", "host",