引言
在上一篇文章中,我们了解了MCP协议的基本概念。本文将深入实战,带你从零开始构建一个生产级的MCP Server。
我们将涵盖:
- 项目架构设计
- 工具和资源定义
- 错误处理与日志
- 性能优化
- 部署策略
MCP Server 架构设计
核心架构图
┌─────────────────────────────────────────────────────────────┐
│                       MCP Server 架构                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐   │
│  │  Transport   │◄──►│    Server    │◄──►│   Handlers   │   │
│  │   (传输层)   │    │   (核心层)   │    │   (处理层)   │   │
│  └──────────────┘    └──────┬───────┘    └──────┬───────┘   │
│                             │                   │           │
│                             ▼                   ▼           │
│                        ┌──────────┐        ┌──────────┐     │
│                        │  Tools   │        │ Resources│     │
│                        │  工具集  │        │  资源集  │     │
│                        └──────────┘        └──────────┘     │
│                                                             │
└─────────────────────────────────────────────────────────────┘
项目结构
my-mcp-server/
├── src/
│   ├── index.ts              # 入口文件
│   ├── server.ts             # Server核心配置
│   ├── handlers/
│   │   ├── tools.ts          # 工具处理器
│   │   ├── resources.ts      # 资源处理器
│   │   └── prompts.ts        # 提示处理器
│   ├── tools/
│   │   ├── calculator.ts     # 计算器工具
│   │   ├── file-reader.ts    # 文件读取工具
│   │   └── api-client.ts     # API调用工具
│   ├── utils/
│   │   ├── logger.ts         # 日志工具
│   │   ├── validator.ts      # 验证工具
│   │   └── errors.ts         # 错误处理
│   └── types/
│       └── index.ts          # 类型定义
├── tests/
│   ├── unit/
│   └── integration/
├── dist/                     # 编译输出
├── package.json
├── tsconfig.json
└── README.md
实战:构建文件系统MCP Server
1. 项目初始化
# Create the project directory
mkdir mcp-filesystem-server
cd mcp-filesystem-server
# Initialize the project
npm init -y
# Install the runtime dependencies
npm install @modelcontextprotocol/sdk zod
# Install the development dependencies.
# vitest is required by the "test" script in package.json and by the test suites.
npm install -D typescript @types/node ts-node nodemon vitest
# Initialize TypeScript
npx tsc --init
2. package.json 配置
{
"name": "mcp-filesystem-server",
"version": "1.0.0",
"description": "MCP文件系统服务器",
"type": "module",
"main": "dist/index.js",
"scripts": {
"build": "tsc",
"dev": "nodemon --exec ts-node src/index.ts",
"start": "node dist/index.js",
"test": "vitest"
},
"dependencies": {
"@modelcontextprotocol/sdk": "^1.0.0",
"zod": "^3.22.0"
},
"devDependencies": {
"@types/node": "^20.0.0",
"nodemon": "^3.0.0",
"ts-node": "^10.9.0",
"typescript": "^5.3.0",
"vitest": "^1.0.0"
}
}
3. TypeScript 配置
{
"compilerOptions": {
"target": "ES2022",
"module": "Node16",
"moduleResolution": "Node16",
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"declaration": true,
"declarationMap": true,
"sourceMap": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}
4. 核心 Server 实现
// src/server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
ListResourcesRequestSchema,
ReadResourceRequestSchema,
ErrorCode,
McpError,
} from "@modelcontextprotocol/sdk/types.js";
import { z } from "zod";
import * as fs from "fs/promises";
import * as path from "path";
/**
 * Settings required to construct the file-system server.
 */
interface ServerConfig {
  /** Server name reported to clients and used in the startup log line. */
  name: string;
  /** Server version reported to clients and used in the startup log line. */
  version: string;
  /** Directories that tool calls are permitted to touch. */
  allowedDirectories: string[];
}
// Tool argument schemas, used to validate the raw `arguments` of each tool call.

/** Builds a string schema that carries the given human-readable description. */
const describedString = (label: string) => z.string().describe(label);

/** Arguments for tools that take a single file path (read_file, get_file_info). */
const ReadFileArgs = z.object({
  path: describedString("文件路径"),
});

/** Arguments for write_file: the target path and the text to store. */
const WriteFileArgs = z.object({
  path: describedString("文件路径"),
  content: describedString("文件内容"),
});

/** Arguments for list_directory. */
const ListDirectoryArgs = z.object({
  path: describedString("目录路径"),
});

/** Arguments for search_files; `recursive` falls back to false when omitted. */
const SearchFilesArgs = z.object({
  path: describedString("搜索目录"),
  pattern: describedString("搜索模式"),
  recursive: z.boolean().optional().default(false),
});
/**
 * MCP server that exposes file-system tools (read, write, list, search, stat)
 * restricted to a configured set of allowed directories, plus one static
 * "welcome" resource.
 *
 * Path validation failures and unknown tools/resources are raised as
 * `McpError`; I/O failures inside a tool are reported as a tool result with
 * `isError: true` instead of being thrown.
 */
export class FileSystemServer {
  /** URI of the single built-in resource. */
  private static readonly WELCOME_URI = "file:///welcome.md";

  private readonly server: Server;
  private readonly config: ServerConfig;
  /** Absolute forms of the configured allowed directories (blank entries dropped). */
  private readonly allowedRoots: string[];

  /**
   * @param config Server identity and the directories tools may access.
   */
  constructor(config: ServerConfig) {
    this.config = config;
    // Resolve once up front so every later check compares absolute paths.
    // Blank entries are dropped: resolving "" would silently allow the
    // current working directory.
    this.allowedRoots = config.allowedDirectories
      .map((dir) => dir.trim())
      .filter((dir) => dir.length > 0)
      .map((dir) => path.resolve(dir));
    this.server = new Server(
      {
        name: config.name,
        version: config.version,
      },
      {
        capabilities: {
          tools: {},
          resources: {},
        },
      }
    );
    this.setupHandlers();
  }

  /** Extracts a printable message from a caught value of unknown type. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Builds a tool result holding a single text item.
   * `isError` is only present on the result when it is true.
   */
  private static textResult(text: string, isError = false) {
    // `as const` keeps the literal type "text" instead of widening to string.
    const content = [{ type: "text" as const, text }];
    return isError ? { content, isError: true } : { content };
  }

  /**
   * Converts a file-name glob into an anchored regular expression.
   * `*` matches any run of characters and `?` a single character; every
   * other character is matched literally.
   */
  private static globToRegExp(pattern: string): RegExp {
    const source = pattern
      .replace(/[.+^${}()|[\]\\]/g, "\\$&")
      .replace(/\*/g, ".*")
      .replace(/\?/g, ".");
    return new RegExp(`^${source}$`);
  }

  /**
   * Resolves `filePath` to an absolute path and verifies that it is one of the
   * allowed directories or lies inside one.
   *
   * @param filePath Absolute or relative path supplied by the client.
   * @returns The resolved absolute path.
   * @throws McpError (InvalidRequest) when the path is outside every allowed directory.
   */
  private validatePath(filePath: string): string {
    const resolved = path.resolve(filePath);
    const isAllowed = this.allowedRoots.some((root) => {
      // Compare on path-segment boundaries: a plain string-prefix test would
      // accept "/data-evil" for an allowed "/data".
      const relative = path.relative(root, resolved);
      return (
        relative === "" ||
        (relative !== ".." &&
          !relative.startsWith(`..${path.sep}`) &&
          !path.isAbsolute(relative))
      );
    });
    if (!isAllowed) {
      throw new McpError(
        ErrorCode.InvalidRequest,
        `路径不在允许范围内: ${filePath}`
      );
    }
    // NOTE(review): symbolic links are not resolved here, so a link inside an
    // allowed directory can still point outside it.
    return resolved;
  }

  /** Registers the list-tools, call-tool, list-resources and read-resource handlers. */
  private setupHandlers(): void {
    // Advertise the available tools and their JSON input schemas.
    this.server.setRequestHandler(ListToolsRequestSchema, async () => {
      return {
        tools: [
          {
            name: "read_file",
            description: "读取文件内容。支持文本文件,返回文件内容。",
            inputSchema: {
              type: "object",
              properties: {
                path: {
                  type: "string",
                  description: "文件路径(绝对路径或相对路径)",
                },
              },
              required: ["path"],
            },
          },
          {
            name: "write_file",
            description: "写入文件内容。如果文件不存在则创建,存在则覆盖。",
            inputSchema: {
              type: "object",
              properties: {
                path: {
                  type: "string",
                  description: "文件路径",
                },
                content: {
                  type: "string",
                  description: "要写入的文件内容",
                },
              },
              required: ["path", "content"],
            },
          },
          {
            name: "list_directory",
            description: "列出目录内容,包括文件和子目录。",
            inputSchema: {
              type: "object",
              properties: {
                path: {
                  type: "string",
                  description: "目录路径",
                },
              },
              required: ["path"],
            },
          },
          {
            name: "search_files",
            description: "在目录中搜索文件,支持文件名匹配。",
            inputSchema: {
              type: "object",
              properties: {
                path: {
                  type: "string",
                  description: "搜索起始目录",
                },
                pattern: {
                  type: "string",
                  description: "搜索模式(支持通配符如 *.ts)",
                },
                recursive: {
                  type: "boolean",
                  description: "是否递归搜索子目录",
                  default: false,
                },
              },
              required: ["path", "pattern"],
            },
          },
          {
            name: "get_file_info",
            description: "获取文件元信息,包括大小、修改时间等。",
            inputSchema: {
              type: "object",
              properties: {
                path: {
                  type: "string",
                  description: "文件路径",
                },
              },
              required: ["path"],
            },
          },
        ],
      };
    });

    // Dispatch a tool call: validate the arguments, then run the matching handler.
    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      const { name, arguments: args } = request.params;
      try {
        switch (name) {
          case "read_file":
            return await this.handleReadFile(ReadFileArgs.parse(args));
          case "write_file":
            return await this.handleWriteFile(WriteFileArgs.parse(args));
          case "list_directory":
            return await this.handleListDirectory(ListDirectoryArgs.parse(args));
          case "search_files":
            return await this.handleSearchFiles(SearchFilesArgs.parse(args));
          case "get_file_info":
            // get_file_info takes the same single-path arguments as read_file.
            return await this.handleGetFileInfo(ReadFileArgs.parse(args));
          default:
            throw new McpError(
              ErrorCode.MethodNotFound,
              `未知工具: ${name}`
            );
        }
      } catch (error) {
        // Schema failures become InvalidParams; everything else propagates unchanged.
        if (error instanceof z.ZodError) {
          throw new McpError(
            ErrorCode.InvalidParams,
            `参数验证失败: ${error.message}`
          );
        }
        throw error;
      }
    });

    // Advertise the built-in welcome resource.
    this.server.setRequestHandler(ListResourcesRequestSchema, async () => {
      return {
        resources: [
          {
            uri: FileSystemServer.WELCOME_URI,
            name: "欢迎文档",
            mimeType: "text/markdown",
            description: "MCP文件系统服务器的欢迎文档",
          },
        ],
      };
    });

    // Serve the welcome resource; any other URI is rejected.
    this.server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
      const { uri } = request.params;
      if (uri === FileSystemServer.WELCOME_URI) {
        return {
          contents: [
            {
              uri,
              mimeType: "text/markdown",
              text: `# 欢迎使用 MCP 文件系统服务器
这是一个功能强大的文件系统管理工具,支持以下操作:
## 可用工具
- **read_file**: 读取文件内容
- **write_file**: 写入文件内容
- **list_directory**: 列出目录内容
- **search_files**: 搜索文件
- **get_file_info**: 获取文件信息
## 安全说明
所有操作都在配置的允许目录范围内进行,确保系统安全。`,
            },
          ],
        };
      }
      throw new McpError(
        ErrorCode.InvalidRequest,
        `未知资源: ${uri}`
      );
    });
  }

  /**
   * Reads a file as UTF-8 text.
   * @returns The file content, or an error result if the read fails.
   * @throws McpError when the path is outside the allowed directories.
   */
  private async handleReadFile(args: z.infer<typeof ReadFileArgs>) {
    const validatedPath = this.validatePath(args.path);
    try {
      const content = await fs.readFile(validatedPath, "utf-8");
      return FileSystemServer.textResult(content);
    } catch (error: unknown) {
      return FileSystemServer.textResult(
        `读取文件失败: ${FileSystemServer.describeError(error)}`,
        true
      );
    }
  }

  /**
   * Writes UTF-8 text to a file, creating missing parent directories and
   * overwriting any existing file.
   * @returns A confirmation message, or an error result if the write fails.
   * @throws McpError when the path is outside the allowed directories.
   */
  private async handleWriteFile(args: z.infer<typeof WriteFileArgs>) {
    const validatedPath = this.validatePath(args.path);
    try {
      // Make sure the parent directory exists before writing.
      await fs.mkdir(path.dirname(validatedPath), { recursive: true });
      await fs.writeFile(validatedPath, args.content, "utf-8");
      return FileSystemServer.textResult(`文件写入成功: ${validatedPath}`);
    } catch (error: unknown) {
      return FileSystemServer.textResult(
        `写入文件失败: ${FileSystemServer.describeError(error)}`,
        true
      );
    }
  }

  /**
   * Lists the entries of a directory, one per line, each tagged [DIR] or [FILE].
   * @returns The listing (or a placeholder for an empty directory), or an error result.
   * @throws McpError when the path is outside the allowed directories.
   */
  private async handleListDirectory(args: z.infer<typeof ListDirectoryArgs>) {
    const validatedPath = this.validatePath(args.path);
    try {
      const entries = await fs.readdir(validatedPath, { withFileTypes: true });
      const formatted = entries
        .map((entry) => `${entry.isDirectory() ? "[DIR]" : "[FILE]"} ${entry.name}`)
        .join("\n");
      return FileSystemServer.textResult(formatted || "目录为空");
    } catch (error: unknown) {
      return FileSystemServer.textResult(
        `列出目录失败: ${FileSystemServer.describeError(error)}`,
        true
      );
    }
  }

  /**
   * Finds entries whose name matches a glob pattern.
   *
   * Without `recursive`, only the direct entries of the start directory are
   * tested. With `recursive`, sub-directories are descended into and are not
   * themselves reported.
   *
   * @returns Matching paths, one per line (or a "no match" message), or an error result.
   * @throws McpError when the start path is outside the allowed directories.
   */
  private async handleSearchFiles(args: z.infer<typeof SearchFilesArgs>) {
    const validatedPath = this.validatePath(args.path);
    // Compile the pattern once instead of once per directory entry.
    const matcher = FileSystemServer.globToRegExp(args.pattern);
    const results: string[] = [];

    const search = async (dir: string): Promise<void> => {
      const entries = await fs.readdir(dir, { withFileTypes: true });
      for (const entry of entries) {
        const fullPath = path.join(dir, entry.name);
        if (entry.isDirectory() && args.recursive) {
          await search(fullPath);
        } else if (matcher.test(entry.name)) {
          results.push(fullPath);
        }
      }
    };

    try {
      await search(validatedPath);
      return FileSystemServer.textResult(
        results.length > 0 ? results.join("\n") : "未找到匹配的文件"
      );
    } catch (error: unknown) {
      return FileSystemServer.textResult(
        `搜索文件失败: ${FileSystemServer.describeError(error)}`,
        true
      );
    }
  }

  /**
   * Returns size, timestamps and type flags of a path as pretty-printed JSON.
   * @returns The JSON text, or an error result if the stat call fails.
   * @throws McpError when the path is outside the allowed directories.
   */
  private async handleGetFileInfo(args: z.infer<typeof ReadFileArgs>) {
    const validatedPath = this.validatePath(args.path);
    try {
      const stats = await fs.stat(validatedPath);
      const info = {
        path: validatedPath,
        size: stats.size,
        created: stats.birthtime.toISOString(),
        modified: stats.mtime.toISOString(),
        isFile: stats.isFile(),
        isDirectory: stats.isDirectory(),
      };
      return FileSystemServer.textResult(JSON.stringify(info, null, 2));
    } catch (error: unknown) {
      return FileSystemServer.textResult(
        `获取文件信息失败: ${FileSystemServer.describeError(error)}`,
        true
      );
    }
  }

  /**
   * Connects the server to a stdio transport and logs a startup line.
   */
  async start(): Promise<void> {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
    // Logged to stderr — presumably because stdout carries the protocol
    // traffic of the stdio transport.
    console.error(`${this.config.name} v${this.config.version} 已启动`);
  }
}
5. 入口文件
// src/index.ts
import { FileSystemServer } from "./server.js";
// Read the allowed directories from the comma-separated ALLOWED_DIRS
// environment variable. Entries are trimmed and blank ones dropped, so
// "/data, /tmp" or a trailing comma cannot produce an empty directory entry.
// When nothing usable is configured, fall back to the current working directory.
const configuredDirs = (process.env.ALLOWED_DIRS ?? "")
  .split(",")
  .map((dir) => dir.trim())
  .filter((dir) => dir.length > 0);
const allowedDirs = configuredDirs.length > 0 ? configuredDirs : [process.cwd()];

const server = new FileSystemServer({
  name: "filesystem-server",
  version: "1.0.0",
  allowedDirectories: allowedDirs,
});

// Exit with a non-zero status if the server cannot be started.
server.start().catch((error) => {
  console.error("服务器启动失败:", error);
  process.exit(1);
});
进阶功能实现
1. 日志系统
// src/utils/logger.ts
/**
 * Log severities in ascending order. Members are numbered automatically
 * from 0, so a higher value means a more severe message.
 */
export enum LogLevel {
  /** Value 0. */
  DEBUG,
  /** Value 1. */
  INFO,
  /** Value 2. */
  WARN,
  /** Value 3. */
  ERROR,
}
/**
 * Minimal levelled logger that writes every message to stderr.
 *
 * Messages below the configured level are dropped. Each line is formatted as
 * `[ISO timestamp] [LEVEL] [prefix] message`.
 */
export class Logger {
  private readonly level: LogLevel;
  private readonly prefix: string;

  /**
   * @param prefix Tag included in every log line.
   * @param level Minimum severity that is emitted.
   */
  constructor(prefix: string = "MCP", level: LogLevel = LogLevel.INFO) {
    this.prefix = prefix;
    this.level = level;
  }

  /** Formats and emits one message if `level` passes the threshold. */
  private log(level: LogLevel, message: string, ...args: unknown[]): void {
    if (level < this.level) return;
    const timestamp = new Date().toISOString();
    // Right-align the level name so the columns line up (e.g. " INFO", "ERROR").
    const levelStr = LogLevel[level].padStart(5);
    // Every level goes to stderr; the previous per-level branch called
    // console.error on both sides and was redundant.
    console.error(`[${timestamp}] [${levelStr}] [${this.prefix}] ${message}`, ...args);
  }

  /** Logs at DEBUG level. */
  debug(message: string, ...args: unknown[]): void {
    this.log(LogLevel.DEBUG, message, ...args);
  }

  /** Logs at INFO level. */
  info(message: string, ...args: unknown[]): void {
    this.log(LogLevel.INFO, message, ...args);
  }

  /** Logs at WARN level. */
  warn(message: string, ...args: unknown[]): void {
    this.log(LogLevel.WARN, message, ...args);
  }

  /** Logs at ERROR level. */
  error(message: string, ...args: unknown[]): void {
    this.log(LogLevel.ERROR, message, ...args);
  }
}

/** Shared logger instance with the default prefix and level. */
export const logger = new Logger();
2. 中间件系统
// src/middleware/index.ts
import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";
/** Per-call state handed to every middleware in the chain. */
export interface MiddlewareContext {
  /** Name of the tool being invoked. */
  toolName: string;
  /** Tool arguments; a middleware may replace them (e.g. after validation). */
  arguments: any;
  /** Start timestamp supplied by the caller; `loggingMiddleware` subtracts it from `Date.now()`. */
  startTime: number;
}

/**
 * A middleware receives the call context and a `next` callback that runs the
 * rest of the chain, and returns (a promise of) the call result.
 */
export type Middleware = (
  context: MiddlewareContext,
  next: () => Promise<any>
) => Promise<any>;

/**
 * Ordered list of middlewares wrapped around a final handler.
 */
export class MiddlewareChain {
  private readonly middlewares: Middleware[] = [];

  /** Appends a middleware; middlewares run in registration order. */
  use(middleware: Middleware): void {
    this.middlewares.push(middleware);
  }

  /**
   * Runs the chain for one call.
   *
   * Each middleware's `next` is bound to its own position, so calling `next`
   * more than once (for example to retry) re-runs everything downstream of
   * that middleware. A shared cursor would instead skip the remaining
   * middlewares on the second call.
   *
   * @param context Context passed to every middleware.
   * @param handler Final handler invoked after the last middleware.
   * @returns Whatever the outermost middleware (or the handler) resolves to.
   */
  async execute(context: MiddlewareContext, handler: () => Promise<any>): Promise<any> {
    // Snapshot so that use() calls made during execution do not affect this run.
    const chain = [...this.middlewares];
    const dispatch = async (position: number): Promise<any> => {
      if (position >= chain.length) {
        return await handler();
      }
      return await chain[position](context, () => dispatch(position + 1));
    };
    return await dispatch(0);
  }
}
// Common middlewares

/**
 * Logs a line to stderr before the rest of the chain runs and another one
 * after it resolves, including the milliseconds elapsed since
 * `context.startTime`. Nothing is logged on completion if the chain rejects.
 */
export const loggingMiddleware: Middleware = async (context, next) => {
  const { toolName } = context;
  console.error(`[${new Date().toISOString()}] 调用工具: ${toolName}`);
  const outcome = await next();
  const elapsedMs = Date.now() - context.startTime;
  console.error(`[${new Date().toISOString()}] 工具完成: ${toolName} (${elapsedMs}ms)`);
  return outcome;
};
/**
 * Creates a middleware that validates `context.arguments` with `schema` and
 * replaces them with the parsed value before continuing the chain.
 *
 * Only failures of `schema.parse` are converted to an InvalidParams error.
 * Errors raised further down the chain propagate unchanged instead of being
 * mislabelled as validation failures.
 *
 * @param schema Any object with a throwing `parse` method (e.g. a zod schema).
 * @throws McpError (InvalidParams) when the arguments do not satisfy the schema.
 */
export const validationMiddleware = (
  schema: { parse: (input: unknown) => unknown }
): Middleware => {
  return async (context, next) => {
    try {
      context.arguments = schema.parse(context.arguments);
    } catch (error: unknown) {
      const reason = error instanceof Error ? error.message : String(error);
      throw new McpError(
        ErrorCode.InvalidParams,
        `参数验证失败: ${reason}`
      );
    }
    // Deliberately outside the try block: see the function comment.
    return await next();
  };
};
3. 性能监控
// src/utils/metrics.ts
/**
 * In-memory collector of per-tool call durations.
 *
 * For each tool only the most recent `maxSamples` durations are retained, so
 * memory use stays bounded in a long-running process. All statistics are
 * computed over that retained window.
 */
export class MetricsCollector {
  private static readonly DEFAULT_MAX_SAMPLES = 1000;

  private readonly metrics: Map<string, number[]> = new Map();
  private readonly maxSamples: number;

  /**
   * @param maxSamples Maximum number of durations kept per tool. Values below
   *   1 (or NaN) fall back to the default of 1000.
   */
  constructor(maxSamples: number = MetricsCollector.DEFAULT_MAX_SAMPLES) {
    this.maxSamples =
      maxSamples >= 1 ? Math.floor(maxSamples) : MetricsCollector.DEFAULT_MAX_SAMPLES;
  }

  /**
   * Records one duration for a tool. Non-finite values are ignored because
   * they would corrupt the sort order and the average.
   */
  record(toolName: string, duration: number): void {
    if (!Number.isFinite(duration)) return;
    let samples = this.metrics.get(toolName);
    if (!samples) {
      samples = [];
      this.metrics.set(toolName, samples);
    }
    samples.push(duration);
    // Drop the oldest sample once the window is full.
    if (samples.length > this.maxSamples) {
      samples.shift();
    }
  }

  /**
   * Computes statistics for one tool.
   *
   * Percentiles use the nearest-rank method: the p-th percentile is the
   * sample at 1-based rank ceil(p * n).
   *
   * @returns count, rounded average, min, max, p95 and p99, or `null` when
   *   no samples are recorded for the tool.
   */
  getStats(toolName: string) {
    const samples = this.metrics.get(toolName);
    if (!samples || samples.length === 0) return null;
    const sorted = [...samples].sort((a, b) => a - b);
    const total = sorted.reduce((acc, value) => acc + value, 0);
    const percentile = (fraction: number): number =>
      sorted[Math.max(0, Math.ceil(sorted.length * fraction) - 1)];
    return {
      count: sorted.length,
      avg: Math.round(total / sorted.length),
      min: sorted[0],
      max: sorted[sorted.length - 1],
      p95: percentile(0.95),
      p99: percentile(0.99),
    };
  }

  /** Returns `getStats` for every tool that has recorded samples, keyed by tool name. */
  getAllStats() {
    const result: Record<string, ReturnType<MetricsCollector["getStats"]>> = {};
    for (const tool of this.metrics.keys()) {
      result[tool] = this.getStats(tool);
    }
    return result;
  }

  /** Discards all recorded samples. */
  reset(): void {
    this.metrics.clear();
  }
}

/** Shared collector instance with the default window size. */
export const metrics = new MetricsCollector();
测试策略
1. 单元测试
// tests/unit/server.test.ts
import { describe, it, expect, beforeEach } from "vitest";
import { FileSystemServer } from "../../src/server.js";
// Unit tests for the path sandbox of FileSystemServer.
// validatePath is private; bracket access is used so the check can be
// exercised directly without starting a transport.
describe("FileSystemServer", () => {
  let server: FileSystemServer;

  beforeEach(() => {
    server = new FileSystemServer({
      name: "test-server",
      version: "1.0.0",
      allowedDirectories: ["/tmp/test"],
    });
  });

  it("应该正确验证允许的路径", () => {
    const validPath = "/tmp/test/file.txt";
    const invalidPath = "/etc/passwd";
    // A path inside the allowed directory is accepted...
    expect(() => server["validatePath"](validPath)).not.toThrow();
    // ...and one outside it is rejected with the sandbox error.
    expect(() => server["validatePath"](invalidPath)).toThrow(/路径不在允许范围内/);
  });

  it("应该拒绝访问范围外的路径", async () => {
    // "../" segments must not be able to climb out of the allowed directory.
    const traversalPath = "/tmp/test/../../etc/passwd";
    expect(() => server["validatePath"](traversalPath)).toThrow(/路径不在允许范围内/);
  });
});
2. 集成测试
// tests/integration/tools.test.ts
import { describe, it, expect, beforeAll, afterAll } from "vitest";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
// Integration tests: spawn the compiled server as a child process and talk
// to it through an MCP client over stdio.
describe("工具集成测试", () => {
  let transport: StdioClientTransport;
  let client: Client;

  beforeAll(async () => {
    // Launch the built server entry point.
    const launch = { command: "node", args: ["dist/index.js"] };
    transport = new StdioClientTransport(launch);

    const identity = { name: "test-client", version: "1.0.0" };
    client = new Client(identity, { capabilities: {} });

    await client.connect(transport);
  });

  afterAll(async () => {
    await client.close();
  });

  it("应该列出所有工具", async () => {
    const { tools } = await client.listTools();
    expect(tools.length).toBeGreaterThan(0);
  });

  it("应该成功调用 read_file 工具", async () => {
    // Create a test file...
    // Call the tool and verify the result...
  });
});
部署与运维
1. Docker 部署
# Dockerfile
FROM node:20-alpine
WORKDIR /app
# Copy the dependency manifests first so the install layer can be cached
COPY package*.json ./
# Install runtime dependencies only (--omit=dev replaces the deprecated --only=production)
RUN npm ci --omit=dev
# Copy the compiled output (run "npm run build" before building the image)
COPY dist/ ./dist/
# Create the data directory and hand it to the unprivileged user;
# otherwise it stays root-owned and the server cannot write to it
RUN mkdir -p /data && chown node:node /data
# Environment variables
ENV NODE_ENV=production
ENV ALLOWED_DIRS=/data
# Run as a non-root user
USER node
CMD ["node", "dist/index.js"]
# docker-compose.yml
version: "3.8"
services:
  mcp-filesystem:
    build: .
    container_name: mcp-filesystem-server
    # The server talks over stdio, so stdin is kept open here.
    # NOTE(review): without this the process presumably sees EOF on stdin
    # and exits, after which the restart policy restarts it in a loop — confirm.
    stdin_open: true
    volumes:
      # Mounted read-only: write_file calls will fail against this volume
      - ./data:/data:ro
    environment:
      - ALLOWED_DIRS=/data
    restart: unless-stopped
2. 配置管理
// mcp-config.json
{
"mcpServers": {
"filesystem": {
"command": "docker",
"args": [
"run",
"-i",
"--rm",
"-v",
"/home/user/docs:/data:ro",
"mcp-filesystem-server"
],
"env": {
"ALLOWED_DIRS": "/data"
}
}
}
}
性能优化建议
| 优化项 | 说明 | 实施建议 |
|---|---|---|
| 缓存 | 缓存频繁访问的文件 | 使用 LRU 缓存策略 |
| 流式处理 | 大文件分块读取 | 实现 ReadableStream |
| 并发控制 | 限制同时处理的请求数 | 使用 p-limit 库 |
| 压缩 | 传输大内容时启用压缩 | 使用 gzip |
| 连接池 | 复用外部连接 | 数据库/HTTP 连接池 |
总结
本文详细介绍了MCP Server的开发实战:
✅ 架构设计 - 清晰的分层架构
✅ 完整实现 - 文件系统Server示例
✅ 进阶功能 - 日志、中间件、监控
✅ 测试覆盖 - 单元测试和集成测试
✅ 部署运维 - Docker化部署方案
下一步建议:
- 学习 MCP Client集成指南
- 了解 MCP安全最佳实践
- 探索 MCP生态工具
本文最后更新于 2024-02-10,如有问题欢迎在社区讨论。