Causal Inference Skill: CausalInference

Use propensity scores, instrumental variables, and causal graphs to identify causal relationships in policy evaluation and treatment-effect estimation

Data Analysis · 0 installs · 0 views · Updated 3/3/2026

name: Causal Inference description: Use propensity scores, instrumental variables, and causal graphs to identify causal relationships in policy evaluation and treatment-effect estimation

Causal Inference

Overview

Causal inference identifies cause-and-effect relationships and estimates treatment effects, going beyond correlation to understand what actually causes what.

When to Use

  • Evaluating the impact of policy interventions or business decisions
  • Estimating treatment effects when randomized experiments are infeasible
  • Controlling for confounding variables in observational data
  • Determining whether a marketing campaign or product change caused an outcome
  • Analyzing heterogeneous treatment effects across user segments
  • Making causal claims from non-experimental data using propensity scores or instrumental variables

Key Concepts

  • Treatment: the intervention or exposure
  • Outcome: the result or consequence of interest
  • Confounder: a variable that affects both treatment and outcome
  • Causal graph: a visual representation of the causal relationships
  • Treatment effect: the causal impact of the intervention
  • Selection bias: non-random treatment assignment
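
To make confounding and selection bias concrete, here is a minimal simulation (all numbers are illustrative) in which a confounder produces a large naive "effect" even though the true treatment effect is exactly zero:

```python
import numpy as np

rng = np.random.default_rng(0)
n = 5000

# The confounder drives both treatment assignment and the outcome;
# the treatment itself has NO effect in this simulation
confounder = rng.normal(size=n)
treatment = (confounder + rng.normal(size=n) > 0).astype(int)
outcome = 2.0 * confounder + rng.normal(size=n)  # true treatment effect = 0

# The naive group comparison is biased away from zero by the confounder
naive = outcome[treatment == 1].mean() - outcome[treatment == 0].mean()
print(f"Naive effect (true effect is 0): {naive:.2f}")
```

The naive difference picks up the confounder's influence on both variables, which is exactly the bias the methods below are designed to remove.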

Causal Methods

  • Randomized controlled trial (RCT): the gold standard
  • Propensity score matching: balances treated/control groups
  • Difference-in-differences: before/after comparison across groups
  • Instrumental variables: handle treatment endogeneity
  • Causal forests: heterogeneous treatment effects
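
Difference-in-differences is listed here but not implemented in the walkthrough below. As a taste, here is a minimal sketch on simulated two-period data (group sizes, the +8 trend, and the +3 effect are all illustrative):

```python
import numpy as np

rng = np.random.default_rng(42)
n = 1000

# Two-period panel: the treated group receives the intervention between periods
group = rng.integers(0, 2, n)                                # 1 = treated, 0 = control
true_effect = 3.0
pre = 100 + 5 * group + rng.normal(0, 2, n)                  # baseline gap between groups
post = pre + 8 + true_effect * group + rng.normal(0, 2, n)   # shared time trend of +8

# DiD subtracts the control group's change from the treated group's change,
# removing both the baseline gap and the shared time trend
did = (post[group == 1].mean() - pre[group == 1].mean()) \
    - (post[group == 0].mean() - pre[group == 0].mean())
print(f"DiD estimate: {did:.2f} (true effect: {true_effect})")
```

The key identifying assumption is parallel trends: absent treatment, both groups would have followed the same +8 trajectory.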

Python Implementation

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.preprocessing import StandardScaler
from scipy import stats

# Generate observational data with confounding
np.random.seed(42)

n = 1000

# Confounder: age (affects both treatment and outcome)
age = np.random.uniform(25, 75, n)

# Treatment: a training program (younger people are more likely to enroll)
treatment_prob = 0.3 + 0.3 * (75 - age) / 50  # decreases with age
treatment = (np.random.uniform(0, 1, n) < treatment_prob).astype(int)

# Outcome: salary (affected by both treatment and age)
# True causal effect of treatment: +$5,000
true_effect = 5000
salary = 40000 + 500 * age + true_effect * treatment + np.random.normal(0, 10000, n)

df = pd.DataFrame({
    'age': age,
    'treatment': treatment,
    'salary': salary,
})

print("Observational data summary:")
print(df.describe())
print(f"\nTreatment rate: {df['treatment'].mean():.1%}")
print(f"Control group mean salary: ${df[df['treatment']==0]['salary'].mean():.0f}")
print(f"Treated group mean salary: ${df[df['treatment']==1]['salary'].mean():.0f}")

# 1. Naive comparison (biased - ignores confounding)
naive_effect = df[df['treatment']==1]['salary'].mean() - df[df['treatment']==0]['salary'].mean()
print(f"\n1. Naive comparison: ${naive_effect:.0f} (biased)")

# 2. Regression adjustment (covariate adjustment)
X = df[['treatment', 'age']]
y = df['salary']
model = LinearRegression()
model.fit(X, y)
regression_effect = model.coef_[0]  # coefficient on 'treatment'

print(f"\n2. Regression adjustment: ${regression_effect:.0f}")

# 3. Propensity score matching
# Estimate the probability of treatment given covariates
ps_model = LogisticRegression()
ps_model.fit(df[['age']], df['treatment'])
df['propensity_score'] = ps_model.predict_proba(df[['age']])[:, 1]

print("\n3. Propensity score matching:")
print(f"PS range: [{df['propensity_score'].min():.3f}, {df['propensity_score'].max():.3f}]")

# Matching: find a control unit for each treated unit
matched_pairs = []
treated_units = df[df['treatment'] == 1].index
for treated_idx in treated_units:
    treated_ps = df.loc[treated_idx, 'propensity_score']

    # Candidate controls within a caliper of 0.1 on the propensity score
    control_units = df[(df['treatment'] == 0) &
                      (df['propensity_score'] >= treated_ps - 0.1) &
                      (df['propensity_score'] <= treated_ps + 0.1)].index

    if len(control_units) > 0:
        closest_control = min(control_units,
                             key=lambda x: abs(df.loc[x, 'propensity_score'] - treated_ps))
        matched_pairs.append({
            'treated_idx': treated_idx,
            'control_idx': closest_control,
            'treated_salary': df.loc[treated_idx, 'salary'],
            'control_salary': df.loc[closest_control, 'salary'],
        })

matched_df = pd.DataFrame(matched_pairs)
psm_effect = (matched_df['treated_salary'] - matched_df['control_salary']).mean()
print(f"PSM effect: ${psm_effect:.0f}")
print(f"Matched pairs: {len(matched_df)}")

# 4. Stratification on the propensity score
df['ps_stratum'] = pd.qcut(df['propensity_score'], q=5, labels=False, duplicates='drop')

stratified_effects = []
for stratum in df['ps_stratum'].unique():
    stratum_data = df[df['ps_stratum'] == stratum]
    if (stratum_data['treatment'] == 0).sum() > 0 and (stratum_data['treatment'] == 1).sum() > 0:
        treated_mean = stratum_data[stratum_data['treatment'] == 1]['salary'].mean()
        control_mean = stratum_data[stratum_data['treatment'] == 0]['salary'].mean()
        stratified_effects.append(treated_mean - control_mean)

stratified_effect = np.mean(stratified_effects)
print(f"\n4. Stratification on PS: ${stratified_effect:.0f}")

# 5. Visualization
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Treatment distribution by age
ax = axes[0, 0]
...

# 6. Doubly robust (AIPW) estimation
from sklearn.ensemble import RandomForestRegressor

# Propensity score model (probabilities clipped away from 0/1 for stable weights)
ps_model_dr = LogisticRegression().fit(df[['age']], df['treatment'])
ps_scores = np.clip(ps_model_dr.predict_proba(df[['age']])[:, 1], 0.01, 0.99)

# Outcome model
outcome_model = RandomForestRegressor(n_estimators=50, random_state=42)
outcome_model.fit(df[['treatment', 'age']], df['salary'])

# Counterfactual predictions: everyone treated / everyone untreated
pred_treated = outcome_model.predict(df[['treatment', 'age']].assign(treatment=1))
pred_control = outcome_model.predict(df[['treatment', 'age']].assign(treatment=0))

# AIPW estimator: outcome-model prediction plus inverse-propensity-weighted residual
T = df['treatment'].values
Y = df['salary'].values
mu_treated = np.mean(pred_treated + T * (Y - pred_treated) / ps_scores)
mu_control = np.mean(pred_control + (1 - T) * (Y - pred_control) / (1 - ps_scores))
dr_effect = mu_treated - mu_control
print(f"\n6. Doubly robust estimate: ${dr_effect:.0f}")

# 7. Heterogeneous treatment effects
print("\n7. Heterogeneous treatment effects by age quartile:")

age_quartiles = pd.qcut(df['age'], q=4, duplicates='drop')
for age_q in age_quartiles.cat.categories:
    stratum_data = df[age_quartiles == age_q]

    if (stratum_data['treatment'] == 0).sum() > 0 and (stratum_data['treatment'] == 1).sum() > 0:
        treated_mean = stratum_data[stratum_data['treatment'] == 1]['salary'].mean()
        control_mean = stratum_data[stratum_data['treatment'] == 0]['salary'].mean()
        effect = treated_mean - control_mean

        print(f"  Age {age_q.left:.0f}-{age_q.right:.0f}: ${effect:.0f}")

# 8. Sensitivity analysis
print("\n8. Sensitivity analysis (impact of a hidden confounder):")

# Vary the assumed strength of a hidden confounder's effect on the outcome
for hidden_effect in [1000, 2000, 5000, 10000]:
    adjusted_effect = regression_effect - hidden_effect * 0.1  # 0.1 = assumed group imbalance
    print(f"  If a hidden confounder is worth ${hidden_effect}: effect = ${adjusted_effect:.0f}")

# 9. Summary table
print("\n" + "="*60)
print("CAUSAL INFERENCE SUMMARY")
print("="*60)
print(f"True treatment effect: ${true_effect:,.0f}")
print("\nEstimates:")
print(f"  Naive (biased): ${naive_effect:,.0f}")
print(f"  Regression adjustment: ${regression_effect:,.0f}")
print(f"  Propensity score matching: ${psm_effect:,.0f}")
print(f"  Stratification: ${stratified_effect:,.0f}")
print(f"  Doubly robust: ${dr_effect:,.0f}")
print("="*60)

# 10. Causal graph (text representation)
print("\n10. Causal graph (DAG):")
print("""
Age → Treatment ← (selection bias)
  ↓        ↓
  └──→ Salary

Interpretation:
- Age is a confounder
- Treatment has a causal effect on salary
- Age directly affects salary
- Age affects the probability of treatment
""")

Causal Assumptions

  • Unconfoundedness: no unmeasured confounders
  • Overlap: common support on the propensity score
  • SUTVA: no interference between units
  • Consistency: a single, well-defined version of treatment
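
The overlap assumption can be checked directly by comparing propensity score ranges across groups. A minimal sketch, re-simulating data like the main example (the 0.3-0.6 enrollment probabilities are illustrative):

```python
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(7)
n = 2000
age = rng.uniform(25, 75, n)
treatment = (rng.uniform(size=n) < 0.3 + 0.3 * (75 - age) / 50).astype(int)
df = pd.DataFrame({'age': age, 'treatment': treatment})

ps = LogisticRegression().fit(df[['age']], df['treatment']).predict_proba(df[['age']])[:, 1]

# Common support: the score region where both groups have units;
# units outside it have no comparable counterparts and are often trimmed
lo = max(ps[treatment == 1].min(), ps[treatment == 0].min())
hi = min(ps[treatment == 1].max(), ps[treatment == 0].max())
inside = ((ps >= lo) & (ps <= hi)).mean()
print(f"Common support: [{lo:.3f}, {hi:.3f}], {inside:.1%} of units inside")
```

Plotting the two propensity score histograms side by side is the usual visual companion to this check.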

Types of Treatment Effects

  • ATE: average treatment effect (whole population)
  • ATT: average treatment effect on the treated
  • CATE: conditional average treatment effect
  • HTE: heterogeneous treatment effects
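
When effects vary across units and assignment is non-random, ATE and ATT genuinely differ. A sketch using a regression with a treatment-by-age interaction (the age-dependent effect and enrollment probabilities are made up for illustration):

```python
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

rng = np.random.default_rng(1)
n = 3000
age = rng.uniform(25, 75, n)

# Younger people enroll more often; the true effect grows with age
enroll_prob = 0.9 - 0.8 * (age - 25) / 50
treatment = (rng.uniform(size=n) < enroll_prob).astype(int)
effect = 2000 + 6000 * (age - 25) / 50          # +$2,000 at 25, +$8,000 at 75
salary = 40000 + 500 * age + effect * treatment + rng.normal(0, 3000, n)
df = pd.DataFrame({'age': age, 'treatment': treatment, 'salary': salary})

# The interaction term lets the model capture the age-varying effect
X = np.column_stack([df['treatment'], df['age'], df['treatment'] * df['age']])
model = LinearRegression().fit(X, df['salary'])

# Per-unit effect = predicted outcome under T=1 minus under T=0
X1 = np.column_stack([np.ones(n), df['age'], df['age']])
X0 = np.column_stack([np.zeros(n), df['age'], np.zeros(n)])
unit_effects = model.predict(X1) - model.predict(X0)

ate = unit_effects.mean()                               # averaged over everyone
att = unit_effects[df['treatment'].values == 1].mean()  # averaged over the treated
print(f"ATE: ${ate:.0f}, ATT: ${att:.0f} (ATT < ATE: the treated skew young)")
```

Here the treated are disproportionately young, where the effect is smallest, so the ATT falls below the ATE.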

Method Strengths

  • RCT: gold standard; randomization controls all confounders
  • Matching: balances groups while preserving overlap
  • Regression: adjusts for observed covariates
  • Instrumental variables: handle treatment endogeneity
  • Causal forests: learn heterogeneous effects
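
Instrumental variables appear in these lists but not in the walkthrough. A minimal two-stage least squares sketch with a simulated instrument (the 0.8/2.0/3.0 coefficients are illustrative):

```python
import numpy as np
from sklearn.linear_model import LinearRegression

rng = np.random.default_rng(3)
n = 5000

# Hidden confounder u biases OLS; instrument z affects treatment but not the outcome directly
u = rng.normal(size=n)                                     # unobserved confounder
z = rng.normal(size=n)                                     # instrument (e.g. random encouragement)
treatment = 0.8 * z + u + rng.normal(size=n)
outcome = 2.0 * treatment + 3.0 * u + rng.normal(size=n)   # true effect = 2.0

# OLS is biased upward because u drives both treatment and outcome
ols = LinearRegression().fit(treatment.reshape(-1, 1), outcome).coef_[0]

# 2SLS: regress treatment on z, then outcome on the fitted treatment
t_hat = LinearRegression().fit(z.reshape(-1, 1), treatment).predict(z.reshape(-1, 1))
iv = LinearRegression().fit(t_hat.reshape(-1, 1), outcome).coef_[0]
print(f"OLS: {ols:.2f} (biased), 2SLS: {iv:.2f} (true effect: 2.0)")
```

The fitted first stage keeps only the variation in treatment driven by the instrument, which is uncorrelated with u by construction.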

Deliverables

  • Causal graph visualization
  • Treatment effect estimates
  • Sensitivity analysis
  • Heterogeneous treatment effects
  • Covariate balance assessment
  • Propensity score diagnostics
  • Final causal inference report
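
For the covariate balance assessment, a standard diagnostic is the standardized mean difference (SMD) before and after matching; absolute values below 0.1 are commonly treated as balanced. A sketch on data simulated like the main example:

```python
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(5)
n = 2000
age = rng.uniform(25, 75, n)
treatment = (rng.uniform(size=n) < 0.3 + 0.3 * (75 - age) / 50).astype(int)
df = pd.DataFrame({'age': age, 'treatment': treatment})

def smd(x, t):
    """Standardized mean difference of covariate x between treated and control."""
    x1, x0 = x[t == 1], x[t == 0]
    pooled_sd = np.sqrt((x1.var(ddof=1) + x0.var(ddof=1)) / 2)
    return (x1.mean() - x0.mean()) / pooled_sd

smd_before = smd(df['age'].values, df['treatment'].values)
print(f"SMD before matching: {smd_before:.3f}")

# Crude 1-nearest-neighbor matching (with replacement) on the propensity score
ps = LogisticRegression().fit(df[['age']], df['treatment']).predict_proba(df[['age']])[:, 1]
controls = np.flatnonzero(df['treatment'].values == 0)
treated = np.flatnonzero(df['treatment'].values == 1)
matched = [controls[np.abs(ps[controls] - ps[i]).argmin()] for i in treated]
balanced = pd.concat([df.iloc[treated], df.iloc[matched]])

smd_after = smd(balanced['age'].values, balanced['treatment'].values)
print(f"SMD after matching:  {smd_after:.3f}")
```

In practice this check is repeated for every covariate and often summarized in a "love plot" of SMDs before vs. after adjustment.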