引言
随着 RAG 系统的数据量和用户量增长,响应延迟和系统吞吐量成为核心挑战。本文将系统梳理从文档处理到回答生成全链路的性能优化技巧。
端到端延迟分解:
文档索引(离线) → 查询理解 → 检索 → 重排序 → 生成(在线)
│ │ │ │
▼ ▼ ▼ ▼
占比: 5% 30% 15% 50%
优化重点: 缓存 索引 模型部署 流式+KV
│ │ │ │
▼ ▼ ▼ ▼
预计收益: -50% -70% -30% -40%
一、文档索引优化
1.1 分块策略优化
from langchain.text_splitter import (
RecursiveCharacterTextSplitter
)
# 非优化版本
bad_splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=0 # 无重叠,检索可能遗漏边界内容
)
# 优化版本
good_splitter = RecursiveCharacterTextSplitter(
chunk_size=512,
chunk_overlap=128, # 20-25% 重叠
length_function=len,
separators=["\n\n", "\n", "。", ".", ""]
)
1.2 并行索引
from concurrent.futures import ThreadPoolExecutor
import numpy as np
class ParallelIndexer:
def __init__(self, embed_model, batch_size=64):
self.embed_model = embed_model
self.batch_size = batch_size
def index_documents(self, documents, num_workers=4):
# 分片处理
chunks = np.array_split(documents, num_workers)
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [
executor.submit(self._index_batch, chunk)
for chunk in chunks
]
for f in futures:
f.result() # 确保全部完成
def _index_batch(self, documents):
"""批量编码并写入"""
for i in range(0, len(documents), self.batch_size):
batch = documents[i:i + self.batch_size]
texts = [doc.text for doc in batch]
embeddings = self.embed_model.encode(texts)
self.vector_store.add_embeddings(
texts=texts,
embeddings=embeddings
)
二、检索加速
2.1 索引类型选择
| 索引类型 | 查询延迟(100万向量) | 内存占用 | 推荐场景 |
|---|---|---|---|
| 暴力搜索 | 500ms | 低 | 调试,< 1万向量 |
| IVF_FLAT | 10-50ms | 高 | 精度优先 |
| IVF_SQ8 | 5-20ms | 中 | 平衡方案 |
| HNSW | 3-10ms | 很高 | 速度优先 |
| DISKANN | 10-30ms | 极低 | 超大容量 |
# HNSW 参数调优
hnsw_params = {
"M": 16, # 8-64, 越大精度越高,内存越大
"efConstruction": 200, # 100-500, 构建质量
"efSearch": 64 # 搜索宽度,越大召回越高,延迟越慢
}
# M=16, efSearch=64 是推荐的平衡配置
2.2 向量维度压缩
# OpenAI 维度压缩
response = client.embeddings.create(
model="text-embedding-3-large",
input=text,
dimensions=256 # 从 3072 压缩到 256
)
# PCA 降维
from sklearn.decomposition import PCA
def reduce_dimensions(
embeddings: np.ndarray,
target_dim: int = 256
) -> np.ndarray:
pca = PCA(n_components=target_dim)
return pca.fit_transform(embeddings)
三、缓存策略
3.1 多级缓存
查询 ──→ L1: 本地内存缓存 ──→ L2: Redis缓存 ──→ 向量检索
(毫秒级) (毫秒级) (10-50ms)
命中率: 20% 命中率: 40% 未命中
class MultiLevelCache:
"""多级缓存"""
def __init__(self):
self.l1_cache = {} # 本地内存
self.l2_cache = Redis(host='localhost')
self.l1_ttl = 60 # 1分钟
self.l2_ttl = 3600 # 1小时
def get(self, key: str, compute_fn):
# L1: 内存缓存
if key in self.l1_cache:
return self.l1_cache[key]
# L2: Redis 缓存
cached = self.l2_cache.get(key)
if cached:
self.l1_cache[key] = cached
return cached
# 未命中:执行计算
result = compute_fn()
self.l1_cache[key] = result
self.l2_cache.setex(key, self.l2_ttl, result)
return result
3.2 语义缓存
基于语义相似度的缓存,相似的查询可以共享结果:
from sentence_transformers import SentenceTransformer
class SemanticCache:
"""语义缓存:相似查询命中相同结果"""
def __init__(self, threshold=0.95):
self.model = SentenceTransformer('all-MiniLM-L6-v2')
self.cache = []
self.threshold = threshold
def get(self, query: str):
query_emb = self.model.encode(query)
for cached_query, result in self.cache:
cached_emb = self.model.encode(cached_query)
similarity = cosine_similarity(
[query_emb], [cached_emb]
)[0][0]
if similarity > self.threshold:
return result
return None
def set(self, query: str, result):
self.cache.append((query, result))
四、LLM 生成优化
4.1 流式输出
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/rag/stream")
async def rag_stream(query: str):
async def generate():
# 检索
docs = await retrieve_documents(query)
# 流式生成
stream = await llm.astream(
f"基于以下文档回答问题:\n{docs}\n问题:{query}"
)
async for chunk in stream:
yield chunk.choices[0].delta.content or ""
return StreamingResponse(generate(), media_type="text/plain")
4.2 上下文压缩
from langchain.retrievers.document_compressors import (
LLMChainExtractor
)
# 使用 LLM 提取最相关的内容片段
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=retriever
)
compressed_docs = compression_retriever.invoke(query)
# 输出: ["Milvus 的 QPS 达到 10000..."] (原来: ["完整的文档段落"])
4.3 Prompt 模板优化
# ❌ 不优化:全量上下文
prompt_bad = f"""
基于以下文档回答问题:
{docs}
问题:{query}
"""
# ✅ 优化:结构化 + 摘要
def optimize_prompt(docs: list[str], query: str, max_tokens: int = 3000):
"""在 token 预算内选择最佳内容"""
# 1. 优先使用高分的文档
if get_total_tokens(docs) > max_tokens:
# 2. 需要截断时,用摘要替代完整文档
summaries = [summarize(doc) for doc in docs[:3]]
docs = summaries
prompt = f"""
【背景信息】
{docs}
【用户问题】
{query}
【要求】
1. 仅基于上述背景信息回答
2. 如背景信息不足,请明确说明
3. 保持回答简洁、准确
"""
return prompt
五、部署架构优化
5.1 微服务化
# docker-compose.prod.yml
services:
rag-api:
image: rag-api:latest
deploy:
replicas: 3 # 水平扩展
resources:
limits:
cpus: '2'
memory: 4G
vector-db:
image: milvus:latest
deploy:
mode: replicated
replicas: 5 # 向量库分布式
redis:
image: redis:alpine
deploy:
replicas: 3 # 缓存集群
5.2 连接池与请求合并
class RequestBatcher:
"""请求合并:多个查询合并为一次批量检索"""
def __init__(self, max_batch_size=32, max_wait_ms=50):
self.queue = []
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
async def submit(self, query: str):
future = asyncio.Future()
self.queue.append((query, future))
if len(self.queue) >= self.max_batch_size:
await self._process_batch()
else:
# 等待积累
asyncio.create_task(self._delayed_process())
return await future
async def _process_batch(self):
queries = [q for q, _ in self.queue]
futures = [f for _, f in self.queue]
# 批量检索(一次网络请求)
results = await self.retriever.abatch(queries)
for future, result in zip(futures, results):
future.set_result(result)
self.queue.clear()
六、性能指标与监控
| 指标 | P50 目标 | P99 目标 | P99.9 目标 |
|---|---|---|---|
| 检索延迟 | < 20ms | < 100ms | < 500ms |
| 生成首字节 | < 500ms | < 2s | < 5s |
| 总响应时间 | < 3s | < 8s | < 15s |
| 索引吞吐量 | > 1000 doc/s | — | — |
| 缓存命中率 | > 40% | > 60% | > 80% |
七、总结
| 优化环节 | 最大收益操作 | 预期延迟降低 |
|---|---|---|
| 索引 | 选择合适的索引类型(HNSW) | 70-90% |
| 检索 | 向量维度压缩 | 30-50% |
| 缓存 | 实现多级缓存 | 50-80%(命中时) |
| 生成 | 流式输出 + KV 缓存 | 首字节 50% |
| 部署 | 水平扩展 + 连接池 | 多倍吞吐量 |
性能优化的黄金法则:先测量,再优化。 使用 APM 工具(如 Grafana + Prometheus)建立基线,每次优化前后对比指标。
相关资源: