I. Statistical Learning Roadmap
"""统计学习路径:描述统计 → 概率论 → 推断统计 → 回归分析 → 时间序列 → 机器学习今日重点:1. 描述性统计分析2. 概率分布与统计检验3. 相关性与回归分析4. 时间序列分析基础5. 统计建模实践"""import numpy as npimport pandas as pdimport matplotlib.pyplot as pltimport seaborn as snsimport scipy.stats as statsfrom scipy import statsimport statsmodels.api as smimport statsmodels.formula.api as smffrom statsmodels.tsa.seasonal import seasonal_decomposeimport warningswarnings.filterwarnings('ignore')# 设置中文字体plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']plt.rcParams['axes.unicode_minus'] = False# 创建示例数据集np.random.seed(42)n_samples = 1000data = pd.DataFrame({ 'age': np.random.normal(35, 10, n_samples).clip(18, 70).astype(int), 'income': np.random.lognormal(10, 0.5, n_samples), 'education_years': np.random.choice([12, 16, 18, 21], n_samples, p=[0.3, 0.4, 0.2, 0.1]), 'experience': np.random.gamma(5, 2, n_samples).clip(0, 40), 'satisfaction': np.random.beta(3, 2, n_samples) * 10, 'gender': np.random.choice(['Male', 'Female'], n_samples, p=[0.55, 0.45]), 'department': np.random.choice(['Sales', 'IT', 'HR', 'Finance', 'Marketing'], n_samples), 'performance': np.random.normal(75, 15, n_samples).clip(0, 100)})# 添加一些相关性data['income'] = data['income'] * (1 + 0.01 * data['education_years']) + np.random.randn(n_samples) * 5000data['performance'] = data['performance'] + 0.3 * data['satisfaction'] + np.random.randn(n_samples) * 5print("数据集形状:", data.shape)print("\n数据预览:")print(data.head())
II. Descriptive Statistics
1. Basic Statistics
def comprehensive_descriptive_analysis(df):
    """Comprehensive descriptive statistical analysis"""
    print("=" * 80)
    print("Descriptive Statistics Report")
    print("=" * 80)

    # 1. Basic statistics
    print("\n1. Basic statistics:")
    print("-" * 40)
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    basic_stats = df[numeric_cols].agg(['count', 'mean', 'std', 'min', 'max', 'median'])
    print(basic_stats.round(2))

    # 2. Quantile analysis
    print("\n2. Quantile analysis:")
    print("-" * 40)
    quantiles = df[numeric_cols].quantile([0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99])
    print(quantiles.round(2))

    # 3. Skewness and kurtosis
    print("\n3. Distribution shape:")
    print("-" * 40)
    skewness = df[numeric_cols].skew()
    kurtosis = df[numeric_cols].kurtosis()
    distribution_info = pd.DataFrame({
        'skewness': skewness,
        'kurtosis': kurtosis,
        'shape': skewness.apply(lambda x: 'right-skewed' if x > 0.5
                                else 'left-skewed' if x < -0.5 else 'symmetric')
    })
    print(distribution_info.round(4))

    # 4. Outlier detection
    print("\n4. Outlier detection (IQR method):")
    print("-" * 40)
    outlier_info = {}
    for col in numeric_cols:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)][col]
        outlier_info[col] = {
            'n_outliers': len(outliers),
            'pct_outliers': f"{len(outliers) / len(df) * 100:.2f}%",
            'min': df[col].min(),
            'lower_bound': lower_bound,
            'upper_bound': upper_bound,
            'max': df[col].max()
        }
    outlier_df = pd.DataFrame(outlier_info).T
    print(outlier_df[['n_outliers', 'pct_outliers', 'min', 'lower_bound', 'upper_bound', 'max']])

    # 5. Data completeness check
    print("\n5. Data completeness check:")
    print("-" * 40)
    missing_info = pd.DataFrame({
        'n_missing': df.isnull().sum(),
        'pct_missing': df.isnull().sum() / len(df) * 100,
        'n_unique': df.nunique(),
        'dtype': df.dtypes
    })
    print(missing_info.round(2))

    # 6. Categorical variable analysis
    print("\n6. Categorical variables:")
    print("-" * 40)
    categorical_cols = df.select_dtypes(include=['object']).columns
    for col in categorical_cols:
        print(f"\n{col}:")
        value_counts = df[col].value_counts()
        percentages = df[col].value_counts(normalize=True) * 100
        summary = pd.DataFrame({
            'count': value_counts,
            'pct': percentages.round(2),
            'cum_pct': percentages.cumsum().round(2)
        })
        print(summary)

    return basic_stats, outlier_info, missing_info

# Run the descriptive analysis
basic_stats, outlier_info, missing_info = comprehensive_descriptive_analysis(data)
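The IQR rule above is robust to skewed distributions; for roughly normal columns, a z-score rule is a common alternative. Below is a minimal sketch — the `zscore_outliers` helper and the threshold of 3 are illustrative choices, not part of the analysis above:

import numpy as np
from scipy import stats

def zscore_outliers(series, threshold=3.0):
    """Flag values whose absolute z-score exceeds the threshold (hypothetical helper)."""
    clean = series.dropna()
    z = np.abs(stats.zscore(clean))
    return clean[z > threshold]

# Usage: flag extreme values in the income column
income_outliers = zscore_outliers(data['income'])
print(f"z-score method flagged {len(income_outliers)} outliers")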
2. Visualization
def visualize_distributions(df):
    """Visualize variable distributions"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    categorical_cols = df.select_dtypes(include=['object']).columns

    # Create the figure
    fig = plt.figure(figsize=(20, 15))

    # 1. Numeric variable distributions
    n_numeric = len(numeric_cols)
    cols = 3
    rows = int(np.ceil(n_numeric / cols))

    for i, col in enumerate(numeric_cols):
        ax = plt.subplot(rows, cols, i + 1)

        # Histogram
        ax.hist(df[col].dropna(), bins=30, alpha=0.7,
                color='steelblue', edgecolor='black')

        # Kernel density estimate on a secondary axis
        from scipy.stats import gaussian_kde
        kde = gaussian_kde(df[col].dropna())
        x_range = np.linspace(df[col].min(), df[col].max(), 1000)
        ax2 = ax.twinx()
        ax2.plot(x_range, kde(x_range), 'r-', linewidth=2, alpha=0.8)
        ax2.set_ylabel('Density', color='red')
        ax2.tick_params(axis='y', labelcolor='red')

        # Summary statistics
        mean = df[col].mean()
        median = df[col].median()
        std = df[col].std()
        ax.axvline(mean, color='green', linestyle='--', linewidth=2,
                   alpha=0.7, label=f'Mean: {mean:.2f}')
        ax.axvline(median, color='orange', linestyle='--', linewidth=2,
                   alpha=0.7, label=f'Median: {median:.2f}')
        ax.axvline(mean + std, color='gray', linestyle=':', linewidth=1, alpha=0.5)
        ax.axvline(mean - std, color='gray', linestyle=':', linewidth=1, alpha=0.5)

        ax.set_title(f'{col} distribution\nskew: {df[col].skew():.2f}, kurtosis: {df[col].kurtosis():.2f}')
        ax.set_xlabel(col)
        ax.set_ylabel('Count')
        ax.legend(loc='upper right', fontsize=8)
        ax.grid(True, alpha=0.3)

    plt.suptitle('Numeric Variable Distributions', fontsize=16, y=1.02)
    plt.tight_layout()
    plt.show()

    # 2. Box plot comparison
    if len(numeric_cols) > 0:
        fig, axes = plt.subplots(1, 2, figsize=(15, 6))

        # Box plots of numeric variables
        box_data = [df[col].dropna() for col in numeric_cols]
        axes[0].boxplot(box_data, labels=numeric_cols, patch_artist=True)
        axes[0].set_title('Box Plot Comparison')
        axes[0].set_ylabel('Value')
        axes[0].grid(True, alpha=0.3)
        axes[0].tick_params(axis='x', rotation=45)

        # Mark the means
        for i, col in enumerate(numeric_cols):
            axes[0].plot(i + 1, df[col].mean(), 'ro', markersize=8)

        # Violin plots
        if len(numeric_cols) <= 6:  # more variables get too crowded
            violin_data = df[numeric_cols].melt(var_name='variable', value_name='value')
            sns.violinplot(x='variable', y='value', data=violin_data, ax=axes[1], palette='Set2')
            axes[1].set_title('Violin Plots')
            axes[1].tick_params(axis='x', rotation=45)
            axes[1].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

    # 3. Categorical variable analysis
    if len(categorical_cols) > 0:
        n_categorical = len(categorical_cols)
        cols = 2
        rows = int(np.ceil(n_categorical / cols))
        fig, axes = plt.subplots(rows, cols, figsize=(15, rows * 5))
        axes = axes.flatten() if n_categorical > 1 else [axes]

        for i, col in enumerate(categorical_cols):
            if i < len(axes):
                # Bar chart
                value_counts = df[col].value_counts()
                bars = axes[i].bar(range(len(value_counts)), value_counts.values,
                                   color=plt.cm.Set3(np.arange(len(value_counts)) / len(value_counts)))

                # Value labels
                for idx, bar in enumerate(bars):
                    height = bar.get_height()
                    axes[i].text(bar.get_x() + bar.get_width() / 2.,
                                 height + max(value_counts.values) * 0.01,
                                 f'{value_counts.values[idx]}\n({value_counts.values[idx] / len(df) * 100:.1f}%)',
                                 ha='center', va='bottom', fontsize=9)

                axes[i].set_title(f'{col} distribution ({len(value_counts)} categories)')
                axes[i].set_xlabel(col)
                axes[i].set_ylabel('Count')
                axes[i].set_xticks(range(len(value_counts)))
                axes[i].set_xticklabels(value_counts.index, rotation=45, ha='right')
                axes[i].grid(True, alpha=0.3)

        # Hide unused subplots
        for i in range(n_categorical, len(axes)):
            axes[i].set_visible(False)

        plt.suptitle('Categorical Variable Distributions', fontsize=16, y=1.02)
        plt.tight_layout()
        plt.show()

    # 4. Correlation heatmap and strongest pair
    if len(numeric_cols) > 1:
        fig, axes = plt.subplots(1, 2, figsize=(16, 7))

        # Correlation matrix
        corr_matrix = df[numeric_cols].corr()

        # Heatmap
        im = axes[0].imshow(corr_matrix, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
        axes[0].set_title('Correlation Heatmap')
        axes[0].set_xticks(range(len(numeric_cols)))
        axes[0].set_yticks(range(len(numeric_cols)))
        axes[0].set_xticklabels(numeric_cols, rotation=45, ha='right')
        axes[0].set_yticklabels(numeric_cols)

        # Annotate cells
        for i in range(len(numeric_cols)):
            for j in range(len(numeric_cols)):
                axes[0].text(j, i, f'{corr_matrix.iloc[i, j]:.2f}',
                             ha="center", va="center", color="black", fontsize=10)
        plt.colorbar(im, ax=axes[0], fraction=0.046, pad=0.04)

        # Scatter plot of the most correlated pair
        # (pandas' scatter_matrix needs its own subplot grid, so a single pair is shown here)
        pairs = [(abs(corr_matrix.iloc[i, j]), numeric_cols[i], numeric_cols[j])
                 for i in range(len(numeric_cols))
                 for j in range(i + 1, len(numeric_cols))]
        _, var1, var2 = max(pairs)
        axes[1].scatter(df[var1], df[var2], alpha=0.5)
        axes[1].set_xlabel(var1)
        axes[1].set_ylabel(var2)
        axes[1].set_title(f'Strongest pair: {var1} vs {var2} (r = {corr_matrix.loc[var1, var2]:.2f})')
        axes[1].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

# Run the visual analysis
visualize_distributions(data)
III. Probability Distributions and Statistical Tests
1. Common Probability Distributions
def explore_probability_distributions():
    """Explore common probability distributions"""
    # Generate samples
    np.random.seed(42)
    n_samples = 10000

    distributions = {
        'Normal': np.random.normal(0, 1, n_samples),
        'Uniform': np.random.uniform(-3, 3, n_samples),
        'Exponential': np.random.exponential(1, n_samples),
        'Binomial': np.random.binomial(20, 0.5, n_samples),
        'Poisson': np.random.poisson(5, n_samples),
        "Student's t": np.random.standard_t(10, n_samples),
        'Chi-square': np.random.chisquare(5, n_samples),
        'Beta': np.random.beta(2, 5, n_samples)
    }

    # Plot the distributions
    fig, axes = plt.subplots(4, 2, figsize=(15, 15))
    axes = axes.flatten()

    for idx, (name, sample) in enumerate(distributions.items()):
        ax = axes[idx]

        # Histogram
        ax.hist(sample, bins=50, density=True, alpha=0.6,
                color='steelblue', edgecolor='black')

        # Kernel density estimate
        from scipy.stats import gaussian_kde
        kde = gaussian_kde(sample)
        x_range = np.linspace(sample.min(), sample.max(), 1000)
        ax.plot(x_range, kde(x_range), 'r-', linewidth=2, alpha=0.8)

        # Theoretical density where applicable
        if name == 'Normal':
            from scipy.stats import norm
            ax.plot(x_range, norm.pdf(x_range, 0, 1), 'g--',
                    linewidth=2, alpha=0.7, label='theoretical')
        elif name == 'Uniform':
            from scipy.stats import uniform
            ax.plot(x_range, uniform.pdf(x_range, -3, 6), 'g--',
                    linewidth=2, alpha=0.7, label='theoretical')

        # Summary statistics
        mean = np.mean(sample)
        median = np.median(sample)
        ax.axvline(mean, color='red', linestyle='--', linewidth=2,
                   alpha=0.7, label=f'mean: {mean:.2f}')
        ax.axvline(median, color='orange', linestyle='--', linewidth=2,
                   alpha=0.7, label=f'median: {median:.2f}')

        ax.set_title(f'{name}\nskew: {stats.skew(sample):.2f}, kurtosis: {stats.kurtosis(sample):.2f}')
        ax.set_xlabel('Value')
        ax.set_ylabel('Density')
        ax.legend(loc='upper right', fontsize=8)
        ax.grid(True, alpha=0.3)

    plt.suptitle('Common Probability Distributions', fontsize=16, y=1.02)
    plt.tight_layout()
    plt.show()

    # Goodness-of-fit tests
    print("=" * 80)
    print("Distribution Fit Tests")
    print("=" * 80)

    # Test the normal sample for normality
    normal_data = distributions['Normal']

    # 1. Shapiro-Wilk test (reliable up to ~5000 observations, so use the first 5000)
    shapiro_stat, shapiro_p = stats.shapiro(normal_data[:5000])
    print(f"Shapiro-Wilk: statistic={shapiro_stat:.4f}, p={shapiro_p:.4f}")
    print(f"  Normal? {'yes' if shapiro_p > 0.05 else 'no'}")

    # 2. Kolmogorov-Smirnov test
    ks_stat, ks_p = stats.kstest(normal_data, 'norm', args=(0, 1))
    print(f"Kolmogorov-Smirnov: statistic={ks_stat:.4f}, p={ks_p:.4f}")
    print(f"  Standard normal? {'yes' if ks_p > 0.05 else 'no'}")

    # 3. Anderson-Darling test
    anderson_result = stats.anderson(normal_data, dist='norm')
    print(f"Anderson-Darling: statistic={anderson_result.statistic:.4f}")
    print(f"  Significance levels: {anderson_result.significance_level}")
    print(f"  Critical values: {anderson_result.critical_values}")

    # 4. QQ and PP plots
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    # Normal QQ plot (quantiles vs quantiles)
    stats.probplot(normal_data, dist="norm", plot=axes[0])
    axes[0].set_title('Normal QQ Plot')
    axes[0].grid(True, alpha=0.3)

    # Normal PP plot (empirical CDF vs theoretical CDF)
    sorted_data = np.sort(normal_data)
    theoretical_cdf = stats.norm.cdf(sorted_data, 0, 1)
    empirical_cdf = np.arange(1, len(sorted_data) + 1) / len(sorted_data)
    axes[1].plot(theoretical_cdf, empirical_cdf, 'o', markersize=2, alpha=0.5)
    axes[1].plot([0, 1], [0, 1], 'r--', linewidth=2)
    axes[1].set_xlabel('Theoretical CDF')
    axes[1].set_ylabel('Empirical CDF')
    axes[1].set_title('Normal PP Plot')
    axes[1].grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

# Explore the distributions
explore_probability_distributions()
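The tests above assume known parameters; in practice you usually estimate them first. Here is a minimal sketch of maximum-likelihood parameter estimation with scipy's `fit` method (the sample is freshly simulated for illustration):

import numpy as np
from scipy import stats

# Fit distribution parameters by maximum likelihood
sample = np.random.normal(5, 2, 1000)

mu_hat, sigma_hat = stats.norm.fit(sample)
print(f"Normal fit: mu={mu_hat:.3f}, sigma={sigma_hat:.3f}")

# A KS test against the fitted parameters gives a rough goodness-of-fit check.
# Caveat: estimating parameters from the same data makes this p-value optimistic
# (a Lilliefors-type correction is the rigorous route).
ks_stat, ks_p = stats.kstest(sample, 'norm', args=(mu_hat, sigma_hat))
print(f"KS goodness-of-fit: statistic={ks_stat:.4f}, p={ks_p:.4f}")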
2. Hypothesis Testing
def comprehensive_hypothesis_testing(df):
    """Comprehensive hypothesis testing"""
    print("=" * 80)
    print("Hypothesis Testing")
    print("=" * 80)

    # 1. One-sample t-test
    print("\n1. One-sample t-test:")
    print("-" * 40)
    # Test whether mean age equals 35
    age_data = df['age'].dropna()
    t_stat_1samp, p_1samp = stats.ttest_1samp(age_data, 35)
    print("H0: mean age = 35")
    print(f"Sample mean: {age_data.mean():.2f}")
    print(f"t = {t_stat_1samp:.4f}, p = {p_1samp:.4f}")
    print(f"Conclusion: {'fail to reject' if p_1samp > 0.05 else 'reject'} H0")

    # 2. Independent two-sample t-test
    print("\n2. Independent two-sample t-test:")
    print("-" * 40)
    # Compare male and female incomes
    male_income = df[df['gender'] == 'Male']['income']
    female_income = df[df['gender'] == 'Female']['income']

    # Check homogeneity of variances first (Levene's test)
    levene_stat, levene_p = stats.levene(male_income, female_income)
    equal_var = levene_p > 0.05

    t_stat_2samp, p_2samp = stats.ttest_ind(male_income, female_income, equal_var=equal_var)
    print("H0: male income = female income")
    print(f"Male mean income: ${male_income.mean():,.2f}")
    print(f"Female mean income: ${female_income.mean():,.2f}")
    print(f"Levene p = {levene_p:.4f}, variances {'equal' if equal_var else 'unequal'}")
    print(f"t = {t_stat_2samp:.4f}, p = {p_2samp:.4f}")
    print(f"Conclusion: {'fail to reject' if p_2samp > 0.05 else 'reject'} H0")

    # 3. Paired-sample t-test
    print("\n3. Paired-sample t-test:")
    print("-" * 40)
    # Simulate test scores before and after training
    np.random.seed(42)
    before = np.random.normal(70, 10, 100)
    after = before + np.random.normal(5, 8, 100)  # training adds ~5 points on average

    t_stat_paired, p_paired = stats.ttest_rel(after, before)
    print("H0: no difference before vs after training")
    print(f"Mean before: {before.mean():.2f}")
    print(f"Mean after: {after.mean():.2f}")
    print(f"t = {t_stat_paired:.4f}, p = {p_paired:.4f}")
    print(f"Conclusion: {'fail to reject' if p_paired > 0.05 else 'reject'} H0")
    print(f"Mean improvement: {(after - before).mean():.2f} points")

    # 4. Chi-square test of independence
    print("\n4. Chi-square test of independence:")
    print("-" * 40)
    # Test whether gender and department are independent
    contingency_table = pd.crosstab(df['gender'], df['department'])
    chi2, chi2_p, dof, expected = stats.chi2_contingency(contingency_table)
    print("Contingency table:")
    print(contingency_table)
    print(f"\nChi-square = {chi2:.4f}")
    print(f"Degrees of freedom: {dof}")
    print(f"p = {chi2_p:.4f}")
    print(f"Conclusion: gender and department are "
          f"{'independent' if chi2_p > 0.05 else 'not independent'}")

    # 5. One-way ANOVA
    print("\n5. One-way ANOVA:")
    print("-" * 40)
    # Compare satisfaction across departments
    departments = df['department'].unique()
    groups = [df[df['department'] == dept]['satisfaction'] for dept in departments]
    f_stat, anova_p = stats.f_oneway(*groups)
    print("H0: all departments have the same satisfaction")
    print(f"F = {f_stat:.4f}, p = {anova_p:.4f}")
    print(f"Conclusion: {'fail to reject' if anova_p > 0.05 else 'reject'} H0")

    # 6. Post-hoc test (Tukey HSD)
    print("\n6. Post-hoc test (Tukey HSD):")
    print("-" * 40)
    from statsmodels.stats.multicomp import pairwise_tukeyhsd
    tukey_result = pairwise_tukeyhsd(
        endog=df['satisfaction'],
        groups=df['department'],
        alpha=0.05
    )
    print(tukey_result)

    # Visualize the results
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))

    # 1. One-sample t-test
    axes[0, 0].hist(age_data, bins=30, alpha=0.7, color='steelblue', edgecolor='black')
    axes[0, 0].axvline(35, color='red', linestyle='--', linewidth=3, label='hypothesized mean (35)')
    axes[0, 0].axvline(age_data.mean(), color='green', linestyle='--', linewidth=3,
                       label=f'sample mean ({age_data.mean():.1f})')
    axes[0, 0].set_title('One-sample t-test: age')
    axes[0, 0].set_xlabel('Age')
    axes[0, 0].set_ylabel('Count')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)

    # 2. Two-sample t-test
    axes[0, 1].boxplot([male_income, female_income], positions=[1, 2],
                       labels=['Male', 'Female'], patch_artist=True)
    axes[0, 1].set_title('Two-sample t-test: income')
    axes[0, 1].set_ylabel('Income ($)')
    axes[0, 1].grid(True, alpha=0.3)

    # Significance marker (using the two-sample test's own p-value)
    if p_2samp < 0.05:
        axes[0, 1].text(1.5, max(male_income.max(), female_income.max()) * 1.05,
                        f'p = {p_2samp:.3f}\n*', ha='center', fontsize=12, color='red')

    # 3. Paired t-test
    axes[0, 2].plot([1, 2], [before.mean(), after.mean()], 'bo-', markersize=10, linewidth=2)
    axes[0, 2].fill_between([1, 2],
                            [before.mean() - before.std(), after.mean() - after.std()],
                            [before.mean() + before.std(), after.mean() + after.std()],
                            alpha=0.2)
    axes[0, 2].set_xlim(0.5, 2.5)
    axes[0, 2].set_xticks([1, 2])
    axes[0, 2].set_xticklabels(['Before', 'After'])
    axes[0, 2].set_ylabel('Mean score')
    axes[0, 2].set_title('Paired t-test: training effect')
    axes[0, 2].grid(True, alpha=0.3)

    # 4. Chi-square test
    im = axes[1, 0].imshow(contingency_table.values, cmap='YlOrRd', aspect='auto')
    axes[1, 0].set_title('Chi-square: gender × department')
    axes[1, 0].set_xlabel('Department')
    axes[1, 0].set_ylabel('Gender')
    axes[1, 0].set_xticks(range(len(contingency_table.columns)))
    axes[1, 0].set_yticks(range(len(contingency_table.index)))
    axes[1, 0].set_xticklabels(contingency_table.columns, rotation=45, ha='right')
    axes[1, 0].set_yticklabels(contingency_table.index)
    plt.colorbar(im, ax=axes[1, 0])

    # 5. ANOVA
    bp = axes[1, 1].boxplot(groups, labels=departments, patch_artist=True)
    for patch, color in zip(bp['boxes'], plt.cm.Set2(range(len(departments)))):
        patch.set_facecolor(color)
    axes[1, 1].set_title('One-way ANOVA: satisfaction by department')
    axes[1, 1].set_xlabel('Department')
    axes[1, 1].set_ylabel('Satisfaction')
    axes[1, 1].tick_params(axis='x', rotation=45)
    axes[1, 1].grid(True, alpha=0.3)

    # 6. Tukey HSD
    tukey_result.plot_simultaneous(ax=axes[1, 2])
    axes[1, 2].set_title('Tukey HSD: multiple comparisons')

    plt.suptitle('Hypothesis Testing Summary', fontsize=16, y=1.02)
    plt.tight_layout()
    plt.show()

# Run the hypothesis tests
comprehensive_hypothesis_testing(data)
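A p-value only says whether a difference is detectable, not how large it is. As a complement to the t-tests above, here is a minimal effect-size sketch using Cohen's d with the usual pooled-standard-deviation formula (the `cohens_d` helper is illustrative and reuses `data` from above):

import numpy as np

def cohens_d(group1, group2):
    """Cohen's d for independent samples: mean difference over pooled SD."""
    n1, n2 = len(group1), len(group2)
    pooled_var = ((n1 - 1) * np.var(group1, ddof=1) +
                  (n2 - 1) * np.var(group2, ddof=1)) / (n1 + n2 - 2)
    return (np.mean(group1) - np.mean(group2)) / np.sqrt(pooled_var)

male_income = data[data['gender'] == 'Male']['income']
female_income = data[data['gender'] == 'Female']['income']
d = cohens_d(male_income, female_income)
print(f"Cohen's d = {d:.3f}")  # rough convention: 0.2 small, 0.5 medium, 0.8 large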
IV. Correlation and Regression Analysis
1. Correlation Analysis
def comprehensive_correlation_analysis(df):
    """Comprehensive correlation analysis"""
    print("=" * 80)
    print("Correlation Analysis")
    print("=" * 80)

    numeric_cols = df.select_dtypes(include=[np.number]).columns

    # 1. Correlation matrices
    print("\n1. Correlation matrices:")
    print("-" * 40)

    # Pearson (linear association)
    pearson_corr = df[numeric_cols].corr(method='pearson')
    print("Pearson correlation (linear):")
    print(pearson_corr.round(3))

    # Spearman (monotonic association)
    spearman_corr = df[numeric_cols].corr(method='spearman')
    print("\nSpearman correlation (monotonic):")
    print(spearman_corr.round(3))

    # Kendall tau (rank association)
    kendall_corr = df[numeric_cols].corr(method='kendall')
    print("\nKendall tau correlation (rank-based):")
    print(kendall_corr.round(3))

    # 2. Significance tests
    print("\n2. Correlation significance tests:")
    print("-" * 40)

    variable_pairs = [('age', 'income'), ('experience', 'performance'),
                      ('education_years', 'income')]

    for var1, var2 in variable_pairs:
        if var1 in numeric_cols and var2 in numeric_cols:
            pearson_corr_val, pearson_p = stats.pearsonr(df[var1], df[var2])
            spearman_corr_val, spearman_p = stats.spearmanr(df[var1], df[var2])
            kendall_corr_val, kendall_p = stats.kendalltau(df[var1], df[var2])

            print(f"\n{var1} vs {var2}:")
            print(f"  Pearson:  r = {pearson_corr_val:.3f}, p = {pearson_p:.4f}")
            print(f"  Spearman: ρ = {spearman_corr_val:.3f}, p = {spearman_p:.4f}")
            print(f"  Kendall:  τ = {kendall_corr_val:.3f}, p = {kendall_p:.4f}")

            # Interpret the strength
            abs_corr = abs(pearson_corr_val)
            if abs_corr < 0.3:
                strength = "weak"
            elif abs_corr < 0.7:
                strength = "moderate"
            else:
                strength = "strong"
            direction = "positive" if pearson_corr_val > 0 else "negative"
            print(f"  Conclusion: {strength} {direction} correlation")

    # 3. Partial and semi-partial correlation
    print("\n3. Partial correlation:")
    print("-" * 40)

    # Use pingouin if available
    try:
        import pingouin as pg

        # Partial correlation of age and income, controlling for education years
        partial_corr = pg.partial_corr(data=df, x='age', y='income',
                                       covar='education_years')
        print("Partial correlation (controlling for education_years):")
        print(partial_corr.round(4))

        # Semi-partial: remove education's effect from income only
        semi_partial_corr = pg.partial_corr(data=df, x='age', y='income',
                                            y_covar='education_years')
        print("\nSemi-partial correlation (education_years removed from income only):")
        print(semi_partial_corr.round(4))
    except ImportError:
        print("Install pingouin for partial correlations: pip install pingouin")
        print("Partial correlation can also be computed by hand via regression residuals;")
        print("see the sketch at the end of this subsection.")

    # Visualize
    visualize_correlation_analysis(df, numeric_cols)

    return pearson_corr, spearman_corr, kendall_corr


def visualize_correlation_analysis(df, numeric_cols):
    """Visualize the correlation analysis"""
    fig = plt.figure(figsize=(18, 12))

    # 1. Correlation heatmap
    ax1 = plt.subplot(2, 3, 1)
    corr_matrix = df[numeric_cols].corr()

    im = ax1.imshow(corr_matrix, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
    ax1.set_title('Correlation Heatmap')
    ax1.set_xticks(range(len(numeric_cols)))
    ax1.set_yticks(range(len(numeric_cols)))
    ax1.set_xticklabels(numeric_cols, rotation=45, ha='right')
    ax1.set_yticklabels(numeric_cols)

    # Annotate cells
    for i in range(len(numeric_cols)):
        for j in range(len(numeric_cols)):
            ax1.text(j, i, f'{corr_matrix.iloc[i, j]:.2f}',
                     ha="center", va="center", color="black", fontsize=9)
    plt.colorbar(im, ax=ax1, fraction=0.046, pad=0.04)

    # 2. Scatter plot of the most correlated pair
    ax2 = plt.subplot(2, 3, 2)
    top_pairs = []
    for i in range(len(numeric_cols)):
        for j in range(i + 1, len(numeric_cols)):
            top_pairs.append((abs(corr_matrix.iloc[i, j]), numeric_cols[i], numeric_cols[j]))
    top_pairs.sort(reverse=True)

    _, var1, var2 = top_pairs[0]
    ax2.scatter(df[var1], df[var2], alpha=0.5)
    ax2.set_xlabel(var1)
    ax2.set_ylabel(var2)
    ax2.set_title(f'Strongest pair: {var1} vs {var2}\nr = {corr_matrix.loc[var1, var2]:.2f}')
    ax2.grid(True, alpha=0.3)

    # 3. Correlation network
    ax3 = plt.subplot(2, 3, 4)
    import networkx as nx
    G = nx.Graph()

    # Nodes
    for col in numeric_cols:
        G.add_node(col)

    # Edges (only moderate-or-stronger correlations)
    for i in range(len(numeric_cols)):
        for j in range(i + 1, len(numeric_cols)):
            corr_val = corr_matrix.iloc[i, j]
            if abs(corr_val) > 0.3:
                G.add_edge(numeric_cols[i], numeric_cols[j],
                           weight=abs(corr_val) * 5,
                           color='red' if corr_val > 0 else 'blue')

    # Draw the network
    pos = nx.spring_layout(G, seed=42)
    edges = G.edges()
    colors = [G[u][v]['color'] for u, v in edges]
    weights = [G[u][v]['weight'] for u, v in edges]

    nx.draw_networkx_nodes(G, pos, ax=ax3, node_size=500,
                           node_color='lightgray', alpha=0.8)
    nx.draw_networkx_edges(G, pos, ax=ax3, edgelist=edges,
                           edge_color=colors, width=weights, alpha=0.5)
    nx.draw_networkx_labels(G, pos, ax=ax3, font_size=10)
    ax3.set_title('Correlation Network (|r| > 0.3)')
    ax3.axis('off')

    # 4. Correlation bar chart
    ax4 = plt.subplot(2, 3, 5)
    # Correlations of one variable (income) with the others
    target_var = 'income'
    if target_var in numeric_cols:
        correlations = corr_matrix[target_var].drop(target_var).sort_values()
        bar_colors = ['red' if x < 0 else 'green' for x in correlations.values]
        bars = ax4.barh(range(len(correlations)), correlations.values, color=bar_colors)

        # Value labels
        for bar in bars:
            width = bar.get_width()
            ax4.text(width, bar.get_y() + bar.get_height() / 2, f'{width:.2f}',
                     ha='left' if width >= 0 else 'right', va='center', fontsize=9)

        ax4.set_yticks(range(len(correlations)))
        ax4.set_yticklabels(correlations.index)
        ax4.set_xlabel('Correlation coefficient')
        ax4.set_title(f'Correlations with {target_var}')
        ax4.axvline(x=0, color='black', linewidth=0.5)
        ax4.grid(True, alpha=0.3, axis='x')

    # 5. 3D scatter plot
    ax5 = plt.subplot(2, 3, 6, projection='3d')
    if len(numeric_cols) >= 3:
        # Use the first three numeric variables
        var1, var2, var3 = numeric_cols[:3]
        scatter = ax5.scatter(df[var1], df[var2], df[var3],
                              c=df[numeric_cols[3]] if len(numeric_cols) > 3 else 'blue',
                              alpha=0.5, cmap='viridis')
        ax5.set_xlabel(var1)
        ax5.set_ylabel(var2)
        ax5.set_zlabel(var3)
        ax5.set_title('3D Scatter Plot')
        if len(numeric_cols) > 3:
            plt.colorbar(scatter, ax=ax5, label=numeric_cols[3])

    plt.suptitle('Correlation Analysis Visualizations', fontsize=16, y=1.02)
    plt.tight_layout()
    plt.show()

# Run the correlation analysis
pearson_corr, spearman_corr, kendall_corr = comprehensive_correlation_analysis(data)
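If you prefer not to add pingouin as a dependency, partial correlation can be computed with the regression-residual method: regress each variable on the covariate, then correlate the residuals. A minimal statsmodels sketch (the `partial_corr_residuals` helper is illustrative and reuses `data` from above):

import statsmodels.api as sm
from scipy import stats

def partial_corr_residuals(df, x, y, covar):
    """Partial correlation via residuals: remove covar's linear effect from x and y, then correlate."""
    Z = sm.add_constant(df[covar])
    resid_x = sm.OLS(df[x], Z).fit().resid
    resid_y = sm.OLS(df[y], Z).fit().resid
    return stats.pearsonr(resid_x, resid_y)

r, p = partial_corr_residuals(data, 'age', 'income', 'education_years')
print(f"Partial correlation (controlling for education_years): r = {r:.3f}, p = {p:.4f}")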
2. Regression Analysis
def comprehensive_regression_analysis(df):
    """Comprehensive regression analysis"""
    print("=" * 80)
    print("Regression Analysis")
    print("=" * 80)

    # 1. Simple linear regression
    print("\n1. Simple linear regression:")
    print("-" * 40)

    # statsmodels OLS
    X = df['experience']
    y = df['performance']

    # Add the intercept term
    X_with_const = sm.add_constant(X)

    # Fit the model
    model_simple = sm.OLS(y, X_with_const).fit()
    print("Model: performance = β₀ + β₁ * experience")
    print(model_simple.summary())

    # 2. Multiple linear regression
    print("\n2. Multiple linear regression:")
    print("-" * 40)

    # Several predictors
    X_multi = df[['age', 'experience', 'education_years', 'satisfaction']]
    y_multi = df['performance']
    X_multi_const = sm.add_constant(X_multi)

    model_multi = sm.OLS(y_multi, X_multi_const).fit()
    print("Model: performance = β₀ + β₁*age + β₂*experience + β₃*education + β₄*satisfaction")
    print(model_multi.summary())

    # 3. Logistic regression (classification)
    print("\n3. Logistic regression:")
    print("-" * 40)

    # Binary target: high vs low performance
    df['high_performance'] = (df['performance'] > df['performance'].median()).astype(int)

    X_logistic = df[['age', 'experience', 'education_years', 'satisfaction']]
    y_logistic = df['high_performance']
    X_logistic_const = sm.add_constant(X_logistic)

    model_logistic = sm.Logit(y_logistic, X_logistic_const).fit(disp=False)
    print("Model: logit(P(high_performance)) = β₀ + β₁*age + β₂*experience + β₃*education + β₄*satisfaction")
    print(model_logistic.summary())

    # 4. Regression diagnostics
    print("\n4. Regression diagnostics:")
    print("-" * 40)

    # Multicollinearity check
    from statsmodels.stats.outliers_influence import variance_inflation_factor

    print("Multicollinearity check (VIF):")
    vif_data = pd.DataFrame()
    vif_data["variable"] = X_multi.columns
    vif_data["VIF"] = [variance_inflation_factor(X_multi.values, i)
                       for i in range(X_multi.shape[1])]
    print(vif_data)
    print("\nVIF > 10 indicates severe multicollinearity")

    # Residual analysis
    residuals = model_multi.resid
    print("\nResidual statistics:")
    print(f"  mean: {residuals.mean():.4f}")
    print(f"  std: {residuals.std():.4f}")
    print(f"  skew: {stats.skew(residuals):.4f}")
    print(f"  kurtosis: {stats.kurtosis(residuals):.4f}")

    # Residual normality tests
    _, jb_pvalue = stats.jarque_bera(residuals)
    _, shapiro_pvalue = stats.shapiro(residuals)
    print("\nResidual normality tests:")
    print(f"  Jarque-Bera p = {jb_pvalue:.4f}")
    print(f"  Shapiro-Wilk p = {shapiro_pvalue:.4f}")

    # Heteroskedasticity (Breusch-Pagan)
    try:
        from statsmodels.stats.diagnostic import het_breuschpagan
        bp_test = het_breuschpagan(residuals, X_multi_const)
        print("\nBreusch-Pagan heteroskedasticity test:")
        print(f"  LM statistic: {bp_test[0]:.4f}")
        print(f"  p = {bp_test[1]:.4f}")
        print(f"  F statistic: {bp_test[2]:.4f}")
        print(f"  F-test p = {bp_test[3]:.4f}")
    except Exception:
        pass

    # 5. Model comparison
    print("\n5. Model comparison:")
    print("-" * 40)

    models = {
        'simple OLS': model_simple,
        'multiple OLS': model_multi,
        'logistic': model_logistic
    }

    comparison = pd.DataFrame({
        'model': list(models.keys()),
        'R²/pseudo-R²': [m.rsquared if hasattr(m, 'rsquared') else m.prsquared
                         for m in models.values()],
        'AIC': [m.aic for m in models.values()],
        'BIC': [m.bic for m in models.values()],
        'log-likelihood': [m.llf for m in models.values()],
        'n': [m.nobs for m in models.values()]
    })
    print(comparison.round(4))

    # Visualize the regression results
    visualize_regression_results(df, model_simple, model_multi, model_logistic)

    return model_simple, model_multi, model_logistic


def visualize_regression_results(df, model_simple, model_multi, model_logistic):
    """Visualize the regression results"""
    fig = plt.figure(figsize=(18, 12))

    # 1. Simple linear regression
    ax1 = plt.subplot(2, 3, 1)
    X_simple = df['experience']
    y_simple = df['performance']

    ax1.scatter(X_simple, y_simple, alpha=0.5, label='observed')

    # Regression line
    x_range = np.linspace(X_simple.min(), X_simple.max(), 100)
    x_range_const = sm.add_constant(x_range)
    y_pred = model_simple.predict(x_range_const)
    ax1.plot(x_range, y_pred, 'r-', linewidth=3, label='fit')

    # Confidence band
    predictions = model_simple.get_prediction(x_range_const)
    ci = predictions.conf_int(alpha=0.05)
    ax1.fill_between(x_range, ci[:, 0], ci[:, 1], alpha=0.2, color='red', label='95% CI')

    ax1.set_xlabel('Experience')
    ax1.set_ylabel('Performance')
    ax1.set_title('Simple linear regression: experience vs performance')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # 2. Residuals vs fitted
    ax2 = plt.subplot(2, 3, 2)
    residuals = model_simple.resid
    fitted = model_simple.fittedvalues
    ax2.scatter(fitted, residuals, alpha=0.5)
    ax2.axhline(y=0, color='red', linestyle='--', linewidth=2)
    ax2.set_xlabel('Fitted values')
    ax2.set_ylabel('Residuals')
    ax2.set_title('Residuals vs Fitted')
    ax2.grid(True, alpha=0.3)

    # 3. QQ plot
    ax3 = plt.subplot(2, 3, 3)
    stats.probplot(residuals, dist="norm", plot=ax3)
    ax3.set_title('Residual QQ Plot')
    ax3.grid(True, alpha=0.3)

    # 4. Coefficients
    ax4 = plt.subplot(2, 3, 4)
    # Multiple regression coefficients (intercept excluded)
    coefs = model_multi.params[1:]
    coef_errors = model_multi.bse[1:]

    y_pos = np.arange(len(coefs))
    ax4.barh(y_pos, coefs.values, xerr=coef_errors.values,
             color=['steelblue', 'lightcoral', 'lightgreen', 'orange'])
    ax4.set_yticks(y_pos)
    ax4.set_yticklabels(coefs.index)
    ax4.set_xlabel('Coefficient')
    ax4.set_title('Multiple Regression Coefficients')
    ax4.axvline(x=0, color='black', linewidth=0.5)
    ax4.grid(True, alpha=0.3, axis='x')

    # 5. Predicted vs actual
    ax5 = plt.subplot(2, 3, 5)
    y_pred_multi = model_multi.fittedvalues
    y_actual = df['performance']

    ax5.scatter(y_actual, y_pred_multi, alpha=0.5)

    # Perfect-prediction line
    min_val = min(y_actual.min(), y_pred_multi.min())
    max_val = max(y_actual.max(), y_pred_multi.max())
    ax5.plot([min_val, max_val], [min_val, max_val], 'r--', linewidth=2, label='perfect prediction')

    ax5.set_xlabel('Actual')
    ax5.set_ylabel('Predicted')
    ax5.set_title('Predicted vs Actual')
    ax5.legend()
    ax5.grid(True, alpha=0.3)

    # R²
    from sklearn.metrics import r2_score
    r2 = r2_score(y_actual, y_pred_multi)
    ax5.text(0.05, 0.95, f'R² = {r2:.3f}', transform=ax5.transAxes,
             fontsize=12, verticalalignment='top',
             bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))

    # 6. Logistic regression
    ax6 = plt.subplot(2, 3, 6)

    # Visualize with a single predictor
    X_logistic_var = df['satisfaction']
    X_logistic_const = sm.add_constant(X_logistic_var)
    model_logistic_simple = sm.Logit(df['high_performance'], X_logistic_const).fit(disp=False)

    x_range = np.linspace(X_logistic_var.min(), X_logistic_var.max(), 100)
    x_range_const = sm.add_constant(x_range)
    y_prob = model_logistic_simple.predict(x_range_const)

    # Jittered scatter of the binary outcome
    y_jittered = df['high_performance'] + np.random.normal(0, 0.02, len(df))
    ax6.scatter(X_logistic_var, y_jittered, alpha=0.3, label='observed')

    # Fitted logistic curve
    ax6.plot(x_range, y_prob, 'r-', linewidth=3, label='logistic fit')

    ax6.set_xlabel('Satisfaction')
    ax6.set_ylabel('P(high performance)')
    ax6.set_title('Logistic regression: satisfaction vs high performance')
    ax6.legend()
    ax6.grid(True, alpha=0.3)

    plt.suptitle('Regression Analysis Visualizations', fontsize=16, y=1.02)
    plt.tight_layout()
    plt.show()

# Run the regression analysis
model_simple, model_multi, model_logistic = comprehensive_regression_analysis(data)
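The band plotted above is a confidence interval for the regression mean; the interval for a single new observation (the prediction interval) is wider. A minimal sketch, assuming `model_simple` from above is available — `summary_frame` is the statsmodels accessor that returns both intervals:

import pandas as pd
import statsmodels.api as sm

# Confidence interval (uncertainty of the mean) vs prediction interval (single observation)
new_X = sm.add_constant(pd.DataFrame({'experience': [5.0, 10.0, 20.0]}), has_constant='add')
pred = model_simple.get_prediction(new_X)
frame = pred.summary_frame(alpha=0.05)
# mean_ci_* bounds the mean response; obs_ci_* is the wider 95% prediction interval
print(frame[['mean', 'mean_ci_lower', 'mean_ci_upper', 'obs_ci_lower', 'obs_ci_upper']])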
V. Time Series Analysis Basics
def time_series_analysis():
    """Time series analysis"""
    print("=" * 80)
    print("Time Series Analysis")
    print("=" * 80)

    # Create time series data
    np.random.seed(42)
    n_points = 365 * 3  # 3 years of daily data
    dates = pd.date_range(start='2020-01-01', periods=n_points, freq='D')

    # Series with trend, seasonality, and noise
    trend = np.linspace(100, 200, n_points)
    seasonal = 50 * np.sin(2 * np.pi * np.arange(n_points) / 365)
    noise = np.random.normal(0, 10, n_points)
    ts_data = trend + seasonal + noise

    # Build the DataFrame
    ts_df = pd.DataFrame({
        'date': dates,
        'value': ts_data,
        'year': dates.year,
        'month': dates.month,
        'day': dates.day,
        'weekday': dates.dayofweek,
        'quarter': dates.quarter
    })
    ts_df.set_index('date', inplace=True)

    print(f"Time series: {len(ts_df)} observations")
    print(f"Range: {ts_df.index.min()} to {ts_df.index.max()}")

    # 1. Decomposition
    print("\n1. Time series decomposition:")
    print("-" * 40)

    decomposition = None
    try:
        # Seasonal decomposition (additive model)
        decomposition = seasonal_decompose(ts_df['value'], model='additive', period=365)
        print("Decomposition complete (additive model)")
        print(f"  trend component: {len(decomposition.trend)} points")
        print(f"  seasonal component: {len(decomposition.seasonal)} points")
        print(f"  residual component: {len(decomposition.resid)} points")
    except Exception as e:
        print(f"Decomposition failed: {e}")
        # Fall back to a simple moving-average decomposition
        window_size = 30
        ts_df['trend'] = ts_df['value'].rolling(window=window_size, center=True).mean()
        ts_df['detrended'] = ts_df['value'] - ts_df['trend']
        # Crude seasonality: mean detrended value per calendar month
        seasonal_pattern = ts_df.groupby(ts_df.index.month)['detrended'].mean()
        ts_df['seasonal'] = ts_df.index.month.map(seasonal_pattern)
        ts_df['residual'] = ts_df['detrended'] - ts_df['seasonal']
        print("Used a simple moving-average decomposition instead")

    # 2. Stationarity tests
    print("\n2. Stationarity tests:")
    print("-" * 40)

    # ADF test (Augmented Dickey-Fuller)
    from statsmodels.tsa.stattools import adfuller
    adf_result = adfuller(ts_df['value'].dropna())
    print("ADF test:")
    print(f"  statistic: {adf_result[0]:.4f}")
    print(f"  p = {adf_result[1]:.4f}")
    print(f"  critical values: {adf_result[4]}")
    print(f"  conclusion: series is {'stationary' if adf_result[1] < 0.05 else 'non-stationary'}")

    # KPSS test
    try:
        from statsmodels.tsa.stattools import kpss
        kpss_result = kpss(ts_df['value'].dropna(), regression='c')
        print("\nKPSS test:")
        print(f"  statistic: {kpss_result[0]:.4f}")
        print(f"  p = {kpss_result[1]:.4f}")
        print(f"  critical values: {kpss_result[3]}")
    except Exception:
        pass

    # 3. Autocorrelation
    print("\n3. Autocorrelation:")
    print("-" * 40)
    autocorr = ts_df['value'].autocorr(lag=1)
    print(f"Lag-1 autocorrelation: {autocorr:.4f}")

    # 4. A simple forecasting model
    print("\n4. Simple forecasting model:")
    print("-" * 40)

    from sklearn.metrics import mean_squared_error, mean_absolute_error

    # Train/test split
    train_size = int(len(ts_df) * 0.8)
    train = ts_df['value'][:train_size]
    test = ts_df['value'][train_size:]

    try:
        from statsmodels.tsa.arima.model import ARIMA

        # Fit an ARIMA model
        model = ARIMA(train, order=(1, 1, 1))
        model_fit = model.fit()
        print("ARIMA(1,1,1) fit:")
        print(model_fit.summary())

        # Forecast
        forecast = model_fit.forecast(steps=len(test))

        # Forecast errors
        mse = mean_squared_error(test, forecast)
        mae = mean_absolute_error(test, forecast)
        mape = np.mean(np.abs((test - forecast) / test)) * 100

        print("\nForecast performance:")
        print(f"  MSE:  {mse:.2f}")
        print(f"  MAE:  {mae:.2f}")
        print(f"  MAPE: {mape:.2f}%")
    except Exception as e:
        print(f"ARIMA modeling failed: {e}")
        # Fall back to a simple moving-average forecast
        window = 30
        train_ma = train.rolling(window=window).mean()
        forecast_simple = pd.Series([train_ma.iloc[-1]] * len(test), index=test.index)
        mse_simple = mean_squared_error(test, forecast_simple)
        print(f"\nSimple moving-average forecast (window={window}):")
        print(f"  MSE: {mse_simple:.2f}")

    # Visualize the time series analysis
    visualize_time_series(ts_df, decomposition)

    return ts_df


def visualize_time_series(ts_df, decomposition=None):
    """Visualize the time series analysis"""
    from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

    fig = plt.figure(figsize=(18, 12))

    # 1. Raw series
    ax1 = plt.subplot(3, 3, 1)
    ax1.plot(ts_df.index, ts_df['value'], linewidth=1)
    ax1.set_xlabel('Date')
    ax1.set_ylabel('Value')
    ax1.set_title('Raw Time Series')
    ax1.grid(True, alpha=0.3)

    # 2. Rolling statistics
    ax2 = plt.subplot(3, 3, 2)
    window = 30
    ts_df['rolling_mean'] = ts_df['value'].rolling(window=window).mean()
    ts_df['rolling_std'] = ts_df['value'].rolling(window=window).std()

    ax2.plot(ts_df.index, ts_df['value'], alpha=0.5, label='raw')
    ax2.plot(ts_df.index, ts_df['rolling_mean'], 'r-', linewidth=2, label=f'{window}-day mean')
    ax2.fill_between(ts_df.index,
                     ts_df['rolling_mean'] - ts_df['rolling_std'],
                     ts_df['rolling_mean'] + ts_df['rolling_std'],
                     alpha=0.2, color='red', label='±1 std')
    ax2.set_xlabel('Date')
    ax2.set_ylabel('Value')
    ax2.set_title('Rolling Statistics')
    ax2.legend(loc='upper left', fontsize=8)
    ax2.grid(True, alpha=0.3)

    # 3. Trend component
    ax3 = plt.subplot(3, 3, 3)
    if decomposition is not None:
        ax3.plot(ts_df.index, decomposition.observed, alpha=0.5, label='observed')
        ax3.plot(ts_df.index, decomposition.trend, 'r-', linewidth=2, label='trend')
        ax3.set_title('Trend Component')
    elif 'trend' in ts_df.columns:
        ax3.plot(ts_df.index, ts_df['value'], alpha=0.5, label='raw')
        ax3.plot(ts_df.index, ts_df['trend'], 'r-', linewidth=2, label='trend')
        ax3.set_title('Trend Component (moving average)')
    ax3.set_xlabel('Date')
    ax3.set_ylabel('Value')
    ax3.legend(loc='upper left')
    ax3.grid(True, alpha=0.3)

    # 4. Seasonality
    ax4 = plt.subplot(3, 3, 4)
    if decomposition is not None:
        seasonal_values = decomposition.seasonal[:365]  # first year
        ax4.plot(range(len(seasonal_values)), seasonal_values)
        ax4.set_xlabel('Day of year')
        ax4.set_ylabel('Seasonal effect')
        ax4.set_title('Seasonal Pattern (first year)')
    elif 'seasonal' in ts_df.columns:
        # Aggregate seasonality by month
        monthly_seasonal = ts_df.groupby('month')['seasonal'].mean()
        ax4.bar(range(1, 13), monthly_seasonal.values)
        ax4.set_xlabel('Month')
        ax4.set_ylabel('Seasonal effect')
        ax4.set_title('Monthly Seasonal Pattern')
    ax4.grid(True, alpha=0.3)

    # 5. ACF
    ax5 = plt.subplot(3, 3, 5)
    plot_acf(ts_df['value'].dropna(), lags=40, ax=ax5)
    ax5.set_title('Autocorrelation (ACF)')
    ax5.grid(True, alpha=0.3)

    # 6. PACF
    ax6 = plt.subplot(3, 3, 6)
    plot_pacf(ts_df['value'].dropna(), lags=40, ax=ax6)
    ax6.set_title('Partial Autocorrelation (PACF)')
    ax6.grid(True, alpha=0.3)

    # 7. Value distribution
    ax7 = plt.subplot(3, 3, 7)
    ax7.hist(ts_df['value'].dropna(), bins=50, density=True, alpha=0.7, edgecolor='black')

    # Kernel density estimate
    from scipy.stats import gaussian_kde
    kde = gaussian_kde(ts_df['value'].dropna())
    x_range = np.linspace(ts_df['value'].min(), ts_df['value'].max(), 1000)
    ax7.plot(x_range, kde(x_range), 'r-', linewidth=2)
    ax7.set_xlabel('Value')
    ax7.set_ylabel('Density')
    ax7.set_title('Value Distribution')
    ax7.grid(True, alpha=0.3)

    # 8. Year-over-year comparison
    ax8 = plt.subplot(3, 3, 8)
    years = ts_df['year'].unique()
    for year in years[:3]:  # first three years only
        year_data = ts_df[ts_df['year'] == year]
        ax8.plot(range(1, 13), year_data.groupby('month')['value'].mean(),
                 marker='o', label=str(year))
    ax8.set_xlabel('Month')
    ax8.set_ylabel('Monthly mean')
    ax8.set_title('Year-over-Year Comparison')
    ax8.legend()
    ax8.grid(True, alpha=0.3)

    # 9. Monthly box plots (seasonality)
    ax9 = plt.subplot(3, 3, 9)
    month_data = [ts_df[ts_df['month'] == month]['value'] for month in range(1, 13)]
    bp = ax9.boxplot(month_data, labels=range(1, 13), patch_artist=True)
    for patch in bp['boxes']:
        patch.set_facecolor('lightblue')
    ax9.set_xlabel('Month')
    ax9.set_ylabel('Value')
    ax9.set_title('Monthly Box Plots')
    ax9.grid(True, alpha=0.3)

    plt.suptitle('Time Series Analysis Visualizations', fontsize=16, y=1.02)
    plt.tight_layout()
    plt.show()

# Run the time series analysis
ts_df = time_series_analysis()
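The ADF test above typically flags a trending series as non-stationary, which is exactly what the d=1 term in ARIMA(1,1,1) addresses: first differencing. A minimal before/after comparison, assuming `ts_df` from above:

from statsmodels.tsa.stattools import adfuller

# First difference removes a (locally) linear trend
diff_series = ts_df['value'].diff().dropna()

for name, series in [('original', ts_df['value']), ('1st difference', diff_series)]:
    adf_stat, p_value = adfuller(series.dropna())[:2]
    verdict = 'stationary' if p_value < 0.05 else 'non-stationary'
    print(f"{name}: ADF={adf_stat:.3f}, p={p_value:.4f} -> {verdict}")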
VI. Statistical Modeling Practice Project
def statistical_modeling_project():
    """Statistical modeling practice project"""
    print("=" * 80)
    print("Statistical Modeling Project: Employee Performance Prediction")
    print("=" * 80)

    # Create a richer dataset
    np.random.seed(42)
    n_samples = 5000

    project_data = pd.DataFrame({
        'age': np.random.normal(35, 8, n_samples).clip(22, 65).astype(int),
        'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'],
                                      n_samples, p=[0.2, 0.5, 0.25, 0.05]),
        'experience': np.random.exponential(5, n_samples).clip(0, 30),
        'training_hours': np.random.poisson(40, n_samples),
        'manager_rating': np.random.beta(3, 1.5, n_samples) * 10,
        'peer_rating': np.random.beta(2, 2, n_samples) * 10,
        'work_hours': np.random.normal(45, 5, n_samples).clip(35, 60),
        'job_satisfaction': np.random.beta(4, 2, n_samples) * 10,
        'department': np.random.choice(['R&D', 'Sales', 'Marketing', 'Operations', 'Finance'],
                                       n_samples),
        'tenure': np.random.exponential(3, n_samples).clip(0.5, 20)
    })

    # Build the target variable: a performance score with layered, realistic effects
    base_performance = 60

    # Education effect
    education_map = {'High School': 0, 'Bachelor': 5, 'Master': 10, 'PhD': 15}
    education_effect = project_data['education'].map(education_map)

    # Experience effect (diminishing returns)
    experience_effect = 2 * np.log(project_data['experience'] + 1)

    # Training effect (interacts with education)
    training_effect = 0.1 * project_data['training_hours'] * (1 + 0.01 * education_effect)

    # Rating effects
    rating_effect = 0.5 * project_data['manager_rating'] + 0.3 * project_data['peer_rating']

    # Nonlinear effects of work hours and satisfaction
    work_hours_effect = 0.2 * project_data['work_hours'] - 0.002 * project_data['work_hours'] ** 2
    satisfaction_effect = 1.5 * project_data['job_satisfaction']

    # Random noise
    noise = np.random.normal(0, 8, n_samples)

    # Final score, clipped to a sensible range
    project_data['performance'] = (base_performance + education_effect + experience_effect +
                                   training_effect + rating_effect + work_hours_effect +
                                   satisfaction_effect + noise).clip(0, 100)

    print("Project data preview:")
    print(project_data.head())
    print(f"\nDataset size: {project_data.shape}")

    # Project steps
    print("\n" + "=" * 80)
    print("Project steps:")
    print("1. Exploratory data analysis (EDA)")
    print("2. Feature engineering")
    print("3. Model building and selection")
    print("4. Model evaluation")
    print("5. Interpretation and business insights")
    print("=" * 80)

    # Step 1: Exploratory data analysis
    print("\nStep 1: Exploratory data analysis")
    print("-" * 40)

    print("Target variable - performance score:")
    print(project_data['performance'].describe().round(2))

    # Correlations with the target
    numeric_cols = project_data.select_dtypes(include=[np.number]).columns
    numeric_cols = numeric_cols.drop('performance')  # exclude the target
    correlations = (project_data[numeric_cols.tolist() + ['performance']]
                    .corr()['performance'].sort_values(ascending=False))
    print("\nCorrelations with performance:")
    print(correlations.round(3))

    # Step 2: Feature engineering
    print("\nStep 2: Feature engineering")
    print("-" * 40)

    from sklearn.preprocessing import StandardScaler, LabelEncoder
    from sklearn.model_selection import train_test_split

    # Work on a copy
    data_processed = project_data.copy()

    # Encode categorical variables
    label_encoders = {}
    for col in ['education', 'department']:
        le = LabelEncoder()
        data_processed[col + '_encoded'] = le.fit_transform(data_processed[col])
        label_encoders[col] = le

    # Interaction features
    data_processed['experience_training'] = (data_processed['experience'] *
                                             data_processed['training_hours'])
    data_processed['rating_composite'] = (data_processed['manager_rating'] +
                                          data_processed['peer_rating']) / 2

    # Polynomial features
    data_processed['work_hours_sq'] = data_processed['work_hours'] ** 2
    data_processed['satisfaction_sq'] = data_processed['job_satisfaction'] ** 2

    print(f"Original features: {len(project_data.columns)}")
    print(f"After engineering: {len(data_processed.columns)}")

    # Step 3: Model building
    print("\nStep 3: Model building and selection")
    print("-" * 40)

    feature_cols = ['age', 'experience', 'training_hours', 'manager_rating',
                    'peer_rating', 'work_hours', 'job_satisfaction', 'tenure',
                    'education_encoded', 'department_encoded',
                    'experience_training', 'rating_composite',
                    'work_hours_sq', 'satisfaction_sq']

    X = data_processed[feature_cols]
    y = data_processed['performance']

    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    print(f"Training set: {X_train.shape}")
    print(f"Test set: {X_test.shape}")

    # Standardize features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Train several models
    from sklearn.linear_model import LinearRegression, Ridge, Lasso
    from sklearn.tree import DecisionTreeRegressor
    from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

    models = {
        'Linear Regression': LinearRegression(),
        'Ridge': Ridge(alpha=1.0),
        'LASSO': Lasso(alpha=0.01),
        'Decision Tree': DecisionTreeRegressor(max_depth=5, random_state=42),
        'Random Forest': RandomForestRegressor(n_estimators=100, max_depth=10, random_state=42),
        'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, max_depth=5, random_state=42)
    }

    results = []
    for name, model in models.items():
        # Fit
        model.fit(X_train_scaled, y_train)

        # Predict
        y_train_pred = model.predict(X_train_scaled)
        y_test_pred = model.predict(X_test_scaled)

        # Evaluate
        results.append({
            'model': name,
            'train MSE': mean_squared_error(y_train, y_train_pred),
            'test MSE': mean_squared_error(y_test, y_test_pred),
            'train R²': r2_score(y_train, y_train_pred),
            'test R²': r2_score(y_test, y_test_pred),
            'train MAE': mean_absolute_error(y_train, y_train_pred),
            'test MAE': mean_absolute_error(y_test, y_test_pred)
        })

    results_df = pd.DataFrame(results)
    print("\nModel comparison:")
    print(results_df.round(4))

    # Step 4: Model evaluation
    print("\nStep 4: Model evaluation")
    print("-" * 40)

    # Pick the best model by test R²
    best_model_idx = results_df['test R²'].idxmax()
    best_model_name = results_df.loc[best_model_idx, 'model']
    best_model = models[best_model_name]

    print(f"Best model: {best_model_name}")
    print(f"Test R²:  {results_df.loc[best_model_idx, 'test R²']:.4f}")
    print(f"Test MSE: {results_df.loc[best_model_idx, 'test MSE']:.4f}")
    print(f"Test MAE: {results_df.loc[best_model_idx, 'test MAE']:.4f}")

    # Step 5: Interpretation
    print("\nStep 5: Interpretation and business insights")
    print("-" * 40)

    # Feature importance
    if hasattr(best_model, 'feature_importances_'):
        feature_importance = pd.DataFrame({
            'feature': feature_cols,
            'importance': best_model.feature_importances_
        }).sort_values('importance', ascending=False)
        print("Feature importances:")
        print(feature_importance.head(10).round(4))
    elif hasattr(best_model, 'coef_'):
        coefficients = best_model.coef_
        if len(coefficients.shape) > 1:  # multi-output models
            coefficients = coefficients[0]
        feature_importance = pd.DataFrame({
            'feature': feature_cols,
            'coefficient': coefficients
        }).sort_values('coefficient', ascending=False)
        print("Coefficients (standardized features):")
        print(feature_importance.round(4))

    # Business insights
    print("\nBusiness insights:")
    print("1. Key drivers of performance:")
    top_features = feature_importance.head(5)['feature'].tolist()
    for i, feature in enumerate(top_features, 1):
        print(f"   {i}. {feature.replace('_', ' ').title()}")

    print("\n2. Suggested actions:")
    print("   - Increase training investment, especially for experienced employees")
    print("   - Optimize working hours; excessive hours depress performance")
    print("   - Strengthen manager and peer feedback loops")
    print("   - Track job satisfaction with regular surveys")

    # Visualize the project results
    visualize_modeling_project(results_df, best_model,
                               X_train_scaled, y_train,
                               X_test_scaled, y_test, feature_cols)

    return project_data, results_df, best_model


def visualize_modeling_project(results_df, best_model, X_train, y_train, X_test, y_test, feature_cols):
    """Visualize the modeling project results"""
    from sklearn.metrics import r2_score

    fig = plt.figure(figsize=(18, 12))

    # 1. Model comparison
    ax1 = plt.subplot(2, 3, 1)
    model_names = results_df['model']
    test_r2 = results_df['test R²']
    bars = ax1.barh(range(len(model_names)), test_r2.values)

    # Highlight the best model
    best_idx = test_r2.idxmax()
    bars[best_idx].set_color('gold')

    ax1.set_yticks(range(len(model_names)))
    ax1.set_yticklabels(model_names)
    ax1.set_xlabel('Test R²')
    ax1.set_title('Model Comparison')
    ax1.axvline(x=0, color='black', linewidth=0.5)
    ax1.grid(True, alpha=0.3, axis='x')

    # Value labels
    for bar in bars:
        width = bar.get_width()
        ax1.text(width, bar.get_y() + bar.get_height() / 2, f'{width:.3f}',
                 ha='left' if width >= 0 else 'right', va='center', fontsize=9)

    # 2. Predicted vs actual
    ax2 = plt.subplot(2, 3, 2)
    y_pred = best_model.predict(X_test)
    ax2.scatter(y_test, y_pred, alpha=0.5)

    # Perfect-prediction line
    min_val = min(y_test.min(), y_pred.min())
    max_val = max(y_test.max(), y_pred.max())
    ax2.plot([min_val, max_val], [min_val, max_val], 'r--', linewidth=2, label='perfect prediction')

    r2 = r2_score(y_test, y_pred)
    ax2.set_xlabel('Actual performance')
    ax2.set_ylabel('Predicted performance')
    ax2.set_title(f'Predicted vs Actual (R² = {r2:.3f})')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # 3. Residual analysis
    ax3 = plt.subplot(2, 3, 3)
    residuals = y_test - y_pred
    ax3.scatter(y_pred, residuals, alpha=0.5)
    ax3.axhline(y=0, color='red', linestyle='--', linewidth=2)
    ax3.set_xlabel('Predicted')
    ax3.set_ylabel('Residual')
    ax3.set_title('Residual Analysis')
    ax3.grid(True, alpha=0.3)

    # 4. Feature importance
    ax4 = plt.subplot(2, 3, 4)
    if hasattr(best_model, 'feature_importances_'):
        importances = best_model.feature_importances_
    elif hasattr(best_model, 'coef_'):
        importances = np.abs(best_model.coef_)
        if len(importances.shape) > 1:
            importances = importances[0]
    else:
        importances = np.ones(len(feature_cols)) / len(feature_cols)

    # Top 10 features
    top_n = min(10, len(feature_cols))
    sorted_idx = np.argsort(importances)[-top_n:][::-1]
    top_features = [feature_cols[i] for i in sorted_idx]
    top_importances = importances[sorted_idx]

    # Shorten feature names for display
    simplified_features = []
    for feature in top_features:
        simple_name = feature.replace('_', ' ').replace('encoded', '').title()
        if len(simple_name) > 20:
            simple_name = simple_name[:17] + '...'
        simplified_features.append(simple_name)

    y_pos = np.arange(len(top_features))
    bars = ax4.barh(y_pos, top_importances)
    ax4.set_yticks(y_pos)
    ax4.set_yticklabels(simplified_features)
    ax4.set_xlabel('Importance')
    ax4.set_title('Top 10 Feature Importances')
    ax4.grid(True, alpha=0.3, axis='x')

    # Value labels
    for bar in bars:
        width = bar.get_width()
        ax4.text(width, bar.get_y() + bar.get_height() / 2, f'{width:.3f}',
                 ha='left', va='center', fontsize=9)

    # 5. Error distribution
    ax5 = plt.subplot(2, 3, 5)
    ax5.hist(residuals, bins=30, alpha=0.7, edgecolor='black')

    # Overlay a normal curve scaled to the histogram area
    from scipy.stats import norm
    mu, std = norm.fit(residuals)
    x = np.linspace(residuals.min(), residuals.max(), 100)
    p = norm.pdf(x, mu, std)
    ax5.plot(x, p * len(residuals) * (residuals.max() - residuals.min()) / 30, 'r-', linewidth=2)

    ax5.axvline(x=0, color='green', linestyle='--', linewidth=2, label='zero error')
    ax5.axvline(x=mu, color='red', linestyle='--', linewidth=2, label=f'mean: {mu:.2f}')
    ax5.set_xlabel('Prediction error')
    ax5.set_ylabel('Count')
    ax5.set_title('Error Distribution')
    ax5.legend()
    ax5.grid(True, alpha=0.3)

    # 6. Learning curve (simplified)
    ax6 = plt.subplot(2, 3, 6)
    if hasattr(best_model, 'staged_predict'):
        # Gradient boosting: test performance per boosting iteration
        test_scores = [r2_score(y_test, y_pred_stage)
                       for y_pred_stage in best_model.staged_predict(X_test)]
        ax6.plot(range(1, len(test_scores) + 1), test_scores, 'b-', linewidth=2, label='test')
        ax6.set_xlabel('Boosting iterations')
        ax6.set_ylabel('R²')
        ax6.set_title('Learning Curve')
    else:
        # Fall back to a model-complexity curve with decision trees
        from sklearn.tree import DecisionTreeRegressor
        depths = range(1, 11)
        train_scores, test_scores = [], []
        for depth in depths:
            tree = DecisionTreeRegressor(max_depth=depth, random_state=42)
            tree.fit(X_train, y_train)
            train_scores.append(r2_score(y_train, tree.predict(X_train)))
            test_scores.append(r2_score(y_test, tree.predict(X_test)))
        ax6.plot(depths, train_scores, 'b-', linewidth=2, label='train')
        ax6.plot(depths, test_scores, 'r-', linewidth=2, label='test')
        ax6.set_xlabel('Tree depth')
        ax6.set_ylabel('R²')
        ax6.set_title('Model Complexity Curve')
    ax6.legend()
    ax6.grid(True, alpha=0.3)

    plt.suptitle('Statistical Modeling Project Results', fontsize=16, y=1.02)
    plt.tight_layout()
    plt.show()

# Run the modeling project
project_data, results_df, best_model = statistical_modeling_project()
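The comparison above relies on a single train/test split, so the ranking can shift with the random seed. A k-fold cross-validation sketch is more robust; this assumes `X` and `y` are available in the current scope (e.g. rebuilt from `data_processed` outside the function), and puts the scaler inside a Pipeline so each fold is scaled independently, avoiding leakage:

from sklearn.model_selection import KFold, cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Ridge
from sklearn.ensemble import GradientBoostingRegressor

cv = KFold(n_splits=5, shuffle=True, random_state=42)
candidates = {
    'Ridge': Ridge(alpha=1.0),
    'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, max_depth=5, random_state=42),
}
for name, est in candidates.items():
    pipe = make_pipeline(StandardScaler(), est)  # scaler is refit inside each fold
    scores = cross_val_score(pipe, X, y, cv=cv, scoring='r2')
    print(f"{name}: R² = {scores.mean():.3f} ± {scores.std():.3f}")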
VII. Today's Exercises and Challenges
"""今日练习任务:基础练习:1. 创建自己的数据集并执行完整的描述性统计分析2. 对两个变量进行相关性分析,并使用三种不同的相关系数3. 实现一个简单的线性回归模型,并进行诊断检验中级挑战:1. 对真实数据集(如sklearn内置数据集)进行全面的统计分析2. 实现多元线性回归,并处理多重共线性问题3. 进行时间序列分解,识别趋势、季节性和残差成分高级项目:1. 完整的商业分析项目: - 数据收集与清洗 - 探索性数据分析 - 统计建模 - 结果解释与报告生成2. A/B测试分析: - 设计实验 - 收集数据 - 假设检验 - 效应量计算 - 结果解释3. 预测建模比赛: - 参加Kaggle的简单比赛 - 应用今天学到的统计方法 - 优化模型性能"""# 练习模板def practice_exercise(): """练习模板""" # 1. 数据生成 np.random.seed(123) n = 200 exercise_data = pd.DataFrame({ 'study_hours': np.random.normal(20, 5, n).clip(5, 35), 'sleep_hours': np.random.normal(7, 1.5, n).clip(4, 10), 'exercise_hours': np.random.exponential(3, n).clip(0, 10), 'stress_level': np.random.uniform(1, 10, n), 'gpa': np.random.normal(3.0, 0.5, n).clip(1.0, 4.0) }) # 添加一些关系 exercise_data['gpa'] = exercise_data['gpa'] + \ 0.05 * exercise_data['study_hours'] + \ 0.1 * exercise_data['sleep_hours'] + \ (-0.05) * exercise_data['stress_level'] + \ np.random.normal(0, 0.3, n) print("练习数据集:") print(exercise_data.head()) # 你的任务: # 1. 执行描述性统计分析 # 2. 分析各变量与GPA的相关性 # 3. 构建预测GPA的回归模型 # 4. 解释哪些因素对GPA影响最大 return exercise_data# 运行练习exercise_data = practice_exercise()
Statistics is not a mathematical game but a tool for making decisions. The best statistical analysts are those who can turn complex statistical concepts into simple business insights.