# pip install pandas numpy scikit-learn matplotlib seaborn scipy python-docx openpyxl tabulate# ===========================================# 15. 潜剖面模型 (Latent Profile Analysis, LPA) - Python实现# ===========================================import osimport sysimport pandas as pdimport numpy as npfrom pathlib import Pathimport warningswarnings.filterwarnings('ignore')# 设置matplotlib支持中文import matplotlib.pyplot as pltplt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']plt.rcParams['axes.unicode_minus'] = False# 导入必要的库import seaborn as snsfrom sklearn.preprocessing import StandardScalerfrom sklearn.mixture import GaussianMixturefrom sklearn.metrics import silhouette_score, davies_bouldin_scorefrom scipy import statsimport matplotlibmatplotlib.use('Agg') # 使用非交互式后端# 用于生成Word报告from docx import Documentfrom docx.shared import Inches, Ptfrom docx.enum.text import WD_ALIGN_PARAGRAPH# 用于三线表from tabulate import tabulate# 设置桌面路径if sys.platform == "win32": desktop_path = Path.home() / "Desktop"else: desktop_path = Path.home() / "Desktop"# 创建结果文件夹results_dir = desktop_path / "15-LPA结果"results_dir.mkdir(exist_ok=True)# 设置工作目录os.chdir(results_dir)print("=" * 70)print("潜剖面模型(LPA)分析 - Python实现")print("=" * 70)# 1. 读取数据print("\n1. 正在读取数据...")try:# 尝试读取Excel文件 excel_path = desktop_path / "longitudinal_data.xlsx"# 读取两个sheet data_full = pd.read_excel(excel_path, sheet_name='Full_Dataset') data_simple = pd.read_excel(excel_path, sheet_name='Simple_Dataset')print(f"Full Dataset: {data_full.shape[0]}行, {data_full.shape[1]}列")print(f"Simple Dataset: {data_simple.shape[0]}行, {data_simple.shape[1]}列")except FileNotFoundError:print(f"错误: 未找到文件 {excel_path}")print("请确保 longitudinal_data.xlsx 文件在桌面上") sys.exit(1)except Exception as e:print(f"读取数据时出错: {e}") sys.exit(1)# 2. 数据预处理print("\n2. 正在进行数据预处理...")# 选择基线数据(Time=0)baseline_data = data_full[data_full['Time'] == 0].drop_duplicates(subset='ID', keep='first')# 创建LPA分析数据集lpa_columns = ['ID', 'Age', 'Sex', 'Treatment', 'Baseline_Score','Indicator1', 'Indicator2', 'Indicator3','Medication_Adherence', 'Stress_Level', 'Quality_of_Life','Outcome_Continuous', 'Outcome_Binary', 'Outcome_Count','SES', 'Genetic_Marker', 'Latent_Class']# 只选择存在的列available_columns = [col for col in lpa_columns if col in baseline_data.columns]lpa_data = baseline_data[available_columns].copy()# 转换因子变量categorical_cols = ['Sex', 'Treatment', 'SES', 'Genetic_Marker', 'Latent_Class']for col in categorical_cols:if col in lpa_data.columns: lpa_data[col] = lpa_data[col].astype('category')# 处理缺失值print("\n缺失值处理:")numeric_cols = lpa_data.select_dtypes(include=[np.number]).columns.tolist()for col in numeric_cols:if col in lpa_data.columns and lpa_data[col].isnull().any(): median_val = lpa_data[col].median() lpa_data[col].fillna(median_val, inplace=True)print(f" 用中位数填充 {col} 的缺失值: {median_val:.2f}")# 显示数据结构print(f"\nLPA分析数据结构:")print(f"数据维度: {lpa_data.shape}")print(f"变量名: {list(lpa_data.columns)}")# 检查缺失值print("\n缺失值统计:")missing_stats = lpa_data.isnull().sum()if missing_stats.any():print(missing_stats[missing_stats > 0])else:print("无缺失值")# 3. 数据探索性分析print("\n3. 进行数据探索性分析...")# 3.1 创建描述性统计表print("\n生成描述性统计表...")# 选择要分析的变量vars_to_analyze = ['Age', 'Sex', 'Treatment', 'Baseline_Score','Indicator1', 'Indicator2', 'Indicator3','Medication_Adherence', 'Stress_Level', 'Quality_of_Life','Outcome_Continuous', 'Outcome_Binary', 'Outcome_Count','SES', 'Genetic_Marker']# 只选择存在的变量vars_to_analyze = [var for var in vars_to_analyze if var in lpa_data.columns]# 创建描述性统计表desc_stats = []for var in vars_to_analyze:if lpa_data[var].dtype == 'category' or lpa_data[var].nunique() < 10:# 分类变量 value_counts = lpa_data[var].value_counts(dropna=False) total = len(lpa_data[var])for value, count in value_counts.items(): percentage = count / total * 100 desc_stats.append({'变量': var,'类别/统计量': str(value),'频数': count,'百分比(%)': f"{percentage:.1f}%" })else:# 连续变量 desc_stats.append({'变量': var,'类别/统计量': '均值','值': f"{lpa_data[var].mean():.2f}",'百分比(%)': '-' }) desc_stats.append({'变量': var,'类别/统计量': '标准差','值': f"{lpa_data[var].std():.2f}",'百分比(%)': '-' }) desc_stats.append({'变量': var,'类别/统计量': '中位数','值': f"{lpa_data[var].median():.2f}",'百分比(%)': '-' }) desc_stats.append({'变量': var,'类别/统计量': '最小值','值': f"{lpa_data[var].min():.2f}",'百分比(%)': '-' }) desc_stats.append({'变量': var,'类别/统计量': '最大值','值': f"{lpa_data[var].max():.2f}",'百分比(%)': '-' })# 转换为DataFrame并保存desc_df = pd.DataFrame(desc_stats)desc_df.to_csv("描述性统计表.csv", index=False, encoding='utf-8-sig')print("描述性统计表已保存: 描述性统计表.csv")# 4. 潜剖面模型(LPA)分析print("\n4. 开始潜剖面模型(LPA)分析...")# 4.1 准备数据 - 使用连续外显指标manifest_vars_continuous = ['Indicator1', 'Indicator2', 'Indicator3']# 只选择存在的变量manifest_vars_continuous = [var for var in manifest_vars_continuous if var in lpa_data.columns]if len(manifest_vars_continuous) < 2:print("错误: 需要至少2个连续指标进行LPA分析") sys.exit(1)# 创建用于LPA的数据集lpa_analysis_data = lpa_data[manifest_vars_continuous].copy()# 标准化数据scaler = StandardScaler()lpa_scaled = scaler.fit_transform(lpa_analysis_data)lpa_scaled_df = pd.DataFrame(lpa_scaled, columns=manifest_vars_continuous)print(f"\n连续指标标准化完成")# 检查数据print(f"\n用于LPA分析的连续外显指标:")print(f"指标: {', '.join(manifest_vars_continuous)}")print(f"数据维度: {lpa_analysis_data.shape}")print("描述性统计:")print(lpa_analysis_data.describe().round(3))

print("\n使用高斯混合模型(GMM)进行模型比较...")for cov_type in covariance_types:print(f"\n协方差结构: {cov_type}")for n_components in range(1, max_profiles + 1): try:# 拟合高斯混合模型 gmm = GaussianMixture( n_components=n_components, covariance_type=cov_type, max_iter=1000, random_state=42, n_init=5 ) gmm.fit(lpa_scaled)# 计算指标 bic = gmm.bic(lpa_scaled) aic = gmm.aic(lpa_scaled) log_likelihood = gmm.score(lpa_scaled) * len(lpa_scaled)# 计算熵(分类清晰度) probs = gmm.predict_proba(lpa_scaled) entropy = -np.sum(probs * np.log(probs + 1e-10)) / (len(lpa_scaled) * np.log(n_components)) entropy = max(0, 1 - entropy) # 归一化到0-1 model_results.append({'协方差结构': cov_type,'剖面数': n_components,'BIC': bic,'AIC': aic,'对数似然': log_likelihood,'熵': entropy })print(f" 剖面数 {n_components}: BIC={bic:.2f}, AIC={aic:.2f}, 熵={entropy:.3f}")# 更新最佳模型(基于BIC)if bic < best_bic: best_bic = bic best_aic = aic best_model = gmm best_n_components = n_components best_cov_type = cov_type except Exception as e:print(f" 剖面数 {n_components}: 失败 - {e}")# 保存模型比较结果if model_results: model_comparison_df = pd.DataFrame(model_results) model_comparison_df = model_comparison_df.sort_values('BIC') model_comparison_df.to_csv("模型比较结果.csv", index=False, encoding='utf-8-sig')print("\n模型比较结果已保存: 模型比较结果.csv")print(model_comparison_df.round(3).to_string())# 5. 选择最佳模型并提取结果print(f"\n6. 选择最佳潜剖面模型...")if best_model is not None:print(f"最佳模型: {best_cov_type}结构, {best_n_components}个剖面")print(f"BIC: {best_bic:.2f}, AIC: {best_aic:.2f}")# 预测剖面分配 profile_assignments = best_model.predict(lpa_scaled) profile_probs = best_model.predict_proba(lpa_scaled)# 将剖面分配给每个个体 lpa_data['潜剖面'] = profile_assignments + 1 # 从1开始编号 lpa_data['潜剖面'] = lpa_data['潜剖面'].astype('category')# 6.1 计算剖面概率print("\n计算潜剖面分布...") profile_counts = pd.Series(profile_assignments).value_counts().sort_index() profile_proportions = profile_counts / len(profile_assignments) profile_distribution = pd.DataFrame({'剖面': [i + 1 for i in profile_counts.index],'频数': profile_counts.values,'比例': profile_proportions.values })print("\n潜剖面分布:")for _, row in profile_distribution.iterrows():print(f"剖面 {row['剖面']}: {row['比例'] * 100:.1f}% ({int(row['频数'])}人)")# 保存剖面分布 profile_distribution.to_csv("潜剖面分布.csv", index=False, encoding='utf-8-sig')print("潜剖面分布已保存: 潜剖面分布.csv")# 6.2 计算每个剖面的特征print("\n7. 计算每个潜剖面的特征...")# 创建剖面特征汇总表 profile_characteristics_list = []for profile_num in range(1, best_n_components + 1): profile_data = lpa_data[lpa_data['潜剖面'] == profile_num]if len(profile_data) > 0: profile_info = {'潜剖面': profile_num,'样本数': len(profile_data),'比例': len(profile_data) / len(lpa_data) }# 连续变量的均值for var in manifest_vars_continuous:if var in profile_data.columns: profile_info[f'{var}_均值'] = profile_data[var].mean()# 其他连续变量 other_continuous = ['Medication_Adherence', 'Stress_Level', 'Quality_of_Life','Age', 'Baseline_Score', 'Outcome_Continuous']for var in other_continuous:if var in profile_data.columns: profile_info[f'{var}_均值'] = profile_data[var].mean()# 分类变量的比例if'Sex'in profile_data.columns: profile_info['女性比例'] = (profile_data['Sex'] == 'F').mean() if'F'in profile_data['Sex'].values else 0if'Treatment'in profile_data.columns:for treatment in ['A', 'B', 'C', 'D']:if treatment in profile_data['Treatment'].values: profile_info[f'治疗{treatment}_比例'] = (profile_data['Treatment'] == treatment).mean()if'SES'in profile_data.columns:for ses in ['Low', 'Medium', 'High']:if ses in profile_data['SES'].values: profile_info[f'SES_{ses}_比例'] = (profile_data['SES'] == ses).mean() profile_characteristics_list.append(profile_info) profile_characteristics = pd.DataFrame(profile_characteristics_list)# 四舍五入 numeric_cols = profile_characteristics.select_dtypes(include=[np.number]).columnsfor col in numeric_cols:if col not in ['潜剖面', '样本数']: profile_characteristics[col] = profile_characteristics[col].round(3)# 保存剖面特征 profile_characteristics.to_csv("剖面特征.csv", index=False, encoding='utf-8-sig')print("剖面特征已保存: 剖面特征.csv")# 7. 可视化print("\n8. 生成潜剖面可视化...")# 7.1 剖面连线图(均值剖面)print("\n生成剖面连线图...") try:# 准备数据 profile_means_data = []for profile_num in range(1, best_n_components + 1): profile_data = lpa_data[lpa_data['潜剖面'] == profile_num]for var in manifest_vars_continuous:if var in profile_data.columns: profile_means_data.append({'潜剖面': f'剖面{profile_num}','变量': var,'均值': profile_data[var].mean() })if profile_means_data: profile_means_df = pd.DataFrame(profile_means_data)# 创建图形 plt.figure(figsize=(10, 8))# 为每个剖面创建线图for profile in profile_means_df['潜剖面'].unique(): profile_data = profile_means_df[profile_means_df['潜剖面'] == profile] plt.plot(profile_data['变量'], profile_data['均值'], marker='o', linewidth=2, markersize=8, label=profile) plt.title('潜剖面均值剖面图', fontsize=16, fontweight='bold') plt.xlabel('指标', fontsize=12) plt.ylabel('均值', fontsize=12) plt.legend(title='潜剖面') plt.grid(True, alpha=0.3) plt.tight_layout() plt.savefig('潜剖面连线图.png', dpi=300) plt.close()print("剖面连线图已保存: 潜剖面连线图.png") except Exception as e:print(f"剖面连线图生成失败: {e}")# 7.2 剖面箱线图print("\n生成剖面箱线图...") try:# 准备数据 boxplot_vars = manifest_vars_continuous + ['Medication_Adherence', 'Stress_Level', 'Quality_of_Life'] boxplot_vars = [var for var in boxplot_vars if var in lpa_data.columns]if boxplot_vars:# 创建子图 n_vars = len(boxplot_vars) n_cols = min(3, n_vars) n_rows = (n_vars + n_cols - 1) // n_cols fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))if n_vars == 1: axes = [axes]else: axes = axes.flatten()for idx, var in enumerate(boxplot_vars):if idx < len(axes):# 创建箱线图数据 plot_data = [] profile_labels = []for profile_num in sorted(lpa_data['潜剖面'].unique()): profile_values = lpa_data[lpa_data['潜剖面'] == profile_num][var].dropna()if len(profile_values) > 0: plot_data.append(profile_values) profile_labels.append(f'剖面{profile_num}')if plot_data: axes[idx].boxplot(plot_data, labels=profile_labels) axes[idx].set_title(var, fontsize=12) axes[idx].set_xlabel('潜剖面') axes[idx].set_ylabel('值') axes[idx].grid(True, alpha=0.3)# 移除多余的子图for idx in range(len(boxplot_vars), len(axes)): fig.delaxes(axes[idx]) plt.suptitle('各潜剖面的指标分布箱线图', fontsize=16, fontweight='bold') plt.tight_layout() plt.savefig('潜剖面箱线图.png', dpi=300) plt.close()print("剖面箱线图已保存: 潜剖面箱线图.png") except Exception as e:print(f"剖面箱线图生成失败: {e}")# 7.3 剖面概率分布图print("\n生成剖面概率分布图...") try: plt.figure(figsize=(10, 8)) bars = plt.bar(profile_distribution['剖面'].astype(str), profile_distribution['比例'], color=plt.cm.viridis(np.linspace(0, 1, len(profile_distribution))))# 添加数值标签for i, (idx, row) in enumerate(profile_distribution.iterrows()): plt.text(i, row['比例'] + 0.01, f"{row['比例'] * 100:.1f}%\n(n={int(row['频数'])})", ha='center', va='bottom', fontsize=10) plt.title('潜剖面分布', fontsize=16, fontweight='bold') plt.xlabel('潜剖面', fontsize=12) plt.ylabel('比例', fontsize=12) plt.ylim(0, max(profile_distribution['比例']) * 1.2) plt.grid(True, alpha=0.3, axis='y') plt.tight_layout() plt.savefig('潜剖面分布图.png', dpi=300) plt.close()print("剖面概率分布图已保存: 潜剖面分布图.png") except Exception as e:print(f"剖面概率分布图生成失败: {e}")# 8. 剖面验证和外部效度分析print("\n9. 进行潜剖面验证和外部效度分析...")# 8.1 剖面与人口学变量的关系print("\n分析潜剖面与人口学变量的关系...") demographic_results = []# 性别分析if'Sex'in lpa_data.columns and '潜剖面'in lpa_data.columns: sex_crosstab = pd.crosstab(lpa_data['Sex'], lpa_data['潜剖面'], normalize='columns') * 100for sex in sex_crosstab.index:for profile in sex_crosstab.columns: demographic_results.append({'变量': '性别','类别': sex,'潜剖面': profile,'频数': len(lpa_data[(lpa_data['Sex'] == sex) & (lpa_data['潜剖面'] == profile)]),'百分比(%)': sex_crosstab.loc[sex, profile] })# 保存人口学分析结果if demographic_results: demographic_df = pd.DataFrame(demographic_results) demographic_df.to_csv("人口学分析.csv", index=False, encoding='utf-8-sig')print("人口学分析结果已保存: 人口学分析.csv")# 9. 创建综合分析报告print("\n10. 创建综合分析报告...")# 9.1 剖面特征汇总表 try:if not profile_characteristics.empty: comprehensive_table = profile_characteristics.copy()# 重命名列 rename_dict = {'潜剖面': '剖面','样本数': '样本数','比例': '比例' }for col in comprehensive_table.columns:if col.endswith('_均值'): rename_dict[col] = col.replace('_均值', '均值')elif col.endswith('_比例'): rename_dict[col] = col.replace('_比例', '比例') comprehensive_table = comprehensive_table.rename(columns=rename_dict)# 格式化if'比例'in comprehensive_table.columns: comprehensive_table['比例'] = comprehensive_table['比例'].apply(lambda x: f"{x * 100:.1f}%")# 保存三线表 comprehensive_table.to_csv("综合分析表.csv", index=False, encoding='utf-8-sig')print("综合分析表已保存: 综合分析表.csv") except Exception as e:print(f"综合分析表创建失败: {e}")# 9.2 模型拟合指标表print("\n创建模型拟合指标表...") try: fit_summary = pd.DataFrame({'指标': ['剖面数', '模型结构', '样本量', 'AIC', 'BIC', '熵'],'值': [ str(best_n_components), best_cov_type, str(len(lpa_data)), f"{best_aic:.3f}", f"{best_bic:.3f}", f"{entropy:.3f}"if'entropy'in locals() else"N/A" ] }) fit_summary.to_csv("模型拟合指标.csv", index=False, encoding='utf-8-sig')print("模型拟合指标表已保存: 模型拟合指标.csv") except Exception as e:print(f"模型拟合指标表创建失败: {e}")# 10. 创建Word分析报告print("\n11. 正在生成Word分析报告...") try:# 创建Word文档 doc = Document()# 添加标题 title = doc.add_heading('潜剖面模型(LPA)分析报告', 0) title.alignment = WD_ALIGN_PARAGRAPH.CENTER# 添加日期 from datetime import datetime date_para = doc.add_paragraph(f'生成日期:{datetime.now().strftime("%Y年%m月%d日")}') date_para.alignment = WD_ALIGN_PARAGRAPH.CENTER doc.add_paragraph()# 1. 研究概述 doc.add_heading('1. 研究概述', level=1) doc.add_paragraph( f'本研究基于{len(lpa_data)}名个体的基线数据,使用潜剖面模型探索个体在多个连续健康指标上的得分模式,识别潜在的健康风险亚组。') doc.add_paragraph(f'连续外显指标:{", ".join(manifest_vars_continuous)}') doc.add_paragraph()# 2. 数据描述 doc.add_heading('2. 数据描述', level=1) doc.add_paragraph(f'样本量:{len(lpa_data)}名个体') doc.add_paragraph(f'连续指标数:{len(manifest_vars_continuous)}') doc.add_paragraph()# 3. 模型选择结果 doc.add_heading('3. 模型选择结果', level=1) doc.add_paragraph(f'基于信息准则(BIC、AIC)和熵值,选择{best_n_components}个潜剖面的模型作为最终模型。') doc.add_paragraph(f'模型结构:{best_cov_type}') doc.add_paragraph(f'BIC:{best_bic:.3f}') doc.add_paragraph(f'AIC:{best_aic:.3f}')if'entropy'in locals(): doc.add_paragraph(f'熵:{entropy:.3f}(值越接近1表示分类越清晰)') doc.add_paragraph()# 4. 潜剖面结果 doc.add_heading('4. 潜剖面结果', level=1) doc.add_heading('4.1 潜剖面分布', level=2)for _, row in profile_distribution.iterrows(): doc.add_paragraph(f'剖面{row["剖面"]}:{row["比例"] * 100:.1f}%({int(row["频数"])}人)') doc.add_paragraph()# 5. 结论与建议 doc.add_heading('5. 结论与建议', level=1) doc.add_paragraph('本研究通过潜剖面模型分析了健康指标的模式识别,为精准公共卫生干预提供了科学依据。建议进一步开展纵向研究以验证剖面的稳定性。')# 添加图片(如果存在) image_files = ['潜剖面连线图.png', '潜剖面箱线图.png', '潜剖面分布图.png']for img_file in image_files:if os.path.exists(img_file): doc.add_heading(f'图表:{img_file}', level=2) doc.add_picture(img_file, width=Inches(6)) doc.add_paragraph()# 保存Word文档 word_file = f"LPA分析报告_{datetime.now().strftime('%Y%m%d')}.docx" doc.save(word_file)print(f"Word分析报告已保存: {word_file}") except Exception as e:print(f"Word报告生成失败: {e}")# 创建最简单的报告 try: simple_doc = Document() simple_doc.add_heading('潜剖面模型(LPA)分析报告', 0) simple_doc.add_paragraph(f'生成日期:{datetime.now().strftime("%Y年%m月%d日")}') simple_doc.add_paragraph() simple_doc.add_paragraph(f'分析完成。最佳模型为{best_n_components}个剖面,模型结构为{best_cov_type}。') simple_doc.add_paragraph('详细结果请查看CSV文件。') simple_word_file = f"LPA简易报告_{datetime.now().strftime('%Y%m%d')}.docx" simple_doc.save(simple_word_file)print(f"简易Word报告已保存: {simple_word_file}") except:print("无法生成任何Word报告")else:print("没有成功拟合的模型,无法进行后续分析")# 11. 保存工作空间print("\n12. 保存工作空间...")try: import pickle workspace_data = {'lpa_data': lpa_data,'best_model': best_model,'best_n_components': best_n_components,'best_cov_type': best_cov_type,'profile_assignments': profile_assignments if'profile_assignments'in locals() else None,'profile_characteristics': profile_characteristics if'profile_characteristics'in locals() else None } with open('LPA分析工作空间.pkl', 'wb') as f: pickle.dump(workspace_data, f)print("工作空间已保存: LPA分析工作空间.pkl")except Exception as e:print(f"保存工作空间失败: {e}")# 12. 汇总输出print("\n" + "=" * 70)print("LPA分析完成!")print("=" * 70)print("\n结果文件汇总:")result_files = ["描述性统计表.csv","模型比较结果.csv","潜剖面分布.csv","剖面特征.csv","人口学分析.csv","综合分析表.csv","模型拟合指标.csv","LPA分析工作空间.pkl"]for file in result_files:if os.path.exists(file):print(f"✓ {file}")print("\n可视化文件汇总:")visual_files = ["潜剖面连线图.png", "潜剖面箱线图.png", "潜剖面分布图.png"]for file in visual_files:if os.path.exists(file):print(f"✓ {file}")print("\n报告文件汇总:")report_files = [f for f in os.listdir() if f.endswith('.docx') and 'LPA'in f]for file in report_files:print(f"✓ {file}")print(f"\n分析完成!所有结果已保存在 {results_dir} 文件夹中。")