MCP协议 进阶 MCP 服务端开发 TypeScript Node.js

MCP服务端开发实战:从入门到精通

AIEng Hub
阅读约 35 分钟

引言

在上一篇文章中,我们了解了MCP协议的基本概念。本文将深入实战,带你从零开始构建一个生产级的MCP Server。

我们将涵盖:

  • 项目架构设计
  • 工具和资源定义
  • 错误处理与日志
  • 性能优化
  • 部署策略

MCP Server 架构设计

核心架构图

┌─────────────────────────────────────────────────────────────┐
│                    MCP Server 架构                           │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   ┌──────────────┐    ┌──────────────┐    ┌──────────────┐ │
│   │   Transport  │◄──►│    Server    │◄──►│   Handlers   │ │
│   │   (传输层)    │    │   (核心层)    │    │   (处理层)    │ │
│   └──────────────┘    └──────┬───────┘    └──────┬───────┘ │
│                              │                    │         │
│                              ▼                    ▼         │
│                        ┌──────────┐        ┌──────────┐    │
│                        │  Tools   │        │ Resources│    │
│                        │  工具集   │        │  资源集   │    │
│                        └──────────┘        └──────────┘    │
│                                                              │
└─────────────────────────────────────────────────────────────┘

项目结构

my-mcp-server/
├── src/
│   ├── index.ts              # 入口文件
│   ├── server.ts             # Server核心配置
│   ├── handlers/
│   │   ├── tools.ts          # 工具处理器
│   │   ├── resources.ts      # 资源处理器
│   │   └── prompts.ts        # 提示处理器
│   ├── tools/
│   │   ├── calculator.ts     # 计算器工具
│   │   ├── file-reader.ts    # 文件读取工具
│   │   └── api-client.ts     # API调用工具
│   ├── utils/
│   │   ├── logger.ts         # 日志工具
│   │   ├── validator.ts      # 验证工具
│   │   └── errors.ts         # 错误处理
│   └── types/
│       └── index.ts          # 类型定义
├── tests/
│   ├── unit/
│   └── integration/
├── dist/                     # 编译输出
├── package.json
├── tsconfig.json
└── README.md

实战:构建文件系统MCP Server

1. 项目初始化

# Create the project directory
mkdir mcp-filesystem-server
cd mcp-filesystem-server

# Initialise the npm project
npm init -y

# Install runtime dependencies
npm install @modelcontextprotocol/sdk zod

# Install development dependencies (vitest is required by the "test" script
# and by the test files shown later)
npm install -D typescript @types/node ts-node nodemon vitest

# Generate a default tsconfig.json
npx tsc --init

2. package.json 配置

{
  "name": "mcp-filesystem-server",
  "version": "1.0.0",
  "description": "MCP文件系统服务器",
  "type": "module",
  "main": "dist/index.js",
  "scripts": {
    "build": "tsc",
    "dev": "nodemon --watch src --ext ts --exec \"node --loader ts-node/esm src/index.ts\"",
    "start": "node dist/index.js",
    "test": "vitest"
  },
  "dependencies": {
    "@modelcontextprotocol/sdk": "^1.0.0",
    "zod": "^3.22.0"
  },
  "devDependencies": {
    "@types/node": "^20.0.0",
    "nodemon": "^3.0.0",
    "ts-node": "^10.9.0",
    "typescript": "^5.3.0",
    "vitest": "^1.0.0"
  }
}

3. TypeScript 配置

{
  "compilerOptions": {
    "target": "ES2022",
    "module": "Node16",
    "moduleResolution": "Node16",
    "outDir": "./dist",
    "rootDir": "./src",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "resolveJsonModule": true,
    "declaration": true,
    "declarationMap": true,
    "sourceMap": true
  },
  "include": ["src/**/*"],
  "exclude": ["node_modules", "dist"]
}

4. 核心 Server 实现

// src/server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
  ListResourcesRequestSchema,
  ReadResourceRequestSchema,
  ErrorCode,
  McpError,
} from "@modelcontextprotocol/sdk/types.js";
import { z } from "zod";
import * as fs from "fs/promises";
import * as path from "path";

/**
 * Static configuration handed to the server's constructor.
 */
interface ServerConfig {
  /** Server name passed to the MCP `Server` and printed on startup. */
  name: string;
  /** Server version passed to the MCP `Server` and printed on startup. */
  version: string;
  /** Directories that tool calls are allowed to touch. */
  allowedDirectories: string[];
}

// Zod schemas used to validate the `arguments` of each tool call before the
// corresponding handler runs.

/** Arguments for tools that take a single path (`read_file`, `get_file_info`). */
const ReadFileArgs = z.object({
  path: z.string().describe("文件路径"),
});

/** Arguments for `write_file`. */
const WriteFileArgs = z.object({
  path: z.string().describe("文件路径"),
  content: z.string().describe("文件内容"),
});

/** Arguments for `list_directory`. */
const ListDirectoryArgs = z.object({
  path: z.string().describe("目录路径"),
});

/** Arguments for `search_files`; `recursive` falls back to `false` when omitted. */
const SearchFilesArgs = z.object({
  path: z.string().describe("搜索目录"),
  pattern: z.string().describe("搜索模式"),
  recursive: z.boolean().optional().default(false),
});

/**
 * MCP server exposing a small set of file-system tools (read, write, list,
 * search, stat) that are restricted to a configured list of directories.
 *
 * File-system failures inside a tool are reported as tool results with
 * `isError: true`. Protocol-level problems (unknown tool, invalid arguments,
 * path outside the allowed directories) are raised as `McpError`.
 */
export class FileSystemServer {
  /** URI of the single static resource served by this server. */
  private static readonly WELCOME_URI = "file:///welcome.md";

  private readonly server: Server;
  private readonly config: ServerConfig;

  /**
   * Creates the underlying MCP `Server`, declares the `tools` and `resources`
   * capabilities and registers every request handler.
   *
   * @param config Server name/version and the directories tools may touch.
   */
  constructor(config: ServerConfig) {
    this.config = config;
    this.server = new Server(
      {
        name: config.name,
        version: config.version,
      },
      {
        capabilities: {
          tools: {},
          resources: {},
        },
      }
    );

    this.setupHandlers();
  }

  /**
   * Converts a file-name glob into an anchored regular expression.
   *
   * `*` matches any run of characters and `?` matches exactly one character;
   * every other character is matched literally, so caller-supplied patterns
   * cannot inject regular-expression syntax.
   *
   * @param pattern Glob such as `*.ts`.
   * @returns Expression that must match the whole file name.
   */
  private static globToRegExp(pattern: string): RegExp {
    const source = pattern
      // Escape regex metacharacters, leaving the two glob wildcards alone.
      .replace(/[.+^${}()|[\]\\]/g, "\\$&")
      // Collapse runs of "*" so "***" does not create needless backtracking.
      .replace(/\*+/g, ".*")
      .replace(/\?/g, ".");
    return new RegExp(`^${source}$`);
  }

  /**
   * Resolves `filePath` to an absolute path and checks that it is one of the
   * allowed directories or lies below one of them.
   *
   * The comparison is done on whole path segments, so `/data-other` is not
   * accepted for an allowed directory `/data`. Symbolic links are not
   * resolved by this check.
   *
   * @param filePath Absolute path, or a path relative to the working directory.
   * @returns The resolved absolute path.
   * @throws McpError with `ErrorCode.InvalidRequest` when the path is outside
   *   every allowed directory.
   */
  private validatePath(filePath: string): string {
    // Resolve first so relative input and ".." segments are compared in
    // their final form.
    const resolved = path.resolve(filePath);

    const isAllowed = this.config.allowedDirectories.some((dir) => {
      // A blank entry would resolve to the working directory and silently
      // widen the sandbox, so it never matches.
      if (dir.trim() === "") return false;

      const relative = path.relative(path.resolve(dir), resolved);
      if (relative === "") return true; // the allowed directory itself

      const climbsOut =
        relative === ".." || relative.startsWith(`..${path.sep}`);
      // path.relative returns an absolute path when no relative route exists
      // (for example a different drive on Windows).
      return !climbsOut && !path.isAbsolute(relative);
    });

    if (!isAllowed) {
      throw new McpError(
        ErrorCode.InvalidRequest,
        `路径不在允许范围内: ${filePath}`
      );
    }

    return resolved;
  }

  /** Builds a successful text-only tool result. */
  private textResult(text: string) {
    return { content: [{ type: "text" as const, text }] };
  }

  /**
   * Builds a failed tool result of the form `<prefix>: <reason>`.
   *
   * @param prefix Human-readable description of the operation that failed.
   * @param error Whatever was thrown; non-`Error` values are stringified.
   */
  private errorResult(prefix: string, error: unknown) {
    const reason = error instanceof Error ? error.message : String(error);
    return {
      content: [{ type: "text" as const, text: `${prefix}: ${reason}` }],
      isError: true,
    };
  }

  /** Registers the list/call handlers for tools and resources. */
  private setupHandlers(): void {
    // List the available tools.
    this.server.setRequestHandler(ListToolsRequestSchema, async () => {
      return {
        tools: [
          {
            name: "read_file",
            description: "读取文件内容。支持文本文件,返回文件内容。",
            inputSchema: {
              type: "object",
              properties: {
                path: {
                  type: "string",
                  description: "文件路径(绝对路径或相对路径)",
                },
              },
              required: ["path"],
            },
          },
          {
            name: "write_file",
            description: "写入文件内容。如果文件不存在则创建,存在则覆盖。",
            inputSchema: {
              type: "object",
              properties: {
                path: {
                  type: "string",
                  description: "文件路径",
                },
                content: {
                  type: "string",
                  description: "要写入的文件内容",
                },
              },
              required: ["path", "content"],
            },
          },
          {
            name: "list_directory",
            description: "列出目录内容,包括文件和子目录。",
            inputSchema: {
              type: "object",
              properties: {
                path: {
                  type: "string",
                  description: "目录路径",
                },
              },
              required: ["path"],
            },
          },
          {
            name: "search_files",
            description: "在目录中搜索文件,支持文件名匹配。",
            inputSchema: {
              type: "object",
              properties: {
                path: {
                  type: "string",
                  description: "搜索起始目录",
                },
                pattern: {
                  type: "string",
                  description: "搜索模式(支持通配符如 *.ts)",
                },
                recursive: {
                  type: "boolean",
                  description: "是否递归搜索子目录",
                  default: false,
                },
              },
              required: ["path", "pattern"],
            },
          },
          {
            name: "get_file_info",
            description: "获取文件元信息,包括大小、修改时间等。",
            inputSchema: {
              type: "object",
              properties: {
                path: {
                  type: "string",
                  description: "文件路径",
                },
              },
              required: ["path"],
            },
          },
        ],
      };
    });

    // Dispatch a tool call to its handler after validating the arguments.
    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      const { name, arguments: args } = request.params;

      try {
        switch (name) {
          case "read_file":
            return await this.handleReadFile(ReadFileArgs.parse(args));
          case "write_file":
            return await this.handleWriteFile(WriteFileArgs.parse(args));
          case "list_directory":
            return await this.handleListDirectory(ListDirectoryArgs.parse(args));
          case "search_files":
            return await this.handleSearchFiles(SearchFilesArgs.parse(args));
          case "get_file_info":
            // get_file_info takes the same single-path arguments as read_file.
            return await this.handleGetFileInfo(ReadFileArgs.parse(args));
          default:
            throw new McpError(
              ErrorCode.MethodNotFound,
              `未知工具: ${name}`
            );
        }
      } catch (error: unknown) {
        // Schema failures become InvalidParams; everything else propagates.
        if (error instanceof z.ZodError) {
          throw new McpError(
            ErrorCode.InvalidParams,
            `参数验证失败: ${error.message}`
          );
        }
        throw error;
      }
    });

    // List the available resources.
    this.server.setRequestHandler(ListResourcesRequestSchema, async () => {
      return {
        resources: [
          {
            uri: FileSystemServer.WELCOME_URI,
            name: "欢迎文档",
            mimeType: "text/markdown",
            description: "MCP文件系统服务器的欢迎文档",
          },
        ],
      };
    });

    // Read a resource; only the welcome document exists.
    this.server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
      const { uri } = request.params;

      if (uri === FileSystemServer.WELCOME_URI) {
        return {
          contents: [
            {
              uri,
              mimeType: "text/markdown",
              text: `# 欢迎使用 MCP 文件系统服务器

这是一个功能强大的文件系统管理工具,支持以下操作:

## 可用工具

- **read_file**: 读取文件内容
- **write_file**: 写入文件内容
- **list_directory**: 列出目录内容
- **search_files**: 搜索文件
- **get_file_info**: 获取文件信息

## 安全说明

所有操作都在配置的允许目录范围内进行,确保系统安全。`,
            },
          ],
        };
      }

      throw new McpError(
        ErrorCode.InvalidRequest,
        `未知资源: ${uri}`
      );
    });
  }

  /**
   * `read_file`: returns the UTF-8 content of a file.
   *
   * @throws McpError when the path is outside the allowed directories.
   */
  private async handleReadFile(args: z.infer<typeof ReadFileArgs>) {
    const validatedPath = this.validatePath(args.path);

    try {
      return this.textResult(await fs.readFile(validatedPath, "utf-8"));
    } catch (error: unknown) {
      return this.errorResult("读取文件失败", error);
    }
  }

  /**
   * `write_file`: writes UTF-8 content, creating parent directories as
   * needed and overwriting an existing file.
   *
   * @throws McpError when the path is outside the allowed directories.
   */
  private async handleWriteFile(args: z.infer<typeof WriteFileArgs>) {
    const validatedPath = this.validatePath(args.path);

    try {
      // Make sure the parent directory exists before writing.
      await fs.mkdir(path.dirname(validatedPath), { recursive: true });
      await fs.writeFile(validatedPath, args.content, "utf-8");
      return this.textResult(`文件写入成功: ${validatedPath}`);
    } catch (error: unknown) {
      return this.errorResult("写入文件失败", error);
    }
  }

  /**
   * `list_directory`: lists the entries of a directory, one per line,
   * prefixed with `[DIR]` or `[FILE]`.
   *
   * @throws McpError when the path is outside the allowed directories.
   */
  private async handleListDirectory(args: z.infer<typeof ListDirectoryArgs>) {
    const validatedPath = this.validatePath(args.path);

    try {
      const entries = await fs.readdir(validatedPath, { withFileTypes: true });
      const formatted = entries
        .map((entry) => `${entry.isDirectory() ? "[DIR]" : "[FILE]"} ${entry.name}`)
        .join("\n");

      return this.textResult(formatted || "目录为空");
    } catch (error: unknown) {
      return this.errorResult("列出目录失败", error);
    }
  }

  /**
   * `search_files`: collects entries whose name matches a glob pattern.
   *
   * With `recursive` set, sub-directories are descended into rather than
   * matched; otherwise only the direct entries of the directory are tested.
   *
   * @throws McpError when the path is outside the allowed directories.
   */
  private async handleSearchFiles(args: z.infer<typeof SearchFilesArgs>) {
    const validatedPath = this.validatePath(args.path);
    // Compiled once up front instead of once per directory entry.
    const matcher = FileSystemServer.globToRegExp(args.pattern);
    const results: string[] = [];

    const search = async (dir: string): Promise<void> => {
      const entries = await fs.readdir(dir, { withFileTypes: true });

      for (const entry of entries) {
        const fullPath = path.join(dir, entry.name);

        if (entry.isDirectory() && args.recursive) {
          await search(fullPath);
        } else if (matcher.test(entry.name)) {
          results.push(fullPath);
        }
      }
    };

    try {
      await search(validatedPath);
      return this.textResult(
        results.length > 0 ? results.join("\n") : "未找到匹配的文件"
      );
    } catch (error: unknown) {
      return this.errorResult("搜索文件失败", error);
    }
  }

  /**
   * `get_file_info`: returns size, timestamps and kind of a path as
   * pretty-printed JSON.
   *
   * @throws McpError when the path is outside the allowed directories.
   */
  private async handleGetFileInfo(args: z.infer<typeof ReadFileArgs>) {
    const validatedPath = this.validatePath(args.path);

    try {
      const stats = await fs.stat(validatedPath);
      const info = {
        path: validatedPath,
        size: stats.size,
        created: stats.birthtime.toISOString(),
        modified: stats.mtime.toISOString(),
        isFile: stats.isFile(),
        isDirectory: stats.isDirectory(),
      };

      return this.textResult(JSON.stringify(info, null, 2));
    } catch (error: unknown) {
      return this.errorResult("获取文件信息失败", error);
    }
  }

  /**
   * Connects the server to a stdio transport and prints a startup banner to
   * stderr.
   */
  async start(): Promise<void> {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
    console.error(`${this.config.name} v${this.config.version} 已启动`);
  }
}

5. 入口文件

// src/index.ts
import { FileSystemServer } from "./server.js";

// Allowed directories come from the comma-separated ALLOWED_DIRS environment
// variable. Entries are trimmed and blank ones dropped, so a value such as
// "/data, /tmp," yields ["/data", "/tmp"]. When nothing usable is configured
// the current working directory is used.
const configuredDirs = (process.env.ALLOWED_DIRS ?? "")
  .split(",")
  .map((dir) => dir.trim())
  .filter((dir) => dir.length > 0);

const allowedDirs = configuredDirs.length > 0 ? configuredDirs : [process.cwd()];

const server = new FileSystemServer({
  name: "filesystem-server",
  version: "1.0.0",
  allowedDirectories: allowedDirs,
});

// Startup failures are reported on stderr and end the process with exit code 1.
server.start().catch((error: unknown) => {
  console.error("服务器启动失败:", error);
  process.exit(1);
});

进阶功能实现

1. 日志系统

// src/utils/logger.ts
/**
 * Log severities in ascending order. A logger drops every message whose
 * level is below its configured threshold.
 */
export enum LogLevel {
  DEBUG = 0,
  INFO = 1,
  WARN = 2,
  ERROR = 3,
}

/**
 * Minimal levelled logger that writes timestamped lines to stderr.
 */
export class Logger {
  private readonly level: LogLevel;
  private readonly prefix: string;

  /**
   * @param prefix Tag printed in every line to identify the source.
   * @param level Lowest severity that is still written.
   */
  constructor(prefix: string = "MCP", level: LogLevel = LogLevel.INFO) {
    this.prefix = prefix;
    this.level = level;
  }

  /**
   * Formats and writes one log line if `level` reaches the threshold.
   *
   * @param level Severity of this message.
   * @param message Text placed after the timestamp/level/prefix header.
   * @param args Extra values forwarded unchanged to `console.error`.
   */
  private log(level: LogLevel, message: string, ...args: unknown[]): void {
    if (level < this.level) return;

    const timestamp = new Date().toISOString();
    // Reverse enum lookup gives the level name; padded so columns line up.
    const levelStr = LogLevel[level].padStart(5);
    const formatted = `[${timestamp}] [${levelStr}] [${this.prefix}] ${message}`;

    // Every level goes to stderr: a stdio-based MCP server uses stdout for
    // protocol messages, so log output must never be written there.
    console.error(formatted, ...args);
  }

  /** Logs at DEBUG level. */
  debug(message: string, ...args: unknown[]): void {
    this.log(LogLevel.DEBUG, message, ...args);
  }

  /** Logs at INFO level. */
  info(message: string, ...args: unknown[]): void {
    this.log(LogLevel.INFO, message, ...args);
  }

  /** Logs at WARN level. */
  warn(message: string, ...args: unknown[]): void {
    this.log(LogLevel.WARN, message, ...args);
  }

  /** Logs at ERROR level. */
  error(message: string, ...args: unknown[]): void {
    this.log(LogLevel.ERROR, message, ...args);
  }
}

/** Shared default logger (prefix "MCP", threshold INFO). */
export const logger = new Logger();

2. 中间件系统

// src/middleware/index.ts
import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";

/** Per-call data shared by every middleware in a chain. */
export interface MiddlewareContext {
  /** Name of the tool being invoked. */
  toolName: string;
  /** Tool arguments; middleware may replace them (for example after validation). */
  arguments: any;
  /** Start timestamp supplied by the caller, compared against `Date.now()`. */
  startTime: number;
}

/**
 * A middleware receives the call context and a `next` function that runs the
 * rest of the chain, and resolves with the final result.
 */
export type Middleware = (
  context: MiddlewareContext,
  next: () => Promise<any>
) => Promise<any>;

/**
 * Ordered list of middlewares wrapped around a final handler.
 */
export class MiddlewareChain {
  private middlewares: Middleware[] = [];

  /** Appends a middleware; middlewares run in registration order. */
  use(middleware: Middleware): void {
    this.middlewares.push(middleware);
  }

  /**
   * Runs every middleware in order and finally `handler`.
   *
   * Each middleware's `next` is bound to its own position in the chain, so
   * calling `next()` more than once (for example to retry) re-runs all of the
   * downstream middlewares and the handler instead of skipping them.
   *
   * @param context Shared context passed to every middleware.
   * @param handler Final operation executed after the last middleware.
   * @returns Whatever the outermost middleware (or the handler) resolves with.
   */
  async execute(context: MiddlewareContext, handler: () => Promise<any>): Promise<any> {
    const dispatch = async (position: number): Promise<any> => {
      const middleware = this.middlewares[position];
      if (middleware === undefined) {
        // Past the last middleware: run the actual handler.
        return await handler();
      }
      return await middleware(context, () => dispatch(position + 1));
    };

    return await dispatch(0);
  }
}

// Commonly used middlewares

/**
 * Writes one stderr line before the rest of the chain runs and one after it
 * resolves, the latter including the time elapsed since `context.startTime`.
 * Nothing is written after the call when the chain rejects.
 */
export const loggingMiddleware: Middleware = async (context, next) => {
  const stamp = (): string => `[${new Date().toISOString()}]`;

  console.error(`${stamp()} 调用工具: ${context.toolName}`);
  const outcome = await next();

  const elapsedMs = Date.now() - context.startTime;
  console.error(`${stamp()} 工具完成: ${context.toolName} (${elapsedMs}ms)`);
  return outcome;
};

/**
 * Creates a middleware that validates `context.arguments` with `schema` and
 * replaces them with the parsed value before continuing the chain.
 *
 * Only the parsing step is guarded: errors raised by downstream middlewares
 * or the handler propagate unchanged instead of being reported as
 * validation failures.
 *
 * @param schema Any object with a `parse` method that returns the validated
 *   value or throws (for example a zod schema).
 * @throws McpError with `ErrorCode.InvalidParams` when parsing fails.
 */
export const validationMiddleware = (
  schema: { parse(input: unknown): unknown }
): Middleware => {
  return async (context, next) => {
    try {
      context.arguments = schema.parse(context.arguments);
    } catch (error: unknown) {
      const reason = error instanceof Error ? error.message : String(error);
      throw new McpError(
        ErrorCode.InvalidParams,
        `参数验证失败: ${reason}`
      );
    }

    return await next();
  };
};

3. 性能监控

// src/utils/metrics.ts
/**
 * In-memory collector of per-tool call durations.
 *
 * Every recorded sample is kept until `reset()` is called.
 */
export class MetricsCollector {
  private metrics: Map<string, number[]> = new Map();

  /**
   * Stores one duration sample for a tool.
   *
   * @param toolName Tool the sample belongs to.
   * @param duration Measured duration (the statistics use the same unit).
   */
  record(toolName: string, duration: number): void {
    const samples = this.metrics.get(toolName);
    if (samples === undefined) {
      this.metrics.set(toolName, [duration]);
    } else {
      samples.push(duration);
    }
  }

  /**
   * Summarises the samples recorded for one tool.
   *
   * Percentiles use the nearest-rank method: the p-th percentile is the
   * smallest sample such that at least p% of all samples are less than or
   * equal to it (so p95 of 1..100 is 95).
   *
   * @param toolName Tool to summarise.
   * @returns Count, rounded average, min, max, p95 and p99, or `null` when
   *   nothing has been recorded for the tool.
   */
  getStats(toolName: string) {
    const times = this.metrics.get(toolName);
    if (times === undefined || times.length === 0) return null;

    // Sort a copy so the recorded order is left untouched.
    const sorted = [...times].sort((a, b) => a - b);
    const sum = sorted.reduce((total, value) => total + value, 0);

    const percentile = (percent: number) => {
      // Integer arithmetic first to avoid rounding surprises such as 0.95 * n.
      const rank = Math.ceil((percent * sorted.length) / 100);
      return sorted[Math.max(0, rank - 1)];
    };

    return {
      count: times.length,
      avg: Math.round(sum / times.length),
      min: sorted[0],
      max: sorted[sorted.length - 1],
      p95: percentile(95),
      p99: percentile(99),
    };
  }

  /** Returns `getStats` for every tool that has at least one sample, keyed by tool name. */
  getAllStats() {
    const result: Record<string, ReturnType<MetricsCollector["getStats"]>> = {};
    for (const tool of this.metrics.keys()) {
      result[tool] = this.getStats(tool);
    }
    return result;
  }

  /** Discards all recorded samples. */
  reset(): void {
    this.metrics.clear();
  }
}

/** Shared collector instance. */
export const metrics = new MetricsCollector();

测试策略

1. 单元测试

// tests/unit/server.test.ts
import { describe, it, expect, beforeEach } from "vitest";
import { FileSystemServer } from "../../src/server.js";

describe("FileSystemServer", () => {
  let server: FileSystemServer;

  // A fresh server per test, limited to /tmp/test. Constructing it does not
  // touch the file system or connect a transport.
  beforeEach(() => {
    server = new FileSystemServer({
      name: "test-server",
      version: "1.0.0",
      allowedDirectories: ["/tmp/test"],
    });
  });

  it("应该正确验证允许的路径", () => {
    // validatePath is private; bracket access lets the test reach it without
    // widening the class's public surface.
    expect(() => server["validatePath"]("/tmp/test/file.txt")).not.toThrow();
  });

  it("应该拒绝访问范围外的路径", () => {
    // A path outside the allowed directory.
    expect(() => server["validatePath"]("/etc/passwd")).toThrow();
    // A traversal that starts inside the allowed directory but ends outside.
    expect(() => server["validatePath"]("/tmp/test/../../etc/passwd")).toThrow();
  });
});

2. 集成测试

// tests/integration/tools.test.ts
import { describe, it, expect, beforeAll, afterAll } from "vitest";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

// End-to-end suite: spawns the compiled server (dist/index.js) as a child
// process and talks to it through an MCP client over stdio. The project must
// be built before this suite runs.
describe("工具集成测试", () => {
  let client: Client;
  let transport: StdioClientTransport;

  // Start the server process and connect the client once for the whole suite.
  beforeAll(async () => {
    transport = new StdioClientTransport({
      command: "node",
      args: ["dist/index.js"],
    });

    client = new Client(
      { name: "test-client", version: "1.0.0" },
      { capabilities: {} }
    );

    await client.connect(transport);
  });

  // Close the client when the suite finishes.
  afterAll(async () => {
    await client.close();
  });

  it("应该列出所有工具", async () => {
    const tools = await client.listTools();
    expect(tools.tools.length).toBeGreaterThan(0);
  });

  it("应该成功调用 read_file 工具", async () => {
    // Placeholder: create a test file here...
    // Placeholder: call the tool and verify the result...
  });
});

部署与运维

1. Docker 部署

# Dockerfile
FROM node:20-alpine

WORKDIR /app

# Copy the dependency manifests and install runtime dependencies only
# (--omit=dev replaces the deprecated --only=production)
COPY package*.json ./
RUN npm ci --omit=dev

# Copy the compiled output (run "npm run build" before building the image)
COPY dist/ ./dist/

# Create the data directory that is exposed to the server
RUN mkdir -p /data

# Environment
ENV NODE_ENV=production
ENV ALLOWED_DIRS=/data

# Run as the unprivileged "node" user
USER node

CMD ["node", "dist/index.js"]
# docker-compose.yml
version: "3.8"
services:
  mcp-filesystem:
    build: .
    container_name: mcp-filesystem-server
    # The server talks MCP over stdin/stdout, so stdin has to stay open;
    # otherwise the process sees end-of-input right after start and, with the
    # restart policy below, would be restarted in a loop.
    stdin_open: true
    volumes:
      # Mounted read-only: write_file calls against /data will fail.
      - ./data:/data:ro
    environment:
      - ALLOWED_DIRS=/data
    restart: unless-stopped

2. 配置管理

// mcp-config.json
{
  "mcpServers": {
    "filesystem": {
      "command": "docker",
      "args": [
        "run",
        "-i",
        "--rm",
        "-v",
        "/home/user/docs:/data:ro",
        "mcp-filesystem-server"
      ],
      "env": {
        "ALLOWED_DIRS": "/data"
      }
    }
  }
}

性能优化建议

| 优化项 | 说明 | 实施建议 |
| --- | --- | --- |
| 缓存 | 缓存频繁访问的文件 | 使用 LRU 缓存策略 |
| 流式处理 | 大文件分块读取 | 实现 ReadableStream |
| 并发控制 | 限制同时处理的请求数 | 使用 p-limit 库 |
| 压缩 | 传输大内容时启用压缩 | 使用 gzip |
| 连接池 | 复用外部连接 | 数据库/HTTP 连接池 |

总结

本文详细介绍了MCP Server的开发实战:

架构设计 - 清晰的分层架构
完整实现 - 文件系统Server示例
进阶功能 - 日志、中间件、监控
测试覆盖 - 单元测试和集成测试
部署运维 - Docker化部署方案

下一步建议:


本文最后更新于 2024-02-10,如有问题欢迎在社区讨论。