MCP协议 进阶 MCP 错误处理 重试策略 降级

MCP错误处理策略:构建健壮的客户端集成

AIEng Hub
阅读约 15 分钟

引言

错误处理是MCP客户端开发中最容易被忽视但却最关键的部分。一个健壮的错误处理策略决定了AI工具系统的可靠性。

错误分类

MCP错误层次

MCP错误分类
├── 协议层错误
│   ├── JSON-RPC解析错误 (-32700)
│   ├── 无效请求 (-32600)
│   ├── 方法未找到 (-32601)
│   ├── 参数无效 (-32602)
│   └── 内部错误 (-32603)
├── 传输层错误
│   ├── 连接断开
│   ├── 超时
│   └── 流错误
└── 应用层错误
    ├── 工具执行失败
    ├── 资源不可用
    └── 权限不足

错误类型定义

/** Layer at which an MCP error originated. */
enum MCPErrorCategory {
  /** Protocol layer. */
  PROTOCOL = "protocol",
  /** Transport layer. */
  TRANSPORT = "transport",
  /** Application layer. */
  APPLICATION = "application",
}

/**
 * Structured base error shared by all MCP error types.
 *
 * @param message     Human-readable description, forwarded to `Error`.
 * @param category    Layer the error belongs to.
 * @param code        Optional numeric error code.
 * @param recoverable Whether the failure is considered recoverable; defaults to `false`.
 * @param details     Optional extra payload attached to the error.
 */
class MCPError extends Error {
  constructor(
    message: string,
    public category: MCPErrorCategory,
    public code?: number,
    public recoverable: boolean = false,
    public details?: any
  ) {
    super(message);
    // When compiled to ES5, calling `super()` on a built-in such as Error
    // returns a plain Error object, so `instanceof MCPError` (and every
    // subclass check) would be false. Restoring the prototype keeps the
    // `instanceof`-based classification working on every compile target.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = "MCPError";
  }
}

/**
 * Transport-layer error.
 *
 * Always categorised as `TRANSPORT` and created without a numeric code.
 *
 * @param message     Human-readable description.
 * @param recoverable Whether the failure is recoverable; defaults to `true`.
 */
class TransportError extends MCPError {
  constructor(message: string, recoverable: boolean = true) {
    super(
      message,
      MCPErrorCategory.TRANSPORT,
      undefined, // no numeric code is attached
      recoverable
    );
    this.name = "TransportError";
  }
}

/**
 * Protocol-layer error.
 *
 * Always categorised as `PROTOCOL` and always marked non-recoverable.
 *
 * @param code    Numeric error code stored on the error.
 * @param message Human-readable description.
 */
class ProtocolError extends MCPError {
  constructor(code: number, message: string) {
    super(
      message,
      MCPErrorCategory.PROTOCOL,
      code,
      false // never flagged as recoverable
    );
    this.name = "ProtocolError";
  }
}

/**
 * Application-layer error raised when a tool fails to execute.
 *
 * The tool name is embedded in the message and also stored as
 * `details.toolName`. The error carries no numeric code and is marked
 * non-recoverable.
 *
 * @param toolName Name of the tool that failed.
 * @param errorMsg Failure description appended to the message.
 */
class ToolExecutionError extends MCPError {
  constructor(toolName: string, errorMsg: string) {
    super(
      `工具 '${toolName}' 执行失败: ${errorMsg}`,
      MCPErrorCategory.APPLICATION,
      undefined, // no numeric code
      false, // not recoverable
      { toolName }
    );
    this.name = "ToolExecutionError";
  }
}

重试策略

可重试 vs 不可重试

/**
 * Decides whether a failed operation may be retried automatically.
 *
 * Only transport-layer errors are candidates, and only when the error itself
 * is flagged as recoverable: `TransportError` accepts an explicit
 * `recoverable` argument, so an error created with `recoverable = false`
 * must not be retried. Protocol errors, tool-execution errors (the tool may
 * have side effects) and any other error are never retried.
 *
 * @param error The error to classify.
 * @returns `true` when an automatic retry is allowed.
 */
function isRetryable(error: MCPError): boolean {
  if (error instanceof TransportError) {
    return error.recoverable;
  }
  return false;
}

带重试的执行器

/** Settings that control how `RetryExecutor` retries a failing operation. */
interface RetryConfig {
  /** Maximum number of retries after the first attempt. */
  maxRetries: number;
  /** Base backoff delay; doubled for each further attempt. */
  baseDelay: number;
  /** Upper bound applied to the exponential backoff delay. */
  maxDelay: number;
  /** Error classes whose instances are eligible for a retry. */
  retryableErrors: (new (...args: any[]) => MCPError)[];
}

/**
 * Runs an async operation and retries it with exponential backoff plus jitter
 * when it fails with one of the configured retryable error types.
 */
class RetryExecutor {
  private config: RetryConfig;

  /**
   * @param config Optional overrides. Missing or explicitly `undefined`
   *   fields fall back to the defaults (3 retries, 1000 ms base delay,
   *   10000 ms max delay, retry on `TransportError`). Negative numbers are
   *   clamped to 0.
   */
  constructor(config?: Partial<RetryConfig>) {
    // Resolve each field individually: spreading `config` over the defaults
    // would let an explicit `undefined` erase a default, and a negative
    // `maxRetries` would make `execute` skip the operation entirely.
    this.config = {
      maxRetries: Math.max(0, config?.maxRetries ?? 3),
      baseDelay: Math.max(0, config?.baseDelay ?? 1000),
      maxDelay: Math.max(0, config?.maxDelay ?? 10000),
      retryableErrors: config?.retryableErrors ?? [TransportError],
    };
  }

  /**
   * Executes `fn`, retrying on retryable errors.
   *
   * @param fn      Operation to run; invoked at least once.
   * @param context Label used as a prefix in retry log messages.
   * @returns The value `fn` resolves with.
   * @throws The last error thrown by `fn` once it is not retryable or the
   *   retry budget is exhausted.
   */
  async execute<T>(
    fn: () => Promise<T>,
    context: string
  ): Promise<T> {
    // The loop ends through `return` or `throw`: `shouldRetry` refuses once
    // `attempt` reaches `maxRetries`.
    for (let attempt = 0; ; attempt++) {
      try {
        return await fn();
      } catch (error: unknown) {
        if (!this.shouldRetry(error, attempt)) {
          throw error;
        }

        const delay = this.calculateDelay(attempt);
        console.warn(
          `[${context}] 第 ${attempt + 1} 次重试,等待 ${delay}ms`
        );
        await this.sleep(delay);
      }
    }
  }

  /** Returns true when retries remain and `error` is of a retryable type. */
  private shouldRetry(error: unknown, attempt: number): boolean {
    if (attempt >= this.config.maxRetries) return false;

    return this.config.retryableErrors.some(
      (ErrorType) => error instanceof ErrorType
    );
  }

  /**
   * Computes the wait before the next attempt: exponential backoff plus up to
   * 1000 ms of random jitter (to avoid a thundering herd). The cap is applied
   * after the jitter so the result never exceeds `maxDelay`.
   */
  private calculateDelay(attempt: number): number {
    const exponential = this.config.baseDelay * Math.pow(2, attempt);
    const jitter = Math.random() * 1000;
    return Math.min(exponential + jitter, this.config.maxDelay);
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Usage example: an executor allowing up to 3 retries with a 1000 ms base delay.
const executor = new RetryExecutor({ maxRetries: 3, baseDelay: 1000 });

// Tool call wrapped in the retry executor; "search_tool" is the label used in
// retry log messages. NOTE(review): `client` is not defined in this snippet —
// presumably an MCP client created elsewhere.
const searchCall = () =>
  client.callTool({ name: "search", arguments: { query: "MCP" } });
const result = await executor.execute(searchCall, "search_tool");

Python重试实现

import asyncio
import random
from functools import wraps


def retryable(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retryable_exceptions: tuple = (
        ConnectionError,
        TimeoutError,
        # Before Python 3.11 asyncio.TimeoutError is a separate class, so
        # timeouts raised by asyncio itself would otherwise never be retried.
        asyncio.TimeoutError,
    ),
):
    """Decorator factory that retries an async function with backoff.

    Args:
        max_retries: Number of retries after the first attempt. Negative
            values are treated as 0 so the function always runs once.
        base_delay: Base delay in seconds; doubled for each further attempt.
        max_delay: Upper bound in seconds for a single wait, applied after
            the random jitter is added.
        retryable_exceptions: Exception types that trigger a retry. Any other
            exception propagates immediately.

    Returns:
        A decorator that wraps an async callable.

    Raises:
        The last retryable exception once the retry budget is exhausted.
    """
    # A negative budget would make the loop below run zero times, leaving
    # nothing to return or re-raise.
    retry_budget = max(0, max_retries)

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(retry_budget + 1):
                try:
                    return await func(*args, **kwargs)
                except retryable_exceptions as e:
                    if attempt >= retry_budget:
                        print(f"重试耗尽,最终失败: {e}")
                        # Bare raise keeps the original traceback.
                        raise
                    delay = min(
                        base_delay * (2 ** attempt) + random.uniform(0, 1),
                        max_delay,
                    )
                    print(
                        f"重试 {attempt + 1}/{retry_budget}, "
                        f"等待 {delay:.1f}s..."
                    )
                    await asyncio.sleep(delay)
        return wrapper
    return decorator


# Usage
class MCPClient:
    @retryable(max_retries=3)
    async def call_tool(self, name: str, arguments: dict):
        """Placeholder for the tool-call logic; retried up to 3 times."""
        return None

降级策略

功能降级

/** Degradation levels, from fully working to offline. */
enum DegradationLevel {
  /** Normal operation. */
  NONE = "none",
  /** Partially degraded. */
  PARTIAL = "partial",
  /** Severely degraded. */
  DEGRADED = "degraded",
  /** Offline mode. */
  OFFLINE = "offline",
}

/**
 * Tracks which tools are currently failing and derives an overall
 * degradation level from how many distinct tools are affected.
 */
class DegradationManager {
  /** Number of failed tools at which the level becomes PARTIAL. */
  private static readonly PARTIAL_THRESHOLD = 5;
  /** Number of failed tools at which the level becomes DEGRADED. */
  private static readonly DEGRADED_THRESHOLD = 10;

  private level: DegradationLevel = DegradationLevel.NONE;
  private failedTools: Set<string> = new Set();

  /** Marks `toolName` as failed and re-evaluates the degradation level. */
  recordFailure(toolName: string): void {
    this.failedTools.add(toolName);
    this.refreshLevel();
  }

  /** Marks `toolName` as healthy again and re-evaluates the degradation level. */
  recoverFailure(toolName: string): void {
    this.failedTools.delete(toolName);
    this.refreshLevel();
  }

  /**
   * Returns false for a tool recorded as failed, or for any tool while the
   * level is OFFLINE. No method of this class sets OFFLINE.
   */
  isToolAvailable(toolName: string): boolean {
    if (this.failedTools.has(toolName)) {
      return false;
    }
    return this.level !== DegradationLevel.OFFLINE;
  }

  /** Returns a human-readable summary of the current level. */
  getStatus(): string {
    switch (this.level) {
      case DegradationLevel.NONE:
        return "所有功能正常";
      case DegradationLevel.PARTIAL:
        return `${this.failedTools.size} 个工具不可用`;
      case DegradationLevel.DEGRADED:
        return "多个工具不可用,建议检查连接";
      case DegradationLevel.OFFLINE:
        return "客户端离线";
    }
  }

  /**
   * Derives the level from the current number of failed tools. Recomputing on
   * every change lets the level step back down as tools recover, instead of
   * staying at its peak until every tool is healthy.
   */
  private refreshLevel(): void {
    const failed = this.failedTools.size;
    if (failed >= DegradationManager.DEGRADED_THRESHOLD) {
      this.level = DegradationLevel.DEGRADED;
    } else if (failed >= DegradationManager.PARTIAL_THRESHOLD) {
      this.level = DegradationLevel.PARTIAL;
    } else {
      this.level = DegradationLevel.NONE;
    }
  }
}

降级操作实现

/**
 * Runs `primary`; if it fails, logs a warning and runs `fallback` instead.
 *
 * @param primary  Preferred operation.
 * @param fallback Operation to try when `primary` rejects.
 * @param context  Label included in the warning message.
 * @returns The result of whichever operation succeeds first.
 * @throws Error when both operations fail. The message combines both failure
 *   descriptions and the original thrown values are attached as `errors`
 *   (`[primaryError, fallbackError]`).
 */
async function executeWithFallback(
  primary: () => Promise<any>,
  fallback: () => Promise<any>,
  context: string
): Promise<any> {
  // Anything can be thrown in JavaScript, so do not assume `.message` exists:
  // a thrown string or number would otherwise be reported as "undefined".
  const describe = (thrown: unknown): string => {
    if (typeof thrown === "object" && thrown !== null && "message" in thrown) {
      return String((thrown as { message: unknown }).message);
    }
    return String(thrown);
  };

  try {
    return await primary();
  } catch (primaryError: unknown) {
    console.warn(`主服务失败 (${context}),切换到备用服务`);
    try {
      return await fallback();
    } catch (fallbackError: unknown) {
      const combined = new Error(
        `主服务和备用服务均失败: ${describe(primaryError)} | ${describe(fallbackError)}`
      );
      // Keep the original thrown values so stacks and types stay inspectable.
      Object.assign(combined, { errors: [primaryError, fallbackError] });
      throw combined;
    }
  }
}

/**
 * Cache-based fallback: serves a cached value while it is fresh, and can fall
 * back to an expired cached value when fetching fails.
 */
class CacheFallback<T> {
  private readonly entries = new Map<string, { value: T; timestamp: number }>();
  private readonly maxAgeMs: number;

  /** @param ttlMs Time in milliseconds a cached value counts as fresh (default 60000). */
  constructor(ttlMs: number = 60000) {
    this.maxAgeMs = ttlMs;
  }

  /**
   * Returns the value for `key`, fetching it when no fresh entry exists.
   *
   * @param key             Cache key.
   * @param fetcher         Produces a new value when the cache cannot serve one.
   * @param useCacheOnError When true (default), a failed fetch returns the
   *   expired cached value if one exists instead of rejecting.
   * @throws Whatever `fetcher` throws when no cached value may be used.
   */
  async getOrFetch(
    key: string,
    fetcher: () => Promise<T>,
    useCacheOnError: boolean = true
  ): Promise<T> {
    const entry = this.entries.get(key);

    // Fresh hit: skip the fetch entirely.
    if (entry !== undefined && Date.now() - entry.timestamp < this.maxAgeMs) {
      return entry.value;
    }

    let fetched: T;
    try {
      fetched = await fetcher();
    } catch (fetchError) {
      // Without a usable stale entry the failure propagates unchanged.
      if (!useCacheOnError || entry === undefined) {
        throw fetchError;
      }
      console.warn("获取失败,使用缓存数据");
      return entry.value;
    }

    this.entries.set(key, { value: fetched, timestamp: Date.now() });
    return fetched;
  }
}

日志追踪

请求追踪

/**
 * Emits structured JSON log records around a tool call. The request id is
 * generated once per tracer instance, so every call traced through the same
 * instance shares it.
 */
class RequestTracer {
  private requestId: string;

  constructor() {
    this.requestId = this.generateId();
  }

  /** Builds an id from the current time plus a short random base-36 suffix. */
  private generateId(): string {
    return `mcp-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
  }

  /**
   * Runs `executor` and logs start, success or error records.
   *
   * @param toolName Tool name written to every record.
   * @param args     Tool arguments; accepted but not written to the logs.
   * @param executor Operation to run.
   * @returns The value `executor` resolves with.
   * @throws Re-throws whatever `executor` throws after logging it.
   */
  async traceToolCall<T>(
    toolName: string,
    args: unknown,
    executor: () => Promise<T>
  ): Promise<T> {
    const startTime = Date.now();
    console.log(this.formatEvent("tool_call_start", toolName));

    try {
      const result = await executor();
      console.log(
        this.formatEvent("tool_call_success", toolName, {
          duration: Date.now() - startTime,
        })
      );
      return result;
    } catch (error: unknown) {
      // Non-Error throws have no `.message`; JSON.stringify would silently
      // drop an undefined `error` field, so stringify the thrown value.
      const description =
        typeof error === "object" && error !== null && "message" in error
          ? String((error as { message: unknown }).message)
          : String(error);
      console.error(
        this.formatEvent("tool_call_error", toolName, {
          duration: Date.now() - startTime,
          error: description,
        })
      );
      throw error;
    }
  }

  /** Serialises one log record; `extra` fields are placed before the timestamp. */
  private formatEvent(
    event: string,
    toolName: string,
    extra: Record<string, unknown> = {}
  ): string {
    return JSON.stringify({
      event,
      requestId: this.requestId,
      tool: toolName,
      ...extra,
      timestamp: new Date().toISOString(),
    });
  }
}

监控告警

错误指标收集

/** Aggregated error statistics kept by `ErrorMonitor`. */
interface ErrorMetrics {
  /** Total number of errors recorded. */
  totalErrors: number;
  /** Error count keyed by error type. */
  errorsByType: Map<string, number>;
  /** Error count keyed by tool name. */
  errorsByTool: Map<string, number>;
  /** Most recent error records, oldest first. */
  recentErrors: {
    timestamp: Date;
    type: string;
    tool: string;
    message: string;
  }[];
}

/**
 * Collects error metrics, keeps a short history of recent errors and raises
 * an alert when the estimated error rate is too high.
 */
class ErrorMonitor {
  /** Length of the sliding window used for the error rate (5 minutes). */
  private static readonly RATE_WINDOW_MS = 300000;
  /** Requests assumed per window: one request per second. */
  private static readonly ASSUMED_REQUESTS_PER_WINDOW = 300;
  /** Number of detailed error records kept for the report. */
  private static readonly MAX_RECENT_ERRORS = 20;
  /** Error rate above which an alert is triggered. */
  private static readonly ALERT_THRESHOLD = 0.1;

  private metrics: ErrorMetrics = {
    totalErrors: 0,
    errorsByType: new Map(),
    errorsByTool: new Map(),
    recentErrors: [],
  };

  // Timestamps (ms) of errors inside the rate window. Kept separately from
  // `recentErrors`: that list is capped at 20 entries, so a rate derived from
  // it could never exceed 20/300 and the 10% alert would be unreachable.
  private windowTimestamps: number[] = [];

  /**
   * Records one error and triggers an alert if the error rate is too high.
   *
   * @param error    The error to record; its category is used as the type key.
   * @param toolName Optional tool name; recorded as "unknown" in the recent
   *   list when omitted or empty.
   */
  recordError(error: MCPError, toolName?: string): void {
    const now = new Date();

    this.metrics.totalErrors++;
    this.metrics.errorsByType.set(
      error.category,
      (this.metrics.errorsByType.get(error.category) ?? 0) + 1
    );

    if (toolName) {
      this.metrics.errorsByTool.set(
        toolName,
        (this.metrics.errorsByTool.get(toolName) ?? 0) + 1
      );
    }

    // Keep only the most recent records for the report.
    this.metrics.recentErrors.push({
      timestamp: now,
      type: error.category,
      tool: toolName || "unknown",
      message: error.message,
    });
    if (this.metrics.recentErrors.length > ErrorMonitor.MAX_RECENT_ERRORS) {
      this.metrics.recentErrors.shift();
    }

    this.windowTimestamps.push(now.getTime());

    // Alert when the error rate is too high.
    if (this.getErrorRate() > ErrorMonitor.ALERT_THRESHOLD) {
      this.triggerAlert("错误率超过 10%");
    }
  }

  /**
   * Estimates the error rate over the last 5 minutes as errors in the window
   * divided by 300, i.e. assuming one request per second.
   */
  getErrorRate(): number {
    this.pruneWindow(Date.now());
    return (
      this.windowTimestamps.length / ErrorMonitor.ASSUMED_REQUESTS_PER_WINDOW
    );
  }

  /** Drops timestamps that have fallen out of the rate window. */
  private pruneWindow(now: number): void {
    // Timestamps are appended in order, so everything before the first
    // in-window entry is expired.
    const firstLive = this.windowTimestamps.findIndex(
      (t) => now - t < ErrorMonitor.RATE_WINDOW_MS
    );
    if (firstLive === -1) {
      this.windowTimestamps.length = 0;
    } else if (firstLive > 0) {
      this.windowTimestamps.splice(0, firstLive);
    }
  }

  /** Writes the alert to stderr; a notification channel could be hooked in here. */
  private triggerAlert(message: string): void {
    console.error(`[ALERT] ${message}`);
  }

  /** Builds a multi-line text report: totals, per-type and per-tool counts, last 5 errors. */
  getReport(): string {
    return [
      "=== 错误监控报告 ===",
      `总错误数: ${this.metrics.totalErrors}`,
      "",
      "按错误类型:",
      ...Array.from(this.metrics.errorsByType.entries())
        .map(([type, count]) => `  ${type}: ${count}次`),
      "",
      "按工具:",
      ...Array.from(this.metrics.errorsByTool.entries())
        .map(([tool, count]) => `  ${tool}: ${count}次`),
      "",
      "最近错误:",
      ...this.metrics.recentErrors.slice(-5).map(
        e => `  [${e.timestamp.toISOString()}] ${e.type}: ${e.message}`
      ),
    ].join("\n");
  }
}

错误处理流程

完整流程

工具调用请求
    │
    ▼
参数验证 ──失败──► 返回参数错误
    │
    ▼ 通过
执行重试逻辑
    │
    ├── 成功 ──► 返回结果
    │
    └── 失败
         │
         ▼
    判断错误类型
         │
    ┌────┴────┐
    │         │
  可重试    不可重试
    │         │
    ▼         ▼
  重试    检查降级方案
   (最多3次)   │
    │     ┌────┴────┐
    ▼     │         │
  最终   有缓存    无缓存
  失败     │         │
    │     ▼         ▼
    │  返回缓存   返回错误
    │     │         │
    └─────┴────┬────┘
               ▼
记录日志 + 更新指标

最佳实践

| 实践 | 说明 | 示例 |
| --- | --- | --- |
| 明确错误类型 | 区分协议/传输/应用错误 | 传输错误可重试,协议错误不可 |
| 有限重试 | 最多3次,指数退避 | 2^n秒递增 |
| 幂等性 | 只重试无副作用的操作 | 读操作可重试,写操作不可 |
| 优雅降级 | 功能降级而非完全崩溃 | 使用缓存或备用服务 |
| 充分日志 | 记录错误上下文 | 请求ID、工具名、耗时 |
| 监控告警 | 错误率过高时触发 | 10%错误率告警 |

总结

MCP客户端错误处理的核心策略:

| 策略 | 目的 | 实现方式 |
| --- | --- | --- |
| 分类 | 区分错误类型 | 结构化错误层次 |
| 重试 | 处理临时故障 | 指数退避重试 |
| 降级 | 保证核心功能 | 缓存/备用服务 |
| 监控 | 发现异常模式 | 错误指标/告警 |
| 追踪 | 定位根因 | 请求链路追踪 |

下一步学习建议:


本文最后更新于 2024-07-16。