# pip install pandas numpy matplotlib seaborn scikit-learn statsmodels factor_analyzer openpyxl python-docximport osimport sysimport warningsimport numpy as npimport pandas as pdimport matplotlib.pyplot as pltimport seaborn as snsfrom pathlib import Pathfrom datetime import datetimefrom typing import List, Dict, Any, Tuple, Optionalimport mathimport json# 数据分析和建模from sklearn.preprocessing import LabelEncoder, KBinsDiscretizerfrom sklearn.impute import SimpleImputerfrom sklearn.mixture import GaussianMixturefrom sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_scorefrom sklearn.cluster import KMeansfrom scipy import statsimport statsmodels.api as smfrom statsmodels.miscmodels.ordinal_model import OrderedModelimport factor_analyzerfrom factor_analyzer import FactorAnalyzer# 可视化设置plt.style.use('seaborn-v0_8-whitegrid')sns.set_palette("husl")plt.rcParams['figure.figsize'] = (12, 8)plt.rcParams['font.size'] = 12warnings.filterwarnings('ignore')# Word报告生成try: from docx import Document from docx.shared import Inches, Pt, RGBColor from docx.enum.text import WD_ALIGN_PARAGRAPH from docx.enum.table import WD_TABLE_ALIGNMENT from docx.oxml.ns import qn HAS_DOCX = Trueexcept ImportError:print("警告: python-docx未安装,Word报告功能将不可用") HAS_DOCX = Falsedef setup_environment():"""设置环境和工作目录"""# 获取桌面路径if sys.platform == 'win32': desktop_path = Path.home() / 'Desktop'else: desktop_path = Path.home() / 'Desktop'# 创建结果文件夹 results_dir = desktop_path / "14-LCM结果" results_dir.mkdir(exist_ok=True)# 设置工作目录 os.chdir(results_dir)print("=" * 70)print("潜类别模型(LCM)分析 - Python实现")print(f"结果保存路径: {results_dir}")print("=" * 70)return results_dirdef read_data(desktop_path: Path) -> Tuple[pd.DataFrame, pd.DataFrame]:"""读取数据"""print("\n正在读取数据...") data_file = desktop_path / "longitudinal_data.xlsx" try:# 读取Excel文件的两个工作表 data_full = pd.read_excel(data_file, sheet_name='Full_Dataset') data_simple = pd.read_excel(data_file, sheet_name='Simple_Dataset')print(f"Full Dataset: {data_full.shape[0]}行, {data_full.shape[1]}列")print(f"Simple Dataset: {data_simple.shape[0]}行, {data_simple.shape[1]}列")return data_full, data_simple except Exception as e:print(f"读取数据时出错: {e}")# 创建示例数据用于测试print("创建示例数据用于演示...")return create_sample_data()def create_sample_data() -> Tuple[pd.DataFrame, pd.DataFrame]:"""创建示例数据(当真实数据不可用时)""" np.random.seed(123) n = 500# 创建完整数据集 data = {'ID': range(1, n + 1),'Time': np.random.choice([0, 1, 2], n),'Age': np.random.normal(45, 10, n).clip(18, 80),'Sex': np.random.choice(['Male', 'Female'], n, p=[0.55, 0.45]),'Treatment': np.random.choice(['A', 'B', 'C'], n, p=[0.4, 0.3, 0.3]),'Baseline_Score': np.random.normal(50, 15, n).clip(0, 100),'Indicator1': np.random.normal(60, 20, n).clip(0, 100),'Indicator2': np.random.normal(55, 18, n).clip(0, 100),'Indicator3': np.random.normal(65, 22, n).clip(0, 100),'Medication_Adherence': np.random.beta(2, 1, n) * 100,'Stress_Level': np.random.choice(range(1, 11), n),'Quality_of_Life': np.random.normal(70, 15, n).clip(0, 100),'Outcome_Continuous': np.random.normal(0, 1, n),'Outcome_Binary': np.random.binomial(1, 0.3, n),'Outcome_Count': np.random.poisson(3, n),'SES': np.random.choice(['Low', 'Medium', 'High'], n, p=[0.3, 0.5, 0.2]),'Genetic_Marker': np.random.choice(['A', 'B', 'C', 'D'], n),'Latent_Class': np.random.choice([1, 2, 3], n, p=[0.4, 0.35, 0.25]) } data_full = pd.DataFrame(data) data_simple = data_full.copy()return data_full, data_simpledef preprocess_data(data_full: pd.DataFrame) -> pd.DataFrame:"""数据预处理"""print("\n正在进行数据预处理...")# 选择基线数据(Time=0) baseline_data = data_full[data_full['Time'] == 0].copy() baseline_data = baseline_data.drop_duplicates(subset='ID', keep='first')# 选择LCM分析需要的列 lcm_columns = ['ID', 'Age', 'Sex', 'Treatment', 'Baseline_Score','Indicator1', 'Indicator2', 'Indicator3','Medication_Adherence', 'Stress_Level', 'Quality_of_Life','Outcome_Continuous', 'Outcome_Binary', 'Outcome_Count','SES', 'Genetic_Marker', 'Latent_Class' ]# 检查数据中是否有这些列 available_columns = [col for col in lcm_columns if col in baseline_data.columns] lcm_data = baseline_data[available_columns].copy()# 转换分类变量 categorical_cols = ['Sex', 'Treatment', 'SES', 'Genetic_Marker', 'Latent_Class']for col in categorical_cols:if col in lcm_data.columns: lcm_data[col] = lcm_data[col].astype('category')# 确保SES的顺序if'SES'in lcm_data.columns: lcm_data['SES'] = pd.Categorical( lcm_data['SES'], categories=['Low', 'Medium', 'High'], ordered=True )# 创建分类变量(类似于R中的safe_cut函数) def safe_cut(x, n_categories=3, labels=None):"""安全地将连续变量转换为分类变量"""if labels is None: labels = [f'Cat{i + 1}'for i in range(n_categories)]# 检查数据是否有变异if x.nunique() < 2:return pd.Series([labels[0]] * len(x), index=x.index) try:# 使用分位数分割if n_categories == 3: bins = pd.qcut(x, q=[0, 0.33, 0.67, 1], labels=labels, duplicates='drop')elif n_categories == 2: bins = pd.qcut(x, q=[0, 0.5, 1], labels=labels[:2], duplicates='drop')else: bins = pd.cut(x, bins=n_categories, labels=labels[:n_categories])return bins except Exception as e:print(f"警告: 使用等宽分箱代替分位数分箱 ({e})")return pd.cut(x, bins=n_categories, labels=labels[:n_categories])# 创建分类变量 indicator_cols = ['Indicator1', 'Indicator2', 'Indicator3']for col in indicator_cols:if col in lcm_data.columns: cat_col = f'{col}_cat' lcm_data[cat_col] = safe_cut( lcm_data[col], n_categories=3, labels=['Low', 'Medium', 'High'] )# 其他变量的分类转换if'Medication_Adherence'in lcm_data.columns: lcm_data['Medication_Adherence_cat'] = pd.cut( lcm_data['Medication_Adherence'], bins=3, labels=['Low', 'Medium', 'High'] )if'Stress_Level'in lcm_data.columns: conditions = [ (lcm_data['Stress_Level'] <= 3), (lcm_data['Stress_Level'] <= 7), (lcm_data['Stress_Level'] > 7) ] choices = ['Low', 'Medium', 'High'] lcm_data['Stress_Level_cat'] = np.select(conditions, choices, default='Medium') lcm_data['Stress_Level_cat'] = pd.Categorical( lcm_data['Stress_Level_cat'], categories=['Low', 'Medium', 'High'], ordered=True )if'Quality_of_Life'in lcm_data.columns: lcm_data['Quality_of_Life_cat'] = pd.cut( lcm_data['Quality_of_Life'], bins=3, labels=['Low', 'Medium', 'High'] )# 处理缺失值print("处理缺失值...")# 数值变量用中位数填充 numeric_cols = lcm_data.select_dtypes(include=[np.number]).columnsfor col in numeric_cols:if lcm_data[col].isnull().any(): median_val = lcm_data[col].median() lcm_data[col].fillna(median_val, inplace=True)print(f" 用中位数填充 {col} 的缺失值: {median_val:.2f}")# 分类变量用众数填充 cat_cols_new = ['Indicator1_cat', 'Indicator2_cat', 'Indicator3_cat','Medication_Adherence_cat', 'Stress_Level_cat', 'Quality_of_Life_cat' ]for col in cat_cols_new:if col in lcm_data.columns:# 确保是分类类型 lcm_data[col] = lcm_data[col].astype('category')# 用众数填充缺失值if lcm_data[col].isnull().any(): mode_val = lcm_data[col].mode()[0] if not lcm_data[col].mode().empty else'Low' lcm_data[col].fillna(mode_val, inplace=True)print(f" 用众数填充 {col} 的缺失值: {mode_val}")# 显示数据结构print(f"\nLCM分析数据结构:")print(f"数据维度: {lcm_data.shape}")print(f"变量名: {list(lcm_data.columns)}")# 检查分类变量的水平数print("\n分类变量水平数检查:") all_cat_cols = categorical_cols + cat_cols_newfor col in all_cat_cols:if col in lcm_data.columns:print(f" {col}: {lcm_data[col].nunique()}个水平")# 检查缺失值 missing_stats = lcm_data.isnull().sum()if missing_stats.any():print("\n缺失值统计:")print(missing_stats[missing_stats > 0])else:print("\n无缺失值")return lcm_datadef exploratory_analysis(lcm_data: pd.DataFrame):"""探索性数据分析"""print("\n进行数据探索性分析...")# 3.1 分类变量的分布 categorical_vars = ['Sex', 'Treatment', 'SES', 'Genetic_Marker','Indicator1_cat', 'Indicator2_cat', 'Indicator3_cat','Medication_Adherence_cat', 'Stress_Level_cat', 'Quality_of_Life_cat' ]# 过滤实际存在的列 categorical_vars = [col for col in categorical_vars if col in lcm_data.columns]# 创建分类变量分布图 fig, axes = plt.subplots(math.ceil(len(categorical_vars) / 3), 3, figsize=(20, 15)) axes = axes.flatten()for i, var in enumerate(categorical_vars): ax = axes[i]if var in lcm_data.columns: value_counts = lcm_data[var].value_counts().sort_index() bars = ax.bar(range(len(value_counts)), value_counts.values, alpha=0.7, color='steelblue') ax.set_title(f'{var}分布', fontsize=14) ax.set_xlabel(var, fontsize=12) ax.set_ylabel('频数', fontsize=12) ax.set_xticks(range(len(value_counts))) ax.set_xticklabels(value_counts.index, rotation=45, ha='right')# 在柱子上方添加数值for bar, count in zip(bars, value_counts.values): height = bar.get_height() ax.text(bar.get_x() + bar.get_width() / 2., height, f'{count}', ha='center', va='bottom', fontsize=10)# 隐藏多余的子图for j in range(i + 1, len(axes)): axes[j].set_visible(False) plt.tight_layout() plt.savefig('Categorical_Variable_Distributions.png', dpi=150, bbox_inches='tight')print("分类变量分布图已保存: Categorical_Variable_Distributions.png") plt.close()# 3.2 交叉表分析print("\n生成变量间交叉表...") cross_tabs = {}if all(col in lcm_data.columns for col in ['Sex', 'Treatment']): cross_tabs['Sex_Treatment'] = pd.crosstab(lcm_data['Sex'], lcm_data['Treatment'])if all(col in lcm_data.columns for col in ['SES', 'Stress_Level_cat']): cross_tabs['SES_Stress'] = pd.crosstab(lcm_data['SES'], lcm_data['Stress_Level_cat'])if all(col in lcm_data.columns for col in ['Treatment', 'Medication_Adherence_cat']): cross_tabs['Treatment_Adherence'] = pd.crosstab(lcm_data['Treatment'], lcm_data['Medication_Adherence_cat'])# 保存交叉表到Excel with pd.ExcelWriter('Cross_Tabulations.xlsx', engine='openpyxl') as writer:for name, table in cross_tabs.items(): table.to_excel(writer, sheet_name=name)print("交叉表已保存: Cross_Tabulations.xlsx")# 3.3 描述性统计三线表print("\n生成描述性统计三线表...")# 选择要分析的变量 vars_to_analyze = ['Age', 'Sex', 'Treatment', 'Baseline_Score','Indicator1', 'Indicator2', 'Indicator3','Medication_Adherence', 'Stress_Level', 'Quality_of_Life','Outcome_Continuous', 'Outcome_Binary', 'Outcome_Count','SES', 'Genetic_Marker' ]# 过滤实际存在的列 vars_to_analyze = [col for col in vars_to_analyze if col in lcm_data.columns]# 创建描述性统计表 desc_stats = pd.DataFrame()for var in vars_to_analyze:if pd.api.types.is_numeric_dtype(lcm_data[var]):# 连续变量 stats_dict = {'Variable': var,'Type': 'Continuous','N': lcm_data[var].count(),'Mean': lcm_data[var].mean(),'SD': lcm_data[var].std(),'Min': lcm_data[var].min(),'Median': lcm_data[var].median(),'Max': lcm_data[var].max(),'Missing': lcm_data[var].isnull().sum() } desc_stats = pd.concat([desc_stats, pd.DataFrame([stats_dict])], ignore_index=True)else:# 分类变量 value_counts = lcm_data[var].value_counts(dropna=False) total = value_counts.sum()for level, count in value_counts.items(): percentage = count / total * 100 stats_dict = {'Variable': f"{var} - {level}",'Type': 'Categorical','N': count,'Percentage': f"{percentage:.1f}%",'Missing': 0 } desc_stats = pd.concat([desc_stats, pd.DataFrame([stats_dict])], ignore_index=True)# 保存描述性统计表 desc_stats.to_csv('Descriptive_Statistics_Table.csv', index=False, encoding='utf-8-sig')print("描述性统计三线表已保存: Descriptive_Statistics_Table.csv")# 打印表格print("\n描述性统计三线表:")print(desc_stats.to_string())return cross_tabs, desc_stats

# 创建用于LCA的数据集 lca_data = lcm_data[manifest_vars_simple].copy()# 将所有分类变量转换为数值编码 le_dict = {}for col in manifest_vars_simple: le = LabelEncoder() lca_data[col] = le.fit_transform(lca_data[col].astype(str)) le_dict[col] = leprint("\n用于LCA分析的数据结构:")print(f"数据维度: {lca_data.shape}")print(f"前几行数据:")print(lca_data.head())# 检查每个变量的水平print("\n外显指标水平:")for col in manifest_vars_simple:print(f" {col}: {lca_data[col].nunique()}个水平")return lca_data, le_dictdef fit_lca_models(lca_data: pd.DataFrame, max_classes: int = 4):"""拟合LCA模型并评估不同类别数"""print("\n确定最佳潜类别数...")# 存储拟合指标 fit_stats = [] models = [] X = lca_data.valuesfor k in range(1, max_classes + 1):print(f"\n尝试拟合 {k} 个潜类别的模型...") try:if k == 1:# 单类别模型特殊处理 log_likelihood = -np.inf # 简化的处理 aic = np.inf bic = np.inf n_params = 0 model = None posterior_probs = np.ones((len(X), 1)) labels = np.zeros(len(X))else:# 使用高斯混合模型 gmm = GaussianMixture( n_components=k, covariance_type='full', max_iter=1000, n_init=5, random_state=123 ) gmm.fit(X)# 获取预测结果 labels = gmm.predict(X) posterior_probs = gmm.predict_proba(X) log_likelihood = gmm.score(X) * len(X) # 转换为总对数似然# 计算模型参数数量# 对于多元高斯分布: 均值参数 + 协方差参数 + 混合权重 n_features = X.shape[1] n_means = k * n_features n_cov = k * n_features * (n_features + 1) / 2 n_weights = k - 1 n_params = n_means + n_cov + n_weights# 计算信息准则 aic = -2 * log_likelihood + 2 * n_params bic = -2 * log_likelihood + n_params * np.log(len(X)) model = gmm except Exception as e:print(f" 模型拟合错误: {e}") log_likelihood = np.nan aic = np.nan bic = np.nan n_params = np.nan model = None posterior_probs = None labels = None# 计算熵(衡量分类准确性)if k > 1 and posterior_probs is not None:# 避免log(0) epsilon = 1e-10 posterior_with_epsilon = np.clip(posterior_probs, epsilon, 1 - epsilon) entropy_term = -np.sum(posterior_with_epsilon * np.log(posterior_with_epsilon), axis=1) entropy = 1 - np.mean(entropy_term) / np.log(k)else: entropy = np.nan# 存储结果 fit_stats.append({'Classes': k,'AIC': aic,'BIC': bic,'LogLik': log_likelihood,'npar': n_params,'Entropy': entropy }) models.append(model)if not np.isnan(aic):print(f" 成功拟合! AIC: {aic:.2f}, BIC: {bic:.2f}, 对数似然: {log_likelihood:.2f}")# 转换为DataFrame fit_stats_df = pd.DataFrame(fit_stats)# 保存拟合指标 fit_stats_df.to_csv('LCA_Fit_Statistics.csv', index=False)print("\n拟合指标已保存: LCA_Fit_Statistics.csv")print("\n不同类别数模型的拟合指标:")print(fit_stats_df.to_string())return fit_stats_df, modelsdef visualize_fit_statistics(fit_stats_df: pd.DataFrame):"""可视化模型拟合指标"""print("\n生成模型拟合指标可视化...")# 过滤有效数据 valid_stats = fit_stats_df.dropna(subset=['AIC'])if len(valid_stats) > 1: # 需要至少2个点才能画线 fig, axes = plt.subplots(2, 2, figsize=(15, 12))# AIC和BIC图 ax1 = axes[0, 0] ax1.plot(valid_stats['Classes'], valid_stats['AIC'], marker='o', linewidth=2, markersize=8, label='AIC', color='steelblue') ax1.plot(valid_stats['Classes'], valid_stats['BIC'], marker='s', linewidth=2, markersize=8, label='BIC', color='coral') ax1.set_xlabel('潜类别数', fontsize=12) ax1.set_ylabel('指标值', fontsize=12) ax1.set_title('AIC和BIC随潜类别数的变化', fontsize=14) ax1.legend() ax1.grid(True, alpha=0.3)# 熵图 ax2 = axes[0, 1] entropy_data = valid_stats[valid_stats['Entropy'].notna()]if len(entropy_data) > 1: ax2.plot(entropy_data['Classes'], entropy_data['Entropy'], marker='o', linewidth=2, markersize=8, color='darkgreen') ax2.set_xlabel('潜类别数', fontsize=12) ax2.set_ylabel('熵', fontsize=12) ax2.set_title('熵随潜类别数的变化', fontsize=14) ax2.grid(True, alpha=0.3)else: ax2.text(0.5, 0.5, '无有效熵数据', ha='center', va='center', fontsize=12) ax2.set_title('熵随潜类别数的变化', fontsize=14)# 对数似然图 ax3 = axes[1, 0] ax3.plot(valid_stats['Classes'], valid_stats['LogLik'], marker='o', linewidth=2, markersize=8, color='purple') ax3.set_xlabel('潜类别数', fontsize=12) ax3.set_ylabel('对数似然', fontsize=12) ax3.set_title('对数似然随潜类别数的变化', fontsize=14) ax3.grid(True, alpha=0.3)# 模型比较条形图 ax4 = axes[1, 1]if len(valid_stats) > 0:# 标准化AIC和BIC用于比较 valid_stats['AIC_scaled'] = (valid_stats['AIC'] - valid_stats['AIC'].min()) / ( valid_stats['AIC'].max() - valid_stats['AIC'].min()) valid_stats['BIC_scaled'] = (valid_stats['BIC'] - valid_stats['BIC'].min()) / ( valid_stats['BIC'].max() - valid_stats['BIC'].min()) x = np.arange(len(valid_stats)) width = 0.35 ax4.bar(x - width / 2, valid_stats['AIC_scaled'], width, label='AIC (标准化)', alpha=0.7, color='steelblue') ax4.bar(x + width / 2, valid_stats['BIC_scaled'], width, label='BIC (标准化)', alpha=0.7, color='coral') ax4.set_xlabel('潜类别数', fontsize=12) ax4.set_ylabel('标准化指标值', fontsize=12) ax4.set_title('模型比较 (标准化AIC和BIC)', fontsize=14) ax4.set_xticks(x) ax4.set_xticklabels(valid_stats['Classes']) ax4.legend() ax4.grid(True, alpha=0.3, axis='y') plt.tight_layout() plt.savefig('LCA_Fit_Statistics_Plots.png', dpi=150, bbox_inches='tight')print("拟合指标图已保存: LCA_Fit_Statistics_Plots.png") plt.close()else:print("没有足够的有效数据生成可视化")def select_best_model(fit_stats_df: pd.DataFrame, models: list, lca_data: pd.DataFrame) -> Tuple[Any, int]:"""选择最佳LCA模型"""print("\n选择最佳潜类别模型...")# 找到有效的模型(排除None和单类别) valid_indices = []for i, model in enumerate(models):if model is not None or i == 0: # 包括单类别模型 valid_indices.append(i)if len(valid_indices) == 0:print("没有成功拟合的模型")return None, None# 基于BIC选择最佳模型(排除NaN值) valid_stats = fit_stats_df.loc[valid_indices].copy() valid_stats = valid_stats[valid_stats['BIC'].notna()]if len(valid_stats) == 0:print("没有有效的BIC值用于模型选择")return None, None# 找到BIC最小的模型 best_idx = valid_stats['BIC'].idxmin() best_class = int(valid_stats.loc[best_idx, 'Classes']) best_model = models[best_idx]print(f"基于BIC的最佳类别数: {best_class}")print(f"选择 {best_class} 个潜类别的模型作为最终模型")return best_model, best_classdef analyze_final_model(model, best_class: int, lca_data: pd.DataFrame, lcm_data: pd.DataFrame, le_dict: dict):"""分析最终模型结果"""print("\n分析最终模型结果...") X = lca_data.valuesif best_class == 1:# 单类别模型的特殊处理print("\n最终模型概要 (单类别):")print(" 所有样本属于同一类别")# 类别分配 lcm_data['Latent_Class_LCA'] = 1 lcm_data['Latent_Class_Prob'] = 1.0# 类别先验概率 class_probs = [1.0]# 条件概率(使用数据分布) cond_probs = {}for i, col in enumerate(lca_data.columns):# 计算每个水平在总体中的比例 value_counts = lcm_data[col].value_counts(normalize=True) cond_probs[col] = pd.DataFrame([value_counts.values], columns=value_counts.index)else:# 多类别模型print("\n最终模型概要:")# 获取预测结果 labels = model.predict(X) posterior_probs = model.predict_proba(X)# 类别分配 lcm_data['Latent_Class_LCA'] = labels + 1 # 从1开始编号 lcm_data['Latent_Class_Prob'] = np.max(posterior_probs, axis=1)# 类别先验概率 class_probs = model.weights_# 估计条件概率 cond_probs = estimate_conditional_probabilities(model, lca_data, labels, le_dict)# 打印类别先验概率print("\n潜类别先验概率:")for i, prob in enumerate(class_probs):print(f"类别 {i + 1}: {prob * 100:.2f}%")# 打印类别分配统计print("\n潜类别分配统计:") class_counts = lcm_data['Latent_Class_LCA'].value_counts().sort_index()for class_id, count in class_counts.items(): percentage = count / len(lcm_data) * 100print(f"类别 {class_id}: {count}人 ({percentage:.2f}%)")# 可视化结果 visualize_lca_results(lcm_data, cond_probs, best_class, le_dict)return lcm_data, cond_probs, class_probsdef estimate_conditional_probabilities(model, lca_data: pd.DataFrame, labels: np.ndarray, le_dict: dict) -> dict:"""估计条件概率矩阵""" cond_probs = {} X = lca_data.values n_classes = len(np.unique(labels))for i, col in enumerate(lca_data.columns):# 获取原始变量名 orig_col = col # 这里已经是编码后的列名# 获取每个类别的条件分布 class_probs = []for class_id in range(n_classes):# 获取属于该类别的样本 class_mask = labels == class_idif np.sum(class_mask) > 0:# 计算该类别的条件概率 values = X[class_mask, i] unique_values, counts = np.unique(values, return_counts=True) probs = counts / counts.sum()# 创建完整的概率分布(包括所有可能的值) all_values = np.arange(len(le_dict[col].classes_)) full_probs = np.zeros(len(all_values))for val, prob in zip(unique_values, probs): val_idx = np.where(all_values == val)[0]if len(val_idx) > 0: full_probs[val_idx[0]] = prob class_probs.append(full_probs)else:# 如果没有样本属于该类,使用均匀分布 class_probs.append(np.ones(len(le_dict[col].classes_)) / len(le_dict[col].classes_))# 转换为DataFrame prob_df = pd.DataFrame(class_probs, columns=le_dict[col].classes_) cond_probs[col] = prob_dfreturn cond_probsdef visualize_lca_results(lcm_data: pd.DataFrame, cond_probs: dict, best_class: int, le_dict: dict):"""可视化LCA结果"""print("\n生成条件概率条形图(概率剖面)...")# 1. 条件概率条形图 fig_height = 4 * best_class fig, axes = plt.subplots(best_class, len(cond_probs), figsize=(15, fig_height))if best_class == 1: axes = np.array([axes])for i, (var_name, prob_df) in enumerate(cond_probs.items()):for class_id in range(best_class): ax = axes[class_id, i] if best_class > 1 else axes[i]# 获取概率值if best_class == 1: probs = prob_df.iloc[0].valueselse: probs = prob_df.iloc[class_id].values# 获取水平标签 levels = prob_df.columns.tolist()# 绘制条形图 bars = ax.bar(range(len(levels)), probs, alpha=0.7, color=plt.cm.viridis(np.linspace(0, 1, len(levels)))) ax.set_ylim([0, 1]) ax.set_ylabel('概率', fontsize=10)# 设置标题if class_id == 0: ax.set_title(f'{var_name}', fontsize=12)if i == 0: ax.set_ylabel(f'类别 {class_id + 1}\n概率', fontsize=10)# 设置x轴标签 ax.set_xticks(range(len(levels))) ax.set_xticklabels(levels, rotation=45, ha='right', fontsize=9)# 在柱子上方添加数值for j, (bar, prob) in enumerate(zip(bars, probs)): height = bar.get_height() ax.text(bar.get_x() + bar.get_width() / 2., height, f'{prob:.2f}', ha='center', va='bottom', fontsize=8) plt.suptitle('潜类别条件概率剖面', fontsize=16, y=1.02) plt.tight_layout() plt.savefig('LCA_Conditional_Probabilities.png', dpi=150, bbox_inches='tight')print("条件概率条形图已保存: LCA_Conditional_Probabilities.png") plt.close()# 2. 条件概率热图print("\n生成条件概率热图...")# 准备热图数据 heatmap_data = []for var_name, prob_df in cond_probs.items():for class_id in range(best_class):if best_class == 1: probs = prob_df.iloc[0].valueselse: probs = prob_df.iloc[class_id].valuesfor level_idx, level in enumerate(prob_df.columns): heatmap_data.append({'Variable': var_name,'Class': f'Class {class_id + 1}','Level': level,'Probability': probs[level_idx] }) heatmap_df = pd.DataFrame(heatmap_data)# 创建透视表用于热图 pivot_table = heatmap_df.pivot_table( index=['Variable', 'Level'], columns='Class', values='Probability', aggfunc='mean' )# 绘制热图 plt.figure(figsize=(12, 8)) sns.heatmap(pivot_table, annot=True, fmt='.2f', cmap='viridis', cbar_kws={'label': '概率'}) plt.title('潜类别条件概率热图', fontsize=16) plt.xlabel('潜类别', fontsize=12) plt.ylabel('变量和水平', fontsize=12) plt.tight_layout() plt.savefig('LCA_Conditional_Probabilities_Heatmap.png', dpi=150, bbox_inches='tight')print("条件概率热图已保存: LCA_Conditional_Probabilities_Heatmap.png") plt.close()# 3. 潜类别分布图 plt.figure(figsize=(10, 6)) class_counts = lcm_data['Latent_Class_LCA'].value_counts().sort_index() colors = plt.cm.Set3(np.linspace(0, 1, len(class_counts))) bars = plt.bar(class_counts.index.astype(str), class_counts.values, color=colors, alpha=0.8) plt.title('潜类别分布', fontsize=16) plt.xlabel('潜类别', fontsize=12) plt.ylabel('人数', fontsize=12)# 添加数值标签for bar, count in zip(bars, class_counts.values): height = bar.get_height() plt.text(bar.get_x() + bar.get_width() / 2., height, f'{count}\n({height / len(lcm_data) * 100:.1f}%)', ha='center', va='bottom', fontsize=10) plt.grid(True, alpha=0.3, axis='y') plt.tight_layout() plt.savefig('LCA_Class_Distribution.png', dpi=150, bbox_inches='tight')print("潜类别分布图已保存: LCA_Class_Distribution.png") plt.close()def validate_lca_model(lcm_data: pd.DataFrame, best_class: int):"""验证LCA模型并进行外部效度分析"""print("\n进行潜类别验证和外部效度分析...")if'Latent_Class_LCA' not in lcm_data.columns:print("警告: 没有潜类别分配数据")return# 1. 潜类别与人口学变量的关系print("\n分析潜类别与人口学变量的关系...") demographic_vars = ['Sex', 'Age', 'Treatment', 'SES', 'Genetic_Marker'] demographic_vars = [col for col in demographic_vars if col in lcm_data.columns] demographic_results = []for var in demographic_vars:if pd.api.types.is_numeric_dtype(lcm_data[var]):# 连续变量:计算统计量 stats_by_class = lcm_data.groupby('Latent_Class_LCA')[var].agg(['count', 'mean', 'std', 'min', 'median', 'max' ]).reset_index() stats_by_class['Variable'] = var stats_by_class['Type'] = 'Continuous' demographic_results.append(stats_by_class)else:# 分类变量:创建交叉表 cross_tab = pd.crosstab(lcm_data[var], lcm_data['Latent_Class_LCA'], normalize='columns') * 100# 转换为长格式 cross_tab_long = cross_tab.reset_index().melt( id_vars=[var], var_name='Class', value_name='Percentage' ) cross_tab_long['Variable'] = var cross_tab_long['Type'] = 'Categorical'# 添加计数 count_tab = pd.crosstab(lcm_data[var], lcm_data['Latent_Class_LCA']) count_tab_long = count_tab.reset_index().melt( id_vars=[var], var_name='Class', value_name='Count' )# 合并百分比和计数 merged = pd.merge(cross_tab_long, count_tab_long, on=[var, 'Class']) demographic_results.append(merged)# 合并所有结果if demographic_results: demographic_df = pd.concat(demographic_results, ignore_index=True) demographic_df.to_csv('LCA_Demographic_Analysis.csv', index=False, encoding='utf-8-sig')print("人口学分析结果已保存: LCA_Demographic_Analysis.csv")else:print("没有可分析的人口学变量")# 2. 可视化潜类别与结局变量的关系print("\n生成潜类别与结局变量关系图...") outcome_vars = ['Outcome_Continuous', 'Outcome_Binary', 'Outcome_Count'] outcome_vars = [col for col in outcome_vars if col in lcm_data.columns]if outcome_vars: n_outcomes = len(outcome_vars) fig, axes = plt.subplots(1, n_outcomes, figsize=(5 * n_outcomes, 6))if n_outcomes == 1: axes = [axes]for i, var in enumerate(outcome_vars): ax = axes[i]if pd.api.types.is_numeric_dtype(lcm_data[var]) and lcm_data[var].nunique() > 10:# 连续变量:箱线图 box_data = [] class_labels = []for class_id in sorted(lcm_data['Latent_Class_LCA'].unique()): class_data = lcm_data[lcm_data['Latent_Class_LCA'] == class_id][var].dropna()if len(class_data) > 0: box_data.append(class_data) class_labels.append(f'类别 {class_id}')if box_data: bp = ax.boxplot(box_data, labels=class_labels, patch_artist=True)# 设置颜色 colors = plt.cm.Set3(np.linspace(0, 1, len(box_data)))for patch, color in zip(bp['boxes'], colors): patch.set_facecolor(color) patch.set_alpha(0.7) ax.set_title(f'{var} by 潜类别', fontsize=14) ax.set_xlabel('潜类别', fontsize=12) ax.set_ylabel(var, fontsize=12) ax.grid(True, alpha=0.3, axis='y')else:# 分类或计数变量:条形图if lcm_data[var].nunique() <= 5:# 分类变量:分组条形图 cross_tab = pd.crosstab(lcm_data['Latent_Class_LCA'], lcm_data[var], normalize='index') * 100 width = 0.8 / cross_tab.shape[1] x = np.arange(len(cross_tab.index))for j, level in enumerate(cross_tab.columns): ax.bar(x + j * width - width * (cross_tab.shape[1] - 1) / 2, cross_tab[level], width=width, label=level, alpha=0.7) ax.set_xlabel('潜类别', fontsize=12) ax.set_ylabel('百分比 (%)', fontsize=12) ax.set_xticks(x) ax.set_xticklabels([f'类别 {c}'for c in cross_tab.index]) ax.legend(title=var, fontsize=10)else:# 计数变量:分组箱线图 box_data = [] class_labels = []for class_id in sorted(lcm_data['Latent_Class_LCA'].unique()): class_data = lcm_data[lcm_data['Latent_Class_LCA'] == class_id][var].dropna()if len(class_data) > 0: box_data.append(class_data) class_labels.append(f'类别 {class_id}')if box_data: bp = ax.boxplot(box_data, labels=class_labels, patch_artist=True)# 设置颜色 colors = plt.cm.Set3(np.linspace(0, 1, len(box_data)))for patch, color in zip(bp['boxes'], colors): patch.set_facecolor(color) patch.set_alpha(0.7) ax.set_title(f'{var} by 潜类别', fontsize=14) ax.grid(True, alpha=0.3, axis='y') plt.suptitle('潜类别与结局变量关系', fontsize=16, y=1.02) plt.tight_layout() plt.savefig('LCA_Outcome_Relationships.png', dpi=150, bbox_inches='tight')print("潜类别与结局变量关系图已保存: LCA_Outcome_Relationships.png") plt.close()def create_word_report(lcm_data: pd.DataFrame, fit_stats_df: pd.DataFrame, best_class: Optional[int], class_probs: Optional[list], cond_probs: Optional[dict], desc_stats: pd.DataFrame):"""创建Word分析报告"""if not HAS_DOCX:print("警告: python-docx未安装,跳过Word报告生成")returnprint("\n正在生成Word分析报告...") try:# 创建文档 doc = Document()# 设置默认字体 style = doc.styles['Normal'] style.font.name = '宋体' style._element.rPr.rFonts.set(qn('w:eastAsia'), '宋体')# 添加标题 title = doc.add_heading('潜类别模型(LCM)分析报告', level=1) title.alignment = WD_ALIGN_PARAGRAPH.CENTER# 添加日期 date_para = doc.add_paragraph(f'生成日期:{datetime.now().strftime("%Y年%m月%d日")}') date_para.alignment = WD_ALIGN_PARAGRAPH.CENTER doc.add_paragraph()# 1. 研究概述 doc.add_heading('1. 研究概述', level=2) doc.add_paragraph( f'本研究基于 {len(lcm_data)} 名个体的基线数据,使用潜类别模型探索个体在多个外显指标上的响应模式,识别潜在的亚组。')# 获取外显指标 manifest_vars = [col for col in lcm_data.columns if'_cat'in col and 'Indicator'in col]if manifest_vars: doc.add_paragraph(f'外显指标:{", ".join(manifest_vars)}') doc.add_paragraph()# 2. 描述性统计 doc.add_heading('2. 描述性统计', level=2)# 添加简化的描述性统计表 table = doc.add_table(rows=min(10, len(desc_stats) + 1), cols=4) table.style = 'Light Grid Accent 1'# 设置表头 headers = ['变量', '类型', 'N/均值', '标准差/百分比']for i, header in enumerate(headers): table.cell(0, i).text = header table.cell(0, i).paragraphs[0].runs[0].font.bold = True# 添加数据for i, row in enumerate(desc_stats.head(9).itertuples(), 1): table.cell(i, 0).text = str(row.Variable) table.cell(i, 1).text = str(row.Type)if row.Type == 'Continuous': table.cell(i, 2).text = f'{row.Mean:.2f}' table.cell(i, 3).text = f'{row.SD:.2f}'else: table.cell(i, 2).text = str(row.N) table.cell(i, 3).text = str(getattr(row, 'Percentage', '')) doc.add_paragraph()# 3. 模型选择结果 doc.add_heading('3. 模型选择结果', level=2)if not fit_stats_df.empty:# 添加拟合指标表 table2 = doc.add_table(rows=len(fit_stats_df) + 1, cols=len(fit_stats_df.columns)) table2.style = 'Light Grid Accent 1'# 设置表头for i, col in enumerate(fit_stats_df.columns): table2.cell(0, i).text = col table2.cell(0, i).paragraphs[0].runs[0].font.bold = True# 添加数据for i, row in enumerate(fit_stats_df.itertuples(), 1):for j, col in enumerate(fit_stats_df.columns): value = getattr(row, col)if isinstance(value, float): table2.cell(i, j).text = f'{value:.2f}'else: table2.cell(i, j).text = str(value) doc.add_paragraph()# 添加模型选择解释if best_class is not None: doc.add_paragraph(f'基于BIC准则,选择 {best_class} 个潜类别的模型作为最终模型。')else: doc.add_paragraph('注:所有模型均未能成功拟合,可能是由于数据特征或模型设定问题。')# 4. 潜类别结果if best_class is not None and class_probs is not None: doc.add_heading('4. 潜类别结果', level=2)# 类别分布表 table3 = doc.add_table(rows=best_class + 1, cols=3) table3.style = 'Light Grid Accent 1'# 表头 headers3 = ['类别', '比例 (%)', '人数']for i, header in enumerate(headers3): table3.cell(0, i).text = header table3.cell(0, i).paragraphs[0].runs[0].font.bold = True# 获取类别人数if'Latent_Class_LCA'in lcm_data.columns: class_counts = lcm_data['Latent_Class_LCA'].value_counts().sort_index()# 添加数据for i in range(best_class): table3.cell(i + 1, 0).text = str(i + 1) table3.cell(i + 1, 1).text = f'{class_probs[i] * 100:.2f}'if'Latent_Class_LCA'in lcm_data.columns and (i + 1) in class_counts.index: table3.cell(i + 1, 2).text = str(class_counts[i + 1])else: table3.cell(i + 1, 2).text = 'N/A' doc.add_paragraph()# 5. 潜类别特征 doc.add_heading('5. 潜类别特征', level=2) doc.add_paragraph('每个潜类别的特征如下:')if cond_probs is not None:for i in range(best_class): doc.add_paragraph(f'类别 {i + 1}({class_probs[i] * 100:.2f}%):', style='List Bullet')# 提取该类别的特征 class_features = []for var_name, prob_df in cond_probs.items():if i < len(prob_df): probs = prob_df.iloc[i].values if best_class > 1 else prob_df.iloc[0].values levels = prob_df.columns.tolist()# 找到概率最高的水平 max_idx = np.argmax(probs) max_level = levels[max_idx] max_prob = probs[max_idx]if max_prob > 0.5: # 只显示概率大于50%的特征 class_features.append(f'{var_name}:{max_level}({max_prob * 100:.1f}%)')if class_features:for feature in class_features: doc.add_paragraph(f' • {feature}', style='Normal')else: doc.add_paragraph(' 无明显主导特征', style='Normal')# 6. 公共卫生意义 doc.add_heading('6. 公共卫生意义', level=2) doc.add_paragraph('• 潜类别分析识别了人群中基于多个健康指标的不同亚组', style='List Bullet') doc.add_paragraph('• 每个类别具有独特的健康特征模式,可用于个性化医疗和精准公共卫生干预', style='List Bullet') doc.add_paragraph('• 不同类别可能在疾病风险、治疗反应和预后方面存在差异', style='List Bullet') doc.add_paragraph('• 为整合照护提供分型依据,特别适用于慢性病管理和共病模式研究', style='List Bullet') doc.add_paragraph()# 7. 研究局限 doc.add_heading('7. 研究局限', level=2) doc.add_paragraph('• 样本量可能限制模型稳定性', style='List Bullet') doc.add_paragraph('• 外显指标的选择影响类别解释', style='List Bullet') doc.add_paragraph('• 需要进一步验证类别稳定性', style='List Bullet') doc.add_paragraph('• 纵向变化未考虑(仅基线数据)', style='List Bullet')# 保存文档 word_file = f'LCA_Analysis_Report_{datetime.now().strftime("%Y%m%d")}.docx' doc.save(word_file)print(f"Word分析报告已保存: {word_file}") except Exception as e:print(f"Word报告生成失败: {e}")# 创建简单报告 try: simple_doc = Document() simple_doc.add_heading('潜类别模型(LCM)分析报告', level=1) simple_doc.add_paragraph(f'生成日期:{datetime.now().strftime("%Y年%m月%d日")}') simple_doc.add_paragraph() simple_doc.add_paragraph('注:分析过程中遇到技术问题,详细结果请查看其他输出文件。') simple_word_file = f'LCA_Simple_Report_{datetime.now().strftime("%Y%m%d")}.docx' simple_doc.save(simple_word_file)print(f"简易Word报告已保存: {simple_word_file}") except Exception as e2:print(f"简易报告生成也失败: {e2}")def convert_to_serializable(obj):"""将对象转换为可JSON序列化的格式"""if isinstance(obj, np.ndarray):return obj.tolist()elif isinstance(obj, pd.DataFrame):return obj.to_dict('records')elif isinstance(obj, pd.Series):return obj.tolist()elif isinstance(obj, np.integer):return int(obj)elif isinstance(obj, np.floating):returnfloat(obj)elif isinstance(obj, dict):return {key: convert_to_serializable(value) for key, value in obj.items()}elif isinstance(obj, list):return [convert_to_serializable(item) for item in obj]else:return objdef save_workspace(lcm_data: pd.DataFrame, fit_stats_df: pd.DataFrame, best_class: Optional[int], class_probs: Optional[list], cond_probs: Optional[dict], desc_stats: pd.DataFrame):"""保存工作空间"""print("\n保存工作空间...")# 转换数据为可序列化格式 workspace_data = {'lcm_data': convert_to_serializable(lcm_data.to_dict('records')),'fit_stats_df': convert_to_serializable(fit_stats_df.to_dict('records')),'best_class': best_class,'class_probs': convert_to_serializable(class_probs) if class_probs is not None else None,'desc_stats': convert_to_serializable(desc_stats.to_dict('records')) }# 转换cond_probsif cond_probs is not None: serializable_cond_probs = {}for var_name, prob_df in cond_probs.items():# 将DataFrame转换为字典 serializable_cond_probs[var_name] = convert_to_serializable(prob_df) workspace_data['cond_probs'] = serializable_cond_probselse: workspace_data['cond_probs'] = None# 保存为JSON with open('LCA_Analysis_Workspace.json', 'w', encoding='utf-8') as f: json.dump(workspace_data, f, ensure_ascii=False, indent=2)print("工作空间已保存: LCA_Analysis_Workspace.json")# 同时保存为pickle格式(更完整) try: import pickle pickle_data = {'lcm_data': lcm_data,'fit_stats_df': fit_stats_df,'best_class': best_class,'class_probs': class_probs,'cond_probs': cond_probs,'desc_stats': desc_stats } with open('LCA_Analysis_Workspace.pkl', 'wb') as f: pickle.dump(pickle_data, f)print("工作空间已保存 (pickle格式): LCA_Analysis_Workspace.pkl") except Exception as e:print(f"保存pickle格式失败: {e}")def print_summary(final_model_exists: bool = False):"""打印分析总结"""print("\n" + "=" * 70)print("LCM分析完成!")print("=" * 70)print("\n结果文件汇总:")print("1. 描述性统计三线表: Descriptive_Statistics_Table.csv")print("2. 分类变量分布图: Categorical_Variable_Distributions.png")print("3. 交叉表: Cross_Tabulations.xlsx")print("4. 模型拟合指标: LCA_Fit_Statistics.csv")print("5. 拟合指标图: LCA_Fit_Statistics_Plots.png")if final_model_exists:print("6. 条件概率图: LCA_Conditional_Probabilities.png")print("7. 条件概率热图: LCA_Conditional_Probabilities_Heatmap.png")print("8. 潜类别分布图: LCA_Class_Distribution.png")print("9. 人口学分析: LCA_Demographic_Analysis.csv")print("10. 结局关系图: LCA_Outcome_Relationships.png")print("11. Word分析报告: LCA_Analysis_Report_*.docx")print("12. 工作空间 (JSON): LCA_Analysis_Workspace.json")print("13. 工作空间 (pickle): LCA_Analysis_Workspace.pkl")print("\n\n公共卫生应用建议:")print("1. 慢性病共病模式: 识别高血压、糖尿病、冠心病等疾病的共病类型")print("2. 健康行为分类: 基于吸烟、饮酒、饮食、运动等行为识别健康行为模式")print("3. 症状群分析: 识别疾病症状的不同组合模式")print("4. 治疗反应分型: 基于多个结局指标识别治疗反应类型")print("5. 风险分层: 基于多个风险因素识别高风险亚组")print(f"\n分析完成!所有结果已保存在 {os.getcwd()} 文件夹中。")def main():"""主函数"""# 1. 设置环境 results_dir = setup_environment()# 2. 读取数据 desktop_path = Path.home() / 'Desktop' data_full, data_simple = read_data(desktop_path)# 3. 数据预处理 lcm_data = preprocess_data(data_full)# 4. 探索性分析 cross_tabs, desc_stats = exploratory_analysis(lcm_data)# 5. 准备LCA数据 lca_data, le_dict = prepare_lca_data(lcm_data)# 6. 拟合LCA模型 fit_stats_df, models = fit_lca_models(lca_data)# 7. 选择最佳模型 best_model, best_class = select_best_model(fit_stats_df, models, lca_data)# 8. 分析最终模型if best_model is not None and best_class is not None: lcm_data, cond_probs, class_probs = analyze_final_model( best_model, best_class, lca_data, lcm_data, le_dict )# 9. 模型验证 validate_lca_model(lcm_data, best_class)# 10. 创建Word报告 create_word_report(lcm_data, fit_stats_df, best_class, class_probs, cond_probs, desc_stats)# 11. 保存工作空间 save_workspace(lcm_data, fit_stats_df, best_class, class_probs, cond_probs, desc_stats)# 12. 打印总结 print_summary(final_model_exists=True)else:# 创建简单的Word报告 create_word_report(lcm_data, fit_stats_df, None, None, None, desc_stats)# 保存工作空间 save_workspace(lcm_data, fit_stats_df, None, None, None, desc_stats)# 打印总结 print_summary(final_model_exists=False)if __name__ == "__main__": main()