异常检测Skill AnomalyDetection

异常检测技术用于识别数据中的异常模式和异常值,应用于欺诈检测、质量监控等多个领域,涉及统计方法、机器学习等多种技术。

数据分析 0 次安装 0 次浏览 更新于 3/3/2026

name: 异常检测 description: 使用统计方法、隔离森林和自编码器等技术识别数据中的不寻常模式、异常值和异常,用于欺诈检测和质量监控

异常检测

概览

异常检测识别数据中显著偏离正常行为的不寻常模式、异常值和异常,使得欺诈检测和系统监控成为可能。

使用场景

  • 在金融数据中检测欺诈交易或可疑活动
  • 识别系统故障、网络入侵或安全漏洞
  • 监控制造质量并识别有缺陷的产品
  • 在医疗保健数据或患者生命体征中发现不寻常的模式
  • 在物联网或工业系统中检测异常传感器读数
  • 识别客户行为中的异常值以进行有针对性的干预

检测方法

  • 统计方法: Z分数,IQR,修改Z分数
  • 基于距离: K最近邻,局部异常因子
  • 隔离: 隔离森林
  • 基于密度: DBSCAN
  • 深度学习: 自编码器,GANs

异常类型

  • 点异常: 单个不寻常的记录
  • 上下文异常: 在特定上下文中不寻常
  • 集体异常: 序列中的不寻常模式
  • 新类别: 完全新的模式

使用Python实现

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.covariance import EllipticEnvelope
from scipy import stats

# 生成带有异常的样本数据
np.random.seed(42)

# 正常数据
n_normal = 950
normal_data = np.random.normal(100, 15, (n_normal, 2))

# 异常值
n_anomalies = 50
anomalies = np.random.uniform(0, 200, (n_anomalies, 2))
anomalies[n_anomalies//2:, 0] = np.random.uniform(80, 120, n_anomalies//2)
anomalies[n_anomalies//2:, 1] = np.random.uniform(-50, 0, n_anomalies//2)

X = np.vstack([normal_data, anomalies])
y_true = np.hstack([np.zeros(n_normal), np.ones(n_anomalies)])

df = pd.DataFrame(X, columns=['特征1', '特征2'])
df['is_anomaly_true'] = y_true

print("数据摘要:")
print(f"正常样本: {n_normal}")
print(f"异常值: {n_anomalies}")
print(f"总数: {len(df)}")

# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# 1. 统计方法 (Z分数)
z_scores = np.abs(stats.zscore(X))
z_anomaly_mask = (z_scores > 3).any(axis=1)
df['z_score_anomaly'] = z_anomaly_mask

print(f"
1. Z分数方法:")
print(f"检测到的异常值: {z_anomaly_mask.sum()}")
print(f"准确率: {(z_anomaly_mask == y_true).mean():.2%}")

# 2. 隔离森林
iso_forest = IsolationForest(contamination=n_anomalies/len(df), random_state=42)
iso_predictions = iso_forest.fit_predict(X_scaled)
iso_anomaly_mask = iso_predictions == -1
iso_scores = iso_forest.score_samples(X_scaled)

df['iso_anomaly'] = iso_anomaly_mask
df['iso_score'] = iso_scores

print(f"
2. 隔离森林:")
print(f"检测到的异常值: {iso_anomaly_mask.sum()}")
print(f"准确率: {(iso_anomaly_mask == y_true).mean():.2%}")

# 3. 局部异常因子
lof = LocalOutlierFactor(n_neighbors=20, contamination=n_anomalies/len(df))
lof_predictions = lof.fit_predict(X_scaled)
lof_anomaly_mask = lof_predictions == -1
lof_scores = lof.negative_outlier_factor_

df['lof_anomaly'] = lof_anomaly_mask
df['lof_score'] = lof_scores

print(f"
3. 局部异常因子:")
print(f"检测到的异常值: {lof_anomaly_mask.sum()}")
print(f"准确率: {(lof_anomaly_mask == y_true).mean():.2%}")

# 4. 椭圆包络 (鲁棒协方差)
ee = EllipticEnvelope(contamination=n_anomalies/len(df), random_state=42)
ee_predictions = ee.fit_predict(X_scaled)
ee_anomaly_mask = ee_predictions == -1
ee_scores = ee.mahalanobis(X_scaled)

df['ee_anomaly'] = ee_anomaly_mask
df['ee_score'] = ee_scores

print(f"
4. 椭圆包络:")
print(f"检测到的异常值: {ee_anomaly_mask.sum()}")
print(f"准确率: {(ee_anomaly_mask == y_true).mean():.2%}")

# 5. IQR方法
Q1 = np.percentile(X, 25, axis=0)
Q3 = np.percentile(X, 75, axis=0)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

iqr_anomaly_mask = ((X < lower_bound) | (X > upper_bound)).any(axis=1)
df['iqr_anomaly'] = iqr_anomaly_mask

print(f"
5. IQR方法:")
print(f"检测到的异常值: {iqr_anomaly_mask.sum()}")
print(f"准确率: {(iqr_anomaly_mask == y_true).mean():.2%}")

# 可视化异常检测方法
fig, axes = plt.subplots(2, 3, figsize=(15, 10))

methods = [
    (z_anomaly_mask, 'Z分数', None),
    (iso_anomaly_mask, '隔离森林', iso_scores),
    (lof_anomaly_mask, 'LOF', lof_scores),
    (ee_anomaly_mask, '椭圆包络', ee_scores),
    (iqr_anomaly_mask, 'IQR', None),
]

# 真实异常值
ax = axes[0, 0]
colors = ['blue' if not a else 'red' for a in y_true]
ax.scatter(df['特征1'], df['特征2'], c=colors, alpha=0.6, s=30)
ax.set_title('真实异常值')
ax.set_xlabel('特征 1')
ax.set_ylabel('特征 2')

# 绘制每种方法
for idx, (anomaly_mask, method_name, scores) in enumerate(methods):
    ax = axes.flatten()[idx + 1]

    if scores is not None:
        scatter = ax.scatter(df['特征1'], df['特征2'], c=scores, cmap='RdYlBu_r', alpha=0.6, s=30)
        plt.colorbar(scatter, ax=ax, label='分数')
    else:
        colors = ['red' if a else 'blue' for a in anomaly_mask]
        ax.scatter(df['特征1'], df['特征2'], c=colors, alpha=0.6, s=30)

    ax.set_title(f'{method_name}
({anomaly_mask.sum()} 异常值)')
    ax.set_xlabel('特征 1')
    ax.set_ylabel('特征 2')

plt.tight_layout()
plt.show()

# 6. 异常分数比较
fig, axes = plt.subplots(2, 2, figsize=(14, 8))

# 隔离森林分数
axes[0, 0].hist(iso_scores[~y_true], bins=30, alpha=0.7, label='正常', color='blue')
axes[0, 0].hist(iso_scores[y_true == 1], bins=10, alpha=0.7, label='异常', color='red')
axes[0, 0].set_xlabel('异常分数')
axes[0, 0].set_title('隔离森林分数分布')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# LOF分数
axes[0, 1].hist(lof_scores[~y_true], bins=30, alpha=0.7, label='正常', color='blue')
axes[0, 1].hist(lof_scores[y_true == 1], bins=10, alpha=0.7, label='异常', color='red')
axes[0, 1].set_xlabel('异常分数')
axes[0, 1].set_title('LOF分数分布')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# 隔离森林的ROC类曲线
iso_scores_sorted = np.sort(iso_scores)
detected_at_threshold = []
for threshold in iso_scores_sorted:
    detected = (iso_scores <= threshold).sum()
    true_detected = ((iso_scores <= threshold) & (y_true == 1)).sum()
    if detected > 0:
        precision = true_detected / detected
        recall = true_detected / n_anomalies
        detected_at_threshold.append({'Threshold': threshold, 'Precision': precision, 'Recall': recall})

if detected_at_threshold:
    threshold_df = pd.DataFrame(detected_at_threshold)
    axes[1, 0].plot(threshold_df['Recall'], threshold_df['Precision'], linewidth=2)
    axes[1, 0].set_xlabel('召回率')
    axes[1, 0].set_ylabel('精确度')
    axes[1, 0].set_title('精确度-召回率曲线 (隔离森林)')
    axes[1, 0].grid(True, alpha=0.3)

# 方法比较
methods_comparison = pd.DataFrame({
    '方法': ['Z分数', '隔离森林', 'LOF', '椭圆包络', 'IQR'],
    '准确率': [
        (z_anomaly_mask == y_true).mean(),
        (iso_anomaly_mask == y_true).mean(),
        (lof_anomaly_mask == y_true).mean(),
        (ee_anomaly_mask == y_true).mean(),
        (iqr_anomaly_mask == y_true).mean(),
    ]
})

axes[1, 1].barh(methods_comparison['方法'], methods_comparison['准确率'], color='steelblue', edgecolor='black')
axes[1, 1].set_xlabel('准确率')
axes[1, 1].set_title('方法比较')
axes[1, 1].set_xlim([0, 1])
for i, v in enumerate(methods_comparison['准确率']):
    axes[1, 1].text(v, i, f' {v:.2%}', va='center')

plt.tight_layout()
plt.show()

# 7. 集成异常检测
# 结合多种方法
ensemble_votes = (z_anomaly_mask.astype(int) +
                  iso_anomaly_mask.astype(int) +
                  lof_anomaly_mask.astype(int) +
                  ee_anomaly_mask.astype(int) +
                  iqr_anomaly_mask.astype(int))
df['ensemble_votes'] = ensemble_votes
ensemble_anomaly = ensemble_votes >= 3  # 多数投票

print(f"
6. 集成 (多数投票):
")
print(f"检测到的异常值: {ensemble_anomaly.sum()}")
print(f"准确率: {(ensemble_anomaly == y_true).mean():.2%}")

# 可视化集成
fig, ax = plt.subplots(figsize=(10, 8))
scatter = ax.scatter(df['特征1'], df['特征2'], c=ensemble_votes, cmap='RdYlGn_r',
                     s=100 * (ensemble_anomaly.astype(int) + 0.5), alpha=0.6, edgecolors='black')
ax.set_xlabel('特征 1')
ax.set_ylabel('特征 2')
ax.set_title('集成异常检测 (颜色: 投票数, 大小: 异常值)')
cbar = plt.colorbar(scatter, ax=ax, label='方法数量')
plt.show()

# 8. 时间序列异常
time_series_data = np.sin(np.arange(100) * 0.2) * 10 + 100
time_series_data = time_series_data + np.random.normal(0, 2, 100)
# 添加异常值
time_series_data[25] = 150
time_series_data[50] = 50
time_series_data[75] = 140

# 使用滚动统计检测
rolling_mean = pd.Series(time_series_data).rolling(window=5).mean()
rolling_std = pd.Series(time_series_data).rolling(window=5).std()
z_scores_ts = np.abs((time_series_data - rolling_mean) / rolling_std) > 2

fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(time_series_data, linewidth=1, label='数据')
ax.plot(rolling_mean, linewidth=2, label='滚动平均')
ax.scatter(np.where(z_scores_ts)[0], time_series_data[z_scores_ts], color='red', s=100, label='异常值', zorder=5)
ax.fill_between(range(len(time_series_data)), rolling_mean - 2*rolling_std, rolling_mean + 2*rolling_std,
                alpha=0.2, label='±2 标准差')
ax.set_xlabel('时间')
ax.set_ylabel('值')
ax.set_title('时间序列异常检测')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print("
异常检测分析完成!")

方法选择指南

  • Z分数: 简单,快速,假设正态分布
  • IQR: 稳健,非参数,适合异常值
  • 隔离森林: 高效,适合高维
  • LOF: 基于密度,发现局部异常
  • 自编码器: 复杂模式,深度学习

阈值选择

  • 保守: 较少误报,较多漏报
  • 积极: 更多异常被标记,更多误报
  • 数据驱动: 使用验证集优化阈值

交付物

  • 异常检测结果
  • 异常分数可视化
  • 方法比较
  • 识别的异常记录
  • 生产部署建议
  • 阈值优化分析