数据预处理
概览
数据预处理是机器学习管道中的关键步骤,它将原始数据转换为适合模型训练的格式。这项技能涵盖了数据清洗、特征工程、归一化、编码分类变量、缩放、增强、管道创建以及不同数据类型的预处理。
前提条件
- 理解Python编程
- 了解pandas和NumPy
- 熟悉scikit-learn
- 理解机器学习概念
- 基本统计知识
核心概念
数据清洗
- 缺失值:通过插补或移除处理null/NaN值
- 异常值:检测和处理极端值
- 重复项:识别和移除重复记录
- 数据验证:确保数据的完整性和一致性
特征工程
- 多项式特征:创建高阶和交互特征
- 日期/时间特征:从日期时间列中提取时间模式
- 文本特征:将文本转换为数值表示
- 比率特征:从组合中创建派生特征
归一化和缩放
- 标准化:Z分数归一化(均值=0,标准差=1)
- 最小-最大缩放:缩放到固定范围[0, 1]
- 鲁棒缩放:使用中位数和IQR进行抗异常缩放
- 归一化:将单个样本缩放到单位范数
编码分类变量
- 标签编码:将类别转换为整数
- 独热编码:为每个类别创建二进制列
- 目标编码:使用目标均值进行编码
- 序数编码:保持分类数据的顺序
实施指南
数据清洗
缺失值
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer, KNNImputer
class MissingValueHandler:
    """Impute missing values with separate strategies for numeric and categorical columns."""

    def __init__(self, strategy='mean', numeric_strategy='mean', categorical_strategy='most_frequent'):
        # NOTE(review): `strategy` is stored but unused; the per-dtype strategies drive behaviour.
        self.strategy = strategy
        self.numeric_strategy = numeric_strategy
        self.categorical_strategy = categorical_strategy
        self.numeric_imputer = None       # fitted SimpleImputer for numeric columns
        self.categorical_imputer = None   # fitted SimpleImputer for categorical columns

    @staticmethod
    def _column_groups(X):
        """Return (numeric, categorical) column collections for DataFrame or array input."""
        if isinstance(X, pd.DataFrame):
            return (X.select_dtypes(include=[np.number]).columns,
                    X.select_dtypes(exclude=[np.number]).columns)
        # plain numpy arrays are treated as all-numeric
        return list(range(X.shape[1])), []

    def fit(self, X):
        """Fit the underlying imputers on X (DataFrame or 2-D array)."""
        numeric_cols, categorical_cols = self._column_groups(X)
        if len(numeric_cols) > 0:
            self.numeric_imputer = SimpleImputer(strategy=self.numeric_strategy)
            subset = X[numeric_cols] if isinstance(X, pd.DataFrame) else X[:, numeric_cols]
            self.numeric_imputer.fit(subset)
        if len(categorical_cols) > 0:
            # only reachable for DataFrame input (arrays yield no categorical columns)
            self.categorical_imputer = SimpleImputer(strategy=self.categorical_strategy)
            self.categorical_imputer.fit(X[categorical_cols])
        return self

    def transform(self, X):
        """Return a copy of X with missing entries filled by the fitted imputers."""
        X_transformed = X.copy()
        if isinstance(X, pd.DataFrame):
            numeric_cols, categorical_cols = self._column_groups(X)
            if self.numeric_imputer is not None and len(numeric_cols) > 0:
                X_transformed[numeric_cols] = self.numeric_imputer.transform(X[numeric_cols])
            if self.categorical_imputer is not None and len(categorical_cols) > 0:
                X_transformed[categorical_cols] = self.categorical_imputer.transform(X[categorical_cols])
        elif self.numeric_imputer is not None:
            X_transformed = self.numeric_imputer.transform(X)
        return X_transformed

    def fit_transform(self, X):
        """Fit and transform in one step."""
        return self.fit(X).transform(X)
# Usage
handler = MissingValueHandler(numeric_strategy='mean', categorical_strategy='most_frequent')
X_clean = handler.fit_transform(X_train)
# KNN imputation for more complex missingness patterns
knn_imputer = KNNImputer(n_neighbors=5)
X_knn = knn_imputer.fit_transform(X_train)
异常值
from scipy import stats
from sklearn.preprocessing import RobustScaler
class OutlierHandler:
    """Detect and cap outliers using z-score or IQR rules."""

    @staticmethod
    def z_score_detection(X, threshold=3):
        """Return a boolean row mask: True where any column's |z-score| exceeds `threshold`."""
        if isinstance(X, pd.DataFrame):
            numeric_cols = X.select_dtypes(include=[np.number]).columns
            z_scores = np.abs(stats.zscore(X[numeric_cols], nan_policy='omit'))
            outliers = (z_scores > threshold).any(axis=1)
        else:
            z_scores = np.abs(stats.zscore(X, nan_policy='omit'))
            outliers = (z_scores > threshold).any(axis=1)
        return outliers

    @staticmethod
    def iqr_detection(X, multiplier=1.5):
        """Return a boolean row mask: True where any column falls outside [Q1 - m*IQR, Q3 + m*IQR]."""
        if isinstance(X, pd.DataFrame):
            numeric_cols = X.select_dtypes(include=[np.number]).columns
            outliers = pd.Series(False, index=X.index)
            for col in numeric_cols:
                Q1 = X[col].quantile(0.25)
                Q3 = X[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - multiplier * IQR
                upper_bound = Q3 + multiplier * IQR
                outliers |= (X[col] < lower_bound) | (X[col] > upper_bound)
        else:
            Q1 = np.percentile(X, 25, axis=0)
            Q3 = np.percentile(X, 75, axis=0)
            IQR = Q3 - Q1
            lower_bound = Q1 - multiplier * IQR
            upper_bound = Q3 + multiplier * IQR
            outliers = ((X < lower_bound) | (X > upper_bound)).any(axis=1)
        return outliers

    @staticmethod
    def cap_outliers(X, method='iqr', multiplier=1.5):
        """Clip values into the detection bounds instead of removing rows.

        method: 'iqr' or 'zscore'.
        Fix: an unrecognized method used to fall through to an unbound
        `lower_bound`, raising NameError; it now raises ValueError up front.
        """
        if method not in ('iqr', 'zscore'):
            raise ValueError(f"Unknown method: {method!r} (expected 'iqr' or 'zscore')")
        if isinstance(X, pd.DataFrame):
            X_capped = X.copy()
            numeric_cols = X.select_dtypes(include=[np.number]).columns
            for col in numeric_cols:
                if method == 'iqr':
                    Q1 = X[col].quantile(0.25)
                    Q3 = X[col].quantile(0.75)
                    IQR = Q3 - Q1
                    lower_bound = Q1 - multiplier * IQR
                    upper_bound = Q3 + multiplier * IQR
                else:  # 'zscore'
                    mean = X[col].mean()
                    std = X[col].std()
                    lower_bound = mean - multiplier * std
                    upper_bound = mean + multiplier * std
                X_capped[col] = X[col].clip(lower_bound, upper_bound)
            return X_capped
        else:
            if method == 'iqr':
                Q1 = np.percentile(X, 25, axis=0)
                Q3 = np.percentile(X, 75, axis=0)
                IQR = Q3 - Q1
                lower_bound = Q1 - multiplier * IQR
                upper_bound = Q3 + multiplier * IQR
            else:  # 'zscore'
                mean = np.mean(X, axis=0)
                std = np.std(X, axis=0)
                lower_bound = mean - multiplier * std
                upper_bound = mean + multiplier * std
            return np.clip(X, lower_bound, upper_bound)
# Usage
outlier_handler = OutlierHandler()
# Detect outliers
outliers = outlier_handler.z_score_detection(X_train, threshold=3)
# Cap outliers
X_capped = outlier_handler.cap_outliers(X_train, method='iqr')
# Remove outliers
X_clean = X_train[~outliers]
y_clean = y_train[~outliers]
重复项
class DuplicateHandler:
    """Find and remove duplicate rows."""

    @staticmethod
    def find_duplicates(X, subset=None):
        """Boolean mask: True for rows that duplicate an earlier row.

        The numpy path ignores `subset` and compares full rows.
        """
        if isinstance(X, pd.DataFrame):
            duplicates = X.duplicated(subset=subset, keep='first')
        else:
            # np.unique(return_index=True) yields the index of each row's first occurrence
            _, indices = np.unique(X, axis=0, return_index=True)
            duplicates = np.ones(len(X), dtype=bool)
            duplicates[indices] = False
        return duplicates

    @staticmethod
    def remove_duplicates(X, y=None, subset=None):
        """Drop duplicate rows, keeping the first occurrence.

        Fix: np.unique returns indices in sorted-row order, so the original
        numpy path silently reordered the surviving rows; the indices are now
        sorted to preserve the original row order.
        """
        if isinstance(X, pd.DataFrame):
            if y is not None:
                df = X.copy()
                # NOTE(review): clobbers any pre-existing 'target' column — confirm callers
                df['target'] = y
                df_clean = df.drop_duplicates(subset=subset, keep='first')
                return df_clean.drop(columns=['target']), df_clean['target']
            else:
                return X.drop_duplicates(subset=subset, keep='first')
        else:
            if y is not None:
                # deduplicate on (features, target) pairs
                combined = np.column_stack([X, y])
                _, indices = np.unique(combined, axis=0, return_index=True)
                indices = np.sort(indices)  # preserve original ordering
                return X[indices], y[indices]
            else:
                _, indices = np.unique(X, axis=0, return_index=True)
                indices = np.sort(indices)  # preserve original ordering
                return X[indices]
# Usage
dup_handler = DuplicateHandler()
# Find duplicates
duplicates = dup_handler.find_duplicates(X_train)
# Remove duplicates
X_clean, y_clean = dup_handler.remove_duplicates(X_train, y_train)
特征工程
多项式特征
from sklearn.preprocessing import PolynomialFeatures
import numpy as np
class FeatureEngineer:
    """Create new features from existing numeric ones."""

    def __init__(self):
        self.polynomial_features = None  # fitted PolynomialFeatures, set lazily

    def create_polynomial_features(self, X, degree=2, include_bias=False):
        """Expand X with polynomial and interaction terms up to `degree`."""
        self.polynomial_features = PolynomialFeatures(
            degree=degree,
            include_bias=include_bias,
            interaction_only=False
        )
        return self.polynomial_features.fit_transform(X)

    def create_interaction_features(self, X):
        """Append the pairwise products of all column pairs to X."""
        n_features = X.shape[1]
        interaction_features = []
        for i in range(n_features):
            for j in range(i + 1, n_features):
                interaction_features.append(X[:, i] * X[:, j])
        return np.column_stack([X] + interaction_features)

    def create_ratio_features(self, X, pairs):
        """Append a col1/col2 ratio column for each (col1, col2) in `pairs`.

        Division by zero yields 0. Fix: the `out` buffer is now explicitly
        float — with integer input the original raised a casting error when
        np.divide wrote float results into an int array.
        """
        ratio_features = X.copy()
        for col1, col2 in pairs:
            ratio = np.divide(X[:, col1], X[:, col2],
                              out=np.zeros_like(X[:, col1], dtype=float),
                              where=X[:, col2] != 0)
            ratio_features = np.column_stack([ratio_features, ratio])
        return ratio_features

    def create_bin_features(self, X, bins=10, strategy='uniform'):
        """Discretize X into one-hot encoded bins."""
        from sklearn.preprocessing import KBinsDiscretizer
        discretizer = KBinsDiscretizer(
            n_bins=bins,
            encode='onehot',
            strategy=strategy
        )
        return discretizer.fit_transform(X)
# Usage
engineer = FeatureEngineer()
# Polynomial features
X_poly = engineer.create_polynomial_features(X_train, degree=2)
# Interaction features
X_interaction = engineer.create_interaction_features(X_train)
# Ratio features
X_ratio = engineer.create_ratio_features(X_train, pairs=[(0, 1), (0, 2)])
日期/时间特征
import pandas as pd
from datetime import datetime
class DateTimeFeatureEngineer:
    """Expand datetime columns into calendar, flag, and cyclical numeric features."""

    @staticmethod
    def extract_features(X, datetime_cols):
        """Return a copy of X with each existing datetime column expanded.

        Columns listed in `datetime_cols` but absent from X are skipped.
        Fix: the original dropped *every* name in `datetime_cols` at the end,
        raising KeyError whenever one of them was missing from X.
        """
        if isinstance(X, pd.DataFrame):
            X_features = X.copy()
            processed_cols = []  # only columns actually expanded are dropped
            for col in datetime_cols:
                if col in X.columns:
                    # coerce to datetime dtype if needed
                    if not pd.api.types.is_datetime64_any_dtype(X[col]):
                        X_features[col] = pd.to_datetime(X[col])
                    processed_cols.append(col)
                    dt = X_features[col].dt
                    # calendar components
                    X_features[f'{col}_year'] = dt.year
                    X_features[f'{col}_month'] = dt.month
                    X_features[f'{col}_day'] = dt.day
                    X_features[f'{col}_dayofweek'] = dt.dayofweek
                    X_features[f'{col}_dayofyear'] = dt.dayofyear
                    X_features[f'{col}_weekofyear'] = dt.isocalendar().week
                    X_features[f'{col}_hour'] = dt.hour
                    X_features[f'{col}_minute'] = dt.minute
                    # binary flags (month start/end are 7-day approximations)
                    X_features[f'{col}_is_weekend'] = (dt.dayofweek >= 5).astype(int)
                    X_features[f'{col}_is_month_start'] = (dt.day <= 7).astype(int)
                    X_features[f'{col}_is_month_end'] = (dt.day >= 24).astype(int)
                    # cyclical encodings so Dec/Jan and Sun/Mon stay adjacent
                    X_features[f'{col}_month_sin'] = np.sin(2 * np.pi * dt.month / 12)
                    X_features[f'{col}_month_cos'] = np.cos(2 * np.pi * dt.month / 12)
                    X_features[f'{col}_dayofweek_sin'] = np.sin(2 * np.pi * dt.dayofweek / 7)
                    X_features[f'{col}_dayofweek_cos'] = np.cos(2 * np.pi * dt.dayofweek / 7)
            X_features = X_features.drop(columns=processed_cols)
            return X_features
        else:
            raise ValueError("DateTimeFeatureEngineer仅支持pandas DataFrames")
# Usage
dt_engineer = DateTimeFeatureEngineer()
X_dt_features = dt_engineer.extract_features(X_train, datetime_cols=['date_column'])
文本特征
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
import re
class TextFeatureEngineer:
    """Derive numeric features from raw text."""

    def __init__(self):
        # fitted vectorizers, populated lazily by the create_* methods
        self.tfidf_vectorizer = None
        self.count_vectorizer = None

    def create_tfidf_features(self, texts, max_features=1000, ngram_range=(1, 2)):
        """Fit a TF-IDF vectorizer on `texts` and return the sparse feature matrix."""
        vectorizer = TfidfVectorizer(
            max_features=max_features,
            ngram_range=ngram_range,
            stop_words='english',
            lowercase=True
        )
        self.tfidf_vectorizer = vectorizer
        return vectorizer.fit_transform(texts)

    def create_count_features(self, texts, max_features=1000, ngram_range=(1, 1)):
        """Fit a bag-of-words vectorizer on `texts` and return the sparse count matrix."""
        vectorizer = CountVectorizer(
            max_features=max_features,
            ngram_range=ngram_range,
            stop_words='english',
            lowercase=True
        )
        self.count_vectorizer = vectorizer
        return vectorizer.fit_transform(texts)

    def create_basic_features(self, texts):
        """Return an array of simple surface statistics, one row per text."""
        rows = []
        for text in texts:
            words = text.split()
            rows.append([
                len(text),                               # character count
                len(words),                              # word count
                len(text.splitlines()),                  # line count (not sentence count)
                sum(1 for c in text if c.isupper()),     # uppercase letters
                sum(1 for c in text if c.islower()),     # lowercase letters
                sum(1 for c in text if c.isdigit()),     # digits
                sum(1 for c in text if c in '.,!?;:'),   # punctuation marks
                text.count(' '),                         # spaces
                len(set(words)) / max(len(words), 1)     # unique-word ratio
            ])
        return np.array(rows)

    def clean_text(self, text):
        """Lowercase `text`, strip non-letter characters, and collapse whitespace."""
        letters_only = re.sub(r'[^a-zA-Z\s]', '', text)
        lowered = letters_only.lower()
        return ' '.join(lowered.split())
# Usage
text_engineer = TextFeatureEngineer()
# TF-IDF features
X_tfidf = text_engineer.create_tfidf_features(text_data)
# Basic surface features
X_basic = text_engineer.create_basic_features(text_data)
数据归一化/标准化
标准化
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, MaxAbsScaler
class DataScaler:
    """Select and wrap a scikit-learn scaler by method name."""

    def __init__(self, method='standard'):
        self.method = method
        self.scaler = None
        if method == 'standard':
            chosen = StandardScaler()
        elif method == 'minmax':
            chosen = MinMaxScaler()
        elif method == 'robust':
            chosen = RobustScaler()
        elif method == 'maxabs':
            chosen = MaxAbsScaler()
        else:
            raise ValueError(f"未知的缩放方法:{method}")
        self.scaler = chosen

    def fit(self, X):
        """Fit the wrapped scaler on X."""
        self.scaler.fit(X)
        return self

    def transform(self, X):
        """Apply the fitted scaler to X."""
        return self.scaler.transform(X)

    def fit_transform(self, X):
        """Fit and apply in a single call."""
        return self.scaler.fit_transform(X)

    def inverse_transform(self, X):
        """Map scaled data back to the original space."""
        return self.scaler.inverse_transform(X)
# Usage
scaler = DataScaler(method='standard')
X_scaled = scaler.fit_transform(X_train)
# Transform the test data with the scaler fitted on training data
X_test_scaled = scaler.transform(X_test)
归一化
from sklearn.preprocessing import Normalizer
class DataNormalizer:
    """Scale each sample (row) independently to unit norm."""

    def __init__(self, norm='l2'):
        self.norm = norm
        # Normalizer is stateless; constructed once up front
        self.normalizer = Normalizer(norm=norm)

    def fit(self, X):
        """No-op kept for scikit-learn API symmetry (Normalizer learns nothing)."""
        return self

    def transform(self, X):
        """Normalize every row of X to the configured norm."""
        return self.normalizer.transform(X)

    def fit_transform(self, X):
        """Equivalent to transform, since fitting is a no-op."""
        return self.normalizer.fit_transform(X)
# Usage
normalizer = DataNormalizer(norm='l2')
X_normalized = normalizer.fit_transform(X_train)
编码分类变量
标签编码
from sklearn.preprocessing import LabelEncoder
import pandas as pd
class CategoricalEncoder:
    """Encode categorical columns via label or one-hot encoding."""

    def __init__(self, method='label'):
        self.method = method
        self.encoders = {}            # column -> fitted LabelEncoder (label method only)
        self.onehot_encoder = None    # fitted OneHotEncoder (onehot method only)
        self.categorical_cols_ = []   # columns seen during fit; needed by one-hot transform

    def fit(self, X, categorical_cols=None):
        """Fit encoders on the given (or auto-detected) categorical columns.

        Fixes: (1) transform() for method='onehot' used to read
        self.encoders.keys(), which is never populated for one-hot, so no
        columns were ever encoded — fitted columns are now recorded in
        self.categorical_cols_; (2) the one-hot encoder was redundantly
        re-fitted once per column inside the loop.
        """
        if isinstance(X, pd.DataFrame):
            if categorical_cols is None:
                categorical_cols = X.select_dtypes(include=['object', 'category']).columns
            self.categorical_cols_ = list(categorical_cols)
            if self.method == 'label':
                for col in self.categorical_cols_:
                    encoder = LabelEncoder()
                    encoder.fit(X[col].astype(str))
                    self.encoders[col] = encoder
            elif self.method == 'onehot':
                from sklearn.preprocessing import OneHotEncoder
                self.onehot_encoder = OneHotEncoder(
                    sparse_output=False,
                    handle_unknown='ignore'
                )
                self.onehot_encoder.fit(X[self.categorical_cols_])
        else:
            raise ValueError("CategoricalEncoder仅支持pandas DataFrames")
        return self

    def transform(self, X):
        """Transform X with the fitted encoders; one-hot replaces the original columns."""
        if isinstance(X, pd.DataFrame):
            X_transformed = X.copy()
            if self.method == 'label':
                for col, encoder in self.encoders.items():
                    X_transformed[col] = encoder.transform(X[col].astype(str))
            elif self.method == 'onehot' and self.onehot_encoder is not None:
                categorical_cols = self.categorical_cols_
                onehot_features = self.onehot_encoder.transform(X[categorical_cols])
                feature_names = self.onehot_encoder.get_feature_names_out(categorical_cols)
                # replace original categorical columns with their dummy columns
                X_transformed = X_transformed.drop(columns=categorical_cols)
                onehot_df = pd.DataFrame(onehot_features, columns=feature_names, index=X.index)
                X_transformed = pd.concat([X_transformed, onehot_df], axis=1)
            return X_transformed
        else:
            raise ValueError("CategoricalEncoder仅支持pandas DataFrames")

    def fit_transform(self, X, categorical_cols=None):
        """Fit and transform in one step."""
        return self.fit(X, categorical_cols).transform(X)
# Usage
encoder = CategoricalEncoder(method='label')
X_encoded = encoder.fit_transform(X_train, categorical_cols=['category_col'])
# One-hot encoding
encoder_onehot = CategoricalEncoder(method='onehot')
X_onehot = encoder_onehot.fit_transform(X_train, categorical_cols=['category_col'])
目标编码
from sklearn.model_selection import KFold
import numpy as np
class TargetEncoder:
    """Smoothed mean-target encoding for categorical columns."""

    def __init__(self, smoothing=1.0, min_samples_leaf=1):
        self.smoothing = smoothing                # how quickly categories trust their own mean
        self.min_samples_leaf = min_samples_leaf  # count at which the blend weight is 0.5
        self.encodings = {}                       # column -> Series of category -> encoded value
        self.global_mean = None                   # fallback for categories unseen at fit time

    def fit(self, X, y, categorical_cols=None):
        """Learn a smoothed per-category target mean for each categorical column."""
        if not isinstance(X, pd.DataFrame):
            raise ValueError("TargetEncoder仅支持pandas DataFrames")
        if categorical_cols is None:
            categorical_cols = X.select_dtypes(include=['object', 'category']).columns
        self.global_mean = y.mean()
        for col in categorical_cols:
            means = y.groupby(X[col]).mean()
            counts = X[col].value_counts()
            # sigmoid blend: rare categories shrink toward the global mean
            weight = 1 / (1 + np.exp(-(counts - self.min_samples_leaf) / self.smoothing))
            self.encodings[col] = self.global_mean * (1 - weight) + means * weight
        return self

    def transform(self, X):
        """Append `<col>_encoded` columns; unseen categories map to the global mean."""
        if not isinstance(X, pd.DataFrame):
            raise ValueError("TargetEncoder仅支持pandas DataFrames")
        result = X.copy()
        for col, encoding in self.encodings.items():
            result[f'{col}_encoded'] = X[col].map(encoding).fillna(self.global_mean)
        return result

    def fit_transform(self, X, y, categorical_cols=None):
        """Fit and transform in one step."""
        return self.fit(X, y, categorical_cols).transform(X)
# Usage
target_encoder = TargetEncoder(smoothing=1.0, min_samples_leaf=10)
X_encoded = target_encoder.fit_transform(X_train, y_train, categorical_cols=['category_col'])
特征缩放
最小-最大缩放
from sklearn.preprocessing import MinMaxScaler
class MinMaxScalerCustom:
    """Min-max scaling into a configurable feature range."""

    def __init__(self, feature_range=(0, 1)):
        self.feature_range = feature_range
        self.min_ = None    # per-feature minimum from fit
        self.max_ = None    # per-feature maximum from fit
        self.scale_ = None  # per-feature multiplier

    def fit(self, X):
        """Learn per-feature min/max; constant features are given range 1.

        Fix: the original used boolean-index assignment on the range, which
        raises for the 0-d result produced by 1-D input; np.where handles
        both the per-column and scalar cases.
        """
        self.min_ = np.min(X, axis=0)
        self.max_ = np.max(X, axis=0)
        data_range = self.max_ - self.min_
        data_range = np.where(data_range == 0, 1, data_range)  # avoid division by zero
        self.scale_ = (self.feature_range[1] - self.feature_range[0]) / data_range
        return self

    def transform(self, X):
        """Map X into the configured feature range using the fitted statistics."""
        X_scaled = (X - self.min_) * self.scale_
        X_scaled += self.feature_range[0]
        return X_scaled

    def fit_transform(self, X):
        """Fit and transform in one step."""
        return self.fit(X).transform(X)
# Usage
scaler = MinMaxScalerCustom(feature_range=(0, 1))
X_scaled = scaler.fit_transform(X_train)
鲁棒缩放
from sklearn.preprocessing import RobustScaler
class RobustScalerCustom:
    """Center by the median and scale by the IQR (outlier-resistant)."""

    def __init__(self, with_centering=True, with_scaling=True, quantile_range=(25.0, 75.0)):
        self.with_centering = with_centering
        self.with_scaling = with_scaling
        self.quantile_range = quantile_range
        self.center_ = None  # per-feature median
        self.scale_ = None   # per-feature IQR (zeros replaced by 1)

    def fit(self, X):
        """Compute the per-feature median and IQR."""
        if self.with_centering:
            self.center_ = np.median(X, axis=0)
        if self.with_scaling:
            q_min, q_max = self.quantile_range
            q1 = np.percentile(X, q_min, axis=0)
            q3 = np.percentile(X, q_max, axis=0)
            iqr = q3 - q1
            # np.where also copes with the 0-d IQR from 1-D input, where the
            # original boolean-index assignment raised
            self.scale_ = np.where(iqr == 0, 1, iqr)
        return self

    def transform(self, X):
        """Return the centered/scaled data as floats.

        Fix: the original subtracted and divided in place on a raw copy of X,
        which raises a casting error for integer input; work on a float copy.
        """
        if isinstance(X, pd.DataFrame):
            X_scaled = X.astype(float)
        else:
            X_scaled = np.array(X, dtype=float)  # always copies
        if self.with_centering:
            X_scaled -= self.center_
        if self.with_scaling:
            X_scaled /= self.scale_
        return X_scaled

    def fit_transform(self, X):
        """Fit and transform in one step."""
        return self.fit(X).transform(X)
# Usage
robust_scaler = RobustScalerCustom()
X_scaled = robust_scaler.fit_transform(X_train)
管道创建
Scikit-learn管道
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
def create_preprocessing_pipeline(numeric_features, categorical_features):
    """Build a ColumnTransformer: impute+scale numeric columns, impute+one-hot categorical ones."""
    # numeric branch: mean imputation followed by standardization
    num_steps = [
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler()),
    ]
    # categorical branch: mode imputation followed by one-hot encoding
    cat_steps = [
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore')),
    ]
    return ColumnTransformer(
        transformers=[
            ('num', Pipeline(steps=num_steps), numeric_features),
            ('cat', Pipeline(steps=cat_steps), categorical_features),
        ]
    )
# Usage
numeric_features = ['age', 'income', 'score']
categorical_features = ['gender', 'education', 'city']
preprocessor = create_preprocessing_pipeline(numeric_features, categorical_features)
# Fit and transform the training data
X_processed = preprocessor.fit_transform(X_train)
# Transform the test data with the already-fitted preprocessor
X_test_processed = preprocessor.transform(X_test)
自定义管道
from sklearn.base import BaseEstimator, TransformerMixin
class CustomPreprocessor(BaseEstimator, TransformerMixin):
    """Minimal sequential preprocessing pipeline of (name, transformer) steps."""

    def __init__(self, steps=None):
        self.steps = steps or []

    def add_step(self, name, transformer):
        """Append a preprocessing step; returns self for chaining."""
        self.steps.append((name, transformer))
        return self

    def fit(self, X, y=None):
        """Fit each transformer on the output of the previous one.

        Fix: the original fitted every step on the raw X, so downstream steps
        (e.g. a scaler after an imputer) were fitted on data they never see at
        transform time. This mirrors sklearn.pipeline.Pipeline semantics.
        """
        X_current = X
        for name, transformer in self.steps:
            transformer.fit(X_current, y)
            X_current = transformer.transform(X_current)
        return self

    def transform(self, X):
        """Apply every fitted step in order."""
        X_transformed = X.copy()
        for name, transformer in self.steps:
            X_transformed = transformer.transform(X_transformed)
        return X_transformed

    def fit_transform(self, X, y=None):
        """Fit and transform in one step."""
        return self.fit(X, y).transform(X)
# Usage
preprocessor = CustomPreprocessor()
preprocessor.add_step('missing_values', MissingValueHandler())
preprocessor.add_step('scaler', DataScaler(method='standard'))
preprocessor.add_step('encoder', CategoricalEncoder(method='label'))
X_processed = preprocessor.fit_transform(X_train)
不同数据类型的预处理
图像预处理
import torchvision.transforms as transforms
from PIL import Image
import numpy as np
class ImagePreprocessor:
    """Compose torchvision transforms for image model input."""

    def __init__(self, image_size=(224, 224), normalize=True, augment=False):
        self.image_size = image_size
        self.normalize = normalize
        self.augment = augment
        # base transforms: resize, then convert to a tensor
        transform_list = [
            transforms.Resize(image_size),
            transforms.ToTensor()
        ]
        # ImageNet channel statistics
        if normalize:
            transform_list.append(
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406],
                    std=[0.229, 0.224, 0.225]
                )
            )
        # train-time augmentation, inserted between Resize and ToTensor
        if augment:
            transform_list.insert(1, transforms.RandomHorizontalFlip(p=0.5))
            transform_list.insert(2, transforms.RandomRotation(degrees=15))
            transform_list.insert(3, transforms.ColorJitter(
                brightness=0.2, contrast=0.2, saturation=0.2
            ))
        self.transform = transforms.Compose(transform_list)

    def preprocess(self, image):
        """Preprocess one image given as a file path, numpy array, or PIL image."""
        if isinstance(image, str):
            image = Image.open(image).convert('RGB')
        elif isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        return self.transform(image)

    def preprocess_batch(self, images):
        """Preprocess a batch of images and stack them into one tensor.

        Fix: `torch` was referenced without being imported anywhere in this
        snippet (only torchvision.transforms was); import it locally.
        """
        import torch
        return torch.stack([self.preprocess(img) for img in images])
# Usage
preprocessor = ImagePreprocessor(image_size=(224, 224), augment=True)
processed_image = preprocessor.preprocess("image.jpg")
文本预处理
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
class TextPreprocessor:
    """Clean, tokenize, and normalize text for machine learning."""

    def __init__(self, remove_stopwords=True, lemmatize=True, lowercase=True):
        self.remove_stopwords = remove_stopwords
        self.lemmatize = lemmatize
        self.lowercase = lowercase
        if remove_stopwords:
            nltk.download('stopwords')
            self.stop_words = set(stopwords.words('english'))
        if lemmatize:
            nltk.download('wordnet')
            self.lemmatizer = WordNetLemmatizer()

    def clean_text(self, text):
        """Strip URLs, e-mail addresses, and non-letter characters; collapse whitespace."""
        for pattern in (r'http\S+', r'\S*@\S*', r'[^a-zA-Z\s]'):
            text = re.sub(pattern, '', text)
        return ' '.join(text.split())

    def tokenize(self, text):
        """Whitespace tokenization."""
        return text.split()

    def preprocess(self, text):
        """Run the configured cleaning pipeline and return a space-joined string."""
        text = self.clean_text(text)
        if self.lowercase:
            text = text.lower()
        tokens = self.tokenize(text)
        if self.remove_stopwords:
            tokens = [t for t in tokens if t not in self.stop_words]
        if self.lemmatize:
            tokens = [self.lemmatizer.lemmatize(t) for t in tokens]
        return ' '.join(tokens)
# Usage
preprocessor = TextPreprocessor(remove_stopwords=True, lemmatize=True)
processed_text = preprocessor.preprocess("This is a sample text for preprocessing!")
表格预处理
import pandas as pd
import numpy as np
class TabularPreprocessor:
    """End-to-end preprocessing for tabular DataFrames using the helpers above."""

    def __init__(self):
        # discovered by identify_features(); None until then
        self.numeric_features = None
        self.categorical_features = None
        self.datetime_features = None

    def identify_features(self, X):
        """Classify columns into numeric / categorical / datetime groups."""
        if not isinstance(X, pd.DataFrame):
            # non-DataFrame input yields None, as in the original
            return None
        groups = {
            'numeric': X.select_dtypes(include=[np.number]).columns.tolist(),
            'categorical': X.select_dtypes(include=['object', 'category']).columns.tolist(),
            'datetime': X.select_dtypes(include=['datetime64']).columns.tolist(),
        }
        self.numeric_features = groups['numeric']
        self.categorical_features = groups['categorical']
        self.datetime_features = groups['datetime']
        return groups

    def preprocess(self, X, handle_missing=True, scale=True, encode=True):
        """Run imputation, categorical encoding, and scaling in sequence."""
        X_processed = X.copy()
        self.identify_features(X)
        if handle_missing:
            X_processed = MissingValueHandler().fit_transform(X_processed)
        if encode and self.categorical_features:
            X_processed = CategoricalEncoder(method='onehot').fit_transform(
                X_processed, self.categorical_features)
        if scale and self.numeric_features:
            scaler = DataScaler(method='standard')
            if isinstance(X_processed, pd.DataFrame):
                # re-detect numeric columns: one-hot encoding may have added dummies,
                # which are scaled along with the originals (as in the original code)
                numeric_cols = X_processed.select_dtypes(include=[np.number]).columns
                X_processed[numeric_cols] = scaler.fit_transform(X_processed[numeric_cols])
            else:
                X_processed = scaler.fit_transform(X_processed)
        return X_processed
# Usage
preprocessor = TabularPreprocessor()
X_processed = preprocessor.preprocess(X_train)
可重现性
随机种子设置
import random
import numpy as np
import torch
def set_seed(seed=42):
    """Seed Python, NumPy, and PyTorch RNGs for reproducible runs.

    Also forces deterministic cuDNN kernels, which can slow down training.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # fix: only touch the CUDA RNGs when CUDA is actually available
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
# Usage
set_seed(42)
确定性预处理
class DeterministicPreprocessor:
    """Reproducible data-splitting helpers seeded once at construction."""

    def __init__(self, seed=42):
        self.seed = seed
        set_seed(seed)  # seeds python/numpy/torch globally

    def train_test_split(self, X, y, test_size=0.2, random_state=None, stratify=True):
        """Deterministic train/test split.

        stratify: pass False for regression/continuous targets — the original
        always stratified on y, which raises for non-categorical targets.
        Fix: `random_state or self.seed` silently ignored random_state=0;
        replaced with an explicit None check.
        """
        from sklearn.model_selection import train_test_split
        return train_test_split(
            X, y,
            test_size=test_size,
            random_state=self.seed if random_state is None else random_state,
            stratify=y if stratify else None
        )

    def kfold_split(self, X, y, n_splits=5, random_state=None):
        """Deterministic stratified K-fold split generator."""
        from sklearn.model_selection import StratifiedKFold
        kfold = StratifiedKFold(
            n_splits=n_splits,
            shuffle=True,
            random_state=self.seed if random_state is None else random_state
        )
        return kfold.split(X, y)
# Usage
preprocessor = DeterministicPreprocessor(seed=42)
X_train, X_test, y_train, y_test = preprocessor.train_test_split(X, y)
测试预处理
单元测试
import unittest
import pandas as pd
import numpy as np
class TestPreprocessing(unittest.TestCase):
    """Unit tests for the preprocessing helpers."""

    def setUp(self):
        """Build a small fixture with numeric, categorical, and missing data."""
        self.X = pd.DataFrame({
            'numeric': [1, 2, 3, 4, 5],
            'categorical': ['A', 'B', 'A', 'B', 'A'],
            'missing': [1, np.nan, 3, np.nan, 5]
        })

    def test_missing_value_handler(self):
        """MissingValueHandler leaves no NaNs behind."""
        handler = MissingValueHandler()
        X_clean = handler.fit_transform(self.X)
        # no missing values may remain in any column
        self.assertFalse(X_clean.isnull().any().any())

    def test_data_scaler(self):
        """Standard scaling yields ~zero mean and ~unit standard deviation."""
        scaler = DataScaler(method='standard')
        X_scaled = scaler.fit_transform(self.X[['numeric']])
        # mean approximately 0
        self.assertAlmostEqual(X_scaled.mean(), 0, places=5)
        # std approximately 1
        # NOTE(review): relies on X_scaled being an ndarray (population std, ddof=0);
        # would need ddof handling if the scaler ever returned a DataFrame — confirm.
        self.assertAlmostEqual(X_scaled.std(), 1, places=5)

    def test_categorical_encoder(self):
        """Label encoding turns the categorical column numeric."""
        encoder = CategoricalEncoder(method='label')
        X_encoded = encoder.fit_transform(self.X, ['categorical'])
        # the encoded column must now have a numeric dtype
        self.assertTrue(pd.api.types.is_numeric_dtype(X_encoded['categorical']))
# Run the unit tests when executed as a script
if __name__ == '__main__':
    unittest.main()
最佳实践
-
了解您的数据
- 执行探索性数据分析(EDA)
- 检查数据类型和分布
- 识别缺失值和异常值
- 理解特征之间的关系
-
适当处理缺失值
- 对数值数据使用均值/中位数插补
- 对分类数据使用最频繁插补
- 考虑使用KNN插补处理复杂模式
- 文档化插补策略
-
检测和处理异常值
- 对正态分布数据使用Z分数
- 对非正态分布使用IQR
- 在删除异常值之前考虑领域知识
- 如适当,限制异常值而不是删除
-
选择正确的编码方法
- 对序数变量使用标签编码
- 对名义变量使用独热编码
- 对高基数分类变量使用目标编码
- 避免创建太多独热特征
-
一致地缩放特征
- 仅在训练数据上拟合缩放器
- 对训练和测试数据使用相同的缩放器
- 考虑对异常值较多的数据使用鲁棒缩放
- 存储缩放器参数以供推理
-
创建可重用的管道
- 构建模块化预处理步骤
- 使用sklearn Pipeline以确保可重现性
- 文档化每个预处理步骤
- 版本控制预处理代码
-
确保可重现性
- 一致地设置随机种子
- 在可能的情况下使用确定性算法
- 保存预处理参数
- 记录预处理步骤
-
测试预处理
- 为预处理函数编写单元测试
- 验证输出形状和类型
- 检查数据泄露
- 监控预处理性能
-
处理不同数据类型
- 对每种数据类型使用适当的预处理
- 考虑多模态数据预处理
- 在转换过程中保留信息
- 文档化数据类型处理
-
监控和迭代
- 跟踪预处理对模型性能的影响
- A/B测试不同的预处理策略
- 监控预处理管道性能
- 根据结果持续改进