MCP协议 进阶 MCP 连接管理 连接池 重连

MCP连接管理:生命周期、保活与重连策略

AIEng Hub
阅读约 15 分钟

引言

MCP Client与Server之间的连接管理是构建可靠AI工具系统的基石。本文将系统讲解连接的生命周期、保活机制、重连策略和多Server管理。

连接生命周期

完整生命周期

                    ┌─────────────┐
                    │   INITIAL   │
                    └──────┬──────┘
                           │ connect + initialize
                    ┌──────▼──────┐
              ┌────►│  CONNECTED  ├───────────────┐
              │     └──────┬──────┘               │
              │            │                      │
              │     ┌──────▼──────┐               │
              │     │   ACTIVE    ├───────────────┤
              │     └──────┬──────┘               │
              │            │ ping失败              │
              │     ┌──────▼──────┐               │
              │     │ DISCONNECTED│               │
              │     └──────┬──────┘               │
              │            │ 重连                  │
              └────────────┘                      │
                                                  │
                    ┌─────────────┐               │
                    │   CLOSED    │◄──────────────┘
                    └─────────────┘  主动关闭

状态管理实现

/**
 * Lifecycle states reported by MCPConnectionManager to its state listeners.
 * The string values are what listeners receive.
 */
enum ConnectionState {
  // Starting state of a freshly constructed manager.
  INITIAL = "initial",
  // Set at the start of a connect attempt, before the transport is created.
  CONNECTING = "connecting",
  // Treated as "connected" by isConnected(); the visible manager code never
  // assigns it and moves straight to ACTIVE instead.
  CONNECTED = "connected",
  // Set once client.connect() has resolved.
  ACTIVE = "active",
  // Set when a connect attempt throws.
  DISCONNECTED = "disconnected",
  // Set by disconnect().
  CLOSED = "closed",
}

/**
 * Owns a single stdio-based MCP client connection and publishes its
 * lifecycle state to registered listeners.
 *
 * This class does not run a heartbeat itself; liveness checks are expected
 * to be driven from outside (for example by a HeartbeatManager).
 */
class MCPConnectionManager {
  private state: ConnectionState = ConnectionState.INITIAL;
  private client: Client | null = null;
  private transport: Transport | null = null;
  private stateListeners: Array<(state: ConnectionState) => void> = [];
  // In-flight connect attempt, shared by concurrent connect() callers.
  private pendingConnect: Promise<void> | null = null;

  /** Records the new state and notifies every registered listener. */
  private setState(newState: ConnectionState): void {
    this.state = newState;
    this.stateListeners.forEach(l => l(newState));
  }

  /** Registers a listener that is called on every state transition. */
  onStateChange(listener: (state: ConnectionState) => void): void {
    this.stateListeners.push(listener);
  }

  /**
   * Spawns the server process described by `serverConfig` and connects to it.
   *
   * Returns immediately (with a warning) when already connected. When a
   * connect attempt is already running, the caller joins that attempt
   * instead of starting a second one.
   *
   * @throws whatever the transport/client throws; the state is then
   *         DISCONNECTED and no client or transport is retained.
   */
  async connect(serverConfig: ServerConfig): Promise<void> {
    if (this.isConnected()) {
      console.warn("已经连接");
      return;
    }

    if (this.pendingConnect) {
      return this.pendingConnect;
    }

    this.pendingConnect = this.openConnection(serverConfig).finally(() => {
      this.pendingConnect = null;
    });
    return this.pendingConnect;
  }

  /** Performs one connect attempt and updates state accordingly. */
  private async openConnection(serverConfig: ServerConfig): Promise<void> {
    this.setState(ConnectionState.CONNECTING);

    let transport: Transport | null = null;
    try {
      transport = new StdioClientTransport({
        command: serverConfig.command,
        args: serverConfig.args,
      });

      const client = new Client(
        { name: "app-client", version: "1.0.0" },
        { capabilities: {} }
      );

      await client.connect(transport);

      // Publish the handles only once the connection is actually usable.
      this.transport = transport;
      this.client = client;
    } catch (error) {
      if (transport) {
        // Best-effort cleanup so a failed attempt does not leave a child
        // process behind; the original connect error is the one reported.
        await transport.close().catch(() => undefined);
      }
      this.client = null;
      this.transport = null;
      this.setState(ConnectionState.DISCONNECTED);
      throw error;
    }

    this.setState(ConnectionState.ACTIVE);
  }

  /**
   * Closes the client (if any) and moves to CLOSED. The state transition
   * happens even when closing the client throws.
   */
  async disconnect(): Promise<void> {
    const client = this.client;
    this.client = null;
    this.transport = null;
    try {
      if (client) {
        await client.close();
      }
    } finally {
      this.setState(ConnectionState.CLOSED);
    }
  }

  /** True while the state is CONNECTED or ACTIVE. */
  isConnected(): boolean {
    return this.state === ConnectionState.CONNECTED ||
           this.state === ConnectionState.ACTIVE;
  }
}

心跳保活机制

Ping实现

/**
 * Pings a client on a fixed interval and reports sustained failure.
 *
 * After `maxFailures` consecutive failed pings the timer is stopped and
 * `onFailure` is called once; call start() again to resume monitoring.
 */
class HeartbeatManager {
  private interval: NodeJS.Timeout | null = null;
  private consecutiveFailures = 0;
  // Guards against overlapping pings when one takes longer than intervalMs.
  private pingInFlight = false;

  /**
   * @param client      Client whose ping() is awaited on every tick.
   * @param onFailure   Called once when the failure threshold is reached.
   * @param intervalMs  Milliseconds between pings.
   * @param maxFailures Consecutive failures that trigger onFailure.
   */
  constructor(
    private client: Client,
    private onFailure: () => void,
    private intervalMs: number = 30000,
    private maxFailures: number = 3
  ) {}

  /** Starts (or restarts) the heartbeat timer with a fresh failure count. */
  start(): void {
    // Clear any previous timer so repeated start() calls never stack intervals.
    this.stop();
    this.consecutiveFailures = 0;
    this.interval = setInterval(() => {
      void this.tick();
    }, this.intervalMs);
  }

  /** Runs a single ping and updates the failure counter. */
  private async tick(): Promise<void> {
    if (this.pingInFlight) {
      return;
    }
    this.pingInFlight = true;

    try {
      await this.client.ping();
      this.consecutiveFailures = 0;
      console.debug("心跳正常");
    } catch (error) {
      this.consecutiveFailures++;
      console.warn(
        `心跳失败 (${this.consecutiveFailures}/${this.maxFailures})`
      );

      if (this.consecutiveFailures >= this.maxFailures) {
        console.error("心跳连续失败,触发重连");
        // Stop first so onFailure is not invoked again on every later tick.
        this.stop();
        this.onFailure();
      }
    } finally {
      this.pingInFlight = false;
    }
  }

  /** Stops the heartbeat timer; safe to call when not running. */
  stop(): void {
    if (this.interval) {
      clearInterval(this.interval);
      this.interval = null;
    }
  }

  /** Clears the consecutive-failure counter without touching the timer. */
  reset(): void {
    this.consecutiveFailures = 0;
  }
}

Python心跳实现

import asyncio
from typing import Optional


class HeartbeatMonitor:
    """Ping a session periodically and report sustained failure.

    A background task sleeps for ``interval`` seconds, awaits
    ``session.ping()`` and counts consecutive exceptions. When the count
    reaches ``max_failures`` the ``on_failure`` callback is invoked once and
    the task exits; call :meth:`start` again to resume monitoring.
    """

    def __init__(
        self,
        session: "ClientSession",
        interval: float = 30.0,
        max_failures: int = 3,
        on_failure: Optional[callable] = None,
    ):
        """Store the monitoring parameters; no task is started here.

        Args:
            session: Object exposing an awaitable ``ping()`` method.
            interval: Seconds to sleep before each ping.
            max_failures: Consecutive failures that trigger ``on_failure``.
            on_failure: Zero-argument callback. It may be a plain function
                or return an awaitable, in which case it is awaited.
        """
        self.session = session
        self.interval = interval
        self.max_failures = max_failures
        self.on_failure = on_failure
        self._task: Optional[asyncio.Task] = None
        self._failures = 0

    async def start(self):
        """Start the monitoring task; a no-op while one is still running."""
        if self._task is not None and not self._task.done():
            # Replacing a live task would orphan it and double the ping rate.
            return
        self._failures = 0
        self._task = asyncio.create_task(self._run())

    async def stop(self):
        """Cancel the monitoring task if there is one."""
        if self._task:
            self._task.cancel()
            self._task = None

    async def _run(self):
        """Sleep/ping loop; exits after notifying about sustained failure."""
        while True:
            try:
                await asyncio.sleep(self.interval)
                await self.session.ping()
                self._failures = 0
            except Exception as e:
                self._failures += 1
                print(f"心跳失败 ({self._failures}/{self.max_failures}): {e}")

                if self._failures >= self.max_failures:
                    print("心跳连续失败,触发重连")
                    await self._notify_failure()
                    break

    async def _notify_failure(self):
        """Invoke ``on_failure``, awaiting its result only when awaitable."""
        import inspect

        if not self.on_failure:
            return
        outcome = self.on_failure()
        # A plain callback returns a non-awaitable (usually None); awaiting
        # that would raise TypeError inside the background task.
        if inspect.isawaitable(outcome):
            await outcome

断线重连策略

指数退避重连

/**
 * Retries a connect function with exponential backoff plus random jitter.
 *
 * Every reconnect() call gets a full retry budget: the counter is reset
 * when a call starts, so an earlier exhausted attempt does not make later
 * calls fail immediately.
 */
class ReconnectManager {
  private retryCount = 0;
  private maxRetries: number;
  private baseDelay: number;  // milliseconds
  private maxDelay: number;   // milliseconds
  private isReconnecting = false;

  /**
   * @param options.maxRetries Attempts per reconnect() call (default 5).
   * @param options.baseDelay  First backoff step in ms (default 1000).
   * @param options.maxDelay   Upper bound for any single wait in ms
   *                           (default 30000).
   */
  constructor(
    options: { maxRetries?: number; baseDelay?: number; maxDelay?: number } = {}
  ) {
    this.maxRetries = options.maxRetries ?? 5;
    this.baseDelay = options.baseDelay ?? 1000;
    this.maxDelay = options.maxDelay ?? 30000;
  }

  /**
   * Calls `connectFn` until it resolves or the retry budget is used up.
   * Returns immediately when another reconnect() call is already running.
   *
   * @throws Error once `maxRetries` attempts have failed.
   */
  async reconnect(
    connectFn: () => Promise<void>
  ): Promise<void> {
    if (this.isReconnecting) return;
    this.isReconnecting = true;
    this.retryCount = 0;

    try {
      while (this.retryCount < this.maxRetries) {
        const delay = Math.min(
          this.baseDelay * Math.pow(2, this.retryCount) +
            Math.random() * 1000,  // jitter so clients do not retry in lockstep
          this.maxDelay
        );

        console.log(
          `等待 ${delay}ms 后重连 (第 ${this.retryCount + 1} 次)`
        );
        await this.sleep(delay);

        try {
          await connectFn();
        } catch (error) {
          this.retryCount++;
          // The thrown value is not guaranteed to be an Error instance.
          const reason = error instanceof Error ? error.message : String(error);
          console.error(`重连失败: ${reason}`);
          continue;
        }

        console.log("重连成功");
        this.retryCount = 0;
        return;
      }

      throw new Error(`重连失败,已重试 ${this.maxRetries} 次`);
    } finally {
      // Always release the guard, including when the wait or connectFn
      // fails in an unexpected way.
      this.isReconnecting = false;
    }
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /** Clears the retry counter. */
  reset(): void {
    this.retryCount = 0;
  }
}

Python重连实现

import asyncio
import random


class ReconnectHandler:
    """Retry a connect coroutine with exponential backoff and jitter."""

    def __init__(
        self,
        max_retries: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
    ):
        """Configure the retry budget and backoff bounds.

        Args:
            max_retries: Attempts made by each :meth:`reconnect` call.
            base_delay: First backoff step, in seconds.
            max_delay: Upper bound for any single wait, in seconds.
        """
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self._retry_count = 0

    async def reconnect(self, connect_fn: callable):
        """Await ``connect_fn()`` until it succeeds or retries run out.

        Each call starts with a full retry budget, so a previous exhausted
        call does not make this one fail without trying.

        Args:
            connect_fn: Zero-argument callable returning an awaitable.

        Raises:
            RuntimeError: After ``max_retries`` failed attempts; the last
                connection error is attached as ``__cause__``.
        """
        self._retry_count = 0
        last_error = None

        while self._retry_count < self.max_retries:
            # Exponential backoff plus up to one second of random jitter,
            # capped at max_delay.
            delay = min(
                self.base_delay * (2 ** self._retry_count) +
                random.uniform(0, 1),
                self.max_delay
            )

            print(f"等待 {delay:.1f}s 后重连 "
                  f"(第 {self._retry_count + 1} 次)")
            await asyncio.sleep(delay)

            try:
                await connect_fn()
            except Exception as e:
                self._retry_count += 1
                last_error = e
                print(f"重连失败: {e}")
            else:
                print("重连成功")
                self._retry_count = 0
                return

        raise RuntimeError(
            f"重连失败,已重试 {self.max_retries} 次"
        ) from last_error

完整重连客户端

/**
 * Combines a connection manager, a heartbeat and a reconnect strategy so
 * that a failed heartbeat tears the connection down and re-establishes it.
 *
 * NOTE(review): MultiServerManager calls `callTool` and `listTools` on this
 * class, but neither method is defined in the visible source.
 */
class ResilientMCPClient {
  private connectionMgr: MCPConnectionManager;
  // Created in connect(), once a live client exists to ping.
  private heartbeat: HeartbeatManager | null = null;
  private reconnector: ReconnectManager;
  private serverConfig: ServerConfig;

  constructor(config: ServerConfig) {
    this.serverConfig = config;
    this.connectionMgr = new MCPConnectionManager();
    this.reconnector = new ReconnectManager();

    // Registered once here; registering inside connect() would add another
    // listener on every reconnect.
    this.connectionMgr.onStateChange((state) => {
      if (state === ConnectionState.ACTIVE) {
        this.heartbeat?.reset();
      }
    });
  }

  /**
   * Connects to the configured server and (re)starts the heartbeat against
   * the client that is live after the connect.
   *
   * @throws the connect error, or an Error when no client is available
   *         after a successful connect.
   */
  async connect(): Promise<void> {
    await this.connectionMgr.connect(this.serverConfig);

    // Drop any timer bound to a previous client before creating a new one.
    this.heartbeat?.stop();

    // MCPConnectionManager exposes no accessor for its client, so it is
    // read through bracket notation, which TypeScript permits for private
    // members. Replace with a proper getter if one is added.
    const client = this.connectionMgr["client"];
    if (!client) {
      throw new Error("MCP client unavailable after connect");
    }

    this.heartbeat = new HeartbeatManager(
      client,
      () => this.handleHeartbeatFailure(),
      30000
    );
    this.heartbeat.start();
  }

  /** Tears down the stale connection and retries connect() with backoff. */
  private async handleHeartbeatFailure(): Promise<void> {
    console.warn("心跳失败,准备重连...");
    this.heartbeat?.stop();

    try {
      // Without this the manager still reports "connected" and connect()
      // would return without opening a new connection.
      await this.connectionMgr.disconnect();
    } catch (error) {
      console.warn("关闭旧连接失败:", error);
    }

    try {
      await this.reconnector.reconnect(() => this.connect());
    } catch (error) {
      console.error("重连最终失败:", error);
    }
  }

  /** Stops the heartbeat and closes the connection. */
  async disconnect(): Promise<void> {
    this.heartbeat?.stop();
    await this.connectionMgr.disconnect();
  }
}

多Server管理

Server路由

/** Configuration entry for one server in a multi-server setup. */
interface MCPServerConfig {
  /** Key under which the server is registered and looked up. */
  name: string;
  /** How to launch the server process. */
  connection: {
    command: string;
    args: Array<string>;
    env?: { [key: string]: string };
  };
  /** Optional: restrict usage to these tool names. */
  tools?: Array<string>;
}

/**
 * Keeps one ResilientMCPClient per named server and routes tool calls to
 * them, either by explicit server name or by searching for the tool.
 */
class MultiServerManager {
  private servers: Map<string, ResilientMCPClient> = new Map();

  /**
   * Connects a client for `config` and registers it under `config.name`.
   * A client already registered under that name is disconnected after the
   * new one is up, so it is not leaked.
   */
  async addServer(config: MCPServerConfig): Promise<void> {
    const client = new ResilientMCPClient(config.connection);
    await client.connect();

    const previous = this.servers.get(config.name);
    this.servers.set(config.name, client);
    console.log(`Server '${config.name}' 已连接`);

    if (previous) {
      await previous.disconnect();
    }
  }

  /** Disconnects and unregisters the named server; no-op when unknown. */
  async removeServer(name: string): Promise<void> {
    const client = this.servers.get(name);
    if (client) {
      await client.disconnect();
      this.servers.delete(name);
    }
  }

  /**
   * Calls `toolName` on the named server.
   *
   * @throws Error when no server is registered under `serverName`.
   */
  async callTool(serverName: string, toolName: string, args: any) {
    const server = this.servers.get(serverName);
    if (!server) {
      throw new Error(`Server '${serverName}' 未连接`);
    }
    return server.callTool(toolName, args);
  }

  /**
   * Calls `toolName` on the first registered server that lists it.
   * Servers whose tool listing fails are skipped with a warning; errors
   * from the tool call itself propagate to the caller.
   *
   * @throws Error when no server offers the tool.
   */
  async callToolAuto(toolName: string, args: any) {
    for (const [name, server] of this.servers) {
      let tools;
      try {
        tools = await server.listTools();
      } catch (error) {
        // Keep searching, but leave a trace of why this server was skipped.
        console.warn(`Server '${name}' 工具列表获取失败:`, error);
        continue;
      }

      if (tools.some(t => t.name === toolName)) {
        return server.callTool(toolName, args);
      }
    }
    throw new Error(`工具 '${toolName}' 在所有Server中均未找到`);
  }

  /**
   * Disconnects every registered server and clears the registry. All
   * servers are attempted even if some fail; the first failure is rethrown
   * once the registry has been cleared.
   */
  async disconnectAll(): Promise<void> {
    let firstError: unknown = undefined;
    let failed = false;

    for (const [name, server] of this.servers) {
      try {
        await server.disconnect();
        console.log(`Server '${name}' 已断开`);
      } catch (error) {
        console.error(`Server '${name}' 断开失败:`, error);
        if (!failed) {
          failed = true;
          firstError = error;
        }
      }
    }
    this.servers.clear();

    if (failed) {
      throw firstError;
    }
  }
}

连接池技术

Python连接池

from contextlib import asynccontextmanager
from typing import AsyncIterator


class MCPConnectionPool:
    """Pool of initialized MCP client sessions over stdio transports.

    Idle sessions are kept in ``_available``; sessions handed out by
    :meth:`acquire` are tracked by ``id()`` in ``_in_use``. Each session
    created by the pool has an exit stack that closes both the session and
    its transport.

    NOTE(review): the SDK's context managers may need to be exited from the
    same task that entered them -- confirm before releasing or closing
    sessions from a different task than the one that created them.
    """

    def __init__(
        self,
        server_params: "StdioServerParameters",
        min_size: int = 2,
        max_size: int = 5,
    ):
        """Configure the pool; no sessions are created here.

        Args:
            server_params: Parameters passed to ``stdio_client``.
            min_size: Sessions created by :meth:`initialize`.
            max_size: Maximum sessions handed out at the same time.
        """
        self.server_params = server_params
        self.min_size = min_size
        self.max_size = max_size
        self._available: list = []
        self._in_use: set = set()
        self._lock = asyncio.Lock()
        # Shares _lock, so waiting for a free session releases the lock and
        # lets release() run.
        self._session_freed = asyncio.Condition(self._lock)
        # id(session) -> AsyncExitStack owning that session and its transport.
        self._exit_stacks: dict = {}

    async def initialize(self):
        """Create ``min_size`` sessions and add them to the idle list."""
        for _ in range(self.min_size):
            session = await self._create_session()
            self._available.append(session)

    async def acquire(self):
        """Return an idle session, creating or waiting for one as needed.

        A new session is created while fewer than ``max_size`` are in use;
        otherwise the caller waits until :meth:`release` frees one.
        """
        async with self._session_freed:
            while not self._available:
                if len(self._in_use) < self.max_size:
                    session = await self._create_session()
                    self._available.append(session)
                else:
                    # wait() drops the lock while suspended; sleeping with
                    # the lock held would block release() forever.
                    await self._session_freed.wait()

            session = self._available.pop()
            self._in_use.add(id(session))
            return session

    async def release(self, session):
        """Return a session obtained from :meth:`acquire` to the pool.

        Releasing a session that is not checked out is ignored, so a double
        release cannot put the same session in the idle list twice.
        """
        async with self._session_freed:
            if id(session) not in self._in_use:
                return
            self._in_use.discard(id(session))
            if len(self._available) < self.max_size:
                self._available.append(session)
            else:
                await self._close_session(session)
            self._session_freed.notify()

    async def _create_session(self):
        """Open a transport, enter a session on it and initialize it.

        The transport and session contexts are kept on an exit stack so
        they can be closed later; on any failure the stack is unwound.
        """
        from contextlib import AsyncExitStack

        stack = AsyncExitStack()
        try:
            read, write = await stack.enter_async_context(
                stdio_client(self.server_params)
            )
            session = await stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()
        except BaseException:
            await stack.aclose()
            raise
        self._exit_stacks[id(session)] = stack
        return session

    async def _close_session(self, session):
        """Close the session and transport owned by the pool, if any."""
        stack = self._exit_stacks.pop(id(session), None)
        if stack is not None:
            await stack.aclose()

    async def close(self):
        """Close every idle session.

        Sessions that are currently checked out are left untouched.
        """
        async with self._lock:
            idle, self._available = self._available, []
            for session in idle:
                await self._close_session(session)

连接配置管理

配置文件格式

{
  "mcpServers": {
    "filesystem": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-filesystem",
        "/home/user/projects"
      ],
      "env": {
        "MCP_DEBUG": "false"
      },
      "connection": {
        "reconnect": {
          "maxRetries": 5,
          "baseDelay": 1000,
          "maxDelay": 30000
        },
        "heartbeat": {
          "interval": 30000,
          "maxFailures": 3
        }
      }
    }
  }
}

最佳实践

| 场景 | 推荐配置 |
| --- | --- |
| 本地开发 | stdio, 心跳30s |
| 远程服务 | SSE, 心跳15s |
| 生产环境 | stdio + 心跳 + 重连 |
| 多Server | 独立连接 + 工具路由 |
| 高并发 | 连接池 (5-10) |

总结

连接管理是MCP客户端稳定运行的关键:

| 组件 | 功能 | 重要性 |
| --- | --- | --- |
| 生命周期 | 管理连接状态机 | P0 |
| 心跳 | 检测连接健康 | P0 |
| 重连 | 自动恢复连接 | P0 |
| 多Server | 管理多个连接 | P1 |
| 连接池 | 复用连接优化性能 | P1 |

下一步学习建议:


本文最后更新于 2024-07-14。