RAG系统 高级 RAG 系统架构 生产部署 系统设计

RAG系统架构设计:从MVP到生产级的演进之路

AIEng Hub
阅读约 30 分钟

引言

许多 RAG 项目从简单的”加载文档→检索→生成”开始,但随着数据量和用户数增长,架构必须不断演进。本文将梳理 RAG 系统的架构设计原则和从简单到复杂的演进路径。

阶段1                  阶段2                  阶段3
  MVP              生产优化             企业级平台
  │                   │                   │
  ▼                   ▼                   ▼
┌──────┐          ┌─────────┐         ┌───────────┐
│文档  │          │多数据源  │         │多语言     │
│索引  │          │增量更新  │         │权限控制   │
│检索  │          │缓存层   │         │多云部署   │
│生成  │          │监控告警 │         │审计日志   │
└──────┘          └─────────┘         └───────────┘

一、MVP 阶段架构

1.1 最小可行性架构

用户 ──→ Embedding模型 ──→ 向量数据库 ──→ LLM ──→ 回答
           │                   │            │
           ▼                   ▼            ▼
      OpenAI API          Chroma/        GPT-4
                           Pinecone

1.2 技术栈选择

组件MVP 推荐理由
EmbeddingOpenAI text-embedding-3-small即开即用,成本最低
向量数据库Chroma零配置嵌入运行
文档加载LangChain DirectoryLoader支持主流格式
LLMGPT-4 / Claude效果最好
框架LangChain / LlamaIndex快速集成

1.3 MVP 代码示例

from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader
from langchain.chains import RetrievalQA

# 1. 加载文档
loader = DirectoryLoader("./docs", glob="**/*.md")
documents = loader.load()

# 2. 分块
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200
)
chunks = splitter.split_documents(documents)

# 3. 索引
vector_store = Chroma.from_documents(
    documents=chunks,
    embedding=OpenAIEmbeddings(),
    persist_directory="./chroma_db"
)

# 4. 检索生成
qa = RetrievalQA.from_chain_type(
    llm=ChatOpenAI(model="gpt-4", temperature=0),
    retriever=vector_store.as_retriever()
)

# 5. 使用
response = qa.invoke({"query": "文档中提到了哪些关键概念?"})

二、生产优化阶段架构

2.1 架构升级

                           ┌──────────────┐
                           │    API网关    │
                           └──────┬───────┘

             ┌────────────────────┼────────────────────┐
             │                    │                    │
             ▼                    ▼                    ▼
     ┌──────────────┐   ┌────────────────┐   ┌──────────────┐
     │  查询优化     │   │  文档处理服务   │   │  权限控制    │
     │  - 改写       │   │  - 异步管道    │   │  - 租户隔离  │
     │  - 扩展       │   │  - 增量更新   │   │  - RBAC     │
     │  - 分解       │   │  - 格式转换   │   │             │
     └──────┬───────┘   └───────┬────────┘   └──────────────┘
            │                   │
            ▼                   ▼
     ┌──────────────┐   ┌────────────────┐
     │   缓存层     │   │  向量数据库     │
     │  Redis       │   │  Pinecone/     │
     │  LRU缓存     │   │  Milvus集群    │
     └──────────────┘   └────────────────┘
            │                   │
            └─────────┬─────────┘

              ┌──────────────┐
              │   重排序     │
              │  Cross-      │
              │  encoder     │
              └──────┬───────┘


              ┌──────────────┐
              │   LLM        │
              │  流式输出    │
              └──────────────┘

2.2 关键组件设计

# 1. 文档处理服务(异步管道)
from celery import Celery

app = Celery('doc_processor', broker='redis://localhost:6379')

@app.task
def process_document(doc_path: str):
    """异步处理文档"""
    # 格式转换
    text = convert_to_text(doc_path)
    # 清洗
    cleaned = clean_document(text)
    # 分块
    chunks = split_document(cleaned)
    # 索引
    index_documents(chunks)
    # 更新元数据
    update_search_metadata(doc_path, len(chunks))

# 2. 缓存层
from redis import Redis
import hashlib

class QueryCache:
    def __init__(self):
        self.redis = Redis(host='localhost', port=6379, db=0)
        self.ttl = 3600  # 1小时过期

    def get_or_compute(self, query: str, compute_fn):
        cache_key = f"rag:query:{hashlib.md5(query.encode()).hexdigest()}"
        cached = self.redis.get(cache_key)
        if cached:
            return cached

        result = compute_fn(query)
        self.redis.setex(cache_key, self.ttl, str(result))
        return result

三、企业级平台架构

3.1 完整架构

┌──────────────────────────────────────────────────────────┐
│                     接入层                                 │
│   REST API │ WebSocket │ gRPC │ 消息队列(Kafka)          │
└────────────────────────┬─────────────────────────────────┘

┌────────────────────────┴─────────────────────────────────┐
│                     服务层                                 │
│                                                           │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────┐  │
│  │ 查询服务  │ │ 索引服务  │ │ 管理服务  │ │ 分析服务   │  │
│  │ - 改写   │ │ - 增量   │ │ - 租户   │ │ - 指标    │  │
│  │ - 检索   │ │ - 批量   │ │ - 权限   │ │ - 日志    │  │
│  │ - 生成   │ │ - 调度   │ │ - 配额   │ │ - 审计    │  │
│  └──────────┘ └──────────┘ └──────────┘ └────────────┘  │
└────────────────────────┬─────────────────────────────────┘

┌────────────────────────┴─────────────────────────────────┐
│                     数据层                                 │
│                                                           │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────┐  │
│  │ 向量数据库│ │ 对象存储 │ │ 关系数据库 │ │ 消息队列   │  │
│  │ Milvus   │ │ MinIO/S3│ │PostgreSQL│ │Kafka/Rabbit│  │
│  └──────────┘ └──────────┘ └──────────┘ └────────────┘  │
└──────────────────────────────────────────────────────────┘

3.2 核心设计原则

原则说明实现方式
无状态服务查询服务无状态,可水平扩展Kubernetes HPA
异步处理文档索引等耗时操作异步执行Celery/Kafka 任务队列
缓存分层多级缓存减少重复计算Redis + 本地 LRU
熔断降级避免级联故障熔断器 + 限流
可观测性全链路追踪OpenTelemetry + Grafana

3.3 多租户隔离

class TenantAwareRAG:
    """多租户 RAG 服务"""
    def __init__(self):
        self.tenants = {}  # tenant_id → (vector_store, index)

    def get_tenant_store(self, tenant_id: str):
        if tenant_id not in self.tenants:
            # 每个租户独立的向量数据库集合
            collection_name = f"rag_{tenant_id}"
            self.tenants[tenant_id] = Chroma(
                collection_name=collection_name,
                embedding_function=OpenAIEmbeddings()
            )
        return self.tenants[tenant_id]

    def query(self, tenant_id: str, query: str):
        store = self.get_tenant_store(tenant_id)
        # 检索只在当前租户的数据中进行
        results = store.similarity_search(query)
        return self.generate(query, results)

四、数据流设计

4.1 实时索引与批量索引

实时索引(小文档)        批量索引(大文档)
    │                         │
    ▼                         ▼
API写入                   定时任务触发
    │                         │
    ▼                         ▼
解析文档                   读取文件列表
    │                         │
    ▼                         ▼
分块+Embedding             并行处理
    │                         │
    ▼                         ▼
写入向量库                 合并写入
    │                         │
    ▼                         ▼
即时可检索                 延迟可检索

4.2 增量更新策略

class IncrementalIndexer:
    """增量索引更新"""
    def __init__(self, vector_store, metadata_db):
        self.vector_store = vector_store
        self.metadata_db = metadata_db  # 记录文档版本

    def add_or_update(self, doc_id: str, content: str):
        # 检查是否需要更新
        existing_version = self.metadata_db.get(doc_id)

        if existing_version and self._is_unchanged(content, existing_version):
            return  # 无变更,跳过

        # 删除旧版本
        if existing_version:
            self.vector_store.delete(ids=[doc_id])

        # 创建新索引
        chunks = self._chunk(content)
        embeddings = self._embed(chunks)
        self.vector_store.add_embeddings(
            ids=[f"{doc_id}_{i}" for i in range(len(chunks))],
            embeddings=embeddings,
            metadatas=[{"doc_id": doc_id, "chunk": i} for i in range(len(chunks))]
        )

        # 更新版本
        self.metadata_db[doc_id] = {
            "version": time.time(),
            "num_chunks": len(chunks)
        }

五、架构决策要点

决策点小规模(10万以下文档)中规模(100万以下)大规模(100万以上)
向量库ChromaPinecone / QdrantMilvus 集群
部署单机多机 DockerKubernetes
EmbeddingOpenAI API混合(API+本地)本地 GPU 集群
框架LangChain自定义服务微服务架构
缓存RedisRedis Cluster
监控Prometheus + Grafana全链路追踪

六、总结

RAG 系统架构演进的三个阶段:

  1. MVP 阶段 — 用最少的代码验证价值,核心是快速上线
  2. 生产优化阶段 — 引入查询优化、缓存、异步处理,解决性能和可靠性
  3. 企业平台阶段 — 多租户、权限、可观测性、高可用,支持业务规模化

架构设计的核心原则:先简单后复杂,不要过度设计。


相关资源: