LLM工程化 进阶 LLM 成本管理 监控 FinOps

LLM 成本监控与管理:从粗放式到精细化运营

AIEng Hub
阅读约 18 分钟

LLM 成本监控与管理:从粗放式到精细化运营

随着 LLM 应用的规模化落地,推理成本已成为 AI 项目的核心 KPI。本文将从监控体系搭建、成本归因分析到降本策略,帮助您建立完整的 LLM 成本管理体系。

一、LLM 成本构成分析

1.1 成本模型拆解

典型的 LLM 应用成本构成:

总成本 = 基础设施成本 + 模型推理成本 + 运维成本

基础设施成本 = GPU/TPU 租赁费用 + 网络费用 + 存储费用
模型推理成本 = Input Tokens × 单价 + Output Tokens × 单价
运维成本 = 人力成本 + 监控工具费用 + 容灾备份费用

1.2 不同部署模式的成本对比

| 部署模式 | 适用场景 | 成本特点 | 单位成本 |
| --- | --- | --- | --- |
| 公有云 API | 快速验证/低频调用 | 按量付费,无固定成本 | $0.002-0.06/1K tokens |
| 私有云托管 | 中等规模/数据敏感 | 固定成本 + 弹性扩展 | $2-5/小时/GPU |
| 自建集群 | 大规模/高频调用 | 高固定成本,低边际成本 | 需详细 TCO 计算 |

1.3 Token 成本计算

以 GPT-4 和 Llama-2-70B 对比为例:

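| 模型 | Input 单价 | Output 单价 | 1M tokens 成本(输入、输出各 1M) |
| --- | --- | --- | --- |
| GPT-4 Turbo | $0.01/1K | $0.03/1K | $40 |
| GPT-3.5 Turbo | $0.0005/1K | $0.0015/1K | $2 |
| Llama-2-70B (AWS) | ~$0.001/1K | ~$0.001/1K | ~$2 |
| 自建 Llama-2-70B | $1.5/小时 (A100) | - | ~$0.5 (高并发) |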

盈亏平衡点分析:

  • 月调用量 < 100M tokens:公有云 API 更经济
  • 月调用量在 100M–500M tokens 之间:两种方案成本接近,需结合并发水平、GPU 利用率和运维人力做具体 TCO 测算
  • 月调用量 > 500M tokens:自建集群更经济

二、监控体系搭建

2.1 核心监控指标

# Core metric catalogue: metric name -> human-readable description.
LLM_METRICS = dict(
    # Cost metrics
    llm_cost_total="累计成本",
    llm_cost_per_request="单次请求成本",
    llm_cost_per_1k_tokens="每千 token 成本",
    llm_cost_by_model="分模型成本",
    llm_cost_by_app="分应用成本",
    # Usage metrics
    llm_tokens_input_total="累计输入 tokens",
    llm_tokens_output_total="累计输出 tokens",
    llm_requests_total="累计请求数",
    llm_tokens_per_request="单次请求 token 数",
    # Performance metrics (these drive cost efficiency)
    llm_latency_first_token="首 token 延迟",
    llm_latency_per_token="每 token 延迟",
    llm_throughput_tokens_per_sec="吞吐量",
    llm_cache_hit_rate="缓存命中率",
)

2.2 Prometheus + Grafana 监控方案

# cost_metrics.py
from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry
import time
from functools import wraps

class LLMCostMonitor:
    """Record LLM cost, token usage, latency and cache events as Prometheus metrics.

    Cost is derived from a per-model price table expressed in USD per 1K
    tokens. Requests for models missing from that table still have their
    tokens and latency recorded, but add nothing to the cost counter.
    """

    # Built-in price table (USD per 1K tokens). Provider prices change over
    # time, so treat these as illustrative defaults and pass ``pricing`` to
    # the constructor to override them.
    DEFAULT_PRICING = {
        "gpt-4": {"input": 0.03, "output": 0.06},
        "gpt-4-turbo": {"input": 0.01, "output": 0.03},
        "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
        "claude-3-opus": {"input": 0.015, "output": 0.075},
        "claude-3-sonnet": {"input": 0.003, "output": 0.015},
        "llama-2-70b": {"input": 0.001, "output": 0.001},
    }

    def __init__(self, registry=None, pricing=None):
        """Create the metric collectors.

        Args:
            registry: Prometheus registry the metrics are registered on. A
                new ``CollectorRegistry`` is created when omitted.
            pricing: Optional mapping ``model -> {"input": usd, "output": usd}``
                (USD per 1K tokens) used instead of ``DEFAULT_PRICING``.
        """
        self.registry = CollectorRegistry() if registry is None else registry

        # Cost counter
        self.cost_by_model = Counter(
            'llm_cost_dollars_total',
            'Total LLM cost in USD',
            ['model', 'app_id', 'environment'],
            registry=self.registry
        )

        # Token counters
        self.tokens_input = Counter(
            'llm_tokens_input_total',
            'Total input tokens',
            ['model', 'app_id'],
            registry=self.registry
        )
        self.tokens_output = Counter(
            'llm_tokens_output_total',
            'Total output tokens',
            ['model', 'app_id'],
            registry=self.registry
        )

        # Latency histogram
        self.latency_histogram = Histogram(
            'llm_request_duration_seconds',
            'Request latency',
            ['model', 'operation'],  # operation: prefill/decode
            buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
            registry=self.registry
        )

        # Cache hit/miss counters (the hit rate is derived from these two)
        self.cache_hits = Counter(
            'llm_cache_hits_total',
            'Cache hit count',
            ['cache_type'],
            registry=self.registry
        )
        self.cache_misses = Counter(
            'llm_cache_misses_total',
            'Cache miss count',
            ['cache_type'],
            registry=self.registry
        )

        # Copy the table so per-instance edits never leak into the class
        # default or into the caller's mapping.
        source = self.DEFAULT_PRICING if pricing is None else pricing
        self.pricing = {name: dict(rates) for name, rates in source.items()}

    def calculate_cost(self, model, input_tokens, output_tokens):
        """Return the USD cost of a request, or ``None`` if ``model`` has no price."""
        price = self.pricing.get(model)
        if price is None:
            return None
        return (input_tokens * price["input"] +
                output_tokens * price["output"]) / 1000

    def record_request(self, model, app_id, input_tokens, output_tokens,
                       environment="production", latency=0):
        """Record tokens, cost and latency for a single request.

        Args:
            model: Model name; also the lookup key into the price table.
            app_id: Application label attached to the token and cost series.
            input_tokens: Prompt token count (must be >= 0).
            output_tokens: Completion token count (must be >= 0).
            environment: Environment label for the cost series.
            latency: Request duration in seconds, observed under the
                ``operation="total"`` label.

        Returns:
            The computed cost in USD, or ``None`` when ``model`` is absent
            from the price table (no cost is recorded in that case).

        Raises:
            ValueError: If either token count is negative.
        """
        # Validate up front: counters reject negative increments, and failing
        # halfway would leave the input counter bumped but the output one not.
        if input_tokens < 0 or output_tokens < 0:
            raise ValueError("token counts must be non-negative")

        self.tokens_input.labels(model=model, app_id=app_id).inc(input_tokens)
        self.tokens_output.labels(model=model, app_id=app_id).inc(output_tokens)

        cost = self.calculate_cost(model, input_tokens, output_tokens)
        if cost is not None:
            self.cost_by_model.labels(
                model=model,
                app_id=app_id,
                environment=environment
            ).inc(cost)

        self.latency_histogram.labels(model=model, operation="total").observe(latency)
        return cost

    def record_cache_event(self, cache_type, hit=True):
        """Increment the hit or miss counter for ``cache_type``."""
        counter = self.cache_hits if hit else self.cache_misses
        counter.labels(cache_type=cache_type).inc()

    def get_cache_hit_rate(self, cache_type):
        """Return hits / (hits + misses) for ``cache_type``; 0 when nothing was recorded."""
        # NOTE(review): `_value` is a private attribute of the client library's
        # metric objects; confirm it is still available when upgrading.
        hits = self.cache_hits.labels(cache_type=cache_type)._value.get()
        misses = self.cache_misses.labels(cache_type=cache_type)._value.get()
        total = hits + misses
        return hits / total if total > 0 else 0

# Usage example: one module-level monitor shared by every call site.
monitor = LLMCostMonitor()


def call_llm(model, prompt, app_id="default"):
    """Send ``prompt`` to ``model`` and record the call's token usage and latency.

    Returns the raw completion response.
    NOTE(review): `openai` is not imported anywhere in the visible source.
    """
    chat_messages = [{"role": "user", "content": prompt}]

    began = time.time()
    response = openai.ChatCompletion.create(model=model, messages=chat_messages)
    elapsed = time.time() - began

    # Instrumentation: feed the usage reported by the API into the monitor.
    usage = response.usage
    monitor.record_request(
        model=model,
        app_id=app_id,
        input_tokens=usage.prompt_tokens,
        output_tokens=usage.completion_tokens,
        latency=elapsed,
    )
    return response

2.3 中间件集成

# FastAPI 中间件示例
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware

class LLMCostMiddleware(BaseHTTPMiddleware):
    """HTTP middleware that reports per-request LLM usage to an ``LLMCostMonitor``.

    The model and application are read from the ``X-LLM-Model`` and
    ``X-App-ID`` request headers. Token usage is taken from a ``usage``
    attribute on the response when present, otherwise from
    ``request.state.llm_usage``. Either object must expose ``prompt_tokens``
    and ``completion_tokens`` attributes. Requests without usage information
    are passed through unrecorded.
    """

    def __init__(self, app, monitor: LLMCostMonitor):
        super().__init__(app)
        self.monitor = monitor

    async def dispatch(self, request: Request, call_next):
        """Time the downstream call and record its token usage, if any was reported."""
        model = request.headers.get("X-LLM-Model", "unknown")
        app_id = request.headers.get("X-App-ID", "default")

        # perf_counter is monotonic, so the measured duration cannot go
        # negative when the wall clock is adjusted mid-request.
        start_time = time.perf_counter()
        response = await call_next(request)
        latency = time.perf_counter() - start_time

        # The object returned by call_next is the framework's own response
        # wrapper, so attributes a handler set on its response object are
        # generally not visible here. request.state is shared with the
        # handler and is therefore used as the fallback channel.
        usage = getattr(response, "usage", None)
        if usage is None:
            usage = getattr(request.state, "llm_usage", None)

        if usage is not None:
            self.monitor.record_request(
                model=model,
                app_id=app_id,
                input_tokens=usage.prompt_tokens,
                output_tokens=usage.completion_tokens,
                latency=latency
            )

        return response

# Build the FastAPI application and register the cost middleware, handing it
# a freshly created monitor instance.
app = FastAPI()
monitor = LLMCostMonitor()
app.add_middleware(LLMCostMiddleware, monitor=monitor)

三、成本归因分析

3.1 多维度成本分析

# cost_analytics.py
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List

class CostAnalytics:
    """Run cost-attribution queries against the ``llm_requests`` table.

    ``metrics_store`` must expose ``query(sql, params=None)``. Values are
    passed as ``%s`` placeholders; identifiers and interval literals cannot be
    parameterized that way, so they are validated before being interpolated.
    """

    # Units accepted for ``granularity`` (the field names of DATE_TRUNC).
    _GRANULARITIES = frozenset({
        "microseconds", "milliseconds", "second", "minute", "hour", "day",
        "week", "month", "quarter", "year", "decade", "century", "millennium",
    })

    def __init__(self, metrics_store):
        self.store = metrics_store

    @staticmethod
    def _checked_column(name):
        """Return ``name`` if it is a plain ASCII identifier, else raise ``ValueError``."""
        if not (isinstance(name, str) and name.isascii() and name.isidentifier()):
            raise ValueError(f"invalid column name: {name!r}")
        return name

    @staticmethod
    def _is_number(value):
        """True for int/float values; bool is rejected although it subclasses int."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def get_cost_by_dimension(self, start_time, end_time, dimension="model"):
        """Aggregate cost, tokens and request count grouped by ``dimension``.

        Args:
            start_time: Inclusive lower bound on ``timestamp``.
            end_time: Inclusive upper bound on ``timestamp``.
            dimension: Column of ``llm_requests`` to group by.

        Returns:
            Whatever ``store.query`` returns, ordered by total cost descending.

        Raises:
            ValueError: If ``dimension`` is not a plain identifier.
        """
        dimension = self._checked_column(dimension)
        query = f"""
        SELECT {dimension}, 
               SUM(cost) as total_cost,
               SUM(input_tokens) as total_input,
               SUM(output_tokens) as total_output,
               COUNT(*) as request_count
        FROM llm_requests
        WHERE timestamp BETWEEN %s AND %s
        GROUP BY {dimension}
        ORDER BY total_cost DESC
        """
        return self.store.query(query, (start_time, end_time))

    def get_cost_trend(self, days=30, granularity="day"):
        """Return cost and token totals per ``granularity`` bucket over the last ``days`` days.

        Raises:
            ValueError: If ``granularity`` is not a known unit or ``days`` is
                not a number.
        """
        if granularity not in self._GRANULARITIES:
            raise ValueError(f"unsupported granularity: {granularity!r}")
        if not self._is_number(days):
            raise ValueError(f"days must be a number, got {days!r}")
        query = f"""
        SELECT DATE_TRUNC('{granularity}', timestamp) as period,
               SUM(cost) as daily_cost,
               SUM(input_tokens + output_tokens) as daily_tokens
        FROM llm_requests
        WHERE timestamp > NOW() - INTERVAL '{days} days'
        GROUP BY period
        ORDER BY period
        """
        return self.store.query(query)

    def identify_cost_anomalies(self, threshold_percentile=95):
        """Return last-24h requests costing more than twice their app's percentile cost.

        The per-app baseline is computed over the trailing 7 days.

        Args:
            threshold_percentile: Percentile (0 < p <= 100) used as the
                baseline. The result column keeps the name ``p95_cost``
                whatever value is chosen, so existing consumers keep working.

        Raises:
            ValueError: If ``threshold_percentile`` is outside (0, 100].
        """
        if not (self._is_number(threshold_percentile)
                and 0 < threshold_percentile <= 100):
            raise ValueError(
                f"threshold_percentile must be in (0, 100], got {threshold_percentile!r}"
            )
        fraction = threshold_percentile / 100
        query = f"""
        WITH stats AS (
            SELECT app_id,
                   AVG(cost) as avg_cost,
                   PERCENTILE_CONT({fraction}) WITHIN GROUP (ORDER BY cost) as p95_cost
            FROM llm_requests
            WHERE timestamp > NOW() - INTERVAL '7 days'
            GROUP BY app_id
        )
        SELECT r.*, s.avg_cost, s.p95_cost
        FROM llm_requests r
        JOIN stats s ON r.app_id = s.app_id
        WHERE r.cost > s.p95_cost * 2
        AND r.timestamp > NOW() - INTERVAL '24 hours'
        """
        return self.store.query(query)

    def generate_cost_report(self, start_time, end_time):
        """Assemble the summary, per-model, per-app, trend and anomaly sections.

        Only the summary and the two breakdowns are bounded by
        ``start_time``/``end_time``; the trend and anomaly sections use their
        own default look-back windows.
        """
        data = {
            "period": f"{start_time} to {end_time}",
            "summary": self._get_summary(start_time, end_time),
            "by_model": self.get_cost_by_dimension(start_time, end_time, "model"),
            "by_app": self.get_cost_by_dimension(start_time, end_time, "app_id"),
            "trend": self.get_cost_trend(),
            "anomalies": self.identify_cost_anomalies(),
        }
        return data

    def _get_summary(self, start_time, end_time):
        """Return the single totals row for the window, or ``{}`` when the store returns nothing."""
        query = """
        SELECT 
            SUM(cost) as total_cost,
            SUM(input_tokens) as total_input,
            SUM(output_tokens) as total_output,
            COUNT(*) as total_requests,
            AVG(cost) as avg_cost_per_request,
            AVG(input_tokens + output_tokens) as avg_tokens_per_request
        FROM llm_requests
        WHERE timestamp BETWEEN %s AND %s
        """
        result = self.store.query(query, (start_time, end_time))
        return result[0] if result else {}

# Generate a weekly report covering the last seven days.
# NOTE(review): `metrics_store` is not defined in the visible source; it has
# to be provided by the surrounding application.
analytics = CostAnalytics(metrics_store)
report = analytics.generate_cost_report(
    start_time=datetime.now() - timedelta(days=7),
    end_time=datetime.now()
)

3.2 成本归因可视化

# visualization.py
import matplotlib.pyplot as plt
import seaborn as sns

def plot_cost_breakdown(analytics: CostAnalytics, start_time, end_time):
    """Draw a 2x2 cost dashboard and return the matplotlib figure.

    Panels: cost share per model (pie), daily cost over the last 30 days
    (line), input vs. output tokens per app (grouped bars) and request count
    vs. total cost per app (bubble scatter).
    """
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    (ax_share, ax_trend), (ax_tokens, ax_efficiency) = axes

    # Panel 1: share of total cost per model.
    per_model = analytics.get_cost_by_dimension(start_time, end_time, "model")
    ax_share.pie(
        per_model['total_cost'],
        labels=per_model['model'],
        autopct='%1.1f%%',
    )
    ax_share.set_title('Cost by Model')

    # Panel 2: daily cost trend.
    trend = analytics.get_cost_trend(days=30)
    ax_trend.plot(trend['period'], trend['daily_cost'])
    ax_trend.set_title('Daily Cost Trend')
    ax_trend.tick_params(axis='x', rotation=45)

    # Panel 3: input vs. output token volume, one bar pair per app.
    per_app = analytics.get_cost_by_dimension(start_time, end_time, "app_id")
    positions = range(len(per_app))
    width = 0.35
    half = width / 2
    left_edges = [pos - half for pos in positions]
    right_edges = [pos + half for pos in positions]
    ax_tokens.bar(left_edges, per_app['total_input'], width, label='Input Tokens')
    ax_tokens.bar(right_edges, per_app['total_output'], width, label='Output Tokens')
    ax_tokens.set_title('Token Usage by App')
    ax_tokens.set_xticks(positions)
    ax_tokens.set_xticklabels(per_app['app_id'], rotation=45)
    ax_tokens.legend()

    # Panel 4: cost efficiency; bubble area scales with output tokens.
    bubble_sizes = per_app['total_output'] / 1000
    ax_efficiency.scatter(
        per_app['request_count'],
        per_app['total_cost'],
        s=bubble_sizes,
        alpha=0.6,
    )
    ax_efficiency.set_xlabel('Request Count')
    ax_efficiency.set_ylabel('Total Cost ($)')
    ax_efficiency.set_title('Cost Efficiency by App')

    plt.tight_layout()
    return fig

四、预算管控与告警

4.1 预算管理系统

# budget_manager.py
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import json

@dataclass
class BudgetConfig:
    """Spending limits and alert settings for a single application."""

    app_id: str
    daily_limit: float
    monthly_limit: float
    alert_thresholds: List[float]  # fractions of the limit, e.g. [0.5, 0.8, 0.95]
    notification_channels: List[str]  # email, slack, webhook


class BudgetManager:
    """Track per-app spend against daily/monthly budgets, alert and throttle.

    ``metrics_store`` must expose ``query(sql, params)``;
    ``notification_service`` must expose ``send(alert, channels=...)``.
    """

    def __init__(self, metrics_store, notification_service):
        self.store = metrics_store
        self.notifier = notification_service
        self.budgets: Dict[str, BudgetConfig] = {}
        # (app_id, "daily" | "monthly") -> (period key, highest threshold
        # already notified). Used to avoid repeating the same alert.
        self._alerted: Dict[tuple, tuple] = {}

    def set_budget(self, config: BudgetConfig):
        """Register or replace the budget for ``config.app_id``."""
        self.budgets[config.app_id] = config
        # New limits or thresholds invalidate what was already notified.
        self._alerted.pop((config.app_id, "daily"), None)
        self._alerted.pop((config.app_id, "monthly"), None)

    def check_budget(self, app_id: str):
        """Return current budget usage for ``app_id`` and fire any new alerts.

        Returns:
            ``None`` when no budget is configured, otherwise a dict with the
            daily/monthly usage, limit and used fraction (``*_percent``; 0
            when the corresponding limit is not positive).
        """
        if app_id not in self.budgets:
            return None

        config = self.budgets[app_id]

        # Take one clock reading so both windows agree, and zero the
        # microseconds so each window starts exactly at midnight.
        now = datetime.now()
        day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        month_start = day_start.replace(day=1)

        today_cost = self._get_cost_since(app_id, day_start)
        month_cost = self._get_cost_since(app_id, month_start)

        status = {
            "app_id": app_id,
            "daily_usage": today_cost,
            "daily_limit": config.daily_limit,
            "daily_percent": today_cost / config.daily_limit if config.daily_limit > 0 else 0,
            "monthly_usage": month_cost,
            "monthly_limit": config.monthly_limit,
            "monthly_percent": month_cost / config.monthly_limit if config.monthly_limit > 0 else 0,
        }

        self._check_alerts(config, status)

        return status

    def _get_cost_since(self, app_id: str, since: datetime) -> float:
        """Return the summed cost of ``app_id`` from ``since`` (inclusive) onwards."""
        query = """
        SELECT SUM(cost) as total
        FROM llm_requests
        WHERE app_id = %s AND timestamp >= %s
        """
        result = self.store.query(query, (app_id, since))
        # SUM over zero rows yields NULL, hence the explicit 0.0 fallback.
        return result[0]['total'] if result and result[0]['total'] else 0.0

    def _check_alerts(self, config: BudgetConfig, status: dict):
        """Send at most one new alert per budget type.

        Only the highest crossed threshold is reported. It is reported once
        per period (day for the daily budget, calendar month for the monthly
        one) until a higher threshold is crossed.
        """
        today = datetime.now().date()
        scopes = (
            ("daily", today.isoformat(), "日预算"),
            ("monthly", today.strftime("%Y-%m"), "月预算"),
        )

        for scope, period, label in scopes:
            current = status[f"{scope}_percent"]
            crossed = [t for t in config.alert_thresholds if current >= t]
            if not crossed:
                continue
            threshold = max(crossed)

            key = (config.app_id, scope)
            previous = self._alerted.get(key)
            if previous and previous[0] == period and previous[1] >= threshold:
                continue  # already notified at this level for this period

            self._alerted[key] = (period, threshold)
            alert = {
                "type": f"{scope}_budget",
                "threshold": threshold,
                "current": current,
                "message": f"App {config.app_id} 已使用 {threshold*100:.0f}% {label}",
            }
            self.notifier.send(alert, channels=config.notification_channels)

    def should_throttle(self, app_id: str) -> bool:
        """Return True once the daily budget is fully used or the monthly budget reaches 95%.

        Apps without a configured budget are never throttled.
        """
        status = self.check_budget(app_id)
        if not status:
            return False

        return (status["daily_percent"] >= 1.0 or
                status["monthly_percent"] >= 0.95)

# Throttling middleware
class BudgetThrottleMiddleware(BaseHTTPMiddleware):
    """Reject requests with HTTP 429 once the calling app has exhausted its budget.

    The app is identified by the ``X-App-ID`` request header ("default" when
    the header is absent).
    """

    def __init__(self, app, budget_manager: BudgetManager):
        super().__init__(app)
        self.budget = budget_manager

    async def dispatch(self, request: Request, call_next):
        """Short-circuit with a 429 JSON body when throttled, else forward the request."""
        # Imported here because no import of JSONResponse is visible at
        # module level; without it the throttle path raises NameError.
        from starlette.responses import JSONResponse

        app_id = request.headers.get("X-App-ID", "default")

        # NOTE(review): should_throttle() is a synchronous call made on every
        # request from async code; confirm the budget lookup is cheap enough.
        if self.budget.should_throttle(app_id):
            return JSONResponse(
                status_code=429,
                content={"error": "Budget limit exceeded. Please contact admin."}
            )

        return await call_next(request)

4.2 告警配置示例

# budget-config.yaml
# Example budget definitions, one entry per application.
# NOTE(review): notification_channels entries are mappings here, while the
# BudgetConfig dataclass annotates the field as a list of strings; confirm
# which shape the loader expects.
budgets:
  - app_id: "chatbot-production"
    daily_limit: 500.0    # USD 500 per day
    monthly_limit: 10000.0  # USD 10000 per month
    alert_thresholds: [0.5, 0.8, 0.95]
    notification_channels:
      - type: slack
        webhook: "https://hooks.slack.com/services/xxx"
      - type: email
        recipients: ["admin@company.com", "finance@company.com"]
  
  - app_id: "internal-tools"
    daily_limit: 100.0
    monthly_limit: 2000.0
    alert_thresholds: [0.8, 0.95]
    notification_channels:
      - type: webhook
        url: "https://internal.company.com/alerts"

五、降本增效策略

5.1 模型路由策略

# model_router.py
from typing import List, Dict
import random

class SmartModelRouter:
    """Pick a model per prompt by trading off cost against expected quality.

    Prompts are classified with a simple keyword heuristic and mapped onto
    three tiers: ``simple`` (cheapest), ``creative`` (mid-priced default) and
    ``complex`` (highest quality). Prices are USD per 1K tokens.
    """

    def __init__(self):
        self.models = {
            "simple": {
                "name": "gpt-3.5-turbo",
                "cost_input": 0.0015,
                "cost_output": 0.002,
                "quality_score": 70,
                "max_tokens": 4096,
            },
            "complex": {
                "name": "gpt-4-turbo",
                "cost_input": 0.01,
                "cost_output": 0.03,
                "quality_score": 95,
                "max_tokens": 128000,
            },
            "creative": {
                "name": "claude-3-sonnet",
                "cost_input": 0.003,
                "cost_output": 0.015,
                "quality_score": 85,
                "max_tokens": 200000,
            }
        }

        # Keyword-based complexity classifier (simplified). Keywords must be
        # non-empty: an empty string is a substring of every prompt and would
        # count as a match each time.
        self.complexity_keywords = {
            "high": ["分析", "推理", "证明", "详细解释", "比较", "评估"],
            "low": ["你好", "谢谢", "简单", "概述"]
        }

    def classify_complexity(self, prompt: str) -> str:
        """Classify ``prompt`` as "high", "low" or "medium" complexity.

        "high" wins when more high- than low-complexity keywords occur; "low"
        when at least one low keyword occurs; otherwise "medium".
        """
        prompt_lower = prompt.lower()

        def count_matches(level):
            # `kw and ...` ignores empty keywords, which would otherwise
            # match unconditionally.
            return sum(
                1 for kw in self.complexity_keywords[level]
                if kw and kw in prompt_lower
            )

        high_score = count_matches("high")
        low_score = count_matches("low")

        if high_score > low_score:
            return "high"
        elif low_score > 0:
            return "low"
        return "medium"

    def route(self, prompt: str, context: Dict = None) -> str:
        """Return the name of the model that should serve ``prompt``.

        Args:
            prompt: The user prompt.
            context: Optional hints; a truthy ``require_quality`` entry forces
                the highest-quality model regardless of the classification.
        """
        complexity = self.classify_complexity(prompt)

        # An explicit quality requirement outranks the keyword heuristic.
        if complexity == "high" or (context and context.get("require_quality")):
            return self.models["complex"]["name"]
        if complexity == "low" and len(prompt) < 500:
            return self.models["simple"]["name"]

        # Everything else goes to the mid-priced model.
        return self.models["creative"]["name"]

    def estimate_cost(self, prompt: str, expected_output_tokens: int = 500) -> Dict:
        """Estimate the request cost on every tier.

        Returns:
            ``{tier: {"model", "estimated_cost", "quality_score"}}`` with the
            cost in USD.
        """
        # Rough heuristic of four characters per token.
        # NOTE(review): this likely undercounts CJK text — confirm against a
        # real tokenizer before relying on the numbers.
        input_tokens = len(prompt) // 4

        estimates = {}
        for tier, config in self.models.items():
            cost = (input_tokens * config["cost_input"] +
                    expected_output_tokens * config["cost_output"]) / 1000
            estimates[tier] = {
                "model": config["name"],
                "estimated_cost": cost,
                "quality_score": config["quality_score"]
            }

        return estimates

# Usage example: route one prompt and print the per-tier cost estimates.
router = SmartModelRouter()

prompt = "请详细分析这份财报数据并给出投资建议"
selected_model = router.route(prompt, context={"require_quality": True})
estimates = router.estimate_cost(prompt, expected_output_tokens=1000)

print(f"Selected model: {selected_model}")
print(f"Cost estimates: {estimates}")

5.2 缓存策略优化

# semantic_cache.py
import hashlib
import json
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

class SemanticCache:
    """Semantic cache: answers are reused for queries whose embeddings are close enough.

    Entries are scoped to a model, so a response produced by one model is
    never served for another.
    """

    def __init__(self, similarity_threshold=0.95):
        self.encoder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
        self.cache = {}  # hash -> {embedding, response, model, cost, timestamp}
        self.similarity_threshold = similarity_threshold
        self.stats = {"hits": 0, "misses": 0, "savings": 0.0}

    def _get_embedding(self, text: str) -> np.ndarray:
        """Encode ``text`` with the sentence encoder."""
        return self.encoder.encode(text)

    def _compute_hash(self, text: str) -> str:
        """Return the MD5 hex digest of ``text`` (a cache key, not a security measure)."""
        return hashlib.md5(text.encode()).hexdigest()

    def get(self, query: str, model: str) -> Optional[Dict]:
        """Look up the most similar cached answer for ``query`` under ``model``.

        Returns:
            ``{"response", "similarity", "cached": True}`` when the best match
            reaches ``similarity_threshold``, otherwise ``None``. Hit/miss
            counters and the accumulated savings are updated either way.
        """
        query_embedding = self._get_embedding(query)

        best_match = None
        best_similarity = 0

        # Linear scan over every entry stored for this model.
        for cached_data in self.cache.values():
            if cached_data["model"] != model:
                continue

            similarity = cosine_similarity(
                [query_embedding],
                [cached_data["embedding"]]
            )[0][0]

            if similarity > best_similarity:
                best_similarity = similarity
                best_match = cached_data

        # best_match stays None when nothing was comparable; checking it keeps
        # a threshold of 0 or below from dereferencing None.
        if best_match is not None and best_similarity >= self.similarity_threshold:
            self.stats["hits"] += 1
            self.stats["savings"] += best_match.get("cost", 0)
            return {
                "response": best_match["response"],
                "similarity": best_similarity,
                "cached": True
            }

        self.stats["misses"] += 1
        return None

    def set(self, query: str, response: str, model: str, cost: float = 0):
        """Store ``response`` for ``query`` under ``model``.

        ``cost`` is what the original call cost; it is credited as savings on
        every later hit.
        """
        # The key covers the model as well as the query, so the same prompt
        # cached for two models yields two entries instead of one overwrite.
        entry_key = self._compute_hash(f"{model}\0{query}")

        self.cache[entry_key] = {
            "embedding": self._get_embedding(query),
            "response": response,
            "model": model,
            "cost": cost,
            "timestamp": datetime.now().isoformat()
        }

    def get_stats(self) -> Dict:
        """Return hit rate, hit/miss counts, estimated savings and cache size."""
        total = self.stats["hits"] + self.stats["misses"]
        hit_rate = self.stats["hits"] / total if total > 0 else 0

        return {
            "hit_rate": hit_rate,
            "hits": self.stats["hits"],
            "misses": self.stats["misses"],
            "estimated_savings": self.stats["savings"],
            "cache_size": len(self.cache)
        }

# Integrating the cache into the call path
class CachedLLMClient:
    """LLM client that consults a semantic cache before calling the API.

    Cache hits and misses, plus the token usage of real calls, are reported
    to the supplied monitor.
    """

    def __init__(self, monitor: "LLMCostMonitor"):
        self.monitor = monitor
        self.cache = SemanticCache(similarity_threshold=0.93)

    def _calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Return the USD cost of a call from the monitor's price table.

        The table is expected to hold USD-per-1K-token ``input``/``output``
        rates per model. Models without a price cost 0.0.
        """
        rates = self.monitor.pricing.get(model)
        if rates is None:
            return 0.0
        return (input_tokens * rates["input"] +
                output_tokens * rates["output"]) / 1000

    def generate(self, prompt: str, model: str, app_id: str = "default") -> str:
        """Return the completion text for ``prompt``, served from cache when possible."""
        # 1. Check the cache
        cached = self.cache.get(prompt, model)
        if cached:
            self.monitor.record_cache_event("semantic", hit=True)
            return cached["response"]

        self.monitor.record_cache_event("semantic", hit=False)

        # 2. Call the API
        # NOTE(review): `call_llm_api` is not defined in the visible source;
        # confirm which client function is meant here.
        start = time.time()
        response = call_llm_api(model, prompt)
        latency = time.time() - start

        # 3. Record usage and cost
        input_tokens = response.usage.prompt_tokens
        output_tokens = response.usage.completion_tokens
        cost = self._calculate_cost(model, input_tokens, output_tokens)

        self.monitor.record_request(
            model=model,
            app_id=app_id,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            latency=latency
        )

        # 4. Update the cache; the cost is stored so later hits can be
        #    credited as savings.
        content = response.choices[0].message.content
        self.cache.set(prompt, content, model, cost)

        return content

5.3 成本优化效果对比

实施优化策略前后的成本对比(月调用量 10M tokens):

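| 优化措施 | 成本降低 | 实施难度 | 效果持久性 |
| --- | --- | --- | --- |
| 模型路由 | 30-40% | (原文缺失) | (原文缺失) |
| 语义缓存 | 20-30% | (原文缺失) | (原文缺失) |
| 提示词压缩 | 15-25% | (原文缺失) | (原文缺失) |
| 量化部署 | 50-70% | (原文缺失) | (原文缺失) |
| 批量处理 | 10-20% | (原文缺失) | (原文缺失) |
| 综合优化 | 60-80% | - | - |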

六、成本报告与治理

6.1 自动化成本报告

# cost_reporting.py
from jinja2 import Template
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class CostReporter:
    """Render the weekly LLM cost report as HTML and send it by e-mail."""

    def __init__(self, analytics: CostAnalytics, budget_manager: BudgetManager):
        self.analytics = analytics
        self.budget = budget_manager

        # autoescape=True: app ids and other data-derived values are inserted
        # into HTML, so they must be escaped.
        # The `or 0` / `if summary.total_cost` guards keep a period with no
        # requests (NULL or missing totals) from crashing the rendering.
        self.email_template = Template("""
        <h2>LLM 成本周报 ({{ period }})</h2>
        
        <h3>总体概况</h3>
        <ul>
            <li>总成本: ${{ (summary.total_cost or 0) | round(2) }}</li>
            <li>总请求数: {{ summary.total_requests or 0 }}</li>
            <li>平均单次成本: ${{ (summary.avg_cost_per_request or 0) | round(4) }}</li>
            <li>总 Token 数: {{ (summary.total_input or 0) + (summary.total_output or 0) }}</li>
        </ul>
        
        <h3>Top 5 高成本应用</h3>
        <table border="1">
            <tr><th>应用</th><th>成本</th><th>占比</th></tr>
            {% for app in by_app[:5] %}
            <tr>
                <td>{{ app.app_id }}</td>
                <td>${{ (app.total_cost or 0) | round(2) }}</td>
                <td>{{ ((app.total_cost or 0) / summary.total_cost * 100) | round(1) if summary.total_cost else 0 }}%</td>
            </tr>
            {% endfor %}
        </table>
        
        <h3>预算使用情况</h3>
        {% for app_id, status in budget_status.items() %}
        <p><strong>{{ app_id }}</strong></p>
        <ul>
            <li>日预算: {{ (status.daily_percent * 100) | round(1) }}% (${{ status.daily_usage | round(2) }} / ${{ status.daily_limit }})</li>
            <li>月预算: {{ (status.monthly_percent * 100) | round(1) }}% (${{ status.monthly_usage | round(2) }} / ${{ status.monthly_limit }})</li>
        </ul>
        {% endfor %}
        """, autoescape=True)

    def generate_weekly_report(self) -> str:
        """Build the HTML report for the seven days ending now.

        Returns:
            The rendered HTML document fragment.
        """
        end_time = datetime.now()
        start_time = end_time - timedelta(days=7)

        report_data = self.analytics.generate_cost_report(start_time, end_time)

        # Budget status for every app that appears in the report.
        # NOTE(review): assumes `by_app` is a sequence of mappings with an
        # "app_id" key — confirm against the metrics store's result type.
        budget_status = {}
        for app_id in [r["app_id"] for r in report_data["by_app"]]:
            status = self.budget.check_budget(app_id)
            if status:
                budget_status[app_id] = status

        # report_data already carries a "period" key; merging into one dict
        # lets the date-only period override it. Passing both `period=` and
        # `**report_data` raises "got multiple values for keyword argument".
        context = dict(report_data)
        context["period"] = f"{start_time.date()} to {end_time.date()}"
        context["budget_status"] = budget_status

        return self.email_template.render(**context)

    def send_email_report(self, recipients: List[str]):
        """Render the weekly report and e-mail it to ``recipients`` as HTML."""
        html = self.generate_weekly_report()

        msg = MIMEMultipart()
        msg['Subject'] = 'LLM 成本周报'
        msg['From'] = 'cost-report@company.com'
        msg['To'] = ', '.join(recipients)

        msg.attach(MIMEText(html, 'html'))

        # The context manager closes the SMTP connection even if sending fails.
        with smtplib.SMTP('smtp.company.com') as server:
            server.send_message(msg)

七、总结

LLM 成本管理是一个持续优化的过程,需要从监控、分析、管控到优化形成闭环:

  1. 监控体系:建立多维度的成本监控,实现实时可视化
  2. 归因分析:按模型、应用、用户等维度分析成本构成
  3. 预算管控:设置分层预算和告警,防止成本失控
  4. 降本策略
    • 智能模型路由,按需选择模型
    • 语义缓存减少重复调用
    • 量化部署降低单位成本
    • 批量处理提升资源利用率

建议实施路径:

  • 第 1 周:部署监控体系,获取成本数据
  • 第 2-3 周:分析成本构成,识别优化机会
  • 第 4 周:实施模型路由和缓存策略
  • 第 2 个月:评估效果,迭代优化

通过系统化的成本管理,通常可实现 60-80% 的成本降低,同时保持服务质量。


参考资源: