引言
将RAG系统从原型推向生产环境,面临着诸多挑战:
- 性能瓶颈 - 高并发下的延迟和吞吐量问题
- 成本控制 - LLM调用和向量存储的费用管理
- 质量保证 - 回答准确性和一致性的持续监控
- 系统稳定性 - 故障恢复和降级策略
本文将从架构设计、性能优化、监控运维等多个维度,分享RAG生产化的最佳实践。
生产级RAG架构设计
典型生产架构
┌─────────────────────────────────────────────────────────────────┐
│ 生产级RAG架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ CDN/WAF │ │
│ └──────┬──────┘ │
│ │ │
│ ┌──────▼──────┐ ┌─────────────┐ ┌─────────────────┐ │
│ │ API Gateway │────►│ Rate Limiter │────►│ Load Balancer │ │
│ └─────────────┘ └─────────────┘ └────────┬────────┘ │
│ │ │
│ ┌────────────────────────────────────────────────┼─────────┐ │
│ │ RAG Service Cluster │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │
│ │ │ Service │ │ Service │ │ Service │◄───────┘ │ │
│ │ │ #1 │ │ #2 │ │ #N │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ │ └─────────────┼─────────────┘ │ │
│ │ │ │ │
│ │ ┌──────────────────┼──────────────────┐ │ │
│ │ │ ▼ │ │ │
│ │ │ ┌─────────┐ ┌─────────┐ ┌──────┴─┐ │ │
│ │ │ │ Query │ │ Retrieve│ │ Generate│ │ │
│ │ │ │ Rewrite │──►│ + Rerank│──►│ Answer │ │ │
│ │ │ └─────────┘ └─────────┘ └────────┘ │ │
│ │ └─────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────┼──────────────────────────────┐ │
│ │ ▼ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │ │
│ │ │ Vector DB │ │ Cache │ │ LLM Provider │ │ │
│ │ │ (Milvus/ │ │ (Redis) │ │ (OpenAI/ │ │ │
│ │ │ Pinecone) │ │ │ │ Anthropic) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Observability Stack │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │Metrics │ │ Logging │ │Tracing │ │ Alerting│ │ │
│ │ │Prometheus│ │ ELK │ │ Jaeger │ │ PagerDuty│ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
核心组件选型
| 组件 | 推荐方案 | 备选方案 |
|---|---|---|
| API网关 | Kong / AWS API Gateway | Nginx / Traefik |
| 向量数据库 | Milvus / Pinecone | Weaviate / pgvector |
| 缓存 | Redis Cluster | Memcached |
| 消息队列 | Kafka / RabbitMQ | Redis Pub/Sub |
| 监控 | Prometheus + Grafana | Datadog |
| 日志 | ELK Stack / Loki | Splunk |
性能优化策略
1. 查询优化
查询改写(Query Rewriting)
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
query_rewrite_template = """
分析用户的查询,生成多个相关查询以提高检索召回率。
原始查询:{query}
请生成3个不同角度的相关查询:
1.
2.
3.
"""
rewrite_chain = LLMChain(
llm=llm,
prompt=PromptTemplate.from_template(query_rewrite_template)
)
def rewrite_query(query):
"""生成多角度的查询变体"""
result = rewrite_chain.invoke({"query": query})
queries = [q.strip() for q in result["text"].split("\n") if q.strip()]
return [query] + queries # 包含原始查询
HyDE(假设文档嵌入)
hyde_template = """
根据用户查询,生成一个假设的理想回答文档。
这个文档应该包含回答该问题所需的关键信息。
查询:{query}
假设文档:
"""
def hyde_retrieval(query):
# 生成假设文档
hypothetical_doc = llm.predict(hyde_template.format(query=query))
# 用假设文档做检索
query_embedding = embeddings.embed_query(hypothetical_doc)
results = vector_store.similarity_search_by_vector(query_embedding, k=5)
return results
2. 检索优化
多级检索策略
class MultiStageRetriever:
"""多级检索:粗排 + 精排"""
def __init__(self, vector_store, bm25_retriever):
self.vector_store = vector_store
self.bm25_retriever = bm25_retriever
async def retrieve(self, query, top_k=5):
# 第一阶段:多路召回(向量 + 关键词)
vector_results = await self.vector_store.asimilarity_search(
query, k=top_k*4
)
bm25_results = await self.bm25_retriever.aget_relevant_documents(
query
)
# 融合结果
fused_results = self.reciprocal_rank_fusion(
[vector_results, bm25_results]
)
# 第二阶段:重排序
reranked = await self.rerank(query, fused_results[:top_k*2])
return reranked[:top_k]
def reciprocal_rank_fusion(self, result_lists, k=60):
"""RRF融合算法"""
scores = {}
for results in result_lists:
for rank, doc in enumerate(results):
doc_id = doc.metadata.get("id")
if doc_id not in scores:
scores[doc_id] = {"doc": doc, "score": 0}
# RRF公式
scores[doc_id]["score"] += 1 / (k + rank + 1)
# 按分数排序
sorted_results = sorted(
scores.values(),
key=lambda x: x["score"],
reverse=True
)
return [r["doc"] for r in sorted_results]
async def rerank(self, query, documents):
"""使用重排序模型"""
pairs = [[query, doc.page_content] for doc in documents]
scores = await reranker.predict(pairs)
# 按重排序分数排序
scored_docs = list(zip(documents, scores))
scored_docs.sort(key=lambda x: x[1], reverse=True)
return [doc for doc, _ in scored_docs]
向量索引优化
# Milvus索引配置优化
index_params = {
"index_type": "IVF_FLAT", # 或 HNSW
"params": {
"nlist": 4096, # 聚类中心数,根据数据量调整
"nprobe": 128 # 查询时搜索的聚类数
}
}
# HNSW配置(高召回场景)
hnsw_params = {
"index_type": "HNSW",
"params": {
"M": 16, # 每个节点的最大连接数
"efConstruction": 200, # 构建时的搜索范围
"ef": 64 # 查询时的搜索范围
}
}
3. 生成优化
上下文压缩
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
# 使用LLM压缩上下文
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=retriever
)
# 检索并自动压缩
compressed_docs = compression_retriever.get_relevant_documents(query)
流式生成
async def stream_answer(query, context):
"""流式返回答案,提升用户体验"""
prompt = build_prompt(query, context)
# 流式调用LLM
async for chunk in llm.astream(prompt):
yield chunk.content
4. 缓存策略
多级缓存架构
import hashlib
import json
from functools import wraps
class RAGCache:
def __init__(self, redis_client):
self.redis = redis_client
self.local_cache = {} # L1: 本地内存
self.embedding_cache = {}
def get_cache_key(self, query, params=None):
"""生成缓存键"""
key_data = f"{query}:{json.dumps(params, sort_keys=True)}"
return f"rag:query:{hashlib.md5(key_data.encode()).hexdigest()}"
async def get(self, query, params=None):
"""多级缓存读取"""
key = self.get_cache_key(query, params)
# L1: 本地缓存
if key in self.local_cache:
return self.local_cache[key]
# L2: Redis缓存
cached = await self.redis.get(key)
if cached:
result = json.loads(cached)
self.local_cache[key] = result # 回填本地缓存
return result
return None
async def set(self, query, result, params=None, ttl=3600):
"""写入缓存"""
key = self.get_cache_key(query, params)
# 写入Redis
await self.redis.setex(
key,
ttl,
json.dumps(result)
)
# 写入本地缓存(短TTL)
self.local_cache[key] = result
async def get_embedding(self, text):
"""Embedding结果缓存"""
key = f"rag:emb:{hashlib.md5(text.encode()).hexdigest()}"
cached = await self.redis.get(key)
if cached:
return json.loads(cached)
return None
async def set_embedding(self, text, embedding):
"""缓存Embedding结果"""
key = f"rag:emb:{hashlib.md5(text.encode()).hexdigest()}"
await self.redis.setex(
key,
86400 * 7, # 7天
json.dumps(embedding)
)
语义缓存
from sentence_transformers import SentenceTransformer
import numpy as np
class SemanticCache:
"""基于语义相似度的缓存"""
def __init__(self, embeddings_model, similarity_threshold=0.95):
self.embeddings = embeddings_model
self.threshold = similarity_threshold
self.cache = {} # query_embedding -> result
async def get(self, query):
query_emb = self.embeddings.embed_query(query)
# 查找相似查询
for cached_query_emb, result in self.cache.items():
similarity = cosine_similarity(query_emb, cached_query_emb)
if similarity > self.threshold:
return result
return None
async def set(self, query, result):
query_emb = self.embeddings.embed_query(query)
self.cache[tuple(query_emb)] = result
成本控制策略
1. 智能路由
class LLMRouter:
"""根据查询复杂度路由到不同的模型"""
def __init__(self):
self.models = {
"simple": "gpt-3.5-turbo", # 简单查询
"standard": "gpt-4o-mini", # 标准查询
"complex": "gpt-4", # 复杂查询
}
self.costs = {
"gpt-3.5-turbo": 0.0015,
"gpt-4o-mini": 0.0006,
"gpt-4": 0.03,
}
async def classify_query(self, query):
"""分类查询复杂度"""
prompt = f"""
分析以下查询的复杂度,输出:simple/standard/complex
标准:
- simple:事实性查询,直接回答
- standard:需要简单推理
- complex:需要多步推理或综合分析
查询:{query}
复杂度:
"""
response = await self.models["simple"].predict(prompt)
return response.strip().lower()
async def route(self, query, context):
"""智能路由"""
complexity = await self.classify_query(query)
model = self.models.get(complexity, "gpt-4o-mini")
# 生成答案
answer = await self.generate(model, query, context)
return {
"answer": answer,
"model_used": model,
"estimated_cost": self.estimate_cost(query, answer, model)
}
2. Token优化
class TokenOptimizer:
"""优化Token使用"""
def __init__(self, max_context_tokens=4000):
self.max_tokens = max_context_tokens
self.tokenizer = tiktoken.encoding_for_model("gpt-4")
def count_tokens(self, text):
return len(self.tokenizer.encode(text))
def optimize_context(self, query, documents):
"""优化上下文长度"""
query_tokens = self.count_tokens(query)
available_tokens = self.max_tokens - query_tokens - 500 # 预留生成空间
optimized_docs = []
current_tokens = 0
for doc in documents:
doc_tokens = self.count_tokens(doc.page_content)
if current_tokens + doc_tokens > available_tokens:
# 截断文档
remaining = available_tokens - current_tokens
truncated = self.truncate_to_tokens(
doc.page_content,
remaining
)
if truncated:
doc.page_content = truncated
optimized_docs.append(doc)
break
optimized_docs.append(doc)
current_tokens += doc_tokens
return optimized_docs
def truncate_to_tokens(self, text, max_tokens):
"""按token截断文本"""
tokens = self.tokenizer.encode(text)
if len(tokens) <= max_tokens:
return text
truncated = self.tokenizer.decode(tokens[:max_tokens])
return truncated
3. 批量处理
class BatchProcessor:
"""批量处理查询以降低成本"""
def __init__(self, batch_size=10, max_wait_time=1.0):
self.batch_size = batch_size
self.max_wait_time = max_wait_time
self.pending_queries = []
self.results = {}
async def submit(self, query):
"""提交查询到批处理队列"""
query_id = str(uuid.uuid4())
future = asyncio.Future()
self.pending_queries.append({
"id": query_id,
"query": query,
"future": future
})
# 触发批处理检查
if len(self.pending_queries) >= self.batch_size:
await self.process_batch()
return await future
async def process_batch(self):
"""处理批量查询"""
if not self.pending_queries:
return
batch = self.pending_queries[:self.batch_size]
self.pending_queries = self.pending_queries[self.batch_size:]
# 批量检索
contexts = await self.batch_retrieve([q["query"] for q in batch])
# 批量生成(如果API支持)
answers = await self.batch_generate(
[q["query"] for q in batch],
contexts
)
# 分发结果
for item, answer in zip(batch, answers):
item["future"].set_result(answer)
高可用设计
1. 故障转移
class FailoverLLM:
"""LLM故障转移"""
def __init__(self, providers):
self.providers = providers
self.current_index = 0
self.circuit_breakers = {
name: CircuitBreaker() for name in providers
}
async def generate(self, prompt):
"""带故障转移的生成"""
for i in range(len(self.providers)):
provider_name = list(self.providers.keys())[
(self.current_index + i) % len(self.providers)
]
provider = self.providers[provider_name]
cb = self.circuit_breakers[provider_name]
if not cb.can_execute():
continue
try:
result = await provider.generate(prompt)
cb.record_success()
return result
except Exception as e:
cb.record_failure()
logger.error(f"Provider {provider_name} failed: {e}")
continue
raise Exception("All providers failed")
class CircuitBreaker:
"""熔断器"""
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failures = 0
self.last_failure_time = None
self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN
def can_execute(self):
if self.state == "OPEN":
if time.time() - self.last_failure_time > self.timeout:
self.state = "HALF_OPEN"
return True
return False
return True
def record_success(self):
self.failures = 0
self.state = "CLOSED"
def record_failure(self):
self.failures += 1
self.last_failure_time = time.time()
if self.failures >= self.failure_threshold:
self.state = "OPEN"
2. 降级策略
class DegradationStrategy:
"""服务降级策略"""
def __init__(self, vector_store, simple_qa_model):
self.vector_store = vector_store
self.simple_qa = simple_qa_model
self.degradation_level = 0 # 0-3
async def answer(self, query):
"""根据降级级别提供服务"""
if self.degradation_level == 0:
# 正常模式:完整RAG流程
return await self.full_rag(query)
elif self.degradation_level == 1:
# 轻度降级:跳过重排序
return await self.skip_rerank(query)
elif self.degradation_level == 2:
# 中度降级:仅向量检索 + 轻量模型
return await self.lightweight_rag(query)
elif self.degradation_level == 3:
# 重度降级:仅缓存或预定义回答
return await self.cache_only(query)
async def full_rag(self, query):
"""完整RAG流程"""
docs = await self.retrieve(query)
reranked = await self.rerank(query, docs)
answer = await self.generate(query, reranked)
return answer
async def skip_rerank(self, query):
"""跳过重排序"""
docs = await self.retrieve(query)
answer = await self.generate(query, docs[:3])
return answer
async def lightweight_rag(self, query):
"""轻量级RAG"""
docs = await self.vector_store.asimilarity_search(query, k=3)
# 使用轻量模型
answer = await self.simple_qa.generate(query, docs)
return answer
async def cache_only(self, query):
"""仅使用缓存"""
# 尝试语义缓存
cached = await self.semantic_cache.get(query)
if cached:
return cached
return "系统繁忙,请稍后重试"
监控与可观测性
1. 关键指标定义
from prometheus_client import Counter, Histogram, Gauge
# 定义监控指标
rag_metrics = {
# 查询指标
"query_total": Counter("rag_query_total", "Total queries", ["status"]),
"query_duration": Histogram("rag_query_duration", "Query duration"),
# 检索指标
"retrieval_latency": Histogram("rag_retrieval_latency", "Retrieval latency"),
"retrieval_results": Gauge("rag_retrieval_results", "Number of retrieved docs"),
# 生成指标
"generation_latency": Histogram("rag_generation_latency", "Generation latency"),
"tokens_used": Counter("rag_tokens_used", "Tokens used", ["model"]),
# 质量指标
"answer_relevance": Gauge("rag_answer_relevance", "Answer relevance score"),
"context_precision": Gauge("rag_context_precision", "Context precision"),
# 缓存指标
"cache_hit": Counter("rag_cache_hit", "Cache hits", ["cache_type"]),
"cache_miss": Counter("rag_cache_miss", "Cache misses", ["cache_type"]),
}
2. 追踪系统
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# 配置追踪
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# 在RAG流程中添加追踪
async def rag_with_tracing(query):
with tracer.start_as_current_span("rag_query") as span:
span.set_attribute("query", query)
# 检索阶段
with tracer.start_span("retrieval"):
docs = await retrieve(query)
span.set_attribute("retrieved_docs", len(docs))
# 生成阶段
with tracer.start_span("generation"):
answer = await generate(query, docs)
span.set_attribute("answer_length", len(answer))
return answer
3. 质量评估
class RAGEvaluator:
"""RAG质量评估器"""
def __init__(self, llm):
self.llm = llm
async def evaluate_answer(self, query, answer, context):
"""评估回答质量"""
eval_prompt = f"""
评估以下RAG回答的质量,从1-5分打分。
查询:{query}
上下文:{context}
回答:{answer}
评估维度:
1. 相关性(回答是否针对查询)
2. 准确性(回答是否基于上下文)
3. 完整性(是否涵盖查询要点)
4. 简洁性(是否简洁明了)
请输出JSON格式:
{{
"relevance": 分数,
"accuracy": 分数,
"completeness": 分数,
"conciseness": 分数,
"overall": 总分,
"feedback": "改进建议"
}}
"""
result = await self.llm.predict(eval_prompt)
return json.loads(result)
async def evaluate_retrieval(self, query, retrieved_docs, ground_truth):
"""评估检索质量"""
# 计算召回率
retrieved_ids = {doc.metadata["id"] for doc in retrieved_docs}
ground_truth_ids = set(ground_truth)
recall = len(retrieved_ids & ground_truth_ids) / len(ground_truth_ids)
# 计算精确率
precision = len(retrieved_ids & ground_truth_ids) / len(retrieved_ids)
# 计算MRR
mrr = 0
for i, doc in enumerate(retrieved_docs):
if doc.metadata["id"] in ground_truth_ids:
mrr = 1 / (i + 1)
break
return {
"recall": recall,
"precision": precision,
"mrr": mrr,
"f1": 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
}
安全与合规
1. 输入过滤
import re
class InputSanitizer:
"""输入清洗"""
def __init__(self):
self.max_length = 1000
self.blocked_patterns = [
r"ignore previous instructions",
r"system prompt",
r"you are now",
]
def sanitize(self, query):
"""清洗用户输入"""
# 长度检查
if len(query) > self.max_length:
raise ValueError("Query too long")
# 注入攻击检测
for pattern in self.blocked_patterns:
if re.search(pattern, query, re.IGNORECASE):
raise ValueError("Potential prompt injection detected")
# 敏感信息检测
if self.contains_pii(query):
raise ValueError("Query contains PII")
return query.strip()
def contains_pii(self, text):
"""检测PII信息"""
patterns = {
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
"ssn": r"\b\d{3}-\d{2}-\d{4}\b",
}
for pattern in patterns.values():
if re.search(pattern, text):
return True
return False
2. 输出审核
class OutputModerator:
"""输出生成审核"""
def __init__(self, moderation_model):
self.moderation = moderation_model
async def moderate(self, answer):
"""审核生成内容"""
# 调用审核API
result = await self.moderation.check(answer)
if result["flagged"]:
# 记录并处理
logger.warning(f"Content flagged: {result}")
return self.get_safe_response()
return answer
def get_safe_response(self):
return "抱歉,无法回答这个问题。"
3. 审计日志
import json
from datetime import datetime
class AuditLogger:
"""审计日志"""
def __init__(self, storage):
self.storage = storage
async def log_query(self, user_id, query, context, answer, metadata):
"""记录完整查询日志"""
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"user_id": hashlib.sha256(user_id.encode()).hexdigest(), # 脱敏
"query_hash": hashlib.sha256(query.encode()).hexdigest(),
"context_sources": [doc.metadata.get("source") for doc in context],
"answer_hash": hashlib.sha256(answer.encode()).hexdigest(),
"model_used": metadata.get("model"),
"tokens_used": metadata.get("tokens"),
"latency_ms": metadata.get("latency"),
}
await self.storage.store(log_entry)
部署最佳实践
Docker Compose配置
version: '3.8'
services:
rag-api:
build: .
ports:
- "8000:8000"
environment:
- REDIS_URL=redis://redis:6379
- MILVUS_HOST=milvus
- OPENAI_API_KEY=${OPENAI_API_KEY}
depends_on:
- redis
- milvus
deploy:
replicas: 3
resources:
limits:
cpus: '2'
memory: 4G
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
command: redis-server --appendonly yes --maxmemory 2gb --maxmemory-policy allkeys-lru
milvus:
image: milvusdb/milvus:latest
volumes:
- milvus_data:/var/lib/milvus
environment:
- ETCD_ENDPOINTS=etcd:2379
- MINIO_ADDRESS=minio:9000
prometheus:
image: prom/prometheus
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
grafana:
image: grafana/grafana
ports:
- "3000:3000"
volumes:
- grafana_data:/var/lib/grafana
volumes:
redis_data:
milvus_data:
prometheus_data:
grafana_data:
Kubernetes配置
apiVersion: apps/v1
kind: Deployment
metadata:
name: rag-service
spec:
replicas: 3
selector:
matchLabels:
app: rag-service
template:
metadata:
labels:
app: rag-service
spec:
containers:
- name: rag-api
image: rag-service:latest
ports:
- containerPort: 8000
env:
- name: WORKERS
value: "4"
resources:
requests:
memory: "2Gi"
cpu: "1000m"
limits:
memory: "4Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
hpa:
minReplicas: 3
maxReplicas: 10
targetCPUUtilizationPercentage: 70
---
apiVersion: v1
kind: Service
metadata:
name: rag-service
spec:
selector:
app: rag-service
ports:
- port: 80
targetPort: 8000
type: LoadBalancer
总结
生产级RAG系统需要考虑:
| 维度 | 关键要点 |
|---|---|
| 性能 | 多级检索、智能缓存、异步处理 |
| 成本 | 模型路由、Token优化、批量处理 |
| 可用性 | 故障转移、降级策略、健康检查 |
| 可观测性 | 全链路追踪、质量评估、实时监控 |
| 安全 | 输入过滤、输出审核、审计日志 |
生产检查清单:
- 实现多级检索策略
- 配置智能缓存机制
- 建立故障转移机制
- 部署降级策略
- 配置完整监控体系
- 实现质量评估流程
- 建立安全防护机制
- 制定灾难恢复计划
- 完成压力测试
- 编写运维手册
本文最后更新于 2024-03-01,如有问题欢迎在社区讨论。