
AI Agent Production Deployment Guide: A Complete Walkthrough from Development to Launch

AIEng Hub

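Introduction

Building an agent is only the first step; the real challenge is running it in production reliably, securely, and efficiently. Drawing on production experience, this article walks through the full deployment workflow for AI agents and the practices that support it.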

Production Deployment Architecture

Typical Production Architecture

┌─────────────────────────────────────────────────────────────────┐
│               Production Environment Architecture                │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────────┐                                               │
│  │     Load     │  Nginx / AWS ALB / CloudFlare                 │
│  │   Balancer   │                                               │
│  └──────┬───────┘                                               │
│         │                                                        │
│  ┌──────┴───────────────────────────────────────┐               │
│  │              Kubernetes Cluster               │               │
│  │  ┌─────────────┐  ┌─────────────┐            │               │
│  │  │  Agent Pod  │  │  Agent Pod  │  (HPA)     │               │
│  │  │  Replica 1  │  │  Replica 2  │            │               │
│  │  └──────┬──────┘  └──────┬──────┘            │               │
│  │         └─────────────────┘                   │               │
│  │                    │                          │               │
│  │  ┌─────────────────┼─────────────────┐       │               │
│  │  │                 │                 │       │               │
│  │  ▼                 ▼                 ▼       │               │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────────┐   │               │
│  │  │  Redis  │  │  Vector │  │   Postgres  │   │               │
│  │  │  Cache  │  │   DB    │  │   Metadata  │   │               │
│  │  └─────────┘  └─────────┘  └─────────────┘   │               │
│  └───────────────────────────────────────────────┘               │
│                                                                  │
│  ┌───────────────────────────────────────────────┐               │
│  │            Monitoring and Logging             │               │
│  │  Prometheus + Grafana + ELK Stack            │               │
│  └───────────────────────────────────────────────┘               │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

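Core Components

Component            Purpose                                         Recommended options
API gateway          Traffic entry, authentication, rate limiting    Nginx, Kong, AWS API Gateway
Application layer    Runs the agent service                          Kubernetes + Docker
Cache layer          Session state, temporary data                   Redis Cluster
Vector store         Long-term memory, knowledge retrieval           Chroma, Pinecone, Weaviate
Relational database  User data, configuration                        PostgreSQL, MySQL
Monitoring           Metrics collection, alerting                    Prometheus + Grafana
Logging              Log collection and analysis                     ELK Stack, Loki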

Containerized Deployment

1. Dockerfile Best Practices

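# Dockerfile.agent
FROM python:3.11-slim AS builder

# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Create a virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Production stage
FROM python:3.11-slim

# Security: create a non-root user. UID/GID 1000 matches the runAsUser and
# fsGroup values in the Kubernetes Deployment later in this article.
RUN groupadd --gid 1000 agent \
    && useradd --uid 1000 --gid agent --no-create-home agent

# Copy the virtual environment from the builder stage
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Set the working directory
WORKDIR /app

# Copy application code
COPY --chown=agent:agent ./src ./src
COPY --chown=agent:agent ./config ./config

# Switch to the non-root user
USER agent

# Health check. urllib ships with Python, so this works even when `requests`
# is not in requirements.txt, and urlopen raises on 4xx/5xx responses.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1

# Expose the service port
EXPOSE 8000

# Start command
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]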
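
The health check above, like the Kubernetes probes configured later, assumes the service exposes `/health` and `/ready`, but the article does not show what those endpoints do. The following is a minimal, framework-independent sketch of the readiness side: it runs a set of dependency checks concurrently with a timeout and reports whether all of them passed. The names `run_readiness_checks`, `ok_check`, and `failing_check` are hypothetical and exist only for illustration.

```python
import asyncio
from typing import Awaitable, Callable, Dict

# Hypothetical convention: a dependency check is an async callable that raises on failure.
Check = Callable[[], Awaitable[None]]


async def run_readiness_checks(
    checks: Dict[str, Check], timeout_s: float = 2.0
) -> Dict[str, object]:
    """Run every dependency check concurrently and summarize the outcome."""

    async def run_one(name: str, check: Check) -> tuple:
        try:
            await asyncio.wait_for(check(), timeout=timeout_s)
            return name, "ok"
        except asyncio.TimeoutError:
            return name, "timeout"
        except Exception as exc:  # any other failure marks the dependency as down
            return name, f"error: {exc}"

    pairs = await asyncio.gather(*(run_one(n, c) for n, c in checks.items()))
    results = dict(pairs)
    ready = all(status == "ok" for status in results.values())
    return {"ready": ready, "checks": results}


# Stand-in checks for illustration; real ones would ping Redis, Postgres, and so on.
async def ok_check() -> None:
    return None


async def failing_check() -> None:
    raise ConnectionError("connection refused")
```

One reasonable split is for `/health` (liveness) to return 200 whenever the process can answer at all, and for `/ready` to return 503 when `ready` is false, so that a failing dependency removes the pod from load balancing without restarting it.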

2. Docker Compose for Local Development

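# docker-compose.yml
# For local development only: the credentials below are placeholders and must not be reused in production.
# The top-level `version` key is obsolete under the Compose Specification, so it is omitted here.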

services:
  agent:
    build:
      context: .
      dockerfile: Dockerfile.agent
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://postgres:password@postgres:5432/agent_db
      - VECTOR_DB_PATH=/data/vector_db
    volumes:
      - vector_data:/data/vector_db
      - ./logs:/app/logs
    depends_on:
      - redis
      - postgres
      - chroma
    networks:
      - agent-network
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    networks:
      - agent-network
    restart: unless-stopped

  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      POSTGRES_DB: agent_db
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - agent-network
    restart: unless-stopped

  chroma:
    image: chromadb/chroma:latest
    ports:
      - "8001:8000"
    volumes:
      - chroma_data:/chroma/chroma
    networks:
      - agent-network
    restart: unless-stopped

  # Monitoring
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    networks:
      - agent-network

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana
      - ./monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards
    networks:
      - agent-network

volumes:
  vector_data:
  redis_data:
  postgres_data:
  chroma_data:
  prometheus_data:
  grafana_data:

networks:
  agent-network:
    driver: bridge
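
The Compose file passes configuration to the agent through environment variables. A minimal sketch of how the application side might read them is shown below; it fails fast at startup when a required variable is missing instead of failing on the first request. The `Settings` class and `load_settings` function are hypothetical names, while the variable names come from the Compose file above.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    openai_api_key: str
    redis_url: str
    database_url: str
    vector_db_path: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from environment variables, failing fast on missing ones."""
    required = ["OPENAI_API_KEY", "REDIS_URL", "DATABASE_URL"]
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
    return Settings(
        openai_api_key=env["OPENAI_API_KEY"],
        redis_url=env["REDIS_URL"],
        database_url=env["DATABASE_URL"],
        # Optional; the default is the path the Compose file mounts
        vector_db_path=env.get("VECTOR_DB_PATH", "/data/vector_db"),
    )
```

Accepting the environment as a parameter keeps the loader easy to test with a plain dictionary.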

3. Kubernetes Deployment Manifests

# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: agent-system
  labels:
    name: agent-system

---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: agent-config
  namespace: agent-system
data:
  LOG_LEVEL: "INFO"
  MAX_WORKERS: "4"
  REQUEST_TIMEOUT: "30"
  RATE_LIMIT_RPS: "10"

---
# k8s/secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: agent-secrets
  namespace: agent-system
type: Opaque
stringData:
  OPENAI_API_KEY: "sk-..."
  DATABASE_URL: "postgresql://..."
  REDIS_PASSWORD: "..."

---
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-deployment
  namespace: agent-system
  labels:
    app: agent
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent
  template:
    metadata:
      labels:
        app: agent
    spec:
      containers:
      - name: agent
        image: your-registry/agent:latest
        ports:
        - containerPort: 8000
          name: http
        envFrom:
        - configMapRef:
            name: agent-config
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: agent-secrets
              key: OPENAI_API_KEY
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: agent-secrets
              key: DATABASE_URL
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
        volumeMounts:
        - name: tmp
          mountPath: /tmp
      volumes:
      - name: tmp
        emptyDir: {}
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000

---
# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: agent-service
  namespace: agent-system
spec:
  selector:
    app: agent
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: ClusterIP

---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-hpa
  namespace: agent-system
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-deployment
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60

---
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: agent-ingress
  namespace: agent-system
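  annotations:
    # ingress-nginx rate limiting: roughly 100 requests per minute per client IP
    nginx.ingress.kubernetes.io/limit-rpm: "100"
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
spec:
  # Assumes the ingress-nginx controller is installed with the class name "nginx"
  ingressClassName: nginx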
  tls:
  - hosts:
    - api.yourdomain.com
    secretName: agent-tls
  rules:
  - host: api.yourdomain.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: agent-service
            port:
              number: 80
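
To make the HPA targets above concrete, here is a worked sketch of the scaling arithmetic described in the Kubernetes documentation: the desired replica count is the current count scaled by the ratio of observed to target utilization, rounded up, with a default tolerance of 10% and clamping to the min/max bounds. The function names are hypothetical, and the sketch does not model the `behavior` policies or stabilization windows, which further limit how fast the count may change.

```python
import math
from typing import Iterable, Tuple


def desired_replicas(
    current_replicas: int,
    current_utilization: float,
    target_utilization: float,
    min_replicas: int = 3,
    max_replicas: int = 20,
    tolerance: float = 0.1,
) -> int:
    """desired = ceil(current * observed / target), clamped to [min, max]."""
    ratio = current_utilization / target_utilization
    # Inside the tolerance band the replica count is left unchanged
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas
    desired = math.ceil(current_replicas * ratio)
    return max(min_replicas, min(max_replicas, desired))


def desired_for_all_metrics(
    current_replicas: int, metrics: Iterable[Tuple[float, float]]
) -> int:
    """With several metrics, the largest proposed replica count wins."""
    return max(
        desired_replicas(current_replicas, observed, target)
        for observed, target in metrics
    )
```

For example, three replicas running at 140% average CPU against the 70% target give a ratio of 2 and a proposal of six replicas, while memory at 40% against the 80% target would propose fewer; the larger proposal is used.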

Monitoring and Observability

1. Application Metrics

# monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import FastAPI, Response
import time

# Metric definitions
REQUEST_COUNT = Counter(
    'agent_requests_total',
    'Total requests',
    ['method', 'endpoint', 'status']
)

REQUEST_DURATION = Histogram(
    'agent_request_duration_seconds',
    'Request duration',
    ['method', 'endpoint'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
)

LLM_CALLS = Counter(
    'agent_llm_calls_total',
    'LLM API calls',
    ['model', 'status']
)

LLM_LATENCY = Histogram(
    'agent_llm_latency_seconds',
    'LLM API latency',
    ['model'],
    buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)

ACTIVE_SESSIONS = Gauge(
    'agent_active_sessions',
    'Number of active sessions'
)

TOOL_CALLS = Counter(
    'agent_tool_calls_total',
    'Tool calls',
    ['tool_name', 'status']
)

MEMORY_USAGE = Gauge(
    'agent_memory_usage_bytes',
    'Memory usage',
    ['type']
)

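def setup_metrics(app: FastAPI):
    """Register the metrics middleware and the /metrics endpoint."""
    
    @app.middleware("http")
    async def metrics_middleware(request, call_next):
        start_time = time.perf_counter()
        status_code = 500  # reported if the handler raises before a response exists
        
        try:
            response = await call_next(request)
            status_code = response.status_code
            return response
        finally:
            duration = time.perf_counter() - start_time
            
            # Record metrics even when the handler raised.
            # Note: request.url.path is the raw path, so routes with path
            # parameters create one time series per distinct value.
            REQUEST_COUNT.labels(
                method=request.method,
                endpoint=request.url.path,
                status=str(status_code)
            ).inc()
            
            REQUEST_DURATION.labels(
                method=request.method,
                endpoint=request.url.path
            ).observe(duration)
    
    @app.get("/metrics")
    async def metrics():
        # With several uvicorn workers, each process keeps its own registry;
        # see prometheus_client's multiprocess mode for aggregating them.
        return Response(
            content=generate_latest(),
            media_type="text/plain; version=0.0.4; charset=utf-8"
        )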

class LLMMetrics:
    """Context manager that records LLM call counts and latency."""
    
    def __init__(self, model: str):
        self.model = model
        self.start_time = None
    
    def __enter__(self):
        self.start_time = time.time()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        duration = time.time() - self.start_time
        status = "success" if exc_type is None else "error"
        
        LLM_CALLS.labels(model=self.model, status=status).inc()
        LLM_LATENCY.labels(model=self.model).observe(duration)

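# Usage example
from openai import AsyncOpenAI

openai_client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

async def call_llm_with_metrics(prompt: str, model: str = "gpt-4"):
    with LLMMetrics(model):
        # The actual LLM call
        response = await openai_client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        return response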
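
The middleware above labels each request with its raw URL path. If the API has path parameters, every distinct ID becomes its own time series, which can inflate Prometheus memory use. One possible mitigation, sketched here under the assumption that IDs look like integers, UUIDs, or long hex strings, is to normalize the path before using it as a label. `normalize_path` is a hypothetical helper, not part of `prometheus_client`.

```python
import re

# Assumed ID shapes: integers, UUIDs, and hex strings of 16+ characters.
_INT = re.compile(r"^\d+$")
_UUID = re.compile(
    r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
)
_HEX = re.compile(r"^[0-9a-fA-F]{16,}$")


def normalize_path(path: str) -> str:
    """Replace ID-like path segments with a placeholder to bound label cardinality."""
    segments = []
    for segment in path.split("/"):
        if _INT.match(segment) or _UUID.match(segment) or _HEX.match(segment):
            segments.append("{id}")
        else:
            segments.append(segment)
    return "/".join(segments)
```

The middleware could then pass `normalize_path(request.url.path)` as the `endpoint` label. Recording the framework's matched route template is an alternative when it is available.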

2. Log Management

# logging_config.py
import logging
import sys
from logging.handlers import RotatingFileHandler
from typing import Optional

from pythonjsonlogger import jsonlogger

def setup_logging(
    log_level: str = "INFO",
    log_format: str = "json",
    log_file: Optional[str] = None
):
    """Configure structured logging on the root logger."""
    
    # Get the root logger
    logger = logging.getLogger()
    logger.setLevel(getattr(logging, log_level.upper()))
    
    # Remove existing handlers
    logger.handlers.clear()
    
    # Log format
    if log_format == "json":
        formatter = jsonlogger.JsonFormatter(
            # Use the standard LogRecord attribute names here;
            # rename_fields maps them to the output keys.
            '%(asctime)s %(levelname)s %(name)s %(message)s '
            '%(request_id)s %(user_id)s %(duration_ms)s',
            rename_fields={
                'levelname': 'level',
                'asctime': 'timestamp'
            }
        )
    else:
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
    
    # Console output
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    
    # File output
    if log_file:
        file_handler = RotatingFileHandler(
            log_file,
            maxBytes=10*1024*1024,  # 10 MB
            backupCount=5
        )
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    
    return logger

# Context-aware logging
import contextvars

request_id_var = contextvars.ContextVar('request_id', default=None)
user_id_var = contextvars.ContextVar('user_id', default=None)

class ContextualLogger:
    """Logger that attaches the current request context to every record."""
    
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
    
    def _log(self, level: int, message: str, extra: dict = None):
        # Copy so the caller's dict is not mutated
        extra = dict(extra or {})
        extra['request_id'] = request_id_var.get()
        extra['user_id'] = user_id_var.get()
        self.logger.log(level, message, extra=extra)
    
    def info(self, message: str, extra: dict = None):
        self._log(logging.INFO, message, extra)
    
    def error(self, message: str, extra: dict = None):
        self._log(logging.ERROR, message, extra)
    
    def warning(self, message: str, extra: dict = None):
        self._log(logging.WARNING, message, extra)

# FastAPI middleware
import time
import uuid

from fastapi import Request

async def logging_middleware(request: Request, call_next):
    """Request logging middleware; register with app.middleware("http")(logging_middleware)."""
    request_id = str(uuid.uuid4())
    request_id_var.set(request_id)
    
    start_time = time.time()
    
    # request.client can be None (for example behind some test clients)
    client_ip = request.client.host if request.client else None
    
    logger = ContextualLogger("agent.api")
    logger.info(
        f"Request started: {request.method} {request.url.path}",
        extra={"client_ip": client_ip}
    )
    
    try:
        response = await call_next(request)
        duration = (time.time() - start_time) * 1000
        
        logger.info(
            f"Request completed: {response.status_code}",
            extra={
                "status_code": response.status_code,
                "duration_ms": duration
            }
        )
        
        response.headers["X-Request-ID"] = request_id
        return response
        
    except Exception as e:
        duration = (time.time() - start_time) * 1000
        logger.error(
            f"Request failed: {str(e)}",
            extra={
                "error": str(e),
                "duration_ms": duration
            }
        )
        raise
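
The wrapper class above only adds the request ID to records logged through it; code that calls `logging.getLogger(...)` directly, including third-party libraries, would miss it. One alternative, sketched here with the standard library only, is a `logging.Filter` attached to the handler that copies the context variable onto every record. The names `request_id_ctx`, `RequestContextFilter`, and `build_logger` are hypothetical.

```python
import contextvars
import io
import logging

# Hypothetical context variable, set once per request by a middleware
request_id_ctx = contextvars.ContextVar("request_id", default=None)


class RequestContextFilter(logging.Filter):
    """Copy the current request ID onto every record passing through the handler."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_ctx.get()
        return True  # never drop the record, only annotate it


def build_logger(stream) -> logging.Logger:
    """Create a demo logger that writes annotated records to `stream`."""
    logger = logging.getLogger("agent.filter_demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter("%(levelname)s [%(request_id)s] %(message)s")
    )
    handler.addFilter(RequestContextFilter())
    logger.addHandler(handler)
    return logger
```

Because each asyncio task runs with its own copy of the context, concurrent requests see their own request ID without passing it through function arguments.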

3. Alerting Rules

# monitoring/prometheus-rules.yaml
groups:
- name: agent-alerts
  rules:
  # High error rate
  - alert: HighErrorRate
    expr: |
      (
        sum(rate(agent_requests_total{status=~"5.."}[5m]))
        /
        sum(rate(agent_requests_total[5m]))
      ) > 0.05
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "Agent error rate is too high"
      description: "Error rate is above 5%, current value: {{ $value }}"

  # LLM call failures
  # increase() counts failures over the window; rate() would be per second,
  # which would make the threshold mean 10 failures per second.
  - alert: LLMCallFailures
    expr: |
      sum(increase(agent_llm_calls_total{status="error"}[10m])) > 10
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "LLM calls are failing frequently"
      description: "Failures in the last 10 minutes: {{ $value }}"

  # High response latency
  - alert: HighLatency
    expr: |
      histogram_quantile(0.95, 
        sum(rate(agent_request_duration_seconds_bucket[5m])) by (le)
      ) > 10
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "P95 response time is too high"
      description: "P95 latency: {{ $value }}s"

  # High memory usage
  - alert: HighMemoryUsage
    expr: |
      container_memory_usage_bytes{container="agent"}
      /
      container_spec_memory_limit_bytes{container="agent"}
      > 0.85
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "Agent memory usage is too high"
      description: "Memory utilization: {{ $value | humanizePercentage }}"

  # Frequent pod restarts
  - alert: FrequentPodRestarts
    expr: |
      increase(kube_pod_container_status_restarts_total{
        container="agent"
      }[1h]) > 5
    for: 0m
    labels:
      severity: critical
    annotations:
      summary: "Agent pods are restarting frequently"
      description: "Restarts in the last hour: {{ $value }}"
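
The HighLatency rule relies on `histogram_quantile`, which does not see individual request durations; it estimates the quantile from cumulative bucket counts by linear interpolation inside the bucket that contains the target rank. The sketch below reproduces that estimate in plain Python to show why the result depends on the bucket boundaries chosen earlier. It is an illustration of the documented behavior, not Prometheus's actual implementation, and the sample counts in the usage note are made up.

```python
import math
from typing import List, Tuple


def histogram_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate the q-quantile from cumulative (upper_bound, count) buckets.

    The bucket list must end with (math.inf, total_count).
    """
    buckets = sorted(buckets)
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    for i, (upper, count) in enumerate(buckets):
        if count >= rank:
            if math.isinf(upper):
                # Rank falls in the +Inf bucket: report the highest finite bound
                return buckets[i - 1][0]
            lower = buckets[i - 1][0] if i > 0 else 0.0
            below = buckets[i - 1][1] if i > 0 else 0.0
            if count == below:
                return lower
            # Assume observations are spread evenly inside the bucket
            return lower + (upper - lower) * (rank - below) / (count - below)
    return math.nan
```

With 100 requests distributed over the article's buckets so that 92 finished within 5 s and 96 within 10 s, the P95 estimate lands at 8.75 s, three quarters of the way through the 5 to 10 s bucket. If many requests exceed the largest finite bucket (60 s), the estimate is capped at 60, so the top bucket should sit above the alert threshold.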

Performance Optimization

1. Connection Pool Management

# utils/connection_pool.py
import os
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Optional

import asyncpg
# The standalone aioredis package was merged into redis-py (redis >= 4.2)
import redis.asyncio as aioredis

class ConnectionPools:
    """Process-wide connection pool manager."""
    
    def __init__(self):
        self.redis_pool: Optional[aioredis.ConnectionPool] = None
        self.db_pool: Optional[asyncpg.Pool] = None
    
    async def initialize(self):
        """Initialize all connection pools."""
        # Redis pool. ConnectionPool has no min_connections option;
        # connections are opened lazily, up to max_connections.
        self.redis_pool = aioredis.ConnectionPool.from_url(
            os.environ.get("REDIS_URL", "redis://localhost"),
            max_connections=50
        )
        
        # PostgreSQL pool
        self.db_pool = await asyncpg.create_pool(
            os.environ.get("DATABASE_URL", "postgresql://user:pass@localhost/db"),
            min_size=10,
            max_size=50,
            command_timeout=60
        )
    
    async def close(self):
        """Close all connection pools."""
        if self.redis_pool:
            await self.redis_pool.disconnect()
        if self.db_pool:
            await self.db_pool.close()

# Global instance
pools = ConnectionPools()

@asynccontextmanager
async def get_redis() -> AsyncGenerator[aioredis.Redis, None]:
    """Yield a Redis client backed by the shared pool."""
    redis = aioredis.Redis(connection_pool=pools.redis_pool)
    try:
        yield redis
    finally:
        # Returns connections to the shared pool; on redis-py >= 5.0.1 prefer aclose()
        await redis.close()

@asynccontextmanager
async def get_db() -> AsyncGenerator[asyncpg.Connection, None]:
    """Yield a database connection from the pool."""
    async with pools.db_pool.acquire() as conn:
        yield conn
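
Pool sizes are per process, so the total number of database connections multiplies with worker and replica counts. The arithmetic below uses the article's own numbers (4 uvicorn workers per pod, a pool of up to 50 connections, and an HPA ceiling of 20 replicas) and assumes each worker process creates its own pool. The function names and the reserved-connection figure are hypothetical; PostgreSQL's default `max_connections` is 100.

```python
def max_db_connections(replicas: int, workers_per_pod: int, pool_max_size: int) -> int:
    """Worst-case connections opened against the database by the whole fleet."""
    return replicas * workers_per_pod * pool_max_size


def pool_size_budget(
    db_max_connections: int, reserved: int, replicas: int, workers_per_pod: int
) -> int:
    """Largest per-process pool size that keeps the fleet under the server limit.

    `reserved` leaves headroom for migrations, admin sessions, and monitoring.
    """
    usable = db_max_connections - reserved
    return max(usable // (replicas * workers_per_pod), 0)
```

At full scale the configuration above could request 20 × 4 × 50 = 4000 connections, far beyond a default PostgreSQL server. Working backwards from the server limit gives a much smaller per-process pool; beyond that, a connection pooler such as PgBouncer in front of the database is a common option.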

2. Caching Strategy

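# utils/cache.py
import json
import hashlib
import time
from functools import wraps
from typing import Optional, Any

# The standalone aioredis package was merged into redis-py (redis >= 4.2)
import redis.asyncio as aioredis

class CacheManager:
    """Two-level cache: in-process memory (L1) in front of Redis (L2)."""
    
    def __init__(self, redis_client: aioredis.Redis):
        self.redis = redis_client
        self.local_cache = {}  # L1: key -> (value, expires_at); unbounded in this example
        self.local_ttl = 60  # seconds an entry may live in L1
    
    def _generate_key(self, prefix: str, *args, **kwargs) -> str:
        """Build a cache key from the prefix and a hash of the arguments."""
        key_data = json.dumps(
            {"args": args, "kwargs": kwargs}, sort_keys=True, default=str
        )
        hash_val = hashlib.md5(key_data.encode()).hexdigest()
        return f"{prefix}:{hash_val}"
    
    def _set_local(self, key: str, value: Any):
        """Store a value in L1 with an expiry timestamp."""
        self.local_cache[key] = (value, time.monotonic() + self.local_ttl)
    
    async def get(self, key: str) -> Optional[Any]:
        """Read from L1 first, then fall back to Redis."""
        # L1: local cache, honoring local_ttl
        entry = self.local_cache.get(key)
        if entry is not None:
            value, expires_at = entry
            if time.monotonic() < expires_at:
                return value
            del self.local_cache[key]  # expired
        
        # L2: Redis
        raw = await self.redis.get(key)
        if raw is not None:
            data = json.loads(raw)
            self._set_local(key, data)
            return data
        
        return None
    
    async def set(self, key: str, value: Any, ttl: int = 3600):
        """Write to both cache levels; the value must be JSON-serializable."""
        # Update the local cache
        self._set_local(key, value)
        
        # Update Redis
        await self.redis.setex(
            key,
            ttl,
            json.dumps(value)
        )
    
    async def delete(self, key: str):
        """Remove a key from both cache levels."""
        self.local_cache.pop(key, None)
        await self.redis.delete(key)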

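def cached(prefix: str, ttl: int = 3600):
    """Caching decorator for async methods whose instance exposes `self.cache`."""
    def decorator(func):
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            cache = getattr(self, "cache", None)
            if cache is None:
                return await func(self, *args, **kwargs)
            
            # The key must include the arguments; otherwise every
            # query would share a single cached response.
            cache_key = cache._generate_key(
                f"{prefix}:{func.__name__}", *args, **kwargs
            )
            
            # Try the cache first
            cached_value = await cache.get(cache_key)
            if cached_value is not None:
                return cached_value
            
            # Run the function
            result = await func(self, *args, **kwargs)
            
            # Store the result
            await cache.set(cache_key, result, ttl)
            
            return result
        return wrapper
    return decorator

# Usage example
class AgentService:
    def __init__(self, cache_manager: CacheManager):
        self.cache = cache_manager
    
    @cached(prefix="llm_response", ttl=1800)
    async def generate_response(self, query: str, **kwargs):
        """Generate a response (cached per query)."""
        # The actual LLM call; call_llm is the service's own method (not shown)
        # and must return something JSON-serializable to be cacheable.
        response = await self.call_llm(query)
        return response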
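
An in-process cache that is a plain dictionary grows without limit in a long-running pod. As a sketch of how the L1 level could be bounded, the class below combines a per-entry TTL with least-recently-used eviction using only `collections.OrderedDict`. `BoundedTTLCache` is a hypothetical name, and the injectable `clock` parameter exists only to make the behavior testable.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Optional


class BoundedTTLCache:
    """In-memory cache with a per-entry TTL and LRU eviction."""

    def __init__(
        self,
        max_entries: int = 1024,
        ttl_s: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._data: "OrderedDict[str, tuple]" = OrderedDict()
        self._max_entries = max_entries
        self._ttl_s = ttl_s
        self._clock = clock

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._data[key]  # expired entries are dropped on access
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key: str, value: Any) -> None:
        self._data[key] = (value, self._clock() + self._ttl_s)
        self._data.move_to_end(key)
        while len(self._data) > self._max_entries:
            self._data.popitem(last=False)  # evict the least recently used entry

    def __len__(self) -> int:
        return len(self._data)
```

A short L1 TTL also limits how long one replica can serve a value that another replica has already invalidated in Redis.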

3. Streaming Response Optimization

# utils/streaming.py
import asyncio
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    message: str

async def stream_llm_response(prompt: str, model: str = "gpt-4") -> AsyncGenerator[str, None]:
    """Stream an LLM response as Server-Sent Events."""
    
    # Simulated chunks; a real implementation would iterate over the LLM's streaming API
    response_chunks = [
        "This is ",
        "a ",
        "streaming ",
        "response ",
        "example.",
    ]
    
    for chunk in response_chunks:
        yield f"data: {chunk}\n\n"
        await asyncio.sleep(0.1)  # simulated latency
    
    yield "data: [DONE]\n\n"

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """Streaming chat endpoint."""
    return StreamingResponse(
        stream_llm_response(request.message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # ask Nginx not to buffer the stream
        }
    )
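
The generator above formats events by hand, which works for single-line chunks but breaks when a chunk contains a newline: in the Server-Sent Events format, each line of a payload needs its own `data:` prefix and a blank line ends the event. A small helper can take care of that. `format_sse` and `format_sse_json` are hypothetical names used for illustration.

```python
import json
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Encode one Server-Sent Event, prefixing every payload line with 'data:'."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    for line in data.split("\n"):
        lines.append(f"data: {line}")
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"


def format_sse_json(payload: dict) -> str:
    """Encode a JSON payload as one event; JSON output never contains raw newlines."""
    return format_sse(json.dumps(payload, ensure_ascii=False))
```

Sending JSON payloads is often the simpler choice for LLM output, since model text routinely contains newlines and the client can parse each event independently.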

Security Hardening

1. API Security

# security/middleware.py
import os
import re

import jwt
from fastapi import Depends, FastAPI, Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from slowapi import Limiter
from slowapi.util import get_remote_address

# Rate limiter
limiter = Limiter(key_func=get_remote_address)

# JWT verification
security = HTTPBearer()
# Assumption: the signing key is supplied via a JWT_SECRET_KEY environment variable
SECRET_KEY = os.environ["JWT_SECRET_KEY"]

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify the JWT bearer token; use in routes as Depends(verify_token)."""
    try:
        payload = jwt.decode(
            credentials.credentials,
            SECRET_KEY,
            algorithms=["HS256"]
        )
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token has expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

# Input validation
class InputValidator:
    """Input validator."""
    
    # Patterns treated as dangerous
    DANGEROUS_PATTERNS = [
        r'<script[^>]*>.*?</script>',
        r'javascript:',
        r'on\w+\s*=',
        r'\.\.\/',
        r'\.\.\\',
    ]
    
    @classmethod
    def sanitize(cls, text: str) -> str:
        """Sanitize user input."""
        # Strip dangerous patterns
        for pattern in cls.DANGEROUS_PATTERNS:
            text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL)
        
        # Enforce a length limit
        max_length = 10000
        if len(text) > max_length:
            text = text[:max_length]
        
        return text.strip()
    
    @classmethod
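    def validate_prompt(cls, prompt: str) -> bool:
        """Check a prompt against a keyword blocklist for injection attempts."""
        # Coarse heuristic only: a blocklist is easy to bypass and can reject
        # benign text (e.g. "disregard"), so treat it as one layer of defense.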
        injection_keywords = [
            "ignore previous instructions",
            "disregard",
            "system prompt",
            "you are now",
        ]
        
        prompt_lower = prompt.lower()
        for keyword in injection_keywords:
            if keyword in prompt_lower:
                return False
        
        return True

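# Wire everything into FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded

class ChatRequest(BaseModel):
    message: str

app = FastAPI()
app.state.limiter = limiter
# Register slowapi's handler so rate-limited requests get a proper 429 response
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.middleware("http")
async def security_middleware(request: Request, call_next):
    """Security middleware."""
    # 1. Request bodies are validated in the route handlers (see /chat below)
    #    rather than here, so the body is not read twice.
    
    # 2. Check the User-Agent. Return a response directly: an HTTPException
    #    raised inside middleware is not routed through FastAPI's exception
    #    handlers and would surface as a 500.
    user_agent = request.headers.get("user-agent", "")
    if not user_agent or len(user_agent) < 10:
        return JSONResponse(status_code=403, content={"detail": "Invalid User-Agent"})
    
    response = await call_next(request)
    
    # 3. Security response headers
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-XSS-Protection"] = "1; mode=block"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    
    return response

@app.post("/chat")
@limiter.limit("10/minute")
async def chat(request: Request, chat_request: ChatRequest):
    """Rate-limited chat endpoint."""
    # Validate input
    sanitized_input = InputValidator.sanitize(chat_request.message)
    
    if not InputValidator.validate_prompt(sanitized_input):
        raise HTTPException(status_code=400, detail="Unsafe input detected")
    
    # Handle the request; process_chat is the application's own handler (not shown)
    response = await process_chat(sanitized_input)
    return response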
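
To show what a limit such as "10/minute" means mechanically, here is a minimal sliding-window limiter in plain Python. It is a sketch of the general technique, not slowapi's implementation; `SlidingWindowLimiter` is a hypothetical name, and the injectable `clock` exists only for testing.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most `limit` requests per key within any `window_s`-second window."""

    def __init__(
        self,
        limit: int,
        window_s: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._limit = limit
        self._window_s = window_s
        self._clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self._clock()
        hits = self._hits[key]
        # Drop timestamps that have left the window
        while hits and now - hits[0] >= self._window_s:
            hits.popleft()
        if len(hits) >= self._limit:
            return False
        hits.append(now)
        return True
```

State kept in process memory like this is not shared between uvicorn workers or pods, so with several replicas the effective limit is multiplied by the number of processes. A shared store such as Redis is needed for a global limit.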

2. Data Security

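# security/encryption.py
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os

class DataEncryption:
    """Symmetric encryption helper for sensitive data."""
    
    def __init__(self, master_key: str = None, salt: bytes = None):
        self.master_key = master_key or os.environ.get("ENCRYPTION_KEY")
        if not self.master_key:
            raise ValueError("ENCRYPTION_KEY is not set")
        # The salt must be stable across restarts: a fresh random salt per
        # process derives a different key each time, so previously encrypted
        # data could no longer be decrypted.
        # Assumption: the salt is supplied base64-encoded in ENCRYPTION_SALT.
        self.salt = salt or base64.b64decode(os.environ["ENCRYPTION_SALT"])
        self.cipher = self._create_cipher()
    
    def _create_cipher(self) -> Fernet:
        """Derive a Fernet key from the master key and build the cipher."""
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self.salt,
            iterations=100000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(self.master_key.encode()))
        return Fernet(key)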
    
    def encrypt(self, data: str) -> str:
        """Encrypt a string."""
        return self.cipher.encrypt(data.encode()).decode()
    
    def decrypt(self, encrypted_data: str) -> str:
        """Decrypt a string."""
        return self.cipher.decrypt(encrypted_data.encode()).decode()
    
    def encrypt_sensitive_fields(self, data: dict, fields: list) -> dict:
        """Return a copy of `data` with the given fields encrypted."""
        result = data.copy()
        for field in fields:
            if field in result:
                result[field] = self.encrypt(str(result[field]))
        return result

# Usage example
encryption = DataEncryption()

# Encrypt an API key
encrypted_key = encryption.encrypt("sk-...")

# Encrypt sensitive user fields
user_data = {
    "name": "Zhang San",
    "email": "zhangsan@example.com",
    "phone": "13800138000"
}
encrypted_data = encryption.encrypt_sensitive_fields(
    user_data, 
    ["email", "phone"]
)
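
The key used by Fernet is derived from two inputs, the master key and the salt, so both have to be the same whenever data is decrypted. The sketch below demonstrates this with the standard library's `hashlib.pbkdf2_hmac`, which implements the same PBKDF2-HMAC-SHA256 derivation; `derive_key` is a hypothetical helper and the salts are arbitrary sample values.

```python
import base64
import hashlib


def derive_key(master_key: str, salt: bytes, iterations: int = 100_000) -> bytes:
    """Derive a 32-byte key with PBKDF2-HMAC-SHA256, encoded in Fernet's key format."""
    raw = hashlib.pbkdf2_hmac(
        "sha256", master_key.encode(), salt, iterations, dklen=32
    )
    return base64.urlsafe_b64encode(raw)


salt_a = b"0123456789abcdef"
salt_b = b"fedcba9876543210"

# Same master key and salt: the key is reproducible across processes
same = derive_key("master", salt_a) == derive_key("master", salt_a)
# Same master key, different salt: a completely different key
different = derive_key("master", salt_a) != derive_key("master", salt_b)
```

This is why a salt generated with `os.urandom` at construction time only works if it is stored alongside the ciphertext or in configuration. The salt does not need to be secret, only stable.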

CI/CD Pipeline

1. GitHub Actions Workflow

# .github/workflows/deploy.yml
name: CI/CD Pipeline

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.11'
    
    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install -r requirements-dev.txt
    
    - name: Run linting
      run: |
        flake8 src/
        black --check src/
        mypy src/
    
    - name: Run tests
      run: |
        pytest tests/ --cov=src --cov-report=xml
    
    - name: Upload coverage
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml

  build:
    needs: test
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
    steps:
    - uses: actions/checkout@v3
    
    - name: Set up Docker Buildx
      uses: docker/setup-buildx-action@v2
    
    - name: Log in to Container Registry
      uses: docker/login-action@v2
      with:
        registry: ${{ env.REGISTRY }}
        username: ${{ github.actor }}
        password: ${{ secrets.GITHUB_TOKEN }}
    
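    - name: Extract metadata
      id: meta
      uses: docker/metadata-action@v4
      with:
        images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
        # format=long emits the full commit SHA, so the image tag matches the
        # branch-plus-full-SHA tag that the deploy-staging job references
        tags: |
          type=ref,event=branch
          type=ref,event=pr
          type=sha,prefix={{branch}}-,format=long
          type=raw,value=latest,enable={{is_default_branch}}
    
    - name: Build and push
      uses: docker/build-push-action@v4
      with:
        context: .
        # The Dockerfile in this article is named Dockerfile.agent, not the default Dockerfile
        file: ./Dockerfile.agent
        # Build on pull requests, but only push images for branch pushes
        push: ${{ github.event_name != 'pull_request' }}
        tags: ${{ steps.meta.outputs.tags }}
        labels: ${{ steps.meta.outputs.labels }}
        cache-from: type=gha
        cache-to: type=gha,mode=max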

  deploy-staging:
    needs: build
    if: github.ref == 'refs/heads/develop'
    runs-on: ubuntu-latest
    environment: staging
    steps:
    - uses: actions/checkout@v3
    
    - name: Deploy to Staging
      run: |
        echo "${{ secrets.KUBECONFIG_STAGING }}" | base64 -d > kubeconfig
        export KUBECONFIG=kubeconfig
        
        # Update the image
        kubectl set image deployment/agent-deployment \
          agent=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:develop-${{ github.sha }} \
          -n agent-system
        
        # Wait for the rollout to finish
        kubectl rollout status deployment/agent-deployment -n agent-system

  deploy-production:
    needs: build
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    environment: production
    steps:
    - uses: actions/checkout@v3
    
    - name: Deploy to Production
      run: |
        echo "${{ secrets.KUBECONFIG_PRODUCTION }}" | base64 -d > kubeconfig
        export KUBECONFIG=kubeconfig
        
        # Blue-green deployment: roll out the green Deployment first
        kubectl apply -f k8s/deployment-green.yaml
        kubectl rollout status deployment/agent-deployment-green -n agent-system
        
        # Switch the Service to the green pods
        kubectl patch service agent-service -n agent-system -p '{"spec":{"selector":{"version":"green"}}}'
        
        # Remove the previous (blue) version
        kubectl delete deployment agent-deployment-blue -n agent-system || true

2. Deployment Script

#!/bin/bash
# scripts/deploy.sh

set -euo pipefail

ENVIRONMENT="${1:-}"
VERSION="${2:-}"

if [ -z "$ENVIRONMENT" ] || [ -z "$VERSION" ]; then
    echo "Usage: ./deploy.sh <environment> <version>"
    echo "Example: ./deploy.sh production v1.2.3"
    exit 1
fi

echo "🚀 Deploying version $VERSION to $ENVIRONMENT"

# Configure kubectl
if [ "$ENVIRONMENT" == "production" ]; then
    export KUBECONFIG=~/.kube/config-prod
    NAMESPACE="agent-prod"
else
    export KUBECONFIG=~/.kube/config-staging
    NAMESPACE="agent-staging"
fi

# Update the image
echo "📦 Updating deployment..."
kubectl set image deployment/agent-deployment \
    "agent=ghcr.io/your-org/agent:$VERSION" \
    -n "$NAMESPACE"

# Wait for the rollout. The command is tested with `if` so that a failed
# rollout reaches the rollback branch instead of aborting under `set -e`.
echo "⏳ Waiting for rollout..."
if kubectl rollout status deployment/agent-deployment -n "$NAMESPACE" --timeout=300s; then
    echo "✅ Rollout finished, verifying deployment..."
    
    # Run a health check from inside the cluster; -f makes curl fail on HTTP errors
    kubectl run health-check --rm -i --restart=Never \
        -n "$NAMESPACE" \
        --image=curlimages/curl:latest \
        -- curl -fsS "http://agent-service.$NAMESPACE.svc.cluster.local/health"
else
    echo "❌ Deployment failed! Rolling back..."
    kubectl rollout undo deployment/agent-deployment -n "$NAMESPACE"
    exit 1
fi

echo "🎉 Deployment complete!"
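
A single health probe right after a rollout can fail for transient reasons, such as DNS propagation or a slow first request. As a sketch of a more forgiving post-deploy check, the helper below retries with exponential backoff before declaring failure. It is written in Python to match the rest of the article's examples; `wait_until_healthy` and `http_check` are hypothetical names, and the injectable `sleep` parameter exists only for testing.

```python
import time
import urllib.request
from typing import Callable


def wait_until_healthy(
    check: Callable[[], bool],
    attempts: int = 5,
    base_delay_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Retry `check` with exponential backoff; return True on the first success."""
    for attempt in range(attempts):
        try:
            if check():
                return True
        except Exception:
            pass  # errors such as connection refused count as "not healthy yet"
        if attempt < attempts - 1:
            sleep(base_delay_s * (2 ** attempt))  # 1s, 2s, 4s, ...
    return False


def http_check(url: str, timeout_s: float = 5.0) -> Callable[[], bool]:
    """Build a check that passes when `url` answers with HTTP 200."""

    def check() -> bool:
        with urllib.request.urlopen(url, timeout=timeout_s) as resp:
            return resp.status == 200

    return check
```

A deploy pipeline could call `wait_until_healthy(http_check(health_url))` and trigger the rollback only when it returns false.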

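Summary

This article covered the end-to-end workflow for deploying AI agents to production. The key points:

  1. Architecture: a layered design with gateway, application, data, and monitoring layers
  2. Containerization: Docker multi-stage builds, Kubernetes orchestration, HPA autoscaling
  3. Observability: Prometheus metrics, structured logging, alerting rules
  4. Performance: connection pools, multi-level caching, streaming responses
  5. Security hardening: JWT authentication, input validation, data encryption
  6. CI/CD: automated testing, image builds, blue-green deployment

Production deployment checklist:

  • Scan container images for vulnerabilities
  • Manage sensitive values with Secrets
  • Configure liveness and readiness checks
  • Set resource requests and limits
  • Configure horizontal autoscaling
  • Set up monitoring and alerting
  • Define a rollback strategy
  • Run load tests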


Last updated 2024-02-25. Questions are welcome in the community forum.