name: evidently-drift-detector
description: Evidently AI技能,用于生产环境中机器学习系统的数据漂移检测、模型性能监控、目标漂移分析和自动报告生成。
allowed-tools: Read, Grep, Write, Bash, Edit, Glob
Evidently 漂移检测器
使用Evidently AI检测数据漂移、监控模型性能并生成自动报告。
概述
本技能利用Evidently AI提供全面的机器学习监控能力。它能够检测生产环境中机器学习系统的数据漂移、概念漂移、目标漂移和模型性能退化。
能力
数据漂移检测
- 特征级漂移检测
- 数据集级漂移分析
- 多种漂移检测方法(KS、PSI、Wasserstein等)
- 分布可视化
- 漂移幅度量化
模型性能监控
- 分类指标跟踪
- 回归指标跟踪
- 性能退化检测
- 切片分析
- 错误分析
目标漂移分析
- 目标分布变化
- 标签漂移检测
- 预测漂移监控
- 类别平衡监控
自动报告生成
- HTML报告生成
- JSON指标导出
- 仪表板集成
- 自定义指标创建
- 测试套件执行
生产环境监控
- 实时监控集成
- 告警阈值配置
- 时间序列漂移跟踪
- 批次比较分析
先决条件
安装
pip install evidently>=0.4.0
可选依赖
# 支持Spark
pip install evidently[spark]
# 特定可视化
pip install plotly nbformat
使用模式
基础数据漂移报告
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
# 定义列映射
column_mapping = ColumnMapping(
target='target',
prediction='prediction',
numerical_features=['feature_1', 'feature_2', 'feature_3'],
categorical_features=['category_1', 'category_2']
)
# 创建漂移报告
report = Report(metrics=[
DataDriftPreset()
])
# 运行报告,比较参考数据和当前数据
report.run(
reference_data=reference_df,
current_data=current_df,
column_mapping=column_mapping
)
# 保存报告
report.save_html("drift_report.html")
# 获取指标字典
metrics_dict = report.as_dict()
分类性能报告
from evidently.metric_preset import ClassificationPreset
report = Report(metrics=[
ClassificationPreset()
])
report.run(
reference_data=reference_df,
current_data=current_df,
column_mapping=column_mapping
)
# 访问特定指标
results = report.as_dict()
accuracy = results['metrics'][0]['result']['current']['accuracy']
回归性能报告
from evidently.metric_preset import RegressionPreset
report = Report(metrics=[
RegressionPreset()
])
report.run(
reference_data=reference_df,
current_data=current_df,
column_mapping=column_mapping
)
自动化检查测试套件
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset, DataQualityTestPreset
# 创建测试套件
test_suite = TestSuite(tests=[
DataDriftTestPreset(),
DataQualityTestPreset()
])
# 运行测试
test_suite.run(
reference_data=reference_df,
current_data=current_df,
column_mapping=column_mapping
)
# 检查结果
if test_suite.as_dict()['summary']['all_passed']:
print("所有测试通过!")
else:
failed_tests = [t for t in test_suite.as_dict()['tests'] if t['status'] == 'FAIL']
print(f"失败测试数:{len(failed_tests)}")
独立漂移指标
from evidently.metrics import (
DatasetDriftMetric,
ColumnDriftMetric,
DataDriftTable,
TargetByFeaturesTable
)
# 详细漂移分析
report = Report(metrics=[
DatasetDriftMetric(),
ColumnDriftMetric(column_name='feature_1'),
ColumnDriftMetric(column_name='feature_2'),
DataDriftTable(),
TargetByFeaturesTable()
])
report.run(
reference_data=reference_df,
current_data=current_df,
column_mapping=column_mapping
)
自定义漂移阈值
from evidently.metrics import DatasetDriftMetric
from evidently.options import DataDriftOptions
# 自定义选项
options = DataDriftOptions(
drift_share=0.5, # 标记数据集漂移的漂移特征比例
stattest='psi', # 统计检验方法
stattest_threshold=0.1 # PSI阈值
)
report = Report(metrics=[
DatasetDriftMetric(options=options)
])
时间序列监控
import pandas as pd
from datetime import datetime, timedelta
def monitor_over_time(reference_df, production_data_stream, window_days=7):
"""随时间窗口监控漂移。"""
results = []
for window_start in production_data_stream:
window_end = window_start + timedelta(days=window_days)
current_window = production_data_stream.query(
f"timestamp >= '{window_start}' and timestamp < '{window_end}'"
)
report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=reference_df, current_data=current_window)
metrics = report.as_dict()
results.append({
'window_start': window_start,
'drift_detected': metrics['metrics'][0]['result']['dataset_drift'],
'drift_share': metrics['metrics'][0]['result']['drift_share']
})
return pd.DataFrame(results)
与Babysitter SDK集成
任务定义示例
const driftDetectionTask = defineTask({
name: 'evidently-drift-detection',
description: '检测参考数据和当前数据之间的数据漂移',
inputs: {
referenceDataPath: { type: 'string', required: true },
currentDataPath: { type: 'string', required: true },
targetColumn: { type: 'string' },
predictionColumn: { type: 'string' },
numericalFeatures: { type: 'array' },
categoricalFeatures: { type: 'array' },
driftThreshold: { type: 'number', default: 0.5 }
},
outputs: {
driftDetected: { type: 'boolean' },
driftShare: { type: 'number' },
driftedFeatures: { type: 'array' },
reportPath: { type: 'string' }
},
async run(inputs, taskCtx) {
return {
kind: 'skill',
title: '检测数据漂移',
skill: {
name: 'evidently-drift-detector',
context: {
operation: 'detect_drift',
referenceDataPath: inputs.referenceDataPath,
currentDataPath: inputs.currentDataPath,
targetColumn: inputs.targetColumn,
predictionColumn: inputs.predictionColumn,
numericalFeatures: inputs.numericalFeatures,
categoricalFeatures: inputs.categoricalFeatures,
driftThreshold: inputs.driftThreshold
}
},
io: {
inputJsonPath: `tasks/${taskCtx.effectId}/input.json`,
outputJsonPath: `tasks/${taskCtx.effectId}/result.json`
}
};
}
});
可用预设
指标预设
| 预设 |
使用场景 |
DataDriftPreset |
特征漂移检测 |
DataQualityPreset |
数据质量检查 |
ClassificationPreset |
分类模型性能 |
RegressionPreset |
回归模型性能 |
TargetDriftPreset |
目标变量漂移 |
TextOverviewPreset |
文本数据分析 |
测试预设
| 预设 |
使用场景 |
DataDriftTestPreset |
自动化漂移测试 |
DataQualityTestPreset |
数据质量验证 |
DataStabilityTestPreset |
数据稳定性检查 |
NoTargetPerformanceTestPreset |
代理性能测试 |
RegressionTestPreset |
回归性能测试 |
MulticlassClassificationTestPreset |
多分类测试 |
BinaryClassificationTestPreset |
二分类测试 |
可用统计检验
| 检验 |
方法 |
最佳适用 |
ks |
Kolmogorov-Smirnov |
数值型,通用 |
psi |
人口稳定性指数 |
生产环境监控 |
wasserstein |
Wasserstein距离 |
分布比较 |
jensenshannon |
Jensen-Shannon散度 |
概率分布 |
chisquare |
卡方检验 |
分类特征 |
z |
Z检验 |
大样本,正态分布 |
kl_div |
KL散度 |
信息论 |
ML管道集成
重训练触发
def check_retraining_needed(reference_df, current_df, column_mapping, threshold=0.3):
"""基于漂移判断是否需要模型重训练。"""
from evidently.test_suite import TestSuite
from evidently.tests import TestShareOfDriftedColumns
test_suite = TestSuite(tests=[
TestShareOfDriftedColumns(lt=threshold)
])
test_suite.run(
reference_data=reference_df,
current_data=current_df,
column_mapping=column_mapping
)
results = test_suite.as_dict()
retraining_needed = not results['summary']['all_passed']
return {
'retraining_needed': retraining_needed,
'drift_share': results['tests'][0]['result']['current'],
'threshold': threshold
}
性能退化告警
def check_performance_degradation(reference_df, current_df, column_mapping, min_accuracy=0.85):
"""分类准确率低于阈值时告警。"""
from evidently.tests import TestAccuracyScore
test_suite = TestSuite(tests=[
TestAccuracyScore(gte=min_accuracy)
])
test_suite.run(
reference_data=reference_df,
current_data=current_df,
column_mapping=column_mapping
)
results = test_suite.as_dict()
return {
'degradation_detected': not results['summary']['all_passed'],
'current_accuracy': results['tests'][0]['result']['current'],
'threshold': min_accuracy
}
最佳实践
- 建立基线:使用生产数据作为参考,而非训练数据
- 选择合适检验:匹配数据类型的统计检验
- 设置有意义阈值:平衡敏感度与告警疲劳
- 监控特征重要性:关注高影响特征
- 基于时间比较:比较相似时间段
- 记录决策:记录为何某些漂移可接受
参考资料