MCP协议 高级 MCP Server开发 最佳实践 架构设计

MCP Server最佳实践:生产级服务端的架构与运维

AIEng Hub
阅读约 20 分钟

引言

当一个MCP Server从原型阶段走向生产环境,需要面对可靠性、性能、可维护性等挑战。本文将总结生产级MCP Server的最佳实践。

架构设计模式

分层架构

┌─────────────────────────────────────────────────────────┐
│                   MCP Server 架构                        │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────────────────────────────────────────────┐  │
│  │                Transport Layer                   │  │
│  │    (stdio / SSE / WebSocket)                     │  │
│  └────────────────────┬────────────────────────────┘  │
│                       │                                │
│  ┌────────────────────▼────────────────────────────┐  │
│  │                Protocol Layer                    │  │
│  │    JSON-RPC 消息解析 / 路由 / 错误编码           │  │
│  └────────────────────┬────────────────────────────┘  │
│                       │                                │
│  ┌────────────────────▼────────────────────────────┐  │
│  │              Service Layer                       │  │
│  │    认证 → 限流 → 日志 → 工具路由                 │  │
│  └────────────────────┬────────────────────────────┘  │
│                       │                                │
│  ┌────────────────────▼────────────────────────────┐  │
│  │               Tool Layer                         │  │
│  │    实际工具实现代码                               │  │
│  └─────────────────────────────────────────────────┘  │
│                                                         │
└─────────────────────────────────────────────────────────┘

各层职责

层级职责技术实现
Transport Layer通信协议处理stdio/SSE/WebSocket
Protocol LayerJSON-RPC消息处理MCP SDK
Service Layer通用服务逻辑中间件链
Tool Layer实际业务逻辑工具处理器

错误处理模式

统一错误处理

// 自定义错误类
class MCPToolError extends Error {
  constructor(
    message: string,
    public code: number = -32000,
    public data?: any
  ) {
    super(message);
    this.name = "MCPToolError";
  }
}

class PermissionError extends MCPToolError {
  constructor(message: string) {
    super(message, -32003);
  }
}

class TimeoutError extends MCPToolError {
  constructor(toolName: string, duration: number) {
    super(`工具 ${toolName} 执行超时`, -32002, { toolName, duration });
  }
}

// 全局错误处理中间件
function errorHandler(handler: Function) {
  return async (...args: any[]) => {
    try {
      return await handler(...args);
    } catch (error) {
      if (error instanceof MCPToolError) {
        return {
          content: [{ type: "text", text: error.message }],
          isError: true,
        };
      }

      // 未知错误,统一处理
      console.error("未处理的错误:", error);
      return {
        content: [{ type: "text", text: "服务器内部错误" }],
        isError: true,
      };
    }
  };
}

超时控制

function withTimeout<T>(
  promise: Promise<T>,
  timeoutMs: number,
  toolName: string
): Promise<T> {
  return Promise.race([
    promise,
    new Promise<T>((_, reject) =>
      setTimeout(
        () => reject(new TimeoutError(toolName, timeoutMs)),
        timeoutMs
      )
    ),
  ]);
}

// 使用
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  // 长时间操作设置超时
  if (name === "process_large_file") {
    return withTimeout(
      handleLargeFile(args),
      60000, // 60秒超时
      name
    );
  }

  return handleTool(name, args);
});

日志与监控

结构化日志

// 结构化日志系统
interface LogEntry {
  timestamp: string;
  level: "INFO" | "WARN" | "ERROR";
  component: string;
  message: string;
  duration?: number;
  toolName?: string;
  error?: string;
  requestId?: string;
}

class StructuredLogger {
  private logs: LogEntry[] = [];

  log(entry: LogEntry): void {
    // 输出到stderr(MCP约定)
    console.error(JSON.stringify(entry));
    this.logs.push(entry);

    // 保留最近1000条日志
    if (this.logs.length > 1000) {
      this.logs.shift();
    }
  }

  info(component: string, message: string, data?: Partial<LogEntry>): void {
    this.log({
      timestamp: new Date().toISOString(),
      level: "INFO",
      component,
      message,
      ...data,
    });
  }

  error(component: string, message: string, error?: any): void {
    this.log({
      timestamp: new Date().toISOString(),
      level: "ERROR",
      component,
      message,
      error: error?.message || String(error),
    });
  }

  // 工具调用日志
  logToolCall(toolName: string, args: any, duration: number): void {
    this.info("tool", `工具调用: ${toolName}`, {
      toolName,
      duration,
      requestId: args._meta?.requestId,
    });
  }
}

const logger = new StructuredLogger();

健康检查端点

// 添加健康检查工具
toolRegistry.register({
  name: "_health",
  description: "服务器健康检查(内部使用)",
  inputSchema: {
    type: "object",
    properties: {},
  },
  handler: async () => {
    return {
      content: [{
        type: "text",
        text: JSON.stringify({
          status: "healthy",
          uptime: process.uptime(),
          memory: process.memoryUsage(),
          toolCount: toolRegistry.getDefinitions().length,
          version: "1.0.0",
        }, null, 2),
      }],
      isError: false,
    };
  },
});

性能优化

缓存策略

// 带TTL的缓存
class TTLCache<K, V> {
  private cache = new Map<K, { value: V; expiresAt: number }>();

  constructor(private defaultTTL: number = 5000) {}

  get(key: K): V | undefined {
    const entry = this.cache.get(key);
    if (entry && Date.now() < entry.expiresAt) {
      return entry.value;
    }
    this.cache.delete(key);
    return undefined;
  }

  set(key: K, value: V, ttl?: number): void {
    this.cache.set(key, {
      value,
      expiresAt: Date.now() + (ttl ?? this.defaultTTL),
    });
  }

  invalidate(pattern?: (key: K) => boolean): void {
    if (pattern) {
      for (const key of this.cache.keys()) {
        if (pattern(key)) this.cache.delete(key);
      }
    } else {
      this.cache.clear();
    }
  }
}

// 使用缓存
const toolCache = new TTLCache<string, any>(5000);

server.setRequestHandler(ListToolsRequestSchema, async () => {
  // 缓存工具列表5秒
  const cached = toolCache.get("tools");
  if (cached) return cached;

  const tools = { tools: toolRegistry.getDefinitions() };
  toolCache.set("tools", tools);
  return tools;
});

资源限制

// 并发控制
class ConcurrencyLimiter {
  private active = new Map<string, number>();

  constructor(private maxConcurrent: number = 5) {}

  async acquire(toolName: string): Promise<boolean> {
    const current = this.active.get(toolName) || 0;
    if (current >= this.maxConcurrent) {
      return false; // 已达到并发上限
    }
    this.active.set(toolName, current + 1);
    return true;
  }

  release(toolName: string): void {
    const current = this.active.get(toolName) || 0;
    if (current <= 1) {
      this.active.delete(toolName);
    } else {
      this.active.set(toolName, current - 1);
    }
  }
}

const concurrencyLimiter = new ConcurrencyLimiter(3);

// 在工具调用中使用
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name } = request.params;

  if (!(await concurrencyLimiter.acquire(name))) {
    return {
      content: [{
        type: "text",
        text: `工具 '${name}' 当前并发数已达上限,请稍后重试`,
      }],
      isError: true,
    };
  }

  try {
    return await executeTool(request.params);
  } finally {
    concurrencyLimiter.release(name);
  }
});

测试策略

单元测试

// tools/calculator.test.ts
import { describe, it, expect } from "vitest";
import { executeCalculator } from "./calculator";

describe("Calculator Tool", () => {
  it("should add two numbers", () => {
    const result = executeCalculator({
      operation: "add",
      a: 5,
      b: 3,
    });
    expect(result).toEqual({ result: 8 });
  });

  it("should reject division by zero", () => {
    expect(() =>
      executeCalculator({ operation: "divide", a: 10, b: 0 })
    ).toThrow("除数不能为0");
  });

  it("should reject invalid operation", () => {
    expect(() =>
      executeCalculator({ operation: "modulo", a: 5, b: 3 })
    ).toThrow();
  });
});

集成测试

// tests/server.integration.test.ts
import { describe, it, expect, beforeAll, afterAll } from "vitest";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from
  "@modelcontextprotocol/sdk/client/stdio.js";

describe("MCP Server Integration", () => {
  let client: Client;
  let transport: StdioClientTransport;

  beforeAll(async () => {
    transport = new StdioClientTransport({
      command: "node",
      args: ["dist/index.js"],
    });
    client = new Client(
      { name: "test-client", version: "1.0.0" },
      { capabilities: {} }
    );
    await client.connect(transport);
  });

  afterAll(async () => {
    await client.close();
  });

  it("should list tools", async () => {
    const result = await client.listTools();
    expect(result.tools.length).toBeGreaterThan(0);
  });

  it("should call calculator tool", async () => {
    const result = await client.callTool({
      name: "calculate",
      arguments: { operation: "add", a: 5, b: 3 },
    });
    expect(result.content[0].text).toContain("8");
  });

  it("should handle errors gracefully", async () => {
    const result = await client.callTool({
      name: "calculate",
      arguments: { operation: "divide", a: 1, b: 0 },
    });
    expect(result.isError).toBe(true);
  });
});

CI/CD配置

GitHub Actions

# .github/workflows/mcp-server.yml
name: MCP Server CI/CD

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-node@v4
        with:
          node-version: 20
          cache: 'npm'

      - run: npm ci
      - run: npm run lint
      - run: npm run test
      - run: npm run build

  deploy:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: npm ci && npm run build
      - name: Build and push Docker
        run: |
          docker build -t my-mcp-server .
          docker tag my-mcp-server registry.example.com/my-mcp-server:latest
          docker push registry.example.com/my-mcp-server:latest

部署运维

健康检查脚本

#!/bin/bash
# healthcheck.sh - 用于Docker的健康检查

# 启动一个测试Client并验证Server响应
echo '{"jsonrpc":"2.0","id":1,"method":"tools/list"}' | \
  timeout 5 node dist/index.js 2>/dev/null | \
  grep -q '"tools"' && exit 0 || exit 1

Docker Compose部署

# docker-compose.yml
version: '3.8'
services:
  mcp-server:
    build: .
    restart: unless-stopped
    environment:
      - NODE_ENV=production
      - MCP_API_KEY=${MCP_API_KEY}
    volumes:
      - ./data:/data:ro
    healthcheck:
      test: ["CMD", "./healthcheck.sh"]
      interval: 30s
      timeout: 10s
      retries: 3
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 512M

总结

生产级MCP Server需要关注以下维度:

维度关键实践优先级
可靠性统一错误处理、超时控制、重试P0
安全性输入验证、路径控制、沙箱P0
可观测性结构化日志、健康检查P1
性能缓存、并发控制、资源限制P1
可测试性单元测试、集成测试P1
可部署性Docker化、CI/CDP2

下一步学习建议:


本文最后更新于 2024-07-10。