DataPreprocessing

The data preprocessing skill covers the entire process of transforming raw data into a format suitable for training machine learning models, including key steps such as data cleaning, feature engineering, normalization, and encoding categorical variables.

Machine Learning · Updated 3/5/2026

Data Preprocessing

Overview

Data preprocessing is a critical step in the machine learning pipeline: it transforms raw data into a format suitable for model training. This skill covers data cleaning, feature engineering, normalization, encoding categorical variables, scaling, augmentation, pipeline creation, and preprocessing for different data types.

Prerequisites

  • Working knowledge of Python
  • Familiarity with pandas and NumPy
  • Familiarity with scikit-learn
  • Understanding of machine learning concepts
  • Basic statistics

Core Concepts

Data Cleaning

  • Missing values: handle null/NaN values via imputation or removal
  • Outliers: detect and handle extreme values
  • Duplicates: identify and remove duplicate records
  • Data validation: ensure data integrity and consistency

Feature Engineering

  • Polynomial features: create higher-order and interaction features
  • Date/time features: extract temporal patterns from datetime columns
  • Text features: convert text into numeric representations
  • Ratio features: derive features from column combinations

Normalization and Scaling

  • Standardization: Z-score normalization (mean = 0, std = 1)
  • Min-max scaling: scale to a fixed range [0, 1]
  • Robust scaling: outlier-resistant scaling using the median and IQR
  • Normalization: scale individual samples to unit norm
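A quick toy comparison (illustrative data, not part of the skill's own code) of how these scalers behave on a column containing an outlier:

```python
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler

X = np.array([[1.0], [2.0], [3.0], [4.0], [100.0]])  # 100.0 is an outlier

standard = StandardScaler().fit_transform(X)  # column mean 0, std 1
minmax = MinMaxScaler().fit_transform(X)      # squeezed into [0, 1]
robust = RobustScaler().fit_transform(X)      # centered on median, scaled by IQR

# The outlier dominates min-max scaling (the first four values land near 0),
# while robust scaling keeps the bulk of the data spread out.
```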

Encoding Categorical Variables

  • Label encoding: convert categories to integers
  • One-hot encoding: create a binary column per category
  • Target encoding: encode using the target mean
  • Ordinal encoding: preserve the order of categorical data

Implementation Guide

Data Cleaning

Missing Values

import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer, KNNImputer

class MissingValueHandler:
    """Handle missing values in a dataset."""

    def __init__(self, strategy='mean', numeric_strategy='mean', categorical_strategy='most_frequent'):
        self.strategy = strategy
        self.numeric_strategy = numeric_strategy
        self.categorical_strategy = categorical_strategy
        self.numeric_imputer = None
        self.categorical_imputer = None

    def fit(self, X):
        """Fit the imputers on the data."""
        if isinstance(X, pd.DataFrame):
            numeric_cols = X.select_dtypes(include=[np.number]).columns
            categorical_cols = X.select_dtypes(exclude=[np.number]).columns
        else:
            # For numpy arrays, assume all columns are numeric
            numeric_cols = list(range(X.shape[1]))
            categorical_cols = []

        if len(numeric_cols) > 0:
            self.numeric_imputer = SimpleImputer(strategy=self.numeric_strategy)
            if isinstance(X, pd.DataFrame):
                self.numeric_imputer.fit(X[numeric_cols])
            else:
                self.numeric_imputer.fit(X[:, numeric_cols])

        if len(categorical_cols) > 0:
            self.categorical_imputer = SimpleImputer(strategy=self.categorical_strategy)
            self.categorical_imputer.fit(X[categorical_cols])

        return self

    def transform(self, X):
        """Transform the data using the fitted imputers."""
        X_transformed = X.copy()

        if isinstance(X, pd.DataFrame):
            numeric_cols = X.select_dtypes(include=[np.number]).columns
            categorical_cols = X.select_dtypes(exclude=[np.number]).columns

            if self.numeric_imputer is not None and len(numeric_cols) > 0:
                X_transformed[numeric_cols] = self.numeric_imputer.transform(X[numeric_cols])

            if self.categorical_imputer is not None and len(categorical_cols) > 0:
                X_transformed[categorical_cols] = self.categorical_imputer.transform(X[categorical_cols])
        else:
            if self.numeric_imputer is not None:
                X_transformed = self.numeric_imputer.transform(X)

        return X_transformed

    def fit_transform(self, X):
        """Fit and transform in one step."""
        return self.fit(X).transform(X)

# Usage
handler = MissingValueHandler(numeric_strategy='mean', categorical_strategy='most_frequent')
X_clean = handler.fit_transform(X_train)

# KNN imputation for more complex missing-value patterns
knn_imputer = KNNImputer(n_neighbors=5)
X_knn = knn_imputer.fit_transform(X_train)

Outliers

from scipy import stats
from sklearn.preprocessing import RobustScaler

class OutlierHandler:
    """Detect and handle outliers."""

    @staticmethod
    def z_score_detection(X, threshold=3):
        """Detect outliers using Z-scores."""
        if isinstance(X, pd.DataFrame):
            numeric_cols = X.select_dtypes(include=[np.number]).columns
            z_scores = np.abs(stats.zscore(X[numeric_cols], nan_policy='omit'))
            outliers = (z_scores > threshold).any(axis=1)
        else:
            z_scores = np.abs(stats.zscore(X, nan_policy='omit'))
            outliers = (z_scores > threshold).any(axis=1)

        return outliers

    @staticmethod
    def iqr_detection(X, multiplier=1.5):
        """Detect outliers using the IQR method."""
        if isinstance(X, pd.DataFrame):
            numeric_cols = X.select_dtypes(include=[np.number]).columns
            outliers = pd.Series(False, index=X.index)

            for col in numeric_cols:
                Q1 = X[col].quantile(0.25)
                Q3 = X[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - multiplier * IQR
                upper_bound = Q3 + multiplier * IQR
                outliers |= (X[col] < lower_bound) | (X[col] > upper_bound)
        else:
            Q1 = np.percentile(X, 25, axis=0)
            Q3 = np.percentile(X, 75, axis=0)
            IQR = Q3 - Q1
            lower_bound = Q1 - multiplier * IQR
            upper_bound = Q3 + multiplier * IQR
            outliers = ((X < lower_bound) | (X > upper_bound)).any(axis=1)

        return outliers

    @staticmethod
    def cap_outliers(X, method='iqr', multiplier=1.5):
        """Cap outliers at the bounds instead of removing them."""
        if isinstance(X, pd.DataFrame):
            X_capped = X.copy()
            numeric_cols = X.select_dtypes(include=[np.number]).columns

            for col in numeric_cols:
                if method == 'iqr':
                    Q1 = X[col].quantile(0.25)
                    Q3 = X[col].quantile(0.75)
                    IQR = Q3 - Q1
                    lower_bound = Q1 - multiplier * IQR
                    upper_bound = Q3 + multiplier * IQR
                elif method == 'zscore':
                    mean = X[col].mean()
                    std = X[col].std()
                    lower_bound = mean - multiplier * std
                    upper_bound = mean + multiplier * std

                X_capped[col] = X[col].clip(lower_bound, upper_bound)

            return X_capped
        else:
            if method == 'iqr':
                Q1 = np.percentile(X, 25, axis=0)
                Q3 = np.percentile(X, 75, axis=0)
                IQR = Q3 - Q1
                lower_bound = Q1 - multiplier * IQR
                upper_bound = Q3 + multiplier * IQR
            elif method == 'zscore':
                mean = np.mean(X, axis=0)
                std = np.std(X, axis=0)
                lower_bound = mean - multiplier * std
                upper_bound = mean + multiplier * std

            return np.clip(X, lower_bound, upper_bound)

# Usage
outlier_handler = OutlierHandler()

# Detect outliers
outliers = outlier_handler.z_score_detection(X_train, threshold=3)

# Cap outliers
X_capped = outlier_handler.cap_outliers(X_train, method='iqr')

# Remove outliers
X_clean = X_train[~outliers]
y_clean = y_train[~outliers]

Duplicates

class DuplicateHandler:
    """Handle duplicate rows."""

    @staticmethod
    def find_duplicates(X, subset=None):
        """Find duplicate rows."""
        if isinstance(X, pd.DataFrame):
            duplicates = X.duplicated(subset=subset, keep='first')
        else:
            # For numpy arrays
            _, indices = np.unique(X, axis=0, return_index=True)
            duplicates = np.ones(len(X), dtype=bool)
            duplicates[indices] = False

        return duplicates

    @staticmethod
    def remove_duplicates(X, y=None, subset=None):
        """Remove duplicate rows."""
        if isinstance(X, pd.DataFrame):
            if y is not None:
                df = X.copy()
                df['target'] = y
                df_clean = df.drop_duplicates(subset=subset, keep='first')
                return df_clean.drop(columns=['target']), df_clean['target']
            else:
                return X.drop_duplicates(subset=subset, keep='first')
        else:
            if y is not None:
                combined = np.column_stack([X, y])
                _, indices = np.unique(combined, axis=0, return_index=True)
                return X[indices], y[indices]
            else:
                _, indices = np.unique(X, axis=0, return_index=True)
                return X[indices]

# Usage
dup_handler = DuplicateHandler()

# Find duplicates
duplicates = dup_handler.find_duplicates(X_train)

# Remove duplicates
X_clean, y_clean = dup_handler.remove_duplicates(X_train, y_train)
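The core concepts also list data validation (integrity and consistency checks), which has no implementation in this section. A minimal sketch (function and column names are illustrative assumptions, not part of the skill) that reports missing columns and out-of-range values:

```python
import numpy as np
import pandas as pd

def validate_dataframe(df, required_columns, value_ranges=None):
    """Return a list of human-readable problems found in df."""
    problems = []
    # Completeness: required columns must exist
    for col in required_columns:
        if col not in df.columns:
            problems.append(f"missing column: {col}")
    # Consistency: values must fall inside their allowed range
    for col, (lo, hi) in (value_ranges or {}).items():
        if col in df.columns:
            n_bad = int(((df[col] < lo) | (df[col] > hi)).sum())
            if n_bad:
                problems.append(f"{col}: {n_bad} values outside [{lo}, {hi}]")
    return problems

df = pd.DataFrame({'age': [25, 31, -4], 'income': [50000.0, 72000.0, 61000.0]})
issues = validate_dataframe(df, required_columns=['age', 'income', 'city'],
                            value_ranges={'age': (0, 120)})
# issues: ["missing column: city", "age: 1 values outside [0, 120]"]
```

Running validation before imputation or scaling keeps bad rows from silently propagating into the pipeline.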

Feature Engineering

Polynomial Features

from sklearn.preprocessing import PolynomialFeatures
import numpy as np

class FeatureEngineer:
    """Create new features from existing ones."""

    def __init__(self):
        self.polynomial_features = None

    def create_polynomial_features(self, X, degree=2, include_bias=False):
        """Create polynomial features."""
        self.polynomial_features = PolynomialFeatures(
            degree=degree,
            include_bias=include_bias,
            interaction_only=False
        )
        return self.polynomial_features.fit_transform(X)

    def create_interaction_features(self, X):
        """Create pairwise interaction features."""
        n_features = X.shape[1]
        interaction_features = []

        for i in range(n_features):
            for j in range(i + 1, n_features):
                interaction_features.append(X[:, i] * X[:, j])

        return np.column_stack([X] + interaction_features)

    def create_ratio_features(self, X, pairs):
        """Create ratio features from column pairs."""
        ratio_features = X.copy()

        for col1, col2 in pairs:
            # Avoid division by zero
            ratio = np.divide(X[:, col1], X[:, col2],
                           out=np.zeros_like(X[:, col1]),
                           where=X[:, col2] != 0)
            ratio_features = np.column_stack([ratio_features, ratio])

        return ratio_features

    def create_bin_features(self, X, bins=10, strategy='uniform'):
        """Create binned features."""
        from sklearn.preprocessing import KBinsDiscretizer

        discretizer = KBinsDiscretizer(
            n_bins=bins,
            encode='onehot',
            strategy=strategy
        )

        return discretizer.fit_transform(X)

# Usage
engineer = FeatureEngineer()

# Polynomial features
X_poly = engineer.create_polynomial_features(X_train, degree=2)

# Interaction features
X_interaction = engineer.create_interaction_features(X_train)

# Ratio features
X_ratio = engineer.create_ratio_features(X_train, pairs=[(0, 1), (0, 2)])

Date/Time Features

import pandas as pd
import numpy as np

class DateTimeFeatureEngineer:
    """Extract features from datetime columns."""

    @staticmethod
    def extract_features(X, datetime_cols):
        """Extract features from datetime columns."""
        if isinstance(X, pd.DataFrame):
            X_features = X.copy()

            for col in datetime_cols:
                if col in X.columns:
                    # Convert to datetime if not already
                    if not pd.api.types.is_datetime64_any_dtype(X[col]):
                        X_features[col] = pd.to_datetime(X[col])

                    # Extract features
                    X_features[f'{col}_year'] = X_features[col].dt.year
                    X_features[f'{col}_month'] = X_features[col].dt.month
                    X_features[f'{col}_day'] = X_features[col].dt.day
                    X_features[f'{col}_dayofweek'] = X_features[col].dt.dayofweek
                    X_features[f'{col}_dayofyear'] = X_features[col].dt.dayofyear
                    X_features[f'{col}_weekofyear'] = X_features[col].dt.isocalendar().week
                    X_features[f'{col}_hour'] = X_features[col].dt.hour
                    X_features[f'{col}_minute'] = X_features[col].dt.minute
                    X_features[f'{col}_is_weekend'] = (X_features[col].dt.dayofweek >= 5).astype(int)
                    X_features[f'{col}_is_month_start'] = (X_features[col].dt.day <= 7).astype(int)
                    X_features[f'{col}_is_month_end'] = (X_features[col].dt.day >= 24).astype(int)

                    # Cyclical features
                    X_features[f'{col}_month_sin'] = np.sin(2 * np.pi * X_features[col].dt.month / 12)
                    X_features[f'{col}_month_cos'] = np.cos(2 * np.pi * X_features[col].dt.month / 12)
                    X_features[f'{col}_dayofweek_sin'] = np.sin(2 * np.pi * X_features[col].dt.dayofweek / 7)
                    X_features[f'{col}_dayofweek_cos'] = np.cos(2 * np.pi * X_features[col].dt.dayofweek / 7)

            # Drop the original datetime columns
            X_features = X_features.drop(columns=datetime_cols)

            return X_features
        else:
            raise ValueError("DateTimeFeatureEngineer only supports pandas DataFrames")

# Usage
dt_engineer = DateTimeFeatureEngineer()
X_dt_features = dt_engineer.extract_features(X_train, datetime_cols=['date_column'])

Text Features

from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
import re
import numpy as np

class TextFeatureEngineer:
    """Extract features from text columns."""

    def __init__(self):
        self.tfidf_vectorizer = None
        self.count_vectorizer = None

    def create_tfidf_features(self, texts, max_features=1000, ngram_range=(1, 2)):
        """Create TF-IDF features."""
        self.tfidf_vectorizer = TfidfVectorizer(
            max_features=max_features,
            ngram_range=ngram_range,
            stop_words='english',
            lowercase=True
        )
        return self.tfidf_vectorizer.fit_transform(texts)

    def create_count_features(self, texts, max_features=1000, ngram_range=(1, 1)):
        """Create count features."""
        self.count_vectorizer = CountVectorizer(
            max_features=max_features,
            ngram_range=ngram_range,
            stop_words='english',
            lowercase=True
        )
        return self.count_vectorizer.fit_transform(texts)

    def create_basic_features(self, texts):
        """Create basic text statistics features."""
        features = []

        for text in texts:
            # Length and composition features
            features.append([
                len(text),  # character count
                len(text.split()),  # word count
                len(text.splitlines()),  # line count
                sum(1 for c in text if c.isupper()),  # uppercase letters
                sum(1 for c in text if c.islower()),  # lowercase letters
                sum(1 for c in text if c.isdigit()),  # digits
                sum(1 for c in text if c in '.,!?;:'),  # punctuation marks
                text.count(' '),  # spaces
                len(set(text.split())) / max(len(text.split()), 1)  # unique-word ratio
            ])

        return np.array(features)

    def clean_text(self, text):
        """Clean text."""
        # Remove special characters
        text = re.sub(r'[^a-zA-Z\s]', '', text)
        # Convert to lowercase
        text = text.lower()
        # Remove extra whitespace
        text = ' '.join(text.split())
        return text

# Usage
text_engineer = TextFeatureEngineer()

# TF-IDF features
X_tfidf = text_engineer.create_tfidf_features(text_data)

# Basic features
X_basic = text_engineer.create_basic_features(text_data)

Data Normalization/Standardization

Standardization

from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, MaxAbsScaler

class DataScaler:
    """Scale and normalize data."""

    def __init__(self, method='standard'):
        self.method = method
        self.scaler = None

        if method == 'standard':
            self.scaler = StandardScaler()
        elif method == 'minmax':
            self.scaler = MinMaxScaler()
        elif method == 'robust':
            self.scaler = RobustScaler()
        elif method == 'maxabs':
            self.scaler = MaxAbsScaler()
        else:
            raise ValueError(f"Unknown scaling method: {method}")

    def fit(self, X):
        """Fit the scaler on the data."""
        self.scaler.fit(X)
        return self

    def transform(self, X):
        """Transform data using the fitted scaler."""
        return self.scaler.transform(X)

    def fit_transform(self, X):
        """Fit and transform in one step."""
        return self.scaler.fit_transform(X)

    def inverse_transform(self, X):
        """Inverse-transform scaled data."""
        return self.scaler.inverse_transform(X)

# Usage
scaler = DataScaler(method='standard')
X_scaled = scaler.fit_transform(X_train)

# Transform the test data
X_test_scaled = scaler.transform(X_test)

Normalization

from sklearn.preprocessing import Normalizer

class DataNormalizer:
    """Normalize samples individually."""

    def __init__(self, norm='l2'):
        self.norm = norm
        self.normalizer = Normalizer(norm=norm)

    def fit(self, X):
        """Fit the normalizer (a no-op for Normalizer)."""
        return self

    def transform(self, X):
        """Transform data using the normalizer."""
        return self.normalizer.transform(X)

    def fit_transform(self, X):
        """Fit and transform in one step."""
        return self.normalizer.fit_transform(X)

# Usage
normalizer = DataNormalizer(norm='l2')
X_normalized = normalizer.fit_transform(X_train)

Encoding Categorical Variables

Label Encoding

from sklearn.preprocessing import LabelEncoder
import pandas as pd

class CategoricalEncoder:
    """Encode categorical variables."""

    def __init__(self, method='label'):
        self.method = method
        self.encoders = {}
        self.onehot_encoder = None
        self.categorical_cols = []

    def fit(self, X, categorical_cols=None):
        """Fit the encoders on the data."""
        if isinstance(X, pd.DataFrame):
            if categorical_cols is None:
                categorical_cols = X.select_dtypes(include=['object', 'category']).columns
            self.categorical_cols = list(categorical_cols)

            if self.method == 'label':
                for col in self.categorical_cols:
                    encoder = LabelEncoder()
                    encoder.fit(X[col].astype(str))
                    self.encoders[col] = encoder
            elif self.method == 'onehot':
                from sklearn.preprocessing import OneHotEncoder
                self.onehot_encoder = OneHotEncoder(
                    sparse_output=False,
                    handle_unknown='ignore'
                )
                self.onehot_encoder.fit(X[self.categorical_cols])
        else:
            raise ValueError("CategoricalEncoder only supports pandas DataFrames")

        return self

    def transform(self, X):
        """Transform data using the fitted encoders."""
        if isinstance(X, pd.DataFrame):
            X_transformed = X.copy()

            if self.method == 'label':
                for col, encoder in self.encoders.items():
                    X_transformed[col] = encoder.transform(X[col].astype(str))
            elif self.method == 'onehot' and self.onehot_encoder is not None:
                onehot_features = self.onehot_encoder.transform(X[self.categorical_cols])
                feature_names = self.onehot_encoder.get_feature_names_out(self.categorical_cols)

                # Drop the original categorical columns
                X_transformed = X_transformed.drop(columns=self.categorical_cols)

                # Add the one-hot encoded columns
                onehot_df = pd.DataFrame(onehot_features, columns=feature_names, index=X.index)
                X_transformed = pd.concat([X_transformed, onehot_df], axis=1)

            return X_transformed
        else:
            raise ValueError("CategoricalEncoder only supports pandas DataFrames")

    def fit_transform(self, X, categorical_cols=None):
        """Fit and transform in one step."""
        return self.fit(X, categorical_cols).transform(X)

# Usage
encoder = CategoricalEncoder(method='label')
X_encoded = encoder.fit_transform(X_train, categorical_cols=['category_col'])

# One-hot encoding
encoder_onehot = CategoricalEncoder(method='onehot')
X_onehot = encoder_onehot.fit_transform(X_train, categorical_cols=['category_col'])
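The core concepts also list ordinal encoding, which does not appear in the implementations above. A minimal sketch using scikit-learn's OrdinalEncoder (the column name and category order are illustrative assumptions):

```python
import pandas as pd
from sklearn.preprocessing import OrdinalEncoder

X = pd.DataFrame({'education': ['high school', 'master', 'bachelor', 'master']})

encoder = OrdinalEncoder(
    categories=[['high school', 'bachelor', 'master', 'phd']],  # explicit order
    handle_unknown='use_encoded_value', unknown_value=-1,       # unseen -> -1
)
X_ord = encoder.fit_transform(X[['education']])
# high school -> 0, bachelor -> 1, master -> 2
```

Unlike LabelEncoder, the integer values here follow the declared order, so the encoding is meaningful for ordinal variables.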

Target Encoding

import numpy as np
import pandas as pd
from sklearn.model_selection import KFold

class TargetEncoder:
    """Target encoding for categorical variables."""

    def __init__(self, smoothing=1.0, min_samples_leaf=1):
        self.smoothing = smoothing
        self.min_samples_leaf = min_samples_leaf
        self.encodings = {}
        self.global_mean = None

    def fit(self, X, y, categorical_cols=None):
        """Fit the target encoder."""
        if isinstance(X, pd.DataFrame):
            if categorical_cols is None:
                categorical_cols = X.select_dtypes(include=['object', 'category']).columns

            self.global_mean = y.mean()

            for col in categorical_cols:
                # Compute the target mean per category
                category_means = y.groupby(X[col]).mean()
                category_counts = X[col].value_counts()

                # Apply smoothing
                smoothing_factor = 1 / (1 + np.exp(-(category_counts - self.min_samples_leaf) / self.smoothing))
                smoothed_means = self.global_mean * (1 - smoothing_factor) + category_means * smoothing_factor

                self.encodings[col] = smoothed_means
        else:
            raise ValueError("TargetEncoder only supports pandas DataFrames")

        return self

    def transform(self, X):
        """Transform data using the fitted encodings."""
        if isinstance(X, pd.DataFrame):
            X_transformed = X.copy()

            for col, encoding in self.encodings.items():
                X_transformed[f'{col}_encoded'] = X[col].map(encoding).fillna(self.global_mean)

            return X_transformed
        else:
            raise ValueError("TargetEncoder only supports pandas DataFrames")

    def fit_transform(self, X, y, categorical_cols=None):
        """Fit and transform in one step."""
        return self.fit(X, y, categorical_cols).transform(X)

# Usage
target_encoder = TargetEncoder(smoothing=1.0, min_samples_leaf=10)
X_encoded = target_encoder.fit_transform(X_train, y_train, categorical_cols=['category_col'])
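Target encoding fitted on the full training set lets each row see its own target, a common source of leakage. A minimal out-of-fold variant with KFold (sketch; the function name and toy data are illustrative) encodes each row using only the other folds:

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold

def oof_target_encode(col, y, n_splits=3, seed=42):
    """Encode a categorical Series with out-of-fold target means."""
    y = pd.Series(np.asarray(y), index=col.index)
    encoded = pd.Series(np.nan, index=col.index)
    kfold = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
    for fit_idx, enc_idx in kfold.split(col):
        # Means computed on the fit folds only, applied to the held-out fold
        fold_means = y.iloc[fit_idx].groupby(col.iloc[fit_idx]).mean()
        encoded.iloc[enc_idx] = col.iloc[enc_idx].map(fold_means).to_numpy()
    return encoded.fillna(y.mean())  # categories unseen in a fit fold

X_demo = pd.DataFrame({'city': ['a', 'a', 'b', 'b', 'a', 'b']})
y_demo = [1, 0, 1, 1, 1, 0]
encoded = oof_target_encode(X_demo['city'], y_demo)
```

The test data should still be encoded with means computed on the full training set, as in TargetEncoder above.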

Feature Scaling

Min-Max Scaling

from sklearn.preprocessing import MinMaxScaler

class MinMaxScalerCustom:
    """Custom min-max scaling."""

    def __init__(self, feature_range=(0, 1)):
        self.feature_range = feature_range
        self.min_ = None
        self.max_ = None
        self.scale_ = None

    def fit(self, X):
        """Fit on the data."""
        self.min_ = np.min(X, axis=0)
        self.max_ = np.max(X, axis=0)

        data_range = self.max_ - self.min_
        data_range[data_range == 0] = 1  # Avoid division by zero

        self.scale_ = (self.feature_range[1] - self.feature_range[0]) / data_range

        return self

    def transform(self, X):
        """Transform the data."""
        X_scaled = (X - self.min_) * self.scale_
        X_scaled += self.feature_range[0]
        return X_scaled

    def fit_transform(self, X):
        """Fit and transform in one step."""
        return self.fit(X).transform(X)

# Usage
scaler = MinMaxScalerCustom(feature_range=(0, 1))
X_scaled = scaler.fit_transform(X_train)

Robust Scaling

from sklearn.preprocessing import RobustScaler

class RobustScalerCustom:
    """Robust scaling using the median and IQR."""

    def __init__(self, with_centering=True, with_scaling=True, quantile_range=(25.0, 75.0)):
        self.with_centering = with_centering
        self.with_scaling = with_scaling
        self.quantile_range = quantile_range
        self.center_ = None
        self.scale_ = None

    def fit(self, X):
        """Fit on the data."""
        if self.with_centering:
            self.center_ = np.median(X, axis=0)

        if self.with_scaling:
            q_min, q_max = self.quantile_range
            q1 = np.percentile(X, q_min, axis=0)
            q3 = np.percentile(X, q_max, axis=0)
            iqr = q3 - q1
            iqr[iqr == 0] = 1  # Avoid division by zero
            self.scale_ = iqr

        return self

    def transform(self, X):
        """Transform the data."""
        X_scaled = X.copy()

        if self.with_centering:
            X_scaled -= self.center_

        if self.with_scaling:
            X_scaled /= self.scale_

        return X_scaled

    def fit_transform(self, X):
        """Fit and transform in one step."""
        return self.fit(X).transform(X)

# Usage
robust_scaler = RobustScalerCustom()
X_scaled = robust_scaler.fit_transform(X_train)

Pipeline Creation

Scikit-learn Pipelines

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

def create_preprocessing_pipeline(numeric_features, categorical_features):
    """Create an sklearn preprocessing pipeline."""

    # Numeric preprocessing
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler())
    ])

    # Categorical preprocessing
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    # Column transformer
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ]
    )

    return preprocessor

# Usage
numeric_features = ['age', 'income', 'score']
categorical_features = ['gender', 'education', 'city']

preprocessor = create_preprocessing_pipeline(numeric_features, categorical_features)

# Fit and transform
X_processed = preprocessor.fit_transform(X_train)

# Transform the test data
X_test_processed = preprocessor.transform(X_test)
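A ColumnTransformer like the one above is typically combined with an estimator in a single Pipeline, so preprocessing is applied consistently inside fit and predict. A self-contained sketch (the toy data and the choice of LogisticRegression are illustrative):

```python
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression

preprocessor = ColumnTransformer(transformers=[
    ('num', Pipeline([('imputer', SimpleImputer(strategy='mean')),
                      ('scaler', StandardScaler())]), ['age']),
    ('cat', Pipeline([('imputer', SimpleImputer(strategy='most_frequent')),
                      ('onehot', OneHotEncoder(handle_unknown='ignore'))]), ['city']),
])

# Preprocessing and the model fit/predict as one object
model = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression()),
])

X_demo = pd.DataFrame({'age': [25, 32, 47, 51, 38, 29],
                       'city': ['a', 'b', 'a', 'b', 'a', 'b']})
y_demo = [0, 0, 1, 1, 1, 0]
model.fit(X_demo, y_demo)
predictions = model.predict(X_demo)
```

Because the imputers and scalers are fitted inside the pipeline, cross-validation on `model` cannot leak statistics from validation folds into training.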

Custom Pipelines

from sklearn.base import BaseEstimator, TransformerMixin

class CustomPreprocessor(BaseEstimator, TransformerMixin):
    """Custom preprocessing pipeline."""

    def __init__(self, steps=None):
        self.steps = steps or []

    def add_step(self, name, transformer):
        """Add a preprocessing step."""
        self.steps.append((name, transformer))
        return self

    def fit(self, X, y=None):
        """Fit each transformer on the output of the previous step."""
        X_transformed = X.copy()
        for name, transformer in self.steps:
            transformer.fit(X_transformed)
            X_transformed = transformer.transform(X_transformed)
        return self

    def transform(self, X):
        """Transform data through all steps."""
        X_transformed = X.copy()
        for name, transformer in self.steps:
            X_transformed = transformer.transform(X_transformed)
        return X_transformed

    def fit_transform(self, X, y=None):
        """Fit and transform in one step."""
        return self.fit(X, y).transform(X)

# Usage (encode before scaling so the scaler sees only numeric columns)
preprocessor = CustomPreprocessor()
preprocessor.add_step('missing_values', MissingValueHandler())
preprocessor.add_step('encoder', CategoricalEncoder(method='label'))
preprocessor.add_step('scaler', DataScaler(method='standard'))

X_processed = preprocessor.fit_transform(X_train)

Preprocessing for Different Data Types

Image Preprocessing

import torch
import torchvision.transforms as transforms
from PIL import Image
import numpy as np

class ImagePreprocessor:
    """Preprocess images for machine learning."""

    def __init__(self, image_size=(224, 224), normalize=True, augment=False):
        self.image_size = image_size
        self.normalize = normalize
        self.augment = augment

        # Base transforms
        transform_list = [
            transforms.Resize(image_size),
            transforms.ToTensor()
        ]

        # Add normalization
        if normalize:
            transform_list.append(
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406],
                    std=[0.229, 0.224, 0.225]
                )
            )

        # Add augmentation (inserted before ToTensor, on PIL images)
        if augment:
            transform_list.insert(1, transforms.RandomHorizontalFlip(p=0.5))
            transform_list.insert(2, transforms.RandomRotation(degrees=15))
            transform_list.insert(3, transforms.ColorJitter(
                brightness=0.2, contrast=0.2, saturation=0.2
            ))

        self.transform = transforms.Compose(transform_list)

    def preprocess(self, image):
        """Preprocess a single image."""
        if isinstance(image, str):
            image = Image.open(image).convert('RGB')
        elif isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        return self.transform(image)

    def preprocess_batch(self, images):
        """Preprocess a batch of images."""
        return torch.stack([self.preprocess(img) for img in images])

# Usage
preprocessor = ImagePreprocessor(image_size=(224, 224), augment=True)
processed_image = preprocessor.preprocess("image.jpg")

Text Preprocessing

import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

class TextPreprocessor:
    """Preprocess text for machine learning."""

    def __init__(self, remove_stopwords=True, lemmatize=True, lowercase=True):
        self.remove_stopwords = remove_stopwords
        self.lemmatize = lemmatize
        self.lowercase = lowercase

        if remove_stopwords:
            nltk.download('stopwords')
            self.stop_words = set(stopwords.words('english'))

        if lemmatize:
            nltk.download('wordnet')
            self.lemmatizer = WordNetLemmatizer()

    def clean_text(self, text):
        """Clean text."""
        # Remove URLs
        text = re.sub(r'http\S+', '', text)
        # Remove email addresses
        text = re.sub(r'\S*@\S*', '', text)
        # Remove special characters
        text = re.sub(r'[^a-zA-Z\s]', '', text)
        # Remove extra whitespace
        text = ' '.join(text.split())

        return text

    def tokenize(self, text):
        """Tokenize text."""
        return text.split()

    def preprocess(self, text):
        """Full preprocessing pipeline."""
        # Clean text
        text = self.clean_text(text)

        # Lowercase
        if self.lowercase:
            text = text.lower()

        # Tokenize
        tokens = self.tokenize(text)

        # Remove stopwords
        if self.remove_stopwords:
            tokens = [token for token in tokens if token not in self.stop_words]

        # Lemmatize
        if self.lemmatize:
            tokens = [self.lemmatizer.lemmatize(token) for token in tokens]

        return ' '.join(tokens)

# Usage
preprocessor = TextPreprocessor(remove_stopwords=True, lemmatize=True)
processed_text = preprocessor.preprocess("This is a sample text for preprocessing!")

Tabular Preprocessing

import pandas as pd
import numpy as np

class TabularPreprocessor:
    """Preprocess tabular data for machine learning."""

    def __init__(self):
        self.numeric_features = None
        self.categorical_features = None
        self.datetime_features = None

    def identify_features(self, X):
        """Identify feature types."""
        if isinstance(X, pd.DataFrame):
            self.numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()
            self.categorical_features = X.select_dtypes(include=['object', 'category']).columns.tolist()
            self.datetime_features = X.select_dtypes(include=['datetime64']).columns.tolist()

        return {
            'numeric': self.numeric_features,
            'categorical': self.categorical_features,
            'datetime': self.datetime_features
        }

    def preprocess(self, X, handle_missing=True, scale=True, encode=True):
        """Full preprocessing pipeline."""
        X_processed = X.copy()

        # Identify feature types
        self.identify_features(X)

        # Handle missing values
        if handle_missing:
            handler = MissingValueHandler()
            X_processed = handler.fit_transform(X_processed)

        # Encode categorical variables
        if encode and self.categorical_features:
            encoder = CategoricalEncoder(method='onehot')
            X_processed = encoder.fit_transform(X_processed, self.categorical_features)

        # Scale numeric features
        if scale and self.numeric_features:
            scaler = DataScaler(method='standard')
            if isinstance(X_processed, pd.DataFrame):
                numeric_cols = X_processed.select_dtypes(include=[np.number]).columns
                X_processed[numeric_cols] = scaler.fit_transform(X_processed[numeric_cols])
            else:
                X_processed = scaler.fit_transform(X_processed)

        return X_processed

# Usage
preprocessor = TabularPreprocessor()
X_processed = preprocessor.preprocess(X_train)

Reproducibility

Setting Random Seeds

import random
import numpy as np
import torch

def set_seed(seed=42):
    """Set random seeds for reproducibility."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# Usage
set_seed(42)

Deterministic Preprocessing

class DeterministicPreprocessor:
    """Deterministic preprocessing for reproducibility."""

    def __init__(self, seed=42):
        self.seed = seed
        set_seed(seed)

    def train_test_split(self, X, y, test_size=0.2, random_state=None):
        """Deterministic train-test split."""
        from sklearn.model_selection import train_test_split
        return train_test_split(
            X, y,
            test_size=test_size,
            random_state=random_state or self.seed,
            stratify=y
        )

    def kfold_split(self, X, y, n_splits=5, random_state=None):
        """Deterministic k-fold split."""
        from sklearn.model_selection import StratifiedKFold
        kfold = StratifiedKFold(
            n_splits=n_splits,
            shuffle=True,
            random_state=random_state or self.seed
        )
        return kfold.split(X, y)

# Usage
preprocessor = DeterministicPreprocessor(seed=42)
X_train, X_test, y_train, y_test = preprocessor.train_test_split(X, y)

Testing Preprocessing

Unit Tests

import unittest
import pandas as pd
import numpy as np

class TestPreprocessing(unittest.TestCase):
    """Unit tests for preprocessing."""

    def setUp(self):
        """Set up test data."""
        self.X = pd.DataFrame({
            'numeric': [1, 2, 3, 4, 5],
            'categorical': ['A', 'B', 'A', 'B', 'A'],
            'missing': [1, np.nan, 3, np.nan, 5]
        })

    def test_missing_value_handler(self):
        """Test the missing value handler."""
        handler = MissingValueHandler()
        X_clean = handler.fit_transform(self.X)

        # Check there are no missing values left
        self.assertFalse(X_clean.isnull().any().any())

    def test_data_scaler(self):
        """Test the data scaler."""
        scaler = DataScaler(method='standard')
        X_scaled = scaler.fit_transform(self.X[['numeric']])

        # Check the mean is approximately 0
        self.assertAlmostEqual(X_scaled.mean(), 0, places=5)

        # Check the standard deviation is approximately 1
        self.assertAlmostEqual(X_scaled.std(), 1, places=5)

    def test_categorical_encoder(self):
        """Test the categorical encoder."""
        encoder = CategoricalEncoder(method='label')
        X_encoded = encoder.fit_transform(self.X, ['categorical'])

        # Check the categorical column is now numeric
        self.assertTrue(pd.api.types.is_numeric_dtype(X_encoded['categorical']))

if __name__ == '__main__':
    unittest.main()

Best Practices

  1. Understand your data

    • Perform exploratory data analysis (EDA)
    • Check data types and distributions
    • Identify missing values and outliers
    • Understand relationships between features
  2. Handle missing values appropriately

    • Use mean/median imputation for numeric data
    • Use most-frequent imputation for categorical data
    • Consider KNN imputation for complex patterns
    • Document the imputation strategy
  3. Detect and handle outliers

    • Use Z-scores for normally distributed data
    • Use the IQR for non-normal distributions
    • Consider domain knowledge before removing outliers
    • Cap outliers instead of removing them where appropriate
  4. Choose the right encoding method

    • Use label encoding for ordinal variables
    • Use one-hot encoding for nominal variables
    • Use target encoding for high-cardinality categorical variables
    • Avoid creating too many one-hot features
  5. Scale features consistently

    • Fit scalers on the training data only
    • Apply the same scaler to training and test data
    • Consider robust scaling for data with many outliers
    • Store scaler parameters for inference
  6. Build reusable pipelines

    • Build modular preprocessing steps
    • Use sklearn Pipelines for reproducibility
    • Document each preprocessing step
    • Keep preprocessing code under version control
  7. Ensure reproducibility

    • Set random seeds consistently
    • Use deterministic algorithms where possible
    • Save preprocessing parameters
    • Record every preprocessing step
  8. Test preprocessing

    • Write unit tests for preprocessing functions
    • Validate output shapes and types
    • Check for data leakage
    • Monitor preprocessing performance
  9. Handle different data types

    • Use preprocessing appropriate to each data type
    • Consider multimodal data preprocessing
    • Preserve information during transformations
    • Document how each data type is handled
  10. Monitor and iterate

    • Track the impact of preprocessing on model performance
    • A/B test different preprocessing strategies
    • Monitor preprocessing pipeline performance
    • Continuously improve based on results
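Practices 5 and 7 both call for storing fitted preprocessing parameters for inference. One common approach (a sketch; the file path is illustrative) is to persist the fitted object with joblib and reload it at serving time instead of refitting:

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.preprocessing import StandardScaler

# Fit on training data only, then persist the fitted parameters.
scaler = StandardScaler().fit(np.array([[1.0], [2.0], [3.0]]))
path = os.path.join(tempfile.gettempdir(), 'scaler.joblib')  # illustrative path
joblib.dump(scaler, path)

# Reload at inference time: same mean/std, no refitting on serving data.
loaded = joblib.load(path)
X_new = loaded.transform(np.array([[2.0]]))  # 2.0 is the training mean
```

Persisting the whole fitted pipeline (not just individual scalers) is usually safest, since it locks in the step order as well as the parameters.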

Related Skills