【MCP Node.js SDK 全棧進階指南】高級篇(1):MCP多服務器協作架構

隨著業務規模的不斷擴大和系統復雜度的提升,單一服務器架構往往無法滿足高并發、高可用性和彈性擴展的需求。在MCP生態系統中,多服務器協作架構成為構建大規模應用的必然選擇。本文將深入探討MCP TypeScript-SDK在多服務器環境下的部署、協作和管理,以及如何構建高可用、高性能、易擴展的分布式MCP應用。

目錄

  1. 多服務器架構基礎
    1.1 MCP多服務器架構概述
    1.2 分布式系統的挑戰與解決方案
    1.3 MCP服務器分類與職責劃分
    1.4 多服務器架構的常見模式

  2. 服務發現與路由
    2.1 服務注冊與發現機制
    2.2 基于DNS的服務發現
    2.3 動態路由與負載均衡
    2.4 服務健康檢查與自動恢復

  3. 狀態同步與一致性
    3.1 分布式狀態管理策略
    3.2 事件驅動的狀態同步
    3.3 實現最終一致性
    3.4 數據分區與沖突解決

  4. 負載均衡與容錯設計
    4.1 負載均衡策略與實現
    4.2 故障檢測與自動恢復
    4.3 優雅降級與熔斷機制
    4.4 構建高可用MCP集群

1. 多服務器架構基礎

在深入探討MCP多服務器協作之前,我們需要先了解多服務器架構的核心概念、挑戰以及MCP SDK提供的解決方案。

1.1 MCP多服務器架構概述

MCP(Model Context Protocol)作為一種為大型語言模型(LLM)交互設計的協議,其服務架構需要適應復雜多變的業務場景和負載模式。多服務器MCP架構是指將MCP服務分布在多個物理或虛擬服務器上,通過協作方式提供統一的服務能力。

// MCP服務器實例類型
interface McpServerInstance {id: string;            // 服務器唯一標識host: string;          // 主機地址port: number;          // 端口號role: 'master' | 'worker' | 'specialized'; // 服務器角色status: 'online' | 'offline' | 'degraded'; // 服務器狀態capacity: {            // 服務器容量信息maxConcurrentRequests: number;currentLoad: number;};features: string[];    // 支持的特性startTime: Date;       // 啟動時間
}// 多服務器集群配置
interface McpClusterConfig {serviceName: string;   // 服務名稱serviceVersion: string;// 服務版本discoveryMethod: 'static' | 'dns' | 'registry'; // 服務發現方式heartbeatInterval: number; // 心跳間隔syncStrategy: 'event-based' | 'polling' | 'hybrid'; // 同步策略loadBalanceStrategy: 'round-robin' | 'least-connections' | 'consistent-hash'; // 負載均衡策略failoverPolicy: {      // 故障轉移策略retryAttempts: number;retryDelay: number;circuitBreakerThreshold: number;};
}

多服務器MCP架構具有以下幾個核心特點:

  1. 水平擴展性:通過增加服務器數量來線性提升系統整體處理能力
  2. 高可用性:即使部分服務器故障,系統整體仍可持續提供服務
  3. 負載分散:將請求負載分散到多個服務器,避免單點壓力過大
  4. 資源隔離:可按業務功能或客戶需求隔離資源,提高安全性和穩定性
  5. 靈活部署:支持混合云、多區域、邊緣計算等多樣化部署模式

1.2 分布式系統的挑戰與解決方案

構建分布式MCP服務面臨一系列挑戰,MCP TypeScript-SDK針對這些挑戰提供了相應的解決方案:

挑戰表現MCP SDK解決方案
網絡不可靠網絡延遲、分區、丟包重試機制、異步通信、斷線重連
一致性問題數據不一致、沖突事件驅動同步、版本控制、沖突解決策略
服務協調服務發現、路由服務注冊表、服務健康檢查、動態路由
故障處理節點故障、服務降級故障檢測、自動恢復、熔斷機制
性能瓶頸請求擁塞、資源競爭負載均衡、請求限流、資源隔離
// MCP服務器故障處理配置
interface McpFailoverConfig {detection: {method: 'heartbeat' | 'ping' | 'health-endpoint';interval: number; // 檢測間隔timeout: number;  // 超時時間thresholds: {warning: number; // 警告閾值critical: number; // 臨界閾值}};recovery: {strategy: 'restart' | 'replace' | 'redirect';cooldownPeriod: number; // 冷卻時間maxAttempts: number;    // 最大嘗試次數};notification: {channels: string[]; // 通知渠道templates: Record<string, string>; // 通知模板}
}

1.3 MCP服務器分類與職責劃分

在多服務器架構中,不同的MCP服務器可以承擔不同的角色和職責,實現功能分離和專業化:

1.3.1 按角色分類
  1. 主服務器(Master)

    • 管理集群狀態和配置
    • 協調跨服務器的資源分配
    • 監控集群健康狀態
    • 通常部署較少數量,配置較高
  2. 工作服務器(Worker)

    • 處理客戶端的實際請求
    • 執行MCP資源訪問和工具調用
    • 可大規模部署,構成系統處理能力的主體
    • 通常是無狀態的,便于橫向擴展
  3. 專用服務器(Specialized)

    • 專注于特定功能或業務場景
    • 例如:AI推理服務器、數據處理服務器等
    • 可根據需求定制化配置
    • 適合資源密集型或安全敏感型業務
// MCP服務器角色定義和職責配置
import { McpServer } from '@modelcontextprotocol/sdk';// 主服務器配置
const masterConfig = {name: "mcp-master-server",description: "MCP集群主服務器",version: "1.0.0",cluster: {role: "master",workers: ["worker-1", "worker-2", "worker-3"],electionStrategy: "fixed", // 固定主服務器stateSync: {method: "push",interval: 5000}}
};// 工作服務器配置
const workerConfig = {name: "mcp-worker-server",description: "MCP集群工作服務器",version: "1.0.0",cluster: {role: "worker",masterId: "master-1",maxConcurrentRequests: 1000,resourceCacheSize: 500,reportInterval: 2000}
};// 專用服務器配置
const specializedConfig = {name: "mcp-specialized-server",description: "MCP圖像處理專用服務器",version: "1.0.0",cluster: {role: "specialized",serviceType: "image-processing",supportedOperations: ["resize", "filter", "recognize"],resourceRequirements: {gpu: true,minMemory: "16G",cpuCores: 8}}
};
1.3.2 按功能分類
  1. API網關服務器

    • 請求路由和負載均衡
    • 認證和授權處理
    • 請求限流和緩存
    • 請求/響應轉換
  2. 資源服務器

    • 管理MCP資源模板
    • 處理資源訪問請求
    • 資源緩存和優化
    • 資源版本控制
  3. 工具服務器

    • 托管和執行MCP工具
    • 工具依賴管理
    • 工具執行環境隔離
    • 工具性能監控
  4. 狀態同步服務器

    • 維護全局狀態
    • 協調跨服務器狀態同步
    • 處理分布式事務
    • 解決數據沖突
// 按功能劃分的MCP服務器實現示例
import { McpServer } from '@modelcontextprotocol/sdk';
import express from 'express';
import { createProxyMiddleware } from 'http-proxy-middleware';// API網關服務器
const apiGatewayApp = express();
const apiGatewayServer = new McpServer({name: "mcp-api-gateway",description: "MCP API網關服務器",version: "1.0.0"
});// 路由配置
const routeConfig = {'/api/resources': { target: 'http://resource-server:3001', pathRewrite: {'^/api': ''} },'/api/tools': { target: 'http://tool-server:3002', pathRewrite: {'^/api': ''} }
};// 設置API代理
Object.entries(routeConfig).forEach(([path, config]) => {apiGatewayApp.use(path, createProxyMiddleware(config));
});// 資源服務器
const resourceServer = new McpServer({name: "mcp-resource-server",description: "MCP資源服務器",version: "1.0.0"
});// 注冊資源模板
resourceServer.registerResourceTemplate({name: "users",description: "用戶資源",schema: userSchema
});// 工具服務器
const toolServer = new McpServer({name: "mcp-tool-server",description: "MCP工具服務器",version: "1.0.0"
});// 注冊工具
toolServer.registerTool({name: "calculator",description: "計算工具",parameters: calculatorSchema,execute: async (params) => {// 計算邏輯return { result: calculateExpression(params.expression) };}
});

1.4 多服務器架構的常見模式

在實際應用中,MCP多服務器架構可以采用多種模式,根據業務需求和資源條件靈活選擇:

1.4.1 主從模式(Master-Slave)

最常見的分布式架構模式,一個主服務器協調多個從服務器的工作。

┌────────────┐      ┌────────────┐
│            │      │            │
│   Master   │?────?│   Slave 1  │
│            │      │            │
└────────────┘      └────────────┘▲│▼
┌────────────┐      ┌────────────┐
│            │      │            │
│   Slave 2  │?────?│   Slave 3  │
│            │      │            │
└────────────┘      └────────────┘
// 主從模式實現示例
import { McpServer, EventEmitter } from '@modelcontextprotocol/sdk';
import { createClient } from 'redis';// 創建Redis客戶端用于服務器間通信
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();// 主服務器實現
class MasterMcpServer extends McpServer {private slaves: Map<string, SlaveInfo> = new Map();constructor(config) {super(config);// 訂閱從服務器狀態更新subClient.subscribe('slave-status', (message) => {const slaveStatus = JSON.parse(message);this.updateSlaveStatus(slaveStatus);});// 定期檢查從服務器健康狀態setInterval(() => this.checkSlavesHealth(), 10000);}// 注冊從服務器registerSlave(slaveId: string, info: SlaveInfo) {this.slaves.set(slaveId, { ...info, lastHeartbeat: Date.now() });this.emit('slave-registered', { slaveId, info });}// 更新從服務器狀態updateSlaveStatus(status: SlaveStatus) {const slave = this.slaves.get(status.slaveId);if (slave) {this.slaves.set(status.slaveId, { ...slave, ...status, lastHeartbeat: Date.now() });}}// 檢查從服務器健康狀態checkSlavesHealth() {const now = Date.now();for (const [slaveId, info] of this.slaves.entries()) {if (now - info.lastHeartbeat > 30000) { // 30秒無心跳this.emit('slave-offline', { slaveId });this.slaves.delete(slaveId);}}}
}// 從服務器實現
class SlaveMcpServer extends McpServer {private masterId: string;constructor(config) {super(config);this.masterId = config.masterId;// 定期向主服務器發送心跳setInterval(() => this.sendHeartbeat(), 5000);// 訂閱主服務器指令subClient.subscribe(`slave-command:${this.id}`, (message) => {this.handleMasterCommand(JSON.parse(message));});}// 發送心跳sendHeartbeat() {const status = {slaveId: this.id,load: this.getCurrentLoad(),memory: process.memoryUsage(),status: 'online'};pubClient.publish('slave-status', JSON.stringify(status));}// 處理主服務器指令handleMasterCommand(command) {switch (command.type) {case 'restart':this.restart();break;case 'update-config':this.updateConfig(command.config);break;// 其他指令處理...}}
}
1.4.2 去中心化模式(Peer-to-Peer)

所有服務器地位平等,通過協作提供服務,無單點故障風險。

┌────────────┐      ┌────────────┐
│            │?────?│            │
│   Node 1   │      │   Node 2   │
│            │?────?│            │
└────────────┘      └────────────┘▲  ▲               ▲│  └───────┐   ┌───┘│          ▼   ▼
┌────────────┐      ┌────────────┐
│            │?────?│            │
│   Node 3   │      │   Node 4   │
│            │?────?│            │
└────────────┘      └────────────┘
// 去中心化模式實現示例
import { McpServer } from '@modelcontextprotocol/sdk';
import * as IPFS from 'ipfs-core';
import { v4 as uuidv4 } from 'uuid';// P2P MCP服務器
class P2PMcpServer extends McpServer {private ipfs;private peerId: string;private peers: Map<string, PeerInfo> = new Map();private eventTopic = 'mcp-p2p-events';constructor(config) {super(config);this.peerId = uuidv4();this.initializeP2P();}// 初始化P2P網絡async initializeP2P() {// 創建IPFS節點this.ipfs = await IPFS.create();// 獲取本節點IDconst nodeInfo = await this.ipfs.id();console.log(`P2P節點ID: ${nodeInfo.id}`);// 訂閱事件主題await this.ipfs.pubsub.subscribe(this.eventTopic, (msg) => {this.handleP2PEvent(msg);});// 定期發布狀態setInterval(() => this.broadcastStatus(), 5000);// 發布上線事件this.broadcastEvent({type: 'node-online',peerId: this.peerId,timestamp: Date.now(),info: {resources: this.getResourceNames(),tools: this.getToolNames(),capacity: this.getCapacity()}});}// 處理P2P事件handleP2PEvent(message) {try {const event = JSON.parse(message.data.toString());// 忽略自己發出的消息if (event.peerId === this.peerId) return;switch (event.type) {case 'node-online':this.addPeer(event.peerId, event.info);// 向新節點發送歡迎消息this.sendDirectEvent(event.peerId, {type: 'welcome',peerId: this.peerId,timestamp: Date.now()});break;case 'node-offline':this.removePeer(event.peerId);break;case 'status-update':this.updatePeerStatus(event.peerId, event.status);break;case 'resource-request':this.handleResourceRequest(event);break;case 'tool-execution':this.handleToolExecution(event);break;}} catch (error) {console.error('處理P2P事件錯誤:', error);}}// 廣播事件到所有節點async broadcastEvent(event) {await this.ipfs.pubsub.publish(this.eventTopic,Buffer.from(JSON.stringify(event)));}// 發送事件到特定節點async sendDirectEvent(targetPeerId, event) {const peer = this.peers.get(targetPeerId);if (!peer || !peer.directTopic) return;await this.ipfs.pubsub.publish(peer.directTopic,Buffer.from(JSON.stringify(event)));}// 定期廣播狀態broadcastStatus() {this.broadcastEvent({type: 'status-update',peerId: this.peerId,timestamp: Date.now(),status: {load: this.getCurrentLoad(),memory: process.memoryUsage(),uptime: process.uptime()}});}// 添加對等節點addPeer(peerId: string, info: any) {this.peers.set(peerId, {...info,directTopic: `mcp-p2p-direct:${peerId}`,lastSeen: Date.now()});// 訂閱此節點的直接通信主題this.ipfs.pubsub.subscribe(`mcp-p2p-direct:${this.peerId}`, (msg) => {this.handleDirectMessage(msg);});}// 移除對等節點removePeer(peerId: string) {this.peers.delete(peerId);}// 更新對等節點狀態updatePeerStatus(peerId: string, status: any) {const peer = this.peers.get(peerId);if (peer) {this.peers.set(peerId, { ...peer, ...status, lastSeen: Date.now() });}}
}
1.4.3 微服務模式(Microservices)

將MCP功能分解為多個獨立服務,每個服務專注于特定功能域。

┌─────────────────────────────────────────────┐
│                 API Gateway                  │
└─────────────┬─────────────┬─────────────────┘│             │                 ┌─────────▼───┐   ┌─────▼───────┐   ┌─────────────┐│  Resource   │   │    Tool     │   │  Identity   ││   Service   │   │   Service   │   │   Service   │└─────────────┘   └─────────────┘   └─────────────┘│             │                 │┌─────────▼───┐   ┌─────▼───────┐   ┌─────▼───────┐│   Storage   │   │ Execution   │   │  Security   ││   Service   │   │  Service    │   │   Service   │└─────────────┘   └─────────────┘   └─────────────┘
// 微服務模式實現示例
import { McpServer } from '@modelcontextprotocol/sdk';
import express from 'express';
import axios from 'axios';
import { MongoClient } from 'mongodb';
import { v4 as uuidv4 } from 'uuid';
import amqp from 'amqplib';// 服務發現客戶端
class ServiceDiscovery {private serviceRegistry: Map<string, ServiceInfo[]> = new Map();private consulUrl: string;constructor(consulUrl: string) {this.consulUrl = consulUrl;this.refreshRegistry();setInterval(() => this.refreshRegistry(), 30000);}// 刷新服務注冊表async refreshRegistry() {try {const response = await axios.get(`${this.consulUrl}/v1/catalog/services`);for (const [serviceName, tags] of Object.entries(response.data)) {if (tags.includes('mcp')) {const instances = await this.getServiceInstances(serviceName);this.serviceRegistry.set(serviceName, instances);}}} catch (error) {console.error('刷新服務注冊表失敗:', error);}}// 獲取服務實例列表async getServiceInstances(serviceName: string): Promise<ServiceInfo[]> {try {const response = await axios.get(`${this.consulUrl}/v1/health/service/${serviceName}?passing=true`);return response.data.map(entry => ({id: entry.Service.ID,address: entry.Service.Address,port: entry.Service.Port,tags: entry.Service.Tags,meta: entry.Service.Meta}));} catch (error) {console.error(`獲取服務${serviceName}實例失敗:`, error);return [];}}// 獲取服務實例getService(serviceName: string, tags: string[] = []): ServiceInfo | null {const instances = this.serviceRegistry.get(serviceName) || [];// 篩選滿足標簽要求的實例const eligibleInstances = instances.filter(instance => tags.every(tag => instance.tags.includes(tag)));if (eligibleInstances.length === 0) return null;// 簡單負載均衡:隨機選擇一個實例const randomIndex = Math.floor(Math.random() * eligibleInstances.length);return eligibleInstances[randomIndex];}
}// MCP資源服務
class ResourceMcpService {private server: McpServer;private app = express();private mongoClient: MongoClient;private serviceId: string;private discovery: ServiceDiscovery;private messageBroker: amqp.Connection;private channel: amqp.Channel;constructor(config) {this.server = new McpServer({name: "mcp-resource-service",description: "MCP資源微服務",version: "1.0.0"});this.serviceId = uuidv4();this.discovery = new ServiceDiscovery(config.consulUrl);// 初始化Express中間件this.initializeExpress();// 連接MongoDBthis.connectMongo(config.mongoUrl);// 連接消息代理this.connectMessageBroker(config.rabbitMqUrl);// 注冊服務this.registerService(config.consulUrl);}// 初始化ExpressinitializeExpress() {this.app.use(express.json());// 資源API端點this.app.get('/resources/:name', this.getResource.bind(this));this.app.post('/resources/:name', this.createResource.bind(this));this.app.put('/resources/:name/:id', this.updateResource.bind(this));this.app.delete('/resources/:name/:id', this.deleteResource.bind(this));// 健康檢查端點this.app.get('/health', (req, res) => {res.status(200).json({ status: 'healthy', serviceId: this.serviceId });});// 啟動服務器const port = process.env.PORT || 3000;this.app.listen(port, () => {console.log(`資源服務運行在端口 ${port}`);});}// 連接MongoDBasync connectMongo(mongoUrl: string) {try {this.mongoClient = new MongoClient(mongoUrl);await this.mongoClient.connect();console.log('已連接到MongoDB');} catch (error) {console.error('MongoDB連接失敗:', error);process.exit(1);}}// 連接消息代理async connectMessageBroker(rabbitMqUrl: string) {try {this.messageBroker = await amqp.connect(rabbitMqUrl);this.channel = await this.messageBroker.createChannel();// 聲明交換機和隊列await this.channel.assertExchange('mcp-events', 'topic', { durable: true });const { queue } = await this.channel.assertQueue(`resource-service-${this.serviceId}`,{ exclusive: true });// 綁定感興趣的主題await this.channel.bindQueue(queue, 'mcp-events', 'resource.*');// 消費消息this.channel.consume(queue, (msg) => {if (msg) {this.handleMessage(msg);this.channel.ack(msg);}});console.log('已連接到消息代理');} catch (error) {console.error('消息代理連接失敗:', error);}}// 處理消息handleMessage(msg: amqp.ConsumeMessage) {try {const content = JSON.parse(msg.content.toString());const routingKey = msg.fields.routingKey;console.log(`收到消息: ${routingKey}`, content);// 根據路由鍵處理不同類型的消息switch (routingKey) {case 'resource.created':this.handleResourceCreated(content);break;case 'resource.updated':this.handleResourceUpdated(content);break;case 'resource.deleted':this.handleResourceDeleted(content);break;}} catch (error) {console.error('處理消息錯誤:', error);}}// 發布事件publishEvent(routingKey: string, data: any) {this.channel.publish('mcp-events',routingKey,Buffer.from(JSON.stringify({...data,serviceId: this.serviceId,timestamp: Date.now()})));}// API處理器async getResource(req, res) {try {const { name } = req.params;const filter = req.query.filter ? JSON.parse(req.query.filter) : {};const db = this.mongoClient.db('mcp-resources');const collection = db.collection(name);const resources = await collection.find(filter).toArray();res.json({ success: true, data: resources });} catch (error) {res.status(500).json({success: false,error: error.message});}}async createResource(req, res) {try {const { name } = req.params;const resourceData = req.body;const db = this.mongoClient.db('mcp-resources');const collection = db.collection(name);const result = await collection.insertOne({...resourceData,_id: uuidv4(),createdAt: new Date(),updatedAt: new Date()});// 發布資源創建事件this.publishEvent('resource.created', {resourceName: name,resourceId: result.insertedId,data: resourceData});res.status(201).json({success: true,data: { id: result.insertedId }});} catch (error) {res.status(500).json({success: false,error: error.message});}}// 其他API處理器...// 注冊服務到Consulasync registerService(consulUrl: string) {try {await axios.put(`${consulUrl}/v1/agent/service/register`, {ID: this.serviceId,Name: 'mcp-resource-service',Tags: ['mcp', 'resource'],Address: process.env.SERVICE_HOST || 'localhost',Port: parseInt(process.env.PORT || '3000'),Check: {HTTP: `http://${process.env.SERVICE_HOST || 'localhost'}:${process.env.PORT || '3000'}/health`,Interval: '15s',Timeout: '5s'}});console.log(`服務已注冊,ID: ${this.serviceId}`);// 注冊服務注銷鉤子process.on('SIGINT', () => this.deregisterService(consulUrl));process.on('SIGTERM', () => this.deregisterService(consulUrl));} catch (error) {console.error('服務注冊失敗:', error);}}// 注銷服務async deregisterService(consulUrl: string) {try {await axios.put(`${consulUrl}/v1/agent/service/deregister/${this.serviceId}`);console.log('服務已注銷');process.exit(0);} catch (error) {console.error('服務注銷失敗:', error);process.exit(1);}}
}

多服務器架構為MCP應用提供了更高的可用性、可伸縮性和靈活性,但同時也帶來了系統復雜度的提升。在后續章節中,我們將深入探討如何應對這些挑戰,構建穩健的多服務器MCP系統。

2. 服務發現與路由

在多服務器MCP架構中,服務發現和路由機制是確保系統高效運行的關鍵組件。它們使客戶端能夠動態找到可用的服務實例,并將請求路由到最合適的服務器上。

2.1 服務注冊與發現機制

服務發現的核心是讓服務消費者能夠在不需要硬編碼服務提供者地址的情況下,動態找到并調用所需的服務。在MCP系統中,服務發現機制通常涉及以下幾個關鍵組件:

2.1.1 服務注冊表

服務注冊表是服務發現的核心數據存儲,記錄了所有可用服務實例的關鍵信息:

// 服務注冊表接口
interface ServiceRegistry {// 注冊服務實例register(instance: ServiceInstance): Promise<void>;// 注銷服務實例deregister(instanceId: string): Promise<void>;// 獲取指定服務的所有實例getInstances(serviceName: string): Promise<ServiceInstance[]>;// 查詢符合特定條件的服務實例findServices(query: ServiceQuery): Promise<ServiceInstance[]>;// 監聽服務變更watchService(serviceName: string, callback: (instances: ServiceInstance[]) => void): void;
}// 服務實例數據結構
interface ServiceInstance {id: string;               // 實例唯一標識serviceName: string;      // 服務名稱host: string;             // 主機地址port: number;             // 端口號status: 'UP' | 'DOWN' | 'STARTING' | 'OUT_OF_SERVICE'; // 實例狀態metadata: Record<string, string>; // 元數據,如版本、環境等healthCheckUrl?: string;  // 健康檢查URLregistrationTime: number; // 注冊時間lastUpdateTime: number;   // 最后更新時間
}
2.1.2 基于MCP實現的分布式服務注冊表

以下是一個使用MCP服務器實現的分布式服務注冊表示例:

import { McpServer } from '@modelcontextprotocol/sdk';
import { createClient } from 'redis';
import express from 'express';// 基于Redis的MCP服務注冊表實現
class McpServiceRegistry implements ServiceRegistry {private redisClient;private readonly KEY_PREFIX = 'mcp:registry:';private readonly TTL = 60; // 服務記錄過期時間(秒)constructor(redisUrl: string) {this.redisClient = createClient({ url: redisUrl });this.redisClient.connect();}// 注冊服務實例async register(instance: ServiceInstance): Promise<void> {const key = `${this.KEY_PREFIX}${instance.serviceName}:${instance.id}`;const now = Date.now();// 更新注冊時間或最后更新時間instance.lastUpdateTime = now;if (!instance.registrationTime) {instance.registrationTime = now;}// 將服務實例信息存儲到Redisawait this.redisClient.set(key, JSON.stringify(instance), { EX: this.TTL });// 添加到服務名稱集合中,方便按服務名查詢await this.redisClient.sAdd(`${this.KEY_PREFIX}${instance.serviceName}`, instance.id);}// 注銷服務實例async deregister(instanceId: string): Promise<void> {// 獲取服務實例信息const pattern = `${this.KEY_PREFIX}*:${instanceId}`;const keys = await this.redisClient.keys(pattern);if (keys.length > 0) {const key = keys[0];const serviceName = key.split(':')[2];// 從Redis中刪除服務實例await this.redisClient.del(key);// 從服務名稱集合中移除await this.redisClient.sRem(`${this.KEY_PREFIX}${serviceName}`, instanceId);}}// 獲取指定服務的所有實例async getInstances(serviceName: string): Promise<ServiceInstance[]> {const serviceKey = `${this.KEY_PREFIX}${serviceName}`;const instanceIds = await this.redisClient.sMembers(serviceKey);if (instanceIds.length === 0) {return [];}// 批量獲取所有服務實例信息const keys = instanceIds.map(id => `${this.KEY_PREFIX}${serviceName}:${id}`);const instancesData = await this.redisClient.mGet(keys);// 過濾并解析有效的實例數據return instancesData.filter(Boolean).map(data => JSON.parse(data)).filter(instance => instance.status !== 'DOWN');}// 查詢符合特定條件的服務實例async findServices(query: ServiceQuery): Promise<ServiceInstance[]> {const { serviceName, status, metadata } = query;// 獲取所有匹配服務名的實例const instances = await this.getInstances(serviceName);// 應用過濾條件return instances.filter(instance => {// 狀態過濾if (status && instance.status !== status) {return false;}// 元數據過濾if (metadata) {for (const [key, value] of Object.entries(metadata)) {if (instance.metadata[key] !== value) {return false;}}}return true;});}// 監聽服務變更watchService(serviceName: string, callback: (instances: ServiceInstance[]) => void): void {// 創建訂閱客戶端const subClient = this.redisClient.duplicate();// 訂閱服務變更事件subClient.subscribe(`${this.KEY_PREFIX}${serviceName}:changes`, (message) => {this.getInstances(serviceName).then(callback);});}
}// 服務注冊表API服務器
function createRegistryServer(registry: McpServiceRegistry): express.Express {const app = express();app.use(express.json());// 服務注冊端點app.post('/services', async (req, res) => {try {const instance = req.body as ServiceInstance;await registry.register(instance);res.status(201).json({ success: true, message: '服務實例已注冊' });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 服務注銷端點app.delete('/services/:instanceId', async (req, res) => {try {await registry.deregister(req.params.instanceId);res.status(200).json({ success: true, message: '服務實例已注銷' });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 獲取服務實例端點app.get('/services/:serviceName', async (req, res) => {try {const instances = await registry.getInstances(req.params.serviceName);res.status(200).json({ success: true, data: instances });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 服務查詢端點app.get('/services', async (req, res) => {try {const query = {serviceName: req.query.serviceName as string,status: req.query.status as 'UP' | 'DOWN' | 'STARTING' | 'OUT_OF_SERVICE',metadata: req.query.metadata ? JSON.parse(req.query.metadata as string) : undefined};const instances = await registry.findServices(query);res.status(200).json({ success: true, data: instances });} catch (error) {res.status(500).json({ success: false, error: error.message });}});return app;
}// 使用MCP服務器封裝服務注冊表
class McpRegistryServer extends McpServer {private registry: McpServiceRegistry;private httpServer: express.Express;constructor(config: {name: string;description: string;version: string;redisUrl: string;port: number;}) {super({name: config.name,description: config.description,version: config.version});// 創建服務注冊表this.registry = new McpServiceRegistry(config.redisUrl);// 創建HTTP服務器this.httpServer = createRegistryServer(this.registry);// 啟動HTTP服務器this.httpServer.listen(config.port, () => {console.log(`注冊表服務器運行在端口 ${config.port}`);});// 作為MCP資源暴露服務注冊表this.exposeRegistryAsResource();}// 將注冊表作為MCP資源暴露private exposeRegistryAsResource() {// 定義服務實例資源模板this.registerResourceTemplate({name: "serviceInstances",description: "MCP服務實例資源",schema: {type: "object",properties: {id: { type: "string" },serviceName: { type: "string" },host: { type: "string" },port: { type: "number" },status: { type: "string", enum: ["UP", "DOWN", "STARTING", "OUT_OF_SERVICE"] },metadata: { type: "object" },healthCheckUrl: { type: "string" },registrationTime: { type: "number" },lastUpdateTime: { type: "number" }},required: ["id", "serviceName", "host", 

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/81419.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/81419.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/81419.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

git 修改用戶名和郵箱

在 Git 中修改用戶名和郵箱地址是常見的任務&#xff0c;這可以確保你的提交記錄使用正確的身份信息。你可以通過簡單的命令來完成這一操作。 全局配置 修改全局用戶名 要修改全局的用戶名&#xff0c;請執行以下命令&#xff1a; git config --global user.name "New…

[算法學習]——通過RMQ與dfs序實現O(1)求LCA(含封裝板子)

每周五篇博客&#xff1a;&#xff08;3/5&#xff09; 碎碎念 其實不是我想多水一篇博客&#xff0c;本來這篇是歐拉序的博客&#xff0c;結果dfs序也是可以O1求lca的&#xff0c;而且常數更優&#xff0c;結果就變成這樣了。。。 前置知識 [算法學習]——dfs序 思想 分…

spark local模式

Spark Local 模式是一種在單臺機器上運行 Spark 應用程序的模式&#xff0c;無需搭建分布式集群&#xff0c;適合開發調試、學習以及運行小規模數據處理任務。以下為你詳細介紹該模式&#xff1a; 特點 簡易性&#xff1a;無需額外配置分布式集群&#xff0c;在單機上就能快速…

用 RxSwift 實現 UITableView 的響應式綁定(超實用示例)

目錄 前言 一、環境準備 1.安裝 RxSwift 和 RxCocoa 2.導入模塊 二、實現一個簡單的UITableView 1.實現一個簡單的 UITableView 1.實現步驟 1.我們聲明一個ViewModel 2.ViewModel和UITableView 綁定 2.實現 UITableView 的代理方法 三、處理點擊事件 前言 在 iOS 開發…

【C++】通過紅黑樹封裝map和set

前言&#xff1a; 通過之前的學習&#xff0c;我們已經學會了紅黑樹和map、set。這次我們要實現自己的map和set&#xff0c;對&#xff0c;使用紅黑樹進行封裝&#xff01; 當然&#xff0c;紅黑樹內容這里就不在贅述&#xff0c;我們會復用紅黑樹的代碼&#xff0c;所以先將…

非凸科技受邀出席AI SPARK活動,共探生成式AI驅動金融新生態

4月19日&#xff0c;由AI SPARK社區主辦的“生成式AI創新與應用構建”主題沙龍在北京舉行。活動聚焦生成式AI的技術突破與產業融合&#xff0c;圍繞大模型優化、多模態應用、存內計算等前沿議題展開深度探討。非凸科技受邀出席并發表主題演講&#xff0c;深入解析金融垂直大模型…

【Java IO流】IO流詳解

參考筆記&#xff1a;【Java基礎-3】吃透Java IO&#xff1a;字節流、字符流、緩沖流_javaio-CSDN博客 目錄 1.IO流簡介 1.1 什么是IO流&#xff1f; 1.2 IO流的分類 1.3 字符流和字節流的其他區別 1.4 Java IO流體系圖 2.字符編碼詳解 3. Java的char類型與 Unicode、U…

驅動開發系列56 - Linux Graphics QXL顯卡驅動代碼分析(三)顯示模式設置

一:概述 如之前介紹,在qxl_pci_probe 中會調用 qxl_modeset_init 來初始化屏幕分辨率和刷新率,本文詳細看下 qxl_modeset_init 的實現過程。即QXL設備的顯示模式設置,是如何配置CRTC,Encoder,Connector 的以及創建和更新幀緩沖區的。 二:qxl_modeset_init 分析 in…

Vue3開發常見性能問題知多少

文章目錄 1 常見性能優化瓶頸及原因1.1 響應式數據的過度使用1.2 虛擬 DOM 的頻繁更新1.3 組件渲染的冗余1.4 大列表渲染的性能問題1.5 計算屬性和偵聽器的濫用1.6 事件處理函數的頻繁綁定1.7 異步組件的加載性能2 解決方案與優化技巧2.1 合理使用響應式數據2.2 優化虛擬 DOM 更…

Rust Ubuntu下編譯生成環境win程序踩坑指南

前言&#xff1a; 1&#xff0c;公司要給一線搞一個升級程序&#xff0c;需要在win下跑。 之前都是找開發總監幫忙&#xff0c;但是他最近比較忙。就讓我自己搞。有了下文.。說來慚愧&#xff0c;之前寫過一篇ubuntu下編譯windows的文章。里面的demo就一句話 fuck world。依賴…

openharmony 4.1 運行busybox工具包(保姆教程)

1.下載 鏈接&#xff1a;Index of /downloads/binaries 進入其中后&#xff0c;找到 挑選適合你系統架構的版本&#xff0c;例如我這邊是 https://busybox.net/downloads/binaries/1.31.0-defconfig-multiarch-musl/busybox-armv7r 右鍵復制鏈接 打開迅雷&#xff0c;直接粘…

算法四 習題 1.3

數組實現棧 #include <iostream> #include <vector> #include <stdexcept> using namespace std;class MyStack { private:vector<int> data; // 用于存儲棧元素的數組public:// 構造函數MyStack() {}// 入棧操作void push(int val) {data.push_back…

GD32F407單片機開發入門(十七)內部RTC實時時鐘及實戰含源碼

文章目錄 一.概要二.RTC基本特點三.GD32單片機RTC內部結構圖四.配置一個RTC走秒例程五.工程源代碼下載六.小結 一.概要 RTC&#xff08;Real-Time Clock&#xff09;是一種用于追蹤和記錄實際時間的時鐘系統。RTC模塊提供了一個包含日期&#xff08;年/月/日&#xff09;和時間…

新能源汽車運動控制器核心芯片選型與優化:MCU、DCDC與CANFD協同設計

摘要&#xff1a;隨著新能源汽車產業的迅猛發展&#xff0c;汽車運動控制器的性能和可靠性面臨著更高的要求。本文深入探討了新能源汽車運動控制器中MCU&#xff08;微控制單元&#xff09;、DCDC電源管理芯片和CANFD總線通信芯片的選型要點、優化策略及其協同設計方案。通過綜…

2.maven 手動安裝 jar包

1.背景 有的時候&#xff0c;maven倉庫無法下載&#xff0c;可以手動安裝。本文以pentaho-aggdesigner-algorithm-5.1.5-jhyde.jar為例。 2.預先準備 下載文件到本地指定位置。 2.1.安裝pom mvn install:install-file \-Dfile/home/wind/tmp/pentaho-aggdesigner-5.1.5-jh…

OpenCV 圖形API(75)圖像與通道拼接函數-----將 4 個單通道圖像矩陣 (GMat) 合并為一個 4 通道的多通道圖像矩陣函數merge4()

操作系統&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 編程語言&#xff1a;C11 算法描述 由4個單通道矩陣創建一個4通道矩陣。 該函數將多個矩陣合并為一個單一的多通道矩陣。也就是說&#xff0c;輸出矩陣的每一個元素都是輸入矩陣對…

AI日報 · 2025年05月02日 | 再見GPT-4!OpenAI CEO 確認 GPT-4 已從 ChatGPT 界面正式移除

1、OpenAI CEO 確認 GPT-4 已從 ChatGPT 界面正式移除 在處理 GPT-4o 更新問題的同時&#xff0c;OpenAI CEO Sam Altman 于 5 月 1 日在 X 平臺發文&#xff0c;正式確認初代 GPT-4 模型已從 ChatGPT 主用戶界面中移除。此舉遵循了 OpenAI 此前公布的計劃&#xff0c;即在 4 …

patch命令在代碼管理中的應用

patch 是一個用于將差異文件&#xff08;補丁&#xff09;應用到源代碼的工具&#xff0c;常用于修復 bug、添加功能或調整代碼結構。在您提供的代碼中&#xff0c;patch 命令通過一系列補丁文件&#xff08;.patch&#xff09;修改了 open-amp 庫的源代碼。 patch 命令的核心作…

spring-ai集成langfuse

1、pom文件 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.…

PyTorch 與 TensorFlow:深度學習框架的深度剖析與實戰對比

PyTorch 與 TensorFlow&#xff1a;深度學習框架的深度剖析與實戰對比 摘要 &#xff1a;本文深入對比 PyTorch 與 TensorFlow 兩大深度學習框架&#xff0c;從核心架構、優缺點、適用場景等多維度剖析&#xff0c;結合實例講解&#xff0c;幫助開發者清晰理解兩者特性&#x…