# AI/ML Observability and Monitoring

## Overview

A comprehensive guide to monitoring AI/ML systems in production, including LLMs, RAG applications, and traditional ML models. This skill covers monitoring stack setup (Prometheus, Grafana, Jaeger, Phoenix), metric tracking (latency, throughput, token usage, cost, errors), model performance monitoring, data drift detection, LLM interaction logging, tracing with LangSmith/Phoenix, alerting strategies, dashboards, A/B test monitoring, cost optimization, debugging patterns, and a production deployment checklist.

## Prerequisites

- Understanding of observability concepts (metrics, logs, traces)
- Familiarity with monitoring tools (Prometheus, Grafana)
- Knowledge of LLM APIs and their pricing models
- Understanding of the statistical concepts behind drift detection
- Familiarity with Docker and container orchestration
- Knowledge of FastAPI and middleware patterns

## Core Concepts

### Monitoring Stack Components

- Prometheus: metrics collection and storage
- Grafana: visualization and dashboards
- Loki/Promtail: log aggregation
- Jaeger: distributed tracing
- Phoenix/Arize: AI-specific observability
- LangSmith: LangChain tracing and debugging

### Key Metrics

- Latency: request/response time (P50, P95, P99)
- Throughput: requests per second/minute
- Token usage: input/output tokens per model
- Cost: USD cost per model and endpoint
- Error rate: rate limits, timeouts, validation errors
### Model Performance Metrics

- Classification: accuracy, F1, precision, recall
- Generation: ROUGE scores, BLEU, semantic similarity
- RAG: Precision@K, Recall@K, MRR, NDCG
### Drift Detection

- Statistical drift: KS test, JS divergence, Population Stability Index (see the PSI sketch below)
- Embedding drift: centroid distance, similarity shift
- Concept drift: changes in model behavior over time
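The KS test and JS divergence are implemented in the drift-detection section further down, but the Population Stability Index is only named here. A minimal PSI sketch, assuming the conventional rule-of-thumb thresholds (< 0.1 stable, 0.1–0.25 moderate shift, > 0.25 significant drift):

```python
# psi.py — minimal Population Stability Index sketch
import numpy as np

def population_stability_index(reference: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """Compare two samples of a numeric feature bucket by bucket."""
    # Bin edges come from the reference distribution
    edges = np.histogram_bin_edges(reference, bins=bins)
    ref_counts, _ = np.histogram(reference, bins=edges)
    cur_counts, _ = np.histogram(current, bins=edges)
    # Convert to proportions; epsilon avoids division by zero and log(0)
    eps = 1e-6
    ref_pct = np.clip(ref_counts / ref_counts.sum(), eps, None)
    cur_pct = np.clip(cur_counts / cur_counts.sum(), eps, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))
```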
### Tracing

- OpenTelemetry: standard tracing framework
- LangSmith Tracer: LangChain-specific tracing
- Phoenix Instrumentor: automatic tracing for LangChain

## Implementation Guide

### Monitoring Stack

#### Docker Compose Setup
```yaml
# docker-compose.yml for the observability stack
version: '3.8'

services:
  # Metrics collection
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  # Visualization
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-storage:/var/lib/grafana

  # Logs
  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"

  promtail:
    image: grafana/promtail:latest
    volumes:
      - ./promtail.yml:/etc/promtail/promtail.yml
      - /var/log:/var/log:ro

  # Tracing
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "14268:14268"

  # AI-specific observability (Arize Phoenix)
  phoenix:
    image: arizephoenix/phoenix:latest
    ports:
      - "6006:6006"

volumes:
  grafana-storage:
```
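The compose file mounts a `prometheus.yml` that is not shown. A minimal sketch, assuming your application exposes Prometheus metrics at `/metrics` on port 8000 (both assumptions; adjust the target to your deployment, and note `host.docker.internal` requires an `extra_hosts` entry on plain Linux):

```yaml
# prometheus.yml — minimal scrape configuration (hypothetical target)
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'llm-app'
    metrics_path: /metrics
    static_configs:
      - targets: ['host.docker.internal:8000']  # your app's metrics endpoint
```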
#### LangSmith Setup
```python
# langsmith_config.py
import os

from langsmith import Client

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"  # load from a secret store in production
os.environ["LANGCHAIN_PROJECT"] = "my-ai-app"

client = Client()
```
### Metrics to Track

#### Latency Tracking
```python
# metrics_collector.py
import time

from prometheus_client import Histogram, Counter, Gauge

# Request latency histogram
request_latency = Histogram(
    'llm_request_latency_seconds',
    'LLM request latency',
    ['model', 'endpoint']
)

# Decorator-style tracking: a labeled histogram must be bound to
# concrete label values before .time() can be used as a decorator
def track_llm_request(model: str, endpoint: str):
    return request_latency.labels(model=model, endpoint=endpoint).time()

# Usage: @track_llm_request('gpt-4', '/chat') on the function making the call

# Manual tracking
start_time = time.time()
response = call_llm()  # assumed LLM client call
duration = time.time() - start_time
request_latency.labels(model='gpt-4', endpoint='/chat').observe(duration)
```
#### Throughput Tracking
```python
# metrics_collector.py (continued) — requests per second/minute
requests_total = Counter(
    'llm_requests_total',
    'Total LLM requests',
    ['model', 'status']
)

requests_per_minute = Gauge(
    'llm_requests_per_minute',
    'LLM requests per minute',
    ['model']
)

# Track throughput
def track_request(model: str, status: str):
    requests_total.labels(model=model, status=status).inc()
```
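The `requests_per_minute` gauge above is declared but never set. In PromQL you would normally just query `rate(llm_requests_total[1m]) * 60`; if you do want the gauge populated in-process, a sliding-window sketch (the 5-second refresh interval is an arbitrary choice):

```python
# rpm_updater.py — feeds requests_per_minute from a 60s sliding window
import threading
import time
from collections import deque

_request_times: dict = {}  # model -> deque of request timestamps

def record_request_time(model: str):
    _request_times.setdefault(model, deque()).append(time.time())

def _update_rpm_gauges():
    while True:
        cutoff = time.time() - 60
        for model, times in _request_times.items():
            while times and times[0] < cutoff:
                times.popleft()  # drop timestamps older than one minute
            requests_per_minute.labels(model=model).set(len(times))
        time.sleep(5)

threading.Thread(target=_update_rpm_gauges, daemon=True).start()
```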
#### Token Usage Tracking
```python
# metrics_collector.py (continued) — token usage tracking
token_usage = Counter(
    'llm_token_usage_total',
    'Total tokens used',
    ['model', 'type']  # type: input, output
)

token_cost = Counter(
    'llm_token_cost_usd',
    'Total cost in USD',
    ['model']
)

def track_tokens(model: str, input_tokens: int, output_tokens: int):
    token_usage.labels(model=model, type='input').inc(input_tokens)
    token_usage.labels(model=model, type='output').inc(output_tokens)
    # Calculate cost; INPUT_COST_PER_1K / OUTPUT_COST_PER_1K are per-model
    # pricing tables (USD per 1K tokens), like COST_PER_1K_TOKENS below
    input_cost = input_tokens * INPUT_COST_PER_1K[model] / 1000
    output_cost = output_tokens * OUTPUT_COST_PER_1K[model] / 1000
    token_cost.labels(model=model).inc(input_cost + output_cost)
```
#### Cost Tracking
```python
# Cost tracking configuration (USD per 1K tokens; provider pricing
# changes over time, so verify against current rate cards)
COST_PER_1K_TOKENS = {
    'gpt-4': {'input': 0.03, 'output': 0.06},
    'gpt-3.5-turbo': {'input': 0.0015, 'output': 0.002},
    'claude-3-opus': {'input': 0.015, 'output': 0.075},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    costs = COST_PER_1K_TOKENS.get(model, {'input': 0, 'output': 0})
    return (input_tokens * costs['input'] + output_tokens * costs['output']) / 1000

# Daily cost tracking
daily_cost = Gauge(
    'llm_daily_cost_usd',
    'Daily LLM cost',
    ['model', 'date']
)
```
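The `daily_cost` gauge is declared but never updated. One way to feed it, reusing `calculate_cost()` above, is to accumulate into a per-day label (a sketch; note that a date label grows series cardinality over time, so keep Prometheus retention in mind):

```python
# daily_cost_updater.py — accumulate per-day cost via the date label
from datetime import date

def track_daily_cost(model: str, input_tokens: int, output_tokens: int):
    cost = calculate_cost(model, input_tokens, output_tokens)
    # Each day accumulates in its own series under the date label
    daily_cost.labels(model=model, date=date.today().isoformat()).inc(cost)
```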
#### Error Rate Tracking
```python
# Error tracking
error_total = Counter(
    'llm_errors_total',
    'Total LLM errors',
    ['model', 'error_type']
)

error_rate = Gauge(
    'llm_error_rate',
    'LLM error rate',
    ['model']
)

ERROR_TYPES = [
    'rate_limit_exceeded',
    'invalid_request',
    'timeout',
    'content_filter',
    'model_not_found',
]

def track_error(model: str, error_type: str):
    if error_type in ERROR_TYPES:
        error_total.labels(model=model, error_type=error_type).inc()
```
### Model Performance Monitoring

#### Quality Metrics
```python
# quality_metrics.py
import numpy as np
from rouge_score import rouge_scorer
from sklearn.metrics import accuracy_score, f1_score, precision_recall_fscore_support

class QualityMonitor:
    def __init__(self):
        self.rouge_scorer = rouge_scorer.RougeScorer(
            ['rouge1', 'rouge2', 'rougeL'], use_stemmer=True
        )

    def track_classification(self, y_true, y_pred):
        """Track classification metrics"""
        metrics = {
            'accuracy': accuracy_score(y_true, y_pred),
            'f1': f1_score(y_true, y_pred, average='weighted'),
            'precision_recall': precision_recall_fscore_support(y_true, y_pred)
        }
        return metrics

    def track_generation_quality(self, reference: str, generated: str):
        """Track text generation quality"""
        scores = self.rouge_scorer.score(reference, generated)
        return {
            'rouge1': scores['rouge1'].fmeasure,
            'rouge2': scores['rouge2'].fmeasure,
            'rougeL': scores['rougeL'].fmeasure
        }

    def track_rag_retrieval(self, relevant_docs: list, retrieved_docs: list, k: int = 10):
        """Track RAG retrieval quality"""
        # Precision@K
        precision = len(set(relevant_docs) & set(retrieved_docs[:k])) / k
        # Recall@K
        recall = (
            len(set(relevant_docs) & set(retrieved_docs[:k])) / len(relevant_docs)
            if relevant_docs else 0
        )
        # Reciprocal rank of the first relevant document for this single
        # query; MRR proper is the mean of this value across queries
        mrr = 0
        for i, doc in enumerate(retrieved_docs[:k], 1):
            if doc in relevant_docs:
                mrr = 1 / i
                break
        return {'precision_at_k': precision, 'recall_at_k': recall, 'mrr': mrr}
```
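NDCG is listed among the RAG metrics under Core Concepts but not computed above. A sketch with binary relevance (a document counts as relevant iff it appears in `relevant_docs`):

```python
# ndcg.py — NDCG@K sketch with binary relevance
import numpy as np

def ndcg_at_k(relevant_docs: list, retrieved_docs: list, k: int = 10) -> float:
    # DCG: each hit at rank i contributes 1 / log2(i + 1)
    dcg = sum(
        1.0 / np.log2(rank + 1)
        for rank, doc in enumerate(retrieved_docs[:k], start=1)
        if doc in relevant_docs
    )
    # Ideal DCG: all relevant documents packed into the top ranks
    ideal_hits = min(len(relevant_docs), k)
    idcg = sum(1.0 / np.log2(rank + 1) for rank in range(1, ideal_hits + 1))
    return dcg / idcg if idcg > 0 else 0.0
```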
#### Feedback Collection
```python
# feedback_collector.py
from datetime import datetime
from typing import Optional

from fastapi import FastAPI
from prometheus_client import Counter, Summary
from pydantic import BaseModel

class Feedback(BaseModel):
    request_id: str
    user_id: str
    rating: int  # 1-5 stars
    comment: Optional[str] = None
    helpful: Optional[bool] = None

app = FastAPI()

@app.post("/feedback")
async def collect_feedback(feedback: Feedback):
    """Collect user feedback on AI responses"""
    feedback_data = {
        **feedback.dict(),
        'timestamp': datetime.utcnow().isoformat()
    }
    # Store feedback (store_feedback is an assumed persistence helper)
    await store_feedback(feedback_data)
    # Update metrics
    update_feedback_metrics(feedback.rating, feedback.helpful)
    return {"status": "recorded"}

feedback_rating = Summary('feedback_rating', 'User feedback ratings')
feedback_helpful = Counter('feedback_helpful_total', 'Helpful votes', ['helpful'])

def update_feedback_metrics(rating: int, helpful: Optional[bool]):
    """Track average rating (via Summary) and helpful percentage"""
    feedback_rating.observe(rating)
    if helpful is not None:
        feedback_helpful.labels(helpful=str(helpful)).inc()
```
### Data Drift Detection

#### Statistical Drift Detection
```python
# drift_detection.py
import numpy as np
import pandas as pd
from scipy import stats
from scipy.spatial.distance import jensenshannon

class DriftDetector:
    def __init__(self, reference_data: pd.DataFrame):
        self.reference_data = reference_data
        self.reference_stats = self._calculate_stats(reference_data)

    def _calculate_stats(self, data: pd.DataFrame):
        """Calculate reference statistics"""
        stats_dict = {}
        for column in data.select_dtypes(include=[np.number]).columns:
            stats_dict[column] = {
                'mean': data[column].mean(),
                'std': data[column].std(),
                'min': data[column].min(),
                'max': data[column].max(),
                'percentiles': data[column].quantile([0.25, 0.5, 0.75]).to_dict()
            }
        return stats_dict

    def detect_drift(self, current_data: pd.DataFrame, threshold: float = 0.05):
        """Detect statistical drift with the Kolmogorov-Smirnov test"""
        drift_results = {}
        for column in current_data.select_dtypes(include=[np.number]).columns:
            if column in self.reference_stats:
                ks_stat, p_value = stats.ks_2samp(
                    self.reference_data[column],
                    current_data[column]
                )
                drift_results[column] = {
                    'ks_statistic': ks_stat,
                    'p_value': p_value,
                    'drift_detected': p_value < threshold
                }
        return drift_results

    def detect_distribution_drift(self, current_data: pd.DataFrame):
        """Detect distribution drift using JS divergence"""
        drift_results = {}
        for column in current_data.select_dtypes(include=[np.number]).columns:
            # Build histograms on shared bins
            ref_hist, ref_bins = np.histogram(self.reference_data[column], bins=20, density=True)
            curr_hist, _ = np.histogram(current_data[column], bins=ref_bins, density=True)
            # Jensen-Shannon divergence between the two distributions
            js_divergence = jensenshannon(ref_hist, curr_hist)
            drift_results[column] = {
                'js_divergence': js_divergence,
                'drift_detected': js_divergence > 0.1  # threshold
            }
        return drift_results
```
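The `DataDriftDetected` alert rule in the alerting section below fires on a `data_drift_detected` metric that nothing above exports. A sketch bridging the two (the gauge and label names match that alert rule):

```python
# drift_metrics.py — export drift results so the alert rule has a signal
from prometheus_client import Gauge

data_drift_detected = Gauge(
    'data_drift_detected',
    'Whether drift was detected for a feature (1 = drift)',
    ['feature']
)

def publish_drift(drift_results: dict):
    """Publish DriftDetector results, e.g. on a periodic schedule."""
    for feature, result in drift_results.items():
        data_drift_detected.labels(feature=feature).set(
            1 if result['drift_detected'] else 0
        )
```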
#### Embedding Drift Detection
```python
# embedding_drift.py
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class EmbeddingDriftDetector:
    def __init__(self, reference_embeddings: np.ndarray):
        self.reference_embeddings = reference_embeddings
        self.reference_centroid = np.mean(reference_embeddings, axis=0)

    def detect_drift(self, current_embeddings: np.ndarray, threshold: float = 0.1):
        """Detect drift in embedding space"""
        current_centroid = np.mean(current_embeddings, axis=0)

        # Cosine distance between reference and current centroids
        centroid_distance = 1 - cosine_similarity(
            [self.reference_centroid],
            [current_centroid]
        )[0][0]

        # Average pairwise similarity within each sample (first 100 vectors)
        ref_similarities = []
        curr_similarities = []
        for i in range(min(100, len(self.reference_embeddings))):
            ref_sim = np.mean(cosine_similarity(
                [self.reference_embeddings[i]],
                self.reference_embeddings[:100]
            ))
            ref_similarities.append(ref_sim)
        for i in range(min(100, len(current_embeddings))):
            curr_sim = np.mean(cosine_similarity(
                [current_embeddings[i]],
                current_embeddings[:100]
            ))
            curr_similarities.append(curr_sim)

        similarity_shift = abs(np.mean(ref_similarities) - np.mean(curr_similarities))

        return {
            'centroid_distance': centroid_distance,
            'similarity_shift': similarity_shift,
            'drift_detected': centroid_distance > threshold or similarity_shift > threshold
        }
```
### Logging LLM Interactions

#### Structured Logging
```python
# llm_logger.py
import json
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

class LLMLogger:
    def __init__(self, log_file: str = "llm_interactions.jsonl"):
        self.log_file = log_file

    def log_interaction(
        self,
        model: str,
        prompt: str,
        response: str,
        input_tokens: int,
        output_tokens: int,
        latency: float,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Log an LLM interaction as one JSON line"""
        interaction = {
            "id": str(uuid.uuid4()),
            "timestamp": datetime.utcnow().isoformat(),
            "model": model,
            "prompt": prompt,
            "response": response,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "total_tokens": input_tokens + output_tokens,
            "latency_seconds": latency,
            "metadata": metadata or {}
        }
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(interaction) + '\n')
        return interaction["id"]

    def log_rag_interaction(
        self,
        query: str,
        retrieved_docs: list,
        response: str,
        retrieval_scores: list,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Log a RAG interaction"""
        interaction = {
            "id": str(uuid.uuid4()),
            "timestamp": datetime.utcnow().isoformat(),
            "type": "rag",
            "query": query,
            "retrieved_docs": retrieved_docs,
            "retrieval_scores": retrieval_scores,
            "response": response,
            "metadata": metadata or {}
        }
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(interaction) + '\n')
        return interaction["id"]
```
#### FastAPI Middleware
```python
# llm_middleware.py
import time

from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

class LLMMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, logger: LLMLogger):
        super().__init__(app)
        self.logger = logger

    async def dispatch(self, request: Request, call_next):
        # Read the body up front: Starlette caches it, and it may no
        # longer be readable after the downstream handler has run
        body = b""
        if request.url.path.startswith("/api/llm"):
            body = await request.body()

        # Process the request and time it
        start_time = time.time()
        response = await call_next(request)
        process_time = time.time() - start_time

        # Log if it's an LLM endpoint (response text and token counts are
        # not available here without buffering the response body)
        if request.url.path.startswith("/api/llm"):
            self.logger.log_interaction(
                model=request.headers.get("X-Model", "unknown"),
                prompt=body.decode(),
                response="",
                input_tokens=0,
                output_tokens=0,
                latency=process_time
            )

        # Add latency header
        response.headers["X-Process-Time"] = str(process_time)
        return response
```
### Tracing with LangSmith/Phoenix

#### LangSmith Integration
```python
# langsmith_tracing.py
from langchain.callbacks.tracers import LangChainTracer
from langchain.schema import HumanMessage
from langchain_openai import ChatOpenAI

# Initialize tracer
tracer = LangChainTracer(project_name="my-ai-app")

# Initialize LLM with tracing
llm = ChatOpenAI(
    model="gpt-4",
    temperature=0,
    callbacks=[tracer]
)

# Run with automatic tracing
response = llm.invoke([HumanMessage(content="Hello, world!")])
print(response.content)
```
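For code outside LangChain, the LangSmith SDK's `@traceable` decorator records plain Python functions as runs in the same project. A sketch (the `summarize` function and its body are hypothetical):

```python
# traceable_example.py
from langsmith import traceable

@traceable(name="summarize", run_type="chain")
def summarize(text: str) -> str:
    # ... call your LLM here; inputs and outputs are captured as a run
    return text[:100]

summarize("A long document to summarize...")
```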
#### Phoenix Integration
```python
# phoenix_tracing.py
import phoenix as px
from phoenix.trace.langchain import LangChainInstrumentor
from langchain.schema import HumanMessage
from langchain_openai import ChatOpenAI

# Start the Phoenix UI
px.launch_app()

# Instrument LangChain
LangChainInstrumentor().instrument()

# Use LangChain as normal
llm = ChatOpenAI(model="gpt-4")
response = llm.invoke([HumanMessage(content="Hello, world!")])
```
#### Custom Tracing
```python
# custom_tracing.py
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Set up tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Set up the Jaeger exporter
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Trace an LLM call (call_llm and count_tokens are assumed helpers)
def traced_llm_call(prompt: str):
    with tracer.start_as_current_span("llm_call") as span:
        span.set_attribute("prompt", prompt)
        response = call_llm(prompt)
        span.set_attribute("response", response)
        span.set_attribute("tokens", count_tokens(response))
        return response
```
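`count_tokens` above is left undefined; one possible implementation for OpenAI-family models, assuming the `tiktoken` library:

```python
# token_count.py — token counting helper for the span attribute above
import tiktoken

def count_tokens(text: str, model: str = "gpt-4") -> int:
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Fall back to a common encoding for unknown model names
        encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(text))
```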
### Alerting Strategies

#### Prometheus Alert Rules
```yaml
# alert_rules.yml
groups:
  - name: llm_alerts
    interval: 30s
    rules:
      # High latency alert (quantiles over a rate of buckets, not raw buckets)
      - alert: HighLLMLatency
        expr: histogram_quantile(0.95, sum(rate(llm_request_latency_seconds_bucket[5m])) by (le, model)) > 30
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High LLM latency detected"
          description: "95th percentile latency is {{ $value }}s for model {{ $labels.model }}"

      # High error rate alert
      - alert: HighLLMErrorRate
        expr: rate(llm_errors_total[5m]) / rate(llm_requests_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High LLM error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }}"

      # Cost alert
      - alert: HighDailyCost
        expr: llm_daily_cost_usd > 100
        labels:
          severity: warning
        annotations:
          summary: "Daily cost exceeds threshold"
          description: "Daily cost is ${{ $value }}"

      # Data drift alert
      - alert: DataDriftDetected
        expr: data_drift_detected == 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Data drift detected"
          description: "Drift detected in {{ $labels.feature }}"
```
#### Custom Alerts
```python
# alert_manager.py
import smtplib
from datetime import datetime
from email.mime.text import MIMEText
from typing import Any, Callable, Dict

class AlertManager:
    def __init__(self):
        self.alert_handlers: Dict[str, Callable] = {}
        self.alert_history = []

    def register_handler(self, alert_type: str, handler: Callable):
        """Register an alert handler"""
        self.alert_handlers[alert_type] = handler

    def trigger_alert(self, alert_type: str, data: Dict[str, Any]):
        """Trigger an alert"""
        self.alert_history.append({
            'type': alert_type,
            'data': data,
            'timestamp': datetime.utcnow().isoformat()
        })
        if alert_type in self.alert_handlers:
            self.alert_handlers[alert_type](data)

    def email_alert(self, data: Dict[str, Any]):
        """Send an email alert"""
        msg = MIMEText(data['message'])
        msg['Subject'] = data['subject']
        msg['From'] = 'alerts@example.com'
        msg['To'] = data['recipient']
        with smtplib.SMTP('smtp.example.com') as server:
            server.send_message(msg)

    def slack_alert(self, data: Dict[str, Any]):
        """Send a Slack alert via incoming webhook"""
        import requests
        webhook_url = data['webhook_url']
        payload = {
            'text': data['message'],
            'attachments': data.get('attachments', [])
        }
        requests.post(webhook_url, json=payload)

# Usage
alert_manager = AlertManager()
alert_manager.register_handler('high_latency', alert_manager.email_alert)
alert_manager.register_handler('high_error_rate', alert_manager.slack_alert)
```
### Dashboards

#### Grafana Dashboard Configuration
```json
{
  "dashboard": {
    "title": "AI/ML Observability",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "rate(llm_requests_total[5m])",
            "legendFormat": "{{model}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Latency (P95)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, sum(rate(llm_request_latency_seconds_bucket[5m])) by (le, model))",
            "legendFormat": "{{model}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Token Usage",
        "targets": [
          {
            "expr": "rate(llm_token_usage_total[5m])",
            "legendFormat": "{{model}} - {{type}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(llm_errors_total[5m]) / rate(llm_requests_total[5m])",
            "legendFormat": "{{model}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Daily Cost",
        "targets": [
          {
            "expr": "llm_daily_cost_usd",
            "legendFormat": "{{model}}"
          }
        ],
        "type": "stat"
      }
    ]
  }
}
```
### A/B Test Monitoring

#### A/B Test Setup
```python
# ab_test_monitor.py
from datetime import datetime

import numpy as np
from scipy import stats

class ABTestMonitor:
    def __init__(self):
        self.experiments = {}

    def create_experiment(self, name: str, variants: list):
        """Create an A/B test experiment"""
        self.experiments[name] = {
            'variants': {v: {'metrics': []} for v in variants},
            'created_at': datetime.utcnow()
        }

    def record_metric(self, experiment: str, variant: str, metric: float):
        """Record a metric observation for a variant"""
        if experiment in self.experiments:
            self.experiments[experiment]['variants'][variant]['metrics'].append(metric)

    def analyze_experiment(self, experiment: str):
        """Analyze A/B test results"""
        if experiment not in self.experiments:
            return None
        variants = self.experiments[experiment]['variants']
        results = {}
        for variant_name, data in variants.items():
            metrics = data['metrics']
            results[variant_name] = {
                'mean': np.mean(metrics),
                'std': np.std(metrics),
                'count': len(metrics)
            }
        # Two-sample t-test for statistical significance
        variant_names = list(variants.keys())
        if len(variant_names) == 2:
            t_stat, p_value = stats.ttest_ind(
                variants[variant_names[0]]['metrics'],
                variants[variant_names[1]]['metrics']
            )
            results['significance'] = {
                't_statistic': t_stat,
                'p_value': p_value,
                'significant': p_value < 0.05
            }
        return results

# Usage
monitor = ABTestMonitor()
monitor.create_experiment('model_comparison', ['gpt-4', 'gpt-3.5-turbo'])

# Record metrics (e.g., user satisfaction ratings)
monitor.record_metric('model_comparison', 'gpt-4', 4.5)
monitor.record_metric('model_comparison', 'gpt-3.5-turbo', 4.2)

# Analyze
results = monitor.analyze_experiment('model_comparison')
```
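The monitor records and analyzes metrics but does not decide which variant a user sees. A common (assumed, not part of `ABTestMonitor`) pattern is deterministic assignment by hashing the user id, so each user consistently lands in the same variant:

```python
# variant_assignment.py — deterministic traffic splitting sketch
import hashlib

def assign_variant(user_id: str, variants: list, experiment: str) -> str:
    """Hash (experiment, user) so assignment is stable across requests."""
    digest = hashlib.sha256(f"{experiment}:{user_id}".encode()).hexdigest()
    return variants[int(digest, 16) % len(variants)]

# e.g. assign_variant("user-42", ['gpt-4', 'gpt-3.5-turbo'], 'model_comparison')
```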
### Cost Optimization

#### Cost Tracking and Optimization
```python
# cost_optimizer.py
import time
from typing import Dict, List

class CostOptimizer:
    def __init__(self):
        self.usage_history = []
        self._cache: Dict[str, tuple] = {}  # cache_key -> (expiry, response)

    def recommend_model(
        self,
        complexity: str,   # 'simple' or 'complex'
        budget_tier: str,  # 'low', 'medium', or 'high'
    ) -> str:
        """Recommend a model based on task complexity and budget tier"""
        model_recommendations = {
            'simple': {
                'low': 'gpt-3.5-turbo',
                'medium': 'gpt-3.5-turbo',
                'high': 'gpt-4'
            },
            'complex': {
                'low': 'gpt-3.5-turbo',
                'medium': 'gpt-4',
                'high': 'gpt-4'
            }
        }
        return model_recommendations.get(complexity, {}).get(budget_tier, 'gpt-3.5-turbo')

    def optimize_token_usage(self, prompt: str, max_tokens: int = 1000) -> str:
        """Optimize a prompt to reduce token usage: remove redundant content,
        summarize long contexts, use system prompts efficiently."""
        # Rough truncation using the ~4 characters-per-token heuristic
        return prompt[:max_tokens * 4]

    def batch_requests(self, requests: List[Dict], batch_size: int = 10):
        """Yield requests in batches for efficiency"""
        for i in range(0, len(requests), batch_size):
            yield requests[i:i + batch_size]

    def cache_responses(self, cache_key: str, response: str, ttl: int = 3600):
        """Cache a response to avoid duplicate calls"""
        self._cache[cache_key] = (time.time() + ttl, response)

    def get_cached(self, cache_key: str):
        """Return a cached response if present and not expired"""
        entry = self._cache.get(cache_key)
        if entry and entry[0] > time.time():
            return entry[1]
        return None
```
### Debugging Patterns

#### Common Issues and Solutions
```python
# debugging_patterns.py
from typing import Dict

class LLMDebugger:
    @staticmethod
    def debug_high_latency(latency: float, model: str):
        """Debug high latency issues"""
        issues = []
        if latency > 30:
            issues.append("Consider using a faster model")
            issues.append("Check network connectivity")
            issues.append("Review prompt complexity")
        if model == 'gpt-4' and latency > 60:
            issues.append("GPT-4 has higher latency; consider GPT-3.5 for simpler tasks")
        return issues

    @staticmethod
    def debug_high_error_rate(error_rate: float, error_types: Dict[str, int]):
        """Debug a high error rate"""
        issues = []
        if error_types.get('rate_limit_exceeded', 0) > 10:
            issues.append("Implement rate limiting and retry logic")
            issues.append("Consider upgrading your API tier")
        if error_types.get('timeout', 0) > 5:
            issues.append("Increase timeout duration")
            issues.append("Check for network issues")
        return issues

    @staticmethod
    def debug_quality_degradation(quality_metrics: Dict[str, float]):
        """Debug quality degradation"""
        issues = []
        if quality_metrics.get('rougeL', 1.0) < 0.5:
            issues.append("Check for data drift")
            issues.append("Review prompt templates")
            issues.append("Consider fine-tuning the model")
        return issues
```
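When `debug_high_error_rate` points at rate limits, the usual remedy is retries with exponential backoff and jitter. A generic sketch (in real code, catch your client library's specific `RateLimitError`/timeout exceptions rather than bare `Exception`):

```python
# retry.py — retry with exponential backoff and jitter
import random
import time

def call_with_retries(fn, max_retries: int = 5, base_delay: float = 1.0):
    """Call fn, retrying on failure with exponentially growing delays."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:  # narrow this to retryable errors in practice
            if attempt == max_retries - 1:
                raise
            # Double the delay each attempt; jitter avoids thundering herds
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            time.sleep(delay)
```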
### Production Checklist

#### Pre-Deployment Checklist
**Monitoring**

- [ ] All metrics are being collected
- [ ] Dashboards configured and tested
- [ ] Alert rules set up
- [ ] Alert notifications configured
- [ ] Log retention policy defined

**Performance**

- [ ] Latency SLAs defined
- [ ] Throughput capacity tested
- [ ] Cost budgets set
- [ ] Rate limits configured
- [ ] Caching strategy implemented

**Reliability**

- [ ] Retry logic implemented
- [ ] Fallback mechanisms in place
- [ ] Circuit breakers configured
- [ ] Health checks implemented
- [ ] Graceful degradation tested

**Security**

- [ ] API keys stored securely
- [ ] Sensitive data masked in logs
- [ ] Access controls implemented
- [ ] Audit logging enabled
- [ ] Content filtering configured

**Quality**

- [ ] Quality metrics being tracked
- [ ] A/B testing framework in place
- [ ] Feedback collection implemented
- [ ] Data drift detection configured
- [ ] Model performance being monitored

**Operations**

- [ ] Deployment pipeline tested
- [ ] Rollback procedures documented
- [ ] On-call rotation defined
- [ ] Runbooks created
- [ ] Incident response plan ready
#### Post-Deployment Monitoring
```python
# post_deployment_monitor.py
from typing import Dict

class PostDeploymentMonitor:
    def __init__(self, baseline_metrics: Dict[str, float]):
        self.baseline = baseline_metrics

    def check_deployment_health(self, current_metrics: Dict[str, float]) -> Dict[str, bool]:
        """Check whether the deployment is healthy relative to baseline"""
        health_status = {}
        # Latency: allow up to 1.5x baseline
        health_status['latency'] = (
            current_metrics['latency'] <= self.baseline['latency'] * 1.5
        )
        # Error rate: allow up to 2x baseline
        health_status['error_rate'] = (
            current_metrics['error_rate'] <= self.baseline['error_rate'] * 2
        )
        # Quality: require at least 90% of baseline
        health_status['quality'] = (
            current_metrics['quality'] >= self.baseline['quality'] * 0.9
        )
        return health_status

    def should_rollback(self, health_status: Dict[str, bool]) -> bool:
        """Determine whether a rollback is needed"""
        critical_failures = [
            not health_status.get('error_rate', True),
            not health_status.get('quality', True)
        ]
        return any(critical_failures)
```
## Best Practices

### Monitoring Setup

- Use a comprehensive monitoring stack
  - Prometheus for metrics
  - Grafana for visualization
  - Loki for logs
  - Jaeger for tracing
  - Phoenix for AI-specific observability
- Track all key metrics
  - Latency (P50, P95, P99)
  - Throughput (requests per second)
  - Token usage and cost
  - Error rates (by type)

### Alerting

- Set appropriate thresholds
  - Latency: alert when P95 > 30s
  - Error rate: alert when > 5%
  - Cost: alert when the daily budget is exceeded
- Use multiple alert channels
  - Email for critical alerts
  - Slack for warnings
  - PagerDuty for emergencies

### Logging

- Log every LLM interaction
  - Include model, prompt, response, tokens, latency
  - Use structured logging (JSON)
  - Mask sensitive data (see the sketch below)
- Implement middleware for automatic logging
  - Use FastAPI middleware
  - Track the request/response cycle
  - Add correlation IDs
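A sketch of scrubbing sensitive data before prompts and responses reach the log file (the regexes below are illustrative, not exhaustive; extend them for your own PII categories):

```python
# masking.py — scrub sensitive strings before logging
import re

PATTERNS = [
    (re.compile(r"sk-[A-Za-z0-9]{20,}"), "[REDACTED_API_KEY]"),   # OpenAI-style keys
    (re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"), "[REDACTED_EMAIL]"),  # email addresses
]

def mask_sensitive(text: str) -> str:
    for pattern, replacement in PATTERNS:
        text = pattern.sub(replacement, text)
    return text

# e.g. logger.log_interaction(prompt=mask_sensitive(prompt), ...)
```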
### Drift Detection

- Monitor data drift
  - Use statistical tests (KS test)
  - Track embedding drift
  - Set appropriate thresholds
- Detect concept drift
  - Monitor model performance over time
  - Track quality metrics
  - Compare against a baseline

### Cost Management

- Track cost per model
  - Monitor token usage
  - Calculate cost per request
  - Set daily budgets
- Optimize costs
  - Use the appropriate model for each task
  - Cache responses
  - Batch requests

### Production Deployment

- Follow the checklist
  - Complete all pre-deployment items
  - Test monitoring and alerting
  - Document runbooks
- Monitor after deployment
  - Compare against baseline metrics
  - Watch for anomalies
  - Be ready to roll back