引言
随着 MCP 协议在企业场景的广泛应用,安全性已成为不可忽视的关键议题。AI 模型通过 MCP Server 访问外部系统,如果缺乏完善的安全机制,可能导致数据泄露、权限越界,甚至使系统遭受攻击。
本文将深入探讨:
- MCP安全模型与威胁分析
- 认证与授权机制实现
- 输入验证与沙箱隔离
- 审计日志与监控
- 企业级安全部署方案
MCP安全威胁模型
威胁分类
┌─────────────────────────────────────────────────────────────┐
│ MCP 安全威胁模型 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 外部威胁 │
│ ├─ 恶意工具注入 (Malicious Tool Injection) │
│ ├─ 提示词注入 (Prompt Injection) │
│ ├─ 数据窃取 (Data Exfiltration) │
│ └─ 拒绝服务 (DoS Attacks) │
│ │
│ 内部威胁 │
│ ├─ 权限越界 (Privilege Escalation) │
│ ├─ 敏感数据泄露 (Sensitive Data Exposure) │
│ ├─ 未授权访问 (Unauthorized Access) │
│ └─ 审计绕过 (Audit Evasion) │
│ │
│ 传输威胁 │
│ ├─ 中间人攻击 (MITM) │
│ ├─ 消息篡改 (Message Tampering) │
│ └─ 重放攻击 (Replay Attacks) │
│ │
└─────────────────────────────────────────────────────────────┘
风险评估矩阵
| 威胁 | 可能性 | 影响 | 风险等级 | 优先级 |
|---|---|---|---|---|
| 权限越界 | 中 | 高 | 高 | P0 |
| 恶意工具注入 | 低 | 高 | 中 | P1 |
| 数据泄露 | 中 | 高 | 高 | P0 |
| 提示词注入 | 高 | 中 | 高 | P0 |
| DoS攻击 | 中 | 中 | 中 | P1 |
| 中间人攻击 | 低 | 高 | 中 | P1 |
认证与授权
1. 基于OAuth 2.0的认证
// auth/oauth-provider.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import {
CallToolRequestSchema,
ErrorCode,
McpError,
} from "@modelcontextprotocol/sdk/types.js";
import jwt from "jsonwebtoken";
import { createRemoteJWKSet, jwtVerify } from "jose";
/** Settings OAuthAuthProvider uses to verify incoming JWT access tokens. */
interface AuthConfig {
  /** URL of the JWKS endpoint; handed to `createRemoteJWKSet`. */
  jwksUrl: string;
  /** Expected issuer, passed to `jwtVerify`. */
  issuer: string;
  /** Expected audience, passed to `jwtVerify`. */
  audience: string;
  /** Scopes every token must carry; authentication fails if any is missing. */
  requiredScopes: string[];
}
/** Result of a successful OAuthAuthProvider.authenticate() call. */
interface AuthenticatedRequest {
  /** The raw token string that was verified. */
  token: string;
  /** Taken from the token's `sub` claim. */
  userId: string;
  /** Scopes parsed from the space-separated `scope` claim. */
  scopes: string[];
  /** The full verified JWT payload. */
  claims: Record<string, any>;
}
/**
 * Verifies OAuth 2.0 bearer tokens (JWTs) against a remote JWKS and answers
 * scope-based permission questions about the authenticated caller.
 */
export class OAuthAuthProvider {
  private jwks: ReturnType<typeof createRemoteJWKSet>;
  private config: AuthConfig;

  /**
   * @param config JWKS location plus the issuer, audience and scopes every
   *               token must satisfy.
   */
  constructor(config: AuthConfig) {
    this.config = config;
    this.jwks = createRemoteJWKSet(new URL(config.jwksUrl));
  }

  /**
   * Verify `token` and check that it carries every required scope.
   *
   * @param token Raw JWT string.
   * @returns The caller identity, scopes and full claim set.
   * @throws McpError (InvalidRequest) with "认证失败: …" when verification
   *         fails, or "缺少必需权限: …" when a required scope is missing.
   */
  async authenticate(token: string): Promise<AuthenticatedRequest> {
    // Only signature/claim verification is wrapped as "authentication failed";
    // the scope check below reports its own error without being re-wrapped.
    const { payload } = await jwtVerify(token, this.jwks, {
      issuer: this.config.issuer,
      audience: this.config.audience,
    }).catch((error: unknown) => {
      const reason = error instanceof Error ? error.message : String(error);
      throw new McpError(ErrorCode.InvalidRequest, `认证失败: ${reason}`);
    });

    // `scope` is a space-separated string; drop empty entries so that an
    // empty claim yields no scopes instead of [""].
    const scopes =
      typeof payload.scope === "string"
        ? payload.scope.split(" ").filter((s) => s.length > 0)
        : [];

    // Check required scopes
    const missingScopes = this.config.requiredScopes.filter(
      (s) => !scopes.includes(s)
    );
    if (missingScopes.length > 0) {
      throw new McpError(
        ErrorCode.InvalidRequest,
        `缺少必需权限: ${missingScopes.join(", ")}`
      );
    }

    return {
      token,
      userId: payload.sub as string,
      scopes,
      claims: payload as Record<string, any>,
    };
  }

  /**
   * Report whether `auth` may perform `action` on `resource`.
   *
   * The required scope is `<resource>:<action>`; the `admin` scope grants
   * everything.
   */
  checkPermission(auth: AuthenticatedRequest, resource: string, action: string): boolean {
    const requiredScope = `${resource}:${action}`;
    return auth.scopes.includes(requiredScope) || auth.scopes.includes("admin");
  }
}
// Usage inside a Server
/**
 * Build an MCP server whose tool calls are authenticated and authorized
 * before being executed.
 *
 * NOTE(review): `extractTokenFromRequest` and `executeToolWithAuth` are not
 * defined in this snippet — they must be supplied by the surrounding project.
 * The permission checked is always files:read, regardless of the tool called.
 */
export function createAuthenticatedServer(authProvider: OAuthAuthProvider) {
  const secureServer = new Server(
    { name: "secure-server", version: "1.0.0" },
    { capabilities: { tools: {} } }
  );

  // Every tool call passes through authentication, then authorization.
  secureServer.setRequestHandler(CallToolRequestSchema, async (request) => {
    // Token comes from the request headers or metadata.
    const bearer = extractTokenFromRequest(request);
    if (!bearer) {
      throw new McpError(ErrorCode.InvalidRequest, "缺少认证令牌");
    }

    const identity = await authProvider.authenticate(bearer);

    const mayRead = authProvider.checkPermission(identity, "files", "read");
    if (!mayRead) {
      throw new McpError(ErrorCode.InvalidRequest, "权限不足");
    }

    return await executeToolWithAuth(request, identity);
  });

  return secureServer;
}
2. API Key 认证方案
# auth/api_key_auth.py
import hashlib
import secrets
from datetime import datetime, timedelta
from typing import Optional, Dict
from dataclasses import dataclass
from enum import Enum
class Permission(Enum):
    """Operations an API key can be granted; ADMIN implies all the others."""

    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"
@dataclass
class APIKey:
    """Stored record for one API key; the raw key itself is never kept."""

    key_id: str
    key_hash: str  # SHA-256 hex digest of the raw key
    name: str
    permissions: list[Permission]
    created_at: datetime
    expires_at: Optional[datetime]  # None means the key never expires
    last_used: Optional[datetime]  # None until the first successful validation
    rate_limit: int  # requests per minute
    is_active: bool  # set to False when the key is revoked
class APIKeyManager:
    """In-memory API key manager: issues, validates, rate-limits and revokes keys.

    Only the SHA-256 digest of each key is stored; the raw key is returned once
    by ``generate_key`` and cannot be recovered afterwards.
    """

    def __init__(self):
        # key_id -> key record
        self._keys: Dict[str, APIKey] = {}
        # key_id -> timestamps of the requests accepted during the last minute
        self._usage: Dict[str, list[datetime]] = {}

    def generate_key(
        self,
        name: str,
        permissions: list[Permission],
        expires_days: Optional[int] = None,
        rate_limit: int = 100,
    ) -> tuple[str, str]:
        """Create and register a new API key.

        Args:
            name: Human-readable label for the key.
            permissions: Permissions granted to the key.
            expires_days: Lifetime in days; ``None`` means the key never
                expires. ``0`` produces a key that expires immediately.
            rate_limit: Maximum accepted requests per minute.

        Returns:
            ``(key_id, raw_key)``. The raw key is not stored anywhere else.
        """
        key_id = secrets.token_urlsafe(16)
        raw_key = f"mcp_{secrets.token_urlsafe(32)}"
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()

        created_at = datetime.now()
        expires_at = None
        # Compare against None explicitly: a falsy 0 must not silently turn
        # into a key that never expires.
        if expires_days is not None:
            expires_at = created_at + timedelta(days=expires_days)

        self._keys[key_id] = APIKey(
            key_id=key_id,
            key_hash=key_hash,
            name=name,
            permissions=permissions,
            created_at=created_at,
            expires_at=expires_at,
            last_used=None,
            rate_limit=rate_limit,
            is_active=True,
        )
        return key_id, raw_key

    def validate_key(self, raw_key: str) -> Optional[APIKey]:
        """Look up the record for ``raw_key`` and enforce its restrictions.

        Returns:
            The matching key record, or ``None`` when the key is unknown,
            expired or revoked.

        Raises:
            RateLimitExceeded: The key is valid but over its per-minute quota.
        """
        import hmac  # local import: keeps this snippet self-contained

        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
        for key in self._keys.values():
            # Constant-time comparison avoids leaking digest prefixes via timing.
            if not hmac.compare_digest(key.key_hash, key_hash):
                continue
            # Expired?
            if key.expires_at and datetime.now() > key.expires_at:
                return None
            # Revoked?
            if not key.is_active:
                return None
            # Over quota?
            if not self._check_rate_limit(key.key_id, key.rate_limit):
                raise RateLimitExceeded(f"超出速率限制: {key.rate_limit}/分钟")
            key.last_used = datetime.now()
            return key
        return None

    def _check_rate_limit(self, key_id: str, limit: int) -> bool:
        """Record one request for ``key_id`` unless it would exceed ``limit``.

        Uses a sliding one-minute window. Returns ``False`` (and records
        nothing) when ``limit`` requests were already accepted in the window.
        """
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)
        # Drop timestamps that fell out of the window.
        recent = [t for t in self._usage.get(key_id, []) if t > minute_ago]
        self._usage[key_id] = recent
        if len(recent) >= limit:
            return False
        recent.append(now)
        return True

    def revoke_key(self, key_id: str) -> bool:
        """Deactivate the key; return ``False`` if ``key_id`` is unknown."""
        if key_id in self._keys:
            self._keys[key_id].is_active = False
            return True
        return False

    def check_permission(self, key: APIKey, permission: Permission) -> bool:
        """Return True if ``key`` holds ``permission`` or ``Permission.ADMIN``."""
        return permission in key.permissions or Permission.ADMIN in key.permissions
class RateLimitExceeded(Exception):
    """Raised when an API key is used more often than its per-minute limit allows."""
3. 基于角色的访问控制 (RBAC)
// auth/rbac.ts
/** A named bundle of permissions that can be assigned to users. */
interface Role {
  name: string;
  permissions: Permission[];
}
/** Grants one action on one resource, optionally restricted by conditions. */
interface Permission {
  // "*" matches every resource; otherwise the resource must be equal to this
  // value or start with "<value>/".
  resource: string;
  action: "read" | "write" | "delete" | "execute";
  // Every listed condition must hold for the permission to apply.
  conditions?: Condition[];
}
/** Extra restriction evaluated against the request context. */
interface Condition {
  type: "path_match" | "time_range" | "ip_allowlist";
  // Type-specific settings read by RBACManager: `pattern` (path_match),
  // `start` / `end` (time_range), `ips` (ip_allowlist).
  config: Record<string, any>;
}
/**
 * Role-based access control: maps users to roles and roles to permissions,
 * then answers "may this user perform this action on this resource?".
 */
class RBACManager {
  private roles: Map<string, Role> = new Map();
  private userRoles: Map<string, string[]> = new Map();

  constructor() {
    // Predefined roles.
    // Actions are matched exactly, so admin must list every action to really
    // have full access (a lone "execute" would deny read/write/delete).
    this.defineRole("admin", [
      { resource: "*", action: "read" },
      { resource: "*", action: "write" },
      { resource: "*", action: "delete" },
      { resource: "*", action: "execute" },
    ]);
    this.defineRole("developer", [
      { resource: "files", action: "read" },
      { resource: "files", action: "write" },
      { resource: "code", action: "read" },
      { resource: "code", action: "write" },
    ]);
    this.defineRole("analyst", [
      { resource: "files", action: "read" },
      { resource: "data", action: "read" },
      { resource: "reports", action: "read" },
      { resource: "reports", action: "write" },
    ]);
    this.defineRole("readonly", [
      {
        resource: "files",
        action: "read",
        conditions: [{ type: "path_match", config: { pattern: "/docs/**" } }],
      },
      { resource: "data", action: "read" },
    ]);
  }

  /** Create or replace the role called `name`. */
  defineRole(name: string, permissions: Permission[]): void {
    this.roles.set(name, { name, permissions });
  }

  /** Give `userId` the role `roleName`; assigning twice has no extra effect. */
  assignRole(userId: string, roleName: string): void {
    if (!this.userRoles.has(userId)) {
      this.userRoles.set(userId, []);
    }
    const roles = this.userRoles.get(userId)!;
    if (!roles.includes(roleName)) {
      roles.push(roleName);
    }
  }

  /**
   * Return true if any role of `userId` grants `action` on `resource`.
   *
   * @param context Values that conditions are evaluated against, e.g.
   *                `path` (path_match) or `clientIp` (ip_allowlist).
   */
  checkAccess(
    userId: string,
    resource: string,
    action: string,
    context?: Record<string, any>
  ): boolean {
    const userRoleList = this.userRoles.get(userId) || [];
    for (const roleName of userRoleList) {
      const role = this.roles.get(roleName);
      if (!role) continue; // assigned role was never defined
      for (const permission of role.permissions) {
        if (this.matchesPermission(permission, resource, action, context)) {
          return true;
        }
      }
    }
    return false;
  }

  /** True when resource, action and every condition of `permission` match. */
  private matchesPermission(
    permission: Permission,
    resource: string,
    action: string,
    context?: Record<string, any>
  ): boolean {
    const resourceMatch =
      permission.resource === "*" ||
      permission.resource === resource ||
      resource.startsWith(permission.resource + "/");
    const actionMatch = permission.action === action;
    if (!resourceMatch || !actionMatch) return false;

    // All conditions must hold.
    return (permission.conditions ?? []).every((condition) =>
      this.evaluateCondition(condition, context)
    );
  }

  /**
   * Evaluate one condition against the request context.
   *
   * Fails closed: a missing context value, a malformed config, or an unknown
   * condition type denies access instead of silently granting it.
   */
  private evaluateCondition(
    condition: Condition,
    context?: Record<string, any>
  ): boolean {
    switch (condition.type) {
      case "path_match": {
        const path = context?.["path"];
        const pattern = condition.config.pattern;
        return (
          typeof path === "string" &&
          typeof pattern === "string" &&
          this.matchGlob(path, pattern)
        );
      }
      case "time_range": {
        const now = new Date();
        const start = new Date(condition.config.start as string);
        const end = new Date(condition.config.end as string);
        // Invalid dates compare as false, so they deny access.
        return now >= start && now <= end;
      }
      case "ip_allowlist": {
        const clientIp = context?.["clientIp"];
        const allowlist = condition.config.ips;
        return (
          typeof clientIp === "string" &&
          Array.isArray(allowlist) &&
          allowlist.includes(clientIp)
        );
      }
      default:
        return false;
    }
  }

  /**
   * Match `path` against a glob where `**` spans directories and `*` stays
   * within one path segment.
   */
  private matchGlob(path: string, pattern: string): boolean {
    // Translate in a single pass. Chained replace() calls would rewrite the
    // "*" of an already-produced ".*", and unescaped literals such as "."
    // would act as regex metacharacters.
    const source = pattern
      .split(/(\*\*|\*)/)
      .map((part) => {
        if (part === "**") return ".*";
        if (part === "*") return "[^/]*";
        return part.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
      })
      .join("");
    return new RegExp("^" + source + "$").test(path);
  }
}
输入验证与净化
1. 严格输入验证
// validation/input-validator.ts
import { z } from "zod";
import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";
// Validation rules

// Rejects absolute paths and anything containing "..", after normalising
// backslashes to forward slashes.
const PathSchema = z.string().refine(
  (candidate) => {
    const unixStyle = candidate.replace(/\\/g, "/");
    if (unixStyle.includes("..")) return false;
    return !unixStyle.startsWith("/");
  },
  { message: "路径包含非法字符" }
);

// Rejects commands containing a dangerous word. This is a case-insensitive
// SUBSTRING match, so harmless text such as "add" (contains "dd") is
// rejected as well.
const CommandSchema = z.string().refine(
  (commandLine) => {
    const lowered = commandLine.toLowerCase();
    return ["rm", "del", "format", "mkfs", "dd"].every(
      (word) => !lowered.includes(word)
    );
  },
  { message: "命令包含危险操作" }
);

// Heuristic screen for common SQL-injection fragments.
// NOTE(review): pattern screening is best-effort; the checklist at the end of
// this article recommends parameterized queries as the actual defence.
const SQLSchema = z.string().refine(
  (statement) =>
    [
      /;\s*drop\s+/i,
      /;\s*delete\s+/i,
      /union\s+select/i,
      /exec\s*\(/i,
      /--/,
      /\/\*/,
    ].every((signature) => !signature.test(statement)),
  { message: "SQL 语句包含可疑内容" }
);
/**
 * Validates and sanitises untrusted tool input: file paths, shell commands,
 * SQL text, free-form strings and file sizes.
 */
export class InputValidator {
  private allowedPaths: string[];

  /** @param allowedPaths Directories that validated paths must stay inside. */
  constructor(allowedPaths: string[]) {
    const nodePath = require("path");
    this.allowedPaths = allowedPaths.map((p) => nodePath.resolve(p));
  }

  /**
   * Validate `inputPath` and return its resolved absolute form.
   *
   * @throws McpError (InvalidParams) when the path fails PathSchema or
   *         resolves outside every allowed directory.
   */
  validatePath(inputPath: string): string {
    const nodePath = require("path");
    try {
      const validated = PathSchema.parse(inputPath);
      const resolved: string = nodePath.resolve(validated);

      // Compare on a directory boundary: a bare startsWith() would accept
      // "/data/readonly-evil" for the allowed root "/data/readonly".
      const isAllowed = this.allowedPaths.some((allowed) => {
        const root = allowed.endsWith(nodePath.sep)
          ? allowed
          : allowed + nodePath.sep;
        return resolved === allowed || resolved.startsWith(root);
      });
      if (!isAllowed) {
        throw new McpError(
          ErrorCode.InvalidParams,
          `路径 ${inputPath} 不在允许范围内`
        );
      }
      return resolved;
    } catch (error) {
      if (error instanceof z.ZodError) {
        throw new McpError(
          ErrorCode.InvalidParams,
          `路径验证失败: ${error.errors[0].message}`
        );
      }
      throw error;
    }
  }

  /**
   * Return `command` unchanged if it passes CommandSchema.
   * @throws McpError (InvalidParams) otherwise.
   */
  validateCommand(command: string): string {
    try {
      return CommandSchema.parse(command);
    } catch {
      throw new McpError(
        ErrorCode.InvalidParams,
        `命令验证失败: 包含危险操作`
      );
    }
  }

  /**
   * Return `sql` unchanged if it passes SQLSchema.
   * @throws McpError (InvalidParams) otherwise.
   */
  validateSQL(sql: string): string {
    try {
      return SQLSchema.parse(sql);
    } catch {
      throw new McpError(
        ErrorCode.InvalidParams,
        `SQL 验证失败: 可能包含注入攻击`
      );
    }
  }

  /** HTML-entity-encode the characters that are significant in HTML. */
  sanitizeString(input: string): string {
    // "&" must be replaced first so the entities produced below are not
    // encoded a second time.
    return input
      .replace(/&/g, "&amp;")
      .replace(/</g, "&lt;")
      .replace(/>/g, "&gt;")
      .replace(/"/g, "&quot;")
      .replace(/'/g, "&#x27;")
      .replace(/\//g, "&#x2F;");
  }

  /**
   * @param maxSize Upper bound in bytes (default 10 MiB).
   * @throws McpError (InvalidParams) when `size` exceeds `maxSize`.
   */
  validateFileSize(size: number, maxSize: number = 10 * 1024 * 1024): void {
    if (size > maxSize) {
      throw new McpError(
        ErrorCode.InvalidParams,
        `文件大小超过限制: ${size} > ${maxSize}`
      );
    }
  }
}
2. 沙箱隔离
# sandbox/docker_sandbox.py
import docker
import tempfile
import os
import shutil
from typing import Optional, List
from dataclasses import dataclass
from pathlib import Path
@dataclass
class SandboxConfig:
    """Resource and isolation settings for a DockerSandbox container."""

    image: str = "python:3.11-slim"
    memory_limit: str = "512m"
    cpu_limit: float = 1.0  # in CPUs; converted to a CPU quota by DockerSandbox
    timeout: int = 30  # NOTE(review): not enforced by DockerSandbox in this snippet
    network_disabled: bool = True
    read_only: bool = True  # read-only root filesystem and read-only mounts
    allowed_paths: Optional[List[str]] = None  # host paths mounted at the same path
class DockerSandbox:
    """Run commands inside a locked-down, disposable Docker container.

    Use as a context manager: entering starts the container, leaving stops and
    removes it and deletes the host-side temporary directory.
    """

    # language name -> (script file extension, interpreter command)
    _LANGUAGES = {
        "python": ("py", "python"),
        "javascript": ("js", "node"),
    }

    def __init__(self, config: SandboxConfig = None):
        self.config = config or SandboxConfig()
        self.client = docker.from_env()
        self.container = None
        self.temp_dir = None

    def __enter__(self):
        """Create the temp directory and start the sandbox container."""
        self.temp_dir = tempfile.mkdtemp()

        # Mount each allowed host path at the same location in the container.
        mode = "ro" if self.config.read_only else "rw"
        volumes = {
            path: {"bind": path, "mode": mode}
            for path in (self.config.allowed_paths or [])
        }

        try:
            self.container = self.client.containers.run(
                self.config.image,
                command="sleep infinity",
                detach=True,
                mem_limit=self.config.memory_limit,
                cpu_quota=int(self.config.cpu_limit * 100000),
                network_disabled=self.config.network_disabled,
                volumes=volumes,
                security_opt=["no-new-privileges:true"],
                cap_drop=["ALL"],
                read_only=self.config.read_only,
                tmpfs={"/tmp": "noexec,nosuid,size=100m"},
            )
        except Exception:
            # __exit__ is not called when __enter__ raises, so clean up here.
            shutil.rmtree(self.temp_dir, ignore_errors=True)
            self.temp_dir = None
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop and remove the container, then delete the temp directory."""
        if self.container:
            # Attempt removal even if the graceful stop fails, so a stuck
            # container is not left behind.
            for cleanup in (
                lambda: self.container.stop(timeout=5),
                lambda: self.container.remove(force=True),
            ):
                try:
                    cleanup()
                except Exception as e:
                    print(f"清理容器失败: {e}")
        if self.temp_dir and os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir, ignore_errors=True)

    def execute(self, command: List[str], working_dir: str = None) -> dict:
        """Run ``command`` in the container.

        Returns:
            Dict with ``exit_code``, ``stdout`` and ``stderr`` (decoded text,
            empty string when the stream produced nothing).

        Raises:
            RuntimeError: The sandbox has not been entered.
        """
        if not self.container:
            raise RuntimeError("沙箱未初始化")
        exec_result = self.container.exec_run(
            command,
            workdir=working_dir,
            demux=True,  # output becomes a (stdout, stderr) pair
        )
        stdout, stderr = exec_result.output
        return {
            "exit_code": exec_result.exit_code,
            "stdout": stdout.decode() if stdout else "",
            "stderr": stderr.decode() if stderr else "",
        }

    def execute_code(self, code: str, language: str = "python") -> dict:
        """Copy ``code`` into the container as a script and run it.

        Args:
            code: Source text to execute.
            language: ``"python"`` or ``"javascript"``.

        Raises:
            ValueError: ``language`` is not supported.
            RuntimeError: The sandbox has not been entered.
        """
        if language not in self._LANGUAGES:
            raise ValueError(f"不支持的编程语言: {language}")
        if not self.container or not self.temp_dir:
            raise RuntimeError("沙箱未初始化")
        extension, interpreter = self._LANGUAGES[language]

        # The file name must match the path executed below: naming it after
        # the language ("script.python") would never be found as "script.py".
        script_name = f"script.{extension}"
        code_file = os.path.join(self.temp_dir, script_name)
        with open(code_file, "w", encoding="utf-8") as f:
            f.write(code)

        # Copy into the container
        self.container.put_archive("/tmp", self._create_tar_archive(code_file))

        return self.execute([interpreter, f"/tmp/{script_name}"], "/tmp")

    def _create_tar_archive(self, file_path: str) -> bytes:
        """Return an uncompressed tar archive holding ``file_path`` by base name."""
        import tarfile
        import io

        tar_buffer = io.BytesIO()
        with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
            tar.add(file_path, arcname=os.path.basename(file_path))
        return tar_buffer.getvalue()
class ToolExecutionError(RuntimeError):
    """Raised when tool code run inside the sandbox exits with a non-zero status."""


# Usage example
def safe_execute_tool(tool_code: str, input_data: dict) -> dict:
    """Run ``tool_code`` in a Docker sandbox and return its JSON result.

    ``tool_code`` must define ``execute(input_data)``; whatever it returns is
    printed as JSON by the wrapper script and parsed back here.

    Args:
        tool_code: Python source defining ``execute``.
        input_data: JSON-serialisable argument passed to ``execute``.

    Returns:
        The decoded JSON value printed by the script.

    Raises:
        ToolExecutionError: The script exited with a non-zero status.
    """
    import json  # local import: this snippet's import block does not include json

    config = SandboxConfig(
        memory_limit="256m",
        cpu_limit=0.5,
        timeout=10,
        network_disabled=True,
        read_only=True,
    )

    # Embed the payload as a properly escaped Python string literal. Splicing
    # raw JSON between single quotes breaks (or injects code) as soon as the
    # data contains a quote or a backslash.
    payload_literal = repr(json.dumps(input_data))

    with DockerSandbox(config) as sandbox:
        # Wrapper script. NOTE(review): blanking sys.modules only makes later
        # imports of already-loaded modules fail; the container, not this
        # loop, is the isolation boundary.
        script = f"""
import json
import sys
# 限制导入
allowed_modules = ['json', 'math', 'random', 'datetime', 're']
for mod in sys.modules:
    if mod not in allowed_modules:
        sys.modules[mod] = None
# 执行工具代码
{tool_code}
# 处理输入
input_data = json.loads({payload_literal})
result = execute(input_data)
print(json.dumps(result))
"""
        result = sandbox.execute_code(script, "python")
        if result["exit_code"] != 0:
            raise ToolExecutionError(f"执行失败: {result['stderr']}")
        return json.loads(result["stdout"])
审计日志与监控
1. 审计日志系统
// audit/audit-logger.ts
import { createWriteStream } from "fs";
import { format } from "date-fns";
/** One line of the audit log (written as a JSON object). */
interface AuditEvent {
  /** Filled in by AuditLogger.log(). */
  timestamp: Date;
  /** Filled in by AuditLogger.log(). */
  eventId: string;
  eventType: "tool_call" | "resource_access" | "auth_success" | "auth_failure" | "error";
  userId?: string;
  sessionId: string;
  toolName?: string;
  resourceUri?: string;
  // NOTE(review): input/output are logged as given; callers should redact
  // sensitive values before passing them in.
  input?: any;
  output?: any;
  success: boolean;
  errorMessage?: string;
  metadata?: Record<string, any>;
}
/**
 * Buffered, append-only audit log writer (one JSON object per line).
 *
 * Events are flushed every 5 seconds, as soon as 100 are buffered, and on
 * close().
 */
class AuditLogger {
  private logStream: ReturnType<typeof createWriteStream>;
  private buffer: AuditEvent[] = [];
  private flushInterval: NodeJS.Timeout;

  /** @param logPath File to append to. */
  constructor(logPath: string) {
    this.logStream = createWriteStream(logPath, { flags: "a" });
    this.flushInterval = setInterval(() => this.flush(), 5000);
  }

  /** Queue an event; `timestamp` and `eventId` are added here. */
  log(event: Omit<AuditEvent, "timestamp" | "eventId">): void {
    const fullEvent: AuditEvent = {
      ...event,
      timestamp: new Date(),
      eventId: this.generateEventId(),
    };
    this.buffer.push(fullEvent);
    // Flush immediately once the buffer is large.
    if (this.buffer.length >= 100) {
      this.flush();
    }
  }

  /** Write all buffered events to the stream. */
  private flush(): void {
    if (this.buffer.length === 0) return;
    // Detach the buffer first so a failure below cannot leave the same
    // events queued forever.
    const pending = this.buffer;
    this.buffer = [];
    const lines = pending.map((event) => this.serialize(event)).join("\n");
    this.logStream.write(lines + "\n");
  }

  /**
   * JSON-encode one event. If the payload cannot be serialised (circular
   * reference, BigInt, ...), fall back to a record without input/output so
   * the rest of the batch is still written.
   */
  private serialize(event: AuditEvent): string {
    try {
      return JSON.stringify(event);
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      return JSON.stringify({
        timestamp: event.timestamp,
        eventId: event.eventId,
        eventType: event.eventType,
        userId: event.userId,
        sessionId: event.sessionId,
        toolName: event.toolName,
        resourceUri: event.resourceUri,
        success: event.success,
        errorMessage: event.errorMessage,
        metadata: { serializationError: reason },
      });
    }
  }

  /** `<epoch millis>-<9 random base-36 characters>`. */
  private generateEventId(): string {
    return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }

  /** Stop the timer, flush what is left and end the stream. */
  close(): void {
    clearInterval(this.flushInterval);
    this.flush();
    this.logStream.end();
  }
}
// Integrating audit logging into a Server
/**
 * Build an MCP server that writes an audit record when a tool call starts,
 * and another when it succeeds or fails.
 *
 * NOTE(review): `extractSessionId` and `executeTool` are not defined in this
 * snippet — they must be supplied by the surrounding project.
 */
export function createAuditableServer(auditLogger: AuditLogger) {
  const server = new Server(
    { name: "auditable-server", version: "1.0.0" },
    { capabilities: { tools: {} } }
  );

  server.setRequestHandler(CallToolRequestSchema, async (request) => {
    const startTime = Date.now();
    const sessionId = extractSessionId(request);
    try {
      // Record the start of the call. `phase` distinguishes this record from
      // the completion record, which also has success: true.
      auditLogger.log({
        eventType: "tool_call",
        sessionId,
        toolName: request.params.name,
        input: request.params.arguments,
        success: true,
        metadata: { phase: "start" },
      });

      const result = await executeTool(request);
      const duration = Date.now() - startTime;

      // Record successful completion
      auditLogger.log({
        eventType: "tool_call",
        sessionId,
        toolName: request.params.name,
        input: request.params.arguments,
        output: result,
        success: true,
        metadata: { duration },
      });
      return result;
    } catch (error) {
      // A thrown value is not guaranteed to be an Error instance.
      const errorMessage = error instanceof Error ? error.message : String(error);
      // Record the failure, then let the error propagate unchanged.
      auditLogger.log({
        eventType: "tool_call",
        sessionId,
        toolName: request.params.name,
        input: request.params.arguments,
        success: false,
        errorMessage,
        metadata: { duration: Date.now() - startTime },
      });
      throw error;
    }
  });

  return server;
}
2. 实时监控
# monitoring/security_monitor.py
import asyncio
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Callable
from dataclasses import dataclass, field
from enum import Enum
class AlertSeverity(Enum):
    """Severity levels attached to a SecurityAlert."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
@dataclass
class SecurityAlert:
    """A single alert handed to every registered alert handler."""

    timestamp: datetime
    severity: AlertSeverity
    alert_type: str
    description: str
    source: str  # component that raised the alert
    metadata: dict = field(default_factory=dict)  # fresh dict per instance
class SecurityMonitor:
    """Counts security events and raises alerts when thresholds are exceeded.

    Alert handlers may be plain callables or coroutine functions.
    """

    def __init__(self):
        # event type -> timestamps of recorded occurrences
        self.event_counts: Dict[str, List[datetime]] = defaultdict(list)
        self.alert_handlers: List[Callable[[SecurityAlert], None]] = []
        self.running = False
        # Strong references to in-flight async handler tasks so they are not
        # garbage-collected before they finish.
        self._pending_tasks = set()
        # Threshold configuration: `count` events within `window` seconds
        self.thresholds = {
            "failed_auth": {"count": 5, "window": 300},  # 5 failures in 5 minutes
            "tool_errors": {"count": 10, "window": 60},  # 10 errors in 1 minute
            "suspicious_input": {"count": 3, "window": 300},
        }

    def add_alert_handler(self, handler: Callable[[SecurityAlert], None]):
        """Register a handler; it may be synchronous or an ``async def``."""
        self.alert_handlers.append(handler)

    async def start_monitoring(self):
        """Clean up and check thresholds every 10 seconds until stop() is called."""
        self.running = True
        while self.running:
            self._cleanup_old_events()
            self._check_thresholds()
            await asyncio.sleep(10)

    def stop(self):
        """Ask the monitoring loop to end after its current iteration."""
        self.running = False

    def record_event(self, event_type: str, metadata: dict = None):
        """Record one occurrence of ``event_type``.

        ``suspicious_input`` events additionally raise a HIGH alert right
        away, carrying ``metadata``.
        """
        self.event_counts[event_type].append(datetime.now())
        if event_type == "suspicious_input":
            self._trigger_alert(
                AlertSeverity.HIGH,
                "suspicious_input",
                "检测到可疑输入",
                "input_validator",
                metadata or {},
            )

    def _cleanup_old_events(self):
        """Drop timestamps older than the largest configured window."""
        max_window = max(t["window"] for t in self.thresholds.values())
        cutoff = datetime.now() - timedelta(seconds=max_window)
        for event_type, stamps in self.event_counts.items():
            # Replacing the value of an existing key is safe while iterating.
            self.event_counts[event_type] = [t for t in stamps if t > cutoff]

    def _check_thresholds(self):
        """Raise an alert for every event type at or above its threshold.

        NOTE: the alert is raised on each check for as long as the window
        still holds enough events.
        """
        now = datetime.now()
        for event_type, threshold in self.thresholds.items():
            window_start = now - timedelta(seconds=threshold["window"])
            recent = [e for e in self.event_counts[event_type] if e > window_start]
            if len(recent) < threshold["count"]:
                continue
            self._trigger_alert(
                self._calculate_severity(event_type, len(recent)),
                f"threshold_exceeded_{event_type}",
                f"{event_type} 超过阈值: {len(recent)} 次/{threshold['window']}秒",
                "threshold_monitor",
                {"count": len(recent), "threshold": threshold["count"]},
            )

    def _calculate_severity(self, event_type: str, count: int) -> AlertSeverity:
        """Map an event type and its in-window count to a severity."""
        if event_type == "failed_auth":
            if count >= 20:
                return AlertSeverity.CRITICAL
            if count >= 10:
                return AlertSeverity.HIGH
            return AlertSeverity.MEDIUM
        if event_type == "tool_errors":
            if count >= 50:
                return AlertSeverity.CRITICAL
            if count >= 20:
                return AlertSeverity.HIGH
            return AlertSeverity.LOW
        return AlertSeverity.MEDIUM

    def _trigger_alert(self, severity: AlertSeverity, alert_type: str,
                       description: str, source: str, metadata: dict):
        """Build a SecurityAlert and pass it to every handler.

        A failing handler is reported and does not stop the remaining ones.
        """
        alert = SecurityAlert(
            timestamp=datetime.now(),
            severity=severity,
            alert_type=alert_type,
            description=description,
            source=source,
            metadata=metadata,
        )
        for handler in self.alert_handlers:
            try:
                outcome = handler(alert)
                # An `async def` handler only returns a coroutine here; it has
                # to be driven explicitly or it never runs.
                if asyncio.iscoroutine(outcome):
                    self._run_coroutine(outcome)
            except Exception as e:
                print(f"告警处理器失败: {e}")

    def _run_coroutine(self, coro):
        """Schedule ``coro`` on the running loop, or run it to completion if none."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Called from synchronous code: no loop is running in this thread.
            asyncio.run(coro)
            return
        task = loop.create_task(coro)
        self._pending_tasks.add(task)
        task.add_done_callback(self._on_handler_done)

    def _on_handler_done(self, task):
        """Forget a finished handler task and report its failure, if any."""
        self._pending_tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            print(f"告警处理器失败: {task.exception()}")
# Example alert handler
async def slack_alert_handler(alert: SecurityAlert):
    """Post ``alert`` to a Slack incoming webhook as a coloured attachment.

    The webhook URL below is a placeholder and must be replaced.
    """
    import aiohttp

    webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"

    # Attachment colour per severity; grey when the severity is unknown.
    severity_colors = {
        AlertSeverity.LOW: "#36a64f",
        AlertSeverity.MEDIUM: "#daa520",
        AlertSeverity.HIGH: "#ff6600",
        AlertSeverity.CRITICAL: "#ff0000",
    }

    # (title, value, short) triples rendered as attachment fields.
    rows = [
        ("类型", alert.alert_type, True),
        ("来源", alert.source, True),
        ("描述", alert.description, False),
        ("时间", alert.timestamp.isoformat(), True),
    ]
    attachment = {
        "color": severity_colors.get(alert.severity, "#808080"),
        "title": f"🚨 MCP 安全告警 - {alert.severity.value.upper()}",
        "fields": [
            {"title": title, "value": value, "short": short}
            for title, value, short in rows
        ],
    }
    payload = {"attachments": [attachment]}

    async with aiohttp.ClientSession() as session:
        await session.post(webhook_url, json=payload)
企业级部署方案
1. 安全架构设计
┌─────────────────────────────────────────────────────────────────┐
│ 企业级 MCP 安全架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ WAF/API │──────▶ Gateway │──────▶ Load │ │
│ │ Gateway │ │ (Kong/ │ │ Balancer │ │
│ │ │ │ Istio) │ │ │ │
│ └─────────────┘ └─────────────┘ └──────┬──────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────┘ │
│ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ │ MCP Server │ │ MCP Server │ │ MCP Server │ │
│ │ │ (Auth) │ │ (Files) │ │ (DB) │ │
│ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │ │
│ │ └──────────────────┼──────────────────┘ │
│ │ │ │
│ │ ┌────────┴────────┐ │
│ │ │ Service Mesh │ │
│ │ │ (mTLS/Istio) │ │
│ │ └────────┬────────┘ │
│ │ │ │
│ └─────────────────────────────┼───────────────────────────────┘
│ │
│ ┌────────────┴────────────┐
│ │ │
│ ┌─────┴─────┐ ┌─────┴─────┐
│ │ Vault │ │ Audit │
│ │ (Secrets) │ │ Log │
│ └───────────┘ └───────────┘
│
└─────────────────────────────────────────────────────────────────┘
2. Kubernetes 部署配置
# k8s/mcp-server-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-secure-server
  namespace: mcp
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-secure-server
  template:
    metadata:
      labels:
        app: mcp-secure-server
    spec:
      serviceAccountName: mcp-server-sa
      # Pod-level: never run as root.
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
      containers:
        - name: server
          image: registry.company.com/mcp/secure-server:v1.0.0
          imagePullPolicy: Always
          ports:
            - containerPort: 3000
              name: http
          env:
            - name: NODE_ENV
              value: "production"
            # Secrets come from a Secret object, never from literals.
            - name: JWT_SECRET
              valueFrom:
                secretKeyRef:
                  name: mcp-secrets
                  key: jwt-secret
            - name: ALLOWED_DIRS
              value: "/data/readonly"
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          # Container-level hardening: no privilege escalation, read-only
          # root filesystem, no Linux capabilities.
          securityContext:
            allowPrivilegeEscalation: false
            readOnlyRootFilesystem: true
            capabilities:
              drop:
                - ALL
          volumeMounts:
            - name: data
              mountPath: /data/readonly
              readOnly: true
            # Writable scratch space, needed because the root fs is read-only.
            - name: tmp
              mountPath: /tmp
      volumes:
        - name: data
          persistentVolumeClaim:
            claimName: mcp-data-pvc
        - name: tmp
          emptyDir:
            sizeLimit: 100Mi
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: mcp-server-network-policy
  namespace: mcp
spec:
  podSelector:
    matchLabels:
      app: mcp-secure-server
  policyTypes:
    - Ingress
    - Egress
  ingress:
    # Only the gateway namespace may reach the server port.
    - from:
        - namespaceSelector:
            matchLabels:
              name: gateway
      ports:
        - protocol: TCP
          port: 3000
  egress:
    - to:
        - namespaceSelector:
            matchLabels:
              name: observability
      ports:
        - protocol: TCP
          port: 4317 # OTLP
    # Allow DNS lookups. Once egress is restricted, the pod cannot resolve the
    # collector's Service name without this rule.
    - to:
        - namespaceSelector:
            matchLabels:
              kubernetes.io/metadata.name: kube-system
      ports:
        - protocol: UDP
          port: 53
        - protocol: TCP
          port: 53
3. 密钥管理
# secrets/vault_integration.py
import hvac
from typing import Optional
class VaultSecretsManager:
    """Secret access and API key rotation backed by HashiCorp Vault (KV v2)."""

    def __init__(self, vault_url: str, token: Optional[str] = None):
        """Connect to Vault.

        Args:
            vault_url: Base URL of the Vault server.
            token: Vault token to use. When omitted and the client is not
                already authenticated, Kubernetes service-account
                authentication is attempted.
        """
        self.client = hvac.Client(url=vault_url)
        if token:
            self.client.token = token
        elif not self.client.is_authenticated():
            # Fall back to Kubernetes authentication
            self._k8s_auth()

    def _k8s_auth(self):
        """Log in with the pod's service-account token under role ``mcp-server``."""
        with open("/var/run/secrets/kubernetes.io/serviceaccount/token") as f:
            jwt = f.read()
        self.client.auth.kubernetes.login(
            role="mcp-server",
            jwt=jwt,
        )

    def get_secret(self, path: str, key: str) -> str:
        """Return the value stored under ``key`` in the KV v2 secret at ``path``."""
        response = self.client.secrets.kv.v2.read_secret_version(
            path=path
        )
        # KV v2 nests the secret's fields under data.data.
        return response["data"]["data"][key]

    def rotate_api_key(self, key_id: str) -> tuple[str, str]:
        """Generate a new API key and store it at ``api-keys/<key_id>``.

        Returns:
            ``(key_id, new_key)``.
        """
        # Local imports: this snippet's import block provides neither name.
        import secrets
        from datetime import datetime

        # Generate the new key
        new_key = f"mcp_{secrets.token_urlsafe(32)}"
        # Store it in Vault together with the rotation time
        self.client.secrets.kv.v2.create_or_update_secret(
            path=f"api-keys/{key_id}",
            secret={"key": new_key, "rotated_at": datetime.now().isoformat()},
        )
        return key_id, new_key
# Application usage
# NOTE: this connects to Vault as soon as the module is imported.
secrets_manager = VaultSecretsManager("https://vault.company.com")


def get_database_password():
    """Fetch the ``password`` field of the ``mcp/database`` secret from Vault."""
    return secrets_manager.get_secret("mcp/database", "password")
安全最佳实践清单
开发阶段
- 所有输入进行严格验证
- 使用参数化查询防止注入
- 实现最小权限原则
- 敏感操作添加二次确认
- 错误信息不泄露系统细节
部署阶段
- 启用 TLS 加密通信
- 配置网络隔离策略
- 使用密钥管理系统
- 启用审计日志
- 配置监控告警
运维阶段
- 定期轮换密钥
- 审查访问日志
- 更新依赖组件
- 进行安全渗透测试
- 制定应急响应计划
总结
本文深入探讨了 MCP 协议的企业级安全实践:
✅ 威胁建模 - 全面的安全风险分析
✅ 认证授权 - OAuth、API Key、RBAC 实现
✅ 输入安全 - 严格验证与沙箱隔离
✅ 审计监控 - 完整的日志与告警系统
✅ 企业部署 - Kubernetes 安全架构
安全是一个持续的过程,建议:
- 定期进行安全审计
- 关注 MCP 协议安全更新
- 参与社区安全讨论
- 建立安全响应流程
本文最后更新于 2024-02-20,如有问题,欢迎在社区讨论。