name: 因果推断 description: 使用倾向得分、工具变量和因果图来确定政策评估和治疗效果中的因果关系
因果推断
概述
因果推断确定因果关系并估计治疗效果,超越相关性以理解什么导致什么。
何时使用
- 评估政策干预或商业决策的影响
- 在随机实验不可行时估计治疗效果
- 在观察数据中控制混杂变量
- 确定营销活动或产品变更是否导致了结果
- 分析不同用户群体之间的异质性治疗效果
- 使用倾向得分或工具变量从非实验数据中提出因果主张
关键概念
- 处理:干预或暴露
- 结果:结果或后果
- 混杂:影响处理和结果的变量
- 因果图:关系的可视化表示
- 治疗效果:干预的影响
- 选择偏差:非随机处理分配
因果方法
- 随机对照试验 (RCT):黄金标准
- 倾向得分匹配:平衡处理/对照
- 差异中的差异:前后比较
- 工具变量:处理内生性
- 因果森林:异质性治疗效果
使用Python实现
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.preprocessing import StandardScaler
from scipy import stats
# 生成具有混杂的观察数据
np.random.seed(42)
n = 1000
# 混杂因素:年龄(影响处理和结果)
age = np.random.uniform(25, 75, n)
# 处理:培训计划(年轻人更有可能)
treatment_prob = 0.3 + 0.3 * (75 - age) / 50 # 与年龄的逆关系
treatment = (np.random.uniform(0, 1, n) < treatment_prob).astype(int)
# 结果:工资(受处理和年龄的影响)
# 处理的真实因果效应:+5000美元
salary = 40000 + 500 * age + 5000 * treatment + np.random.normal(0, 10000, n)
df = pd.DataFrame({
'age': age,
'treatment': treatment,
'salary': salary,
})
print("观察数据摘要:")
print(df.describe())
print(f"
处理率:{df['treatment'].mean():.1%}")
print(f"对照组平均工资:${df[df['treatment']==0]['salary'].mean():.0f}")
print(f"处理组平均工资:${df[df['treatment']==1]['salary'].mean():.0f}")
# 1. 简单比较(有偏见 - 忽略混杂)
naive_effect = df[df['treatment']==1]['salary'].mean() - df[df['treatment']==0]['salary'].mean()
print(f"
1. 简单比较:${naive_effect:.0f}(有偏见)")
# 2. 回归调整(协变量调整)
X = df[['treatment', 'age']]
y = df['salary']
model = LinearRegression()
model.fit(X, y)
regression_effect = model.coef_[0]
print(f"
2. 回归调整:${regression_effect:.0f}")
# 3. 倾向得分匹配
# 估计给定协变量的处理概率
ps_model = LogisticRegression()
ps_model.fit(df[['age']], df['treatment'])
df['propensity_score'] = ps_model.predict_proba(df[['age']])[:, 1]
print(f"
3. 倾向得分匹配:")
print(f"PS范围:[{df['propensity_score'].min():.3f}, {df['propensity_score'].max():.3f}]")
# 匹配:为每个处理单元找到对照
matched_pairs = []
treated_units = df[df['treatment'] == 1].index
for treated_idx in treated_units:
treated_ps = df.loc[treated_idx, 'propensity_score']
treated_age = df.loc[treated_idx, 'age']
# 找到最近的对照单元
control_units = df[(df['treatment'] == 0) &
(df['propensity_score'] >= treated_ps - 0.1) &
(df['propensity_score'] <= treated_ps + 0.1)].index
if len(control_units) > 0:
closest_control = min(control_units,
key=lambda x: abs(df.loc[x, 'propensity_score'] - treated_ps))
matched_pairs.append({
'treated_idx': treated_idx,
'control_idx': closest_control,
'treated_salary': df.loc[treated_idx, 'salary'],
'control_salary': df.loc[closest_control, 'salary'],
})
matched_df = pd.DataFrame(matched_pairs)
psm_effect = (matched_df['treated_salary'] - matched_df['control_salary']).mean()
print(f"PSM效果:${psm_effect:.0f}")
print(f"匹配对数:{len(matched_df)}")
# 4. 按倾向得分分层
df['ps_stratum'] = pd.qcut(df['propensity_score'], q=5, labels=False, duplicates='drop')
stratified_effects = []
for stratum in df['ps_stratum'].unique():
stratum_data = df[df['ps_stratum'] == stratum]
if (stratum_data['treatment'] == 0).sum() > 0 and (stratum_data['treatment'] == 1).sum() > 0:
treated_mean = stratum_data[stratum_data['treatment'] == 1]['salary'].mean()
control_mean = stratum_data[stratum_data['treatment'] == 0]['salary'].mean()
effect = treated_mean - control_mean
stratified_effects.append(effect)
stratified_effect = np.mean(stratified_effects)
print(f"
4. 按PS分层:${stratified_effect:.0f}")
# 5. 可视化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 按年龄分布的处理
ax = axes[0, 0]
...
# 6. 双重稳健估计
from sklearn.ensemble import RandomForestRegressor
# 倾向得分模型
ps_model_dr = LogisticRegression().fit(df[['age']], df['treatment'])
ps_scores = ps_model_dr.predict_proba(df[['age']])[:, 1]
# 结果模型
outcome_model = RandomForestRegressor(n_estimators=50, random_state=42)
outcome_model.fit(df[['treatment', 'age']], df['salary'])
# 双重稳健估计器
treated_mask = df['treatment'] == 1
control_mask = df['treatment'] == 0
# 调整倾向得分
treated_adjusted = (treated_mask.astype(int) * df['salary']) / (ps_scores + 0.01)
control_adjusted = (control_mask.astype(int) * df['salary']) / (1 - ps_scores + 0.01)
# 结果预测
pred_treated = outcome_model.predict(df[['treatment', 'age']].replace({'treatment': 0, 1: 1}))
pred_control = outcome_model.predict(df[['treatment', 'age']].replace({'treatment': 1, 0: 0}))
dr_effect = treated_adjusted.sum() / treated_mask.sum() - control_adjusted.sum() / control_mask.sum()
print(f"
6. 双重稳健估计:${dr_effect:.0f}")
# 7. 异质性治疗效果
print(f"
7. 按年龄四分位数的异质性治疗效果:")
for age_q in pd.qcut(df['age'], q=4, duplicates='drop').unique():
mask = (df['age'] >= age_q.left) & (df['age'] < age_q.right)
stratum_data = df[mask]
if (stratum_data['treatment'] == 0).sum() > 0 and (stratum_data['treatment'] == 1).sum() > 0:
treated_mean = stratum_data[stratum_data['treatment'] == 1]['salary'].mean()
control_mean = stratum_data[stratum_data['treatment'] == 0]['salary'].mean()
effect = treated_mean - control_mean
print(f" 年龄{age_q.left:.0f}-{age_q.right:.0f}:${effect:.0f}")
# 8. 敏感性分析
print(f"
8. 敏感性分析(隐藏混杂因素的影响):")
# 改变隐藏混杂因素与结果的相关性
for hidden_effect in [1000, 2000, 5000, 10000]:
adjusted_effect = regression_effect - hidden_effect * 0.1
print(f" 如果隐藏混杂因素价值为${hidden_effect}:效果 = ${adjusted_effect:.0f}")
# 9. 汇总表
print(f"
" + "="*60)
print("因果推断汇总")
print("="*60)
print(f"真实处理效应:${true_effect:,.0f}")
print(f"
估计值:")
print(f" 简单(有偏见):${naive_effect:,.0f}")
print(f" 回归调整:${regression_effect:,.0f}")
print(f" 倾向得分匹配:${psm_effect:,.0f}")
print(f" 分层:${stratified_effect:,.0f}")
print(f" 双重稳健:${dr_effect:,.0f}")
print("="*60)
# 10. 因果图(文本表示)
print(f"
10. 因果图(DAG):")
print(f"""
年龄 → 处理 ← (选择偏差)
↓ ↓
└─→ 工资
解释:
- 年龄是混杂因素
- 处理对工资有因果影响
- 年龄直接影响工资
- 年龄影响处理的概率
""")
因果假设
- 无混杂性:没有未测量的混杂因素
- 重叠:在倾向得分上有共同支持
- SUTVA:单位之间无干扰
- 一致性:单一版本的处理
治疗效果类型
- ATE:平均治疗效果(总体)
- ATT:对已处理的平均治疗效果
- CATE:条件平均治疗效果
- HTE:异质性治疗效果
方法优势
- RCT:黄金标准,控制所有混杂因素
- 匹配:平衡组别,保持重叠
- 回归:调整协变量
- 工具变量:处理内生性
- 因果森林:学习异质性效果
交付物
- 因果图可视化
- 治疗效果估计
- 敏感性分析
- 异质性治疗效果
- 协变量平衡评估
- 倾向得分诊断
- 最终因果推断报告