RAG系统 高级 RAG 生产优化 系统架构 性能调优

RAG生产环境优化完全指南

AIEng Hub
阅读约 35 分钟

引言

将RAG系统从原型推向生产环境,面临着诸多挑战:

  • 性能瓶颈 - 高并发下的延迟和吞吐量问题
  • 成本控制 - LLM调用和向量存储的费用管理
  • 质量保证 - 回答准确性和一致性的持续监控
  • 系统稳定性 - 故障恢复和降级策略

本文将从架构设计、性能优化、监控运维等多个维度,分享RAG生产化的最佳实践。

生产级RAG架构设计

典型生产架构

┌─────────────────────────────────────────────────────────────────┐
│                         生产级RAG架构                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   ┌─────────────┐                                               │
│   │   CDN/WAF   │                                               │
│   └──────┬──────┘                                               │
│          │                                                       │
│   ┌──────▼──────┐     ┌─────────────┐     ┌─────────────────┐  │
│   │  API Gateway │────►│  Rate Limiter │────►│  Load Balancer  │  │
│   └─────────────┘     └─────────────┘     └────────┬────────┘  │
│                                                    │             │
│   ┌────────────────────────────────────────────────┼─────────┐ │
│   │              RAG Service Cluster               │         │ │
│   │  ┌─────────┐  ┌─────────┐  ┌─────────┐        │         │ │
│   │  │ Service │  │ Service │  │ Service │◄───────┘         │ │
│   │  │   #1    │  │   #2    │  │   #N    │                  │ │
│   │  └────┬────┘  └────┬────┘  └────┬────┘                  │ │
│   │       └─────────────┼─────────────┘                      │ │
│   │                     │                                     │ │
│   │  ┌──────────────────┼──────────────────┐                 │ │
│   │  │                  ▼                  │                 │ │
│   │  │  ┌─────────┐  ┌─────────┐  ┌──────┴─┐               │ │
│   │  │  │  Query  │  │ Retrieve│  │ Generate│               │ │
│   │  │  │ Rewrite │──►│  + Rerank│──►│  Answer │               │ │
│   │  │  └─────────┘  └─────────┘  └────────┘               │ │
│   │  └─────────────────────────────────────┘                 │ │
│   └──────────────────────────────────────────────────────────┘ │
│                              │                                   │
│   ┌──────────────────────────┼──────────────────────────────┐  │
│   │                          ▼                              │  │
│   │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────┐ │  │
│   │  │  Vector DB  │  │    Cache    │  │  LLM Provider   │ │  │
│   │  │  (Milvus/   │  │   (Redis)   │  │  (OpenAI/       │ │  │
│   │  │   Pinecone) │  │             │  │   Anthropic)    │ │  │
│   │  └─────────────┘  └─────────────┘  └─────────────────┘ │  │
│   └─────────────────────────────────────────────────────────┘  │
│                                                                  │
│   ┌─────────────────────────────────────────────────────────┐  │
│   │              Observability Stack                        │  │
│   │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐    │  │
│   │  │Metrics  │  │ Logging │  │Tracing  │  │ Alerting│    │  │
│   │  │Prometheus│  │  ELK    │  │  Jaeger │  │ PagerDuty│   │  │
│   │  └─────────┘  └─────────┘  └─────────┘  └─────────┘    │  │
│   └─────────────────────────────────────────────────────────┘  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

核心组件选型

组件推荐方案备选方案
API网关Kong / AWS API GatewayNginx / Traefik
向量数据库Milvus / PineconeWeaviate / pgvector
缓存Redis ClusterMemcached
消息队列Kafka / RabbitMQRedis Pub/Sub
监控Prometheus + GrafanaDatadog
日志ELK Stack / LokiSplunk

性能优化策略

1. 查询优化

查询改写(Query Rewriting)

from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

query_rewrite_template = """
分析用户的查询,生成多个相关查询以提高检索召回率。
原始查询:{query}

请生成3个不同角度的相关查询:
1. 
2. 
3. 
"""

rewrite_chain = LLMChain(
    llm=llm,
    prompt=PromptTemplate.from_template(query_rewrite_template)
)

def rewrite_query(query):
    """生成多角度的查询变体"""
    result = rewrite_chain.invoke({"query": query})
    queries = [q.strip() for q in result["text"].split("\n") if q.strip()]
    return [query] + queries  # 包含原始查询

HyDE(假设文档嵌入)

hyde_template = """
根据用户查询,生成一个假设的理想回答文档。
这个文档应该包含回答该问题所需的关键信息。

查询:{query}

假设文档:
"""

def hyde_retrieval(query):
    # 生成假设文档
    hypothetical_doc = llm.predict(hyde_template.format(query=query))
    
    # 用假设文档做检索
    query_embedding = embeddings.embed_query(hypothetical_doc)
    results = vector_store.similarity_search_by_vector(query_embedding, k=5)
    
    return results

2. 检索优化

多级检索策略

class MultiStageRetriever:
    """多级检索:粗排 + 精排"""
    
    def __init__(self, vector_store, bm25_retriever):
        self.vector_store = vector_store
        self.bm25_retriever = bm25_retriever
    
    async def retrieve(self, query, top_k=5):
        # 第一阶段:多路召回(向量 + 关键词)
        vector_results = await self.vector_store.asimilarity_search(
            query, k=top_k*4
        )
        bm25_results = await self.bm25_retriever.aget_relevant_documents(
            query
        )
        
        # 融合结果
        fused_results = self.reciprocal_rank_fusion(
            [vector_results, bm25_results]
        )
        
        # 第二阶段:重排序
        reranked = await self.rerank(query, fused_results[:top_k*2])
        
        return reranked[:top_k]
    
    def reciprocal_rank_fusion(self, result_lists, k=60):
        """RRF融合算法"""
        scores = {}
        
        for results in result_lists:
            for rank, doc in enumerate(results):
                doc_id = doc.metadata.get("id")
                if doc_id not in scores:
                    scores[doc_id] = {"doc": doc, "score": 0}
                # RRF公式
                scores[doc_id]["score"] += 1 / (k + rank + 1)
        
        # 按分数排序
        sorted_results = sorted(
            scores.values(), 
            key=lambda x: x["score"], 
            reverse=True
        )
        
        return [r["doc"] for r in sorted_results]
    
    async def rerank(self, query, documents):
        """使用重排序模型"""
        pairs = [[query, doc.page_content] for doc in documents]
        scores = await reranker.predict(pairs)
        
        # 按重排序分数排序
        scored_docs = list(zip(documents, scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        
        return [doc for doc, _ in scored_docs]

向量索引优化

# Milvus索引配置优化
index_params = {
    "index_type": "IVF_FLAT",  # 或 HNSW
    "params": {
        "nlist": 4096,  # 聚类中心数,根据数据量调整
        "nprobe": 128   # 查询时搜索的聚类数
    }
}

# HNSW配置(高召回场景)
hnsw_params = {
    "index_type": "HNSW",
    "params": {
        "M": 16,        # 每个节点的最大连接数
        "efConstruction": 200,  # 构建时的搜索范围
        "ef": 64        # 查询时的搜索范围
    }
}

3. 生成优化

上下文压缩

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

# 使用LLM压缩上下文
compressor = LLMChainExtractor.from_llm(llm)

compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

# 检索并自动压缩
compressed_docs = compression_retriever.get_relevant_documents(query)

流式生成

async def stream_answer(query, context):
    """流式返回答案,提升用户体验"""
    
    prompt = build_prompt(query, context)
    
    # 流式调用LLM
    async for chunk in llm.astream(prompt):
        yield chunk.content

4. 缓存策略

多级缓存架构

import hashlib
import json
from functools import wraps

class RAGCache:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.local_cache = {}  # L1: 本地内存
        self.embedding_cache = {}
    
    def get_cache_key(self, query, params=None):
        """生成缓存键"""
        key_data = f"{query}:{json.dumps(params, sort_keys=True)}"
        return f"rag:query:{hashlib.md5(key_data.encode()).hexdigest()}"
    
    async def get(self, query, params=None):
        """多级缓存读取"""
        key = self.get_cache_key(query, params)
        
        # L1: 本地缓存
        if key in self.local_cache:
            return self.local_cache[key]
        
        # L2: Redis缓存
        cached = await self.redis.get(key)
        if cached:
            result = json.loads(cached)
            self.local_cache[key] = result  # 回填本地缓存
            return result
        
        return None
    
    async def set(self, query, result, params=None, ttl=3600):
        """写入缓存"""
        key = self.get_cache_key(query, params)
        
        # 写入Redis
        await self.redis.setex(
            key, 
            ttl, 
            json.dumps(result)
        )
        
        # 写入本地缓存(短TTL)
        self.local_cache[key] = result
    
    async def get_embedding(self, text):
        """Embedding结果缓存"""
        key = f"rag:emb:{hashlib.md5(text.encode()).hexdigest()}"
        
        cached = await self.redis.get(key)
        if cached:
            return json.loads(cached)
        
        return None
    
    async def set_embedding(self, text, embedding):
        """缓存Embedding结果"""
        key = f"rag:emb:{hashlib.md5(text.encode()).hexdigest()}"
        await self.redis.setex(
            key, 
            86400 * 7,  # 7天
            json.dumps(embedding)
        )

语义缓存

from sentence_transformers import SentenceTransformer
import numpy as np

class SemanticCache:
    """基于语义相似度的缓存"""
    
    def __init__(self, embeddings_model, similarity_threshold=0.95):
        self.embeddings = embeddings_model
        self.threshold = similarity_threshold
        self.cache = {}  # query_embedding -> result
    
    async def get(self, query):
        query_emb = self.embeddings.embed_query(query)
        
        # 查找相似查询
        for cached_query_emb, result in self.cache.items():
            similarity = cosine_similarity(query_emb, cached_query_emb)
            if similarity > self.threshold:
                return result
        
        return None
    
    async def set(self, query, result):
        query_emb = self.embeddings.embed_query(query)
        self.cache[tuple(query_emb)] = result

成本控制策略

1. 智能路由

class LLMRouter:
    """根据查询复杂度路由到不同的模型"""
    
    def __init__(self):
        self.models = {
            "simple": "gpt-3.5-turbo",      # 简单查询
            "standard": "gpt-4o-mini",      # 标准查询
            "complex": "gpt-4",              # 复杂查询
        }
        self.costs = {
            "gpt-3.5-turbo": 0.0015,
            "gpt-4o-mini": 0.0006,
            "gpt-4": 0.03,
        }
    
    async def classify_query(self, query):
        """分类查询复杂度"""
        prompt = f"""
        分析以下查询的复杂度,输出:simple/standard/complex
        
        标准:
        - simple:事实性查询,直接回答
        - standard:需要简单推理
        - complex:需要多步推理或综合分析
        
        查询:{query}
        
        复杂度:
        """
        
        response = await self.models["simple"].predict(prompt)
        return response.strip().lower()
    
    async def route(self, query, context):
        """智能路由"""
        complexity = await self.classify_query(query)
        model = self.models.get(complexity, "gpt-4o-mini")
        
        # 生成答案
        answer = await self.generate(model, query, context)
        
        return {
            "answer": answer,
            "model_used": model,
            "estimated_cost": self.estimate_cost(query, answer, model)
        }

2. Token优化

class TokenOptimizer:
    """优化Token使用"""
    
    def __init__(self, max_context_tokens=4000):
        self.max_tokens = max_context_tokens
        self.tokenizer = tiktoken.encoding_for_model("gpt-4")
    
    def count_tokens(self, text):
        return len(self.tokenizer.encode(text))
    
    def optimize_context(self, query, documents):
        """优化上下文长度"""
        query_tokens = self.count_tokens(query)
        available_tokens = self.max_tokens - query_tokens - 500  # 预留生成空间
        
        optimized_docs = []
        current_tokens = 0
        
        for doc in documents:
            doc_tokens = self.count_tokens(doc.page_content)
            
            if current_tokens + doc_tokens > available_tokens:
                # 截断文档
                remaining = available_tokens - current_tokens
                truncated = self.truncate_to_tokens(
                    doc.page_content, 
                    remaining
                )
                if truncated:
                    doc.page_content = truncated
                    optimized_docs.append(doc)
                break
            
            optimized_docs.append(doc)
            current_tokens += doc_tokens
        
        return optimized_docs
    
    def truncate_to_tokens(self, text, max_tokens):
        """按token截断文本"""
        tokens = self.tokenizer.encode(text)
        if len(tokens) <= max_tokens:
            return text
        
        truncated = self.tokenizer.decode(tokens[:max_tokens])
        return truncated

3. 批量处理

class BatchProcessor:
    """批量处理查询以降低成本"""
    
    def __init__(self, batch_size=10, max_wait_time=1.0):
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time
        self.pending_queries = []
        self.results = {}
    
    async def submit(self, query):
        """提交查询到批处理队列"""
        query_id = str(uuid.uuid4())
        future = asyncio.Future()
        
        self.pending_queries.append({
            "id": query_id,
            "query": query,
            "future": future
        })
        
        # 触发批处理检查
        if len(self.pending_queries) >= self.batch_size:
            await self.process_batch()
        
        return await future
    
    async def process_batch(self):
        """处理批量查询"""
        if not self.pending_queries:
            return
        
        batch = self.pending_queries[:self.batch_size]
        self.pending_queries = self.pending_queries[self.batch_size:]
        
        # 批量检索
        contexts = await self.batch_retrieve([q["query"] for q in batch])
        
        # 批量生成(如果API支持)
        answers = await self.batch_generate(
            [q["query"] for q in batch], 
            contexts
        )
        
        # 分发结果
        for item, answer in zip(batch, answers):
            item["future"].set_result(answer)

高可用设计

1. 故障转移

class FailoverLLM:
    """LLM故障转移"""
    
    def __init__(self, providers):
        self.providers = providers
        self.current_index = 0
        self.circuit_breakers = {
            name: CircuitBreaker() for name in providers
        }
    
    async def generate(self, prompt):
        """带故障转移的生成"""
        for i in range(len(self.providers)):
            provider_name = list(self.providers.keys())[
                (self.current_index + i) % len(self.providers)
            ]
            provider = self.providers[provider_name]
            cb = self.circuit_breakers[provider_name]
            
            if not cb.can_execute():
                continue
            
            try:
                result = await provider.generate(prompt)
                cb.record_success()
                return result
            except Exception as e:
                cb.record_failure()
                logger.error(f"Provider {provider_name} failed: {e}")
                continue
        
        raise Exception("All providers failed")

class CircuitBreaker:
    """熔断器"""
    
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
    
    def can_execute(self):
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"
                return True
            return False
        return True
    
    def record_success(self):
        self.failures = 0
        self.state = "CLOSED"
    
    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        
        if self.failures >= self.failure_threshold:
            self.state = "OPEN"

2. 降级策略

class DegradationStrategy:
    """服务降级策略"""
    
    def __init__(self, vector_store, simple_qa_model):
        self.vector_store = vector_store
        self.simple_qa = simple_qa_model
        self.degradation_level = 0  # 0-3
    
    async def answer(self, query):
        """根据降级级别提供服务"""
        
        if self.degradation_level == 0:
            # 正常模式:完整RAG流程
            return await self.full_rag(query)
        
        elif self.degradation_level == 1:
            # 轻度降级:跳过重排序
            return await self.skip_rerank(query)
        
        elif self.degradation_level == 2:
            # 中度降级:仅向量检索 + 轻量模型
            return await self.lightweight_rag(query)
        
        elif self.degradation_level == 3:
            # 重度降级:仅缓存或预定义回答
            return await self.cache_only(query)
    
    async def full_rag(self, query):
        """完整RAG流程"""
        docs = await self.retrieve(query)
        reranked = await self.rerank(query, docs)
        answer = await self.generate(query, reranked)
        return answer
    
    async def skip_rerank(self, query):
        """跳过重排序"""
        docs = await self.retrieve(query)
        answer = await self.generate(query, docs[:3])
        return answer
    
    async def lightweight_rag(self, query):
        """轻量级RAG"""
        docs = await self.vector_store.asimilarity_search(query, k=3)
        # 使用轻量模型
        answer = await self.simple_qa.generate(query, docs)
        return answer
    
    async def cache_only(self, query):
        """仅使用缓存"""
        # 尝试语义缓存
        cached = await self.semantic_cache.get(query)
        if cached:
            return cached
        
        return "系统繁忙,请稍后重试"

监控与可观测性

1. 关键指标定义

from prometheus_client import Counter, Histogram, Gauge

# 定义监控指标
rag_metrics = {
    # 查询指标
    "query_total": Counter("rag_query_total", "Total queries", ["status"]),
    "query_duration": Histogram("rag_query_duration", "Query duration"),
    
    # 检索指标
    "retrieval_latency": Histogram("rag_retrieval_latency", "Retrieval latency"),
    "retrieval_results": Gauge("rag_retrieval_results", "Number of retrieved docs"),
    
    # 生成指标
    "generation_latency": Histogram("rag_generation_latency", "Generation latency"),
    "tokens_used": Counter("rag_tokens_used", "Tokens used", ["model"]),
    
    # 质量指标
    "answer_relevance": Gauge("rag_answer_relevance", "Answer relevance score"),
    "context_precision": Gauge("rag_context_precision", "Context precision"),
    
    # 缓存指标
    "cache_hit": Counter("rag_cache_hit", "Cache hits", ["cache_type"]),
    "cache_miss": Counter("rag_cache_miss", "Cache misses", ["cache_type"]),
}

2. 追踪系统

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# 配置追踪
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# 在RAG流程中添加追踪
async def rag_with_tracing(query):
    with tracer.start_as_current_span("rag_query") as span:
        span.set_attribute("query", query)
        
        # 检索阶段
        with tracer.start_span("retrieval"):
            docs = await retrieve(query)
            span.set_attribute("retrieved_docs", len(docs))
        
        # 生成阶段
        with tracer.start_span("generation"):
            answer = await generate(query, docs)
            span.set_attribute("answer_length", len(answer))
        
        return answer

3. 质量评估

class RAGEvaluator:
    """RAG质量评估器"""
    
    def __init__(self, llm):
        self.llm = llm
    
    async def evaluate_answer(self, query, answer, context):
        """评估回答质量"""
        
        eval_prompt = f"""
        评估以下RAG回答的质量,从1-5分打分。
        
        查询:{query}
        
        上下文:{context}
        
        回答:{answer}
        
        评估维度:
        1. 相关性(回答是否针对查询)
        2. 准确性(回答是否基于上下文)
        3. 完整性(是否涵盖查询要点)
        4. 简洁性(是否简洁明了)
        
        请输出JSON格式:
        {{
            "relevance": 分数,
            "accuracy": 分数,
            "completeness": 分数,
            "conciseness": 分数,
            "overall": 总分,
            "feedback": "改进建议"
        }}
        """
        
        result = await self.llm.predict(eval_prompt)
        return json.loads(result)
    
    async def evaluate_retrieval(self, query, retrieved_docs, ground_truth):
        """评估检索质量"""
        
        # 计算召回率
        retrieved_ids = {doc.metadata["id"] for doc in retrieved_docs}
        ground_truth_ids = set(ground_truth)
        
        recall = len(retrieved_ids & ground_truth_ids) / len(ground_truth_ids)
        
        # 计算精确率
        precision = len(retrieved_ids & ground_truth_ids) / len(retrieved_ids)
        
        # 计算MRR
        mrr = 0
        for i, doc in enumerate(retrieved_docs):
            if doc.metadata["id"] in ground_truth_ids:
                mrr = 1 / (i + 1)
                break
        
        return {
            "recall": recall,
            "precision": precision,
            "mrr": mrr,
            "f1": 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
        }

安全与合规

1. 输入过滤

import re

class InputSanitizer:
    """输入清洗"""
    
    def __init__(self):
        self.max_length = 1000
        self.blocked_patterns = [
            r"ignore previous instructions",
            r"system prompt",
            r"you are now",
        ]
    
    def sanitize(self, query):
        """清洗用户输入"""
        # 长度检查
        if len(query) > self.max_length:
            raise ValueError("Query too long")
        
        # 注入攻击检测
        for pattern in self.blocked_patterns:
            if re.search(pattern, query, re.IGNORECASE):
                raise ValueError("Potential prompt injection detected")
        
        # 敏感信息检测
        if self.contains_pii(query):
            raise ValueError("Query contains PII")
        
        return query.strip()
    
    def contains_pii(self, text):
        """检测PII信息"""
        patterns = {
            "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
            "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
            "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
        }
        
        for pattern in patterns.values():
            if re.search(pattern, text):
                return True
        return False

2. 输出审核

class OutputModerator:
    """输出生成审核"""
    
    def __init__(self, moderation_model):
        self.moderation = moderation_model
    
    async def moderate(self, answer):
        """审核生成内容"""
        
        # 调用审核API
        result = await self.moderation.check(answer)
        
        if result["flagged"]:
            # 记录并处理
            logger.warning(f"Content flagged: {result}")
            return self.get_safe_response()
        
        return answer
    
    def get_safe_response(self):
        return "抱歉,无法回答这个问题。"

3. 审计日志

import json
from datetime import datetime

class AuditLogger:
    """审计日志"""
    
    def __init__(self, storage):
        self.storage = storage
    
    async def log_query(self, user_id, query, context, answer, metadata):
        """记录完整查询日志"""
        
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "user_id": hashlib.sha256(user_id.encode()).hexdigest(),  # 脱敏
            "query_hash": hashlib.sha256(query.encode()).hexdigest(),
            "context_sources": [doc.metadata.get("source") for doc in context],
            "answer_hash": hashlib.sha256(answer.encode()).hexdigest(),
            "model_used": metadata.get("model"),
            "tokens_used": metadata.get("tokens"),
            "latency_ms": metadata.get("latency"),
        }
        
        await self.storage.store(log_entry)

部署最佳实践

Docker Compose配置

version: '3.8'

services:
  rag-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - MILVUS_HOST=milvus
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - redis
      - milvus
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '2'
          memory: 4G
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes --maxmemory 2gb --maxmemory-policy allkeys-lru

  milvus:
    image: milvusdb/milvus:latest
    volumes:
      - milvus_data:/var/lib/milvus
    environment:
      - ETCD_ENDPOINTS=etcd:2379
      - MINIO_ADDRESS=minio:9000

  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  redis_data:
  milvus_data:
  prometheus_data:
  grafana_data:

Kubernetes配置

apiVersion: apps/v1
kind: Deployment
metadata:
  name: rag-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rag-service
  template:
    metadata:
      labels:
        app: rag-service
    spec:
      containers:
      - name: rag-api
        image: rag-service:latest
        ports:
        - containerPort: 8000
        env:
        - name: WORKERS
          value: "4"
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
        hpa:
          minReplicas: 3
          maxReplicas: 10
          targetCPUUtilizationPercentage: 70
---
apiVersion: v1
kind: Service
metadata:
  name: rag-service
spec:
  selector:
    app: rag-service
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer

总结

生产级RAG系统需要考虑:

维度关键要点
性能多级检索、智能缓存、异步处理
成本模型路由、Token优化、批量处理
可用性故障转移、降级策略、健康检查
可观测性全链路追踪、质量评估、实时监控
安全输入过滤、输出审核、审计日志

生产检查清单:

  • 实现多级检索策略
  • 配置智能缓存机制
  • 建立故障转移机制
  • 部署降级策略
  • 配置完整监控体系
  • 实现质量评估流程
  • 建立安全防护机制
  • 制定灾难恢复计划
  • 完成压力测试
  • 编写运维手册

本文最后更新于 2024-03-01,如有问题欢迎在社区讨论。