RAG系统 进阶 RAG Chroma 向量数据库 本地部署

Chroma实战:轻量级向量数据库的本地部署与使用

AIEng Hub
阅读约 25 分钟

引言

Chroma 是目前最流行的开源、轻量级向量数据库,专为 AI 应用原生设计。相比于 Pinecone 等云服务,Chroma 的优势在于:完全本地运行、零依赖部署、与 LangChain/LlamaIndex 深度集成。

┌──────────────────────────────────────┐
│           Chroma 架构                  │
├──────────────────────────────────────┤
│  ┌──────────┐  ┌──────────┐          │
│  │ Client   │  │  Server  │          │
│  │ (嵌入模式)│  │ (独立模式)│          │
│  └─────┬────┘  └────┬─────┘          │
│        │            │                 │
│        ▼            ▼                 │
│  ┌──────────────────────────┐        │
│  │       Storage Layer      │        │
│  │  (SQLite / DuckDB / S3)  │        │
│  └──────────────────────────┘        │
│        │                             │
│        ▼                             │
│  ┌──────────────────────────┐        │
│  │   Embedding Function     │        │
│  │   (默认all-MiniLM-L6-v2) │        │
│  └──────────────────────────┘        │
└──────────────────────────────────────┘

一、核心概念

概念说明类比
Collection(集合)向量的分组容器数据库中的表
Document(文档)原始文本内容表的行
Embedding(向量)文档的向量表示行的主键索引
Metadata(元数据)文档的附加属性行的其他列
Distance(距离)相似度度量方式排序方式

二、安装与配置

2.1 安装

pip install chromadb

Chroma 默认使用嵌入模式(Embedded Mode),零配置即可运行:

2.2 两种运行模式

模式命令适用场景
嵌入模式(默认)在应用进程中运行单机开发、测试
客户端-服务端模式chroma run + 客户端连接多应用共享、生产环境
# 嵌入模式(最简单)
import chromadb
client = chromadb.Client()

# 持久化模式(推荐)
client = chromadb.PersistentClient(
    path="./chroma_data"  # 数据持久化到磁盘
)

# 客户端-服务端模式
# 先启动服务: chroma run --path ./chroma_data --port 8000
client = chromadb.HttpClient(
    host="localhost",
    port=8000
)

三、数据操作

3.1 创建集合

# 创建集合
collection = client.create_collection(
    name="rag_documents",
    metadata={"hnsw:space": "cosine"}  # 默认是l2
)

# 获取已有集合
collection = client.get_collection("rag_documents")

# 删除集合
client.delete_collection("rag_documents")

# 列出所有集合
print(client.list_collections())

3.2 写入数据

# 批量添加文档
collection.add(
    documents=[
        "RAG(检索增强生成)是一种将检索与生成相结合的技术架构",
        "向量数据库是实现高效语义搜索的核心基础设施",
        "Chroma是一个开源的向量数据库,专为AI应用设计",
    ],
    metadatas=[
        {"source": "wiki", "category": "基础概念", "date": "2025-01-01"},
        {"source": "wiki", "category": "基础设施", "date": "2025-01-02"},
        {"source": "manual", "category": "工具", "date": "2025-01-03"},
    ],
    ids=["doc1", "doc2", "doc3"]
)

# 更新文档
collection.update(
    ids=["doc1"],
    documents=["RAG是检索增强生成的缩写..."],
    metadatas=[{"source": "wiki", "updated": True}]
)

3.3 指定 Embedding 函数

from chromadb.utils import embedding_functions

# OpenAI Embedding
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    api_key="sk-...",
    model_name="text-embedding-3-small"
)

# 本地 Sentence Transformers
sentence_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="BAAI/bge-large-zh-v1.5"
)

# 使用自定义 Embedding
collection = client.create_collection(
    name="production",
    embedding_function=sentence_ef
)

四、检索查询

4.1 基础查询

# 按文本查询
results = collection.query(
    query_texts=["什么是RAG系统?"],
    n_results=5,
    include=["documents", "metadatas", "distances"]
)

print(f"查询结果:{results['documents'][0]}")
print(f"距离:{results['distances'][0]}")

# 按向量查询
query_vector = get_embedding("RAG系统的核心架构")
results = collection.query(
    query_embeddings=[query_vector],
    n_results=5
)

4.2 元数据过滤

# $and 过滤
results = collection.query(
    query_texts=["RAG"],
    n_results=10,
    where={
        "$and": [
            {"category": {"$eq": "基础概念"}},
            {"date": {"$gte": "2025-01-01"}}
        ]
    }
)

# $or 过滤
results = collection.query(
    query_texts=["向量数据库"],
    n_results=10,
    where={
        "$or": [
            {"source": "wiki"},
            {"source": "manual"}
        ]
    }
)

# 支持的操作符: $eq, $ne, $gt, $gte, $lt, $lte, $in, $nin, $contains, $not_contains

4.3 分页查询

# Chroma 不支持传统分页,但可以结合 offset
def paginated_query(
    collection, query: str, page_size: int = 10, max_results: int = 100
):
    all_results = []
    for offset in range(0, max_results, page_size):
        results = collection.query(
            query_texts=[query],
            n_results=page_size,
            offset=offset
        )
        if not results['ids'][0]:
            break
        all_results.extend(results['documents'][0])
    return all_results

五、高级功能

5.1 批量操作与性能优化

import hashlib
from typing import List, Dict

class ChromaBatchProcessor:
    def __init__(self, collection, batch_size: int = 100):
        self.collection = collection
        self.batch_size = batch_size

    def upsert_batch(self, documents: List[Dict[str, str]]):
        """批量写入,自动生成ID"""
        total = len(documents)
        for i in range(0, total, self.batch_size):
            batch = documents[i:i + self.batch_size]
            ids = [
                hashlib.md5(doc['text'].encode()).hexdigest()[:16]
                for doc in batch
            ]
            self.collection.upsert(
                ids=ids,
                documents=[doc['text'] for doc in batch],
                metadatas=[doc.get('metadata', {}) for doc in batch]
            )

    def peek_content(self, limit: int = 5):
        """查看集合中的样本数据"""
        return self.collection.peek(limit)

5.2 查询重写与扩展

def hybrid_search(
    collection,
    query: str,
    queries_weight: float = 0.7,
    mmr_lambda: float = 0.5
):
    """混合搜索 + MMR 去重"""
    # 基础语义检索
    base_results = collection.query(
        query_texts=[query],
        n_results=20
    )

    # MMR 多样化重排
    from collections import Counter

    def mmr_rerank(results, lambda_param=mmr_lambda):
        selected = []
        candidates = list(zip(
            results['ids'][0],
            results['documents'][0],
            results['distances'][0],
            results['metadatas'][0]
        ))
        while candidates and len(selected) < 5:
            mmr_scores = []
            for cand_id, doc, dist, meta in candidates:
                # 相似度得分
                sim_score = 1 - dist
                # 多样化惩罚
                if selected:
                    max_sim_to_selected = max(
                        1 - abs(dist - float(s[2]))
                        for s in selected
                    )
                else:
                    max_sim_to_selected = 0
                mmr_score = (
                    lambda_param * sim_score
                    - (1 - lambda_param) * max_sim_to_selected
                )
                mmr_scores.append(mmr_score)
            best_idx = mmr_scores.index(max(mmr_scores))
            selected.append(candidates.pop(best_idx))

        return selected

    return mmr_rerank(base_results)

六、与 RAG 框架集成

6.1 LangChain 集成

from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings

# 创建 LangChain 包装的 Chroma
vector_store = Chroma(
    collection_name="rag_documents",
    embedding_function=OpenAIEmbeddings(),
    persist_directory="./chroma_langchain"
)

# 作为检索器使用
retriever = vector_store.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)

# 与 LangChain RAG 链集成
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI

qa_chain = RetrievalQA.from_chain_type(
    llm=ChatOpenAI(model="gpt-4"),
    retriever=retriever,
    return_source_documents=True
)

6.2 LlamaIndex 集成

from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import StorageContext, VectorStoreIndex
from llama_index.core import SimpleDirectoryReader

# 使用 Chroma 作为后端
chroma_client = chromadb.PersistentClient(path="./chroma_llama")
chroma_collection = chroma_client.get_or_create_collection(
    "documents"
)
vector_store = ChromaVectorStore(
    chroma_collection=chroma_collection
)
storage_context = StorageContext.from_defaults(
    vector_store=vector_store
)

# 创建索引
documents = SimpleDirectoryReader("./docs").load_data()
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context
)

七、Chroma 的生产考量

考量点说明建议
数据持久化使用 PersistentClient生产环境必需
并发访问单进程写入,多进程读取服务端模式
备份策略备份 chroma_data 目录定时备份
容量规划单集合最大向量数< 100万向量推荐拆分集合
监控无内置监控自行实现健康检查

八、总结

Chroma 最适合的场景:

  • 本地开发和原型验证
  • 单机部署的中小规模应用(< 100万向量)
  • 与 LangChain/LlamaIndex 深度集成的项目
  • 数据隐私有高要求的场景
  • 快速迭代需要轻量级方案

当数据规模增长到百万级以上,或需要分布式部署时,可以考虑迁移到 Milvus 或 Qdrant。


相关资源: