MCP协议 高级 MCP 安全 认证授权 沙箱

MCP安全最佳实践:构建可信的AI工具生态

AIEng Hub
阅读约 40 分钟

引言

随着MCP协议在企业场景的广泛应用,安全性成为了不可忽视的关键议题。AI模型通过MCP Server访问外部系统,如果缺乏完善的安全机制,可能导致数据泄露、权限越界甚至系统被攻击。

本文将深入探讨:

  • MCP安全模型与威胁分析
  • 认证与授权机制实现
  • 输入验证与沙箱隔离
  • 审计日志与监控
  • 企业级安全部署方案

MCP安全威胁模型

威胁分类

┌─────────────────────────────────────────────────────────────┐
│                    MCP 安全威胁模型                          │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   外部威胁                                                    │
│   ├─ 恶意工具注入 (Malicious Tool Injection)                 │
│   ├─ 提示词注入 (Prompt Injection)                           │
│   ├─ 数据窃取 (Data Exfiltration)                            │
│   └─ 拒绝服务 (DoS Attacks)                                  │
│                                                              │
│   内部威胁                                                    │
│   ├─ 权限越界 (Privilege Escalation)                         │
│   ├─ 敏感数据泄露 (Sensitive Data Exposure)                  │
│   ├─ 未授权访问 (Unauthorized Access)                        │
│   └─ 审计绕过 (Audit Evasion)                                │
│                                                              │
│   传输威胁                                                    │
│   ├─ 中间人攻击 (MITM)                                       │
│   ├─ 消息篡改 (Message Tampering)                            │
│   └─ 重放攻击 (Replay Attacks)                               │
│                                                              │
└─────────────────────────────────────────────────────────────┘

风险评估矩阵

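| 威胁 | 可能性 | 影响 | 风险等级 | 优先级 |
| 权限越界 | — | — | — | P0 |
| 恶意工具注入 | — | — | — | P1 |
| 数据泄露 | — | — | — | P0 |
| 提示词注入 | — | — | — | P0 |
| DoS攻击 | — | — | — | P1 |
| 中间人攻击 | — | — | — | P1 |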

认证与授权

1. 基于OAuth 2.0的认证

// auth/oauth-provider.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import {
  CallToolRequestSchema,
  ErrorCode,
  McpError,
} from "@modelcontextprotocol/sdk/types.js";
import jwt from "jsonwebtoken";
import { createRemoteJWKSet, jwtVerify } from "jose";

/** Settings used by OAuthAuthProvider to verify incoming access tokens. */
interface AuthConfig {
  // URL of the JWKS endpoint; handed to createRemoteJWKSet.
  jwksUrl: string;
  // Expected issuer; forwarded to jwtVerify.
  issuer: string;
  // Expected audience; forwarded to jwtVerify.
  audience: string;
  // Scopes that every accepted token must carry.
  requiredScopes: string[];
}

/** Result of a successful OAuthAuthProvider.authenticate() call. */
interface AuthenticatedRequest {
  // The raw token that was verified.
  token: string;
  // Taken from the token's `sub` claim.
  userId: string;
  // The token's `scope` claim split into individual scopes.
  scopes: string[];
  // The full verified JWT payload.
  claims: Record<string, any>;
}

/**
 * Verifies OAuth 2.0 bearer tokens (JWTs) against a remote JWKS and offers
 * scope-based permission checks.
 */
export class OAuthAuthProvider {
  private jwks: ReturnType<typeof createRemoteJWKSet>;
  private config: AuthConfig;

  constructor(config: AuthConfig) {
    this.config = config;
    this.jwks = createRemoteJWKSet(new URL(config.jwksUrl));
  }

  /**
   * Verify `token` and return the caller's identity and scopes.
   *
   * @throws McpError(InvalidRequest) "认证失败: ..." when signature, issuer or
   *         audience verification fails.
   * @throws McpError(InvalidRequest) "缺少必需权限: ..." when the token is valid
   *         but lacks one of `config.requiredScopes`.
   */
  async authenticate(token: string): Promise<AuthenticatedRequest> {
    const payload = await this.verifyToken(token);
    const scopes = this.parseScopes(payload.scope);

    // Checked outside the verification try/catch so a missing-scope error is
    // reported as such instead of being rewrapped as a generic auth failure.
    const missingScopes = this.config.requiredScopes.filter(
      (s) => !scopes.includes(s)
    );
    if (missingScopes.length > 0) {
      throw new McpError(
        ErrorCode.InvalidRequest,
        `缺少必需权限: ${missingScopes.join(", ")}`
      );
    }

    return {
      token,
      userId: payload.sub as string,
      scopes,
      claims: payload as Record<string, any>,
    };
  }

  /**
   * Return true when `auth` holds the `<resource>:<action>` scope or the
   * blanket "admin" scope.
   */
  checkPermission(auth: AuthenticatedRequest, resource: string, action: string): boolean {
    const requiredScope = `${resource}:${action}`;
    return auth.scopes.includes(requiredScope) || auth.scopes.includes("admin");
  }

  /** Run jwtVerify and translate any failure into an McpError. */
  private async verifyToken(token: string) {
    try {
      const { payload } = await jwtVerify(token, this.jwks, {
        issuer: this.config.issuer,
        audience: this.config.audience,
      });
      return payload;
    } catch (error) {
      // `error` is `unknown` here; it is not guaranteed to be an Error.
      const reason = error instanceof Error ? error.message : String(error);
      throw new McpError(ErrorCode.InvalidRequest, `认证失败: ${reason}`);
    }
  }

  /**
   * Normalise the `scope` claim: a whitespace-delimited string or an array of
   * strings is accepted; anything else yields no scopes.
   */
  private parseScopes(claim: unknown): string[] {
    if (typeof claim === "string") {
      return claim.split(/\s+/).filter(Boolean);
    }
    if (Array.isArray(claim)) {
      return claim.filter((s): s is string => typeof s === "string");
    }
    return [];
  }
}

// 在 Server 中使用
/**
 * Build an MCP server whose tool-call handler authenticates the caller and
 * requires the "files"/"read" permission before executing the tool.
 */
export function createAuthenticatedServer(authProvider: OAuthAuthProvider) {
  const secureServer = new Server(
    { name: "secure-server", version: "1.0.0" },
    { capabilities: { tools: {} } }
  );

  secureServer.setRequestHandler(CallToolRequestSchema, async (request) => {
    // Step 1: a token must be present on the request.
    const bearer = extractTokenFromRequest(request);
    if (!bearer) {
      throw new McpError(ErrorCode.InvalidRequest, "缺少认证令牌");
    }

    // Step 2: verify it and resolve the caller's identity.
    const identity = await authProvider.authenticate(bearer);

    // Step 3: authorise the call.
    const mayReadFiles = authProvider.checkPermission(identity, "files", "read");
    if (!mayReadFiles) {
      throw new McpError(ErrorCode.InvalidRequest, "权限不足");
    }

    // Step 4: run the tool on behalf of the authenticated caller.
    return executeToolWithAuth(request, identity);
  });

  return secureServer;
}

2. API Key 认证方案

# auth/api_key_auth.py
import hashlib
import secrets
from datetime import datetime, timedelta
from typing import Optional, Dict
from dataclasses import dataclass
from enum import Enum


class Permission(Enum):
    """Permissions that can be attached to an API key."""

    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    # APIKeyManager.check_permission treats ADMIN as granting every permission.
    ADMIN = "admin"


@dataclass
class APIKey:
    """Stored record for one issued API key; the raw key itself is not a field."""

    key_id: str  # identifier under which APIKeyManager stores the record
    key_hash: str  # hex SHA-256 digest of the raw key
    name: str
    permissions: list[Permission]
    created_at: datetime
    expires_at: Optional[datetime]  # None means the key never expires
    last_used: Optional[datetime]  # refreshed by APIKeyManager.validate_key
    rate_limit: int  # requests per minute
    is_active: bool  # set to False by APIKeyManager.revoke_key


class APIKeyManager:
    """In-memory registry that issues, validates, rate-limits and revokes API keys.

    Only the SHA-256 digest of a key is kept; the raw key is returned exactly
    once by ``generate_key``. All state lives in this object's dictionaries.
    """

    def __init__(self):
        # key_id -> key record
        self._keys: Dict[str, APIKey] = {}
        # key_id -> timestamps of requests accepted during the last minute
        self._usage: Dict[str, list[datetime]] = {}

    def generate_key(
        self,
        name: str,
        permissions: list[Permission],
        expires_days: Optional[int] = None,
        rate_limit: int = 100
    ) -> tuple[str, str]:
        """Create and register a new API key.

        Args:
            name: Human-readable label for the key.
            permissions: Permissions granted to the key.
            expires_days: Lifetime in days; ``None`` means the key never
                expires. ``0`` produces a key that is expired immediately.
            rate_limit: Maximum accepted requests per minute.

        Returns:
            ``(key_id, raw_key)``. The raw key is not stored and cannot be
            recovered later.
        """
        key_id = secrets.token_urlsafe(16)
        raw_key = f"mcp_{secrets.token_urlsafe(32)}"
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()

        # One clock reading so created_at and expires_at are consistent.
        now = datetime.now()

        # Compare against None explicitly: a truthiness test would silently
        # turn expires_days=0 into a key that never expires.
        expires_at = None
        if expires_days is not None:
            expires_at = now + timedelta(days=expires_days)

        self._keys[key_id] = APIKey(
            key_id=key_id,
            key_hash=key_hash,
            name=name,
            permissions=permissions,
            created_at=now,
            expires_at=expires_at,
            last_used=None,
            rate_limit=rate_limit,
            is_active=True
        )
        return key_id, raw_key

    def validate_key(self, raw_key: str) -> Optional[APIKey]:
        """Return the key record for ``raw_key``, or ``None`` if it is not usable.

        ``None`` is returned for unknown, expired and revoked keys.

        Raises:
            RateLimitExceeded: The key is valid but over its per-minute quota.
        """
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()

        for key in self._keys.values():
            # Constant-time comparison so response timing does not reveal how
            # much of a digest matched.
            if not secrets.compare_digest(key.key_hash, key_hash):
                continue

            if key.expires_at and datetime.now() > key.expires_at:
                return None

            if not key.is_active:
                return None

            if not self._check_rate_limit(key.key_id, key.rate_limit):
                raise RateLimitExceeded(f"超出速率限制: {key.rate_limit}/分钟")

            key.last_used = datetime.now()
            return key

        return None

    def _check_rate_limit(self, key_id: str, limit: int) -> bool:
        """Record one request for ``key_id`` unless it would exceed ``limit``.

        Uses a sliding one-minute window. Returns ``False`` (and records
        nothing) when the window already holds ``limit`` requests.
        """
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)

        # Keep only the requests that are still inside the window.
        recent = [t for t in self._usage.get(key_id, []) if t > minute_ago]
        self._usage[key_id] = recent

        if len(recent) >= limit:
            return False

        recent.append(now)
        return True

    def revoke_key(self, key_id: str) -> bool:
        """Deactivate ``key_id``; return ``False`` when no such key exists."""
        key = self._keys.get(key_id)
        if key is None:
            return False
        key.is_active = False
        return True

    def check_permission(self, key: APIKey, permission: Permission) -> bool:
        """Return ``True`` if ``key`` holds ``permission`` or ``Permission.ADMIN``."""
        return permission in key.permissions or Permission.ADMIN in key.permissions


class RateLimitExceeded(Exception):
    """Raised when an API key has used up its per-minute request quota."""

3. 基于角色的访问控制 (RBAC)

// auth/rbac.ts
/** A named bundle of permissions that can be assigned to users. */
interface Role {
  name: string;
  permissions: Permission[];
}

/** One allowed (resource, action) pair, optionally restricted by conditions. */
interface Permission {
  // "*" matches every resource; otherwise an exact name or a "<name>/" prefix.
  resource: string;
  action: "read" | "write" | "delete" | "execute";
  // When present, every condition must hold for the permission to apply.
  conditions?: Condition[];
}

/** Extra runtime constraint attached to a Permission. */
interface Condition {
  type: "path_match" | "time_range" | "ip_allowlist";
  // Per-type settings: `pattern` for path_match, `start`/`end` for time_range,
  // `ips` for ip_allowlist.
  config: Record<string, any>;
}

/**
 * Minimal role-based access control. Roles hold permissions, users hold
 * roles, and checkAccess() grants access as soon as any permission of any
 * assigned role matches the request.
 */
class RBACManager {
  private roles: Map<string, Role> = new Map();
  private userRoles: Map<string, string[]> = new Map();

  constructor() {
    // Built-in roles.
    // NOTE(review): actions are compared by strict equality, so "admin" as
    // defined here only grants "execute" — confirm that is intended.
    this.defineRole("admin", [
      { resource: "*", action: "execute" },
    ]);

    this.defineRole("developer", [
      { resource: "files", action: "read" },
      { resource: "files", action: "write" },
      { resource: "code", action: "read" },
      { resource: "code", action: "write" },
    ]);

    this.defineRole("analyst", [
      { resource: "files", action: "read" },
      { resource: "data", action: "read" },
      { resource: "reports", action: "read" },
      { resource: "reports", action: "write" },
    ]);

    this.defineRole("readonly", [
      { resource: "files", action: "read", conditions: [
        { type: "path_match", config: { pattern: "/docs/**" } }
      ]},
      { resource: "data", action: "read" },
    ]);
  }

  /** Create or replace the role called `name`. */
  defineRole(name: string, permissions: Permission[]): void {
    this.roles.set(name, { name, permissions });
  }

  /** Give `userId` the role `roleName`; assigning the same role twice is a no-op. */
  assignRole(userId: string, roleName: string): void {
    if (!this.userRoles.has(userId)) {
      this.userRoles.set(userId, []);
    }
    const roles = this.userRoles.get(userId)!;
    if (!roles.includes(roleName)) {
      roles.push(roleName);
    }
  }

  /**
   * Decide whether `userId` may perform `action` on `resource`.
   *
   * @param context Values consulted by conditions (`path`, `clientIp`).
   * @returns true if at least one permission of one assigned role matches.
   */
  checkAccess(
    userId: string,
    resource: string,
    action: string,
    context?: Record<string, any>
  ): boolean {
    const userRoleList = this.userRoles.get(userId) || [];

    for (const roleName of userRoleList) {
      const role = this.roles.get(roleName);
      if (!role) continue; // role assigned but never defined

      for (const permission of role.permissions) {
        if (this.matchesPermission(permission, resource, action, context)) {
          return true;
        }
      }
    }

    return false;
  }

  /** True when resource, action and every condition of `permission` match. */
  private matchesPermission(
    permission: Permission,
    resource: string,
    action: string,
    context?: Record<string, any>
  ): boolean {
    const resourceMatch =
      permission.resource === "*" ||
      permission.resource === resource ||
      resource.startsWith(permission.resource + "/");

    const actionMatch = permission.action === action;

    if (!resourceMatch || !actionMatch) return false;

    if (permission.conditions) {
      for (const condition of permission.conditions) {
        if (!this.evaluateCondition(condition, context)) {
          return false;
        }
      }
    }

    return true;
  }

  /**
   * Evaluate one condition against `context`. Missing or malformed inputs and
   * unknown condition types all evaluate to false (fail closed).
   */
  private evaluateCondition(
    condition: Condition,
    context?: Record<string, any>
  ): boolean {
    switch (condition.type) {
      case "path_match": {
        const path = context?.["path"];
        const pattern = condition.config.pattern;
        // Without this guard an absent path would be tested as "undefined".
        if (typeof path !== "string" || typeof pattern !== "string") {
          return false;
        }
        return this.matchGlob(path, pattern);
      }

      case "time_range": {
        const now = new Date();
        const start = new Date(condition.config.start as string);
        const end = new Date(condition.config.end as string);
        // Invalid dates compare as NaN, which makes both comparisons false.
        return now >= start && now <= end;
      }

      case "ip_allowlist": {
        const clientIp = context?.["clientIp"];
        const allowlist = condition.config.ips;
        return (
          typeof clientIp === "string" &&
          Array.isArray(allowlist) &&
          allowlist.includes(clientIp)
        );
      }

      default:
        // A condition this code cannot evaluate must not grant access.
        return false;
    }
  }

  /**
   * Match `path` against a glob where `**` spans any characters (including
   * "/") and `*` spans characters within one path segment.
   */
  private matchGlob(path: string, pattern: string): boolean {
    // Translate in a single pass. Two chained replaces would rewrite the "*"
    // of the ".*" generated for "**" and break multi-segment matches.
    const source = pattern.replace(
      /\*\*|\*|[.+?^${}()|[\]\\]/g,
      (token) => {
        if (token === "**") return ".*";
        if (token === "*") return "[^/]*";
        return "\\" + token; // literal regex metacharacter
      }
    );
    return new RegExp("^" + source + "$").test(path);
  }
}

输入验证与净化

1. 严格输入验证

// validation/input-validator.ts
import { z } from "zod";
import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";

// 定义验证规则
// Accepts only relative paths that cannot escape their base directory.
// This is a syntactic pre-check; the allow-list check on the resolved path
// still has to be applied by the caller.
const PathSchema = z.string().refine(
  (path) => {
    const normalized = path.replace(/\\/g, "/");

    // NUL bytes can truncate paths in native file APIs.
    if (normalized.includes("\0")) return false;

    // Absolute paths: POSIX/UNC ("/..." or "//...") and Windows drive ("C:...").
    if (normalized.startsWith("/") || /^[a-zA-Z]:/.test(normalized)) {
      return false;
    }

    // Reject ".." only as a whole segment, so names such as "notes..txt"
    // remain valid while "a/../b" is refused.
    return !normalized.split("/").includes("..");
  },
  { message: "路径包含非法字符" }
);

// Rejects commands that mention a known destructive program.
// NOTE(review): a deny-list is defence in depth only; prefer an allow-list of
// permitted commands plus sandboxed execution for untrusted input.
const CommandSchema = z.string().refine(
  (cmd) => {
    const dangerous = ["rm", "del", "format", "mkfs", "dd"];
    // Compare whole words. A substring test would also reject harmless input
    // such as "git add", "model" or "information".
    const words = cmd
      .toLowerCase()
      .split(/[^a-z0-9_]+/)
      .filter(Boolean);
    return !words.some((word) => dangerous.includes(word));
  },
  { message: "命令包含危险操作" }
);

// Rejects SQL text that matches any of a small set of suspicious patterns.
// NOTE(review): pattern matching cannot reliably detect injection and will
// also reject legitimate statements containing comments; it does not replace
// parameterized queries.
const SQLSchema = z.string().refine(
  (sql) => {
    // Patterns checked: stacked DROP/DELETE, UNION SELECT, exec(...), and
    // line/block comment openers.
    const suspicious = [
      /;\s*drop\s+/i,
      /;\s*delete\s+/i,
      /union\s+select/i,
      /exec\s*\(/i,
      /--/,
      /\/\*/,
    ];
    return !suspicious.some((pattern) => pattern.test(sql));
  },
  { message: "SQL 语句包含可疑内容" }
);

/**
 * Validates and sanitises tool inputs (paths, commands, SQL, strings, sizes).
 * Validation failures are reported as McpError(InvalidParams).
 */
export class InputValidator {
  private allowedPaths: string[];

  /** @param allowedPaths Directories that validated paths must stay inside. */
  constructor(allowedPaths: string[]) {
    this.allowedPaths = allowedPaths.map((p) =>
      require("path").resolve(p)
    );
  }

  /**
   * Validate `inputPath` and return its resolved absolute form.
   *
   * @throws McpError(InvalidParams) if the path fails PathSchema or resolves
   *         outside every allowed directory.
   */
  validatePath(inputPath: string): string {
    const { resolve, sep } = require("path");
    try {
      const validated = PathSchema.parse(inputPath);
      const resolved: string = resolve(validated);

      // Containment must respect directory boundaries: a bare prefix test
      // would let "/data-secret" pass for an allowed "/data".
      // NOTE(review): symlinks are not resolved here; use fs.realpath if the
      // allowed directories may contain links.
      const isAllowed = this.allowedPaths.some((allowed) => {
        const base = allowed.endsWith(sep) ? allowed : allowed + sep;
        return resolved === allowed || resolved.startsWith(base);
      });

      if (!isAllowed) {
        throw new McpError(
          ErrorCode.InvalidParams,
          `路径 ${inputPath} 不在允许范围内`
        );
      }

      return resolved;
    } catch (error) {
      if (error instanceof z.ZodError) {
        throw new McpError(
          ErrorCode.InvalidParams,
          `路径验证失败: ${error.errors[0].message}`
        );
      }
      throw error;
    }
  }

  /** Return `command` unchanged if it passes CommandSchema, else throw. */
  validateCommand(command: string): string {
    try {
      return CommandSchema.parse(command);
    } catch (error) {
      throw new McpError(
        ErrorCode.InvalidParams,
        `命令验证失败: 包含危险操作`
      );
    }
  }

  /** Return `sql` unchanged if it passes SQLSchema, else throw. */
  validateSQL(sql: string): string {
    try {
      return SQLSchema.parse(sql);
    } catch (error) {
      throw new McpError(
        ErrorCode.InvalidParams,
        `SQL 验证失败: 可能包含注入攻击`
      );
    }
  }

  /** HTML-entity-encode `input`; "&" is replaced first to avoid double encoding. */
  sanitizeString(input: string): string {
    return input
      .replace(/&/g, "&amp;")
      .replace(/</g, "&lt;")
      .replace(/>/g, "&gt;")
      .replace(/"/g, "&quot;")
      .replace(/'/g, "&#x27;")
      .replace(/\//g, "&#x2F;");
  }

  /** Throw when `size` exceeds `maxSize` bytes (default 10 MiB). */
  validateFileSize(size: number, maxSize: number = 10 * 1024 * 1024): void {
    if (size > maxSize) {
      throw new McpError(
        ErrorCode.InvalidParams,
        `文件大小超过限制: ${size} > ${maxSize}`
      );
    }
  }
}

2. 沙箱隔离

# sandbox/docker_sandbox.py
import docker
import tempfile
import os
import shutil
from typing import Optional, List
from dataclasses import dataclass
from pathlib import Path


@dataclass
class SandboxConfig:
    """Settings for a DockerSandbox container."""

    image: str = "python:3.11-slim"
    memory_limit: str = "512m"  # passed to Docker as mem_limit
    cpu_limit: float = 1.0  # multiplied by 100000 to form cpu_quota
    timeout: int = 30  # NOTE(review): not read by DockerSandbox in the visible code
    network_disabled: bool = True
    read_only: bool = True  # read-only root filesystem; also selects "ro" bind mounts
    allowed_paths: Optional[List[str]] = None  # host paths bind-mounted at the same path


class DockerSandbox:
    """Context manager that runs commands inside a locked-down Docker container.

    Entering the context starts a detached container with all capabilities
    dropped, ``no-new-privileges`` set and a noexec tmpfs on ``/tmp``; leaving
    it removes the container and the host-side temp directory.

    NOTE(review): ``config.timeout`` is not enforced by this class, so a
    command can run indefinitely unless the caller limits it.
    """

    # language name -> (script file extension, interpreter executable)
    _RUNTIMES = {
        "python": ("py", "python"),
        "javascript": ("js", "node"),
    }

    def __init__(self, config: SandboxConfig = None):
        self.config = config or SandboxConfig()
        self.client = docker.from_env()
        self.container = None
        self.temp_dir = None

    def __enter__(self):
        """Start the sandbox container and return ``self``."""
        self.temp_dir = tempfile.mkdtemp()

        # Bind-mount each allowed host path at the same path in the container.
        volumes = {}
        if self.config.allowed_paths:
            for path in self.config.allowed_paths:
                volumes[path] = {
                    "bind": path,
                    "mode": "ro" if self.config.read_only else "rw"
                }

        try:
            self.container = self.client.containers.run(
                self.config.image,
                command="sleep infinity",
                detach=True,
                mem_limit=self.config.memory_limit,
                cpu_quota=int(self.config.cpu_limit * 100000),
                network_disabled=self.config.network_disabled,
                volumes=volumes,
                security_opt=["no-new-privileges:true"],
                cap_drop=["ALL"],
                read_only=self.config.read_only,
                tmpfs={"/tmp": "noexec,nosuid,size=100m"},
            )
        except Exception:
            # __exit__ is not invoked when __enter__ raises, so the temp
            # directory has to be cleaned up here.
            shutil.rmtree(self.temp_dir, ignore_errors=True)
            self.temp_dir = None
            raise

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop and remove the container, then delete the temp directory."""
        if self.container:
            # Attempt stop and remove independently: a failed graceful stop
            # must not leave the container behind.
            for action, kwargs in (("stop", {"timeout": 5}),
                                   ("remove", {"force": True})):
                try:
                    getattr(self.container, action)(**kwargs)
                except Exception as e:
                    print(f"清理容器失败: {e}")
            self.container = None

        if self.temp_dir and os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir, ignore_errors=True)
        self.temp_dir = None

    def execute(self, command: List[str], working_dir: str = None) -> dict:
        """Run ``command`` in the container.

        Returns:
            A dict with ``exit_code``, ``stdout`` and ``stderr`` (decoded text,
            empty string when a stream produced nothing).

        Raises:
            RuntimeError: The sandbox has not been entered.
        """
        if not self.container:
            raise RuntimeError("沙箱未初始化")

        exec_result = self.container.exec_run(
            command,
            workdir=working_dir,
            demux=True,  # keep stdout and stderr separate
        )
        stdout, stderr = exec_result.output

        return {
            "exit_code": exec_result.exit_code,
            "stdout": stdout.decode() if stdout else "",
            "stderr": stderr.decode() if stderr else "",
        }

    def execute_code(self, code: str, language: str = "python") -> dict:
        """Copy ``code`` into the container as ``/tmp/script.<ext>`` and run it.

        Raises:
            ValueError: ``language`` is not "python" or "javascript".
            RuntimeError: The sandbox has not been entered.
        """
        # Validate before touching the filesystem or the container.
        if language not in self._RUNTIMES:
            raise ValueError(f"不支持的编程语言: {language}")
        if not self.container or not self.temp_dir:
            raise RuntimeError("沙箱未初始化")

        extension, interpreter = self._RUNTIMES[language]

        # The file name must use the real extension ("py"/"js"), not the
        # language name, or the interpreter path below would not exist.
        script_name = f"script.{extension}"
        code_file = os.path.join(self.temp_dir, script_name)
        with open(code_file, "w", encoding="utf-8") as f:
            f.write(code)

        # NOTE(review): copying into a tmpfs mount / read-only container may
        # be rejected by some Docker versions — verify against the target
        # daemon.
        self.container.put_archive(
            "/tmp",
            self._create_tar_archive(code_file)
        )

        # NOTE(review): the default image presumably does not ship "node";
        # JavaScript needs an image that provides it.
        return self.execute([interpreter, f"/tmp/{script_name}"], "/tmp")

    def _create_tar_archive(self, file_path: str) -> bytes:
        """Return an in-memory tar archive holding ``file_path`` under its base name."""
        import tarfile
        import io

        tar_buffer = io.BytesIO()
        with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
            tar.add(file_path, arcname=os.path.basename(file_path))
        return tar_buffer.getvalue()


# 使用示例
def safe_execute_tool(tool_code: str, input_data: dict) -> dict:
    """Run ``tool_code`` in a Docker sandbox and return its JSON result.

    ``tool_code`` must define ``execute(input_data)``; its return value is
    printed as JSON inside the sandbox and parsed here.

    Args:
        tool_code: Python source of the tool.
        input_data: JSON-serialisable argument passed to ``execute``.

    Raises:
        ToolExecutionError: The script exited with a non-zero status.
            NOTE(review): this exception is not defined in the visible code;
            it must be provided by the surrounding module.
    """
    # Imported locally because the visible module header does not import json.
    import json

    config = SandboxConfig(
        memory_limit="256m",
        cpu_limit=0.5,
        timeout=10,
        network_disabled=True,
        read_only=True,
    )

    # Embed the payload as a Python string literal via repr(). Splicing the
    # JSON text between hand-written quotes breaks on quotes/backslashes in
    # the data and lets crafted input inject code into the script.
    payload_literal = repr(json.dumps(input_data))

    with DockerSandbox(config) as sandbox:
        script = f"""
import json
import sys

# 限制导入
allowed_modules = ['json', 'math', 'random', 'datetime', 're']
for mod in sys.modules:
    if mod not in allowed_modules:
        sys.modules[mod] = None

# 执行工具代码
{tool_code}

# 处理输入
input_data = json.loads({payload_literal})
result = execute(input_data)
print(json.dumps(result))
"""
        result = sandbox.execute_code(script, "python")

        if result["exit_code"] != 0:
            raise ToolExecutionError(f"执行失败: {result['stderr']}")

        return json.loads(result["stdout"])

审计日志与监控

1. 审计日志系统

// audit/audit-logger.ts
import { createWriteStream } from "fs";
import { format } from "date-fns";

/** One record written to the audit log. */
interface AuditEvent {
  // Filled in by AuditLogger.log().
  timestamp: Date;
  // Filled in by AuditLogger.log().
  eventId: string;
  eventType: "tool_call" | "resource_access" | "auth_success" | "auth_failure" | "error";
  userId?: string;
  sessionId: string;
  toolName?: string;
  resourceUri?: string;
  // Serialised with JSON.stringify when the event is flushed.
  input?: any;
  output?: any;
  success: boolean;
  errorMessage?: string;
  metadata?: Record<string, any>;
}

/**
 * Buffered JSON-lines audit logger. Events are appended to a file either
 * every five seconds or as soon as 100 events are waiting.
 */
class AuditLogger {
  private logStream: ReturnType<typeof createWriteStream>;
  private buffer: AuditEvent[] = [];
  private flushInterval: NodeJS.Timeout;

  /** @param logPath File that events are appended to. */
  constructor(logPath: string) {
    this.logStream = createWriteStream(logPath, { flags: "a" });
    this.flushInterval = setInterval(() => this.flush(), 5000);
  }

  /** Queue `event`, stamping it with the current time and a fresh id. */
  log(event: Omit<AuditEvent, "timestamp" | "eventId">): void {
    const timestamp = new Date();
    const eventId = this.generateEventId();
    this.buffer.push({ ...event, timestamp, eventId });

    // Do not wait for the timer once the buffer is full.
    if (this.buffer.length >= 100) {
      this.flush();
    }
  }

  /** Write all queued events, one JSON document per line, then empty the queue. */
  private flush(): void {
    if (this.buffer.length === 0) return;

    const serialized = this.buffer.map((queued) => JSON.stringify(queued));
    this.logStream.write(`${serialized.join("\n")}\n`);
    this.buffer = [];
  }

  /** Build an id of the form "<epoch ms>-<up to 9 base-36 characters>". */
  private generateEventId(): string {
    const millis = Date.now();
    const suffix = Math.random().toString(36).slice(2, 11);
    return `${millis}-${suffix}`;
  }

  /** Stop the timer, flush what is queued and end the stream. */
  close(): void {
    clearInterval(this.flushInterval);
    this.flush();
    this.logStream.end();
  }
}

// 在 Server 中集成审计日志
/**
 * Build an MCP server whose tool-call handler writes audit events before the
 * call, after success (with output and duration) and after failure (with the
 * error message and duration). Failures are re-thrown to the caller.
 */
export function createAuditableServer(auditLogger: AuditLogger) {
  const server = new Server(
    { name: "auditable-server", version: "1.0.0" },
    { capabilities: { tools: {} } }
  );

  server.setRequestHandler(CallToolRequestSchema, async (request) => {
    const startTime = Date.now();
    const sessionId = extractSessionId(request);

    try {
      // Start-of-call record.
      // NOTE(review): it is logged with success: true before the tool has
      // run; consumers must use the later record for the real outcome.
      auditLogger.log({
        eventType: "tool_call",
        sessionId,
        toolName: request.params.name,
        input: request.params.arguments,
        success: true,
      });

      const result = await executeTool(request);
      const duration = Date.now() - startTime;

      // Completion record.
      auditLogger.log({
        eventType: "tool_call",
        sessionId,
        toolName: request.params.name,
        input: request.params.arguments,
        output: result,
        success: true,
        metadata: { duration },
      });

      return result;
    } catch (error) {
      // A thrown value is `unknown`; it may not be an Error instance.
      const errorMessage = error instanceof Error ? error.message : String(error);

      // Failure record.
      auditLogger.log({
        eventType: "tool_call",
        sessionId,
        toolName: request.params.name,
        input: request.params.arguments,
        success: false,
        errorMessage,
        metadata: { duration: Date.now() - startTime },
      });

      throw error;
    }
  });

  return server;
}

2. 实时监控

# monitoring/security_monitor.py
import asyncio
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Callable
from dataclasses import dataclass, field
from enum import Enum


class AlertSeverity(Enum):
    """Severity levels attached to a SecurityAlert."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class SecurityAlert:
    """A single alert passed to every registered alert handler."""

    timestamp: datetime
    severity: AlertSeverity
    alert_type: str
    description: str
    source: str  # name of the component that raised the alert
    metadata: dict = field(default_factory=dict)  # fresh dict per instance


class SecurityMonitor:
    """Counts security events in sliding windows and raises alerts on thresholds.

    Alert handlers may be plain callables or ``async def`` functions. When a
    handler returns an awaitable it is scheduled on the running event loop, or
    run to completion if no loop is running.
    """

    def __init__(self):
        self.event_counts: Dict[str, List[datetime]] = defaultdict(list)
        self.alert_handlers: List[Callable[[SecurityAlert], None]] = []
        self.running = False
        # Strong references to in-flight async handler tasks so they are not
        # garbage-collected before they finish.
        self._handler_tasks: set = set()

        # Threshold configuration: "count" events within "window" seconds.
        self.thresholds = {
            "failed_auth": {"count": 5, "window": 300},  # 5 failures in 5 minutes
            "tool_errors": {"count": 10, "window": 60},  # 10 errors in 1 minute
            "suspicious_input": {"count": 3, "window": 300},
        }

    def add_alert_handler(self, handler: Callable[[SecurityAlert], None]):
        """Register ``handler``; it is called with every alert that is raised."""
        self.alert_handlers.append(handler)

    async def start_monitoring(self):
        """Prune old events and check thresholds every 10 seconds until stopped."""
        self.running = True
        while self.running:
            self._cleanup_old_events()
            self._check_thresholds()
            await asyncio.sleep(10)

    def stop(self):
        """Ask the monitoring loop to exit after its current iteration."""
        self.running = False

    def record_event(self, event_type: str, metadata: dict = None):
        """Record one occurrence of ``event_type``.

        A "suspicious_input" event additionally raises a HIGH alert at once.
        """
        self.event_counts[event_type].append(datetime.now())

        if event_type == "suspicious_input":
            self._trigger_alert(
                AlertSeverity.HIGH,
                "suspicious_input",
                "检测到可疑输入",
                "input_validator",
                metadata or {}
            )

    def _cleanup_old_events(self):
        """Drop events older than the largest configured window."""
        now = datetime.now()
        # default=0 keeps this working when no thresholds are configured.
        max_window = max(
            (t["window"] for t in self.thresholds.values()), default=0
        )
        cutoff = now - timedelta(seconds=max_window)

        for event_type in self.event_counts:
            self.event_counts[event_type] = [
                t for t in self.event_counts[event_type] if t > cutoff
            ]

    def _check_thresholds(self):
        """Raise an alert for every event type at or above its threshold.

        NOTE(review): an alert is raised on every check while the window stays
        over the threshold; add de-duplication if that is too noisy.
        """
        now = datetime.now()

        for event_type, threshold in self.thresholds.items():
            events = self.event_counts[event_type]
            window_start = now - timedelta(seconds=threshold["window"])
            recent_events = [e for e in events if e > window_start]

            if len(recent_events) >= threshold["count"]:
                severity = self._calculate_severity(event_type, len(recent_events))
                self._trigger_alert(
                    severity,
                    f"threshold_exceeded_{event_type}",
                    f"{event_type} 超过阈值: {len(recent_events)} 次/{threshold['window']}秒",
                    "threshold_monitor",
                    {"count": len(recent_events), "threshold": threshold["count"]}
                )

    def _calculate_severity(self, event_type: str, count: int) -> AlertSeverity:
        """Map an event type and its recent count to a severity level."""
        if event_type == "failed_auth":
            if count >= 20:
                return AlertSeverity.CRITICAL
            elif count >= 10:
                return AlertSeverity.HIGH
            return AlertSeverity.MEDIUM
        elif event_type == "tool_errors":
            if count >= 50:
                return AlertSeverity.CRITICAL
            elif count >= 20:
                return AlertSeverity.HIGH
            return AlertSeverity.LOW
        return AlertSeverity.MEDIUM

    def _trigger_alert(self, severity: AlertSeverity, alert_type: str,
                       description: str, source: str, metadata: dict):
        """Build a SecurityAlert and deliver it to every handler.

        A failing handler is reported and does not prevent the remaining
        handlers from running.
        """
        import inspect

        alert = SecurityAlert(
            timestamp=datetime.now(),
            severity=severity,
            alert_type=alert_type,
            description=description,
            source=source,
            metadata=metadata
        )

        for handler in self.alert_handlers:
            try:
                outcome = handler(alert)
                # Calling an ``async def`` handler only creates a coroutine;
                # unless it is awaited the alert would never be sent.
                if inspect.isawaitable(outcome):
                    self._schedule_async_handler(outcome)
            except Exception as e:
                print(f"告警处理器失败: {e}")

    def _schedule_async_handler(self, awaitable):
        """Run an awaitable handler result on the current loop, or synchronously."""
        async def _drain():
            await awaitable

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop in this thread: run the handler to completion here.
            asyncio.run(_drain())
            return

        task = loop.create_task(_drain())
        self._handler_tasks.add(task)
        task.add_done_callback(self._on_handler_done)

    def _on_handler_done(self, task):
        """Forget a finished handler task and report its failure, if any."""
        self._handler_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            print(f"告警处理器失败: {error}")


# 告警处理器示例
async def slack_alert_handler(alert: SecurityAlert):
    """Post ``alert`` to a Slack incoming webhook.

    The webhook URL is read from the ``SLACK_WEBHOOK_URL`` environment
    variable, falling back to the placeholder URL.

    Raises:
        aiohttp.ClientResponseError: Slack answered with an error status.
    """
    import os

    import aiohttp

    webhook_url = os.environ.get(
        "SLACK_WEBHOOK_URL",
        "https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
    )

    # Attachment colour per severity; grey for anything unmapped.
    color_map = {
        AlertSeverity.LOW: "#36a64f",
        AlertSeverity.MEDIUM: "#daa520",
        AlertSeverity.HIGH: "#ff6600",
        AlertSeverity.CRITICAL: "#ff0000",
    }

    payload = {
        "attachments": [{
            "color": color_map.get(alert.severity, "#808080"),
            "title": f"🚨 MCP 安全告警 - {alert.severity.value.upper()}",
            "fields": [
                {"title": "类型", "value": alert.alert_type, "short": True},
                {"title": "来源", "value": alert.source, "short": True},
                {"title": "描述", "value": alert.description, "short": False},
                {"title": "时间", "value": alert.timestamp.isoformat(), "short": True},
            ],
        }]
    }

    # Bound the request so a stalled webhook cannot hang alert delivery.
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        # The context manager releases the response; raise_for_status turns a
        # rejected webhook call into an exception instead of a silent no-op.
        async with session.post(webhook_url, json=payload) as response:
            response.raise_for_status()

企业级部署方案

1. 安全架构设计

┌─────────────────────────────────────────────────────────────────┐
│                      企业级 MCP 安全架构                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   ┌─────────────┐      ┌─────────────┐      ┌─────────────┐    │
│   │   WAF/API   │──────▶   Gateway   │──────▶   Load      │    │
│   │   Gateway   │      │   (Kong/    │      │   Balancer  │    │
│   │             │      │   Istio)    │      │             │    │
│   └─────────────┘      └─────────────┘      └──────┬──────┘    │
│                                                     │            │
│   ┌─────────────────────────────────────────────────┘            │
│   │                                                              │
│   │   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    │
│   │   │  MCP Server │    │  MCP Server │    │  MCP Server │    │
│   │   │   (Auth)    │    │   (Files)   │    │   (DB)      │    │
│   │   └──────┬──────┘    └──────┬──────┘    └──────┬──────┘    │
│   │          │                  │                  │            │
│   │          └──────────────────┼──────────────────┘            │
│   │                             │                               │
│   │                    ┌────────┴────────┐                      │
│   │                    │   Service Mesh  │                      │
│   │                    │   (mTLS/Istio)  │                      │
│   │                    └────────┬────────┘                      │
│   │                             │                               │
│   └─────────────────────────────┼───────────────────────────────┘
│                                 │
│                    ┌────────────┴────────────┐
│                    │                         │
│              ┌─────┴─────┐            ┌─────┴─────┐
│              │  Vault    │            │   Audit   │
│              │ (Secrets) │            │   Log     │
│              └───────────┘            └───────────┘

└─────────────────────────────────────────────────────────────────┘

2. Kubernetes 部署配置

# k8s/mcp-server-deployment.yaml
# Deployment for the MCP server: three replicas, non-root, read-only root
# filesystem, all capabilities dropped.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-secure-server
  namespace: mcp
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-secure-server
  template:
    metadata:
      labels:
        app: mcp-secure-server
    spec:
      serviceAccountName: mcp-server-sa
      # Pod-level identity: run as UID 1000 and refuse to start as root.
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
      containers:
        - name: server
          image: registry.company.com/mcp/secure-server:v1.0.0
          imagePullPolicy: Always
          ports:
            - containerPort: 3000
              name: http
          env:
            - name: NODE_ENV
              value: "production"
            # Injected from the mcp-secrets Secret rather than stored in the manifest.
            - name: JWT_SECRET
              valueFrom:
                secretKeyRef:
                  name: mcp-secrets
                  key: jwt-secret
            # Matches the read-only data volume mounted below.
            - name: ALLOWED_DIRS
              value: "/data/readonly"
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          # Container-level hardening.
          securityContext:
            allowPrivilegeEscalation: false
            readOnlyRootFilesystem: true
            capabilities:
              drop:
                - ALL
          volumeMounts:
            - name: data
              mountPath: /data/readonly
              readOnly: true
            # Writable scratch space, needed because the root filesystem is read-only.
            - name: tmp
              mountPath: /tmp
      volumes:
        - name: data
          persistentVolumeClaim:
            claimName: mcp-data-pvc
        - name: tmp
          emptyDir:
            sizeLimit: 100Mi
---
# Restricts traffic for the MCP server pods: ingress only from the gateway
# namespace on port 3000; egress only to DNS and the observability namespace.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: mcp-server-network-policy
  namespace: mcp
spec:
  podSelector:
    matchLabels:
      app: mcp-secure-server
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        # NOTE(review): "name" is a custom label that must be applied to the
        # namespace; the automatic label is kubernetes.io/metadata.name.
        - namespaceSelector:
            matchLabels:
              name: gateway
      ports:
        - protocol: TCP
          port: 3000
  egress:
    # DNS: once Egress is restricted, pods cannot resolve any service name
    # (including the OTLP collector below) unless port 53 is allowed.
    - to:
        - namespaceSelector:
            matchLabels:
              kubernetes.io/metadata.name: kube-system
          podSelector:
            matchLabels:
              k8s-app: kube-dns
      ports:
        - protocol: UDP
          port: 53
        - protocol: TCP
          port: 53
    # NOTE(review): add further rules for any other dependency the server
    # calls (e.g. a secrets manager or an identity provider's JWKS endpoint).
    - to:
        - namespaceSelector:
            matchLabels:
              name: observability
      ports:
        - protocol: TCP
          port: 4317  # OTLP

3. 密钥管理

# secrets/vault_integration.py
import hvac
from typing import Optional


class VaultSecretsManager:
    """Thin wrapper around an hvac client for reading and rotating secrets."""

    def __init__(self, vault_url: str, token: Optional[str] = None):
        """Connect to Vault at ``vault_url``.

        Uses ``token`` when given; otherwise, if the client is not already
        authenticated, logs in with the pod's Kubernetes service account.
        """
        self.client = hvac.Client(url=vault_url)
        if token:
            self.client.token = token
        elif not self.client.is_authenticated():
            self._k8s_auth()

    def _k8s_auth(self):
        """Log in to Vault with the mounted service-account token (role "mcp-server")."""
        with open(
            "/var/run/secrets/kubernetes.io/serviceaccount/token",
            encoding="utf-8",
        ) as f:
            # Strip so a trailing newline never becomes part of the JWT.
            jwt = f.read().strip()

        self.client.auth.kubernetes.login(
            role="mcp-server",
            jwt=jwt
        )

    def get_secret(self, path: str, key: str) -> str:
        """Return field ``key`` of the KV v2 secret stored at ``path``.

        Raises:
            KeyError: The secret has no field named ``key``.
        """
        response = self.client.secrets.kv.v2.read_secret_version(
            path=path
        )
        return response["data"]["data"][key]

    def rotate_api_key(self, key_id: str) -> tuple[str, str]:
        """Generate a new API key for ``key_id`` and store it in Vault.

        The key is written to ``api-keys/<key_id>`` together with a UTC
        ``rotated_at`` timestamp.

        Returns:
            ``(key_id, new_key)``.
        """
        # Local imports: the visible module header imports neither name, and
        # the missing ``datetime`` made this method fail with NameError.
        import secrets
        from datetime import datetime, timezone

        new_key = f"mcp_{secrets.token_urlsafe(32)}"
        rotated_at = datetime.now(timezone.utc).isoformat()

        self.client.secrets.kv.v2.create_or_update_secret(
            path=f"api-keys/{key_id}",
            secret={"key": new_key, "rotated_at": rotated_at}
        )

        return key_id, new_key


# 在应用中使用
# Module-level instance shared by the helpers below. It is constructed at
# import time without a token, so VaultSecretsManager.__init__ takes its
# unauthenticated branch (is_authenticated check, then Kubernetes login).
secrets_manager = VaultSecretsManager("https://vault.company.com")

def get_database_password() -> str:
    """Return the "password" field of the secret stored at "mcp/database"."""
    return secrets_manager.get_secret("mcp/database", "password")

安全最佳实践清单

开发阶段

  • 所有输入进行严格验证
  • 使用参数化查询防止注入
  • 实现最小权限原则
  • 敏感操作添加二次确认
  • 错误信息不泄露系统细节

部署阶段

  • 启用 TLS 加密通信
  • 配置网络隔离策略
  • 使用密钥管理系统
  • 启用审计日志
  • 配置监控告警

运维阶段

  • 定期轮换密钥
  • 审查访问日志
  • 更新依赖组件
  • 进行安全渗透测试
  • 制定应急响应计划

总结

本文深入探讨了 MCP 协议的企业级安全实践:

威胁建模 - 全面的安全风险分析
认证授权 - OAuth、API Key、RBAC 实现
输入安全 - 严格验证与沙箱隔离
审计监控 - 完整的日志与告警系统
企业部署 - Kubernetes 安全架构

安全是一个持续的过程,建议:

  • 定期进行安全审计
  • 关注 MCP 协议安全更新
  • 参与社区安全讨论
  • 建立安全响应流程

本文最后更新于 2024-02-20,如有问题欢迎在社区讨论。