引言
许多 RAG 项目从简单的”加载文档→检索→生成”开始,但随着数据量和用户数增长,架构必须不断演进。本文将梳理 RAG 系统的架构设计原则和从简单到复杂的演进路径。
阶段1 阶段2 阶段3
MVP 生产优化 企业级平台
│ │ │
▼ ▼ ▼
┌──────┐ ┌─────────┐ ┌───────────┐
│文档 │ │多数据源 │ │多语言 │
│索引 │ │增量更新 │ │权限控制 │
│检索 │ │缓存层 │ │多云部署 │
│生成 │ │监控告警 │ │审计日志 │
└──────┘ └─────────┘ └───────────┘
一、MVP 阶段架构
1.1 最小可行性架构
用户 ──→ Embedding模型 ──→ 向量数据库 ──→ LLM ──→ 回答
│ │ │
▼ ▼ ▼
OpenAI API Chroma/ GPT-4
Pinecone
1.2 技术栈选择
| 组件 | MVP 推荐 | 理由 |
|---|---|---|
| Embedding | OpenAI text-embedding-3-small | 即开即用,成本最低 |
| 向量数据库 | Chroma | 零配置嵌入运行 |
| 文档加载 | LangChain DirectoryLoader | 支持主流格式 |
| LLM | GPT-4 / Claude | 效果最好 |
| 框架 | LangChain / LlamaIndex | 快速集成 |
1.3 MVP 代码示例
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader
from langchain.chains import RetrievalQA
# 1. 加载文档
loader = DirectoryLoader("./docs", glob="**/*.md")
documents = loader.load()
# 2. 分块
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
chunks = splitter.split_documents(documents)
# 3. 索引
vector_store = Chroma.from_documents(
documents=chunks,
embedding=OpenAIEmbeddings(),
persist_directory="./chroma_db"
)
# 4. 检索生成
qa = RetrievalQA.from_chain_type(
llm=ChatOpenAI(model="gpt-4", temperature=0),
retriever=vector_store.as_retriever()
)
# 5. 使用
response = qa.invoke({"query": "文档中提到了哪些关键概念?"})
二、生产优化阶段架构
2.1 架构升级
┌──────────────┐
│ API网关 │
└──────┬───────┘
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌────────────────┐ ┌──────────────┐
│ 查询优化 │ │ 文档处理服务 │ │ 权限控制 │
│ - 改写 │ │ - 异步管道 │ │ - 租户隔离 │
│ - 扩展 │ │ - 增量更新 │ │ - RBAC │
│ - 分解 │ │ - 格式转换 │ │ │
└──────┬───────┘ └───────┬────────┘ └──────────────┘
│ │
▼ ▼
┌──────────────┐ ┌────────────────┐
│ 缓存层 │ │ 向量数据库 │
│ Redis │ │ Pinecone/ │
│ LRU缓存 │ │ Milvus集群 │
└──────────────┘ └────────────────┘
│ │
└─────────┬─────────┘
▼
┌──────────────┐
│ 重排序 │
│ Cross- │
│ encoder │
└──────┬───────┘
│
▼
┌──────────────┐
│ LLM │
│ 流式输出 │
└──────────────┘
2.2 关键组件设计
# 1. 文档处理服务(异步管道)
from celery import Celery
app = Celery('doc_processor', broker='redis://localhost:6379')
@app.task
def process_document(doc_path: str):
"""异步处理文档"""
# 格式转换
text = convert_to_text(doc_path)
# 清洗
cleaned = clean_document(text)
# 分块
chunks = split_document(cleaned)
# 索引
index_documents(chunks)
# 更新元数据
update_search_metadata(doc_path, len(chunks))
# 2. 缓存层
from redis import Redis
import hashlib
class QueryCache:
def __init__(self):
self.redis = Redis(host='localhost', port=6379, db=0)
self.ttl = 3600 # 1小时过期
def get_or_compute(self, query: str, compute_fn):
cache_key = f"rag:query:{hashlib.md5(query.encode()).hexdigest()}"
cached = self.redis.get(cache_key)
if cached:
return cached
result = compute_fn(query)
self.redis.setex(cache_key, self.ttl, str(result))
return result
三、企业级平台架构
3.1 完整架构
┌──────────────────────────────────────────────────────────┐
│ 接入层 │
│ REST API │ WebSocket │ gRPC │ 消息队列(Kafka) │
└────────────────────────┬─────────────────────────────────┘
│
┌────────────────────────┴─────────────────────────────────┐
│ 服务层 │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────┐ │
│ │ 查询服务 │ │ 索引服务 │ │ 管理服务 │ │ 分析服务 │ │
│ │ - 改写 │ │ - 增量 │ │ - 租户 │ │ - 指标 │ │
│ │ - 检索 │ │ - 批量 │ │ - 权限 │ │ - 日志 │ │
│ │ - 生成 │ │ - 调度 │ │ - 配额 │ │ - 审计 │ │
│ └──────────┘ └──────────┘ └──────────┘ └────────────┘ │
└────────────────────────┬─────────────────────────────────┘
│
┌────────────────────────┴─────────────────────────────────┐
│ 数据层 │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────┐ │
│ │ 向量数据库│ │ 对象存储 │ │ 关系数据库 │ │ 消息队列 │ │
│ │ Milvus │ │ MinIO/S3│ │PostgreSQL│ │Kafka/Rabbit│ │
│ └──────────┘ └──────────┘ └──────────┘ └────────────┘ │
└──────────────────────────────────────────────────────────┘
3.2 核心设计原则
| 原则 | 说明 | 实现方式 |
|---|---|---|
| 无状态服务 | 查询服务无状态,可水平扩展 | Kubernetes HPA |
| 异步处理 | 文档索引等耗时操作异步执行 | Celery/Kafka 任务队列 |
| 缓存分层 | 多级缓存减少重复计算 | Redis + 本地 LRU |
| 熔断降级 | 避免级联故障 | 熔断器 + 限流 |
| 可观测性 | 全链路追踪 | OpenTelemetry + Grafana |
3.3 多租户隔离
class TenantAwareRAG:
"""多租户 RAG 服务"""
def __init__(self):
self.tenants = {} # tenant_id → (vector_store, index)
def get_tenant_store(self, tenant_id: str):
if tenant_id not in self.tenants:
# 每个租户独立的向量数据库集合
collection_name = f"rag_{tenant_id}"
self.tenants[tenant_id] = Chroma(
collection_name=collection_name,
embedding_function=OpenAIEmbeddings()
)
return self.tenants[tenant_id]
def query(self, tenant_id: str, query: str):
store = self.get_tenant_store(tenant_id)
# 检索只在当前租户的数据中进行
results = store.similarity_search(query)
return self.generate(query, results)
四、数据流设计
4.1 实时索引与批量索引
实时索引(小文档) 批量索引(大文档)
│ │
▼ ▼
API写入 定时任务触发
│ │
▼ ▼
解析文档 读取文件列表
│ │
▼ ▼
分块+Embedding 并行处理
│ │
▼ ▼
写入向量库 合并写入
│ │
▼ ▼
即时可检索 延迟可检索
4.2 增量更新策略
class IncrementalIndexer:
"""增量索引更新"""
def __init__(self, vector_store, metadata_db):
self.vector_store = vector_store
self.metadata_db = metadata_db # 记录文档版本
def add_or_update(self, doc_id: str, content: str):
# 检查是否需要更新
existing_version = self.metadata_db.get(doc_id)
if existing_version and self._is_unchanged(content, existing_version):
return # 无变更,跳过
# 删除旧版本
if existing_version:
self.vector_store.delete(ids=[doc_id])
# 创建新索引
chunks = self._chunk(content)
embeddings = self._embed(chunks)
self.vector_store.add_embeddings(
ids=[f"{doc_id}_{i}" for i in range(len(chunks))],
embeddings=embeddings,
metadatas=[{"doc_id": doc_id, "chunk": i} for i in range(len(chunks))]
)
# 更新版本
self.metadata_db[doc_id] = {
"version": time.time(),
"num_chunks": len(chunks)
}
五、架构决策要点
| 决策点 | 小规模(10万以下文档) | 中规模(100万以下) | 大规模(100万以上) |
|---|---|---|---|
| 向量库 | Chroma | Pinecone / Qdrant | Milvus 集群 |
| 部署 | 单机 | 多机 Docker | Kubernetes |
| Embedding | OpenAI API | 混合(API+本地) | 本地 GPU 集群 |
| 框架 | LangChain | 自定义服务 | 微服务架构 |
| 缓存 | 无 | Redis | Redis Cluster |
| 监控 | 无 | Prometheus + Grafana | 全链路追踪 |
六、总结
RAG 系统架构演进的三个阶段:
- MVP 阶段 — 用最少的代码验证价值,核心是快速上线
- 生产优化阶段 — 引入查询优化、缓存、异步处理,解决性能和可靠性
- 企业平台阶段 — 多租户、权限、可观测性、高可用,支持业务规模化
架构设计的核心原则:先简单后复杂,不要过度设计。
相关资源: