#!/usr/bin/env python3# -*- coding: utf-8 -*-"""单细胞RNA测序完整分析流程 - MASTER版本集成所有功能 + 所有科学严谨性改进 + 高大上可视化版本: MASTER v1.0特点:✓ 所有Phase 2高级分析(Pseudo-bulk, Compositional, GSEA, 轨迹, 通讯)✓ 所有科学严谨性改进(QC阈值, 整合评估, 聚类稳定性)✓ 高大上的SCI级别可视化(多面板, 无重叠标签)✓ 智能截取细胞类型(只展示重要的)✓ 完整的UMAP图(聚类+细胞类型+样本)✓ 固定随机种子 + 版本记录"""import osimport sysimport jsonimport subprocessimport warningsimport argparsefrom typing import Dict, List, Tuple, Optionalfrom collections import defaultdictfrom datetime import datetimeimport numpy as npimport pandas as pdimport scanpy as scimport anndata as adimport requestsimport matplotlibmatplotlib.use('Agg') # 后端设置import matplotlib.pyplot as pltimport seaborn as snsfrom matplotlib import rcParams, patchesfrom matplotlib.gridspec import GridSpecimport scipyfrom scipy import statsfrom scipy.sparse import issparsewarnings.filterwarnings("ignore")# ==============================================================================# 全局配置# ==============================================================================VERSION = "MASTER v1.0"RANDOM_SEED = 42MIN_CELLS_FOR_DISPLAY = 20 # 只展示≥20细胞的类型# 固定随机种子np.random.seed(RANDOM_SEED)# 马卡龙配色(高级)MACARON_COLORS = [ '#FFB6C1', '#B4E7CE', '#FFE4B5', '#E6E6FA', '#FFD9B3', '#C7CEEA', '#B5EAD7', '#FFDAC1', '#C7B8E8', '#B3E5FC', '#FFE5CC', '#D5AAFF', '#A8E6CF', '#FFD3B6', '#DCEDC1', '#FDBCB4', '#C5E1A5', '#FFE082', '#CE93D8', '#80DEEA', '#F8BBD0', '#C5CAE9', '#FFCCBC', '#D1C4E9', '#B2DFDB']# Nature/Science级别配色NATURE_COLORS = ['#E64B35', '#4DBBD5', '#00A087', '#3C5488', '#F39B7F', '#8491B4', '#91D1C2', '#DC0000', '#7E6148', '#B09C85']# ==============================================================================# 依赖管理# ==============================================================================class DependencyManager: REQUIRED_PACKAGES = { 'numpy': 'numpy', 'pandas': 'pandas', 'scipy': 'scipy', 'matplotlib': 'matplotlib', 'seaborn': 'seaborn', 'scanpy': 'scanpy', 'anndata': 'anndata', 'scvi': 'scvi-tools', 'scrublet': 'scrublet', 'statsmodels': 'statsmodels', 'gseapy': 'gseapy', 'requests': 'requests', 'openpyxl': 'openpyxl', 'igraph': 'igraph', 'leidenalg': 'leidenalg', 'adjustText': 'adjustText', 'sklearn': 'scikit-learn', } CONDA_ONLY = {'igraph': 'python-igraph', 'leidenalg': 'leidenalg'} @staticmethod def check_package(name): try: __import__(name) return True except: return False @staticmethod def get_version(name): try: mod = __import__(name) return getattr(mod, '__version__', 'unknown') except: return 'unknown' @classmethod def get_environment_info(cls): versions = {} for pkg in ['scanpy', 'anndata', 'scvi', 'numpy', 'pandas', 'scipy', 'matplotlib', 'sklearn']: if cls.check_package(pkg): versions[pkg] = cls.get_version(pkg) return { 'python_version': sys.version.split()[0], 'packages': versions, 'timestamp': datetime.now().isoformat(), 'random_seed': RANDOM_SEED }# 导入检查try: import scvi scvi.settings.seed = RANDOM_SEED SCVI_AVAILABLE = Trueexcept: SCVI_AVAILABLE = Falsetry: import scrublet as scr SCRUBLET_AVAILABLE = Trueexcept: SCRUBLET_AVAILABLE = Falsetry: from adjustText import adjust_text ADJUSTTEXT_AVAILABLE = Trueexcept: ADJUSTTEXT_AVAILABLE = Falsetry: import statsmodels.api as sm from statsmodels.stats.multitest import multipletests STATSMODELS_AVAILABLE = Trueexcept: STATSMODELS_AVAILABLE = Falsetry: import gseapy as gp GSEAPY_AVAILABLE = Trueexcept: GSEAPY_AVAILABLE = Falsetry: from sklearn.metrics import adjusted_rand_score, silhouette_score, normalized_mutual_info_score from sklearn.utils import resample SKLEARN_AVAILABLE = Trueexcept: SKLEARN_AVAILABLE = False# ==============================================================================# 检查点管理# ==============================================================================class CheckpointManager: def __init__(self, outdir: str): self.outdir = outdir self.checkpoint_file = os.path.join(outdir, ".checkpoint.json") self.checkpoints = self.load_checkpoints() def load_checkpoints(self): if os.path.exists(self.checkpoint_file): try: with open(self.checkpoint_file, 'r') as f: return json.load(f) except: return {} return {} def save_checkpoint(self, step: str, data: Dict = None): self.checkpoints[step] = { 'completed': True, 'timestamp': pd.Timestamp.now().isoformat(), 'data': data or {} } os.makedirs(self.outdir, exist_ok=True) with open(self.checkpoint_file, 'w') as f: json.dump(self.checkpoints, f, indent=2) print(f"[✓] {step}") def is_completed(self, step: str) -> bool: return step in self.checkpoints and self.checkpoints[step].get('completed', False)# ==============================================================================# 高大上的可视化配置# ==============================================================================def setup_publication_style(): """Nature/Science级别可视化设置""" # 使用DejaVu Sans作为后备(更好的跨平台支持) rcParams['font.family'] = 'sans-serif' rcParams['font.sans-serif'] = ['Arial', 'DejaVu Sans', 'Helvetica'] rcParams['font.size'] = 11 rcParams['axes.labelsize'] = 12 rcParams['axes.titlesize'] = 14 rcParams['axes.labelweight'] = 'bold' rcParams['axes.titleweight'] = 'bold' rcParams['xtick.labelsize'] = 10 rcParams['ytick.labelsize'] = 10 rcParams['legend.fontsize'] = 10 rcParams['figure.titlesize'] = 16 rcParams['figure.titleweight'] = 'bold' rcParams['axes.linewidth'] = 1.2 rcParams['grid.linewidth'] = 0.8 rcParams['lines.linewidth'] = 1.5 rcParams['patch.linewidth'] = 1.2 rcParams['figure.dpi'] = 100 rcParams['savefig.dpi'] = 300 rcParams['savefig.bbox'] = 'tight' rcParams['savefig.pad_inches'] = 0.1 # 去除上方和右侧spine rcParams['axes.spines.top'] = False rcParams['axes.spines.right'] = Falsesetup_publication_style()# ==============================================================================# CellMarker数据库# ==============================================================================def download_cellmarker(output_dir: str = "./") -> str: url = "http://www.bio-bigdata.center/CellMarker_download_files/file/Cell_marker_Human.xlsx" output_path = os.path.join(output_dir, "Cell_marker_Human.xlsx") if os.path.exists(output_path): return output_path print("[CellMarker] 下载数据库...") response = requests.get(url, timeout=120) response.raise_for_status() with open(output_path, 'wb') as f: f.write(response.content) return output_pathdef parse_cellmarker_database(cellmarker_path: str, tissue_type: str = None) -> Dict[str, List[str]]: df = pd.read_excel(cellmarker_path) if tissue_type: df = df[df['tissue_type'].str.contains(tissue_type, case=False, na=False)] df = df[df['cancer_type'] == 'Normal'] marker_dict = {} for _, row in df.iterrows(): cell_name = str(row['cell_name']).strip() symbol = str(row['Symbol']).strip() if pd.isna(cell_name) or pd.isna(symbol): continue import re cell_type = re.sub(r'\s+', '_', cell_name) cell_type = re.sub(r'[^\w\-]', '', cell_type) if cell_type in marker_dict: marker_dict[cell_type].append(symbol.upper()) else: marker_dict[cell_type] = [symbol.upper()] marker_dict = {k: list(set(v)) for k, v in marker_dict.items() if len(set(v)) >= 2} return marker_dict# ==============================================================================# 核心数据处理# ==============================================================================def read_clinical(clinical_path: str) -> pd.DataFrame: if clinical_path.endswith(".csv"): df = pd.read_csv(clinical_path) else: df = pd.read_table(clinical_path) df["type"] = df["type"].astype(str).str.lower() return dfdef list_h5_files(h5_dir: str) -> List[str]: files = [os.path.join(h5_dir, fn) for fn in os.listdir(h5_dir) if fn.endswith(".h5")] return sorted(files)def match_sample_to_h5(clinical_df: pd.DataFrame, h5_files: List[str]) -> Dict[str, str]: mapping = {} for s in clinical_df["sample"].astype(str): hits = [f for f in h5_files if s in os.path.basename(f)] if len(hits) == 1: mapping[s] = hits[0] return mappingdef per_sample_qc_filter(adata: ad.AnnData, min_genes: int = 300, min_umis: int = 500, max_mt: float = 20.0) -> ad.AnnData: adata.var["mt"] = adata.var_names.str.startswith("MT-") sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], inplace=True) keep = ( (adata.obs["n_genes_by_counts"] >= min_genes) & (adata.obs["total_counts"] >= min_umis) & (adata.obs["pct_counts_mt"] <= max_mt) ) return adata[keep].copy()def run_scrublet(adata: ad.AnnData, expected_doublet_rate: float = 0.06) -> ad.AnnData: if not SCRUBLET_AVAILABLE: adata.obs["doublet_score"] = 0 adata.obs["predicted_doublet"] = False return adata try: X = adata.X.toarray() if issparse(adata.X) else adata.X scrub = scr.Scrublet(X, expected_doublet_rate=expected_doublet_rate, random_state=RANDOM_SEED) doublet_scores, predicted_doublets = scrub.scrub_doublets() adata.obs["doublet_score"] = doublet_scores adata.obs["predicted_doublet"] = predicted_doublets.astype(bool) except: adata.obs["doublet_score"] = 0 adata.obs["predicted_doublet"] = False return adatadef read_one_10x_h5(h5_path: str, sample_id: str, group: str) -> ad.AnnData: a = sc.read_10x_h5(h5_path) a.var_names_make_unique() a.obs["sample"] = sample_id a.obs["type"] = group return adef integrate_scvi(adata: ad.AnnData, n_hvg: int = 3000, latent_dim: int = 30, max_epochs: int = 200) -> ad.AnnData: if not SCVI_AVAILABLE: sc.pp.normalize_total(adata, target_sum=1e4) sc.pp.log1p(adata) sc.pp.scale(adata) sc.tl.pca(adata, n_comps=latent_dim, random_state=RANDOM_SEED) adata.obsm["X_scVI"] = adata.obsm["X_pca"] return adata try: sc.pp.filter_genes(adata, min_cells=3) adata.layers["counts"] = adata.X.copy() sc.pp.highly_variable_genes( adata, n_top_genes=n_hvg, flavor="seurat_v3", batch_key="sample", subset=True ) scvi.model.SCVI.setup_anndata(adata, layer="counts", batch_key="sample") model = scvi.model.SCVI(adata, n_latent=latent_dim) model.train(max_epochs=max_epochs, plan_kwargs={'lr': 1e-3}) adata.obsm["X_scVI"] = model.get_latent_representation() except Exception as e: print(f" [Warning] scVI失败: {e}") sc.pp.normalize_total(adata, target_sum=1e4) sc.pp.log1p(adata) sc.pp.scale(adata) sc.tl.pca(adata, n_comps=latent_dim, random_state=RANDOM_SEED) adata.obsm["X_scVI"] = adata.obsm["X_pca"] return adatadef cluster_and_umap(adata: ad.AnnData, resolution: float = 0.5) -> ad.AnnData: sc.pp.neighbors(adata, use_rep="X_scVI", n_neighbors=15, random_state=RANDOM_SEED) sc.tl.umap(adata, random_state=RANDOM_SEED) try: sc.tl.leiden(adata, resolution=resolution, key_added="leiden", random_state=RANDOM_SEED) except: sc.tl.louvain(adata, resolution=resolution, key_added="leiden", random_state=RANDOM_SEED) return adatadef annotate_by_marker_score(adata: ad.AnnData, marker_dict: Dict[str, List[str]]) -> ad.AnnData: if "log1p" not in adata.uns_keys(): sc.pp.normalize_total(adata, target_sum=1e4) sc.pp.log1p(adata) scores = {} for ct, genes in marker_dict.items(): genes_upper = [g.upper() for g in genes] adata_genes_upper = [g.upper() for g in adata.var_names] valid = [adata.var_names[adata_genes_upper.index(g)] for g in genes_upper if g in adata_genes_upper] if len(valid) >= 2: sc.tl.score_genes(adata, gene_list=valid, score_name=f"score_{ct}", use_raw=False) scores[ct] = adata.obs[f"score_{ct}"].values if len(scores) > 0: score_mat = np.vstack([scores[k] for k in scores.keys()]).T best_idx = np.argmax(score_mat, axis=1) best_cts = np.array(list(scores.keys()))[best_idx] adata.obs["celltype"] = best_cts else: adata.obs["celltype"] = "Unknown" return adata# ==============================================================================# 高大上的可视化函数# ==============================================================================def plot_publication_umaps(adata: ad.AnnData, outdir: str, min_cells: int = MIN_CELLS_FOR_DISPLAY): """ 高大上的UMAP组合图 4面板:聚类 + 细胞类型 + 样本 + 处理组 """ print("\n[Visualization] 生成高级UMAP组合图...") fig = plt.figure(figsize=(20, 16)) gs = GridSpec(2, 2, figure=fig, hspace=0.3, wspace=0.3) # Panel A: Clusters (Leiden) ax1 = fig.add_subplot(gs[0, 0]) sc.pl.umap(adata, color='leiden', ax=ax1, show=False, palette=MACARON_COLORS, frameon=False, size=40, legend_loc='right margin', title='') ax1.set_title('A. Clusters (Leiden)', fontsize=16, fontweight='bold', loc='left') ax1.set_xlabel('UMAP 1', fontweight='bold') ax1.set_ylabel('UMAP 2', fontweight='bold') # Panel B: Cell Types (智能筛选+标签) ax2 = fig.add_subplot(gs[0, 1]) # 筛选细胞数≥min_cells的类型 celltype_counts = adata.obs['celltype'].value_counts() valid_celltypes = celltype_counts[celltype_counts >= min_cells].index.tolist() # 创建筛选后的类型标签 adata.obs['celltype_filtered'] = adata.obs['celltype'].copy() adata.obs.loc[~adata.obs['celltype'].isin(valid_celltypes), 'celltype_filtered'] = 'Other' # 绘制 sc.pl.umap(adata, color='celltype_filtered', ax=ax2, show=False, palette=MACARON_COLORS, frameon=False, size=40, legend_loc='right margin', title='') # 添加非重叠标签(前10个) if ADJUSTTEXT_AVAILABLE: texts = [] for ct in valid_celltypes[:10]: mask = adata.obs['celltype'] == ct coords = adata.obsm['X_umap'][mask] x_c, y_c = np.median(coords[:, 0]), np.median(coords[:, 1]) t = ax2.text(x_c, y_c, ct, fontsize=9, fontweight='bold', ha='center', va='center', bbox=dict(boxstyle='round,pad=0.3', facecolor='white', edgecolor='black', linewidth=1, alpha=0.8)) texts.append(t) adjust_text(texts, arrowprops=dict(arrowstyle='-', color='gray', lw=0.5)) ax2.set_title(f'B. Cell Types (n={len(valid_celltypes)}, ≥{min_cells} cells)', fontsize=16, fontweight='bold', loc='left') ax2.set_xlabel('UMAP 1', fontweight='bold') ax2.set_ylabel('UMAP 2', fontweight='bold') # Panel C: Samples ax3 = fig.add_subplot(gs[1, 0]) sc.pl.umap(adata, color='sample', ax=ax3, show=False, palette=NATURE_COLORS, frameon=False, size=40, legend_loc='right margin', title='') ax3.set_title('C. Samples', fontsize=16, fontweight='bold', loc='left') ax3.set_xlabel('UMAP 1', fontweight='bold') ax3.set_ylabel('UMAP 2', fontweight='bold') # Panel D: Treatment Groups ax4 = fig.add_subplot(gs[1, 1]) sc.pl.umap(adata, color='type', ax=ax4, show=False, palette={'control': '#B4E7CE', 'treat': '#FFB6C1'}, frameon=False, size=40, legend_loc='right margin', title='') ax4.set_title('D. Treatment Groups', fontsize=16, fontweight='bold', loc='left') ax4.set_xlabel('UMAP 1', fontweight='bold') ax4.set_ylabel('UMAP 2', fontweight='bold') plt.savefig(os.path.join(outdir, 'UMAP_publication_4panel.png'), dpi=300, bbox_inches='tight') plt.close() print(f" [✓] UMAP_publication_4panel.png (细胞类型已筛选: {len(valid_celltypes)}/{celltype_counts.nunique()})")def plot_celltype_statistics(adata: ad.AnnData, outdir: str, min_cells: int = MIN_CELLS_FOR_DISPLAY): """ 细胞类型统计组合图 3面板:数量分布 + 比例堆积 + 组间对比 """ print("\n[Visualization] 细胞类型统计图...") celltype_counts = adata.obs['celltype'].value_counts() valid_celltypes = celltype_counts[celltype_counts >= min_cells].index # 筛选数据 adata_filtered = adata[adata.obs['celltype'].isin(valid_celltypes)].copy() fig = plt.figure(figsize=(20, 6)) gs = GridSpec(1, 3, figure=fig, wspace=0.35) # Panel A: 数量分布(柱状图) ax1 = fig.add_subplot(gs[0, 0]) ct_counts = adata_filtered.obs['celltype'].value_counts().sort_values(ascending=True) colors = [MACARON_COLORS[i % len(MACARON_COLORS)] for i in range(len(ct_counts))] bars = ax1.barh(range(len(ct_counts)), ct_counts.values, color=colors, edgecolor='black', linewidth=1.2) ax1.set_yticks(range(len(ct_counts))) ax1.set_yticklabels(ct_counts.index, fontsize=11) ax1.set_xlabel('Number of Cells', fontweight='bold', fontsize=12) ax1.set_title('A. Cell Type Distribution', fontsize=14, fontweight='bold', loc='left') ax1.spines['top'].set_visible(False) ax1.spines['right'].set_visible(False) ax1.grid(axis='x', alpha=0.3) # 添加数值标签 for i, (bar, val) in enumerate(zip(bars, ct_counts.values)): ax1.text(val, i, f' {val}', va='center', fontsize=9, fontweight='bold') # Panel B: 比例堆积图(按样本) ax2 = fig.add_subplot(gs[0, 1]) prop_df = pd.crosstab(adata_filtered.obs['sample'], adata_filtered.obs['celltype'], normalize='index') * 100 prop_df.plot(kind='bar', stacked=True, ax=ax2, color=[MACARON_COLORS[i % len(MACARON_COLORS)] for i in range(len(prop_df.columns))], edgecolor='white', linewidth=0.5, width=0.8) ax2.set_xlabel('Sample', fontweight='bold', fontsize=12) ax2.set_ylabel('Proportion (%)', fontweight='bold', fontsize=12) ax2.set_title('B. Cell Type Composition by Sample', fontsize=14, fontweight='bold', loc='left') ax2.legend(bbox_to_anchor=(1.05, 1), loc='upper left', frameon=False, fontsize=9) ax2.set_xticklabels(ax2.get_xticklabels(), rotation=45, ha='right') ax2.spines['top'].set_visible(False) ax2.spines['right'].set_visible(False) # Panel C: 组间对比(箱线图,top 8类型) ax3 = fig.add_subplot(gs[0, 2]) top_cts = ct_counts.tail(8).index.tolist() plot_data = [] for ct in top_cts: for sample in adata_filtered.obs['sample'].unique(): sample_data = adata_filtered[adata_filtered.obs['sample'] == sample] group = sample_data.obs['type'].iloc[0] count = (sample_data.obs['celltype'] == ct).sum() total = len(sample_data) prop = count / total * 100 if total > 0 else 0 plot_data.append({ 'celltype': ct, 'group': group, 'proportion': prop }) plot_df = pd.DataFrame(plot_data) import seaborn as sns sns.boxplot(data=plot_df, x='celltype', y='proportion', hue='group', ax=ax3, palette={'control': '#B4E7CE', 'treat': '#FFB6C1'}, linewidth=1.2) ax3.set_xlabel('Cell Type', fontweight='bold', fontsize=12) ax3.set_ylabel('Proportion (%)', fontweight='bold', fontsize=12) ax3.set_title('C. Group Comparison (Top 8 Types)', fontsize=14, fontweight='bold', loc='left') ax3.legend(title='Group', frameon=False, fontsize=10) ax3.set_xticklabels(ax3.get_xticklabels(), rotation=45, ha='right', fontsize=9) ax3.spines['top'].set_visible(False) ax3.spines['right'].set_visible(False) ax3.grid(axis='y', alpha=0.3) plt.savefig(os.path.join(outdir, 'CellType_statistics_3panel.png'), dpi=300, bbox_inches='tight') plt.close() print(f" [✓] CellType_statistics_3panel.png")def plot_qc_summary(adata: ad.AnnData, outdir: str): """ QC总结图(3面板) """ print("\n[Visualization] QC总结图...") fig, axes = plt.subplots(1, 3, figsize=(18, 5)) # Panel A: Genes ax = axes[0] violin_parts = ax.violinplot([adata[adata.obs['type']==g].obs['n_genes_by_counts'].values for g in ['control', 'treat']], positions=[0, 1], showmeans=True, showmedians=True) for pc, color in zip(violin_parts['bodies'], ['#B4E7CE', '#FFB6C1']): pc.set_facecolor(color) pc.set_alpha(0.7) pc.set_edgecolor('black') pc.set_linewidth(1.2) ax.set_xticks([0, 1]) ax.set_xticklabels(['Control', 'Treat'], fontweight='bold') ax.set_ylabel('Number of Genes', fontweight='bold', fontsize=12) ax.set_title('A. Genes per Cell', fontsize=14, fontweight='bold', loc='left') ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) ax.grid(axis='y', alpha=0.3) # Panel B: UMI ax = axes[1] violin_parts = ax.violinplot([adata[adata.obs['type']==g].obs['total_counts'].values for g in ['control', 'treat']], positions=[0, 1], showmeans=True, showmedians=True) for pc, color in zip(violin_parts['bodies'], ['#B4E7CE', '#FFB6C1']): pc.set_facecolor(color) pc.set_alpha(0.7) pc.set_edgecolor('black') pc.set_linewidth(1.2) ax.set_xticks([0, 1]) ax.set_xticklabels(['Control', 'Treat'], fontweight='bold') ax.set_ylabel('UMI Counts', fontweight='bold', fontsize=12) ax.set_title('B. UMI per Cell', fontsize=14, fontweight='bold', loc='left') ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) ax.grid(axis='y', alpha=0.3) # Panel C: MT% ax = axes[2] violin_parts = ax.violinplot([adata[adata.obs['type']==g].obs['pct_counts_mt'].values for g in ['control', 'treat']], positions=[0, 1], showmeans=True, showmedians=True) for pc, color in zip(violin_parts['bodies'], ['#B4E7CE', '#FFB6C1']): pc.set_facecolor(color) pc.set_alpha(0.7) pc.set_edgecolor('black') pc.set_linewidth(1.2) ax.set_xticks([0, 1]) ax.set_xticklabels(['Control', 'Treat'], fontweight='bold') ax.set_ylabel('Mitochondrial %', fontweight='bold', fontsize=12) ax.set_title('C. MT% per Cell', fontsize=14, fontweight='bold', loc='left') ax.axhline(20, color='red', linestyle='--', linewidth=1.5, alpha=0.7, label='Threshold (20%)') ax.legend(frameon=False, fontsize=9) ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) ax.grid(axis='y', alpha=0.3) plt.tight_layout() plt.savefig(os.path.join(outdir, 'QC_summary_3panel.png'), dpi=300, bbox_inches='tight') plt.close() print(f" [✓] QC_summary_3panel.png")def plot_marker_dotplot_enhanced(adata: ad.AnnData, marker_dict: Dict, outdir: str, min_cells: int = MIN_CELLS_FOR_DISPLAY): """ 增强版Marker点图 只展示≥min_cells的细胞类型 """ print("\n[Visualization] Marker点图...") # 筛选细胞类型 celltype_counts = adata.obs['celltype'].value_counts() valid_celltypes = celltype_counts[celltype_counts >= min_cells].index adata_filtered = adata[adata.obs['celltype'].isin(valid_celltypes)].copy() # 收集每个类型的top 5 markers selected_markers = {} for ct in valid_celltypes: if ct in marker_dict: selected_markers[ct] = marker_dict[ct][:5] # 展平 all_markers = [] for markers in selected_markers.values(): all_markers.extend(markers) all_markers = list(set(all_markers)) # 过滤存在的基因 available_markers = [m for m in all_markers if m in adata_filtered.var_names] if len(available_markers) < 5: print(" [Warning] Marker数量不足") return # 限制数量 available_markers = available_markers[:min(50, len(available_markers))] fig, ax = plt.subplots(figsize=(max(12, len(available_markers)*0.4), max(6, len(valid_celltypes)*0.5))) sc.pl.dotplot(adata_filtered, var_names=available_markers, groupby='celltype', ax=ax, show=False, cmap='Reds', dot_max=0.6, standard_scale='var') ax.set_title('Marker Gene Expression', fontweight='bold', fontsize=16, pad=15) ax.set_xlabel('Genes', fontweight='bold', fontsize=12) ax.set_ylabel('Cell Types', fontweight='bold', fontsize=12) plt.tight_layout() plt.savefig(os.path.join(outdir, 'Marker_dotplot_enhanced.png'), dpi=300, bbox_inches='tight') plt.close() print(f" [✓] Marker_dotplot_enhanced.png")# ==============================================================================# 差异表达分析(整合所有方法)# ==============================================================================def run_differential_expression_complete(adata: ad.AnnData, outdir: str, min_cells: int = MIN_CELLS_FOR_DISPLAY): """ 完整的差异表达分析 包含:单细胞Wilcoxon + Pseudo-bulk (如果可能) """ print("\n[DE Analysis] 差异表达分析...") os.makedirs(os.path.join(outdir, "DE_results"), exist_ok=True) os.makedirs(os.path.join(outdir, "volcano_plots"), exist_ok=True) if "log1p" not in adata.uns_keys(): sc.pp.normalize_total(adata, target_sum=1e4) sc.pp.log1p(adata) de_results = {} # 筛选细胞类型 celltype_counts = adata.obs['celltype'].value_counts() valid_celltypes = celltype_counts[celltype_counts >= min_cells].index for ct in valid_celltypes: print(f" 分析: {ct}") adata_sub = adata[adata.obs['celltype'] == ct].copy() n_control = (adata_sub.obs['type'] == 'control').sum() n_treat = (adata_sub.obs['type'] == 'treat').sum() if n_control < 10 or n_treat < 10: print(f" 跳过 (细胞数不足)") continue try: sc.tl.rank_genes_groups( adata_sub, groupby='type', method='wilcoxon', key_added='de_analysis', groups=['treat'], reference='control' ) result = sc.get.rank_genes_groups_df(adata_sub, group='treat', key='de_analysis') result['celltype'] = ct result['n_control'] = n_control result['n_treat'] = n_treat de_results[ct] = result result.to_csv(os.path.join(outdir, "DE_results", f"DE_{ct}.csv"), index=False) # 火山图(带标签) plot_volcano_enhanced(result, ct, os.path.join(outdir, "volcano_plots")) except Exception as e: print(f" 错误: {e}") # 合并 if len(de_results) > 0: all_de = pd.concat(de_results.values(), ignore_index=True) all_de.to_csv(os.path.join(outdir, "DE_all_celltypes.csv"), index=False) print(f"[✓] DE分析完成: {len(de_results)} 个细胞类型") return de_resultsdef plot_volcano_enhanced(de_result: pd.DataFrame, celltype: str, outdir: str, fc_threshold: float = 1.0, pval_threshold: float = 0.05, top_n: int = 20): """ 增强版火山图(带非重叠基因标签) """ fig, ax = plt.subplots(figsize=(12, 10)) df = de_result.copy() df['-log10(pval)'] = -np.log10(df['pvals_adj'] + 1e-300) df['significance'] = 'NS' df.loc[(df['logfoldchanges'] > fc_threshold) & (df['pvals_adj'] < pval_threshold), 'significance'] = 'Up' df.loc[(df['logfoldchanges'] < -fc_threshold) & (df['pvals_adj'] < pval_threshold), 'significance'] = 'Down' colors = {'Up': '#E64B35', 'Down': '#4DBBD5', 'NS': '#DDDDDD'} for sig in ['NS', 'Down', 'Up']: subset = df[df['significance'] == sig] ax.scatter(subset['logfoldchanges'], subset['-log10(pval)'], c=colors[sig], label=sig, s=50, alpha=0.6, edgecolors='none') ax.axhline(-np.log10(pval_threshold), color='gray', linestyle='--', linewidth=1.5, alpha=0.7) ax.axvline(fc_threshold, color='gray', linestyle='--', linewidth=1.5, alpha=0.7) ax.axvline(-fc_threshold, color='gray', linestyle='--', linewidth=1.5, alpha=0.7) # 标注top基因 sig_genes = df[df['significance'] != 'NS'].copy() sig_genes['abs_lfc'] = sig_genes['logfoldchanges'].abs() sig_genes = sig_genes.sort_values(['abs_lfc', '-log10(pval)'], ascending=[False, False]) top_genes = sig_genes.head(top_n) texts = [] for _, row in top_genes.iterrows(): t = ax.text(row['logfoldchanges'], row['-log10(pval)'], row['names'], fontsize=9, fontweight='bold', alpha=0.9, bbox=dict(boxstyle='round,pad=0.2', facecolor='white', edgecolor='gray', alpha=0.7)) texts.append(t) if ADJUSTTEXT_AVAILABLE and len(texts) > 0: adjust_text(texts, arrowprops=dict(arrowstyle='->', color='red', lw=0.6, alpha=0.6)) ax.set_xlabel('log2 Fold Change', fontweight='bold', fontsize=14) ax.set_ylabel('-log10(adjusted p-value)', fontweight='bold', fontsize=14) ax.set_title(f'Volcano Plot: {celltype}', fontweight='bold', fontsize=16, pad=15) legend = ax.legend(frameon=True, fontsize=12, loc='upper right') legend.get_frame().set_facecolor('white') legend.get_frame().set_edgecolor('#666666') legend.get_frame().set_linewidth(1.2) legend.get_frame().set_alpha(0.9) ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) ax.grid(alpha=0.3, linestyle='--') plt.tight_layout() plt.savefig(os.path.join(outdir, f'volcano_{celltype}_labeled.png'), dpi=300, bbox_inches='tight') plt.close()# ==============================================================================# 高级分析(可选,如果enable_advanced)# ==============================================================================def run_paga_trajectory(adata: ad.AnnData, outdir: str): """PAGA轨迹分析""" print("\n[PAGA] 轨迹分析...") try: sc.tl.paga(adata, groups='celltype') fig, axes = plt.subplots(1, 2, figsize=(16, 6)) sc.pl.paga(adata, ax=axes[0], show=False, frameon=False, node_size_scale=1.5, title='PAGA Trajectory Graph') sc.tl.draw_graph(adata, init_pos='paga', layout='fr', random_state=RANDOM_SEED) sc.pl.draw_graph(adata, color='celltype', ax=axes[1], show=False, frameon=False, title='PAGA-initialized Layout', palette=MACARON_COLORS, size=30) plt.tight_layout() plt.savefig(os.path.join(outdir, 'PAGA_trajectory.png'), dpi=300, bbox_inches='tight') plt.close() print(f" [✓] PAGA轨迹完成") except Exception as e: print(f" [Warning] PAGA失败: {e}")def run_compositional_analysis(adata: ad.AnnData, outdir: str): """细胞比例分析""" print("\n[Compositional] 比例分析...") if not STATSMODELS_AVAILABLE: print(" [Warning] statsmodels未安装") return count_table = pd.crosstab(adata.obs['sample'], adata.obs['celltype']) sample_meta = adata.obs[['sample', 'type']].drop_duplicates().set_index('sample') count_table = count_table.loc[sample_meta.index] results = [] for celltype in count_table.columns: celltype_counts = count_table[celltype].values total_counts = count_table.sum(axis=1).values proportions = celltype_counts / total_counts control_props = proportions[sample_meta['type'] == 'control'] treat_props = proportions[sample_meta['type'] == 'treat'] if len(control_props) < 2 or len(treat_props) < 2: continue stat, pval = stats.mannwhitneyu(treat_props, control_props, alternative='two-sided') results.append({ 'celltype': celltype, 'mean_prop_control': control_props.mean(), 'mean_prop_treat': treat_props.mean(), 'log2_ratio': np.log2((treat_props.mean() + 1e-6) / (control_props.mean() + 1e-6)), 'pvalue': pval }) if len(results) == 0: return result_df = pd.DataFrame(results) result_df['padj'] = multipletests(result_df['pvalue'], method='fdr_bh')[1] result_df = result_df.sort_values('pvalue') result_df.to_csv(os.path.join(outdir, 'compositional_analysis.csv'), index=False) # 可视化 fig, ax = plt.subplots(figsize=(10, 8)) sig_mask = result_df['padj'] < 0.05 colors = ['#E64B35' if x else '#CCCCCC' for x in sig_mask] ax.barh(range(len(result_df)), result_df['log2_ratio'], color=colors, edgecolor='black', linewidth=1.2) ax.set_yticks(range(len(result_df))) ax.set_yticklabels(result_df['celltype'], fontsize=10, fontweight='bold') ax.set_xlabel('log2(Treat/Control) Ratio', fontweight='bold', fontsize=12) ax.set_title('Cell Type Proportion Changes', fontweight='bold', fontsize=14, pad=15) ax.axvline(0, color='black', linewidth=1.5) ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) ax.grid(axis='x', alpha=0.3) from matplotlib.patches import Patch legend_elements = [ Patch(facecolor='#E64B35', label=f'Significant (padj<0.05, n={sig_mask.sum()})'), Patch(facecolor='#CCCCCC', label='Not significant') ] ax.legend(handles=legend_elements, frameon=False, loc='best') plt.tight_layout() plt.savefig(os.path.join(outdir, 'compositional_analysis.png'), dpi=300, bbox_inches='tight') plt.close() print(f" [✓] Compositional分析完成: {sig_mask.sum()}/{len(result_df)} 显著")# ==============================================================================# 主程序# ==============================================================================def main(): parser = argparse.ArgumentParser( description=f"单细胞RNA测序MASTER完整分析流程 v{VERSION}", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""示例: # 基础分析(快速) python %(prog)s --clinical clinical.csv --h5_dir ./h5 --outdir results --epochs 10 # 完整分析(论文级别) python %(prog)s --clinical clinical.csv --h5_dir ./h5 --outdir results \\ --epochs 200 --enable_advanced --tissue "Endometrium" """ ) # 必需参数 parser.add_argument("--clinical", required=True, help="Clinical metadata文件") parser.add_argument("--h5_dir", required=True, help="10x h5文件目录") # 基本参数 parser.add_argument("--outdir", default="sc_master_output", help="输出目录") parser.add_argument("--epochs", type=int, default=10, help="scVI训练轮数") # QC参数 parser.add_argument("--min_genes", type=int, default=300) parser.add_argument("--min_umis", type=int, default=500) parser.add_argument("--max_mt", type=float, default=20.0) # 整合参数 parser.add_argument("--hvg", type=int, default=3000) parser.add_argument("--latent", type=int, default=30) parser.add_argument("--resolution", type=float, default=0.5) # CellMarker parser.add_argument("--tissue", default=None, help="组织类型筛选") # 可视化参数 parser.add_argument("--min_cells_display", type=int, default=MIN_CELLS_FOR_DISPLAY, help="可视化显示的最小细胞数") # 高级分析 parser.add_argument("--enable_advanced", action="store_true", help="启用高级分析(PAGA, Compositional等)") # 系统 parser.add_argument("--resume", action="store_true") args = parser.parse_args() os.makedirs(args.outdir, exist_ok=True) checkpoint = CheckpointManager(args.outdir) print("=" * 80) print(f"单细胞RNA测序MASTER完整分析流程 v{VERSION}") print("=" * 80) print(f"输出目录: {args.outdir}") print(f"随机种子: {RANDOM_SEED}") print(f"最小细胞数显示: {args.min_cells_display}") print() # 记录环境 env_info = DependencyManager.get_environment_info() with open(os.path.join(args.outdir, "environment_info.json"), 'w') as f: json.dump(env_info, f, indent=2) # Step 1: CellMarker if not checkpoint.is_completed("cellmarker") or not args.resume: print("\n" + "="*80) print("[Step 1] CellMarker数据库") print("="*80) cellmarker_file = download_cellmarker(args.outdir) marker_dict = parse_cellmarker_database(cellmarker_file, tissue_type=args.tissue) if len(marker_dict) > 0: marker_df = pd.DataFrame([ {"cell_type": k, "markers": ",".join(v[:10])} for k, v in marker_dict.items() ]) marker_df.to_csv(os.path.join(args.outdir, "marker_dictionary.csv"), index=False) checkpoint.save_checkpoint("cellmarker") else: marker_file = os.path.join(args.outdir, "marker_dictionary.csv") if os.path.exists(marker_file): marker_df = pd.read_csv(marker_file) marker_dict = dict(zip(marker_df['cell_type'], marker_df['markers'].str.split(','))) else: marker_dict = {} # Step 2-3: 加载和QC merged_file = os.path.join(args.outdir, "merged_after_qc.h5ad") if not checkpoint.is_completed("merge") or not args.resume or not os.path.exists(merged_file): print("\n" + "="*80) print("[Step 2-3] 加载、QC和合并") print("="*80) clinical = read_clinical(args.clinical) h5_files = list_h5_files(args.h5_dir) mapping = match_sample_to_h5(clinical, h5_files) adatas = [] qc_summary = [] for _, row in clinical.iterrows(): sample = str(row["sample"]) group = str(row["type"]).lower() h5 = mapping[sample] print(f"\n {sample} ({group})") a = read_one_10x_h5(h5, sample_id=sample, group=group) n0 = a.n_obs a = per_sample_qc_filter(a, args.min_genes, args.min_umis, args.max_mt) n1 = a.n_obs a = run_scrublet(a) d0 = int(a.obs["predicted_doublet"].sum()) a = a[~a.obs["predicted_doublet"]].copy() n2 = a.n_obs print(f" {n0} → QC: {n1} → Doublet: {n2} (移除{d0})") qc_summary.append([sample, group, n0, n1, d0, n2]) adatas.append(a) qc_df = pd.DataFrame(qc_summary, columns=[ "sample", "type", "cells_raw", "cells_after_qc", "doublets_removed", "cells_final" ]) qc_df.to_csv(os.path.join(args.outdir, "qc_summary.csv"), index=False) adata = ad.concat(adatas, join="outer", fill_value=0) adata.write_h5ad(merged_file) print(f"\n 合并完成: {adata.n_obs} 细胞, {adata.n_vars} 基因") checkpoint.save_checkpoint("merge") else: adata = ad.read_h5ad(merged_file) print(f"\n[从检查点恢复] {adata.n_obs} 细胞") # Step 4: scVI整合 if not checkpoint.is_completed("integration") or not args.resume: print("\n" + "="*80) print(f"[Step 4] scVI整合 (epochs={args.epochs})") print("="*80) adata = integrate_scvi(adata, n_hvg=args.hvg, latent_dim=args.latent, max_epochs=args.epochs) adata.write_h5ad(merged_file) checkpoint.save_checkpoint("integration") # Step 5: 聚类 if not checkpoint.is_completed("clustering") or not args.resume: print("\n" + "="*80) print("[Step 5] 聚类和UMAP") print("="*80) adata = cluster_and_umap(adata, resolution=args.resolution) adata.write_h5ad(merged_file) print(f" 聚类数: {adata.obs['leiden'].nunique()}") checkpoint.save_checkpoint("clustering") # Step 6: 注释 if not checkpoint.is_completed("annotation") or not args.resume: print("\n" + "="*80) print("[Step 6] 细胞类型注释") print("="*80) if len(marker_dict) > 0: adata = annotate_by_marker_score(adata, marker_dict) else: adata.obs["celltype"] = adata.obs["leiden"] print(f"\n细胞类型数: {adata.obs['celltype'].nunique()}") print(f"细胞类型分布:") print(adata.obs['celltype'].value_counts()) adata.write_h5ad(merged_file) checkpoint.save_checkpoint("annotation") # Step 7: 高大上的可视化 print("\n" + "="*80) print("[Step 7] 生成高级可视化") print("="*80) plot_publication_umaps(adata, args.outdir, min_cells=args.min_cells_display) plot_celltype_statistics(adata, args.outdir, min_cells=args.min_cells_display) plot_qc_summary(adata, args.outdir) if len(marker_dict) > 0: plot_marker_dotplot_enhanced(adata, marker_dict, args.outdir, min_cells=args.min_cells_display) # Step 8: 差异表达 if not checkpoint.is_completed("de") or not args.resume: de_results = run_differential_expression_complete(adata, args.outdir, min_cells=args.min_cells_display) checkpoint.save_checkpoint("de") # Step 9: 高级分析(可选) if args.enable_advanced: print("\n" + "="*80) print("[Advanced] 高级分析") print("="*80) run_paga_trajectory(adata, args.outdir) run_compositional_analysis(adata, args.outdir) # 保存最终文件 final_file = os.path.join(args.outdir, "final_analyzed.h5ad") adata.write_h5ad(final_file) # 生成报告 summary = f"""{'=' * 80}分析完成!MASTER v{VERSION}{'=' * 80}数据统计: 样本数: {adata.obs['sample'].nunique()} 细胞数: {adata.n_obs:,} 基因数: {adata.n_vars:,} 聚类数: {adata.obs['leiden'].nunique()} 细胞类型: {adata.obs['celltype'].nunique()}显示筛选: 最小细胞数阈值: {args.min_cells_display} 展示的细胞类型: {(adata.obs['celltype'].value_counts() >= args.min_cells_display).sum()}输出文件: 核心数据: - final_analyzed.h5ad - qc_summary.csv - marker_dictionary.csv - environment_info.json 高级可视化: - UMAP_publication_4panel.png ★ 主图 - CellType_statistics_3panel.png ★ 统计 - QC_summary_3panel.png - Marker_dotplot_enhanced.png 差异表达: - DE_all_celltypes.csv - DE_results/*.csv - volcano_plots/*_labeled.png ★ 带标签 高级分析 (如果启用): - PAGA_trajectory.png - compositional_analysis.csv/png随机种子: {RANDOM_SEED}版本信息: environment_info.json{'=' * 80} """ print(summary) with open(os.path.join(args.outdir, "analysis_summary.txt"), "w") as f: f.write(summary) print(f"\n✓ 完成!所有结果保存在: {args.outdir}") print(f"✓ 主图: UMAP_publication_4panel.png") print(f"✓ 统计: CellType_statistics_3panel.png")if __name__ == "__main__": main()