数据清洗流程Skill DataCleaningPipeline

构建稳健的数据清洗流程,包括缺失值处理、异常值检测、数据类型标准化、重复项删除、归一化和缩放,以及文本清洗等,以确保数据的完整性和一致性,适合于数据预处理和自动化数据流水线。

数据工程 0 次安装 0 次浏览 更新于 3/3/2026

name: 数据清洗流程 description: 构建稳健的数据清洗、缺失值插补、异常值处理和数据转换流程,以预处理数据、保证数据质量和自动化数据流水线

数据清洗流程

概览

数据清洗流程将原始、混乱的数据转换为干净、标准化的格式,适用于分析和建模,通过系统处理缺失值、异常值和数据质量问题。

使用场景

  • 准备原始数据集进行分析或建模
  • 处理缺失值和数据质量问题
  • 删除重复项和标准化格式
  • 检测和处理异常值
  • 构建自动化数据预处理工作流
  • 确保数据的完整性和一致性

核心组件

  • 缺失值处理:插补和删除策略
  • 异常值检测与处理:识别和处理异常
  • 数据类型标准化:确保正确的数据类型
  • 重复项删除:识别和删除重复项
  • 归一化与缩放:标准化值范围
  • 文本清洗:处理文本数据

清洗策略

  • 删除:删除行或列
  • 插补:用均值、中位数或预测模型填充
  • 转换:格式间转换
  • 验证:确保数据完整性规则

使用Python实现

import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.impute import SimpleImputer, KNNImputer

# 加载原始数据
df = pd.read_csv('raw_data.csv')

# 第1步:识别和处理缺失值
print("缺失值:
", df.isnull().sum())

# 策略1:删除具有关键缺失值的行
df = df.dropna(subset=['customer_id', 'transaction_date'])

# 策略2:用中位数插补数值列
imputer = SimpleImputer(strategy='median')
df['age'] = imputer.fit_transform(df[['age']])

# 策略3:对相关特征使用KNN插补
knn_imputer = KNNImputer(n_neighbors=5)
numeric_cols = df.select_dtypes(include=[np.number]).columns
df[numeric_cols] = knn_imputer.fit_transform(df[numeric_cols])

# 策略4:用众数填充分类变量
df['category'] = df['category'].fillna(df['category'].mode()[0])

# 第2步:处理重复项
print(f"重复行: {df.duplicated().sum()}")
df = df.drop_duplicates()

# 特定列的重复项
df = df.drop_duplicates(subset=['customer_id', 'transaction_date'])

# 第3步:异常值检测和处理
Q1 = df['amount'].quantile(0.25)
Q3 = df['amount'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

# 删除异常值
df = df[(df['amount'] >= lower_bound) & (df['amount'] <= upper_bound)]

# 替代方案:限制异常值
df['amount'] = df['amount'].clip(lower=lower_bound, upper=upper_bound)

# 第4步:数据类型标准化
df['transaction_date'] = pd.to_datetime(df['transaction_date'])
df['customer_id'] = df['customer_id'].astype('int64')
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')

# 第5步:文本清洗
df['name'] = df['name'].str.strip().str.lower()
df['name'] = df['name'].str.replace('[^a-z0-9\s]', '', regex=True)

# 第6步:归一化和缩放
scaler = StandardScaler()
df[['age', 'income']] = scaler.fit_transform(df[['age', 'income']])

# 归一化缩放到有界范围[0, 1]
minmax_scaler = MinMaxScaler()
df[['score']] = minmax_scaler.fit_transform(df[['score']])

# 第7步:创建数据质量报告
def create_quality_report(df_original, df_cleaned):
    report = {
        '原始行数': len(df_original),
        '清洗后行数': len(df_cleaned),
        '移除行数': len(df_original) - len(df_cleaned),
        '移除百分比': ((len(df_original) - len(df_cleaned)) / len(df_original) * 100),
        '原始缺失': df_original.isnull().sum().sum(),
        '清洗后缺失': df_cleaned.isnull().sum().sum(),
    }
    return pd.DataFrame(report, index=[0])

quality = create_quality_report(df, df)
print(quality)

# 第8步:验证检查
assert df['age'].isnull().sum() == 0, "年龄有缺失值"
assert df['transaction_date'].dtype == 'datetime64[ns]', "日期不是datetime类型"
assert (df['amount'] >= 0).all(), "检测到负金额"

print("数据清洗流程成功完成!")

流程架构

class DataCleaningPipeline:
    def __init__(self):
        self.cleaner_steps = []

    def add_step(self, func, description):
        self.cleaner_steps.append((func, description))
        return self

    def execute(self, df):
        for func, desc in self.cleaner_steps:
            print(f"执行: {desc}")
            df = func(df)
        return df

# 使用方法
pipeline = DataCleaningPipeline()
pipeline.add_step(
    lambda df: df.dropna(subset=['customer_id']),
    "删除缺失customer_id的行"
).add_step(
    lambda df: df.drop_duplicates(),
    "删除重复行"
).add_step(
    lambda df: df[(df['amount'] > 0) & (df['amount'] < 100000)],
    "过滤无效金额范围"
)

df_clean = pipeline.execute(df)

高级清洗技巧

# 第9步:特定特征清洗
df['phone'] = df['phone'].str.replace(r'\D', '', regex=True)  # 移除非数字

# 第10步:日期时间处理
df['created_date'] = pd.to_datetime(df['created_date'], errors='coerce')
df['days_since_creation'] = (pd.Timestamp.now() - df['created_date']).dt.days

# 第11步:分类标准化
df['status'] = df['status'].str.lower().str.strip()
df['status'] = df['status'].replace({
    'active': 'active',
    'inactive': 'inactive',
    'pending': 'pending',
})

# 第12步:数值约束检查
df['age'] = df['age'].where((df['age'] >= 0) & (df['age'] <= 150), np.nan)
df['percentage'] = df['percentage'].where((df['percentage'] >= 0) & (df['percentage'] <= 100), np.nan)

# 第13步:创建数据质量评分
quality_score = {
    '缺失%': (df.isnull().sum() / len(df) * 100).mean(),
    '重复%': (df.duplicated().sum() / len(df) * 100),
    '完整特征': (df.notna().sum() / len(df)).mean() * 100,
}

# 第14步:生成清洗报告
cleaning_report = f"""
数据清洗报告
====================
移除行数: {len(df) - len(df_clean)}
列数: {len(df_clean.columns)}
剩余行数: {len(df_clean)}
完整性: {(df_clean.notna().sum().sum() / (len(df_clean) * len(df_clean.columns)) * 100):.1f}%
"""
print(cleaning_report)

关键决策

  • 如何处理缺失值(删除vs插补)?
  • 哪些异常值是合法的商业案例?
  • 可接受的值范围是什么?
  • 哪些重复项是真正的重复项?
  • 如何标准化分类值?

验证步骤

  • 检查数据类型一致性
  • 验证值范围合理性
  • 确认无意外数据丢失
  • 文档化所有应用的转换
  • 创建变更审计跟踪

交付物

  • 带有质量指标的清洗数据集
  • 记录所有步骤的数据清洗日志
  • 确认数据完整性的验证报告
  • 前后比较统计数据
  • 清洗代码和流程文档