RAG系统 高级 RAG 性能优化 延迟优化 缓存

RAG性能优化技巧:从查询到生成的全面加速指南

AIEng Hub
阅读约 25 分钟

引言

随着 RAG 系统的数据量和用户量增长,响应延迟和系统吞吐量成为核心挑战。本文将系统梳理从文档处理到回答生成全链路的性能优化技巧。

端到端延迟分解:
文档索引(离线) → 查询理解 → 检索 → 重排序 → 生成(在线)
                      │       │       │        │
                      ▼       ▼       ▼        ▼
占比:               5%      30%     15%      50%
优化重点:          缓存    索引   模型部署  流式+KV
                      │       │       │        │
                      ▼       ▼       ▼        ▼
预计收益:         -50%    -70%    -30%     -40%

一、文档索引优化

1.1 分块策略优化

from langchain.text_splitter import (
    RecursiveCharacterTextSplitter
)

# 非优化版本
bad_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=0  # 无重叠,检索可能遗漏边界内容
)

# 优化版本
good_splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=128,  # 20-25% 重叠
    length_function=len,
    separators=["\n\n", "\n", "", ".", ""]
)

1.2 并行索引

from concurrent.futures import ThreadPoolExecutor
import numpy as np

class ParallelIndexer:
    def __init__(self, embed_model, batch_size=64):
        self.embed_model = embed_model
        self.batch_size = batch_size

    def index_documents(self, documents, num_workers=4):
        # 分片处理
        chunks = np.array_split(documents, num_workers)

        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = [
                executor.submit(self._index_batch, chunk)
                for chunk in chunks
            ]
            for f in futures:
                f.result()  # 确保全部完成

    def _index_batch(self, documents):
        """批量编码并写入"""
        for i in range(0, len(documents), self.batch_size):
            batch = documents[i:i + self.batch_size]
            texts = [doc.text for doc in batch]
            embeddings = self.embed_model.encode(texts)
            self.vector_store.add_embeddings(
                texts=texts,
                embeddings=embeddings
            )

二、检索加速

2.1 索引类型选择

索引类型查询延迟(100万向量)内存占用推荐场景
暴力搜索500ms调试,< 1万向量
IVF_FLAT10-50ms精度优先
IVF_SQ85-20ms平衡方案
HNSW3-10ms很高速度优先
DISKANN10-30ms极低超大容量
# HNSW 参数调优
hnsw_params = {
    "M": 16,           # 8-64, 越大精度越高,内存越大
    "efConstruction": 200,  # 100-500, 构建质量
    "efSearch": 64     # 搜索宽度,越大召回越高,延迟越慢
}
# M=16, efSearch=64 是推荐的平衡配置

2.2 向量维度压缩

# OpenAI 维度压缩
response = client.embeddings.create(
    model="text-embedding-3-large",
    input=text,
    dimensions=256  # 从 3072 压缩到 256
)

# PCA 降维
from sklearn.decomposition import PCA

def reduce_dimensions(
    embeddings: np.ndarray,
    target_dim: int = 256
) -> np.ndarray:
    pca = PCA(n_components=target_dim)
    return pca.fit_transform(embeddings)

三、缓存策略

3.1 多级缓存

查询 ──→ L1: 本地内存缓存 ──→ L2: Redis缓存 ──→ 向量检索
            (毫秒级)            (毫秒级)         (10-50ms)
            命中率: 20%          命中率: 40%      未命中
class MultiLevelCache:
    """多级缓存"""
    def __init__(self):
        self.l1_cache = {}  # 本地内存
        self.l2_cache = Redis(host='localhost')
        self.l1_ttl = 60      # 1分钟
        self.l2_ttl = 3600    # 1小时

    def get(self, key: str, compute_fn):
        # L1: 内存缓存
        if key in self.l1_cache:
            return self.l1_cache[key]

        # L2: Redis 缓存
        cached = self.l2_cache.get(key)
        if cached:
            self.l1_cache[key] = cached
            return cached

        # 未命中:执行计算
        result = compute_fn()
        self.l1_cache[key] = result
        self.l2_cache.setex(key, self.l2_ttl, result)
        return result

3.2 语义缓存

基于语义相似度的缓存,相似的查询可以共享结果:

from sentence_transformers import SentenceTransformer

class SemanticCache:
    """语义缓存:相似查询命中相同结果"""
    def __init__(self, threshold=0.95):
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.cache = []
        self.threshold = threshold

    def get(self, query: str):
        query_emb = self.model.encode(query)

        for cached_query, result in self.cache:
            cached_emb = self.model.encode(cached_query)
            similarity = cosine_similarity(
                [query_emb], [cached_emb]
            )[0][0]

            if similarity > self.threshold:
                return result
        return None

    def set(self, query: str, result):
        self.cache.append((query, result))

四、LLM 生成优化

4.1 流式输出

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/rag/stream")
async def rag_stream(query: str):
    async def generate():
        # 检索
        docs = await retrieve_documents(query)

        # 流式生成
        stream = await llm.astream(
            f"基于以下文档回答问题:\n{docs}\n问题:{query}"
        )

        async for chunk in stream:
            yield chunk.choices[0].delta.content or ""

    return StreamingResponse(generate(), media_type="text/plain")

4.2 上下文压缩

from langchain.retrievers.document_compressors import (
    LLMChainExtractor
)

# 使用 LLM 提取最相关的内容片段
compressor = LLMChainExtractor.from_llm(llm)

compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

compressed_docs = compression_retriever.invoke(query)
# 输出: ["Milvus 的 QPS 达到 10000..."] (原来: ["完整的文档段落"])

4.3 Prompt 模板优化

# ❌ 不优化:全量上下文
prompt_bad = f"""
基于以下文档回答问题:
{docs}
问题:{query}
"""

# ✅ 优化:结构化 + 摘要
def optimize_prompt(docs: list[str], query: str, max_tokens: int = 3000):
    """在 token 预算内选择最佳内容"""
    # 1. 优先使用高分的文档
    if get_total_tokens(docs) > max_tokens:
        # 2. 需要截断时,用摘要替代完整文档
        summaries = [summarize(doc) for doc in docs[:3]]
        docs = summaries

    prompt = f"""
【背景信息】
{docs}

【用户问题】
{query}

【要求】
1. 仅基于上述背景信息回答
2. 如背景信息不足,请明确说明
3. 保持回答简洁、准确
"""
    return prompt

五、部署架构优化

5.1 微服务化

# docker-compose.prod.yml
services:
  rag-api:
    image: rag-api:latest
    deploy:
      replicas: 3  # 水平扩展
      resources:
        limits:
          cpus: '2'
          memory: 4G

  vector-db:
    image: milvus:latest
    deploy:
      mode: replicated
      replicas: 5  # 向量库分布式

  redis:
    image: redis:alpine
    deploy:
      replicas: 3  # 缓存集群

5.2 连接池与请求合并

class RequestBatcher:
    """请求合并:多个查询合并为一次批量检索"""
    def __init__(self, max_batch_size=32, max_wait_ms=50):
        self.queue = []
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms

    async def submit(self, query: str):
        future = asyncio.Future()
        self.queue.append((query, future))

        if len(self.queue) >= self.max_batch_size:
            await self._process_batch()
        else:
            # 等待积累
            asyncio.create_task(self._delayed_process())

        return await future

    async def _process_batch(self):
        queries = [q for q, _ in self.queue]
        futures = [f for _, f in self.queue]

        # 批量检索(一次网络请求)
        results = await self.retriever.abatch(queries)

        for future, result in zip(futures, results):
            future.set_result(result)

        self.queue.clear()

六、性能指标与监控

指标P50 目标P99 目标P99.9 目标
检索延迟< 20ms< 100ms< 500ms
生成首字节< 500ms< 2s< 5s
总响应时间< 3s< 8s< 15s
索引吞吐量> 1000 doc/s
缓存命中率> 40%> 60%> 80%

七、总结

优化环节最大收益操作预期延迟降低
索引选择合适的索引类型(HNSW)70-90%
检索向量维度压缩30-50%
缓存实现多级缓存50-80%(命中时)
生成流式输出 + KV 缓存首字节 50%
部署水平扩展 + 连接池多倍吞吐量

性能优化的黄金法则:先测量,再优化。 使用 APM 工具(如 Grafana + Prometheus)建立基线,每次优化前后对比指标。


相关资源: