引言
MCP Client与Server之间的连接管理是构建可靠AI工具系统的基石。本文将系统讲解连接的生命周期、保活机制、重连策略和多Server管理。
连接生命周期
完整生命周期
                ┌─────────────┐
                │   INITIAL   │
                └──────┬──────┘
                       │ connect + initialize
                ┌──────▼──────┐
          ┌────►│  CONNECTED  │◄──────────────┐
          │     └──────┬──────┘               │
          │            │                      │
          │     ┌──────▼──────┐               │
          │     │   ACTIVE    │───────────────┤
          │     └──────┬──────┘   ping失败    │
          │            │                      │
          │     ┌──────▼──────┐               │
          │     │ DISCONNECTED│               │
          │     └──────┬──────┘               │
          │            │ 重连                 │
          └────────────┘                      │
                                              │
                ┌─────────────┐               │
                │   CLOSED    │◄──────────────┘
                └─────────────┘   主动关闭
状态管理实现
/**
 * Lifecycle states of a single MCP client connection.
 *
 * The string values are what state listeners receive, so they double as
 * stable identifiers for logging.
 */
enum ConnectionState {
  // Starting state, before any connection attempt.
  INITIAL = "initial",
  // A connection attempt is in progress.
  CONNECTING = "connecting",
  // Counted as "connected" by connection checks, alongside ACTIVE.
  CONNECTED = "connected",
  // The connection is established and usable.
  ACTIVE = "active",
  // The connection attempt failed or the link was lost.
  DISCONNECTED = "disconnected",
  // The connection was shut down deliberately.
  CLOSED = "closed",
}
/**
 * Owns one MCP client connection over a stdio transport and tracks its
 * lifecycle state, notifying registered listeners on every transition.
 *
 * Heartbeats are deliberately NOT started here: the class has no heartbeat
 * of its own, and the original call to an undefined `startHeartbeat()`
 * made every `connect()` fall into the failure branch. Keep-alive belongs
 * to whoever composes this manager with a heartbeat component.
 */
class MCPConnectionManager {
  private state: ConnectionState = ConnectionState.INITIAL;
  private client: Client | null = null;
  private transport: Transport | null = null;
  private stateListeners: Array<(state: ConnectionState) => void> = [];

  /** Records the new state and notifies every registered listener. */
  private setState(newState: ConnectionState): void {
    this.state = newState;
    this.stateListeners.forEach(l => l(newState));
  }

  /** Registers a callback invoked with the new state on every transition. */
  onStateChange(listener: (state: ConnectionState) => void): void {
    this.stateListeners.push(listener);
  }

  /** Returns the current lifecycle state. */
  getState(): ConnectionState {
    return this.state;
  }

  /** Returns the underlying client, or `null` when no connection is held. */
  getClient(): Client | null {
    return this.client;
  }

  /**
   * Spawns the server process described by `serverConfig` and connects to it.
   *
   * Does nothing (beyond a warning) when a connection is already held.
   * On failure the half-built client/transport are released, the state
   * becomes DISCONNECTED and the original error is rethrown.
   */
  async connect(serverConfig: ServerConfig): Promise<void> {
    if (this.isConnected()) {
      console.warn("已经连接");
      return;
    }

    this.setState(ConnectionState.CONNECTING);
    try {
      const transport = new StdioClientTransport({
        command: serverConfig.command,
        args: serverConfig.args,
      });
      const client = new Client(
        { name: "app-client", version: "1.0.0" },
        { capabilities: {} }
      );
      // Store both before awaiting so a failure below can still clean them up.
      this.transport = transport;
      this.client = client;

      await client.connect(transport);
      this.setState(ConnectionState.ACTIVE);
    } catch (error) {
      await this.releaseResources();
      this.setState(ConnectionState.DISCONNECTED);
      throw error;
    }
  }

  /**
   * Closes the connection and moves to CLOSED.
   *
   * The state transition and reference cleanup happen even if closing the
   * client throws; the close error still propagates to the caller.
   */
  async disconnect(): Promise<void> {
    const client = this.client;
    this.client = null;
    this.transport = null;
    try {
      if (client) {
        await client.close();
      }
    } finally {
      this.setState(ConnectionState.CLOSED);
    }
  }

  /** True while the connection is in the CONNECTED or ACTIVE state. */
  isConnected(): boolean {
    return this.state === ConnectionState.CONNECTED ||
      this.state === ConnectionState.ACTIVE;
  }

  /** Best-effort teardown after a failed connect; never throws. */
  private async releaseResources(): Promise<void> {
    const client = this.client;
    this.client = null;
    this.transport = null;
    if (!client) {
      return;
    }
    try {
      await client.close();
    } catch {
      // The connection never came up, so a failing close carries no news.
    }
  }
}
心跳保活机制
Ping实现
/**
 * Periodically pings an MCP client and reports when the link looks dead.
 *
 * After `maxFailures` consecutive failed pings the manager stops its own
 * timer and calls `onFailure` exactly once; call `start()` again after a
 * successful reconnect to resume monitoring.
 */
class HeartbeatManager {
  private interval: NodeJS.Timeout | null = null;
  private consecutiveFailures = 0;
  private maxFailures = 3;
  // True while a ping is awaiting its reply, so slow pings never overlap.
  private pingInFlight = false;

  /**
   * @param client     Client whose `ping()` is used as the health probe.
   * @param onFailure  Called once when the failure threshold is reached.
   * @param intervalMs Milliseconds between pings.
   */
  constructor(
    private client: Client,
    private onFailure: () => void,
    private intervalMs: number = 30000
  ) {}

  /** Starts (or restarts) the heartbeat with a clean failure counter. */
  start(): void {
    // Restarting must not leave an earlier timer running in the background.
    this.stop();
    this.consecutiveFailures = 0;
    const timer: NodeJS.Timeout = setInterval(() => {
      void this.tick(timer);
    }, this.intervalMs);
    this.interval = timer;
  }

  /** Stops the heartbeat; safe to call when it is not running. */
  stop(): void {
    if (this.interval) {
      clearInterval(this.interval);
      this.interval = null;
    }
  }

  /** Clears the consecutive-failure counter without touching the timer. */
  reset(): void {
    this.consecutiveFailures = 0;
  }

  /** Runs one ping for the timer `owner` and updates the failure counter. */
  private async tick(owner: NodeJS.Timeout): Promise<void> {
    if (this.pingInFlight) {
      return;
    }
    this.pingInFlight = true;
    try {
      await this.client.ping();
      if (this.interval !== owner) {
        return; // stopped or restarted while the ping was pending
      }
      this.consecutiveFailures = 0;
      console.debug("心跳正常");
    } catch (error) {
      if (this.interval !== owner) {
        return; // a result for a heartbeat that no longer exists
      }
      this.consecutiveFailures++;
      console.warn(
        `心跳失败 (${this.consecutiveFailures}/${this.maxFailures})`
      );
      if (this.consecutiveFailures >= this.maxFailures) {
        console.error("心跳连续失败,触发重连");
        // Stop first so the callback is not re-fired on every later tick.
        this.stop();
        this.onFailure();
      }
    } finally {
      this.pingInFlight = false;
    }
  }
}
Python心跳实现
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Optional
class HeartbeatMonitor:
    """Ping a session on a fixed interval and report a dead connection.

    After ``max_failures`` consecutive failed pings the monitor invokes
    ``on_failure`` once and its background task ends. Call ``start()``
    again to resume monitoring, e.g. after a successful reconnect.

    Args:
        session: Object exposing an awaitable ``ping()`` method.
        interval: Seconds to wait before each ping.
        max_failures: Consecutive failures that trigger ``on_failure``.
        on_failure: Optional zero-argument callback; it may be a plain
            function or a coroutine function.
    """

    def __init__(
        self,
        session: "ClientSession",
        interval: float = 30.0,
        max_failures: int = 3,
        on_failure: Optional[Callable[[], Any]] = None,
    ):
        self.session = session
        self.interval = interval
        self.max_failures = max_failures
        self.on_failure = on_failure
        self._task: Optional[asyncio.Task] = None
        self._failures = 0

    async def start(self):
        """Start monitoring, replacing any monitor task already running."""
        # A second start() must not leave the first task pinging forever.
        await self.stop()
        self._failures = 0
        self._task = asyncio.create_task(self._run())

    async def stop(self):
        """Stop monitoring; safe to call when nothing is running."""
        task, self._task = self._task, None
        if task is None or task.done():
            return
        # on_failure runs inside the monitor task itself; cancelling here
        # would abort the very callback that asked us to stop.
        if task is not asyncio.current_task():
            task.cancel()

    async def _run(self):
        """Ping until the failure threshold is reached or the task is cancelled."""
        while True:
            # Kept outside the try block so cancellation is never counted
            # as a failed heartbeat.
            await asyncio.sleep(self.interval)
            try:
                await self.session.ping()
            except Exception as e:
                self._failures += 1
                print(f"心跳失败 ({self._failures}/{self.max_failures}): {e}")
                if self._failures >= self.max_failures:
                    print("心跳连续失败,触发重连")
                    await self._notify_failure()
                    break
            else:
                self._failures = 0

    async def _notify_failure(self):
        """Invoke ``on_failure``, awaiting the result only if it is awaitable."""
        if self.on_failure is None:
            return
        outcome = self.on_failure()
        if inspect.isawaitable(outcome):
            await outcome
断线重连策略
指数退避重连
/**
 * Retries a connect function with exponential backoff plus random jitter.
 *
 * Every call to `reconnect()` gets a fresh retry budget. Previously the
 * counter stayed at `maxRetries` after one exhausted run, so all later
 * calls threw immediately without attempting anything.
 */
class ReconnectManager {
  private retryCount = 0;
  private readonly maxRetries: number;
  private readonly baseDelay: number; // milliseconds
  private readonly maxDelay: number; // milliseconds
  private isReconnecting = false;

  /**
   * @param options.maxRetries Attempts per `reconnect()` call (default 5).
   * @param options.baseDelay  First backoff delay in ms (default 1000).
   * @param options.maxDelay   Upper bound for any single delay in ms
   *                           (default 30000).
   */
  constructor(
    options: { maxRetries?: number; baseDelay?: number; maxDelay?: number } = {}
  ) {
    this.maxRetries = options.maxRetries ?? 5;
    this.baseDelay = options.baseDelay ?? 1000;
    this.maxDelay = options.maxDelay ?? 30000;
  }

  /**
   * Calls `connectFn` until it succeeds or the retry budget is spent.
   *
   * Returns immediately, without attempting anything, if another
   * reconnect on this instance is already in progress.
   *
   * @throws Error when every attempt failed.
   */
  async reconnect(
    connectFn: () => Promise<void>
  ): Promise<void> {
    if (this.isReconnecting) return;
    this.isReconnecting = true;
    this.retryCount = 0;
    try {
      while (this.retryCount < this.maxRetries) {
        const delay = Math.min(
          this.baseDelay * Math.pow(2, this.retryCount) +
            Math.random() * 1000, // jitter keeps many clients from retrying in lockstep
          this.maxDelay
        );
        console.log(
          `等待 ${delay}ms 后重连 (第 ${this.retryCount + 1} 次)`
        );
        await this.sleep(delay);
        try {
          await connectFn();
        } catch (error) {
          this.retryCount++;
          // A thrown value is not guaranteed to be an Error instance.
          const reason = error instanceof Error ? error.message : String(error);
          console.error(`重连失败: ${reason}`);
          continue;
        }
        console.log("重连成功");
        return;
      }
      throw new Error(`重连失败,已重试 ${this.maxRetries} 次`);
    } finally {
      // Runs on success and failure alike so the next call starts clean.
      this.retryCount = 0;
      this.isReconnecting = false;
    }
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /** Clears the retry counter. */
  reset(): void {
    this.retryCount = 0;
  }
}
Python重连实现
import asyncio
import random
class ReconnectHandler:
    """Retry a connect coroutine with exponential backoff and jitter.

    Args:
        max_retries: Attempts made per ``reconnect()`` call.
        base_delay: First backoff delay in seconds; doubles per failure.
        max_delay: Upper bound, in seconds, for any single delay.
    """

    def __init__(
        self,
        max_retries: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self._retry_count = 0

    async def reconnect(self, connect_fn: Callable[[], Awaitable[Any]]):
        """Call ``connect_fn`` until it succeeds or the retries run out.

        Every call starts with a fresh retry budget. Previously the counter
        stayed at ``max_retries`` after one exhausted run, so later calls
        raised immediately without attempting to connect.

        Raises:
            RuntimeError: If every attempt failed; the last connection
                error is attached as the cause.
        """
        self._retry_count = 0
        last_error = None
        while self._retry_count < self.max_retries:
            # Exponential backoff plus jitter, capped at max_delay.
            delay = min(
                self.base_delay * (2 ** self._retry_count)
                + random.uniform(0, 1),
                self.max_delay,
            )
            print(f"等待 {delay:.1f}s 后重连 "
                  f"(第 {self._retry_count + 1} 次)")
            await asyncio.sleep(delay)
            try:
                await connect_fn()
            except Exception as e:
                self._retry_count += 1
                last_error = e
                print(f"重连失败: {e}")
            else:
                print("重连成功")
                self._retry_count = 0
                return
        # Leave the handler reusable for the next outage.
        self._retry_count = 0
        raise RuntimeError(
            f"重连失败,已重试 {self.max_retries} 次"
        ) from last_error
完整重连客户端
/**
 * MCP client that keeps itself alive: it connects through an
 * MCPConnectionManager, watches the link with a HeartbeatManager and
 * re-establishes it through a ReconnectManager when heartbeats fail.
 */
class ResilientMCPClient {
  private connectionMgr: MCPConnectionManager;
  private heartbeat: HeartbeatManager;
  private reconnector: ReconnectManager;
  private serverConfig: ServerConfig;

  constructor(config: ServerConfig) {
    this.serverConfig = config;
    this.connectionMgr = new MCPConnectionManager();
    this.reconnector = new ReconnectManager();

    // The real client only exists after connect() and is replaced on every
    // reconnect, so the heartbeat gets a thin stand-in that always pings
    // whichever client is current (previously it was handed `null`).
    const liveClient = {
      ping: () => this.requireClient().ping(),
    } as unknown as Client;

    // Heartbeat failure triggers a reconnect.
    this.heartbeat = new HeartbeatManager(
      liveClient,
      () => this.handleHeartbeatFailure(),
      30000
    );

    // Registered once here; doing it in connect() added a duplicate
    // listener on every reconnect.
    this.connectionMgr.onStateChange((state) => {
      if (state === ConnectionState.ACTIVE) {
        this.heartbeat.reset();
      }
    });
  }

  /** Connects to the configured server and (re)starts the heartbeat. */
  async connect(): Promise<void> {
    await this.connectionMgr.connect(this.serverConfig);
    // Stop first so repeated connects never stack several timers.
    this.heartbeat.stop();
    this.heartbeat.start();
  }

  /**
   * Invokes a tool on the connected server.
   *
   * @throws Error when no connection is currently held.
   */
  async callTool(toolName: string, args: any) {
    return this.requireClient().callTool({ name: toolName, arguments: args });
  }

  /**
   * Returns the tools advertised by the connected server.
   *
   * @throws Error when no connection is currently held.
   */
  async listTools() {
    const { tools } = await this.requireClient().listTools();
    return tools;
  }

  /** Tears down the dead link and retries with backoff; never throws. */
  private async handleHeartbeatFailure(): Promise<void> {
    console.warn("心跳失败,准备重连...");
    this.heartbeat.stop();
    try {
      // The manager still reports an active connection at this point and
      // would turn connect() into a no-op, so drop the old link first.
      await this.connectionMgr.disconnect();
    } catch (error) {
      console.warn("关闭旧连接失败:", error);
    }
    try {
      await this.reconnector.reconnect(() => this.connect());
    } catch (error) {
      console.error("重连最终失败:", error);
    }
  }

  /** Stops the heartbeat and closes the connection. */
  async disconnect(): Promise<void> {
    this.heartbeat.stop();
    await this.connectionMgr.disconnect();
  }

  /** Returns the manager's current client or throws when there is none. */
  private requireClient(): Client {
    // NOTE(review): bracket access is used because the manager exposes no
    // public accessor for its client; switch to one if it gains one.
    const client: Client | null = this.connectionMgr["client"];
    if (!client) {
      throw new Error("MCP Client 未连接");
    }
    return client;
  }
}
多Server管理
Server路由
// Configuration for one entry in a multi-server setup.
interface MCPServerConfig {
  // Key under which the server is registered and later looked up.
  name: string;
  // How to launch the server process.
  connection: {
    command: string;
    args: string[];
    env?: Record<string, string>;
  };
  tools?: string[]; // Optional: restrict usage to specific tools
}
/**
 * Registry of named MCP server connections with simple tool routing.
 */
class MultiServerManager {
  private servers: Map<string, ResilientMCPClient> = new Map();

  /**
   * Connects to a server and registers it under `config.name`.
   *
   * If the name is already taken, the previous client is disconnected
   * after the new one is up instead of being silently orphaned.
   */
  async addServer(config: MCPServerConfig): Promise<void> {
    const client = new ResilientMCPClient(config.connection);
    await client.connect();

    const previous = this.servers.get(config.name);
    this.servers.set(config.name, client);
    console.log(`Server '${config.name}' 已连接`);

    if (previous) {
      try {
        await previous.disconnect();
      } catch (error) {
        console.warn(`Server '${config.name}' 旧连接关闭失败:`, error);
      }
    }
  }

  /** Disconnects and unregisters a server; unknown names are ignored. */
  async removeServer(name: string): Promise<void> {
    const client = this.servers.get(name);
    if (!client) {
      return;
    }
    try {
      await client.disconnect();
    } finally {
      // Drop the entry even when the disconnect fails, so a broken client
      // cannot linger in the registry.
      this.servers.delete(name);
    }
  }

  /**
   * Calls a tool on one specific server.
   *
   * @throws Error when `serverName` is not registered.
   */
  async callTool(serverName: string, toolName: string, args: any) {
    const server = this.servers.get(serverName);
    if (!server) {
      throw new Error(`Server '${serverName}' 未连接`);
    }
    return server.callTool(toolName, args);
  }

  /**
   * Calls a tool on the first registered server that advertises it.
   *
   * A server whose tool listing fails is skipped with a warning. Errors
   * from the tool call itself propagate to the caller.
   *
   * @throws Error when no server offers `toolName`.
   */
  async callToolAuto(toolName: string, args: any) {
    for (const [name, server] of this.servers) {
      let offersTool = false;
      try {
        const tools = await server.listTools();
        offersTool = tools.some(t => t.name === toolName);
      } catch (error) {
        console.warn(`Server '${name}' 获取工具列表失败,已跳过:`, error);
        continue;
      }
      if (offersTool) {
        return server.callTool(toolName, args);
      }
    }
    throw new Error(`工具 '${toolName}' 在所有Server中均未找到`);
  }

  /**
   * Disconnects every registered server and empties the registry.
   *
   * One failing disconnect no longer prevents the remaining servers from
   * being shut down.
   */
  async disconnectAll(): Promise<void> {
    for (const [name, server] of this.servers) {
      try {
        await server.disconnect();
        console.log(`Server '${name}' 已断开`);
      } catch (error) {
        console.error(`Server '${name}' 断开失败:`, error);
      }
    }
    this.servers.clear();
  }
}
连接池技术
Python连接池
from contextlib import asynccontextmanager
from typing import AsyncIterator
class MCPConnectionPool:
    """Pool of initialized MCP client sessions for one stdio server.

    Args:
        server_params: Launch parameters handed to ``stdio_client``.
        min_size: Number of sessions created by ``initialize()``.
        max_size: Maximum number of sessions checked out at once.

    Raises:
        ValueError: If ``max_size`` is smaller than 1.
    """

    def __init__(
        self,
        server_params: "StdioServerParameters",
        min_size: int = 2,
        max_size: int = 5,
    ):
        if max_size < 1:
            raise ValueError("max_size must be at least 1")
        self.server_params = server_params
        self.min_size = min_size
        self.max_size = max_size
        self._available: list = []
        self._in_use: set = set()
        self._lock = asyncio.Lock()
        # Shares _lock, so waiting in acquire() releases it for release().
        self._slot_freed = asyncio.Condition(self._lock)
        # id(session) -> AsyncExitStack owning that session's contexts.
        self._exit_stacks: dict = {}

    async def initialize(self):
        """Create the initial ``min_size`` sessions."""
        for _ in range(self.min_size):
            session = await self._create_session()
            self._available.append(session)

    async def acquire(self):
        """Check out a session, waiting if the pool is exhausted.

        The previous implementation slept while holding the pool lock, so
        ``release()`` could never run and an exhausted pool deadlocked.
        """
        async with self._slot_freed:
            while not self._available:
                if len(self._in_use) < self.max_size:
                    session = await self._create_session()
                    self._available.append(session)
                else:
                    # Gives up the lock until release() notifies us.
                    await self._slot_freed.wait()
            session = self._available.pop()
            self._in_use.add(id(session))
            return session

    async def release(self, session):
        """Return a session to the pool, closing it if the pool is full."""
        surplus = None
        async with self._slot_freed:
            self._in_use.discard(id(session))
            if len(self._available) < self.max_size:
                self._available.append(session)
            else:
                surplus = session
            # Either way a slot opened up for one waiter.
            self._slot_freed.notify()
        if surplus is not None:
            await self._close_session(surplus)

    @asynccontextmanager
    async def connection(self) -> AsyncIterator["ClientSession"]:
        """Check out a session for the duration of an ``async with`` block."""
        session = await self.acquire()
        try:
            yield session
        finally:
            await self.release(session)

    async def _create_session(self):
        """Spawn a server process and return an initialized session."""
        from contextlib import AsyncExitStack

        # The stack keeps both context managers so they can be exited
        # properly later; previously the stdio context was dropped and the
        # session was never entered.
        # NOTE(review): the SDK's contexts may require being exited from
        # the task that entered them - confirm before closing sessions
        # from a different task.
        stack = AsyncExitStack()
        try:
            read, write = await stack.enter_async_context(
                stdio_client(self.server_params)
            )
            session = await stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()
        except BaseException:
            await stack.aclose()
            raise
        self._exit_stacks[id(session)] = stack
        return session

    async def _close_session(self, session):
        """Close one session together with the transport it owns."""
        stack = self._exit_stacks.pop(id(session), None)
        if stack is not None:
            await stack.aclose()
        else:
            # Session was not created by _create_session above.
            await session.__aexit__(None, None, None)

    async def close(self):
        """Close all idle sessions.

        Sessions that are currently checked out are not closed here;
        release them before calling this method.

        Raises:
            Exception: The first error hit while closing, re-raised after
                every idle session has been attempted.
        """
        async with self._slot_freed:
            idle, self._available = self._available, []
        first_error = None
        for session in idle:
            try:
                await self._close_session(session)
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error
连接配置管理
配置文件格式
{
"mcpServers": {
"filesystem": {
"command": "npx",
"args": [
"-y",
"@modelcontextprotocol/server-filesystem",
"/home/user/projects"
],
"env": {
"MCP_DEBUG": "false"
},
"connection": {
"reconnect": {
"maxRetries": 5,
"baseDelay": 1000,
"maxDelay": 30000
},
"heartbeat": {
"interval": 30000,
"maxFailures": 3
}
}
}
}
}
最佳实践
| 场景 | 推荐配置 |
|---|---|
| 本地开发 | stdio, 心跳30s |
| 远程服务 | SSE, 心跳15s |
| 生产环境 | stdio + 心跳 + 重连 |
| 多Server | 独立连接 + 工具路由 |
| 高并发 | 连接池 (5-10) |
总结
连接管理是MCP客户端稳定运行的关键:
| 组件 | 功能 | 重要性 |
|---|---|---|
| 生命周期 | 管理连接状态机 | P0 |
| 心跳 | 检测连接健康 | P0 |
| 重连 | 自动恢复连接 | P0 |
| 多Server | 管理多个连接 | P1 |
| 连接池 | 复用连接优化性能 | P1 |
下一步学习建议:
本文最后更新于 2024-07-14。