name: 数据清洗流程 description: 构建稳健的数据清洗、缺失值插补、异常值处理和数据转换流程,以预处理数据、保证数据质量和自动化数据流水线
数据清洗流程
概览
数据清洗流程将原始、混乱的数据转换为干净、标准化的格式,适用于分析和建模,通过系统处理缺失值、异常值和数据质量问题。
使用场景
- 准备原始数据集进行分析或建模
- 处理缺失值和数据质量问题
- 删除重复项和标准化格式
- 检测和处理异常值
- 构建自动化数据预处理工作流
- 确保数据的完整性和一致性
核心组件
- 缺失值处理:插补和删除策略
- 异常值检测与处理:识别和处理异常
- 数据类型标准化:确保正确的数据类型
- 重复项删除:识别和删除重复项
- 归一化与缩放:标准化值范围
- 文本清洗:处理文本数据
清洗策略
- 删除:删除行或列
- 插补:用均值、中位数或预测模型填充
- 转换:格式间转换
- 验证:确保数据完整性规则
使用Python实现
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.impute import SimpleImputer, KNNImputer
# 加载原始数据
df = pd.read_csv('raw_data.csv')
# 第1步:识别和处理缺失值
print("缺失值:
", df.isnull().sum())
# 策略1:删除具有关键缺失值的行
df = df.dropna(subset=['customer_id', 'transaction_date'])
# 策略2:用中位数插补数值列
imputer = SimpleImputer(strategy='median')
df['age'] = imputer.fit_transform(df[['age']])
# 策略3:对相关特征使用KNN插补
knn_imputer = KNNImputer(n_neighbors=5)
numeric_cols = df.select_dtypes(include=[np.number]).columns
df[numeric_cols] = knn_imputer.fit_transform(df[numeric_cols])
# 策略4:用众数填充分类变量
df['category'] = df['category'].fillna(df['category'].mode()[0])
# 第2步:处理重复项
print(f"重复行: {df.duplicated().sum()}")
df = df.drop_duplicates()
# 特定列的重复项
df = df.drop_duplicates(subset=['customer_id', 'transaction_date'])
# 第3步:异常值检测和处理
Q1 = df['amount'].quantile(0.25)
Q3 = df['amount'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# 删除异常值
df = df[(df['amount'] >= lower_bound) & (df['amount'] <= upper_bound)]
# 替代方案:限制异常值
df['amount'] = df['amount'].clip(lower=lower_bound, upper=upper_bound)
# 第4步:数据类型标准化
df['transaction_date'] = pd.to_datetime(df['transaction_date'])
df['customer_id'] = df['customer_id'].astype('int64')
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
# 第5步:文本清洗
df['name'] = df['name'].str.strip().str.lower()
df['name'] = df['name'].str.replace('[^a-z0-9\s]', '', regex=True)
# 第6步:归一化和缩放
scaler = StandardScaler()
df[['age', 'income']] = scaler.fit_transform(df[['age', 'income']])
# 归一化缩放到有界范围[0, 1]
minmax_scaler = MinMaxScaler()
df[['score']] = minmax_scaler.fit_transform(df[['score']])
# 第7步:创建数据质量报告
def create_quality_report(df_original, df_cleaned):
report = {
'原始行数': len(df_original),
'清洗后行数': len(df_cleaned),
'移除行数': len(df_original) - len(df_cleaned),
'移除百分比': ((len(df_original) - len(df_cleaned)) / len(df_original) * 100),
'原始缺失': df_original.isnull().sum().sum(),
'清洗后缺失': df_cleaned.isnull().sum().sum(),
}
return pd.DataFrame(report, index=[0])
quality = create_quality_report(df, df)
print(quality)
# 第8步:验证检查
assert df['age'].isnull().sum() == 0, "年龄有缺失值"
assert df['transaction_date'].dtype == 'datetime64[ns]', "日期不是datetime类型"
assert (df['amount'] >= 0).all(), "检测到负金额"
print("数据清洗流程成功完成!")
流程架构
class DataCleaningPipeline:
def __init__(self):
self.cleaner_steps = []
def add_step(self, func, description):
self.cleaner_steps.append((func, description))
return self
def execute(self, df):
for func, desc in self.cleaner_steps:
print(f"执行: {desc}")
df = func(df)
return df
# 使用方法
pipeline = DataCleaningPipeline()
pipeline.add_step(
lambda df: df.dropna(subset=['customer_id']),
"删除缺失customer_id的行"
).add_step(
lambda df: df.drop_duplicates(),
"删除重复行"
).add_step(
lambda df: df[(df['amount'] > 0) & (df['amount'] < 100000)],
"过滤无效金额范围"
)
df_clean = pipeline.execute(df)
高级清洗技巧
# 第9步:特定特征清洗
df['phone'] = df['phone'].str.replace(r'\D', '', regex=True) # 移除非数字
# 第10步:日期时间处理
df['created_date'] = pd.to_datetime(df['created_date'], errors='coerce')
df['days_since_creation'] = (pd.Timestamp.now() - df['created_date']).dt.days
# 第11步:分类标准化
df['status'] = df['status'].str.lower().str.strip()
df['status'] = df['status'].replace({
'active': 'active',
'inactive': 'inactive',
'pending': 'pending',
})
# 第12步:数值约束检查
df['age'] = df['age'].where((df['age'] >= 0) & (df['age'] <= 150), np.nan)
df['percentage'] = df['percentage'].where((df['percentage'] >= 0) & (df['percentage'] <= 100), np.nan)
# 第13步:创建数据质量评分
quality_score = {
'缺失%': (df.isnull().sum() / len(df) * 100).mean(),
'重复%': (df.duplicated().sum() / len(df) * 100),
'完整特征': (df.notna().sum() / len(df)).mean() * 100,
}
# 第14步:生成清洗报告
cleaning_report = f"""
数据清洗报告
====================
移除行数: {len(df) - len(df_clean)}
列数: {len(df_clean.columns)}
剩余行数: {len(df_clean)}
完整性: {(df_clean.notna().sum().sum() / (len(df_clean) * len(df_clean.columns)) * 100):.1f}%
"""
print(cleaning_report)
关键决策
- 如何处理缺失值(删除vs插补)?
- 哪些异常值是合法的商业案例?
- 可接受的值范围是什么?
- 哪些重复项是真正的重复项?
- 如何标准化分类值?
验证步骤
- 检查数据类型一致性
- 验证值范围合理性
- 确认无意外数据丢失
- 文档化所有应用的转换
- 创建变更审计跟踪
交付物
- 带有质量指标的清洗数据集
- 记录所有步骤的数据清洗日志
- 确认数据完整性的验证报告
- 前后比较统计数据
- 清洗代码和流程文档