AI/ML Observability and Monitoring

This skill provides a comprehensive guide to monitoring AI and machine learning systems in production, including large language models (LLMs), retrieval-augmented generation (RAG) applications, and traditional ML models. It covers monitoring stack setup, tracking of key performance metrics, model performance monitoring, data drift detection, logging, tracing techniques, alerting strategies, dashboard configuration, A/B test monitoring, cost optimization, and more, making it a key tool for ensuring the reliability and performance of AI systems.


AI/ML Observability and Monitoring

Overview

A comprehensive guide to monitoring AI/ML systems in production, including LLMs, RAG applications, and traditional ML models. This skill covers monitoring stack setup (Prometheus, Grafana, Jaeger, Phoenix), metrics tracking (latency, throughput, token usage, cost, errors), model performance monitoring, data drift detection, LLM interaction logging, tracing with LangSmith/Phoenix, alerting strategies, dashboards, A/B test monitoring, cost optimization, debugging patterns, and a production deployment checklist.

Prerequisites

  • Understanding of observability concepts (metrics, logs, traces)
  • Familiarity with monitoring tools (Prometheus, Grafana)
  • Knowledge of LLM APIs and their pricing models
  • Understanding of the statistical concepts behind drift detection
  • Familiarity with Docker and container orchestration
  • Knowledge of FastAPI and middleware patterns

Core Concepts

Monitoring Stack Components

  • Prometheus: metrics collection and storage
  • Grafana: visualization and dashboards
  • Loki/Promtail: log aggregation
  • Jaeger: distributed tracing
  • Phoenix/Arize: AI-specific observability
  • LangSmith: LangChain tracing and debugging

Key Metrics

  • Latency: request/response time (P50, P95, P99)
  • Throughput: requests per second/minute
  • Token usage: input/output tokens per model
  • Cost: USD cost per model and endpoint
  • Error rate: rate limits, timeouts, validation errors

Model Performance Metrics

  • Classification: accuracy, F1, precision, recall
  • Generation: ROUGE scores, BLEU, semantic similarity
  • RAG: Precision@K, Recall@K, MRR, NDCG

Drift Detection

  • Statistical drift: KS test, JS divergence, Population Stability Index
  • Embedding drift: centroid distance, similarity shift
  • Concept drift: changes in model behavior over time

Tracing

  • OpenTelemetry: standard tracing framework
  • LangSmith Tracer: LangChain-specific tracing
  • Phoenix Instrumentor: automatic tracing for LangChain

Implementation Guide

Monitoring Stack

Docker Compose Setup

# docker-compose.yml for observability stack
version: '3.8'

services:
  # Metrics Collection
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  # Visualization
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-storage:/var/lib/grafana

  # Logs
  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"

  promtail:
    image: grafana/promtail:latest
    volumes:
      - ./promtail.yml:/etc/promtail/promtail.yml
      - /var/log:/var/log:ro

  # Tracing
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "14268:14268"

  # AI-Specific Observability
  phoenix:
    image: arizephoenix/phoenix:latest
    ports:
      - "6006:6006"

volumes:
  grafana-storage:
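
The compose file mounts a ./prometheus.yml that is not shown above; a minimal sketch, assuming your application exposes Prometheus metrics on port 8000 (adjust the target to your deployment):

# prometheus.yml (minimal sketch)
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'llm-app'
    static_configs:
      - targets: ['host.docker.internal:8000']  # hypothetical app exposing /metrics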

LangSmith Setup

# langsmith_config.py
import os
from langsmith import Client

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-ai-app"

client = Client()
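
Beyond the environment variables, individual functions can also be traced with the langsmith traceable decorator; a brief sketch (the function below is a placeholder):

# Trace arbitrary functions with the traceable decorator
from langsmith import traceable

@traceable
def summarize(text: str) -> str:
    # Any LLM calls made inside a traced function are grouped under this run
    return text[:100]  # placeholder logic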

Metrics to Track

Latency Tracking

# metrics_collector.py
from prometheus_client import Histogram, Counter, Gauge
import time

# Request latency histogram
request_latency = Histogram(
    'llm_request_latency_seconds',
    'LLM request latency',
    ['model', 'endpoint']
)

# Track latency with labels using the Timer as a context manager
def track_llm_request(model: str, endpoint: str, func, *args, **kwargs):
    with request_latency.labels(model=model, endpoint=endpoint).time():
        return func(*args, **kwargs)

# Manual tracking
start_time = time.time()
response = call_llm()  # placeholder for your LLM client call
duration = time.time() - start_time
request_latency.labels(model='gpt-4', endpoint='/chat').observe(duration)

Throughput Tracking

# Requests per second
requests_total = Counter(
    'llm_requests_total',
    'Total LLM requests',
    ['model', 'status']
)

requests_per_minute = Gauge(
    'llm_requests_per_minute',
    'LLM requests per minute',
    ['model']
)

# Track throughput
def track_request(model: str, status: str):
    requests_total.labels(model=model, status=status).inc()
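
The requests_per_minute gauge above must be set explicitly; one way, sketched here, is to recompute it from a rolling window of request timestamps:

# Rolling-window throughput (sketch)
from collections import deque
import time

_request_times = {}  # model -> deque of request timestamps

def record_and_update_rpm(model: str):
    window = _request_times.setdefault(model, deque())
    now = time.time()
    window.append(now)
    # Drop timestamps older than the 60-second window
    while window and window[0] < now - 60:
        window.popleft()
    requests_per_minute.labels(model=model).set(len(window))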

Token Usage Tracking

# Token usage tracking
token_usage = Counter(
    'llm_token_usage_total',
    'Total tokens used',
    ['model', 'type']  # type: input, output
)

token_cost = Counter(
    'llm_token_cost_usd',
    'Total cost in USD',
    ['model']
)

def track_tokens(model: str, input_tokens: int, output_tokens: int):
    token_usage.labels(model=model, type='input').inc(input_tokens)
    token_usage.labels(model=model, type='output').inc(output_tokens)

    # Calculate cost from the per-1K-token price table
    # (COST_PER_1K_TOKENS is defined in the cost tracking section below)
    costs = COST_PER_1K_TOKENS[model]
    input_cost = input_tokens * costs['input'] / 1000
    output_cost = output_tokens * costs['output'] / 1000
    token_cost.labels(model=model).inc(input_cost + output_cost)
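
For example, with an OpenAI-style chat completions response (assuming the usual usage fields):

# Feed real token counts from the API response
usage = response.usage
track_tokens('gpt-4', usage.prompt_tokens, usage.completion_tokens)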

Cost Tracking

# Cost tracking configuration
COST_PER_1K_TOKENS = {
    'gpt-4': {'input': 0.03, 'output': 0.06},
    'gpt-3.5-turbo': {'input': 0.0015, 'output': 0.002},
    'claude-3-opus': {'input': 0.015, 'output': 0.075},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    costs = COST_PER_1K_TOKENS.get(model, {'input': 0, 'output': 0})
    return (input_tokens * costs['input'] + output_tokens * costs['output']) / 1000

# Daily cost tracking
daily_cost = Gauge(
    'llm_daily_cost_usd',
    'Daily LLM cost',
    ['model', 'date']
)

Error Rate Tracking

# Error tracking
error_total = Counter(
    'llm_errors_total',
    'Total LLM errors',
    ['model', 'error_type']
)

error_rate = Gauge(
    'llm_error_rate',
    'LLM error rate',
    ['model']
)

ERROR_TYPES = [
    'rate_limit_exceeded',
    'invalid_request',
    'timeout',
    'content_filter',
    'model_not_found',
]

def track_error(model: str, error_type: str):
    if error_type in ERROR_TYPES:
        error_total.labels(model=model, error_type=error_type).inc()

Model Performance Monitoring

Quality Metrics

# quality_metrics.py
from sklearn.metrics import accuracy_score, f1_score, precision_recall_fscore_support
from rouge_score import rouge_scorer
import numpy as np

class QualityMonitor:
    def __init__(self):
        self.rouge_scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

    def track_classification(self, y_true, y_pred):
        """Track classification metrics"""
        metrics = {
            'accuracy': accuracy_score(y_true, y_pred),
            'f1': f1_score(y_true, y_pred, average='weighted'),
            'precision_recall': precision_recall_fscore_support(y_true, y_pred)
        }
        return metrics

    def track_generation_quality(self, reference: str, generated: str):
        """Track text generation quality"""
        scores = self.rouge_scorer.score(reference, generated)
        return {
            'rouge1': scores['rouge1'].fmeasure,
            'rouge2': scores['rouge2'].fmeasure,
            'rougeL': scores['rougeL'].fmeasure
        }

    def track_rag_retrieval(self, relevant_docs: list, retrieved_docs: list, k: int = 10):
        """Track RAG retrieval quality"""
        # Precision@K
        precision = len(set(relevant_docs) & set(retrieved_docs[:k])) / k

        # Recall@K
        recall = len(set(relevant_docs) & set(retrieved_docs[:k])) / len(relevant_docs) if relevant_docs else 0

        # MRR (Mean Reciprocal Rank)
        mrr = 0
        for i, doc in enumerate(retrieved_docs[:k], 1):
            if doc in relevant_docs:
                mrr = 1 / i
                break

        return {'precision_at_k': precision, 'recall_at_k': recall, 'mrr': mrr}
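
NDCG@K, listed among the RAG metrics above but not implemented here, can be added to QualityMonitor along the same lines; a sketch using binary relevance:

    def track_ndcg(self, relevant_docs: list, retrieved_docs: list, k: int = 10) -> float:
        """NDCG@K with binary relevance (1 if a retrieved doc is relevant)"""
        gains = [1.0 if doc in relevant_docs else 0.0 for doc in retrieved_docs[:k]]
        dcg = sum(g / np.log2(i + 2) for i, g in enumerate(gains))
        # Ideal DCG: all relevant docs ranked first
        ideal = sum(1.0 / np.log2(i + 2) for i in range(min(len(relevant_docs), k)))
        return dcg / ideal if ideal > 0 else 0.0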

Feedback Collection

# feedback_collector.py
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional
from datetime import datetime

class Feedback(BaseModel):
    request_id: str
    user_id: str
    rating: int  # 1-5 stars
    comment: Optional[str] = None
    helpful: Optional[bool] = None

app = FastAPI()

@app.post("/feedback")
async def collect_feedback(feedback: Feedback):
    """Collect user feedback on AI responses"""
    feedback_data = {
        **feedback.dict(),
        'timestamp': datetime.utcnow().isoformat()
    }

    # Store feedback (store_feedback is a placeholder for your persistence layer)
    await store_feedback(feedback_data)

    # Update metrics
    update_feedback_metrics(feedback.rating, feedback.helpful)

    return {"status": "recorded"}

def update_feedback_metrics(rating: int, helpful: bool):
    """Update feedback metrics (sketch using Prometheus primitives)"""
    # Assumes feedback_rating = Histogram('user_feedback_rating', ...) and
    # feedback_helpful = Counter('feedback_helpful_total', ..., ['helpful'])
    feedback_rating.observe(rating)
    if helpful is not None:
        feedback_helpful.labels(helpful=str(helpful)).inc()

Data Drift Detection

Statistical Drift Detection

# drift_detection.py
import numpy as np
from scipy import stats
from scipy.spatial.distance import jensenshannon
import pandas as pd

class DriftDetector:
    def __init__(self, reference_data: pd.DataFrame):
        self.reference_data = reference_data
        self.reference_stats = self._calculate_stats(reference_data)

    def _calculate_stats(self, data: pd.DataFrame):
        """Calculate reference statistics"""
        stats_dict = {}
        for column in data.select_dtypes(include=[np.number]).columns:
            stats_dict[column] = {
                'mean': data[column].mean(),
                'std': data[column].std(),
                'min': data[column].min(),
                'max': data[column].max(),
                'percentiles': data[column].quantile([0.25, 0.5, 0.75]).to_dict()
            }
        return stats_dict

    def detect_drift(self, current_data: pd.DataFrame, threshold: float = 0.05):
        """Detect statistical drift"""
        drift_results = {}

        for column in current_data.select_dtypes(include=[np.number]).columns:
            if column in self.reference_stats:
                # Kolmogorov-Smirnov test
                ks_stat, p_value = stats.ks_2samp(
                    self.reference_data[column],
                    current_data[column]
                )

                drift_results[column] = {
                    'ks_statistic': ks_stat,
                    'p_value': p_value,
                    'drift_detected': p_value < threshold
                }

        return drift_results

    def detect_distribution_drift(self, current_data: pd.DataFrame):
        """Detect distribution drift using JS divergence"""
        drift_results = {}

        for column in current_data.select_dtypes(include=[np.number]).columns:
            # Create histograms
            ref_hist, ref_bins = np.histogram(self.reference_data[column], bins=20, density=True)
            curr_hist, _ = np.histogram(current_data[column], bins=ref_bins, density=True)

            # Calculate Jensen-Shannon divergence
            js_divergence = jensenshannon(ref_hist, curr_hist)

            drift_results[column] = {
                'js_divergence': js_divergence,
                'drift_detected': js_divergence > 0.1  # Threshold
            }

        return drift_results
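
The Population Stability Index mentioned in the concepts above fits naturally into the same class; a minimal sketch (the common rule of thumb treats PSI > 0.2 as significant drift, but tune the cutoff to your data):

    def population_stability_index(self, current_data: pd.Series, column: str, bins: int = 10) -> float:
        """PSI between reference and current values of one column"""
        ref_hist, bin_edges = np.histogram(self.reference_data[column], bins=bins)
        curr_hist, _ = np.histogram(current_data, bins=bin_edges)
        # Convert counts to proportions, clipping to avoid log(0)
        ref_pct = np.clip(ref_hist / max(ref_hist.sum(), 1), 1e-6, None)
        curr_pct = np.clip(curr_hist / max(curr_hist.sum(), 1), 1e-6, None)
        return float(np.sum((curr_pct - ref_pct) * np.log(curr_pct / ref_pct)))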

Embedding Drift Detection

# embedding_drift.py
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class EmbeddingDriftDetector:
    def __init__(self, reference_embeddings: np.ndarray):
        self.reference_embeddings = reference_embeddings
        self.reference_centroid = np.mean(reference_embeddings, axis=0)

    def detect_drift(self, current_embeddings: np.ndarray, threshold: float = 0.1):
        """Detect drift in embedding space"""
        current_centroid = np.mean(current_embeddings, axis=0)

        # Calculate centroid distance
        centroid_distance = 1 - cosine_similarity(
            [self.reference_centroid],
            [current_centroid]
        )[0][0]

        # Calculate average pairwise similarity
        ref_similarities = []
        curr_similarities = []

        for i in range(min(100, len(self.reference_embeddings))):
            ref_sim = np.mean(cosine_similarity(
                [self.reference_embeddings[i]],
                self.reference_embeddings[:100]
            ))
            ref_similarities.append(ref_sim)

        for i in range(min(100, len(current_embeddings))):
            curr_sim = np.mean(cosine_similarity(
                [current_embeddings[i]],
                current_embeddings[:100]
            ))
            curr_similarities.append(curr_sim)

        similarity_shift = abs(np.mean(ref_similarities) - np.mean(curr_similarities))

        return {
            'centroid_distance': centroid_distance,
            'similarity_shift': similarity_shift,
            'drift_detected': centroid_distance > threshold or similarity_shift > threshold
        }

Logging LLM Interactions

Structured Logging

# llm_logger.py
import json
import uuid
from datetime import datetime
from typing import Dict, Any, Optional

class LLMLogger:
    def __init__(self, log_file: str = "llm_interactions.jsonl"):
        self.log_file = log_file

    def log_interaction(
        self,
        model: str,
        prompt: str,
        response: str,
        input_tokens: int,
        output_tokens: int,
        latency: float,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Log LLM interaction"""
        interaction = {
            "id": str(uuid.uuid4()),
            "timestamp": datetime.utcnow().isoformat(),
            "model": model,
            "prompt": prompt,
            "response": response,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "total_tokens": input_tokens + output_tokens,
            "latency_seconds": latency,
            "metadata": metadata or {}
        }

        with open(self.log_file, 'a') as f:
            f.write(json.dumps(interaction) + '\n')

        return interaction["id"]

    def log_rag_interaction(
        self,
        query: str,
        retrieved_docs: list,
        response: str,
        retrieval_scores: list,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Log RAG interaction"""
        interaction = {
            "id": str(uuid.uuid4()),
            "timestamp": datetime.utcnow().isoformat(),
            "type": "rag",
            "query": query,
            "retrieved_docs": retrieved_docs,
            "retrieval_scores": retrieval_scores,
            "response": response,
            "metadata": metadata or {}
        }

        with open(self.log_file, 'a') as f:
            f.write(json.dumps(interaction) + '\n')

        return interaction["id"]

FastAPI Middleware

# llm_middleware.py
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
import time

class LLMMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, logger: LLMLogger):
        super().__init__(app)
        self.logger = logger

    async def dispatch(self, request: Request, call_next):
        # Start timer
        start_time = time.time()

        # Read the body before calling the endpoint: with BaseHTTPMiddleware
        # the request stream can only be consumed once, so reading it after
        # call_next is unreliable (behavior varies across Starlette versions)
        body = b""
        if request.url.path.startswith("/api/llm"):
            body = await request.body()

        # Process request
        response = await call_next(request)

        # Calculate latency
        process_time = time.time() - start_time

        # Log if it's an LLM endpoint
        if request.url.path.startswith("/api/llm"):
            self.logger.log_interaction(
                model=request.headers.get("X-Model", "unknown"),
                prompt=body.decode(),
                response="",  # response body is streamed; log it at the endpoint if needed
                input_tokens=0,
                output_tokens=0,
                latency=process_time
            )

        # Add latency header
        response.headers["X-Process-Time"] = str(process_time)

        return response

Tracing with LangSmith/Phoenix

LangSmith Integration

# langsmith_tracing.py
from langchain_openai import ChatOpenAI
from langchain.callbacks.tracers import LangChainTracer
from langchain.schema import HumanMessage

# Initialize tracer
tracer = LangChainTracer(project_name="my-ai-app")

# Initialize LLM with tracing
llm = ChatOpenAI(
    model="gpt-4",
    temperature=0,
    callbacks=[tracer]
)

# Run with automatic tracing
response = llm.invoke([HumanMessage(content="Hello, world!")])
print(response.content)

Phoenix Integration

# phoenix_tracing.py
import phoenix as px
from phoenix.trace.langchain import LangChainInstrumentor
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage

# Start Phoenix UI
px.launch_app()

# Instrument LangChain
LangChainInstrumentor().instrument()

# Use LangChain as normal
llm = ChatOpenAI(model="gpt-4")
response = llm.invoke([HumanMessage(content="Hello, world!")])

Custom Tracing

# custom_tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# Setup tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Setup Jaeger exporter
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Trace LLM call
def traced_llm_call(prompt: str):
    with tracer.start_as_current_span("llm_call") as span:
        span.set_attribute("prompt", prompt)

        # Call LLM (call_llm and count_tokens are placeholders for your client code)
        response = call_llm(prompt)

        span.set_attribute("response", response)
        span.set_attribute("tokens", count_tokens(response))

        return response

Alerting Strategies

Prometheus Alert Rules

# alert_rules.yml
groups:
  - name: llm_alerts
    interval: 30s
    rules:
      # High latency alert
      - alert: HighLLMLatency
        expr: histogram_quantile(0.95, sum(rate(llm_request_latency_seconds_bucket[5m])) by (le, model)) > 30
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High LLM latency detected"
          description: "95th percentile latency is {{ $value }}s for model {{ $labels.model }}"

      # High error rate alert
      - alert: HighLLMErrorRate
        expr: sum(rate(llm_errors_total[5m])) by (model) / sum(rate(llm_requests_total[5m])) by (model) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High LLM error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }}"

      # Cost alert
      - alert: HighDailyCost
        expr: llm_daily_cost_usd > 100
        labels:
          severity: warning
        annotations:
          summary: "Daily cost exceeds threshold"
          description: "Daily cost is ${{ $value }}"

      # Data drift alert
      - alert: DataDriftDetected
        expr: data_drift_detected == 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Data drift detected"
          description: "Drift detected in {{ $labels.feature }}"

Custom Alerts

# alert_manager.py
from typing import Callable, Dict, Any
from datetime import datetime
import smtplib
from email.mime.text import MIMEText

class AlertManager:
    def __init__(self):
        self.alert_handlers: Dict[str, Callable] = {}
        self.alert_history = []

    def register_handler(self, alert_type: str, handler: Callable):
        """Register alert handler"""
        self.alert_handlers[alert_type] = handler

    def trigger_alert(self, alert_type: str, data: Dict[str, Any]):
        """Trigger an alert"""
        self.alert_history.append({
            'type': alert_type,
            'data': data,
            'timestamp': datetime.utcnow().isoformat()
        })

        if alert_type in self.alert_handlers:
            self.alert_handlers[alert_type](data)

    def email_alert(self, data: Dict[str, Any]):
        """Send email alert"""
        msg = MIMEText(data['message'])
        msg['Subject'] = data['subject']
        msg['From'] = 'alerts@example.com'
        msg['To'] = data['recipient']

        with smtplib.SMTP('smtp.example.com') as server:
            server.send_message(msg)

    def slack_alert(self, data: Dict[str, Any]):
        """Send Slack alert"""
        import requests

        webhook_url = data['webhook_url']
        payload = {
            'text': data['message'],
            'attachments': data.get('attachments', [])
        }

        requests.post(webhook_url, json=payload)

# Usage
alert_manager = AlertManager()
alert_manager.register_handler('high_latency', alert_manager.email_alert)
alert_manager.register_handler('high_error_rate', alert_manager.slack_alert)

Dashboards

Grafana Dashboard Configuration

{
  "dashboard": {
    "title": "AI/ML Observability",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "rate(llm_requests_total[5m])",
            "legendFormat": "{{model}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Latency (P95)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, llm_request_latency_seconds_bucket)",
            "legendFormat": "{{model}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Token Usage",
        "targets": [
          {
            "expr": "rate(llm_token_usage_total[5m])",
            "legendFormat": "{{model}} - {{type}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(llm_errors_total[5m]) / rate(llm_requests_total[5m])",
            "legendFormat": "{{model}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Daily Cost",
        "targets": [
          {
            "expr": "llm_daily_cost_usd",
            "legendFormat": "{{model}}"
          }
        ],
        "type": "stat"
      }
    ]
  }
}

A/B Test Monitoring

A/B Test Setup

# ab_test_monitor.py
from typing import Dict, Any
from datetime import datetime
import numpy as np
from scipy import stats

class ABTestMonitor:
    def __init__(self):
        self.experiments = {}

    def create_experiment(self, name: str, variants: list):
        """Create A/B test experiment"""
        self.experiments[name] = {
            'variants': {v: {'metrics': []} for v in variants},
            'created_at': datetime.utcnow()
        }

    def record_metric(self, experiment: str, variant: str, metric: float):
        """Record metric for variant"""
        if experiment in self.experiments:
            self.experiments[experiment]['variants'][variant]['metrics'].append(metric)

    def analyze_experiment(self, experiment: str, metric_name: str = "metric"):
        """Analyze A/B test results"""
        if experiment not in self.experiments:
            return None

        variants = self.experiments[experiment]['variants']
        results = {}

        for variant_name, data in variants.items():
            metrics = data['metrics']
            results[variant_name] = {
                'mean': np.mean(metrics),
                'std': np.std(metrics),
                'count': len(metrics)
            }

        # Statistical significance test
        variant_names = list(results.keys())
        if len(variant_names) == 2:
            t_stat, p_value = stats.ttest_ind(
                variants[variant_names[0]]['metrics'],
                variants[variant_names[1]]['metrics']
            )
            results['significance'] = {
                't_statistic': t_stat,
                'p_value': p_value,
                'significant': p_value < 0.05
            }

        return results

# Usage
monitor = ABTestMonitor()
monitor.create_experiment('model_comparison', ['gpt-4', 'gpt-3.5-turbo'])

# Record metrics (e.g., user satisfaction ratings)
monitor.record_metric('model_comparison', 'gpt-4', 4.5)
monitor.record_metric('model_comparison', 'gpt-3.5-turbo', 4.2)

# Analyze
results = monitor.analyze_experiment('model_comparison')

Cost Optimization

Cost Tracking and Optimization

# cost_optimizer.py
from typing import Dict, List
import time

class CostOptimizer:
    def __init__(self):
        self.usage_history = []

    def recommend_model(
        self,
        complexity: str,        # 'simple' or 'complex'
        budget_tier: str        # 'low', 'medium', or 'high'
    ) -> str:
        """Recommend a model based on task complexity and budget tier"""
        model_recommendations = {
            'simple': {
                'low': 'gpt-3.5-turbo',
                'medium': 'gpt-3.5-turbo',
                'high': 'gpt-4'
            },
            'complex': {
                'low': 'gpt-3.5-turbo',
                'medium': 'gpt-4',
                'high': 'gpt-4'
            }
        }

        return model_recommendations.get(complexity, {}).get(budget_tier, 'gpt-3.5-turbo')

    def optimize_token_usage(self, prompt: str, max_tokens: int = 1000) -> str:
        """Optimize prompt to reduce token usage"""
        # Remove redundant content
        # Summarize long contexts
        # Use system prompts efficiently
        optimized = prompt[:max_tokens * 4]  # Rough approximation
        return optimized

    def batch_requests(self, requests: List[Dict], batch_size: int = 10):
        """Batch requests for efficiency"""
        for i in range(0, len(requests), batch_size):
            batch = requests[i:i + batch_size]
            yield batch

    _cache: Dict[str, tuple] = {}

    def cache_responses(self, cache_key: str, response: str, ttl: int = 3600):
        """Cache a response to avoid duplicate calls (simple in-memory TTL cache)"""
        self._cache[cache_key] = (response, time.time() + ttl)

    def get_cached_response(self, cache_key: str):
        """Return a cached response if present and not expired, else None"""
        entry = self._cache.get(cache_key)
        if entry and entry[1] > time.time():
            return entry[0]
        return None

Debugging Patterns

Common Issues and Solutions

# debugging_patterns.py
from typing import Dict

class LLMDebugger:
    @staticmethod
    def debug_high_latency(latency: float, model: str):
        """Debug high latency issues"""
        issues = []

        if latency > 30:
            issues.append("Consider using a faster model")
            issues.append("Check network connectivity")
            issues.append("Review prompt complexity")

        if model == 'gpt-4' and latency > 60:
            issues.append("GPT-4 has higher latency, consider GPT-3.5 for simpler tasks")

        return issues

    @staticmethod
    def debug_high_error_rate(error_rate: float, error_types: Dict[str, int]):
        """Debug high error rate"""
        issues = []

        if 'rate_limit_exceeded' in error_types and error_types['rate_limit_exceeded'] > 10:
            issues.append("Implement rate limiting and retry logic")
            issues.append("Consider upgrading API tier")

        if 'timeout' in error_types and error_types['timeout'] > 5:
            issues.append("Increase timeout duration")
            issues.append("Check for network issues")

        return issues

    @staticmethod
    def debug_quality_degradation(quality_metrics: Dict[str, float]):
        """Debug quality degradation"""
        issues = []

        if quality_metrics.get('rougeL', 1.0) < 0.5:
            issues.append("Check for data drift")
            issues.append("Review prompt templates")
            issues.append("Consider fine-tuning model")

        return issues

Production Checklist

Pre-Deployment Checklist

## Monitoring
- [ ] All metrics are being collected
- [ ] Dashboards configured and tested
- [ ] Alert rules set up
- [ ] Alert notifications configured
- [ ] Log retention policy defined

## Performance
- [ ] Latency SLAs defined
- [ ] Throughput capacity tested
- [ ] Cost budgets set
- [ ] Rate limits configured
- [ ] Caching strategy implemented

## Reliability
- [ ] Retry logic implemented (see the sketch after this checklist)
- [ ] Fallback mechanisms in place
- [ ] Circuit breakers configured
- [ ] Health checks implemented
- [ ] Graceful degradation tested

## Security
- [ ] API keys stored securely
- [ ] Sensitive data masked in logs
- [ ] Access controls implemented
- [ ] Audit logging enabled
- [ ] Content filtering configured

## Quality
- [ ] Quality metrics being tracked
- [ ] A/B testing framework in place
- [ ] Feedback collection implemented
- [ ] Data drift detection configured
- [ ] Model performance being monitored

## Operations
- [ ] Deployment pipeline tested
- [ ] Rollback procedures documented
- [ ] On-call rotation defined
- [ ] Runbooks created
- [ ] Incident response plan in place
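
For the retry item above, a minimal retry-with-exponential-backoff sketch (attempt counts and delays are illustrative; tune them to your provider's rate limits):

# retry_utils.py (sketch)
import random
import time

def call_with_retries(fn, max_attempts: int = 3, base_delay: float = 1.0):
    """Call fn(), retrying failures with exponential backoff and jitter"""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # Exponential backoff with random jitter
            time.sleep(base_delay * (2 ** attempt) + random.random())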

Post-Deployment Monitoring

# post_deployment_monitor.py
from typing import Dict

class PostDeploymentMonitor:
    def __init__(self, baseline_metrics: Dict[str, float]):
        self.baseline = baseline_metrics

    def check_deployment_health(self, current_metrics: Dict[str, float]) -> Dict[str, bool]:
        """Check if deployment is healthy"""
        health_status = {}

        # Check latency
        health_status['latency'] = (
            current_metrics['latency'] <= self.baseline['latency'] * 1.5
        )

        # Check error rate
        health_status['error_rate'] = (
            current_metrics['error_rate'] <= self.baseline['error_rate'] * 2
        )

        # Check quality
        health_status['quality'] = (
            current_metrics['quality'] >= self.baseline['quality'] * 0.9
        )

        return health_status

    def should_rollback(self, health_status: Dict[str, bool]) -> bool:
        """Determine if rollback is needed"""
        critical_failures = [
            not health_status.get('error_rate', True),
            not health_status.get('quality', True)
        ]

        return any(critical_failures)
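
Usage, with illustrative baseline values:

# Usage
monitor = PostDeploymentMonitor(
    baseline_metrics={'latency': 2.0, 'error_rate': 0.01, 'quality': 0.80}
)
health = monitor.check_deployment_health(
    {'latency': 2.4, 'error_rate': 0.015, 'quality': 0.78}
)
if monitor.should_rollback(health):
    trigger_rollback()  # placeholder for your deployment tooling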

Best Practices

Monitoring Setup

  • Use a comprehensive monitoring stack
    • Prometheus for metrics
    • Grafana for visualization
    • Loki for logs
    • Jaeger for tracing
    • Phoenix for AI-specific observability
  • Track all key metrics
    • Latency (P50, P95, P99)
    • Throughput (requests per second)
    • Token usage and cost
    • Error rates (by type)

Alerting

  • Set appropriate thresholds
    • Latency: alert when P95 > 30s
    • Error rate: alert when > 5%
    • Cost: alert when the daily budget is exceeded
  • Use multiple alert channels
    • Email for critical alerts
    • Slack for warnings
    • PagerDuty for emergencies

Logging

  • Log all LLM interactions
    • Include model, prompt, response, tokens, latency
    • Use structured logging (JSON)
    • Mask sensitive data (see the sketch after this list)
  • Implement middleware for automatic logging
    • Use FastAPI middleware
    • Track the request/response cycle
    • Add correlation IDs
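
For the masking item above, a minimal sketch; the patterns (emails and an OpenAI-style key prefix) are illustrative, so extend them for your own data:

# log_masking.py (sketch)
import re

SENSITIVE_PATTERNS = [
    (re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"), "<EMAIL>"),
    (re.compile(r"sk-[A-Za-z0-9]{20,}"), "<API_KEY>"),
]

def mask_sensitive(text: str) -> str:
    """Replace sensitive substrings before they reach the logs"""
    for pattern, replacement in SENSITIVE_PATTERNS:
        text = pattern.sub(replacement, text)
    return text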

Drift Detection

  • Monitor data drift
    • Use statistical tests (KS test)
    • Track embedding drift
    • Set appropriate thresholds
  • Detect concept drift
    • Monitor model performance over time
    • Track quality metrics
    • Compare against baselines

Cost Management

  • Track cost per model
    • Monitor token usage
    • Calculate cost per request
    • Set daily budgets
  • Optimize costs
    • Use the right model for the task
    • Cache responses
    • Batch requests

Production Deployment

  • Follow the checklist
    • Complete all pre-deployment items
    • Test monitoring and alerting
    • Document runbooks
  • Monitor after deployment
    • Compare against baseline metrics
    • Watch for anomalies
    • Be ready to roll back

Related Skills