Python回归与统计分析Skill python-regression-statistics

这个技能用于在Python中进行回归分析、统计建模和异常值检测,提供完整的统计推断和机器学习方法,包括使用statsmodels、scikit-learn等库进行模型诊断、假设检验和预测建模。适用于数据科学和人工智能项目,关键词:Python回归分析,统计建模,异常值检测,数据科学,预测建模。

预测建模 1 次安装 3 次浏览 更新于 3/12/2026

name: python-regression-statistics description: 使用statsmodels、scikit-learn、scipy和PyOD进行回归分析、统计建模和异常值检测的专家指导 - 包括模型诊断、假设检验、稳健方法和全面的异常值检测策略 allowed-tools:

  • “*”

Python回归与统计分析

Python中回归分析、统计建模和异常值检测的专家助手。该技能涵盖statsmodels用于统计回归,scikit-learn用于机器学习方法,scipy用于统计方法,以及PyOD用于全面的异常值检测。

何时使用此技能

  • 使用适当的统计推断拟合线性和非线性回归模型
  • 检查和验证回归假设(LINE:线性、独立性、正态性、等方差)
  • 执行模型诊断:残差分析、影响点、多重共线性
  • 使用统计、基于邻近性和集成方法检测和处理异常值
  • 比较统计与机器学习回归方法
  • 构建具有适当验证的稳健回归管道
  • 时间序列建模和预测
  • 广义线性模型(GLM)用于非正态响应变量

哲学:统计与机器学习方法

统计回归(statsmodels、scipy.stats)

使用时:

  • 您需要p值、置信区间和假设检验
  • 理解哪些变量显著很重要
  • 您想因果解释系数
  • 样本量适中且需要推断保证
  • 您需要假设的诊断测试

优势:

  • 丰富的统计推断(t检验、F检验、似然比)
  • 全面的诊断和假设检验
  • 公式接口易于模型指定
  • 标准误差、置信区间、预测区间
  • 理论基础明确

限制:

  • 对复杂非线性关系不够灵活
  • 需要仔细检查假设
  • 可能不擅长处理高维数据
  • 自动化特征选择较少

机器学习回归(scikit-learn)

使用时:

  • 预测准确性是主要目标
  • 您有许多特征和复杂交互
  • 关系高度非线性
  • 您不需要统计推断(p值)
  • 您有足够数据进行训练/测试分割

优势:

  • 处理复杂非线性模式
  • 内置正则化(Ridge、Lasso、ElasticNet)
  • 擅长高维数据
  • 自动化交叉验证和超参数调优
  • 集成方法提高准确性

限制:

  • 无自动p值或统计推断
  • 系数解释性较差
  • 复杂模型需要更多数据
  • 某些算法为黑盒

推荐工作流

# 1. 探索性数据分析
# - 可视化关系(散点图、配对图)
# - 检查分布(直方图、Q-Q图)
# - 识别潜在异常值(箱线图、散点图)
# - 计算汇总统计

# 2. 初始异常值筛查(可选)
# - 使用多种检测方法(IQR、Z得分、隔离森林)
# - 可视化标记点
# - 调查为何是异常值(数据错误 vs 真实极端值)
# - 记录保留或移除决策

# 3. 检查假设(用于统计回归)
# - 线性:残差与拟合图、偏回归图
# - 独立性:Durbin-Watson检验、自相关图
# - 正态性:Q-Q图、Shapiro-Wilk检验、残差直方图
# - 等方差:尺度-位置图、Breusch-Pagan检验

# 4. 模型拟合
# - 从简单开始(OLS)然后根据需要添加复杂度
# - 如果假设违反,尝试变换(对数、Box-Cox)
# - 如果存在异常值,考虑稳健方法
# - 如果预测变量多,使用正则化(Ridge/Lasso)

# 5. 模型诊断
# - 残差分析
# - 检查影响点(Cook's D、DFFITS、杠杆)
# - 多重共线性(VIF)
# - 预测空间与响应空间的异常值

# 6. 验证
# - 训练/测试分割或交叉验证
# - 比较多个模型(AIC、BIC、R²、RMSE)
# - 检查预测区间(统计)或残差分布(ML)

# 7. 解释与报告
# - 系数解释带置信区间
# - 特征重要性(如果ML模型)
# - 可视化预测与实际
# - 报告诊断和假设检查

使用statsmodels进行统计回归

普通最小二乘法(OLS)

import statsmodels.api as sm
import statsmodels.formula.api as smf
import numpy as np
import pandas as pd

# 数组API - 更多控制
X = sm.add_constant(X_raw)  # 添加截距
model = sm.OLS(y, X)
results = model.fit()
print(results.summary())

# 公式API - 更易指定
df = pd.DataFrame({'y': y, 'x1': x1, 'x2': x2, 'category': cat})
model = smf.ols('y ~ x1 + x2 + C(category)', data=df)
results = model.fit()

# 访问关键结果
results.params           # 系数
results.pvalues         # P值
results.conf_int()      # 置信区间
results.rsquared        # R²
results.rsquared_adj    # 调整R²
results.aic, results.bic  # 信息准则
results.resid           # 残差
results.fittedvalues    # 预测值

加权最小二乘法(WLS)

# 当异方差性存在时
# 权重 = 每个观测值的方差的倒数

# 如果您知道方差结构
weights = 1 / variance_array
model = sm.WLS(y, X, weights=weights)
results = model.fit()

# 迭代重加权最小二乘法
# 从OLS开始,估计方差函数,重新拟合
ols_results = sm.OLS(y, X).fit()
resid_squared = ols_results.resid**2

# 拟合方差作为预测变量的函数
variance_model = sm.OLS(np.log(resid_squared), X).fit()
weights = 1 / np.exp(variance_model.fittedvalues)

wls_results = sm.WLS(y, X, weights=weights).fit()

稳健回归(RLM)

# 抵抗异常值 - 降低影响点的权重
import statsmodels.robust.robust_linear_model as rlm

# Huber's T 范数(默认)
model = rlm.RLM(y, X, M=sm.robust.norms.HuberT())
results = model.fit()

# 其他M估计器
# TukeyBiweight - 更积极降低异常值权重
model = rlm.RLM(y, X, M=sm.robust.norms.TukeyBiweight())
results = model.fit()

# 检查分配给每个观测值的权重
weights = results.weights  # 异常值获得较低权重

广义线性模型(GLM)

# 用于非正态响应变量

# 泊松回归(计数数据)
model = sm.GLM(y, X, family=sm.families.Poisson())
results = model.fit()

# 负二项式(过离散计数)
model = sm.GLM(y, X, family=sm.families.NegativeBinomial())
results = model.fit()

# Gamma回归(正连续数据)
model = sm.GLM(y, X, family=sm.families.Gamma())
results = model.fit()

# 二项式/逻辑回归
model = sm.GLM(y, X, family=sm.families.Binomial())
results = model.fit()

# 链接函数
# family=sm.families.Gaussian(link=sm.families.links.log())

时间序列模型

from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.vector_ar.var_model import VAR

# ARIMA(自回归积分移动平均)
model = ARIMA(y, order=(p, d, q))  # p=AR阶数, d=差分阶数, q=MA阶数
results = model.fit()

# 季节性ARIMA
model = SARIMAX(y, order=(p, d, q), seasonal_order=(P, D, Q, s))
results = model.fit()

# 预测
forecast = results.forecast(steps=10)
forecast_with_ci = results.get_forecast(steps=10)
forecast_df = forecast_with_ci.summary_frame()

# 向量自回归(多变量时间序列)
model = VAR(df[['y1', 'y2', 'y3']])
results = model.fit(maxlags=5)
results.forecast(df.values[-results.k_ar:], steps=10)

模型诊断 - statsmodels

# 全面诊断图
from statsmodels.graphics.gofplots import qqplot
import matplotlib.pyplot as plt

# 1. 残差与拟合
plt.scatter(results.fittedvalues, results.resid)
plt.axhline(y=0, color='r', linestyle='--')
plt.xlabel('拟合值')
plt.ylabel('残差')
# 应显示随机散点,无模式

# 2. Q-Q图用于正态性
qqplot(results.resid, line='s')
# 点应沿对角线

# 3. 尺度-位置(异方差性)
plt.scatter(results.fittedvalues, np.sqrt(np.abs(results.resid_pearson)))
plt.xlabel('拟合值')
plt.ylabel('√|标准化残差|')
# 应显示恒定扩散

# 4. 残差与杠杆
from statsmodels.stats.outliers_influence import OLSInfluence
influence = OLSInfluence(results)
fig, ax = plt.subplots()
ax.scatter(influence.hat_matrix_diag, results.resid_pearson)
# 识别影响点(高杠杆 + 大残差)

# 统计检验
from statsmodels.stats.diagnostic import het_breuschpagan, acorr_ljungbox
from statsmodels.stats.stattools import durbin_watson

# 异方差性检验
lm_stat, lm_pval, f_stat, f_pval = het_breuschpagan(results.resid, results.model.exog)
print(f"Breusch-Pagan p值: {lm_pval}")  # p < 0.05 表示异方差性

# 自相关
dw = durbin_watson(results.resid)
print(f"Durbin-Watson: {dw}")  # 应约2表示无自相关

# 自相关检验
lb_test = acorr_ljungbox(results.resid, lags=10)

# 正态性检验
from scipy.stats import shapiro
stat, pval = shapiro(results.resid)
print(f"Shapiro-Wilk p值: {pval}")  # p < 0.05 表示非正态性

影响点和多重共线性

from statsmodels.stats.outliers_influence import variance_inflation_factor, OLSInfluence

# 方差膨胀因子(多重共线性)
vif_data = pd.DataFrame()
vif_data["变量"] = X.columns
vif_data["VIF"] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
print(vif_data)
# VIF > 10 表示严重多重共线性
# VIF > 5 令人担忧

# 影响度量
influence = OLSInfluence(results)

# Cook's距离(总体影响)
cooks_d = influence.cooks_distance[0]
# 经验法则: D > 4/n 是影响点

# DFFITS(对拟合值的影响)
dffits = influence.dffits[0]
# 经验法则: |DFFITS| > 2√(p/n) 是影响点

# 杠杆(帽子值)
leverage = influence.hat_matrix_diag
# 经验法则: h > 2p/n 或 h > 3p/n 是高杠杆

# 学生化残差
student_resid = influence.resid_studentized_internal
# |r| > 2 或 3 表示响应空间异常值

# 汇总表
influence_summary = influence.summary_frame()
print(influence_summary)

使用scikit-learn进行机器学习回归

带正则化的线性模型

from sklearn.linear_model import (
    LinearRegression, Ridge, Lasso, ElasticNet,
    Lars, LassoLars, BayesianRidge, HuberRegressor,
    RANSACRegressor, TheilSenRegressor
)
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score, GridSearchCV

# 标准线性回归
model = LinearRegression()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
model.coef_, model.intercept_

# Ridge回归(L2正则化)
# 当许多相关预测变量时好
ridge = Ridge(alpha=1.0)  # 更高的alpha = 更多正则化
ridge.fit(X_train, y_train)

# Lasso回归(L1正则化)
# 通过设系数为零执行特征选择
lasso = Lasso(alpha=0.1)
lasso.fit(X_train, y_train)
# 检查哪些特征被选中
selected_features = X.columns[lasso.coef_ != 0]

# ElasticNet(L1 + L2)
# l1_ratio=1.0 是Lasso,l1_ratio=0.0 是Ridge
elastic = ElasticNet(alpha=0.1, l1_ratio=0.5)
elastic.fit(X_train, y_train)

# Huber回归(抵抗异常值)
huber = HuberRegressor(epsilon=1.35)
huber.fit(X_train, y_train)

# RANSAC(极其稳健,找到内点子集)
ransac = RANSACRegressor(min_samples=0.5, residual_threshold=2.0)
ransac.fit(X_train, y_train)
inlier_mask = ransac.inlier_mask_

# Theil-Sen(稳健,适合小数据集)
theil = TheilSenRegressor()
theil.fit(X_train, y_train)

交叉验证和超参数调优

from sklearn.model_selection import (
    cross_val_score, cross_validate,
    GridSearchCV, RandomizedSearchCV,
    KFold, RepeatedKFold
)
from sklearn.metrics import (
    mean_squared_error, mean_absolute_error,
    r2_score, mean_absolute_percentage_error
)

# 简单交叉验证
scores = cross_val_score(model, X, y, cv=5,
                         scoring='neg_mean_squared_error')
rmse_scores = np.sqrt(-scores)
print(f"RMSE: {rmse_scores.mean():.3f} (+/- {rmse_scores.std():.3f})")

# 多个指标
cv_results = cross_validate(
    model, X, y, cv=5,
    scoring=['neg_mean_squared_error', 'r2', 'neg_mean_absolute_error'],
    return_train_score=True
)

# 网格搜索最优超参数
param_grid = {
    'alpha': [0.001, 0.01, 0.1, 1.0, 10.0],
    'l1_ratio': [0.1, 0.3, 0.5, 0.7, 0.9]
}
grid_search = GridSearchCV(
    ElasticNet(),
    param_grid,
    cv=5,
    scoring='neg_mean_squared_error',
    n_jobs=-1
)
grid_search.fit(X_train, y_train)
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳CV分数: {np.sqrt(-grid_search.best_score_):.3f}")

# 使用最佳模型
best_model = grid_search.best_estimator_

树基和集成方法

from sklearn.ensemble import (
    RandomForestRegressor, GradientBoostingRegressor,
    AdaBoostRegressor, BaggingRegressor, ExtraTreesRegressor
)
from sklearn.tree import DecisionTreeRegressor

# 随机森林
rf = RandomForestRegressor(
    n_estimators=100,
    max_depth=10,
    min_samples_split=5,
    min_samples_leaf=2,
    random_state=42,
    n_jobs=-1
)
rf.fit(X_train, y_train)

# 特征重要性
importance_df = pd.DataFrame({
    '特征': X.columns,
    '重要性': rf.feature_importances_
}).sort_values('重要性', ascending=False)

# 梯度提升
gb = GradientBoostingRegressor(
    n_estimators=100,
    learning_rate=0.1,
    max_depth=3,
    subsample=0.8,
    random_state=42
)
gb.fit(X_train, y_train)

# XGBoost(如果已安装)
try:
    import xgboost as xgb
    xgb_model = xgb.XGBRegressor(
        n_estimators=100,
        learning_rate=0.1,
        max_depth=5,
        random_state=42
    )
    xgb_model.fit(X_train, y_train)
except ImportError:
    print("XGBoost未安装。使用: pip install xgboost")

其他ML回归方法

from sklearn.svm import SVR
from sklearn.kernel_ridge import KernelRidge
from sklearn.neighbors import KNeighborsRegressor
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel

# 支持向量回归
svr = SVR(kernel='rbf', C=1.0, epsilon=0.1)
svr.fit(X_train, y_train)

# 核Ridge回归
kr = KernelRidge(alpha=1.0, kernel='rbf')
kr.fit(X_train, y_train)

# K-最近邻
knn = KNeighborsRegressor(n_neighbors=5, weights='distance')
knn.fit(X_train, y_train)

# 高斯过程(带不确定性量化)
kernel = RBF(length_scale=1.0) + WhiteKernel(noise_level=1.0)
gp = GaussianProcessRegressor(kernel=kernel, random_state=42)
gp.fit(X_train, y_train)

# 带不确定性的预测
y_pred, sigma = gp.predict(X_test, return_std=True)

构建完整管道

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.compose import TransformedTargetRegressor

# 带标准化的标准管道
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('model', Ridge(alpha=1.0))
])
pipeline.fit(X_train, y_train)

# 多项式特征
poly_pipeline = Pipeline([
    ('poly', PolynomialFeatures(degree=2, include_bias=False)),
    ('scaler', StandardScaler()),
    ('model', Ridge(alpha=1.0))
])

# 目标变换(例如,对数变换y)
log_model = TransformedTargetRegressor(
    regressor=Ridge(alpha=1.0),
    func=np.log1p,
    inverse_func=np.expm1
)
log_model.fit(X_train, y_train)

异常值检测

统计方法(scipy、statsmodels)

import numpy as np
from scipy import stats

# Z得分方法
def detect_outliers_zscore(data, threshold=3):
    """
    异常值是|z-score| > threshold的点
    适用于正态分布数据
    """
    z_scores = np.abs(stats.zscore(data))
    return z_scores > threshold

# 修改的Z得分(MAD - 中位数绝对偏差)
def detect_outliers_mad(data, threshold=3.5):
    """
    比Z得分更稳健,使用中位数而非均值
    推荐阈值: 3.5
    """
    median = np.median(data)
    mad = np.median(np.abs(data - median))
    modified_z_scores = 0.6745 * (data - median) / mad
    return np.abs(modified_z_scores) > threshold

# IQR方法
def detect_outliers_iqr(data, factor=1.5):
    """
    异常值是低于Q1 - factor*IQR或高于Q3 + factor*IQR的点
    factor=1.5: 异常值,factor=3.0: 极端异常值
    """
    q1 = np.percentile(data, 25)
    q3 = np.percentile(data, 75)
    iqr = q3 - q1
    lower_bound = q1 - factor * iqr
    upper_bound = q3 + factor * iqr
    return (data < lower_bound) | (data > upper_bound)

# Grubbs检验(用于正态数据中的单个异常值)
def grubbs_test(data, alpha=0.05):
    """
    检验最大值或最小值是否为异常值
    假设数据正态分布
    """
    from scipy.stats import t
    n = len(data)
    mean = np.mean(data)
    std = np.std(data, ddof=1)

    # 检验统计量
    G_max = np.max(np.abs(data - mean)) / std

    # 临界值
    t_crit = t.ppf(1 - alpha / (2 * n), n - 2)
    G_crit = ((n - 1) / np.sqrt(n)) * np.sqrt(t_crit**2 / (n - 2 + t_crit**2))

    return G_max > G_crit

# 示例使用
data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 100])  # 100是异常值
outliers_z = detect_outliers_zscore(data)
outliers_mad = detect_outliers_mad(data)
outliers_iqr = detect_outliers_iqr(data)

PyOD - 全面异常值检测

PyOD (Python Outlier Detection) 提供40+算法。安装命令: pip install pyod

from pyod.models.knn import KNN
from pyod.models.lof import LOF
from pyod.models.iforest import IForest
from pyod.models.ocsvm import OCSVM
from pyod.models.pca import PCA as PCA_OD
from pyod.models.mcd import MCD
from pyod.models.abod import ABOD
from pyod.models.copod import COPOD
from pyod.models.ecod import ECOD
from pyod.models.combination import average, maximization

# PyOD检测器的通用模式
detector = KNN(contamination=0.1)  # 预期10%异常值
detector.fit(X)

# 获取异常值标签(0=内点,1=异常值)
outlier_labels = detector.labels_

# 获取异常值得分(更高 = 更异常)
outlier_scores = detector.decision_scores_

# 对新数据预测
new_labels = detector.predict(X_new)
new_scores = detector.decision_function(X_new)

基于邻近的检测器

# K-最近邻(KNN)
# 简单、快速、好基线
knn = KNN(contamination=0.1, n_neighbors=5, method='mean')
knn.fit(X)

# 局部异常因子(LOF)
# 捕捉局部密度偏差
# 适用于变化密度数据集
lof = LOF(contamination=0.1, n_neighbors=20)
lof.fit(X)

# 基于连接的异常因子(COF)
from pyod.models.cof import COF
cof = COF(contamination=0.1, n_neighbors=20)
cof.fit(X)

概率检测器

# COPOD(基于Copula的异常值检测)
# 快速、无参数,在高维数据上表现好
copod = COPOD(contamination=0.1)
copod.fit(X)

# ECOD(经验累积分布)
# 非常快、无参数,适合表格数据
ecod = ECOD(contamination=0.1)
ecod.fit(X)

# ABOD(基于角度的异常值检测)
# 在高维空间有效
# 大数据集上慢
abod = ABOD(contamination=0.1)
abod.fit(X)

线性模型检测器

# 基于PCA的检测
# 异常值具有大重构误差
pca = PCA_OD(contamination=0.1, n_components=None)
pca.fit(X)

# MCD(最小协方差行列式)
# 稳健协方差估计
# 假设数据大致高斯分布
mcd = MCD(contamination=0.1)
mcd.fit(X)

# 一类SVM
# 学习正常点边界
ocsvm = OCSVM(contamination=0.1, kernel='rbf', nu=0.1)
ocsvm.fit(X)

隔离基检测器

# 隔离森林
# 快速、可扩展,对高维数据有效
# 通过随机划分隔离异常值
iforest = IForest(
    contamination=0.1,
    n_estimators=100,
    max_features=1.0,
    random_state=42
)
iforest.fit(X)

集成方法

# 组合多个检测器
from pyod.models.lscp import LSCP
from pyod.models.feature_bagging import FeatureBagging

# 特征打包
# 在随机特征子集上训练多个检测器
fb = FeatureBagging(
    base_estimator=LOF(contamination=0.1),
    n_estimators=10,
    contamination=0.1,
    random_state=42
)
fb.fit(X)

# 平均多个检测器得分
detectors = [KNN(), LOF(), IForest(), COPOD()]
scores_list = []
for detector in detectors:
    detector.fit(X)
    scores_list.append(detector.decision_scores_)

# 平均组合
avg_scores = average(scores_list)

# 最大组合(更保守)
max_scores = maximization(scores_list)

scikit-learn异常值检测

from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.covariance import EllipticEnvelope
from sklearn.svm import OneClassSVM

# 隔离森林
iso_forest = IsolationForest(
    contamination=0.1,
    random_state=42,
    n_estimators=100
)
outlier_labels = iso_forest.fit_predict(X)  # -1=异常值,1=内点
outlier_scores = iso_forest.score_samples(X)  # 更负 = 更异常

# 局部异常因子(新颖性检测模式)
lof = LocalOutlierFactor(
    n_neighbors=20,
    contamination=0.1,
    novelty=True  # 启用对新数据预测
)
lof.fit(X_train)
outlier_labels = lof.predict(X_test)
outlier_scores = lof.score_samples(X_test)

# 椭圆包络(假设高斯分布)
elliptic = EllipticEnvelope(contamination=0.1, random_state=42)
outlier_labels = elliptic.fit_predict(X)

# 一类SVM
oc_svm = OneClassSVM(kernel='rbf', gamma='auto', nu=0.1)
outlier_labels = oc_svm.fit_predict(X)

选择正确的异常值检测方法

使用IQR或Z得分当:

  • 单变量数据(单个变量)
  • 需要快速筛查
  • 可解释性重要

使用修改的Z得分(MAD)当:

  • 数据可能已包含异常值(MAD稳健)
  • 分布大致对称

使用隔离森林当:

  • 高维数据
  • 无数据分布假设
  • 需要快速计算
  • 通用好选择

使用LOF当:

  • 数据中变化密度簇
  • 局部结构重要
  • 愿意调优n_neighbors参数

使用COPOD/ECOD当:

  • 需要非常快速检测
  • 高维表格数据
  • 不想调优参数

使用基于PCA的当:

  • 数据位于低维流形
  • 想理解哪些特征贡献异常性

使用MCD当:

  • 多变量高斯假设合理
  • 需要稳健协方差估计

使用集成方法当:

  • 需要最大稳健性
  • 能承受计算成本
  • 不同检测器类型不同意

完整工作流示例

示例1:带完整诊断的统计回归

import pandas as pd
import numpy as np
import statsmodels.api as sm
import statsmodels.formula.api as smf
from statsmodels.graphics.gofplots import qqplot
from statsmodels.stats.outliers_influence import variance_inflation_factor, OLSInfluence
from statsmodels.stats.diagnostic import het_breuschpagan
from statsmodels.stats.stattools import durbin_watson
import matplotlib.pyplot as plt

# 加载数据
df = pd.read_csv('data.csv')

# 1. 探索性分析
print(df.describe())
df.hist(bins=30, figsize=(15, 10))
pd.plotting.scatter_matrix(df, figsize=(15, 15))

# 2. 拟合初始模型
model = smf.ols('price ~ area + bedrooms + age + location', data=df)
results = model.fit()
print(results.summary())

# 3. 检查假设
fig, axes = plt.subplots(2, 2, figsize=(12, 10))

# 残差与拟合
axes[0, 0].scatter(results.fittedvalues, results.resid)
axes[0, 0].axhline(y=0, color='r', linestyle='--')
axes[0, 0].set_xlabel('拟合值')
axes[0, 0].set_ylabel('残差')
axes[0, 0].set_title('残差与拟合')

# Q-Q图
qqplot(results.resid, line='s', ax=axes[0, 1])
axes[0, 1].set_title('正态Q-Q')

# 尺度-位置
axes[1, 0].scatter(results.fittedvalues, np.sqrt(np.abs(results.resid_pearson)))
axes[1, 0].set_xlabel('拟合值')
axes[1, 0].set_ylabel('√|标准化残差|')
axes[1, 0].set_title('尺度-位置')

# 残差与杠杆
influence = OLSInfluence(results)
axes[1, 1].scatter(influence.hat_matrix_diag, results.resid_pearson)
axes[1, 1].set_xlabel('杠杆')
axes[1, 1].set_ylabel('标准化残差')
axes[1, 1].set_title('残差与杠杆')

plt.tight_layout()

# 4. 统计检验
# 异方差性
lm_stat, lm_pval, f_stat, f_pval = het_breuschpagan(results.resid, results.model.exog)
print(f"
Breusch-Pagan检验p值: {lm_pval:.4f}")

# 自相关
dw = durbin_watson(results.resid)
print(f"Durbin-Watson统计量: {dw:.4f}")

# 5. 检查多重共线性
X = df[['area', 'bedrooms', 'age']]  # 排除分类
X = sm.add_constant(X)
vif_data = pd.DataFrame()
vif_data["变量"] = X.columns
vif_data["VIF"] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
print("
", vif_data)

# 6. 识别影响点
influence_df = influence.summary_frame()
print("
影响点(Cook's D > 4/n):")
n = len(df)
influential = influence_df[influence_df['cooks_d'] > 4/n]
print(influential[['cooks_d', 'student_resid', 'hat_diag']])

# 7. 如果假设违反,尝试变换或稳健回归
if lm_pval < 0.05:  # 检测到异方差性
    print("
拟合稳健回归(RLM)...")
    from statsmodels.robust.robust_linear_model import RLM
    robust_model = RLM(df['price'], X, M=sm.robust.norms.HuberT())
    robust_results = robust_model.fit()
    print(robust_results.summary())

示例2:带交叉验证的ML回归

from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import Ridge, Lasso, ElasticNet
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import pandas as pd
import numpy as np

# 加载和分割数据
df = pd.read_csv('data.csv')
X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# 定义要比较的模型
models = {
    'Ridge': Ridge(),
    'Lasso': Lasso(),
    'ElasticNet': ElasticNet(),
    'RandomForest': RandomForestRegressor(random_state=42),
    'GradientBoosting': GradientBoostingRegressor(random_state=42)
}

# 交叉验证比较
results = {}
for name, model in models.items():
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('model', model)
    ])

    scores = cross_val_score(
        pipeline, X_train, y_train,
        cv=5, scoring='neg_mean_squared_error'
    )
    rmse_scores = np.sqrt(-scores)
    results[name] = {
        'mean_rmse': rmse_scores.mean(),
        'std_rmse': rmse_scores.std()
    }
    print(f"{name}: RMSE = {rmse_scores.mean():.3f} (+/- {rmse_scores.std():.3f})")

# 最佳模型的超参数调优
param_grid = {
    'model__n_estimators': [50, 100, 200],
    'model__max_depth': [5, 10, 15, None],
    'model__min_samples_split': [2, 5, 10]
}

pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('model', RandomForestRegressor(random_state=42))
])

grid_search = GridSearchCV(
    pipeline, param_grid, cv=5,
    scoring='neg_mean_squared_error',
    n_jobs=-1, verbose=1
)
grid_search.fit(X_train, y_train)

print(f"
最佳参数: {grid_search.best_params_}")
print(f"最佳CV RMSE: {np.sqrt(-grid_search.best_score_):.3f}")

# 测试集评估
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)

print(f"
测试集性能:")
print(f"RMSE: {np.sqrt(mean_squared_error(y_test, y_pred)):.3f}")
print(f"MAE: {mean_absolute_error(y_test, y_pred):.3f}")
print(f"R²: {r2_score(y_test, y_pred):.3f}")

# 特征重要性
if hasattr(best_model.named_steps['model'], 'feature_importances_'):
    importance_df = pd.DataFrame({
        '特征': X.columns,
        '重要性': best_model.named_steps['model'].feature_importances_
    }).sort_values('重要性', ascending=False)
    print("
前10特征:")
    print(importance_df.head(10))

示例3:异常值检测管道

import numpy as np
import pandas as pd
from pyod.models.iforest import IForest
from pyod.models.lof import LOF
from pyod.models.copod import COPOD
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

# 加载数据
df = pd.read_csv('data.csv')
X = df.drop('target', axis=1).values

# 标准化特征
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# 多种检测方法
detectors = {
    '隔离森林': IForest(contamination=0.1, random_state=42),
    'LOF': LOF(contamination=0.1, n_neighbors=20),
    'COPOD': COPOD(contamination=0.1)
}

# 拟合检测器并收集得分
scores_df = pd.DataFrame(index=df.index)
labels_df = pd.DataFrame(index=df.index)

for name, detector in detectors.items():
    detector.fit(X_scaled)
    scores_df[name] = detector.decision_scores_
    labels_df[name] = detector.labels_

# 共识:如果至少被两种方法标记,点为异常值
consensus_outliers = (labels_df.sum(axis=1) >= 2)
print(f"共识检测的异常值: {consensus_outliers.sum()}")

# 调查异常值
outlier_indices = df[consensus_outliers].index
print("
异常值汇总统计:")
print(df.loc[outlier_indices].describe())
print("
正常数据汇总统计:")
print(df.loc[~consensus_outliers].describe())

# 可视化异常值得分
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
for i, (name, scores) in enumerate(scores_df.items()):
    axes[i].hist(scores, bins=50, alpha=0.7)
    threshold = np.percentile(scores, 90)  # 90百分位数
    axes[i].axvline(threshold, color='r', linestyle='--', label='阈值')
    axes[i].set_title(f'{name} 得分')
    axes[i].set_xlabel('异常值得分')
    axes[i].set_ylabel('频率')
    axes[i].legend()
plt.tight_layout()

# 决策:移除、调查还是保留?
# 选项1:移除异常值
df_clean = df[~consensus_outliers].copy()

# 选项2:使用处理异常值的稳健回归
# (参见上面RLM、Huber、RANSAC示例)

# 选项3:保留但单独分析
df['is_outlier'] = consensus_outliers

示例4:组合回归 + 异常值检测

import numpy as np
import pandas as pd
import statsmodels.api as sm
from pyod.models.iforest import IForest
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

# 加载数据
df = pd.read_csv('data.csv')
X = df[['x1', 'x2', 'x3']].values
y = df['y'].values

# 步骤1:检测预测空间的异常值
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

detector = IForest(contamination=0.05, random_state=42)
detector.fit(X_scaled)
outliers_X = detector.labels_ == 1

print(f"预测空间异常值: {outliers_X.sum()}")

# 步骤2:拟合初始模型
X_with_const = sm.add_constant(X)
model = sm.OLS(y, X_with_const)
results = model.fit()

# 步骤3:识别响应空间异常值
from statsmodels.stats.outliers_influence import OLSInfluence
influence = OLSInfluence(results)
studentized_resid = influence.resid_studentized_internal
outliers_y = np.abs(studentized_resid) > 3

print(f"响应空间异常值: {outliers_y.sum()}")

# 步骤4:识别影响点
cooks_d = influence.cooks_distance[0]
n, p = X_with_const.shape
influential = cooks_d > 4/n

print(f"影响点: {influential.sum()}")

# 步骤5:分类点
df['outlier_X'] = outliers_X
df['outlier_y'] = outliers_y
df['influential'] = influential
df['any_flag'] = outliers_X | outliers_y | influential

# 步骤6:可视化
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# 图1:残差与拟合
colors = ['red' if flag else 'blue' for flag in df['any_flag']]
axes[0].scatter(results.fittedvalues, results.resid, c=colors, alpha=0.6)
axes[0].axhline(y=0, color='black', linestyle='--')
axes[0].set_xlabel('拟合值')
axes[0].set_ylabel('残差')
axes[0].set_title('残差与拟合(红色 = 标记)')

# 图2:Cook's距离
axes[1].stem(range(len(cooks_d)), cooks_d, markerfmt=',')
axes[1].axhline(y=4/n, color='r', linestyle='--', label='阈值 (4/n)')
axes[1].set_xlabel('观测索引')
axes[1].set_ylabel("Cook's距离")
axes[1].set_title("Cook's距离")
axes[1].legend()

plt.tight_layout()

# 步骤7:比较模型
print("
=== 原始模型 ===")
print(results.summary())

# 移除标记点并重新拟合
df_clean = df[~df['any_flag']].copy()
X_clean = df_clean[['x1', 'x2', 'x3']].values
y_clean = df_clean['y'].values
X_clean_const = sm.add_constant(X_clean)

model_clean = sm.OLS(y_clean, X_clean_const)
results_clean = model_clean.fit()

print("
=== 移除标记点后的模型 ===")
print(results_clean.summary())

# 替代:稳健回归(保留所有数据)
from statsmodels.robust.robust_linear_model import RLM
robust_model = RLM(y, X_with_const, M=sm.robust.norms.HuberT())
robust_results = robust_model.fit()

print("
=== 稳健回归(所有数据) ===")
print(robust_results.summary())

常见陷阱和解决方案

1. P-hacking和多重检验

问题: 测试许多模型/特征并只报告显著结果。

解决方案:

  • 基于理论/先验知识预先指定模型
  • 使用调整后的p值(Bonferroni、Benjamini-Hochberg)进行多重比较
  • 关注效应大小和置信区间,不仅仅是p值
  • 使用训练/测试分割或交叉验证进行模型选择
from statsmodels.stats.multitest import multipletests

# 多重假设检验校正
p_values = [0.04, 0.03, 0.001, 0.15, 0.08]
reject, pvals_corrected, _, _ = multipletests(p_values, method='fdr_bh')
# Benjamini-Hochberg控制错误发现率

2. 特征过多导致过拟合

问题: 模型完美拟合训练数据但泛化差。

解决方案:

  • 使用正则化(Ridge、Lasso、ElasticNet)
  • 交叉验证估计泛化性能
  • 基于领域知识进行特征选择
  • 样本量小时使用更简单模型
# 经验法则:每个预测变量至少需要10-20个观测值
n_samples, n_features = X.shape
if n_samples < 10 * n_features:
    print(f"警告: 每个特征仅 {n_samples/n_features:.1f} 个样本")
    print("考虑: (1) 正则化, (2) 特征选择, (3) 收集更多数据")

3. 忽略假设

问题: 假设违反时使用OLS导致无效推断。

解决方案:

  • 始终检查诊断(残差图、Q-Q图、检验)
  • 如果需要,变换变量(对数、Box-Cox、平方根)
  • 存在异常值时使用稳健方法
  • 检测到异方差性时使用WLS
  • 响应分布非正态时使用GLM
# Box-Cox变换用于正态性
from scipy.stats import boxcox

y_transformed, lambda_param = boxcox(y)  # 仅适用于正y
print(f"最优lambda: {lambda_param}")
# lambda=0: 对数变换
# lambda=0.5: 平方根变换
# lambda=1: 无需变换

4. 无理由移除异常值

问题: 任意移除点以改进拟合。

解决方案:

  • 调查为何点是异常值(数据错误 vs 真实极端值)
  • 记录所有数据清洗决策
  • 移除前尝试稳健方法
  • 比较带/不带异常值的结果
  • 永远不要仅因为点不拟合模型而移除
# 异常值调查的系统方法
def investigate_outliers(df, outlier_mask):
    """
    打印检测异常值的详细信息
    """
    outliers = df[outlier_mask]
    normal = df[~outlier_mask]

    print(f"异常值数量: {len(outliers)} ({100*len(outliers)/len(df):.1f}%)")
    print("
异常值特征:")
    print(outliers.describe())
    print("
正常数据特征:")
    print(normal.describe())

    # 检查数据录入错误
    print("
潜在数据错误:")
    for col in df.columns:
        if df[col].dtype in [np.float64, np.int64]:
            min_val, max_val = df[col].min(), df[col].max()
            suspicious = (outliers[col] < min_val * 0.1) | (outliers[col] > max_val * 0.9)
            if suspicious.any():
                print(f"  {col}: {suspicious.sum()} 个可疑值")

5. 混淆异常值与影响点

问题: 并非所有异常值都有影响,并非所有影响点都是异常值。

概念:

  • X空间异常值: 异常预测值(高杠杆)
  • Y空间异常值: 异常响应值(大残差)
  • 影响点: 移除它显著改变模型(高Cook’s D)

解决方案:

  • 检查多个诊断:杠杆、残差、Cook’s D
  • 高杠杆 + 小残差 = 不太有影响
  • 低杠杆 + 大残差 = 不太有影响
  • 高杠杆 + 大残差 = 非常有影响
# 分类点
def categorize_points(influence, results, n, p):
    """
    基于影响度量分类观测值
    """
    leverage = influence.hat_matrix_diag
    studentized_resid = influence.resid_studentized_internal
    cooks_d = influence.cooks_distance[0]

    high_leverage = leverage > 2 * p / n
    outlier_y = np.abs(studentized_resid) > 3
    influential = cooks_d > 4 / n

    categories = []
    for i in range(n):
        if influential[i]:
            categories.append('影响点')
        elif high_leverage[i] and outlier_y[i]:
            categories.append('高杠杆异常值')
        elif high_leverage[i]:
            categories.append('高杠杆')
        elif outlier_y[i]:
            categories.append('异常值')
        else:
            categories.append('正常')

    return pd.Series(categories, name='类别')

6. 不检查预测区间

问题: 报告点预测而不带不确定性。

解决方案:

  • 使用预测区间(统计模型)或分位数回归
  • 报告系数的置信区间
  • 可视化预测不确定性
# 使用statsmodels的预测区间
predictions = results.get_prediction(X_new)
pred_df = predictions.summary_frame(alpha=0.05)  # 95%区间

# pred_df有列: mean, mean_se, mean_ci_lower, mean_ci_upper,
#                       obs_ci_lower, obs_ci_upper

# 置信区间:平均预测的不确定性
# 预测区间:我们期望新观测值落的位置(更宽)

import matplotlib.pyplot as plt
plt.scatter(X_test, y_test, label='实际')
plt.plot(X_test, pred_df['mean'], 'r-', label='预测')
plt.fill_between(X_test, pred_df['obs_ci_lower'], pred_df['obs_ci_upper'],
                 alpha=0.2, label='95%预测区间')
plt.legend()

可视化最佳实践

诊断图

import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.graphics.gofplots import qqplot

def plot_diagnostics(results, figsize=(12, 10)):
    """
    为回归创建全面诊断图
    """
    fig, axes = plt.subplots(2, 2, figsize=figsize)

    # 1. 残差与拟合
    axes[0, 0].scatter(results.fittedvalues, results.resid, alpha=0.6)
    axes[0, 0].axhline(y=0, color='r', linestyle='--')
    axes[0, 0].set_xlabel('拟合值')
    axes[0, 0].set_ylabel('残差')
    axes[0, 0].set_title('残差与拟合')

    # 添加lowess平滑
    from statsmodels.nonparametric.smoothers_lowess import lowess
    smoothed = lowess(results.resid, results.fittedvalues, frac=0.3)
    axes[0, 0].plot(smoothed[:, 0], smoothed[:, 1], 'g-', lw=2)

    # 2. Q-Q图
    qqplot(results.resid, line='s', ax=axes[0, 1])
    axes[0, 1].set_title('正态Q-Q')

    # 3. 尺度-位置
    resid_std = np.sqrt(np.abs(results.resid_pearson))
    axes[1, 0].scatter(results.fittedvalues, resid_std, alpha=0.6)
    axes[1, 0].set_xlabel('拟合值')
    axes[1, 0].set_ylabel('√|标准化残差|')
    axes[1, 0].set_title('尺度-位置')
    smoothed = lowess(resid_std, results.fittedvalues, frac=0.3)
    axes[1, 0].plot(smoothed[:, 0], smoothed[:, 1], 'r-', lw=2)

    # 4. 残差与杠杆
    from statsmodels.stats.outliers_influence import OLSInfluence
    influence = OLSInfluence(results)
    leverage = influence.hat_matrix_diag
    cooks_d = influence.cooks_distance[0]

    axes[1, 1].scatter(leverage, results.resid_pearson, alpha=0.6)
    axes[1, 1].set_xlabel('杠杆')
    axes[1, 1].set_ylabel('标准化残差')
    axes[1, 1].set_title('残差与杠杆')

    # 高亮高Cook's D点
    n = len(results.resid)
    high_cooks = cooks_d > 4/n
    axes[1, 1].scatter(leverage[high_cooks], results.resid_pearson[high_cooks],
                       color='red', s=100, alpha=0.8, label="高Cook's D")
    axes[1, 1].legend()

    plt.tight_layout()
    return fig

# 使用
# fig = plot_diagnostics(results)
# plt.show()

异常值得分分布

def plot_outlier_scores(detectors_dict, X, figsize=(15, 5)):
    """
    绘制多个检测器的异常值得分分布
    """
    n_detectors = len(detectors_dict)
    fig, axes = plt.subplots(1, n_detectors, figsize=figsize)
    if n_detectors == 1:
        axes = [axes]

    for ax, (name, detector) in zip(axes, detectors_dict.items()):
        detector.fit(X)
        scores = detector.decision_scores_
        labels = detector.labels_

        # 直方图
        ax.hist(scores[labels == 0], bins=50, alpha=0.6, label='内点')
        ax.hist(scores[labels == 1], bins=50, alpha=0.6, label='异常值')
        ax.set_xlabel('异常值得分')
        ax.set_ylabel('频率')
        ax.set_title(name)
        ax.legend()

    plt.tight_layout()
    return fig

快速参考

何时使用哪种方法

目标 方法
带推断的简单线性回归 OLS statsmodels
带相关预测变量的回归 Ridge sklearn
回归中的特征选择 Lasso sklearn
抵抗异常值 RLMHuberRegressorRANSACRegressor statsmodels、sklearn
计数数据 GLM(Poisson)GLM(NegativeBinomial) statsmodels
时间序列预测 ARIMASARIMAX statsmodels
高维数据 LassoElasticNetRidge sklearn
复杂非线性模式 RandomForestGradientBoosting sklearn
快速异常值检测 COPODECODIsolationForest PyOD、sklearn
稳健异常值检测 LOF集成方法 PyOD、sklearn
单变量异常值 IQRMADZ-score scipy

关键诊断检查清单

  • [ ] 线性: 残差与拟合图显示随机散点
  • [ ] 独立性: Durbin-Watson ≈ 2,无自相关
  • [ ] 正态性: Q-Q图沿对角线,Shapiro-Wilk p > 0.05
  • [ ] 等方差: 尺度-位置图显示恒定扩散,Breusch-Pagan p > 0.05
  • [ ] 多重共线性: 所有VIF < 5(理想 < 10)
  • [ ] 异常值: 使用|学生化残差| > 3识别
  • [ ] 影响点: Cook’s D < 4/n,检查DFFITS
  • [ ] 模型比较: 使用AIC、BIC、交叉验证RMSE

安装命令

# 统计建模
pip install statsmodels scipy

# 机器学习
pip install scikit-learn

# 异常值检测
pip install pyod

# 可选:高级方法
pip install xgboost lightgbm

额外资源

文档

关键Statsmodels页面

关键Sklearn页面