RAG系统 高级 RAG 监控 评估 质量保障

RAG系统监控与评估:确保生产质量的全面方案

AIEng Hub
阅读约 25 分钟

引言

RAG 系统上线后,如何持续评估和监控其质量是一个关键问题。不同于传统软件系统的”功能正确性”验证,RAG 系统的质量涉及检索准确率、生成忠实度、上下文相关性等多个维度。

RAG 质量评估体系

    ├── 离线评估(开发/测试阶段)
    │   ├── 检索质量
    │   ├── 生成质量
    │   └── 端到端质量

    ├── 在线监控(生产阶段)
    │   ├── 性能指标
    │   ├── 质量指标
    │   └── 业务指标

    └── 反馈闭环
        ├── 用户反馈收集
        ├── Bad Case 分析
        └── 系统持续优化

一、离线评估

1.1 检索质量评估

指标含义计算公式目标值
Recall@K前K个结果中包含正确答案的比例相关文档数 / K> 85%
MRR第一个正确答案的平均排名倒数1/rank_avg> 0.8
NDCG@K考虑排序位置的质量加权累计增益 / 理想增益> 0.85
Precision@K前K个结果中相关的比例相关文档数 / K> 70%
def evaluate_retrieval(
    retriever,
    test_queries: list[str],
    relevant_docs: list[list[str]],  # 每个查询的正确文档ID
    k_values: list[int] = [1, 3, 5, 10]
):
    """评估检索质量"""
    results = {k: {'recall': 0, 'precision': 0} for k in k_values}
    total = len(test_queries)

    for query, relevant in zip(test_queries, relevant_docs):
        retrieved = retriever.invoke(query)
        retrieved_ids = [doc.id for doc in retrieved]

        for k in k_values:
            top_k = retrieved_ids[:k]
            hits = len(set(top_k) & set(relevant))

            results[k]['recall'] += hits / len(relevant)
            results[k]['precision'] += hits / k

    # 平均
    for k in k_values:
        results[k]['recall'] /= total
        results[k]['precision'] /= total

    return results

1.2 生成质量评估

RAGAS 框架

from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall
)
from datasets import Dataset

# 准备评估数据
eval_data = Dataset.from_dict({
    "question": ["RAG系统的核心组件有哪些?"],
    "answer": ["RAG系统的核心组件包括检索器、生成器和索引..."],
    "contexts": [["检索器负责从向量库中搜索相关文档..."]],
    "ground_truth": ["RAG系统由检索器、生成器和索引组成"]
})

# 计算各项指标
result = evaluate(
    dataset=eval_data,
    metrics=[
        faithfulness,          # 生成内容是否忠实于检索结果
        answer_relevancy,       # 回答是否与问题相关
        context_precision,      # 检索的上下文是否精确
        context_recall,         # 检索的上下文是否完整
    ]
)

print(result)
指标含义目标值
Faithfulness(忠实度)回答内容是否在检索文档中有依据> 0.9
Answer Relevancy(回答相关性)回答是否切题> 0.8
Context Precision(上下文精确度)检索结果中无关内容的比例> 0.85
Context Recall(上下文召回率)检索结果是否覆盖了回答所需的信息> 0.8

1.3 人工评估

class HumanEvaluationDataset:
    """人工评估数据集管理"""
    def __init__(self):
        self.cases = []

    def add_case(self, query, answer, context, rating,
                 comment=""):
        self.cases.append({
            "query": query,
            "answer": answer,
            "context": context,
            "rating": rating,           # 1-5分
            "comment": comment,
            "timestamp": time.time()
        })

    def get_statistics(self):
        ratings = [c["rating"] for c in self.cases]
        return {
            "avg_rating": sum(ratings) / len(ratings),
            "rating_distribution": {
                "5": ratings.count(5),
                "4": ratings.count(4),
                "3": ratings.count(3),
                "2": ratings.count(2),
                "1": ratings.count(1),
            },
            "total_cases": len(self.cases)
        }

二、在线监控

2.1 核心监控指标

                               ┌──────────────────┐
                               │  RAG 监控仪表盘   │
                               ├──────────────────┤
  ┌──────────────────┐         │                  │         ┌──────────────────┐
  │   业务指标        │         │   ┌──────────┐   │         │   告警规则       │
  │ • QPS            │         │   │ 延迟 P50 │   │         │ • 延迟 > 5s     │
  │ • 日活用户       │─────────┼──▶│ 延迟 P99 │   ├─────────▶│ • 错误率 > 1%   │
  │ • 对话轮次       │         │   └──────────┘   │         │ • 准确性 < 80%  │
  └──────────────────┘         │   ┌──────────┐   │         └──────────────────┘
                               │   │ 缓存命中率 │   │
  ┌──────────────────┐         │   └──────────┘   │
  │   质量指标        │         │   ┌──────────┐   │
  │ • 用户正向反馈   │─────────┼──▶│ Token消耗  │   │
  │ • 用户负向反馈   │         │   └──────────┘   │
  │ • 回答采纳率     │         └──────────────────┘
  └──────────────────┘

2.2 Prometheus 指标暴露

from prometheus_client import (
    Counter, Histogram, Gauge, generate_latest
)
from fastapi import FastAPI, Response
import time

app = FastAPI()

# 定义指标
RETRIEVAL_LATENCY = Histogram(
    'rag_retrieval_latency_seconds',
    '检索延迟分布',
    buckets=[.01, .025, .05, .1, .25, .5, 1, 2.5]
)

GENERATION_LATENCY = Histogram(
    'rag_generation_latency_seconds',
    '生成延迟分布',
    buckets=[.5, 1, 2, 5, 10, 30]
)

QUERY_COUNTER = Counter(
    'rag_queries_total',
    '总查询次数',
    ['status']  # success/failure
)

CACHE_HIT_RATIO = Gauge(
    'rag_cache_hit_ratio',
    '缓存命中率'
)

@app.post("/rag/query")
async def rag_query(query: str):
    start = time.time()

    try:
        # 检索
        with RETRIEVAL_LATENCY.time():
            docs = await retrieve(query)

        # 生成
        with GENERATION_LATENCY.time():
            answer = await generate(query, docs)

        QUERY_COUNTER.labels(status='success').inc()
        return {"answer": answer}

    except Exception as e:
        QUERY_COUNTER.labels(status='failure').inc()
        raise

@app.get("/metrics")
async def metrics():
    return Response(
        generate_latest(),
        media_type="text/plain"
    )

2.3 用户反馈采集

class FeedbackCollector:
    """在线用户反馈收集"""
    def __init__(self, db_client):
        self.db = db_client

    def collect_feedback(
        self,
        query_id: str,
        rating: int,         # 1-5
        has_issue: bool,     # 是否点踩
        comment: str = ""
    ):
        feedback = {
            "query_id": query_id,
            "rating": rating,
            "has_issue": has_issue,
            "comment": comment,
            "timestamp": time.time()
        }
        self.db.insert("feedback", feedback)

        # 如果评分低,自动创建 Bad Case
        if rating <= 2 or has_issue:
            self.create_bad_case(query_id, feedback)

    def get_daily_quality_score(self) -> float:
        """计算每日质量分"""
        today_feedback = self.db.query(
            "feedback",
            f"date(timestamp) = CURRENT_DATE"
        )
        if not today_feedback:
            return 1.0

        avg_rating = sum(f['rating'] for f in today_feedback) / len(today_feedback)
        issue_ratio = sum(1 for f in today_feedback if f['has_issue']) / len(today_feedback)

        return avg_rating / 5 * (1 - issue_ratio * 0.5)

三、Bad Case 分析

3.1 Bad Case 自动检测

class BadCaseAnalyzer:
    """自动 Bad Case 检测和分析"""
    def analyze(self, query, answer, context, feedback=None):
        issues = []

        # 1. 空回答检测
        if not answer or len(answer) < 10:
            issues.append("empty_answer")

        # 2. 检索为空
        if not context:
            issues.append("no_context")

        # 3. 幻觉检测(基于 NLI 模型)
        if self.detect_hallucination(answer, context):
            issues.append("hallucination")

        # 4. 长度异常
        if len(answer) > 10000:
            issues.append("too_long")

        # 5. 重复检测
        if self.detect_repetition(answer):
            issues.append("repetition")

        return {
            "query": query,
            "issues": issues,
            "severity": "high" if len(issues) >= 2 else "medium",
            "suggested_action": self.suggest_fix(issues)
        }

    def detect_hallucination(
        self, answer: str, context: list[str]
    ) -> bool:
        """使用 NLI 模型检测幻觉"""
        from transformers import pipeline
        nli = pipeline(
            "text-classification",
            model="roberta-large-mnli"
        )

        # 检查回答中的每个句子是否在上下文中有依据
        sentences = answer.split('')
        for sent in sentences:
            if len(sent) < 10:
                continue
            result = nli(f"{context} [SEP] {sent}")
            if result[0]['label'] == 'CONTRADICTION':
                return True
        return False

3.2 数据飞轮

每日 Bad Case 分析


按根因分类

    ├── 检索问题 → 优化检索策略 → 重新索引
    ├── 生成问题 → 优化 Prompt → 调整参数
    └── 数据问题 → 清洗数据 → 补充知识


更新测试集 → 验证修复 → 部署上线

    └── 持续循环(数据飞轮)

四、健康检查

class RAGHealthCheck:
    """RAG 系统健康检查"""
    def __init__(self):
        self.checks = {
            "vector_db": self.check_vector_db,
            "embedding": self.check_embedding,
            "llm": self.check_llm,
            "cache": self.check_cache,
        }

    async def check_all(self) -> dict:
        results = {}
        for name, check_fn in self.checks.items():
            try:
                result = await check_fn()
                results[name] = {
                    "status": "healthy" if result else "unhealthy"
                }
            except Exception as e:
                results[name] = {
                    "status": "unhealthy",
                    "error": str(e)
                }
        return results

    async def check_vector_db(self):
        """检查向量数据库可用性"""
        test_vector = [0.1] * 768
        results = self.vector_store.similarity_search_by_vector(
            test_vector, k=1
        )
        return len(results) > 0

    async def check_embedding(self):
        """检查 Embedding 服务"""
        result = self.embed_model.embed_query("test")
        return len(result) > 0

    async def check_llm(self):
        """检查 LLM 是否可用"""
        response = self.llm.invoke("ping")
        return len(response.content) > 0

五、总结

评估类型频率工具/方法核心指标
离线检索评估每次模型更新自定义脚本Recall@K, MRR
离线生成评估每次模型更新RAGASFaithfulness, Relevancy
在线性能监控实时Prometheus + Grafana延迟, QPS, 错误率
在线质量监控每日用户反馈 + NLI评分, Bad Case 率
Bad Case 分析每周自动化 + 人工根因分布, 修复率

RAG 质量保障的核心:建立评估→监控→反馈→改进的数据飞轮。


相关资源: