【MCP Node.js SDK 全栈进阶指南】高级篇(1):MCP多服务器协作架构

随着业务规模的不断扩大和系统复杂度的提升,单一服务器架构往往无法满足高并发、高可用性和弹性扩展的需求。在MCP生态系统中,多服务器协作架构成为构建大规模应用的必然选择。本文将深入探讨MCP TypeScript-SDK在多服务器环境下的部署、协作和管理,以及如何构建高可用、高性能、易扩展的分布式MCP应用。

目录

  1. 多服务器架构基础
    1.1 MCP多服务器架构概述
    1.2 分布式系统的挑战与解决方案
    1.3 MCP服务器分类与职责划分
    1.4 多服务器架构的常见模式

  2. 服务发现与路由
    2.1 服务注册与发现机制
    2.2 基于DNS的服务发现
    2.3 动态路由与负载均衡
    2.4 服务健康检查与自动恢复

  3. 状态同步与一致性
    3.1 分布式状态管理策略
    3.2 事件驱动的状态同步
    3.3 实现最终一致性
    3.4 数据分区与冲突解决

  4. 负载均衡与容错设计
    4.1 负载均衡策略与实现
    4.2 故障检测与自动恢复
    4.3 优雅降级与熔断机制
    4.4 构建高可用MCP集群

1. 多服务器架构基础

在深入探讨MCP多服务器协作之前,我们需要先了解多服务器架构的核心概念、挑战以及MCP SDK提供的解决方案。

1.1 MCP多服务器架构概述

MCP(Model Context Protocol)作为一种为大型语言模型(LLM)交互设计的协议,其服务架构需要适应复杂多变的业务场景和负载模式。多服务器MCP架构是指将MCP服务分布在多个物理或虚拟服务器上,通过协作方式提供统一的服务能力。

// MCP服务器实例类型
interface McpServerInstance {id: string;            // 服务器唯一标识host: string;          // 主机地址port: number;          // 端口号role: 'master' | 'worker' | 'specialized'; // 服务器角色status: 'online' | 'offline' | 'degraded'; // 服务器状态capacity: {            // 服务器容量信息maxConcurrentRequests: number;currentLoad: number;};features: string[];    // 支持的特性startTime: Date;       // 启动时间
}// 多服务器集群配置
interface McpClusterConfig {serviceName: string;   // 服务名称serviceVersion: string;// 服务版本discoveryMethod: 'static' | 'dns' | 'registry'; // 服务发现方式heartbeatInterval: number; // 心跳间隔syncStrategy: 'event-based' | 'polling' | 'hybrid'; // 同步策略loadBalanceStrategy: 'round-robin' | 'least-connections' | 'consistent-hash'; // 负载均衡策略failoverPolicy: {      // 故障转移策略retryAttempts: number;retryDelay: number;circuitBreakerThreshold: number;};
}

多服务器MCP架构具有以下几个核心特点:

  1. 水平扩展性:通过增加服务器数量来线性提升系统整体处理能力
  2. 高可用性:即使部分服务器故障,系统整体仍可持续提供服务
  3. 负载分散:将请求负载分散到多个服务器,避免单点压力过大
  4. 资源隔离:可按业务功能或客户需求隔离资源,提高安全性和稳定性
  5. 灵活部署:支持混合云、多区域、边缘计算等多样化部署模式

1.2 分布式系统的挑战与解决方案

构建分布式MCP服务面临一系列挑战,MCP TypeScript-SDK针对这些挑战提供了相应的解决方案:

挑战表现MCP SDK解决方案
网络不可靠网络延迟、分区、丢包重试机制、异步通信、断线重连
一致性问题数据不一致、冲突事件驱动同步、版本控制、冲突解决策略
服务协调服务发现、路由服务注册表、服务健康检查、动态路由
故障处理节点故障、服务降级故障检测、自动恢复、熔断机制
性能瓶颈请求拥塞、资源竞争负载均衡、请求限流、资源隔离
// MCP服务器故障处理配置
interface McpFailoverConfig {detection: {method: 'heartbeat' | 'ping' | 'health-endpoint';interval: number; // 检测间隔timeout: number;  // 超时时间thresholds: {warning: number; // 警告阈值critical: number; // 临界阈值}};recovery: {strategy: 'restart' | 'replace' | 'redirect';cooldownPeriod: number; // 冷却时间maxAttempts: number;    // 最大尝试次数};notification: {channels: string[]; // 通知渠道templates: Record<string, string>; // 通知模板}
}

1.3 MCP服务器分类与职责划分

在多服务器架构中,不同的MCP服务器可以承担不同的角色和职责,实现功能分离和专业化:

1.3.1 按角色分类
  1. 主服务器(Master)

    • 管理集群状态和配置
    • 协调跨服务器的资源分配
    • 监控集群健康状态
    • 通常部署较少数量,配置较高
  2. 工作服务器(Worker)

    • 处理客户端的实际请求
    • 执行MCP资源访问和工具调用
    • 可大规模部署,构成系统处理能力的主体
    • 通常是无状态的,便于横向扩展
  3. 专用服务器(Specialized)

    • 专注于特定功能或业务场景
    • 例如:AI推理服务器、数据处理服务器等
    • 可根据需求定制化配置
    • 适合资源密集型或安全敏感型业务
// MCP服务器角色定义和职责配置
import { McpServer } from '@modelcontextprotocol/sdk';// 主服务器配置
const masterConfig = {name: "mcp-master-server",description: "MCP集群主服务器",version: "1.0.0",cluster: {role: "master",workers: ["worker-1", "worker-2", "worker-3"],electionStrategy: "fixed", // 固定主服务器stateSync: {method: "push",interval: 5000}}
};// 工作服务器配置
const workerConfig = {name: "mcp-worker-server",description: "MCP集群工作服务器",version: "1.0.0",cluster: {role: "worker",masterId: "master-1",maxConcurrentRequests: 1000,resourceCacheSize: 500,reportInterval: 2000}
};// 专用服务器配置
const specializedConfig = {name: "mcp-specialized-server",description: "MCP图像处理专用服务器",version: "1.0.0",cluster: {role: "specialized",serviceType: "image-processing",supportedOperations: ["resize", "filter", "recognize"],resourceRequirements: {gpu: true,minMemory: "16G",cpuCores: 8}}
};
1.3.2 按功能分类
  1. API网关服务器

    • 请求路由和负载均衡
    • 认证和授权处理
    • 请求限流和缓存
    • 请求/响应转换
  2. 资源服务器

    • 管理MCP资源模板
    • 处理资源访问请求
    • 资源缓存和优化
    • 资源版本控制
  3. 工具服务器

    • 托管和执行MCP工具
    • 工具依赖管理
    • 工具执行环境隔离
    • 工具性能监控
  4. 状态同步服务器

    • 维护全局状态
    • 协调跨服务器状态同步
    • 处理分布式事务
    • 解决数据冲突
// 按功能划分的MCP服务器实现示例
import { McpServer } from '@modelcontextprotocol/sdk';
import express from 'express';
import { createProxyMiddleware } from 'http-proxy-middleware';// API网关服务器
const apiGatewayApp = express();
const apiGatewayServer = new McpServer({name: "mcp-api-gateway",description: "MCP API网关服务器",version: "1.0.0"
});// 路由配置
const routeConfig = {'/api/resources': { target: 'http://resource-server:3001', pathRewrite: {'^/api': ''} },'/api/tools': { target: 'http://tool-server:3002', pathRewrite: {'^/api': ''} }
};// 设置API代理
Object.entries(routeConfig).forEach(([path, config]) => {apiGatewayApp.use(path, createProxyMiddleware(config));
});// 资源服务器
const resourceServer = new McpServer({name: "mcp-resource-server",description: "MCP资源服务器",version: "1.0.0"
});// 注册资源模板
resourceServer.registerResourceTemplate({name: "users",description: "用户资源",schema: userSchema
});// 工具服务器
const toolServer = new McpServer({name: "mcp-tool-server",description: "MCP工具服务器",version: "1.0.0"
});// 注册工具
toolServer.registerTool({name: "calculator",description: "计算工具",parameters: calculatorSchema,execute: async (params) => {// 计算逻辑return { result: calculateExpression(params.expression) };}
});

1.4 多服务器架构的常见模式

在实际应用中,MCP多服务器架构可以采用多种模式,根据业务需求和资源条件灵活选择:

1.4.1 主从模式(Master-Slave)

最常见的分布式架构模式,一个主服务器协调多个从服务器的工作。

┌────────────┐      ┌────────────┐
│            │      │            │
│   Master   │◄────►│   Slave 1  │
│            │      │            │
└────────────┘      └────────────┘▲│▼
┌────────────┐      ┌────────────┐
│            │      │            │
│   Slave 2  │◄────►│   Slave 3  │
│            │      │            │
└────────────┘      └────────────┘
// 主从模式实现示例
import { McpServer, EventEmitter } from '@modelcontextprotocol/sdk';
import { createClient } from 'redis';// 创建Redis客户端用于服务器间通信
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();// 主服务器实现
class MasterMcpServer extends McpServer {private slaves: Map<string, SlaveInfo> = new Map();constructor(config) {super(config);// 订阅从服务器状态更新subClient.subscribe('slave-status', (message) => {const slaveStatus = JSON.parse(message);this.updateSlaveStatus(slaveStatus);});// 定期检查从服务器健康状态setInterval(() => this.checkSlavesHealth(), 10000);}// 注册从服务器registerSlave(slaveId: string, info: SlaveInfo) {this.slaves.set(slaveId, { ...info, lastHeartbeat: Date.now() });this.emit('slave-registered', { slaveId, info });}// 更新从服务器状态updateSlaveStatus(status: SlaveStatus) {const slave = this.slaves.get(status.slaveId);if (slave) {this.slaves.set(status.slaveId, { ...slave, ...status, lastHeartbeat: Date.now() });}}// 检查从服务器健康状态checkSlavesHealth() {const now = Date.now();for (const [slaveId, info] of this.slaves.entries()) {if (now - info.lastHeartbeat > 30000) { // 30秒无心跳this.emit('slave-offline', { slaveId });this.slaves.delete(slaveId);}}}
}// 从服务器实现
class SlaveMcpServer extends McpServer {private masterId: string;constructor(config) {super(config);this.masterId = config.masterId;// 定期向主服务器发送心跳setInterval(() => this.sendHeartbeat(), 5000);// 订阅主服务器指令subClient.subscribe(`slave-command:${this.id}`, (message) => {this.handleMasterCommand(JSON.parse(message));});}// 发送心跳sendHeartbeat() {const status = {slaveId: this.id,load: this.getCurrentLoad(),memory: process.memoryUsage(),status: 'online'};pubClient.publish('slave-status', JSON.stringify(status));}// 处理主服务器指令handleMasterCommand(command) {switch (command.type) {case 'restart':this.restart();break;case 'update-config':this.updateConfig(command.config);break;// 其他指令处理...}}
}
1.4.2 去中心化模式(Peer-to-Peer)

所有服务器地位平等,通过协作提供服务,无单点故障风险。

┌────────────┐      ┌────────────┐
│            │◄────►│            │
│   Node 1   │      │   Node 2   │
│            │◄────►│            │
└────────────┘      └────────────┘▲  ▲               ▲│  └───────┐   ┌───┘│          ▼   ▼
┌────────────┐      ┌────────────┐
│            │◄────►│            │
│   Node 3   │      │   Node 4   │
│            │◄────►│            │
└────────────┘      └────────────┘
// 去中心化模式实现示例
import { McpServer } from '@modelcontextprotocol/sdk';
import * as IPFS from 'ipfs-core';
import { v4 as uuidv4 } from 'uuid';// P2P MCP服务器
class P2PMcpServer extends McpServer {private ipfs;private peerId: string;private peers: Map<string, PeerInfo> = new Map();private eventTopic = 'mcp-p2p-events';constructor(config) {super(config);this.peerId = uuidv4();this.initializeP2P();}// 初始化P2P网络async initializeP2P() {// 创建IPFS节点this.ipfs = await IPFS.create();// 获取本节点IDconst nodeInfo = await this.ipfs.id();console.log(`P2P节点ID: ${nodeInfo.id}`);// 订阅事件主题await this.ipfs.pubsub.subscribe(this.eventTopic, (msg) => {this.handleP2PEvent(msg);});// 定期发布状态setInterval(() => this.broadcastStatus(), 5000);// 发布上线事件this.broadcastEvent({type: 'node-online',peerId: this.peerId,timestamp: Date.now(),info: {resources: this.getResourceNames(),tools: this.getToolNames(),capacity: this.getCapacity()}});}// 处理P2P事件handleP2PEvent(message) {try {const event = JSON.parse(message.data.toString());// 忽略自己发出的消息if (event.peerId === this.peerId) return;switch (event.type) {case 'node-online':this.addPeer(event.peerId, event.info);// 向新节点发送欢迎消息this.sendDirectEvent(event.peerId, {type: 'welcome',peerId: this.peerId,timestamp: Date.now()});break;case 'node-offline':this.removePeer(event.peerId);break;case 'status-update':this.updatePeerStatus(event.peerId, event.status);break;case 'resource-request':this.handleResourceRequest(event);break;case 'tool-execution':this.handleToolExecution(event);break;}} catch (error) {console.error('处理P2P事件错误:', error);}}// 广播事件到所有节点async broadcastEvent(event) {await this.ipfs.pubsub.publish(this.eventTopic,Buffer.from(JSON.stringify(event)));}// 发送事件到特定节点async sendDirectEvent(targetPeerId, event) {const peer = this.peers.get(targetPeerId);if (!peer || !peer.directTopic) return;await this.ipfs.pubsub.publish(peer.directTopic,Buffer.from(JSON.stringify(event)));}// 定期广播状态broadcastStatus() {this.broadcastEvent({type: 'status-update',peerId: this.peerId,timestamp: Date.now(),status: {load: this.getCurrentLoad(),memory: process.memoryUsage(),uptime: process.uptime()}});}// 添加对等节点addPeer(peerId: string, info: any) {this.peers.set(peerId, {...info,directTopic: `mcp-p2p-direct:${peerId}`,lastSeen: Date.now()});// 订阅此节点的直接通信主题this.ipfs.pubsub.subscribe(`mcp-p2p-direct:${this.peerId}`, (msg) => {this.handleDirectMessage(msg);});}// 移除对等节点removePeer(peerId: string) {this.peers.delete(peerId);}// 更新对等节点状态updatePeerStatus(peerId: string, status: any) {const peer = this.peers.get(peerId);if (peer) {this.peers.set(peerId, { ...peer, ...status, lastSeen: Date.now() });}}
}
1.4.3 微服务模式(Microservices)

将MCP功能分解为多个独立服务,每个服务专注于特定功能域。

┌─────────────────────────────────────────────┐
│                 API Gateway                  │
└─────────────┬─────────────┬─────────────────┘│             │                 ┌─────────▼───┐   ┌─────▼───────┐   ┌─────────────┐│  Resource   │   │    Tool     │   │  Identity   ││   Service   │   │   Service   │   │   Service   │└─────────────┘   └─────────────┘   └─────────────┘│             │                 │┌─────────▼───┐   ┌─────▼───────┐   ┌─────▼───────┐│   Storage   │   │ Execution   │   │  Security   ││   Service   │   │  Service    │   │   Service   │└─────────────┘   └─────────────┘   └─────────────┘
// 微服务模式实现示例
import { McpServer } from '@modelcontextprotocol/sdk';
import express from 'express';
import axios from 'axios';
import { MongoClient } from 'mongodb';
import { v4 as uuidv4 } from 'uuid';
import amqp from 'amqplib';// 服务发现客户端
class ServiceDiscovery {private serviceRegistry: Map<string, ServiceInfo[]> = new Map();private consulUrl: string;constructor(consulUrl: string) {this.consulUrl = consulUrl;this.refreshRegistry();setInterval(() => this.refreshRegistry(), 30000);}// 刷新服务注册表async refreshRegistry() {try {const response = await axios.get(`${this.consulUrl}/v1/catalog/services`);for (const [serviceName, tags] of Object.entries(response.data)) {if (tags.includes('mcp')) {const instances = await this.getServiceInstances(serviceName);this.serviceRegistry.set(serviceName, instances);}}} catch (error) {console.error('刷新服务注册表失败:', error);}}// 获取服务实例列表async getServiceInstances(serviceName: string): Promise<ServiceInfo[]> {try {const response = await axios.get(`${this.consulUrl}/v1/health/service/${serviceName}?passing=true`);return response.data.map(entry => ({id: entry.Service.ID,address: entry.Service.Address,port: entry.Service.Port,tags: entry.Service.Tags,meta: entry.Service.Meta}));} catch (error) {console.error(`获取服务${serviceName}实例失败:`, error);return [];}}// 获取服务实例getService(serviceName: string, tags: string[] = []): ServiceInfo | null {const instances = this.serviceRegistry.get(serviceName) || [];// 筛选满足标签要求的实例const eligibleInstances = instances.filter(instance => tags.every(tag => instance.tags.includes(tag)));if (eligibleInstances.length === 0) return null;// 简单负载均衡:随机选择一个实例const randomIndex = Math.floor(Math.random() * eligibleInstances.length);return eligibleInstances[randomIndex];}
}// MCP资源服务
class ResourceMcpService {private server: McpServer;private app = express();private mongoClient: MongoClient;private serviceId: string;private discovery: ServiceDiscovery;private messageBroker: amqp.Connection;private channel: amqp.Channel;constructor(config) {this.server = new McpServer({name: "mcp-resource-service",description: "MCP资源微服务",version: "1.0.0"});this.serviceId = uuidv4();this.discovery = new ServiceDiscovery(config.consulUrl);// 初始化Express中间件this.initializeExpress();// 连接MongoDBthis.connectMongo(config.mongoUrl);// 连接消息代理this.connectMessageBroker(config.rabbitMqUrl);// 注册服务this.registerService(config.consulUrl);}// 初始化ExpressinitializeExpress() {this.app.use(express.json());// 资源API端点this.app.get('/resources/:name', this.getResource.bind(this));this.app.post('/resources/:name', this.createResource.bind(this));this.app.put('/resources/:name/:id', this.updateResource.bind(this));this.app.delete('/resources/:name/:id', this.deleteResource.bind(this));// 健康检查端点this.app.get('/health', (req, res) => {res.status(200).json({ status: 'healthy', serviceId: this.serviceId });});// 启动服务器const port = process.env.PORT || 3000;this.app.listen(port, () => {console.log(`资源服务运行在端口 ${port}`);});}// 连接MongoDBasync connectMongo(mongoUrl: string) {try {this.mongoClient = new MongoClient(mongoUrl);await this.mongoClient.connect();console.log('已连接到MongoDB');} catch (error) {console.error('MongoDB连接失败:', error);process.exit(1);}}// 连接消息代理async connectMessageBroker(rabbitMqUrl: string) {try {this.messageBroker = await amqp.connect(rabbitMqUrl);this.channel = await this.messageBroker.createChannel();// 声明交换机和队列await this.channel.assertExchange('mcp-events', 'topic', { durable: true });const { queue } = await this.channel.assertQueue(`resource-service-${this.serviceId}`,{ exclusive: true });// 绑定感兴趣的主题await this.channel.bindQueue(queue, 'mcp-events', 'resource.*');// 消费消息this.channel.consume(queue, (msg) => {if (msg) {this.handleMessage(msg);this.channel.ack(msg);}});console.log('已连接到消息代理');} catch (error) {console.error('消息代理连接失败:', error);}}// 处理消息handleMessage(msg: amqp.ConsumeMessage) {try {const content = JSON.parse(msg.content.toString());const routingKey = msg.fields.routingKey;console.log(`收到消息: ${routingKey}`, content);// 根据路由键处理不同类型的消息switch (routingKey) {case 'resource.created':this.handleResourceCreated(content);break;case 'resource.updated':this.handleResourceUpdated(content);break;case 'resource.deleted':this.handleResourceDeleted(content);break;}} catch (error) {console.error('处理消息错误:', error);}}// 发布事件publishEvent(routingKey: string, data: any) {this.channel.publish('mcp-events',routingKey,Buffer.from(JSON.stringify({...data,serviceId: this.serviceId,timestamp: Date.now()})));}// API处理器async getResource(req, res) {try {const { name } = req.params;const filter = req.query.filter ? JSON.parse(req.query.filter) : {};const db = this.mongoClient.db('mcp-resources');const collection = db.collection(name);const resources = await collection.find(filter).toArray();res.json({ success: true, data: resources });} catch (error) {res.status(500).json({success: false,error: error.message});}}async createResource(req, res) {try {const { name } = req.params;const resourceData = req.body;const db = this.mongoClient.db('mcp-resources');const collection = db.collection(name);const result = await collection.insertOne({...resourceData,_id: uuidv4(),createdAt: new Date(),updatedAt: new Date()});// 发布资源创建事件this.publishEvent('resource.created', {resourceName: name,resourceId: result.insertedId,data: resourceData});res.status(201).json({success: true,data: { id: result.insertedId }});} catch (error) {res.status(500).json({success: false,error: error.message});}}// 其他API处理器...// 注册服务到Consulasync registerService(consulUrl: string) {try {await axios.put(`${consulUrl}/v1/agent/service/register`, {ID: this.serviceId,Name: 'mcp-resource-service',Tags: ['mcp', 'resource'],Address: process.env.SERVICE_HOST || 'localhost',Port: parseInt(process.env.PORT || '3000'),Check: {HTTP: `http://${process.env.SERVICE_HOST || 'localhost'}:${process.env.PORT || '3000'}/health`,Interval: '15s',Timeout: '5s'}});console.log(`服务已注册,ID: ${this.serviceId}`);// 注册服务注销钩子process.on('SIGINT', () => this.deregisterService(consulUrl));process.on('SIGTERM', () => this.deregisterService(consulUrl));} catch (error) {console.error('服务注册失败:', error);}}// 注销服务async deregisterService(consulUrl: string) {try {await axios.put(`${consulUrl}/v1/agent/service/deregister/${this.serviceId}`);console.log('服务已注销');process.exit(0);} catch (error) {console.error('服务注销失败:', error);process.exit(1);}}
}

多服务器架构为MCP应用提供了更高的可用性、可伸缩性和灵活性,但同时也带来了系统复杂度的提升。在后续章节中,我们将深入探讨如何应对这些挑战,构建稳健的多服务器MCP系统。

2. 服务发现与路由

在多服务器MCP架构中,服务发现和路由机制是确保系统高效运行的关键组件。它们使客户端能够动态找到可用的服务实例,并将请求路由到最合适的服务器上。

2.1 服务注册与发现机制

服务发现的核心是让服务消费者能够在不需要硬编码服务提供者地址的情况下,动态找到并调用所需的服务。在MCP系统中,服务发现机制通常涉及以下几个关键组件:

2.1.1 服务注册表

服务注册表是服务发现的核心数据存储,记录了所有可用服务实例的关键信息:

// 服务注册表接口
interface ServiceRegistry {// 注册服务实例register(instance: ServiceInstance): Promise<void>;// 注销服务实例deregister(instanceId: string): Promise<void>;// 获取指定服务的所有实例getInstances(serviceName: string): Promise<ServiceInstance[]>;// 查询符合特定条件的服务实例findServices(query: ServiceQuery): Promise<ServiceInstance[]>;// 监听服务变更watchService(serviceName: string, callback: (instances: ServiceInstance[]) => void): void;
}// 服务实例数据结构
interface ServiceInstance {id: string;               // 实例唯一标识serviceName: string;      // 服务名称host: string;             // 主机地址port: number;             // 端口号status: 'UP' | 'DOWN' | 'STARTING' | 'OUT_OF_SERVICE'; // 实例状态metadata: Record<string, string>; // 元数据,如版本、环境等healthCheckUrl?: string;  // 健康检查URLregistrationTime: number; // 注册时间lastUpdateTime: number;   // 最后更新时间
}
2.1.2 基于MCP实现的分布式服务注册表

以下是一个使用MCP服务器实现的分布式服务注册表示例:

import { McpServer } from '@modelcontextprotocol/sdk';
import { createClient } from 'redis';
import express from 'express';// 基于Redis的MCP服务注册表实现
class McpServiceRegistry implements ServiceRegistry {private redisClient;private readonly KEY_PREFIX = 'mcp:registry:';private readonly TTL = 60; // 服务记录过期时间(秒)constructor(redisUrl: string) {this.redisClient = createClient({ url: redisUrl });this.redisClient.connect();}// 注册服务实例async register(instance: ServiceInstance): Promise<void> {const key = `${this.KEY_PREFIX}${instance.serviceName}:${instance.id}`;const now = Date.now();// 更新注册时间或最后更新时间instance.lastUpdateTime = now;if (!instance.registrationTime) {instance.registrationTime = now;}// 将服务实例信息存储到Redisawait this.redisClient.set(key, JSON.stringify(instance), { EX: this.TTL });// 添加到服务名称集合中,方便按服务名查询await this.redisClient.sAdd(`${this.KEY_PREFIX}${instance.serviceName}`, instance.id);}// 注销服务实例async deregister(instanceId: string): Promise<void> {// 获取服务实例信息const pattern = `${this.KEY_PREFIX}*:${instanceId}`;const keys = await this.redisClient.keys(pattern);if (keys.length > 0) {const key = keys[0];const serviceName = key.split(':')[2];// 从Redis中删除服务实例await this.redisClient.del(key);// 从服务名称集合中移除await this.redisClient.sRem(`${this.KEY_PREFIX}${serviceName}`, instanceId);}}// 获取指定服务的所有实例async getInstances(serviceName: string): Promise<ServiceInstance[]> {const serviceKey = `${this.KEY_PREFIX}${serviceName}`;const instanceIds = await this.redisClient.sMembers(serviceKey);if (instanceIds.length === 0) {return [];}// 批量获取所有服务实例信息const keys = instanceIds.map(id => `${this.KEY_PREFIX}${serviceName}:${id}`);const instancesData = await this.redisClient.mGet(keys);// 过滤并解析有效的实例数据return instancesData.filter(Boolean).map(data => JSON.parse(data)).filter(instance => instance.status !== 'DOWN');}// 查询符合特定条件的服务实例async findServices(query: ServiceQuery): Promise<ServiceInstance[]> {const { serviceName, status, metadata } = query;// 获取所有匹配服务名的实例const instances = await this.getInstances(serviceName);// 应用过滤条件return instances.filter(instance => {// 状态过滤if (status && instance.status !== status) {return false;}// 元数据过滤if (metadata) {for (const [key, value] of Object.entries(metadata)) {if (instance.metadata[key] !== value) {return false;}}}return true;});}// 监听服务变更watchService(serviceName: string, callback: (instances: ServiceInstance[]) => void): void {// 创建订阅客户端const subClient = this.redisClient.duplicate();// 订阅服务变更事件subClient.subscribe(`${this.KEY_PREFIX}${serviceName}:changes`, (message) => {this.getInstances(serviceName).then(callback);});}
}// 服务注册表API服务器
function createRegistryServer(registry: McpServiceRegistry): express.Express {const app = express();app.use(express.json());// 服务注册端点app.post('/services', async (req, res) => {try {const instance = req.body as ServiceInstance;await registry.register(instance);res.status(201).json({ success: true, message: '服务实例已注册' });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 服务注销端点app.delete('/services/:instanceId', async (req, res) => {try {await registry.deregister(req.params.instanceId);res.status(200).json({ success: true, message: '服务实例已注销' });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 获取服务实例端点app.get('/services/:serviceName', async (req, res) => {try {const instances = await registry.getInstances(req.params.serviceName);res.status(200).json({ success: true, data: instances });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 服务查询端点app.get('/services', async (req, res) => {try {const query = {serviceName: req.query.serviceName as string,status: req.query.status as 'UP' | 'DOWN' | 'STARTING' | 'OUT_OF_SERVICE',metadata: req.query.metadata ? JSON.parse(req.query.metadata as string) : undefined};const instances = await registry.findServices(query);res.status(200).json({ success: true, data: instances });} catch (error) {res.status(500).json({ success: false, error: error.message });}});return app;
}// 使用MCP服务器封装服务注册表
class McpRegistryServer extends McpServer {private registry: McpServiceRegistry;private httpServer: express.Express;constructor(config: {name: string;description: string;version: string;redisUrl: string;port: number;}) {super({name: config.name,description: config.description,version: config.version});// 创建服务注册表this.registry = new McpServiceRegistry(config.redisUrl);// 创建HTTP服务器this.httpServer = createRegistryServer(this.registry);// 启动HTTP服务器this.httpServer.listen(config.port, () => {console.log(`注册表服务器运行在端口 ${config.port}`);});// 作为MCP资源暴露服务注册表this.exposeRegistryAsResource();}// 将注册表作为MCP资源暴露private exposeRegistryAsResource() {// 定义服务实例资源模板this.registerResourceTemplate({name: "serviceInstances",description: "MCP服务实例资源",schema: {type: "object",properties: {id: { type: "string" },serviceName: { type: "string" },host: { type: "string" },port: { type: "number" },status: { type: "string", enum: ["UP", "DOWN", "STARTING", "OUT_OF_SERVICE"] },metadata: { type: "object" },healthCheckUrl: { type: "string" },registrationTime: { type: "number" },lastUpdateTime: { type: "number" }},required: ["id", "serviceName", "host", 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/81419.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

git 修改用户名和邮箱

在 Git 中修改用户名和邮箱地址是常见的任务&#xff0c;这可以确保你的提交记录使用正确的身份信息。你可以通过简单的命令来完成这一操作。 全局配置 修改全局用户名 要修改全局的用户名&#xff0c;请执行以下命令&#xff1a; git config --global user.name "New…

[算法学习]——通过RMQ与dfs序实现O(1)求LCA(含封装板子)

每周五篇博客&#xff1a;&#xff08;3/5&#xff09; 碎碎念 其实不是我想多水一篇博客&#xff0c;本来这篇是欧拉序的博客&#xff0c;结果dfs序也是可以O1求lca的&#xff0c;而且常数更优&#xff0c;结果就变成这样了。。。 前置知识 [算法学习]——dfs序 思想 分…

spark local模式

Spark Local 模式是一种在单台机器上运行 Spark 应用程序的模式&#xff0c;无需搭建分布式集群&#xff0c;适合开发调试、学习以及运行小规模数据处理任务。以下为你详细介绍该模式&#xff1a; 特点 简易性&#xff1a;无需额外配置分布式集群&#xff0c;在单机上就能快速…

用 RxSwift 实现 UITableView 的响应式绑定(超实用示例)

目录 前言 一、环境准备 1.安装 RxSwift 和 RxCocoa 2.导入模块 二、实现一个简单的UITableView 1.实现一个简单的 UITableView 1.实现步骤 1.我们声明一个ViewModel 2.ViewModel和UITableView 绑定 2.实现 UITableView 的代理方法 三、处理点击事件 前言 在 iOS 开发…

【C++】通过红黑树封装map和set

前言&#xff1a; 通过之前的学习&#xff0c;我们已经学会了红黑树和map、set。这次我们要实现自己的map和set&#xff0c;对&#xff0c;使用红黑树进行封装&#xff01; 当然&#xff0c;红黑树内容这里就不在赘述&#xff0c;我们会复用红黑树的代码&#xff0c;所以先将…

非凸科技受邀出席AI SPARK活动,共探生成式AI驱动金融新生态

4月19日&#xff0c;由AI SPARK社区主办的“生成式AI创新与应用构建”主题沙龙在北京举行。活动聚焦生成式AI的技术突破与产业融合&#xff0c;围绕大模型优化、多模态应用、存内计算等前沿议题展开深度探讨。非凸科技受邀出席并发表主题演讲&#xff0c;深入解析金融垂直大模型…

【Java IO流】IO流详解

参考笔记&#xff1a;【Java基础-3】吃透Java IO&#xff1a;字节流、字符流、缓冲流_javaio-CSDN博客 目录 1.IO流简介 1.1 什么是IO流&#xff1f; 1.2 IO流的分类 1.3 字符流和字节流的其他区别 1.4 Java IO流体系图 2.字符编码详解 3. Java的char类型与 Unicode、U…

驱动开发系列56 - Linux Graphics QXL显卡驱动代码分析(三)显示模式设置

一:概述 如之前介绍,在qxl_pci_probe 中会调用 qxl_modeset_init 来初始化屏幕分辨率和刷新率,本文详细看下 qxl_modeset_init 的实现过程。即QXL设备的显示模式设置,是如何配置CRTC,Encoder,Connector 的以及创建和更新帧缓冲区的。 二:qxl_modeset_init 分析 in…

Vue3开发常见性能问题知多少

文章目录 1 常见性能优化瓶颈及原因1.1 响应式数据的过度使用1.2 虚拟 DOM 的频繁更新1.3 组件渲染的冗余1.4 大列表渲染的性能问题1.5 计算属性和侦听器的滥用1.6 事件处理函数的频繁绑定1.7 异步组件的加载性能2 解决方案与优化技巧2.1 合理使用响应式数据2.2 优化虚拟 DOM 更…

Rust Ubuntu下编译生成环境win程序踩坑指南

前言&#xff1a; 1&#xff0c;公司要给一线搞一个升级程序&#xff0c;需要在win下跑。 之前都是找开发总监帮忙&#xff0c;但是他最近比较忙。就让我自己搞。有了下文.。说来惭愧&#xff0c;之前写过一篇ubuntu下编译windows的文章。里面的demo就一句话 fuck world。依赖…

openharmony 4.1 运行busybox工具包(保姆教程)

1.下载 链接&#xff1a;Index of /downloads/binaries 进入其中后&#xff0c;找到 挑选适合你系统架构的版本&#xff0c;例如我这边是 https://busybox.net/downloads/binaries/1.31.0-defconfig-multiarch-musl/busybox-armv7r 右键复制链接 打开迅雷&#xff0c;直接粘…

算法四 习题 1.3

数组实现栈 #include <iostream> #include <vector> #include <stdexcept> using namespace std;class MyStack { private:vector<int> data; // 用于存储栈元素的数组public:// 构造函数MyStack() {}// 入栈操作void push(int val) {data.push_back…

GD32F407单片机开发入门(十七)内部RTC实时时钟及实战含源码

文章目录 一.概要二.RTC基本特点三.GD32单片机RTC内部结构图四.配置一个RTC走秒例程五.工程源代码下载六.小结 一.概要 RTC&#xff08;Real-Time Clock&#xff09;是一种用于追踪和记录实际时间的时钟系统。RTC模块提供了一个包含日期&#xff08;年/月/日&#xff09;和时间…

新能源汽车运动控制器核心芯片选型与优化:MCU、DCDC与CANFD协同设计

摘要&#xff1a;随着新能源汽车产业的迅猛发展&#xff0c;汽车运动控制器的性能和可靠性面临着更高的要求。本文深入探讨了新能源汽车运动控制器中MCU&#xff08;微控制单元&#xff09;、DCDC电源管理芯片和CANFD总线通信芯片的选型要点、优化策略及其协同设计方案。通过综…

2.maven 手动安装 jar包

1.背景 有的时候&#xff0c;maven仓库无法下载&#xff0c;可以手动安装。本文以pentaho-aggdesigner-algorithm-5.1.5-jhyde.jar为例。 2.预先准备 下载文件到本地指定位置。 2.1.安装pom mvn install:install-file \-Dfile/home/wind/tmp/pentaho-aggdesigner-5.1.5-jh…

OpenCV 图形API(75)图像与通道拼接函数-----将 4 个单通道图像矩阵 (GMat) 合并为一个 4 通道的多通道图像矩阵函数merge4()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 由4个单通道矩阵创建一个4通道矩阵。 该函数将多个矩阵合并为一个单一的多通道矩阵。也就是说&#xff0c;输出矩阵的每一个元素都是输入矩阵对…

AI日报 · 2025年05月02日 | 再见GPT-4!OpenAI CEO 确认 GPT-4 已从 ChatGPT 界面正式移除

1、OpenAI CEO 确认 GPT-4 已从 ChatGPT 界面正式移除 在处理 GPT-4o 更新问题的同时&#xff0c;OpenAI CEO Sam Altman 于 5 月 1 日在 X 平台发文&#xff0c;正式确认初代 GPT-4 模型已从 ChatGPT 主用户界面中移除。此举遵循了 OpenAI 此前公布的计划&#xff0c;即在 4 …

patch命令在代码管理中的应用

patch 是一个用于将差异文件&#xff08;补丁&#xff09;应用到源代码的工具&#xff0c;常用于修复 bug、添加功能或调整代码结构。在您提供的代码中&#xff0c;patch 命令通过一系列补丁文件&#xff08;.patch&#xff09;修改了 open-amp 库的源代码。 patch 命令的核心作…

spring-ai集成langfuse

1、pom文件 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.…

PyTorch 与 TensorFlow:深度学习框架的深度剖析与实战对比

PyTorch 与 TensorFlow&#xff1a;深度学习框架的深度剖析与实战对比 摘要 &#xff1a;本文深入对比 PyTorch 与 TensorFlow 两大深度学习框架&#xff0c;从核心架构、优缺点、适用场景等多维度剖析&#xff0c;结合实例讲解&#xff0c;帮助开发者清晰理解两者特性&#x…