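Introduction
Building an agent is only the first step. The real challenge is deploying it to production so that it runs reliably, securely, and efficiently. Drawing on hands-on production experience, this article walks through the full workflow and best practices for deploying AI agents to production.
Production Deployment Architecture
A Typical Production Architecture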
┌─────────────────────────────────────────────────────────────────┐
│               Production Environment Architecture               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌───────────────┐                                              │
│  │ Load Balancer │  Nginx / AWS ALB / Cloudflare                │
│  └───────┬───────┘                                              │
│          │                                                      │
│  ┌───────┴───────────────────────────────────────────────────┐  │
│  │                    Kubernetes Cluster                     │  │
│  │      ┌─────────────┐    ┌─────────────┐                   │  │
│  │      │  Agent Pod  │    │  Agent Pod  │  (HPA)            │  │
│  │      │  Replica 1  │    │  Replica 2  │                   │  │
│  │      └──────┬──────┘    └──────┬──────┘                   │  │
│  │             └────────┬─────────┘                          │  │
│  │                      │                                    │  │
│  │        ┌─────────────┼───────────────┐                    │  │
│  │        │             │               │                    │  │
│  │        ▼             ▼               ▼                    │  │
│  │   ┌─────────┐   ┌─────────┐   ┌─────────────┐             │  │
│  │   │  Redis  │   │ Vector  │   │  Postgres   │             │  │
│  │   │  Cache  │   │   DB    │   │  Metadata   │             │  │
│  │   └─────────┘   └─────────┘   └─────────────┘             │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                  Monitoring and Logging                   │  │
│  │             Prometheus + Grafana + ELK Stack              │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
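Core Components
| Component | Purpose | Recommended options |
|---|---|---|
| API gateway | Traffic entry point, authentication, rate limiting | Nginx, Kong, AWS API Gateway |
| Application layer | Runs the agent service | Kubernetes + Docker |
| Cache layer | Session state, temporary data | Redis Cluster |
| Vector store | Long-term memory, knowledge retrieval | Chroma, Pinecone, Weaviate |
| Relational database | User data, configuration | PostgreSQL, MySQL |
| Monitoring | Metrics collection, alerting | Prometheus + Grafana |
| Logging | Log collection and analysis | ELK Stack, Loki |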
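Containerized Deployment
1. Dockerfile Best Practices
# Dockerfile.agent
FROM python:3.11-slim AS builder
# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*
# Create a virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Production stage
FROM python:3.11-slim
# Security: create a non-root user.
# UID/GID 1000 matches runAsUser/fsGroup in the Kubernetes Deployment.
RUN groupadd -g 1000 agent && useradd -u 1000 -g agent -M -s /usr/sbin/nologin agent
# Copy the virtual environment from the builder stage
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Set the working directory
WORKDIR /app
# Copy application code
COPY --chown=agent:agent ./src ./src
COPY --chown=agent:agent ./config ./config
# Switch to the non-root user
USER agent
# Health check: urllib ships with Python and raises on 4xx/5xx responses,
# so the check does not depend on `requests` being installed.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1
# Expose the service port
EXPOSE 8000
# Start command
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]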
2. Docker Compose for Local Development
# docker-compose.yml
# Local development only: credentials are hard-coded and every port is published on the host.
# The top-level `version` key is ignored by Compose V2 and can be dropped.
version: '3.8'
services:
  agent:
    build:
      context: .
      dockerfile: Dockerfile.agent
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://postgres:password@postgres:5432/agent_db
      - VECTOR_DB_PATH=/data/vector_db
    volumes:
      - vector_data:/data/vector_db
      - ./logs:/app/logs
    depends_on:
      - redis
      - postgres
      - chroma
    networks:
      - agent-network
    restart: unless-stopped
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    networks:
      - agent-network
    restart: unless-stopped
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      POSTGRES_DB: agent_db
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - agent-network
    restart: unless-stopped
  chroma:
    image: chromadb/chroma:latest
    ports:
      - "8001:8000"
    volumes:
      - chroma_data:/chroma/chroma
    networks:
      - agent-network
    restart: unless-stopped
  # Monitoring
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    networks:
      - agent-network
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana
      - ./monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards
    networks:
      - agent-network
volumes:
  vector_data:
  redis_data:
  postgres_data:
  chroma_data:
  prometheus_data:
  grafana_data:
networks:
  agent-network:
    driver: bridge
3. Kubernetes Deployment Configuration
# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: agent-system
  labels:
    name: agent-system
---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: agent-config
  namespace: agent-system
data:
  LOG_LEVEL: "INFO"
  MAX_WORKERS: "4"
  REQUEST_TIMEOUT: "30"
  RATE_LIMIT_RPS: "10"
---
# k8s/secret.yaml
# Placeholders only: inject real values from a secret manager and never commit them.
apiVersion: v1
kind: Secret
metadata:
  name: agent-secrets
  namespace: agent-system
type: Opaque
stringData:
  OPENAI_API_KEY: "sk-..."
  DATABASE_URL: "postgresql://..."
  REDIS_PASSWORD: "..."
---
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-deployment
  namespace: agent-system
  labels:
    app: agent
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent
  template:
    metadata:
      labels:
        app: agent
    spec:
      containers:
        - name: agent
          image: your-registry/agent:latest
          ports:
            - containerPort: 8000
              name: http
          envFrom:
            - configMapRef:
                name: agent-config
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: OPENAI_API_KEY
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: DATABASE_URL
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
          volumeMounts:
            - name: tmp
              mountPath: /tmp
      volumes:
        - name: tmp
          emptyDir: {}
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
---
# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: agent-service
  namespace: agent-system
spec:
  selector:
    app: agent
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: ClusterIP
---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-hpa
  namespace: agent-system
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-deployment
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
---
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: agent-ingress
  namespace: agent-system
  annotations:
    # ingress-nginx expresses per-client limits as limit-rps / limit-rpm / limit-connections;
    # limit-rpm "100" is the equivalent of 100 requests per minute.
    nginx.ingress.kubernetes.io/limit-rpm: "100"
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
spec:
  # Assumes the ingress-nginx controller is installed under the class name "nginx"
  ingressClassName: nginx
  tls:
    - hosts:
        - api.yourdomain.com
      secretName: agent-tls
  rules:
    - host: api.yourdomain.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: agent-service
                port:
                  number: 80
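To make the HPA settings above easier to reason about, here is a rough sketch of the scaling rule Kubernetes documents for a single utilization metric: desired = ceil(current × currentUtilization / targetUtilization), with no change inside a tolerance band (0.1 by default). The function name and its parameters are illustrative, not part of any Kubernetes client. It ignores the `behavior` policies and stabilization windows, and when several metrics are configured (CPU and memory here) the controller takes the largest result.

```python
import math


def desired_replicas(
    current_replicas: int,
    current_utilization: float,
    target_utilization: float,
    min_replicas: int = 3,
    max_replicas: int = 20,
    tolerance: float = 0.1,
) -> int:
    """Approximate the HPA replica calculation for one Utilization metric."""
    ratio = current_utilization / target_utilization
    # Inside the tolerance band the controller leaves the replica count alone.
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas
    desired = math.ceil(current_replicas * ratio)
    # Clamp to the minReplicas / maxReplicas bounds from the manifest.
    return max(min_replicas, min(max_replicas, desired))


# 3 pods averaging 90% CPU against the 70% target
print(desired_replicas(3, 90, 70))
```

With the 70% CPU target from the manifest, three pods at 90% scale to four, while three pods at 72% stay put because the ratio is within tolerance.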
Monitoring and Observability
1. Application Metrics
# monitoring/metrics.py
import time
from fastapi import FastAPI, Request, Response
from openai import AsyncOpenAI
from prometheus_client import (
    CONTENT_TYPE_LATEST, Counter, Gauge, Histogram, generate_latest
)
# Metric definitions
REQUEST_COUNT = Counter(
    'agent_requests_total',
    'Total requests',
    ['method', 'endpoint', 'status']
)
REQUEST_DURATION = Histogram(
    'agent_request_duration_seconds',
    'Request duration',
    ['method', 'endpoint'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
)
LLM_CALLS = Counter(
    'agent_llm_calls_total',
    'LLM API calls',
    ['model', 'status']
)
LLM_LATENCY = Histogram(
    'agent_llm_latency_seconds',
    'LLM API latency',
    ['model'],
    buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)
ACTIVE_SESSIONS = Gauge(
    'agent_active_sessions',
    'Number of active sessions'
)
TOOL_CALLS = Counter(
    'agent_tool_calls_total',
    'Tool calls',
    ['tool_name', 'status']
)
MEMORY_USAGE = Gauge(
    'agent_memory_usage_bytes',
    'Memory usage',
    ['type']
)
def setup_metrics(app: FastAPI):
    """Register the metrics middleware and the /metrics endpoint."""
    @app.middleware("http")
    async def metrics_middleware(request: Request, call_next):
        start_time = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start_time
        # Record metrics. Raw paths that embed IDs create one label value per ID;
        # switch to route templates if label cardinality becomes a problem.
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=request.url.path,
            status=str(response.status_code)
        ).inc()
        REQUEST_DURATION.labels(
            method=request.method,
            endpoint=request.url.path
        ).observe(duration)
        return response
    @app.get("/metrics")
    async def metrics():
        # With several uvicorn workers, each process keeps its own counters unless
        # prometheus_client's multiprocess mode is configured.
        return Response(
            content=generate_latest(),
            media_type=CONTENT_TYPE_LATEST
        )
class LLMMetrics:
    """Context manager that records LLM call counts and latency."""
    def __init__(self, model: str):
        self.model = model
        self.start_time = None
    def __enter__(self):
        self.start_time = time.perf_counter()
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        duration = time.perf_counter() - self.start_time
        status = "success" if exc_type is None else "error"
        LLM_CALLS.labels(model=self.model, status=status).inc()
        LLM_LATENCY.labels(model=self.model).observe(duration)
# Usage example. Assumes the openai>=1.0 SDK, whose client reads OPENAI_API_KEY from the environment.
openai_client = AsyncOpenAI()
async def call_llm_with_metrics(prompt: str, model: str = "gpt-4"):
    with LLMMetrics(model):
        # The actual LLM call
        response = await openai_client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        return response
2. Log Management
# logging_config.py
import contextvars
import logging
import sys
import time
import uuid
from logging.handlers import RotatingFileHandler
from typing import Optional
from fastapi import Request
from pythonjsonlogger import jsonlogger
def setup_logging(
    log_level: str = "INFO",
    log_format: str = "json",
    log_file: Optional[str] = None
):
    """Configure structured logging."""
    # Configure the root logger
    logger = logging.getLogger()
    logger.setLevel(getattr(logging, log_level.upper()))
    # Remove existing handlers
    logger.handlers = []
    # Log format
    if log_format == "json":
        # The format string must name the standard LogRecord attributes
        # (asctime, levelname); rename_fields then renames them in the JSON output.
        formatter = jsonlogger.JsonFormatter(
            '%(asctime)s %(levelname)s %(name)s %(message)s '
            '%(request_id)s %(user_id)s %(duration_ms)s',
            rename_fields={
                'levelname': 'level',
                'asctime': 'timestamp'
            }
        )
    else:
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
    # Console output
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    # File output
    if log_file:
        file_handler = RotatingFileHandler(
            log_file,
            maxBytes=10*1024*1024,  # 10 MB
            backupCount=5
        )
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    return logger
# Contextual logging: per-request values carried in context variables
request_id_var = contextvars.ContextVar('request_id', default=None)
user_id_var = contextvars.ContextVar('user_id', default=None)
class ContextualLogger:
    """Logger that attaches the current request context to every record."""
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
    def _log(self, level: int, message: str, extra: Optional[dict] = None):
        extra = dict(extra or {})  # copy so the caller's dict is not mutated
        extra['request_id'] = request_id_var.get()
        extra['user_id'] = user_id_var.get()
        self.logger.log(level, message, extra=extra)
    def info(self, message: str, extra: Optional[dict] = None):
        self._log(logging.INFO, message, extra)
    def error(self, message: str, extra: Optional[dict] = None):
        self._log(logging.ERROR, message, extra)
    def warning(self, message: str, extra: Optional[dict] = None):
        self._log(logging.WARNING, message, extra)
# FastAPI middleware; register it with app.middleware("http")(logging_middleware)
async def logging_middleware(request: Request, call_next):
    """Log the start and outcome of every request."""
    request_id = str(uuid.uuid4())
    request_id_var.set(request_id)
    start_time = time.time()
    logger = ContextualLogger("agent.api")
    logger.info(
        f"Request started: {request.method} {request.url.path}",
        # request.client can be None, for example behind some test clients
        extra={"client_ip": request.client.host if request.client else None}
    )
    try:
        response = await call_next(request)
        duration = (time.time() - start_time) * 1000
        logger.info(
            f"Request completed: {response.status_code}",
            extra={
                "status_code": response.status_code,
                "duration_ms": duration
            }
        )
        response.headers["X-Request-ID"] = request_id
        return response
    except Exception as e:
        duration = (time.time() - start_time) * 1000
        logger.error(
            f"Request failed: {str(e)}",
            extra={
                "error": str(e),
                "duration_ms": duration
            }
        )
        raise
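The contextual logger above relies on `contextvars` keeping a separate value per request. As a small standard-library illustration of that mechanism (a sketch, not the article's logging setup), the example below injects the request ID through a `logging.Filter` and runs two concurrent tasks. `RequestContextFilter`, `JsonLineFormatter`, and the `agent.demo` logger name are hypothetical names introduced for this example.

```python
import asyncio
import contextvars
import io
import json
import logging

request_id_var = contextvars.ContextVar("request_id", default=None)


class RequestContextFilter(logging.Filter):
    """Copy the current request ID onto every record passing through the handler."""

    def filter(self, record):
        record.request_id = request_id_var.get()
        return True


class JsonLineFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "name": record.name,
            "message": record.getMessage(),
            "request_id": getattr(record, "request_id", None),
        })


def build_logger(stream):
    logger = logging.getLogger("agent.demo")
    logger.handlers.clear()
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.addFilter(RequestContextFilter())
    handler.setFormatter(JsonLineFormatter())
    logger.addHandler(handler)
    return logger


async def handle(logger, request_id):
    request_id_var.set(request_id)  # visible only inside this task's context
    await asyncio.sleep(0)          # yield so the two tasks interleave
    logger.info("handled")


async def main(stream):
    logger = build_logger(stream)
    # gather() wraps each coroutine in a task, and each task gets its own context copy
    await asyncio.gather(handle(logger, "req-a"), handle(logger, "req-b"))


buffer = io.StringIO()
asyncio.run(main(buffer))
records = [json.loads(line) for line in buffer.getvalue().splitlines()]
print(records)
```

Each task sees only the ID it set, even though the two interleave, and the value set inside a task does not leak into the surrounding context.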
3. Alerting Rules
# monitoring/prometheus-rules.yaml
groups:
  - name: agent-alerts
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: |
          (
            sum(rate(agent_requests_total{status=~"5.."}[5m]))
            /
            sum(rate(agent_requests_total[5m]))
          ) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Agent error rate is too high"
          description: "Error rate is above 5%, current value: {{ $value }}"
      # LLM call failures
      # increase() counts failures over the window; rate() would be per second,
      # so a threshold of 10 would mean 10 failures every second.
      - alert: LLMCallFailures
        expr: |
          sum(increase(agent_llm_calls_total{status="error"}[10m])) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM calls are failing frequently"
          description: "Failures in the last 10 minutes: {{ $value }}"
      # High response latency
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95,
            sum(rate(agent_request_duration_seconds_bucket[5m])) by (le)
          ) > 10
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "P95 response time is too high"
          description: "P95 latency: {{ $value }}s"
      # High memory usage
      - alert: HighMemoryUsage
        expr: |
          container_memory_usage_bytes{container="agent"}
          /
          container_spec_memory_limit_bytes{container="agent"}
          > 0.85
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Agent memory usage is too high"
          description: "Memory utilization: {{ $value | humanizePercentage }}"
      # Frequent pod restarts
      - alert: FrequentPodRestarts
        expr: |
          increase(kube_pod_container_status_restarts_total{
            container="agent"
          }[1h]) > 5
        for: 0m
        labels:
          severity: critical
        annotations:
          summary: "Agent pods are restarting frequently"
          description: "Restarts in the last hour: {{ $value }}"
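When picking thresholds for counter-based alerts such as the ones above, it helps to keep Prometheus's two views of a counter apart: `rate()` is the average per-second growth over the window, while `increase()` is the total growth over the window. The sketch below reproduces that arithmetic on raw (timestamp, value) samples. It is a simplification under stated assumptions: Prometheus also extrapolates to the window edges, which is omitted here, and the function names are illustrative.

```python
from typing import List, Tuple

Sample = Tuple[float, float]  # (unix timestamp in seconds, counter value)


def counter_increase(samples: List[Sample]) -> float:
    """Total growth of a counter across the samples, tolerating resets."""
    total = 0.0
    for (_, prev), (_, curr) in zip(samples, samples[1:]):
        # A drop means the counter restarted (for example after a pod restart),
        # so the new value is counted from zero.
        total += (curr - prev) if curr >= prev else curr
    return total


def counter_rate(samples: List[Sample]) -> float:
    """Average per-second growth over the sampled window."""
    window = samples[-1][0] - samples[0][0]
    return counter_increase(samples) / window


# 24 failures spread over a 10-minute window
samples = [(0.0, 100.0), (300.0, 112.0), (600.0, 124.0)]
print(counter_increase(samples), counter_rate(samples))
```

The same 24 failures show up as an increase of 24 but a rate of only 0.04 per second, so a threshold written for one function is off by a factor of the window length when applied to the other.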
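Performance Optimization
1. Connection Pool Management
# utils/connection_pool.py
import os
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import asyncpg
# The standalone aioredis package is no longer maintained; the same client now
# ships inside redis-py as redis.asyncio.
import redis.asyncio as aioredis
class ConnectionPools:
    """Process-wide connection pools."""
    def __init__(self):
        self.redis_pool = None
        self.db_pool = None
    async def initialize(self):
        """Create all connection pools."""
        # Redis pool. Connections are opened lazily up to max_connections;
        # the pool has no minimum-size option.
        self.redis_pool = aioredis.ConnectionPool.from_url(
            os.environ.get("REDIS_URL", "redis://localhost"),
            max_connections=50
        )
        # PostgreSQL pool
        self.db_pool = await asyncpg.create_pool(
            os.environ.get("DATABASE_URL", "postgresql://user:pass@localhost/db"),
            min_size=10,
            max_size=50,
            command_timeout=60
        )
    async def close(self):
        """Close all connection pools."""
        if self.redis_pool:
            await self.redis_pool.disconnect()
        if self.db_pool:
            await self.db_pool.close()
# Global instance
pools = ConnectionPools()
@asynccontextmanager
async def get_redis() -> AsyncGenerator[aioredis.Redis, None]:
    """Yield a Redis client backed by the shared pool."""
    redis = aioredis.Redis(connection_pool=pools.redis_pool)
    try:
        yield redis
    finally:
        # Returns the connection to the pool; aclose() is the redis-py 5+ name (close() on 4.x)
        await redis.aclose()
@asynccontextmanager
async def get_db() -> AsyncGenerator[asyncpg.Connection, None]:
    """Yield a database connection from the shared pool."""
    async with pools.db_pool.acquire() as conn:
        yield conn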
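What the pool sizes above buy you is a hard cap on concurrent connections: callers beyond the cap wait instead of opening new ones. The toy pool below shows that acquire/release behavior using only `asyncio`. `SimplePool` is a hypothetical teaching aid, not a substitute for the asyncpg or Redis pools, and it does none of their health checking or reconnection.

```python
import asyncio
from contextlib import asynccontextmanager


class SimplePool:
    """Toy fixed-size pool that hands out pre-created connection objects."""

    def __init__(self, factory, size):
        self._queue = asyncio.Queue()
        for _ in range(size):
            self._queue.put_nowait(factory())

    @asynccontextmanager
    async def acquire(self):
        conn = await self._queue.get()  # waits while every connection is checked out
        try:
            yield conn
        finally:
            self._queue.put_nowait(conn)  # always hand the connection back


async def demo():
    pool = SimplePool(factory=object, size=2)
    in_use = 0
    peak = 0

    async def worker():
        nonlocal in_use, peak
        async with pool.acquire():
            in_use += 1
            peak = max(peak, in_use)
            await asyncio.sleep(0.01)  # pretend to run a query
            in_use -= 1

    # Five workers compete for two connections
    await asyncio.gather(*(worker() for _ in range(5)))
    return peak


peak_in_use = asyncio.run(demo())
print(peak_in_use)
```

Even with five concurrent workers, no more than two connections are ever in use at once, which is the property `max_size` and `max_connections` enforce in the real pools.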
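2. Caching Strategy
# utils/cache.py
import hashlib
import json
import time
from functools import wraps
from typing import Any, Optional
import redis.asyncio as aioredis  # successor to the unmaintained aioredis package
class CacheManager:
    """Two-level cache: in-process memory (L1) in front of Redis (L2)."""
    def __init__(self, redis_client: aioredis.Redis):
        self.redis = redis_client
        self.local_cache = {}  # L1: key -> (expires_at, value)
        self.local_ttl = 60  # L1 entries live for 60 seconds
    def _generate_key(self, prefix: str, *args, **kwargs) -> str:
        """Build a cache key from the call arguments."""
        # default=str keeps arguments that are not JSON-serializable from raising
        key_data = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True, default=str)
        hash_val = hashlib.md5(key_data.encode()).hexdigest()
        return f"{prefix}:{hash_val}"
    def _set_local(self, key: str, value: Any):
        self.local_cache[key] = (time.monotonic() + self.local_ttl, value)
    async def get(self, key: str) -> Optional[Any]:
        """Read a value, checking L1 before L2."""
        # L1: local memory, honoring the local TTL
        entry = self.local_cache.get(key)
        if entry is not None:
            expires_at, data = entry
            if time.monotonic() < expires_at:
                return data
            del self.local_cache[key]
        # L2: Redis
        value = await self.redis.get(key)
        if value is not None:
            data = json.loads(value)
            self._set_local(key, data)
            return data
        return None
    async def set(self, key: str, value: Any, ttl: int = 3600):
        """Write a value to both levels."""
        # Update the local cache
        self._set_local(key, value)
        # Update Redis
        await self.redis.setex(
            key,
            ttl,
            json.dumps(value)
        )
    async def delete(self, key: str):
        """Remove a value from both levels."""
        self.local_cache.pop(key, None)
        await self.redis.delete(key)
def cached(prefix: str, ttl: int = 3600):
    """Cache decorator for async methods whose instance exposes a CacheManager as self.cache."""
    def decorator(func):
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            cache = getattr(self, "cache", None)
            if cache is None:
                return await func(self, *args, **kwargs)
            # Key on the arguments so that different queries get different entries
            cache_key = cache._generate_key(f"{prefix}:{func.__name__}", *args, **kwargs)
            # Try the cache first
            cached_value = await cache.get(cache_key)
            if cached_value is not None:
                return cached_value
            # Run the function
            result = await func(self, *args, **kwargs)
            # Store the result
            await cache.set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator
# Usage example
class AgentService:
    def __init__(self, cache_manager: CacheManager):
        self.cache = cache_manager
    @cached(prefix="llm_response", ttl=1800)
    async def generate_response(self, query: str, **kwargs):
        """Generate a response (cached). The result must be JSON-serializable."""
        # The actual LLM call; call_llm is the service's own method, defined elsewhere
        response = await self.call_llm(query)
        return response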
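An in-process L1 cache like the one above also needs an upper bound on its size, or it grows for as long as the worker lives. One common way to bound it, sketched here as an assumption rather than as the article's design, is to combine a TTL with least-recently-used eviction on top of `OrderedDict`. `LocalTTLCache` and its parameters are hypothetical names; the injectable `clock` exists only to make expiry easy to test.

```python
import time
from collections import OrderedDict


class LocalTTLCache:
    """In-memory cache with per-entry expiry and least-recently-used eviction."""

    def __init__(self, max_items=1024, ttl=60.0, clock=time.monotonic):
        self._data = OrderedDict()  # key -> (expires_at, value), oldest first
        self._max_items = max_items
        self._ttl = ttl
        self._clock = clock

    def get(self, key, default=None):
        entry = self._data.get(key)
        if entry is None:
            return default
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._data[key]  # expired: drop it and report a miss
            return default
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key, value):
        self._data[key] = (self._clock() + self._ttl, value)
        self._data.move_to_end(key)
        while len(self._data) > self._max_items:
            self._data.popitem(last=False)  # evict the least recently used entry


# Demo with a fake clock so that expiry is deterministic
now = [0.0]
cache = LocalTTLCache(max_items=2, ttl=60.0, clock=lambda: now[0])
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")      # touch "a" so that "b" becomes the eviction candidate
cache.set("c", 3)   # exceeds max_items and evicts "b"
print(cache.get("b"), cache.get("a"))
```

A class like this could stand in for the plain dict used as L1, at the cost of a little bookkeeping on every read.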
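3. Streaming Response Optimization
# utils/streaming.py
import asyncio
from typing import AsyncGenerator
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
app = FastAPI()
class ChatRequest(BaseModel):
    """Request body; the single `message` field is an assumed schema."""
    message: str
async def stream_llm_response(prompt: str, model: str = "gpt-4") -> AsyncGenerator[str, None]:
    """Stream an LLM response as Server-Sent Events."""
    # Simulated streaming response; replace with real chunks from the LLM client
    response_chunks = [
        "This is ",
        "a ",
        "streaming ",
        "response ",
        "example.",
    ]
    for chunk in response_chunks:
        yield f"data: {chunk}\n\n"
        await asyncio.sleep(0.1)  # simulated latency
    yield "data: [DONE]\n\n"
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """Streaming chat endpoint."""
    return StreamingResponse(
        stream_llm_response(request.message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # Tells an Nginx reverse proxy not to buffer the stream
            "X-Accel-Buffering": "no",
        }
    )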
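On the consuming side, the stream produced by the endpoint above is a sequence of `data:` lines separated by blank lines, ending with the `[DONE]` sentinel this article uses (a convention borrowed from LLM APIs, not part of the SSE format itself). A minimal parser sketch follows. `parse_sse` is a hypothetical helper that handles only `data:` fields and ignores `event:`, `id:`, and `retry:`.

```python
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each event until the [DONE] sentinel."""
    data_parts = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event
            if data_parts:
                payload = "\n".join(data_parts)
                data_parts = []
                if payload == "[DONE]":
                    return
                yield payload
            continue
        if line.startswith("data:"):
            value = line[5:]
            # A single leading space after the colon is not part of the payload
            data_parts.append(value[1:] if value.startswith(" ") else value)


stream = "data: Hello\n\ndata: world\n\ndata: [DONE]\n\ndata: ignored\n\n"
chunks = list(parse_sse(stream.splitlines(keepends=True)))
print(chunks)
```

Anything after the sentinel is discarded, so a client can stop reading as soon as the generator finishes.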
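Security Hardening
1. API Security
# security/middleware.py
import os
import re
import jwt
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
# Signing key; JWT_SECRET_KEY is an assumed environment variable name
SECRET_KEY = os.environ["JWT_SECRET_KEY"]
# Rate limiter
limiter = Limiter(key_func=get_remote_address)
# JWT verification
security = HTTPBearer()
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify the JWT bearer token."""
    try:
        payload = jwt.decode(
            credentials.credentials,
            SECRET_KEY,
            algorithms=["HS256"]
        )
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token has expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")
class ChatRequest(BaseModel):
    """Request body; the single `message` field is an assumed schema."""
    message: str
# Input validation
class InputValidator:
    """Input validator."""
    # Dangerous patterns
    DANGEROUS_PATTERNS = [
        r'<script[^>]*>.*?</script>',
        r'javascript:',
        r'on\w+\s*=',
        r'\.\.\/',
        r'\.\.\\',
    ]
    @classmethod
    def sanitize(cls, text: str) -> str:
        """Sanitize input."""
        # Strip dangerous patterns
        for pattern in cls.DANGEROUS_PATTERNS:
            text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL)
        # Enforce a maximum length
        max_length = 10000
        if len(text) > max_length:
            text = text[:max_length]
        return text.strip()
    @classmethod
    def validate_prompt(cls, prompt: str) -> bool:
        """Screen a prompt for injection attempts."""
        # Keyword matching is a coarse heuristic: it is easy to bypass and can
        # reject legitimate input, so treat it as one layer of defense only.
        injection_keywords = [
            "ignore previous instructions",
            "disregard",
            "system prompt",
            "you are now",
        ]
        prompt_lower = prompt.lower()
        for keyword in injection_keywords:
            if keyword in prompt_lower:
                return False
        return True
# Wire everything into FastAPI
app = FastAPI()
app.state.limiter = limiter
# Without this handler, exceeding the limit surfaces as a 500 instead of a 429
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.middleware("http")
async def security_middleware(request: Request, call_next):
    """Security middleware."""
    # 1. Request body validation is done in the route handler (see /chat below)
    #    rather than by reading the body here.
    # 2. Check the User-Agent. Return the response directly: an HTTPException
    #    raised inside middleware bypasses FastAPI's exception handlers.
    user_agent = request.headers.get("user-agent", "")
    if not user_agent or len(user_agent) < 10:
        return JSONResponse(status_code=403, content={"detail": "Invalid User-Agent"})
    response = await call_next(request)
    # 3. Security response headers
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-XSS-Protection"] = "1; mode=block"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    return response
@app.post("/chat")
@limiter.limit("10/minute")
async def chat(
    request: Request,
    chat_request: ChatRequest,
    user: dict = Depends(verify_token)
):
    """Rate-limited, authenticated chat endpoint."""
    # Validate the input
    sanitized_input = InputValidator.sanitize(chat_request.message)
    if not InputValidator.validate_prompt(sanitized_input):
        raise HTTPException(status_code=400, detail="Unsafe input detected")
    # Handle the request; process_chat is the application's own handler, defined elsewhere
    response = await process_chat(sanitized_input)
    return response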
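To make a limit such as "10/minute" concrete, the sketch below implements a fixed-window counter, one simple rate-limiting strategy. It is an illustration only: it is not slowapi's implementation, it keeps state in one process (a multi-replica deployment would need shared storage such as Redis), and it never prunes old windows. `FixedWindowLimiter` and its parameters are hypothetical names.

```python
import time
from collections import defaultdict


class FixedWindowLimiter:
    """Allow at most `limit` requests per client in each fixed time window."""

    def __init__(self, limit, window_seconds, clock=time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self._clock = clock
        self._counts = defaultdict(int)  # (client, window index) -> requests seen

    def allow(self, client):
        bucket = (client, int(self._clock() // self.window))
        if self._counts[bucket] >= self.limit:
            return False
        self._counts[bucket] += 1
        return True


# Demo with a fake clock: 10 requests per 60-second window
now = [0.0]
limiter_demo = FixedWindowLimiter(limit=10, window_seconds=60, clock=lambda: now[0])
first_ten = [limiter_demo.allow("1.2.3.4") for _ in range(10)]
eleventh = limiter_demo.allow("1.2.3.4")
print(all(first_ten), eleventh)
```

Fixed windows admit bursts at window boundaries (up to twice the limit across two adjacent windows), which is why sliding-window or token-bucket variants are often preferred when bursts matter.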
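2. Data Security
# security/encryption.py
import base64
import os
from typing import Optional
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
class DataEncryption:
    """Field-level data encryption."""
    def __init__(self, master_key: Optional[str] = None, salt: Optional[bytes] = None):
        self.master_key = master_key or os.environ.get("ENCRYPTION_KEY")
        if not self.master_key:
            raise ValueError("Set ENCRYPTION_KEY or pass master_key")
        # The salt must stay the same across restarts. A fresh random salt per
        # process derives a different key, and existing ciphertext becomes unreadable.
        # ENCRYPTION_SALT (base64-encoded) is an assumed environment variable name.
        env_salt = os.environ.get("ENCRYPTION_SALT")
        self.salt = salt or (base64.b64decode(env_salt) if env_salt else None)
        if not self.salt:
            raise ValueError("Set ENCRYPTION_SALT or pass salt")
        self.cipher = self._create_cipher()
    def _create_cipher(self) -> Fernet:
        """Derive the Fernet key from the master key."""
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self.salt,
            iterations=100000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(self.master_key.encode()))
        return Fernet(key)
    def encrypt(self, data: str) -> str:
        """Encrypt a string."""
        return self.cipher.encrypt(data.encode()).decode()
    def decrypt(self, encrypted_data: str) -> str:
        """Decrypt a string."""
        return self.cipher.decrypt(encrypted_data.encode()).decode()
    def encrypt_sensitive_fields(self, data: dict, fields: list) -> dict:
        """Encrypt the named fields of a record."""
        result = data.copy()
        for field in fields:
            if field in result:
                result[field] = self.encrypt(str(result[field]))
        return result
# Usage example; reads ENCRYPTION_KEY and ENCRYPTION_SALT from the environment
encryption = DataEncryption()
# Encrypt an API key
encrypted_key = encryption.encrypt("sk-...")
# Encrypt sensitive user fields
user_data = {
    "name": "Zhang San",
    "email": "zhangsan@example.com",
    "phone": "13800138000"
}
encrypted_data = encryption.encrypt_sensitive_fields(
    user_data,
    ["email", "phone"]
)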
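A detail worth checking in any PBKDF2-based scheme like the one above is how the salt is handled: the derived key is a function of both the password and the salt, so the salt has to be stored and reused for data to remain decryptable. The sketch below demonstrates this with the standard library's `hashlib.pbkdf2_hmac`. The all-zero salt and the `master-secret` password are placeholders for the demonstration only.

```python
import hashlib
import os


def derive_key(password: str, salt: bytes, iterations: int = 100_000) -> bytes:
    """Derive a 32-byte key with PBKDF2-HMAC-SHA256."""
    return hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations, dklen=32)


# Demo only: a real deployment generates the salt once with os.urandom(16) and stores it.
fixed_salt = bytes(16)

key_a = derive_key("master-secret", fixed_salt)
key_b = derive_key("master-secret", fixed_salt)        # same salt: same key
key_c = derive_key("master-secret", os.urandom(16))    # new salt: different key

print(key_a == key_b, key_a == key_c)
```

The salt is not secret and can live next to the ciphertext or in configuration, but regenerating it on every start makes previously encrypted data unrecoverable.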
CI/CD Pipeline
1. GitHub Actions Workflow
# .github/workflows/deploy.yml
name: CI/CD Pipeline
on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]
env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      - name: Run linting
        run: |
          flake8 src/
          black --check src/
          mypy src/
      - name: Run tests
        run: |
          pytest tests/ --cov=src --cov-report=xml
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml
  build:
    needs: test
    # Build and push on branch pushes only; pull requests stop after the test job
    if: github.event_name == 'push'
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
    steps:
      - uses: actions/checkout@v3
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2
      - name: Log in to Container Registry
        uses: docker/login-action@v2
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v4
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          # format=long emits the full commit SHA, which is what the staging
          # deploy step references via github.sha
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=sha,prefix={{branch}}-,format=long
            type=raw,value=latest,enable={{is_default_branch}}
      - name: Build and push
        uses: docker/build-push-action@v4
        with:
          context: .
          file: ./Dockerfile.agent
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max
  deploy-staging:
    needs: build
    if: github.ref == 'refs/heads/develop'
    runs-on: ubuntu-latest
    environment: staging
    steps:
      - uses: actions/checkout@v3
      - name: Deploy to Staging
        run: |
          echo "${{ secrets.KUBECONFIG_STAGING }}" | base64 -d > kubeconfig
          export KUBECONFIG=kubeconfig
          # Update the image
          kubectl set image deployment/agent-deployment \
            agent=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:develop-${{ github.sha }} \
            -n agent-system
          # Wait for the rollout to finish
          kubectl rollout status deployment/agent-deployment -n agent-system
  deploy-production:
    needs: build
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v3
      - name: Deploy to Production
        run: |
          echo "${{ secrets.KUBECONFIG_PRODUCTION }}" | base64 -d > kubeconfig
          export KUBECONFIG=kubeconfig
          # Blue-green deployment: bring up green and wait until it is ready
          kubectl apply -f k8s/deployment-green.yaml
          kubectl rollout status deployment/agent-deployment-green -n agent-system
          # Switch the Service to green (green pods must carry the label version=green)
          kubectl patch service agent-service -n agent-system -p '{"spec":{"selector":{"version":"green"}}}'
          # Remove the old version; once it is deleted, rolling back means redeploying blue
          kubectl delete deployment agent-deployment-blue -n agent-system || true
2. Deployment Script
#!/bin/bash
# scripts/deploy.sh
set -euo pipefail
ENVIRONMENT=${1:-}
VERSION=${2:-}
if [ -z "$ENVIRONMENT" ] || [ -z "$VERSION" ]; then
    echo "Usage: ./deploy.sh <environment> <version>"
    echo "Example: ./deploy.sh production v1.2.3"
    exit 1
fi
echo "🚀 Deploying version $VERSION to $ENVIRONMENT"
# Configure kubectl
if [ "$ENVIRONMENT" == "production" ]; then
    export KUBECONFIG=~/.kube/config-prod
    NAMESPACE="agent-prod"
else
    export KUBECONFIG=~/.kube/config-staging
    NAMESPACE="agent-staging"
fi
# Update the image
echo "📦 Updating deployment..."
kubectl set image deployment/agent-deployment \
    agent="ghcr.io/your-org/agent:$VERSION" \
    -n "$NAMESPACE"
# Wait for the rollout. The command is tested inside `if` so that `set -e`
# does not abort the script before the rollback can run.
echo "⏳ Waiting for rollout..."
if ! kubectl rollout status deployment/agent-deployment -n "$NAMESPACE" --timeout=300s; then
    echo "❌ Rollout failed! Rolling back..."
    kubectl rollout undo deployment/agent-deployment -n "$NAMESPACE"
    exit 1
fi
# Verify the deployment
echo "✅ Verifying deployment..."
PODS=$(kubectl get pods -n "$NAMESPACE" -l app=agent -o jsonpath='{.items[*].status.phase}')
if [[ "$PODS" == *"Running"* ]]; then
    echo "✅ Deployment successful!"
    # Run a health check from inside the cluster; -f makes curl fail on HTTP errors
    kubectl run health-check --rm -i --restart=Never \
        -n "$NAMESPACE" \
        --image=curlimages/curl:latest \
        -- curl -sf "http://agent-service.$NAMESPACE.svc.cluster.local/health"
else
    echo "❌ Deployment failed! Rolling back..."
    kubectl rollout undo deployment/agent-deployment -n "$NAMESPACE"
    exit 1
fi
echo "🎉 Deployment complete!"
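The control flow of the script above (apply, wait for health, roll back on failure) can also be expressed as a small reusable routine, which is convenient when the same logic has to run from a Python-based release tool. The sketch below is a hedged illustration: `wait_until_healthy` and `deploy` are hypothetical helpers, and `apply`, `probe`, and `rollback` are caller-supplied callables (for example thin wrappers around `kubectl`), not real library functions.

```python
import time


def wait_until_healthy(probe, attempts=5, delay=1.0, sleep=time.sleep):
    """Call probe() until it returns True, backing off exponentially between tries."""
    for attempt in range(1, attempts + 1):
        try:
            if probe():
                return True
        except Exception:
            pass  # a failing probe counts as "not healthy yet"
        if attempt < attempts:
            sleep(delay * 2 ** (attempt - 1))
    return False


def deploy(apply, probe, rollback, **wait_kwargs):
    """Apply a release, then roll back if it never becomes healthy."""
    apply()
    if wait_until_healthy(probe, **wait_kwargs):
        return "deployed"
    rollback()
    return "rolled back"


# Demo with fake callables: the release never becomes healthy
calls = []
outcome = deploy(
    apply=lambda: calls.append("apply"),
    probe=lambda: False,
    rollback=lambda: calls.append("rollback"),
    attempts=3,
    delay=0.5,
    sleep=lambda seconds: None,  # skip real waiting in the demo
)
print(outcome, calls)
```

Injecting `sleep` and the three callables keeps the routine testable without a cluster.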
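Summary
This article covered the full workflow for deploying AI agents to production. The key points:
- Architecture: a layered design with gateway, application, data, and monitoring layers
- Containerization: multi-stage Docker builds, Kubernetes orchestration, autoscaling with HPA
- Observability: Prometheus metrics, structured logging, alerting rules
- Performance: connection pools, multi-level caching, streaming responses
- Security: JWT authentication, input validation, data encryption
- CI/CD: automated tests, image builds, blue-green deployment
Production deployment checklist:
- Scan container images for vulnerabilities
- Manage sensitive values with Secrets
- Configure liveness and readiness probes
- Set resource requests and limits
- Configure horizontal autoscaling
- Set up monitoring and alerting
- Define a rollback strategy
- Run load tests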
This article was last updated on 2024-02-25. Questions are welcome in the community discussion.