因果推断的「黄金搭档」,一篇搞懂!
做实证研究,最头疼的问题之一就是——处理组和对照组本来就不一样,直接比较「有失公允」。倾向得分匹配(PSM)正是解决这类问题的经典方法。今天我们用 Python,从原理到代码,把 PSM 讲透。
一、为什么需要 PSM?
想象一个场景:你想研究「参加培训」对员工工资的影响。
问题是——参加培训的人本来可能就更上进、学历更高、更有能力。即使不参加培训,他们的工资也可能更高。直接比较得到的「培训效果」,其实是选择偏差在作祟。
PSM 的核心思想:为每个处理组个体,在对照组中找到一个「长得像」的匹配对象,使得两者在可观测特征上尽可能相似。这样比较的就是「反事实」——如果处理组个体没参加培训,工资会是多少?
二、PSM 三步走
Step 1 → 构建倾向得分:用 Logit 回归估计每个个体「被处理」的概率Step 2 → 最近邻匹配:处理组找对照组中倾向得分最接近的个体Step 3 → ATT 估计:比较匹配后处理组与对照组的结局差异
下面一步步来。
三、代码实现
Step 0|环境准备
import pandas as pdimport numpy as npimport statsmodels.api as smimport statsmodels.formula.api as smfimport matplotlib.pyplot as pltimport warningswarnings.filterwarnings('ignore')plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']plt.rcParams['axes.unicode_minus'] = False
Step 1|读取数据
# ============================================================# 修改这里:替换为你的数据文件路径# ============================================================df = pd.read_csv('YOUR_DATA_PATH.csv')# 数据结构说明(请替换为你的真实变量名):# - TREAT:处理组虚拟变量(1=处理组,0=对照组)# - Y:结局变量(如工资、绩效等)# - X1, X2, X3:协变量(用于估计倾向得分的控制变量)# ============================================================print(f"样本量:{len(df)}")print(f"处理组:{df['TREAT'].sum()},对照组:{(df['TREAT']==0).sum()}")
小提示:协变量的选择遵循「既影响处理状态、又影响结局变量,但不中介处理-结局关系」的原则——即著名的 CIA 条件(条件独立假设)。
Step 2|倾向得分估计
用 Logit 回归估计每个个体进入处理组的概率:
# ============================================================# 修改这里:列出所有协变量# ============================================================formula = 'TREAT ~ X1 + X2 + X3 + X4 + X5'# 方法一:用 statsmodels 公式接口(推荐,输出更清晰)logit_model = smf.logit(formula, data=df).fit(disp=False)# 查看回归结果(检查协变量显著性)print(logit_model.summary())# 提取倾向得分(每个个体「被处理」的概率)df['pscore'] = logit_model.predict(df)print(f"\n倾向得分范围:[{df['pscore'].min():.4f}, {df['pscore'].max():.4f}]")print(f"倾向得分均值:{df['pscore'].mean():.4f}")
重要检验:如果处理组和对照组的倾向得分分布没有重叠区域(overlap assumption 违反),则无法进行有效匹配。需要先检查这一步。
重叠假设检验:
# 画出处理组 vs 对照组的倾向得分分布treated_ps = df[df['TREAT'] == 1]['pscore']control_ps = df[df['TREAT'] == 0]['pscore']plt.figure(figsize=(8, 5))plt.hist(treated_ps, bins=50, alpha=0.6, label='处理组', color='steelblue')plt.hist(control_ps, bins=50, alpha=0.6, label='对照组', color='coral')plt.xlabel('倾向得分', fontsize=12)plt.ylabel('频数', fontsize=12)plt.title('倾向得分分布对比(Overlap 检验)', fontsize=14)plt.legend()plt.tight_layout()plt.savefig('pscore_distribution.png', dpi=150)plt.show()print(" 重叠假设检验图已保存")
合格标准:两条曲线有明显的重叠区域。如果处理组得分全在对照组右侧(或反之),则存在共同支撑域(common support)问题,需要删除不满足条件的样本。
Step 3|最近邻匹配
手写最近邻匹配(无依赖,逻辑透明):
def nearest_neighbor_matching(df, treatment_col, pscore_col, replace=True, caliper=None): """ 最近邻匹配 参数: df : DataFrame treatment_col : 处理组列名 pscore_col : 倾向得分列名 replace : 是否允许有放回匹配(True=1:1最近邻,False=不重复匹配) caliper : 卡尺半径(如 0.05),超过此距离不匹配 返回: 匹配后的 DataFrame,包含 match_id 列 """ treated = df[df[treatment_col] == 1].copy() control = df[df[treatment_col] == 0].copy() matched_control_idx = [] for idx, row in treated.iterrows(): pscore_t = row[pscore_col] # 计算与所有对照组的得分距离 distances = np.abs(control[pscore_col] - pscore_t) if caliper is not None: distances[distances > caliper] = np.inf # 找到距离最小的对照组 min_idx = distances.idxmin() # 不重复匹配(可选) if not replace and min_idx in matched_control_idx: # 找次优匹配 distances[min_idx] = np.inf min_idx = distances.idxmin() matched_control_idx.append(min_idx) # 构建匹配后的数据集 treated_matched = treated.copy() treated_matched['match_id'] = matched_control_idx control_matched = control.loc[matched_control_idx].copy() control_matched['match_id'] = matched_control_idx control_matched.index.name = 'original_index' # 合并 matched_df = pd.concat([treated_matched, control_matched], ignore_index=True) return matched_df# ============================================================# 修改这里:选择是否启用卡尺匹配# ============================================================# 不设卡尺(默认1:1最近邻,有放回)matched_df = nearest_neighbor_matching( df, treatment_col='TREAT', pscore_col='pscore', replace=True, # True=有放回,False=不放回 caliper=None # 可设卡尺,如 caliper=0.05)# 或启用卡尺匹配(更严格,推荐)matched_df = nearest_neighbor_matching( df, treatment_col='TREAT', pscore_col='pscore', replace=True, caliper=0.05 # 只匹配得分差距在0.05以内的个体)print(f"匹配后样本量:{len(matched_df)}")print(f"处理组:{(matched_df['TREAT']==1).sum()},对照组:{(matched_df['TREAT']==0).sum()}")
卡尺匹配 vs 普通最近邻:
- • 普通最近邻:处理组每个个体强制匹配得分最接近的对照,不问差距有多大
- • 卡尺匹配:加一个「容忍阈值」,差距超过阈值的对照组直接拒绝匹配,更保守、偏误更小
- • 建议优先用卡尺匹配,卡尺一般取 0.01~0.05
Step 4|平衡性检验
匹配质量好不好,关键看协变量在匹配后是否「平衡」了。
标准化均值差异(SMD)计算:
def calc_std_mean_diff(df, var, treatment_col, after_match=True): """计算标准化均值差异(SMD)""" treated = df[df[treatment_col] == 1][var] control = df[df[treatment_col] == 0][var] diff = treated.mean() - control.mean() pooled_std = np.sqrt((treated.var() + control.var()) / 2) smd = diff / pooled_std if pooled_std > 0 else 0 return smd# ============================================================# 修改这里:列出你的所有协变量# ============================================================covariates = ['X1', 'X2', 'X3', 'X4', 'X5']# 匹配前 SMDsmd_before = {var: calc_std_mean_diff(df, var, 'TREAT', False) for var in covariates}# 匹配后 SMDsmd_after = {var: calc_std_mean_diff(matched_df, var, 'TREAT', True) for var in covariates}# 汇总表格balance_df = pd.DataFrame({ '变量': covariates, '匹配前 SMD': [smd_before[v] for v in covariates], '匹配后 SMD': [smd_after[v] for v in covariates],})balance_df['SMD 改善幅度(%)'] = (1 - balance_df['匹配后 SMD'].abs() / balance_df['匹配前 SMD'].abs().replace(0, np.nan)) * 100print("=" * 55)print("平衡性检验结果")print("=" * 55)print(balance_df.to_string(index=False))print("=" * 55)# 判断标准:|SMD| < 0.1 为理想平衡,< 0.2 为可接受balance_df['是否平衡(<0.1)'] = balance_df['匹配后 SMD'].abs().apply( lambda x: '✅' if x < 0.1 else ('⚠️' if x < 0.2 else '❌'))print(balance_df[['变量', '匹配后 SMD', '是否平衡(<0.1)']].to_string(index=False))
可视化:匹配前后 SMD 对比:
vars_order = balance_df['变量'].tolist()x_pos = np.arange(len(vars_order))plt.figure(figsize=(9, 5))plt.bar(x_pos - 0.2, balance_df['匹配前 SMD'].abs(), 0.4, label='匹配前', color='coral', alpha=0.8)plt.bar(x_pos + 0.2, balance_df['匹配后 SMD'].abs(), 0.4, label='匹配后', color='steelblue', alpha=0.8)plt.axhline(y=0.1, color='red', linestyle='--', linewidth=1.5, label='理想阈值(|SMD|=0.1)')plt.xticks(x_pos, vars_order, fontsize=11)plt.ylabel('|SMD|', fontsize=12)plt.title('匹配前后标准化均值差异(SMD)对比', fontsize=14)plt.legend()plt.tight_layout()plt.savefig('smd_comparison.png', dpi=150)plt.show()print(" 平衡性检验图已保存")
合格标准:匹配后所有协变量的 |SMD| < 0.1(推荐)或 < 0.2(勉强接受)。如果某变量在匹配后 SMD 仍然很大,说明该变量的分布仍不平衡,需要重新审视协变量选择或匹配方法。
Step 5|ATT 估计
ATT(Average Treatment effect on the Treated)= 处理组的平均处理效应
# 匹配后各组的结局变量均值att_treated = matched_df[matched_df['TREAT'] == 1]['Y'].mean()att_control = matched_df[matched_df['TREAT'] == 0]['Y'].mean()ATT = att_treated - att_controlprint("=" * 45)print("ATT 估计结果")print("=" * 45)print(f"处理组均值(匹配后):{att_treated:.4f}")print(f"对照组均值(匹配后):{att_control:.4f}")print(f"ATT(平均处理效应):{ATT:.4f}")print("=" * 45)# 配对 t 检验(检验 ATT 是否显著不同于零)treated_Y = matched_df[matched_df['TREAT'] == 1][['Y', 'match_id']].set_index('match_id')control_Y = matched_df[matched_df['TREAT'] == 0][['Y']].rename(columns={'Y': 'Y_control'})control_Y.index.name = 'match_id'paired = treated_Y.join(control_Y).dropna()diff = paired['Y'] - paired['Y_control']t_stat = diff.mean() / (diff.std() / np.sqrt(len(diff)))p_value = 2 * (1 - stats.t.cdf(abs(t_stat), df=len(diff)-1))from scipy import statsprint(f"\n配对 t 检验:t = {t_stat:.4f}, p = {p_value:.4f}")if p_value < 0.01: print("结论:ATT 在 1% 水平上显著")elif p_value < 0.05: print("结论:ATT 在 5% 水平上显著")elif p_value < 0.1: print("结论:ATT 在 10% 水平上显著")else: print("结论:ATT 不显著")
四、完整流程封装
把以上步骤封装成一个通用函数,方便以后直接调用:
def run_psm_analysis(df, treatment_col, outcome_col, covariate_list, caliper=0.05, replace=True, verbose=True): """ PSM 完整分析流程 参数: df : 原始数据 DataFrame treatment_col : 处理组列名(如 'TREAT') outcome_col : 结局变量列名(如 'Y') covariate_list : 协变量列表(如 ['X1', 'X2', 'X3']) caliper : 卡尺半径(None = 无卡尺) replace : 是否允许有放回匹配 verbose : 是否打印详细结果 返回: matched_df, ATT, balance_df """ import statsmodels.formula.api as smf from scipy import stats import numpy as np # 1. 倾向得分估计 formula = f"{treatment_col} ~ {' + '.join(covariate_list)}" logit = smf.logit(formula, data=df).fit(disp=False) df = df.copy() df['pscore'] = logit.predict(df) # 2. 最近邻匹配 matched_df = nearest_neighbor_matching( df, treatment_col, 'pscore', replace=replace, caliper=caliper ) # 3. ATT 估计 treated_mean = matched_df[matched_df[treatment_col]==1][outcome_col].mean() control_mean = matched_df[matched_df[treatment_col]==0][outcome_col].mean() ATT = treated_mean - control_mean # 4. 平衡性检验 smd_before = {} smd_after = {} for var in covariate_list: t = df[df[treatment_col]==1][var] c = df[df[treatment_col]==0][var] tm = matched_df[matched_df[treatment_col]==1][var] cm = matched_df[matched_df[treatment_col]==0][var] pooled = np.sqrt((t.var() + c.var()) / 2) smd_before[var] = (t.mean() - c.mean()) / pooled smd_after[var] = (tm.mean() - cm.mean()) / pooled balance_df = pd.DataFrame({ '变量': covariate_list, '匹配前 SMD': [smd_before[v] for v in covariate_list], '匹配后 SMD': [smd_after[v] for v in covariate_list], }) if verbose: print(f"\n{'='*45}") print(f"ATT 估计结果:{ATT:.4f}") print(f"处理组均值:{treated_mean:.4f}") print(f"对照组均值:{control_mean:.4f}") print(f"{'='*45}") print(balance_df.round(4).to_string(index=False)) return matched_df, ATT, balance_df# 使用示例(替换变量名即可):# matched, att, balance = run_psm_analysis(# df=df,# treatment_col='TREAT',# outcome_col='Y',# covariate_list=['X1', 'X2', 'X3', 'X4', 'X5'],# caliper=0.05# )
五、进阶:倾向得分加权(PSW)
除了匹配,还可以用逆倾向得分加权(IPW) 来估计 ATT,思路是给每个样本赋予权重:
df = df.copy()df['weight'] = np.where( df['TREAT'] == 1, 1 / df['pscore'], # 处理组:1 / pscore 1 / (1 - df['pscore']) # 对照组:1 / (1 - pscore))# 加权 ATTATT_psw = ( (df[df['TREAT']==1]['Y'] * df[df['TREAT']==1]['weight']).sum() / df[df['TREAT']==1]['weight'].sum() - (df[df['TREAT']==0]['Y'] * df[df['TREAT']==0]['weight']).sum() / df[df['TREAT']==0]['weight'].sum())print(f"PSW 加权 ATT 估计值:{ATT_psw:.4f}")
💡 PSW 的优势在于不损失样本量,但对极端倾向得分(接近0或1)非常敏感,需要做截断(trimming)或用稳健标准误。
六、常见「坑」与避坑指南
| | |
|---|
| 匹配后样本量骤降 | | |
| 协变量选择错误 | | |
| 匹配后 SMD 仍大 | | 考虑增加协变量,或换用更严格的匹配方法(如半径匹配、核匹配) |
| 倾向得分模型误设 | | |
| 未报告稳健标准误 | | |
七、结语
PSM 是因果推断中最常用也最易理解的「准实验」方法之一。它的本质是用倾向得分这把尺子,在对照组中找到处理组的「影子」,让比较变得公平。
今天这套代码覆盖了:
- • 倾向得分估计(statsmodels Logit)