引言
当一个MCP Server从原型阶段走向生产环境时,就需要面对可靠性、性能、可维护性等方面的挑战。本文总结生产级MCP Server的最佳实践。
架构设计模式
分层架构
┌─────────────────────────────────────────────────────────┐
│ MCP Server 架构 │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Transport Layer │ │
│ │ (stdio / SSE / WebSocket) │ │
│ └────────────────────┬────────────────────────────┘ │
│ │ │
│ ┌────────────────────▼────────────────────────────┐ │
│ │ Protocol Layer │ │
│ │ JSON-RPC 消息解析 / 路由 / 错误编码 │ │
│ └────────────────────┬────────────────────────────┘ │
│ │ │
│ ┌────────────────────▼────────────────────────────┐ │
│ │ Service Layer │ │
│ │ 认证 → 限流 → 日志 → 工具路由 │ │
│ └────────────────────┬────────────────────────────┘ │
│ │ │
│ ┌────────────────────▼────────────────────────────┐ │
│ │ Tool Layer │ │
│ │ 实际工具实现代码 │ │
│ └─────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
各层职责
| 层级 | 职责 | 技术实现 |
|---|---|---|
| Transport Layer | 通信协议处理 | stdio/SSE/WebSocket |
| Protocol Layer | JSON-RPC消息处理 | MCP SDK |
| Service Layer | 通用服务逻辑 | 中间件链 |
| Tool Layer | 实际业务逻辑 | 工具处理器 |
错误处理模式
统一错误处理
/**
 * Custom base error for failures raised by tool implementations.
 *
 * Besides the message it carries a numeric `code` (defaulting to -32000)
 * and an optional, free-form `data` payload.
 */
class MCPToolError extends Error {
  public code: number;
  public data?: any;

  constructor(message: string, code: number = -32000, data?: any) {
    super(message);
    this.code = code;
    this.data = data;
    this.name = "MCPToolError";
  }
}
/**
 * Raised when the caller is not permitted to perform the requested
 * operation. Always carries error code -32003.
 */
class PermissionError extends MCPToolError {
  constructor(message: string) {
    super(message, -32003);
    // The base constructor labels every instance "MCPToolError"; override it
    // so logs and stack traces identify the concrete error type.
    this.name = "PermissionError";
  }
}
/**
 * Raised when a tool does not finish within its time limit.
 *
 * Carries error code -32002 and exposes `{ toolName, duration }` as `data`.
 *
 * @param toolName Name of the tool that timed out.
 * @param duration The time limit that was exceeded, as supplied by the caller.
 */
class TimeoutError extends MCPToolError {
  constructor(toolName: string, duration: number) {
    super(`工具 ${toolName} 执行超时`, -32002, { toolName, duration });
    // The base constructor labels every instance "MCPToolError"; override it
    // so logs and stack traces identify the concrete error type.
    this.name = "TimeoutError";
  }
}
/**
 * Global error-handling middleware.
 *
 * Wraps `handler` so the returned async function never rejects: a thrown
 * `MCPToolError` becomes an error result carrying its message, and anything
 * else is logged to stderr and reported with a generic message.
 */
function errorHandler(handler: Function) {
  // Builds the error-shaped tool result used by both failure paths.
  const failure = (text: string) => ({
    content: [{ type: "text", text }],
    isError: true,
  });

  return async (...args: any[]) => {
    try {
      return await handler(...args);
    } catch (error) {
      if (!(error instanceof MCPToolError)) {
        // Unknown error: log the details, hide them from the caller.
        console.error("未处理的错误:", error);
        return failure("服务器内部错误");
      }
      return failure(error.message);
    }
  };
}
超时控制
/**
 * Races `promise` against a deadline.
 *
 * Resolves or rejects with the outcome of `promise` if it settles first;
 * otherwise rejects with a `TimeoutError` for `toolName`. The deadline timer
 * is cancelled as soon as the race settles, so a fast operation does not
 * leave a pending timer behind that keeps the event loop alive.
 *
 * Note: the underlying operation is not cancelled on timeout; only the
 * returned promise rejects.
 *
 * @param promise   The operation to guard.
 * @param timeoutMs Deadline in milliseconds.
 * @param toolName  Tool name reported in the `TimeoutError`.
 * @returns A promise that settles like `promise`, or rejects on timeout.
 */
function withTimeout<T>(
  promise: Promise<T>,
  timeoutMs: number,
  toolName: string
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new TimeoutError(toolName, timeoutMs)),
      timeoutMs
    );
  });

  // Always release the timer, whichever side of the race wins.
  return Promise.race([promise, deadline]).finally(() => clearTimeout(timer));
}
// Usage: route tool calls, putting a deadline only on the slow tool.
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  // Long-running operation: cap it at 60 seconds.
  const largeFileTimeoutMs = 60000;

  return name === "process_large_file"
    ? withTimeout(handleLargeFile(args), largeFileTimeoutMs, name)
    : handleTool(name, args);
});
日志与监控
结构化日志
// Structured logging system

/** Shape of a single structured log record. */
interface LogEntry {
  /** When the entry was created; the logger fills this with `new Date().toISOString()`. */
  timestamp: string;
  /** Severity of the entry. */
  level: "INFO" | "WARN" | "ERROR";
  /** Name of the subsystem that produced the entry (e.g. "tool"). */
  component: string;
  /** Human-readable description of the event. */
  message: string;
  /** Optional duration associated with the event — unit not established here; TODO confirm (presumably milliseconds). */
  duration?: number;
  /** Optional name of the tool the entry refers to. */
  toolName?: string;
  /** Optional textual description of an associated error. */
  error?: string;
  /** Optional identifier of the request the entry belongs to. */
  requestId?: string;
}
/**
 * Logger that writes one JSON object per entry to stderr and keeps the most
 * recent entries in an in-memory buffer.
 */
class StructuredLogger {
  /** Maximum number of entries retained in memory. */
  private static readonly MAX_ENTRIES = 1000;

  private logs: LogEntry[] = [];

  /**
   * Emits `entry` and records it in the in-memory buffer.
   *
   * @param entry Fully populated log record.
   */
  log(entry: LogEntry): void {
    // Write to stderr (MCP convention).
    console.error(JSON.stringify(entry));
    this.logs.push(entry);
    // Keep only the most recent entries.
    if (this.logs.length > StructuredLogger.MAX_ENTRIES) {
      this.logs.shift();
    }
  }

  /**
   * Logs an INFO entry stamped with the current time.
   *
   * @param component Subsystem producing the entry.
   * @param message   Human-readable description.
   * @param data      Extra fields merged into the entry; they take precedence
   *                  over the generated ones.
   */
  info(component: string, message: string, data?: Partial<LogEntry>): void {
    this.log({
      timestamp: new Date().toISOString(),
      level: "INFO",
      component,
      message,
      ...data,
    });
  }

  /**
   * Logs an ERROR entry stamped with the current time.
   *
   * @param component Subsystem producing the entry.
   * @param message   Human-readable description.
   * @param error     Optional cause. Its `message` is recorded when present,
   *                  otherwise its string form. When omitted, no `error`
   *                  field is written (rather than the string "undefined").
   */
  error(component: string, message: string, error?: any): void {
    const detail =
      error == null ? undefined : error.message || String(error);

    this.log({
      timestamp: new Date().toISOString(),
      level: "ERROR",
      component,
      message,
      ...(detail === undefined ? {} : { error: detail }),
    });
  }

  /**
   * Logs a tool invocation.
   *
   * @param toolName Name of the invoked tool.
   * @param args     Arguments of the call; may be undefined or null when the
   *                 tool was invoked without arguments. `args._meta.requestId`
   *                 is recorded when available.
   * @param duration Duration of the call.
   */
  logToolCall(toolName: string, args: any, duration: number): void {
    this.info("tool", `工具调用: ${toolName}`, {
      toolName,
      duration,
      // Optional chaining on `args` itself: argument-less calls must not throw.
      requestId: args?._meta?.requestId,
    });
  }
}

const logger = new StructuredLogger();
健康检查端点
// Register a health-check tool (internal use).
toolRegistry.register({
  name: "_health",
  description: "服务器健康检查(内部使用)",
  inputSchema: { type: "object", properties: {} },
  handler: async () => {
    // Snapshot of process and registry state reported to the caller.
    const snapshot = {
      status: "healthy",
      uptime: process.uptime(),
      memory: process.memoryUsage(),
      toolCount: toolRegistry.getDefinitions().length,
      version: "1.0.0",
    };
    const text = JSON.stringify(snapshot, null, 2);

    return {
      content: [{ type: "text", text }],
      isError: false,
    };
  },
});
性能优化
缓存策略
/**
 * In-memory cache whose entries expire after a time-to-live.
 *
 * Expired entries are dropped lazily, when they are next looked up.
 */
class TTLCache<K, V> {
  private entries = new Map<K, { value: V; expiresAt: number }>();

  /** @param defaultTTL Lifetime in milliseconds used when `set` gets no `ttl`. */
  constructor(private defaultTTL: number = 5000) {}

  /** Returns the live value for `key`, or `undefined` if absent or expired. */
  get(key: K): V | undefined {
    const hit = this.entries.get(key);
    const alive = hit !== undefined && Date.now() < hit.expiresAt;
    if (!alive) {
      // Missing or stale: make sure nothing lingers under this key.
      this.entries.delete(key);
      return undefined;
    }
    return hit.value;
  }

  /** Stores `value` under `key` for `ttl` milliseconds (default lifetime if omitted). */
  set(key: K, value: V, ttl?: number): void {
    const lifetime = ttl ?? this.defaultTTL;
    const expiresAt = Date.now() + lifetime;
    this.entries.set(key, { value, expiresAt });
  }

  /** Removes every key matched by `pattern`, or all entries when no pattern is given. */
  invalidate(pattern?: (key: K) => boolean): void {
    if (!pattern) {
      this.entries.clear();
      return;
    }
    this.entries.forEach((_entry, key) => {
      if (pattern(key)) this.entries.delete(key);
    });
  }
}
// Using the cache
const toolCache = new TTLCache<string, any>(5000);

server.setRequestHandler(ListToolsRequestSchema, async () => {
  // The tool list is cached for 5 seconds.
  let listing = toolCache.get("tools");
  if (!listing) {
    listing = { tools: toolRegistry.getDefinitions() };
    toolCache.set("tools", listing);
  }
  return listing;
});
资源限制
/**
 * Concurrency control: caps how many calls of the same tool may run at once.
 *
 * Counts are tracked per tool name; a tool with no running calls has no entry.
 */
class ConcurrencyLimiter {
  private inFlight = new Map<string, number>();

  /** @param maxConcurrent Maximum simultaneous calls allowed per tool. */
  constructor(private maxConcurrent: number = 5) {}

  /**
   * Tries to reserve a slot for `toolName`.
   *
   * @returns `true` if a slot was reserved, `false` if the limit is reached.
   */
  async acquire(toolName: string): Promise<boolean> {
    const running = this.inFlight.get(toolName) || 0;
    const saturated = running >= this.maxConcurrent;
    if (!saturated) {
      this.inFlight.set(toolName, running + 1);
    }
    return !saturated;
  }

  /** Gives back one slot for `toolName`, dropping the entry once it reaches zero. */
  release(toolName: string): void {
    const remaining = (this.inFlight.get(toolName) || 0) - 1;
    if (remaining <= 0) {
      this.inFlight.delete(toolName);
      return;
    }
    this.inFlight.set(toolName, remaining);
  }
}

const concurrencyLimiter = new ConcurrencyLimiter(3);
// Applying the limiter to tool calls
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name } = request.params;

  const admitted = await concurrencyLimiter.acquire(name);
  if (admitted) {
    try {
      return await executeTool(request.params);
    } finally {
      // Free the slot whether the tool succeeded or threw.
      concurrencyLimiter.release(name);
    }
  }

  // No free slot: report the rejection as an error result.
  return {
    content: [
      {
        type: "text",
        text: `工具 '${name}' 当前并发数已达上限,请稍后重试`,
      },
    ],
    isError: true,
  };
});
测试策略
单元测试
// tools/calculator.test.ts
import { describe, it, expect } from "vitest";
import { executeCalculator } from "./calculator";
describe("Calculator Tool", () => {
  // Happy path: plain addition.
  it("should add two numbers", () => {
    const sum = executeCalculator({ operation: "add", a: 5, b: 3 });
    expect(sum).toEqual({ result: 8 });
  });

  // Division by zero must fail with the dedicated message.
  it("should reject division by zero", () => {
    const divideByZero = () =>
      executeCalculator({ operation: "divide", a: 10, b: 0 });
    expect(divideByZero).toThrow("除数不能为0");
  });

  // Unsupported operations must fail as well.
  it("should reject invalid operation", () => {
    const unsupported = () =>
      executeCalculator({ operation: "modulo", a: 5, b: 3 });
    expect(unsupported).toThrow();
  });
});
集成测试
// tests/server.integration.test.ts
import { describe, it, expect, beforeAll, afterAll } from "vitest";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from
"@modelcontextprotocol/sdk/client/stdio.js";
describe("MCP Server Integration", () => {
  let client: Client;

  // Spawn the built server over stdio and connect one shared client.
  beforeAll(async () => {
    const transport = new StdioClientTransport({
      command: "node",
      args: ["dist/index.js"],
    });
    const identity = { name: "test-client", version: "1.0.0" };
    client = new Client(identity, { capabilities: {} });
    await client.connect(transport);
  });

  afterAll(async () => {
    await client.close();
  });

  it("should list tools", async () => {
    const { tools } = await client.listTools();
    expect(tools.length).toBeGreaterThan(0);
  });

  it("should call calculator tool", async () => {
    const { content } = await client.callTool({
      name: "calculate",
      arguments: { operation: "add", a: 5, b: 3 },
    });
    expect(content[0].text).toContain("8");
  });

  // A failing tool call is expected to come back as an error result.
  it("should handle errors gracefully", async () => {
    const { isError } = await client.callTool({
      name: "calculate",
      arguments: { operation: "divide", a: 1, b: 0 },
    });
    expect(isError).toBe(true);
  });
});
CI/CD配置
GitHub Actions
# .github/workflows/mcp-server.yml
name: MCP Server CI/CD

# Run on pushes to main and on pull requests targeting main.
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  # Lint, test and build the project.
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-node@v4
        with:
          node-version: 20
          cache: 'npm'
      - run: npm ci
      - run: npm run lint
      - run: npm run test
      - run: npm run build

  # Build and publish the Docker image; only after tests pass, only on main.
  deploy:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Pin the same Node version as the test job so the build is reproducible.
      - uses: actions/setup-node@v4
        with:
          node-version: 20
          cache: 'npm'
      - run: npm ci && npm run build
      - name: Build and push Docker
        # NOTE(review): the push below needs the runner to be authenticated
        # against registry.example.com; no login step is shown here.
        run: |
          docker build -t my-mcp-server .
          docker tag my-mcp-server registry.example.com/my-mcp-server:latest
          docker push registry.example.com/my-mcp-server:latest
部署运维
健康检查脚本
#!/bin/bash
# healthcheck.sh - health check used by Docker.
# Pipes a single tools/list request into a server process started for the
# check (5 second limit, stderr discarded) and succeeds only if the output
# contains "tools".
if echo '{"jsonrpc":"2.0","id":1,"method":"tools/list"}' \
  | timeout 5 node dist/index.js 2>/dev/null \
  | grep -q '"tools"'; then
  exit 0
fi
exit 1
Docker Compose部署
# docker-compose.yml
version: '3.8'

services:
  mcp-server:
    build: .
    restart: unless-stopped
    environment:
      - NODE_ENV=production
      # Taken from the environment of the shell running docker compose.
      - MCP_API_KEY=${MCP_API_KEY}
    volumes:
      # Mounted read-only.
      - ./data:/data:ro
    healthcheck:
      test: ["CMD", "./healthcheck.sh"]
      interval: 30s
      timeout: 10s
      retries: 3
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 512M
总结
生产级MCP Server需要关注以下维度:
| 维度 | 关键实践 | 优先级 |
|---|---|---|
| 可靠性 | 统一错误处理、超时控制、重试 | P0 |
| 安全性 | 输入验证、路径控制、沙箱 | P0 |
| 可观测性 | 结构化日志、健康检查 | P1 |
| 性能 | 缓存、并发控制、资源限制 | P1 |
| 可测试性 | 单元测试、集成测试 | P1 |
| 可部署性 | Docker化、CI/CD | P2 |
下一步学习建议:
本文最后更新于 2024-07-10。