Evidently漂移检测器Skill evidently-drift-detector

Evidently漂移检测器是一个基于Evidently AI的机器学习监控技能,专门用于生产环境中ML系统的数据漂移检测、模型性能监控、目标漂移分析和自动报告生成。该技能支持多种统计检验方法(如KS、PSI、Wasserstein),能够量化漂移幅度、监控性能退化、生成可视化报告,并与ML管道集成实现自动化重训练触发。适用于机器学习运维(MLOps)、模型监控、数据质量保障等场景。关键词:机器学习监控、数据漂移检测、模型性能监控、Evidently AI、MLOps、概念漂移、目标漂移、自动化报告、统计检验、生产环境ML。

机器学习 0 次安装 1 次浏览 更新于 2/23/2026

name: evidently-drift-detector description: Evidently AI技能,用于生产环境中机器学习系统的数据漂移检测、模型性能监控、目标漂移分析和自动报告生成。 allowed-tools: Read, Grep, Write, Bash, Edit, Glob

Evidently 漂移检测器

使用Evidently AI检测数据漂移、监控模型性能并生成自动报告。

概述

本技能利用Evidently AI提供全面的机器学习监控能力。它能够检测生产环境中机器学习系统的数据漂移、概念漂移、目标漂移和模型性能退化。

能力

数据漂移检测

  • 特征级漂移检测
  • 数据集级漂移分析
  • 多种漂移检测方法(KS、PSI、Wasserstein等)
  • 分布可视化
  • 漂移幅度量化

模型性能监控

  • 分类指标跟踪
  • 回归指标跟踪
  • 性能退化检测
  • 切片分析
  • 错误分析

目标漂移分析

  • 目标分布变化
  • 标签漂移检测
  • 预测漂移监控
  • 类别平衡监控

自动报告生成

  • HTML报告生成
  • JSON指标导出
  • 仪表板集成
  • 自定义指标创建
  • 测试套件执行

生产环境监控

  • 实时监控集成
  • 告警阈值配置
  • 时间序列漂移跟踪
  • 批次比较分析

先决条件

安装

pip install evidently>=0.4.0

可选依赖

# 支持Spark
pip install evidently[spark]

# 特定可视化
pip install plotly nbformat

使用模式

基础数据漂移报告

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# 定义列映射
column_mapping = ColumnMapping(
    target='target',
    prediction='prediction',
    numerical_features=['feature_1', 'feature_2', 'feature_3'],
    categorical_features=['category_1', 'category_2']
)

# 创建漂移报告
report = Report(metrics=[
    DataDriftPreset()
])

# 运行报告,比较参考数据和当前数据
report.run(
    reference_data=reference_df,
    current_data=current_df,
    column_mapping=column_mapping
)

# 保存报告
report.save_html("drift_report.html")

# 获取指标字典
metrics_dict = report.as_dict()

分类性能报告

from evidently.metric_preset import ClassificationPreset

report = Report(metrics=[
    ClassificationPreset()
])

report.run(
    reference_data=reference_df,
    current_data=current_df,
    column_mapping=column_mapping
)

# 访问特定指标
results = report.as_dict()
accuracy = results['metrics'][0]['result']['current']['accuracy']

回归性能报告

from evidently.metric_preset import RegressionPreset

report = Report(metrics=[
    RegressionPreset()
])

report.run(
    reference_data=reference_df,
    current_data=current_df,
    column_mapping=column_mapping
)

自动化检查测试套件

from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset, DataQualityTestPreset

# 创建测试套件
test_suite = TestSuite(tests=[
    DataDriftTestPreset(),
    DataQualityTestPreset()
])

# 运行测试
test_suite.run(
    reference_data=reference_df,
    current_data=current_df,
    column_mapping=column_mapping
)

# 检查结果
if test_suite.as_dict()['summary']['all_passed']:
    print("所有测试通过!")
else:
    failed_tests = [t for t in test_suite.as_dict()['tests'] if t['status'] == 'FAIL']
    print(f"失败测试数:{len(failed_tests)}")

独立漂移指标

from evidently.metrics import (
    DatasetDriftMetric,
    ColumnDriftMetric,
    DataDriftTable,
    TargetByFeaturesTable
)

# 详细漂移分析
report = Report(metrics=[
    DatasetDriftMetric(),
    ColumnDriftMetric(column_name='feature_1'),
    ColumnDriftMetric(column_name='feature_2'),
    DataDriftTable(),
    TargetByFeaturesTable()
])

report.run(
    reference_data=reference_df,
    current_data=current_df,
    column_mapping=column_mapping
)

自定义漂移阈值

from evidently.metrics import DatasetDriftMetric
from evidently.options import DataDriftOptions

# 自定义选项
options = DataDriftOptions(
    drift_share=0.5,  # 标记数据集漂移的漂移特征比例
    stattest='psi',   # 统计检验方法
    stattest_threshold=0.1  # PSI阈值
)

report = Report(metrics=[
    DatasetDriftMetric(options=options)
])

时间序列监控

import pandas as pd
from datetime import datetime, timedelta

def monitor_over_time(reference_df, production_data_stream, window_days=7):
    """随时间窗口监控漂移。"""
    results = []

    for window_start in production_data_stream:
        window_end = window_start + timedelta(days=window_days)
        current_window = production_data_stream.query(
            f"timestamp >= '{window_start}' and timestamp < '{window_end}'"
        )

        report = Report(metrics=[DataDriftPreset()])
        report.run(reference_data=reference_df, current_data=current_window)

        metrics = report.as_dict()
        results.append({
            'window_start': window_start,
            'drift_detected': metrics['metrics'][0]['result']['dataset_drift'],
            'drift_share': metrics['metrics'][0]['result']['drift_share']
        })

    return pd.DataFrame(results)

与Babysitter SDK集成

任务定义示例

const driftDetectionTask = defineTask({
  name: 'evidently-drift-detection',
  description: '检测参考数据和当前数据之间的数据漂移',

  inputs: {
    referenceDataPath: { type: 'string', required: true },
    currentDataPath: { type: 'string', required: true },
    targetColumn: { type: 'string' },
    predictionColumn: { type: 'string' },
    numericalFeatures: { type: 'array' },
    categoricalFeatures: { type: 'array' },
    driftThreshold: { type: 'number', default: 0.5 }
  },

  outputs: {
    driftDetected: { type: 'boolean' },
    driftShare: { type: 'number' },
    driftedFeatures: { type: 'array' },
    reportPath: { type: 'string' }
  },

  async run(inputs, taskCtx) {
    return {
      kind: 'skill',
      title: '检测数据漂移',
      skill: {
        name: 'evidently-drift-detector',
        context: {
          operation: 'detect_drift',
          referenceDataPath: inputs.referenceDataPath,
          currentDataPath: inputs.currentDataPath,
          targetColumn: inputs.targetColumn,
          predictionColumn: inputs.predictionColumn,
          numericalFeatures: inputs.numericalFeatures,
          categoricalFeatures: inputs.categoricalFeatures,
          driftThreshold: inputs.driftThreshold
        }
      },
      io: {
        inputJsonPath: `tasks/${taskCtx.effectId}/input.json`,
        outputJsonPath: `tasks/${taskCtx.effectId}/result.json`
      }
    };
  }
});

可用预设

指标预设

预设 使用场景
DataDriftPreset 特征漂移检测
DataQualityPreset 数据质量检查
ClassificationPreset 分类模型性能
RegressionPreset 回归模型性能
TargetDriftPreset 目标变量漂移
TextOverviewPreset 文本数据分析

测试预设

预设 使用场景
DataDriftTestPreset 自动化漂移测试
DataQualityTestPreset 数据质量验证
DataStabilityTestPreset 数据稳定性检查
NoTargetPerformanceTestPreset 代理性能测试
RegressionTestPreset 回归性能测试
MulticlassClassificationTestPreset 多分类测试
BinaryClassificationTestPreset 二分类测试

可用统计检验

检验 方法 最佳适用
ks Kolmogorov-Smirnov 数值型,通用
psi 人口稳定性指数 生产环境监控
wasserstein Wasserstein距离 分布比较
jensenshannon Jensen-Shannon散度 概率分布
chisquare 卡方检验 分类特征
z Z检验 大样本,正态分布
kl_div KL散度 信息论

ML管道集成

重训练触发

def check_retraining_needed(reference_df, current_df, column_mapping, threshold=0.3):
    """基于漂移判断是否需要模型重训练。"""
    from evidently.test_suite import TestSuite
    from evidently.tests import TestShareOfDriftedColumns

    test_suite = TestSuite(tests=[
        TestShareOfDriftedColumns(lt=threshold)
    ])

    test_suite.run(
        reference_data=reference_df,
        current_data=current_df,
        column_mapping=column_mapping
    )

    results = test_suite.as_dict()
    retraining_needed = not results['summary']['all_passed']

    return {
        'retraining_needed': retraining_needed,
        'drift_share': results['tests'][0]['result']['current'],
        'threshold': threshold
    }

性能退化告警

def check_performance_degradation(reference_df, current_df, column_mapping, min_accuracy=0.85):
    """分类准确率低于阈值时告警。"""
    from evidently.tests import TestAccuracyScore

    test_suite = TestSuite(tests=[
        TestAccuracyScore(gte=min_accuracy)
    ])

    test_suite.run(
        reference_data=reference_df,
        current_data=current_df,
        column_mapping=column_mapping
    )

    results = test_suite.as_dict()
    return {
        'degradation_detected': not results['summary']['all_passed'],
        'current_accuracy': results['tests'][0]['result']['current'],
        'threshold': min_accuracy
    }

最佳实践

  1. 建立基线:使用生产数据作为参考,而非训练数据
  2. 选择合适检验:匹配数据类型的统计检验
  3. 设置有意义阈值:平衡敏感度与告警疲劳
  4. 监控特征重要性:关注高影响特征
  5. 基于时间比较:比较相似时间段
  6. 记录决策:记录为何某些漂移可接受

参考资料