引言
错误处理是MCP客户端开发中最容易被忽视但却最关键的部分。一个健壮的错误处理策略决定了AI工具系统的可靠性。
错误分类
MCP错误层次
MCP错误分类
├── 协议层错误
│   ├── JSON-RPC解析错误 (-32700)
│   ├── 无效请求 (-32600)
│   ├── 方法未找到 (-32601)
│   ├── 参数无效 (-32602)
│   └── 内部错误 (-32603)
├── 传输层错误
│   ├── 连接断开
│   ├── 超时
│   └── 流错误
└── 应用层错误
    ├── 工具执行失败
    ├── 资源不可用
    └── 权限不足
错误类型定义
/**
 * Layer at which an MCP error originated.
 * The string value of each member is what gets stored in `MCPError.category`.
 */
enum MCPErrorCategory {
  PROTOCOL = "protocol", // protocol layer
  TRANSPORT = "transport", // transport layer
  APPLICATION = "application", // application layer
}
/**
 * Structured base error for MCP failures.
 *
 * Carries the originating layer (`category`), an optional numeric `code`,
 * a `recoverable` flag (defaults to `false`) and optional free-form `details`.
 */
class MCPError extends Error {
  public category: MCPErrorCategory;
  public code?: number;
  public recoverable: boolean;
  public details?: any;

  /**
   * @param message Human-readable description, forwarded to `Error`.
   * @param category Layer the error belongs to.
   * @param code Optional numeric error code.
   * @param recoverable Whether the failure is considered recoverable.
   * @param details Optional extra context attached to the error.
   */
  constructor(
    message: string,
    category: MCPErrorCategory,
    code?: number,
    recoverable: boolean = false,
    details?: any
  ) {
    super(message);
    // Assigned in declaration order so the own-property order stays stable.
    this.category = category;
    this.code = code;
    this.recoverable = recoverable;
    this.details = details;
    this.name = "MCPError";
  }
}
/**
 * Transport-layer error. Carries no numeric code and is flagged
 * recoverable unless the caller says otherwise.
 */
class TransportError extends MCPError {
  /**
   * @param message Description of the transport failure.
   * @param recoverable Whether the failure is recoverable; defaults to `true`.
   */
  constructor(message: string, recoverable: boolean = true) {
    const noCode = undefined;
    super(message, MCPErrorCategory.TRANSPORT, noCode, recoverable);
    this.name = "TransportError";
  }
}
/**
 * Protocol-layer error. Always carries a numeric code and is never
 * flagged recoverable.
 */
class ProtocolError extends MCPError {
  /**
   * @param code Numeric error code reported for the protocol failure.
   * @param message Description of the protocol failure.
   */
  constructor(code: number, message: string) {
    const recoverable = false;
    super(message, MCPErrorCategory.PROTOCOL, code, recoverable);
    this.name = "ProtocolError";
  }
}
/**
 * Application-layer error raised when a tool run fails. The tool name is
 * embedded in the message and also exposed through `details.toolName`.
 */
class ToolExecutionError extends MCPError {
  /**
   * @param toolName Name of the tool that failed.
   * @param errorMsg Failure description appended to the message.
   */
  constructor(toolName: string, errorMsg: string) {
    const description = `工具 '${toolName}' 执行失败: ${errorMsg}`;
    const details = { toolName };
    super(description, MCPErrorCategory.APPLICATION, undefined, false, details);
    this.name = "ToolExecutionError";
  }
}
重试策略
可重试 vs 不可重试
/**
 * Decides whether a failed operation may be retried automatically.
 *
 * - Transport errors are retryable only when the error itself is flagged
 *   `recoverable` (the `TransportError` default), so a transport error that
 *   was explicitly constructed as non-recoverable is not retried.
 * - Protocol errors are never retryable.
 * - Tool execution errors are not retried automatically, because the tool
 *   may have side effects.
 * - Any other error is treated as non-retryable.
 *
 * @param error The error raised by the failed operation.
 * @returns `true` when an automatic retry is acceptable.
 */
function isRetryable(error: MCPError): boolean {
  if (error instanceof TransportError) {
    return error.recoverable;
  }
  // Protocol errors, tool execution errors and everything else: no automatic retry.
  return false;
}
带重试的执行器
/** Settings consumed by `RetryExecutor`. */
interface RetryConfig {
  /** Number of retries allowed after the initial attempt fails. */
  maxRetries: number;
  /** Base delay in milliseconds; it is doubled for each further attempt. */
  baseDelay: number;
  /** Upper bound in milliseconds used when computing the backoff delay. */
  maxDelay: number;
  /** Error classes that qualify for a retry; matched with `instanceof`. */
  retryableErrors: Array<new (...args: any[]) => MCPError>;
}
/**
 * Runs an async operation and retries it with exponential backoff plus
 * random jitter when it fails with one of the configured error classes.
 */
class RetryExecutor {
  private config: RetryConfig;

  /**
   * @param config Optional overrides. Any field that is missing or
   *   `undefined` falls back to its default: 3 retries, 1000 ms base delay,
   *   10000 ms maximum delay, retry on `TransportError` only.
   */
  constructor(config?: Partial<RetryConfig>) {
    // Resolve field by field so an explicit `undefined` cannot wipe out a default.
    this.config = {
      maxRetries: config?.maxRetries ?? 3,
      baseDelay: config?.baseDelay ?? 1000,
      maxDelay: config?.maxDelay ?? 10000,
      retryableErrors: config?.retryableErrors ?? [TransportError],
    };
  }

  /**
   * Calls `fn`, retrying while the failure is retryable and the retry budget
   * is not exhausted. `fn` is always invoked at least once.
   *
   * @param fn Operation to run.
   * @param context Label used as a prefix in the retry warning log.
   * @returns The value `fn` resolves with.
   * @throws The error of the last attempt, unchanged, when no retry applies.
   */
  async execute<T>(
    fn: () => Promise<T>,
    context: string
  ): Promise<T> {
    // The loop ends through `return` or `throw`; shouldRetry enforces the budget.
    for (let attempt = 0; ; attempt++) {
      try {
        return await fn();
      } catch (error: unknown) {
        if (!this.shouldRetry(error, attempt)) {
          throw error;
        }
        const delay = this.calculateDelay(attempt);
        console.warn(
          `[${context}] 第 ${attempt + 1} 次重试,等待 ${delay}ms`
        );
        await this.sleep(delay);
      }
    }
  }

  /** True when retries remain and the error matches a retryable class. */
  private shouldRetry(error: unknown, attempt: number): boolean {
    // Written as a negated `<` so a NaN budget also means "no retries".
    if (!(attempt < this.config.maxRetries)) return false;
    return this.config.retryableErrors.some(
      (ErrorType) => error instanceof ErrorType
    );
  }

  /**
   * Exponential backoff (`baseDelay * 2^attempt`) plus up to one second of
   * random jitter to avoid a thundering herd. The cap is applied after the
   * jitter so the result never exceeds `maxDelay`.
   */
  private calculateDelay(attempt: number): number {
    const backoff = this.config.baseDelay * Math.pow(2, attempt);
    const jitter = Math.random() * 1000;
    return Math.min(backoff + jitter, this.config.maxDelay);
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
// Usage example: an executor allowing up to 3 retries with a 1000 ms base delay
const executor = new RetryExecutor({
  maxRetries: 3,
  baseDelay: 1000,
});
// Tool call wrapped in retry handling; "search_tool" is the label that prefixes retry warnings.
// NOTE(review): `client` is not defined in this snippet — presumably an MCP client created elsewhere.
const result = await executor.execute(
  () => client.callTool({ name: "search", arguments: { query: "MCP" } }),
  "search_tool"
);
Python重试实现
import asyncio
import random
from functools import wraps
def retryable(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retryable_exceptions: tuple = (ConnectionError, TimeoutError),
):
    """Build a decorator that retries an async callable with exponential backoff.

    Args:
        max_retries: Number of retries allowed after the first failed call.
        base_delay: Base delay in seconds, doubled on every further attempt.
        max_delay: Upper bound in seconds for a single wait (jitter included).
        retryable_exceptions: Exception types that trigger a retry; any other
            exception propagates immediately.

    Returns:
        A decorator for coroutine functions. The wrapped coroutine returns the
        first successful result, or re-raises the last retryable exception
        once the retries are used up.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            failure = None
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except retryable_exceptions as exc:
                    failure = exc

                # Reached only when the call above raised a retryable exception.
                if attempt >= max_retries:
                    print(f"重试耗尽,最终失败: {failure}")
                    continue

                # Exponential backoff plus up to one second of jitter, capped.
                backoff = base_delay * (2 ** attempt) + random.uniform(0, 1)
                delay = min(backoff, max_delay)
                print(f"重试 {attempt + 1}/{max_retries}, 等待 {delay:.1f}s...")
                await asyncio.sleep(delay)
            raise failure
        return wrapper
    return decorator
# Usage: call_tool is wrapped by the retry decorator with up to 3 retries
class MCPClient:
    @retryable(max_retries=3)
    async def call_tool(self, name: str, arguments: dict):
        # Placeholder for the actual tool-call logic
        pass
降级策略
功能降级
/** Degradation levels reported by `DegradationManager`. */
enum DegradationLevel {
  NONE = "none", // normal operation
  PARTIAL = "partial", // partially degraded
  DEGRADED = "degraded", // severely degraded
  OFFLINE = "offline", // offline mode
}
/**
 * Tracks which tools are currently failing and derives an overall
 * degradation level from how many distinct tools are affected.
 *
 * The level is recomputed after every change, so it steps back down as
 * tools recover instead of staying at its peak until every tool is healthy.
 */
class DegradationManager {
  /** Number of failed tools at which the level becomes PARTIAL. */
  private static readonly PARTIAL_THRESHOLD = 5;
  /** Number of failed tools at which the level becomes DEGRADED. */
  private static readonly DEGRADED_THRESHOLD = 10;

  private level: DegradationLevel = DegradationLevel.NONE;
  private failedTools: Set<string> = new Set();

  /** Marks a tool as failing and updates the overall level. */
  recordFailure(toolName: string): void {
    this.failedTools.add(toolName);
    this.refreshLevel();
  }

  /** Marks a tool as healthy again and updates the overall level. */
  recoverFailure(toolName: string): void {
    this.failedTools.delete(toolName);
    this.refreshLevel();
  }

  /**
   * A tool is available when it is not marked as failing and the manager
   * is not in OFFLINE mode.
   */
  isToolAvailable(toolName: string): boolean {
    if (this.failedTools.has(toolName)) {
      return false;
    }
    return this.level !== DegradationLevel.OFFLINE;
  }

  /** Returns a human-readable summary for the current level. */
  getStatus(): string {
    switch (this.level) {
      case DegradationLevel.NONE:
        return "所有功能正常";
      case DegradationLevel.PARTIAL:
        return `${this.failedTools.size} 个工具不可用`;
      case DegradationLevel.DEGRADED:
        return "多个工具不可用,建议检查连接";
      case DegradationLevel.OFFLINE:
        return "客户端离线";
    }
  }

  /** Derives the level from the current number of failing tools. */
  private refreshLevel(): void {
    const failed = this.failedTools.size;
    if (failed >= DegradationManager.DEGRADED_THRESHOLD) {
      this.level = DegradationLevel.DEGRADED;
    } else if (failed >= DegradationManager.PARTIAL_THRESHOLD) {
      this.level = DegradationLevel.PARTIAL;
    } else {
      this.level = DegradationLevel.NONE;
    }
  }
}
降级操作实现
/**
 * Runs the primary operation and, if it fails, runs the fallback instead.
 *
 * @param primary Preferred operation.
 * @param fallback Operation used when `primary` rejects.
 * @param context Label included in the warning logged on switch-over.
 * @returns The result of whichever operation succeeds first.
 * @throws Error combining both failure descriptions when both operations
 *   reject. Non-`Error` rejection values (strings, `null`, ...) are
 *   stringified rather than read through `.message`.
 */
async function executeWithFallback(
  primary: () => Promise<any>,
  fallback: () => Promise<any>,
  context: string
): Promise<any> {
  // Anything can be thrown in JavaScript, so never assume `.message` exists.
  const describe = (reason: unknown): string =>
    reason instanceof Error ? reason.message : String(reason);

  try {
    return await primary();
  } catch (error: unknown) {
    console.warn(`主服务失败 (${context}),切换到备用服务`);
    try {
      return await fallback();
    } catch (fallbackError: unknown) {
      throw new Error(
        `主服务和备用服务均失败: ${describe(error)} | ${describe(fallbackError)}`
      );
    }
  }
}
/**
 * Cache-based degradation: serves cached values while they are fresh and,
 * optionally, falls back to an expired entry when fetching fails.
 */
class CacheFallback<T> {
  private cache = new Map<string, { value: T; timestamp: number }>();
  private ttl: number;

  /** @param ttlMs How long an entry counts as fresh, in milliseconds. */
  constructor(ttlMs: number = 60000) {
    this.ttl = ttlMs;
  }

  /**
   * Returns the cached value for `key` when it is still fresh; otherwise
   * calls `fetcher`, stores the result and returns it.
   *
   * @param key Cache key.
   * @param fetcher Produces a new value when the cache cannot be used.
   * @param useCacheOnError When true, a failed fetch returns the expired
   *   entry for `key` if one exists.
   * @throws Whatever `fetcher` rejects with when no stale entry may be used.
   */
  async getOrFetch(
    key: string,
    fetcher: () => Promise<T>,
    useCacheOnError: boolean = true
  ): Promise<T> {
    const entry = this.cache.get(key);
    if (entry) {
      const age = Date.now() - entry.timestamp;
      if (age < this.ttl) {
        return entry.value;
      }
    }

    let fetched: T;
    try {
      fetched = await fetcher();
    } catch (error) {
      // Without a usable stale entry the failure goes to the caller.
      if (!useCacheOnError || !entry) {
        throw error;
      }
      console.warn(`获取失败,使用缓存数据`);
      return entry.value;
    }

    this.cache.set(key, { value: fetched, timestamp: Date.now() });
    return fetched;
  }
}
日志追踪
请求追踪
/**
 * Emits structured JSON log lines around tool calls. Every line produced by
 * one tracer instance shares the same request id.
 */
class RequestTracer {
  private requestId: string;

  constructor() {
    this.requestId = this.generateId();
  }

  /** Builds an id from the current time and a short random base-36 suffix. */
  private generateId(): string {
    return `mcp-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
  }

  /**
   * Serializes one log event. Key order is: event, requestId, tool, the
   * extra fields, then timestamp.
   */
  private formatEvent(
    event: string,
    tool: string,
    extra: Record<string, unknown> = {}
  ): string {
    return JSON.stringify({
      event,
      requestId: this.requestId,
      tool,
      ...extra,
      timestamp: new Date().toISOString(),
    });
  }

  /**
   * Runs `executor`, logging a start event and then either a success or an
   * error event that includes the elapsed time in milliseconds.
   *
   * @param toolName Name of the tool, written to every log line.
   * @param args Tool arguments; accepted for the caller's convenience but
   *   not written to the log.
   * @param executor The actual tool invocation.
   * @returns Whatever `executor` resolves with.
   * @throws Rethrows the executor's rejection unchanged after logging it.
   */
  async traceToolCall<T>(
    toolName: string,
    args: any,
    executor: () => Promise<T>
  ): Promise<T> {
    const startTime = Date.now();
    console.log(this.formatEvent("tool_call_start", toolName));
    try {
      const result = await executor();
      const duration = Date.now() - startTime;
      console.log(this.formatEvent("tool_call_success", toolName, { duration }));
      return result;
    } catch (error: unknown) {
      const duration = Date.now() - startTime;
      // Rejections are not guaranteed to be Error instances; stringify the rest
      // so the log line always carries an `error` field.
      const message = error instanceof Error ? error.message : String(error);
      console.error(
        this.formatEvent("tool_call_error", toolName, { duration, error: message })
      );
      throw error;
    }
  }
}
监控告警
错误指标收集
/** Aggregated error statistics maintained by `ErrorMonitor`. */
interface ErrorMetrics {
  /** Count of all errors recorded so far. */
  totalErrors: number;
  /** Number of errors per error category. */
  errorsByType: Map<string, number>;
  /** Number of errors per tool name. */
  errorsByTool: Map<string, number>;
  /** The most recently recorded errors, oldest first. */
  recentErrors: Array<{
    timestamp: Date;
    type: string;
    tool: string;
    message: string;
  }>;
}
/**
 * Collects error metrics, keeps a short list of recent errors for reporting
 * and raises an alert when the estimated error rate gets too high.
 *
 * The error rate is computed from a dedicated list of error timestamps
 * rather than from the 20-entry `recentErrors` list: with at most 20 entries
 * the rate could never exceed 20/300, so the 10% alert could never fire.
 */
class ErrorMonitor {
  /** Length of the sliding window used for the error rate (5 minutes). */
  private static readonly RATE_WINDOW_MS = 300000;
  /** Requests assumed per window: one request per second. */
  private static readonly ASSUMED_REQUESTS_PER_WINDOW = 300;
  /** Maximum number of entries kept in `recentErrors`. */
  private static readonly MAX_RECENT_ERRORS = 20;
  /** Error rate above which an alert is triggered. */
  private static readonly ALERT_THRESHOLD = 0.1;

  private metrics: ErrorMetrics = {
    totalErrors: 0,
    errorsByType: new Map(),
    errorsByTool: new Map(),
    recentErrors: [],
  };

  /** Timestamps (ms since epoch, ascending) of errors inside the rate window. */
  private errorTimestamps: number[] = [];

  /**
   * Records one error, updates all counters and triggers an alert when the
   * error rate exceeds 10%.
   *
   * @param error The error to record; its `category` and `message` are used.
   * @param toolName Tool involved, if any. Reported as "unknown" when omitted.
   */
  recordError(error: MCPError, toolName?: string): void {
    const now = new Date();

    this.metrics.totalErrors++;
    this.metrics.errorsByType.set(
      error.category,
      (this.metrics.errorsByType.get(error.category) || 0) + 1
    );
    if (toolName) {
      this.metrics.errorsByTool.set(
        toolName,
        (this.metrics.errorsByTool.get(toolName) || 0) + 1
      );
    }

    // Keep only the latest 20 errors for the report.
    this.metrics.recentErrors.push({
      timestamp: now,
      type: error.category,
      tool: toolName || "unknown",
      message: error.message,
    });
    if (this.metrics.recentErrors.length > ErrorMonitor.MAX_RECENT_ERRORS) {
      this.metrics.recentErrors.shift();
    }

    // Track the timestamp for the rate window and drop expired ones so the
    // list cannot grow without bound.
    this.errorTimestamps.push(now.getTime());
    const cutoff = now.getTime() - ErrorMonitor.RATE_WINDOW_MS;
    while (this.errorTimestamps.length > 0 && this.errorTimestamps[0] <= cutoff) {
      this.errorTimestamps.shift();
    }

    // Alert when the error rate is too high.
    if (this.getErrorRate() > ErrorMonitor.ALERT_THRESHOLD) {
      this.triggerAlert("错误率超过 10%");
    }
  }

  /**
   * Estimated error rate over the last 5 minutes, assuming one request per
   * second (300 requests per window).
   */
  getErrorRate(): number {
    const cutoff = Date.now() - ErrorMonitor.RATE_WINDOW_MS;
    const inWindow = this.errorTimestamps.filter(ts => ts > cutoff).length;
    return inWindow / ErrorMonitor.ASSUMED_REQUESTS_PER_WINDOW;
  }

  /** Emits the alert; a real notification channel can be hooked in here. */
  private triggerAlert(message: string): void {
    console.error(`[ALERT] ${message}`);
  }

  /** Builds a plain-text report: totals, per-type and per-tool counts, last 5 errors. */
  getReport(): string {
    return [
      "=== 错误监控报告 ===",
      `总错误数: ${this.metrics.totalErrors}`,
      "",
      "按错误类型:",
      ...Array.from(this.metrics.errorsByType.entries())
        .map(([type, count]) => ` ${type}: ${count}次`),
      "",
      "按工具:",
      ...Array.from(this.metrics.errorsByTool.entries())
        .map(([tool, count]) => ` ${tool}: ${count}次`),
      "",
      "最近错误:",
      ...this.metrics.recentErrors.slice(-5).map(
        e => ` [${e.timestamp.toISOString()}] ${e.type}: ${e.message}`
      ),
    ].join("\n");
  }
}
错误处理流程
完整流程
工具调用请求
    │
    ▼
参数验证 ──失败──► 返回参数错误
    │
    ▼ 通过
执行重试逻辑
    │
    ├── 成功 ──► 返回结果
    │
    └── 失败
         │
         ▼
    判断错误类型
         │
    ┌────┴────┐
    │         │
  可重试  不可重试
    │         │
    ▼         ▼
   重试 检查降级方案
(最多3次)     │
    │    ┌────┴────┐
    ▼    │         │
   最终 有缓存   无缓存
   失败  │         │
    │    ▼         ▼
    │ 返回缓存  返回错误
    │
    ▼
记录日志 + 更新指标
最佳实践
| 实践 | 说明 | 示例 |
|---|---|---|
| 明确错误类型 | 区分协议/传输/应用错误 | 传输错误可重试,协议错误不可 |
| 有限重试 | 最多3次,指数退避并设置延迟上限 | 延迟按 2^n 秒递增(1s、2s、4s) |
| 幂等性 | 只重试无副作用的操作 | 读操作可重试,写操作不可 |
| 优雅降级 | 功能降级而非完全崩溃 | 使用缓存或备用服务 |
| 充分日志 | 记录错误上下文 | 请求ID、工具名、耗时 |
| 监控告警 | 错误率过高时触发 | 10%错误率告警 |
总结
MCP客户端错误处理的核心策略:
| 策略 | 目的 | 实现方式 |
|---|---|---|
| 分类 | 区分错误类型 | 结构化错误层次 |
| 重试 | 处理临时故障 | 指数退避重试 |
| 降级 | 保证核心功能 | 缓存/备用服务 |
| 监控 | 发现异常模式 | 错误指标/告警 |
| 追踪 | 定位根因 | 请求链路追踪 |
下一步学习建议:
本文最后更新于 2024-07-16。