"""完整的单细胞RNA-seq分析流程 (Python版)功能特性: 智能断点续跑 - 自动检测已完成步骤 GPU加速支持 - 自动检测并使用GPU 多种拟时序方法 - PAGA → DPT → 跳过 完整日志记录 - 详细记录每步进度 结果汇总报告 - 生成分析总结作者:itwangyang版本:v2.0日期:2026-01-11"""import osimport sysimport pickleimport loggingfrom datetime import datetimefrom pathlib import Pathimport warningswarnings.filterwarnings('ignore')import numpy as npimport pandas as pdimport matplotlib.pyplot as pltimport seaborn as sns# 单细胞分析核心库import scanpy as scimport anndata as ad# GPU支持检测try: import rapids_singlecell as rsc GPU_AVAILABLE = True print("✓ 检测到GPU支持 - 将使用RAPIDS加速")except ImportError: GPU_AVAILABLE = False print("ℹ GPU支持未安装 - 使用CPU模式") print("ℹ 安装GPU加速: pip install cupy-cuda11x rapids-singlecell") print("ℹ 注意: 需要NVIDIA GPU和CUDA环境\n")# 可选依赖try: import celltypist CELLTYPIST_AVAILABLE = Trueexcept ImportError: CELLTYPIST_AVAILABLE = False# 设置sc.settings.verbosity = 1sc.settings.set_figure_params(dpi=100, dpi_save=300, frameon=False, figsize=(6, 6), fontsize=12)####################### 配置参数 #######################class Config: """分析配置参数""" # 路径设置 WORK_DIR = "C:\\Users\\wangyang\\Desktop\\ST\\03Seurat" # 修改为你的路径 # 阈值参数 LOG_FC_FILTER = 1.0 ADJ_PVAL_FILTER = 0.05 # 质控参数 MIN_GENES = 200 MIN_CELLS = 5 MAX_MT_PERCENT = 15 MIN_GENES_AFTER_QC = 100 # 降维参数 N_PCS = 20 N_NEIGHBORS = 15 # 聚类参数 RESOLUTION = 0.6 # 高变基因数 N_TOP_GENES = 1500 # 是否强制重新运行 FORCE_RERUN = False # 是否使用GPU USE_GPU = GPU_AVAILABLE####################### 辅助函数 #######################class AnalysisLogger: """分析日志管理器""" def __init__(self, log_file="analysis_progress.log"): self.log_file = log_file self.completed_steps = set() self.checkpoint_file = "analysis_checkpoint.pkl" # 配置日志 logging.basicConfig( level=logging.INFO, format='[%(asctime)s] [%(levelname)s] %(message)s', handlers=[ logging.FileHandler(log_file, encoding='utf-8'), logging.StreamHandler(sys.stdout) ] ) self.logger = logging.getLogger(__name__) # 加载检查点 self.load_checkpoint() def log(self, message, level="INFO"): """记录日志""" if level == "INFO": self.logger.info(message) elif level == "SUCCESS": self.logger.info(f"✓ {message}") elif level == "WARN": self.logger.warning(message) elif level == "ERROR": self.logger.error(message) elif level == "SKIP": self.logger.info(f"⊘ {message}") def load_checkpoint(self): """加载检查点""" if os.path.exists(self.checkpoint_file): with open(self.checkpoint_file, 'rb') as f: self.completed_steps = pickle.load(f) self.log("从检查点恢复分析", "INFO") else: self.log("开始新的分析", "INFO") def save_checkpoint(self): """保存检查点""" with open(self.checkpoint_file, 'wb') as f: pickle.dump(self.completed_steps, f) def is_completed(self, step_name): """检查步骤是否完成""" if Config.FORCE_RERUN: return False return step_name in self.completed_steps def mark_completed(self, step_name): """标记步骤完成""" self.completed_steps.add(step_name) self.save_checkpoint() self.log(f"{step_name} 已完成", "SUCCESS")def check_files_exist(files): """检查文件是否存在""" return all(os.path.exists(f) for f in files)def print_banner(title): """打印步骤横幅""" print("\n" + "=" * 64) print(f" {title}") print("=" * 64)def save_figure(fig, filename): """保存图片到当前工作目录""" try: filepath = os.path.join(os.getcwd(), filename) fig.savefig(filepath, bbox_inches='tight', dpi=300) print(f" → 图片已保存: {filepath}") plt.close(fig) except Exception as e: print(f" ✗ 保存图片失败 {filename}: {str(e)}") plt.close(fig)####################### 主分析流程 #######################class SeuratAnalysisPipeline: """单细胞分析主流程""" def __init__(self, config=Config): self.config = config self.logger = AnalysisLogger() self.adata = None # 切换工作目录 os.chdir(config.WORK_DIR) self.logger.log(f"工作目录: {os.getcwd()}", "INFO") # 打印欢迎信息 self._print_welcome() def _print_welcome(self): """打印欢迎信息""" print("\n" + "╔" + "=" * 62 + "╗") print("║" + " " * 10 + "Scanpy单细胞RNA-seq完整分析流程 v2.0" + " " * 15 + "║") print("║" + " " * 62 + "║") print("║ 功能: 从数据加载到差异分析和拟时序的完整流程" + " " * 16 + "║") print("║ 特性: 智能断点续跑 + GPU加速 + 详细日志" + " " * 21 + "║") print("╚" + "=" * 62 + "╝\n") def _list_generated_files(self, filenames): """列出生成的文件""" print("\n 生成的文件:") for fname in filenames: fpath = os.path.join(os.getcwd(), fname) if os.path.exists(fpath): size_mb = os.path.getsize(fpath) / 1024 / 1024 print(f" ✓ {fname} ({size_mb:.2f} MB)") else: print(f" ✗ {fname} (未找到)") print() def run(self): """运行完整分析流程""" try: self.step01_load_and_qc() self.step02_pca_and_harmony() self.step03_clustering() self.step04_cell_annotation() self.step05_differential_analysis() self.step06_trajectory_analysis() self.generate_summary_report() except Exception as e: self.logger.log(f"分析出错: {str(e)}", "ERROR") raise ####################### STEP 01 ####################### def step01_load_and_qc(self): """步骤01:数据加载和质量控制""" step_files = [ "01.featureViolin.pdf", "01.featureCor.pdf", "01.featureVar.pdf", "step01_adata.h5ad" ] if self.logger.is_completed("step01") and check_files_exist(step_files): self.logger.log("步骤01已完成,跳过", "SKIP") self.adata = sc.read_h5ad("step01_adata.h5ad") return print_banner("STEP 01: 数据加载和质量控制") self.logger.log("开始步骤01:数据加载和质控", "INFO") # 读取10X数据 self.logger.log("读取10X Genomics数据...", "INFO") data_dirs = [d for d in Path(self.config.WORK_DIR).iterdir() if d.is_dir() and d.name != '__pycache__'] adatas = [] for data_dir in data_dirs: try: sample_name = data_dir.name adata_sample = sc.read_10x_mtx( data_dir, var_names='gene_symbols', cache=True ) adata_sample.obs['sample'] = sample_name adatas.append(adata_sample) self.logger.log(f" 加载样本 {sample_name}: {adata_sample.n_obs} cells", "INFO") except Exception as e: self.logger.log(f" 跳过 {data_dir.name}: {str(e)}", "WARN") # 合并数据 self.adata = ad.concat(adatas, label="batch", keys=[a.obs['sample'][0] for a in adatas]) self.logger.log(f"初始数据 - 细胞数: {self.adata.n_obs}, 基因数: {self.adata.n_vars}", "INFO") # 计算QC指标 self.adata.var['mt'] = self.adata.var_names.str.startswith('MT-') sc.pp.calculate_qc_metrics( self.adata, qc_vars=['mt'], percent_top=None, log1p=False, inplace=True ) # QC可视化(过滤前) self.logger.log("生成质控图...", "INFO") fig, axes = plt.subplots(1, 3, figsize=(15, 4)) # 手动绘制violin plot避免版本兼容问题 import seaborn as sns qc_metrics = ['n_genes_by_counts', 'total_counts', 'pct_counts_mt'] titles = ['Number of Genes', 'Total Counts', 'Mitochondrial %'] for idx, (metric, title) in enumerate(zip(qc_metrics, titles)): data_to_plot = self.adata.obs[metric].values axes[idx].violinplot([data_to_plot], positions=[0], showmeans=True, widths=0.7) axes[idx].set_ylabel(title) axes[idx].set_xticks([]) axes[idx].grid(axis='y', alpha=0.3) plt.tight_layout() save_figure(fig, "01.featureViolin.pdf") # 质控过滤 n_cells_before = self.adata.n_obs sc.pp.filter_cells(self.adata, min_genes=self.config.MIN_GENES_AFTER_QC) sc.pp.filter_genes(self.adata, min_cells=self.config.MIN_CELLS) self.adata = self.adata[self.adata.obs.pct_counts_mt < self.config.MAX_MT_PERCENT, :] self.logger.log(f"质控后 - 细胞数: {self.adata.n_obs} (过滤掉 {n_cells_before - self.adata.n_obs})", "INFO") # 特征相关性图 self.logger.log("生成特征相关性图...", "INFO") fig, axes = plt.subplots(1, 2, figsize=(12, 5)) # 手动绘制散点图 axes[0].scatter(self.adata.obs['total_counts'], self.adata.obs['n_genes_by_counts'], s=1, alpha=0.5, c='steelblue') axes[0].set_xlabel('Total Counts') axes[0].set_ylabel('Number of Genes') axes[0].set_title('Feature Correlation') axes[1].scatter(self.adata.obs['total_counts'], self.adata.obs['pct_counts_mt'], s=1, alpha=0.5, c='coral') axes[1].set_xlabel('Total Counts') axes[1].set_ylabel('Mitochondrial %') axes[1].set_title('MT% vs Total Counts') plt.tight_layout() save_figure(fig, "01.featureCor.pdf") # 数据标准化 self.logger.log("数据标准化...", "INFO") sc.pp.normalize_total(self.adata, target_sum=1e4) sc.pp.log1p(self.adata) # 识别高变基因 self.logger.log("识别高变基因...", "INFO") sc.pp.highly_variable_genes( self.adata, n_top_genes=self.config.N_TOP_GENES, flavor='seurat' ) top_genes = self.adata.var_names[self.adata.var.highly_variable][:10].tolist() self.logger.log(f"Top 10高变基因: {', '.join(top_genes)}", "INFO") # 高变基因可视化 self.logger.log("生成高变基因图...", "INFO") fig, axes = plt.subplots(1, 2, figsize=(14, 5)) # 左图:所有基因的分散度 mean_expr = np.array(self.adata.var['means']) dispersions = np.array(self.adata.var['dispersions']) highly_variable = self.adata.var['highly_variable'].values axes[0].scatter(mean_expr[~highly_variable], dispersions[~highly_variable], s=1, alpha=0.3, c='gray', label='Other') axes[0].scatter(mean_expr[highly_variable], dispersions[highly_variable], s=2, alpha=0.7, c='red', label='Highly Variable') axes[0].set_xlabel('Mean Expression') axes[0].set_ylabel('Dispersion') axes[0].set_title('Highly Variable Genes') axes[0].legend() axes[0].set_xscale('log') axes[0].set_yscale('log') # 右图:标注top10基因 axes[1].scatter(mean_expr[~highly_variable], dispersions[~highly_variable], s=1, alpha=0.3, c='gray') axes[1].scatter(mean_expr[highly_variable], dispersions[highly_variable], s=2, alpha=0.7, c='red') # 标注top10基因 for gene in top_genes: if gene in self.adata.var_names: idx = self.adata.var_names.get_loc(gene) axes[1].annotate(gene, (mean_expr[idx], dispersions[idx]), fontsize=8, alpha=0.8) axes[1].set_xlabel('Mean Expression') axes[1].set_ylabel('Dispersion') axes[1].set_title('Top 10 Highly Variable Genes') axes[1].set_xscale('log') axes[1].set_yscale('log') plt.tight_layout() save_figure(fig, "01.featureVar.pdf") # 保存结果 self.adata.write("step01_adata.h5ad") self.logger.mark_completed("step01") # 列出生成的图片 self._list_generated_files(["01.featureViolin.pdf", "01.featureCor.pdf", "01.featureVar.pdf"]) ####################### STEP 02 ####################### def step02_pca_and_harmony(self): """步骤02:PCA降维和Harmony批次校正""" step_files = [ "02.pcaGene.pdf", "02.PCA.pdf", "02.pcaVariance.pdf", "step02_adata.h5ad" ] if self.logger.is_completed("step02") and check_files_exist(step_files): self.logger.log("步骤02已完成,跳过", "SKIP") self.adata = sc.read_h5ad("step02_adata.h5ad") return print_banner("STEP 02: PCA降维和Harmony批次校正") self.logger.log("开始步骤02:PCA降维和批次校正", "INFO") if self.adata is None: self.adata = sc.read_h5ad("step01_adata.h5ad") # 数据缩放 self.logger.log("数据缩放...", "INFO") sc.pp.scale(self.adata, max_value=10) # PCA降维 self.logger.log("执行PCA降维...", "INFO") if self.config.USE_GPU: rsc.pp.pca(self.adata, n_comps=self.config.N_PCS) else: sc.tl.pca(self.adata, n_comps=self.config.N_PCS, svd_solver='arpack') # Harmony批次校正 if len(self.adata.obs['batch'].unique()) > 1: self.logger.log("Harmony批次校正...", "INFO") try: import scanpy.external as sce sce.pp.harmony_integrate(self.adata, 'batch', basis='X_pca', adjusted_basis='X_pca_harmony') use_rep = 'X_pca_harmony' except Exception as e: self.logger.log(f"Harmony失败,使用原始PCA: {str(e)}", "WARN") use_rep = 'X_pca' else: use_rep = 'X_pca' self.adata.uns['use_rep'] = use_rep # PCA基因贡献图 self.logger.log("生成PCA基因贡献图...", "INFO") fig, axes = plt.subplots(2, 2, figsize=(12, 10)) axes = axes.flatten() for i, pc in enumerate([0, 1, 2, 3]): loadings = self.adata.varm['PCs'][:, pc] top_idx = np.argsort(np.abs(loadings))[-20:] y_pos = np.arange(len(top_idx)) axes[i].barh(y_pos, loadings[top_idx]) axes[i].set_yticks(y_pos) axes[i].set_yticklabels(self.adata.var_names[top_idx], fontsize=8) axes[i].set_xlabel('Loading') axes[i].set_title(f'PC{pc+1}') axes[i].grid(axis='x', alpha=0.3) plt.tight_layout() save_figure(fig, "02.pcaGene.pdf") # PCA散点图 self.logger.log("生成PCA散点图...", "INFO") fig, axes = plt.subplots(1, 2, figsize=(12, 5)) pca_coords = self.adata.obsm['X_pca'] colors = plt.cm.tab10(self.adata.obs['batch'].cat.codes) axes[0].scatter(pca_coords[:, 0], pca_coords[:, 1], c=colors, s=1, alpha=0.5) axes[0].set_xlabel('PC1') axes[0].set_ylabel('PC2') axes[0].set_title('PCA (PC1 vs PC2)') axes[1].scatter(pca_coords[:, 2], pca_coords[:, 3], c=colors, s=1, alpha=0.5) axes[1].set_xlabel('PC3') axes[1].set_ylabel('PC4') axes[1].set_title('PCA (PC3 vs PC4)') # 添加图例 from matplotlib.patches import Patch legend_elements = [Patch(facecolor=plt.cm.tab10(i), label=cat) for i, cat in enumerate(self.adata.obs['batch'].cat.categories)] axes[0].legend(handles=legend_elements, loc='best', fontsize=8) plt.tight_layout() save_figure(fig, "02.PCA.pdf") # 方差解释图 self.logger.log("生成方差解释图...", "INFO") fig, ax = plt.subplots(figsize=(10, 5)) variance_ratio = self.adata.uns['pca']['variance_ratio'] x = np.arange(1, len(variance_ratio) + 1) ax.bar(x, variance_ratio, color='steelblue', alpha=0.7) ax.plot(x, np.cumsum(variance_ratio), 'ro-', linewidth=2, markersize=4, label='Cumulative Variance') ax.set_xlabel('Principal Component') ax.set_ylabel('Variance Ratio') ax.set_title('PCA Variance Ratio') ax.legend() ax.grid(axis='y', alpha=0.3) plt.tight_layout() save_figure(fig, "02.pcaVariance.pdf") # 保存结果 self.adata.write("step02_adata.h5ad") self.logger.mark_completed("step02") self._list_generated_files(["02.pcaGene.pdf", "02.PCA.pdf", "02.pcaVariance.pdf"]) ####################### STEP 03 ####################### def step03_clustering(self): """步骤03:细胞聚类和Marker基因识别""" step_files = [ "03.cluster.pdf", "03.Cluster.txt", "03.clusterMarkers.txt", "03.clusterHeatmap.pdf", "step03_adata.h5ad" ] if self.logger.is_completed("step03") and check_files_exist(step_files): self.logger.log("步骤03已完成,跳过", "SKIP") self.adata = sc.read_h5ad("step03_adata.h5ad") return print_banner("STEP 03: 细胞聚类和Marker基因识别") self.logger.log("开始步骤03:细胞聚类", "INFO") if self.adata is None: self.adata = sc.read_h5ad("step02_adata.h5ad") use_rep = self.adata.uns.get('use_rep', 'X_pca') # 构建邻居图 self.logger.log("构建最近邻图...", "INFO") if self.config.USE_GPU: rsc.pp.neighbors(self.adata, n_neighbors=self.config.N_NEIGHBORS, n_pcs=self.config.N_PCS, use_rep=use_rep) else: sc.pp.neighbors(self.adata, n_neighbors=self.config.N_NEIGHBORS, n_pcs=self.config.N_PCS, use_rep=use_rep) # Leiden聚类 self.logger.log("Leiden聚类...", "INFO") if self.config.USE_GPU: rsc.tl.leiden(self.adata, resolution=self.config.RESOLUTION, key_added='leiden') else: sc.tl.leiden(self.adata, resolution=self.config.RESOLUTION, key_added='leiden') n_clusters = len(self.adata.obs['leiden'].unique()) self.logger.log(f"聚类完成 - 共 {n_clusters} 个cluster", "INFO") # tSNE降维 self.logger.log("tSNE降维...", "INFO") if self.config.USE_GPU: rsc.tl.tsne(self.adata, use_rep=use_rep) else: sc.tl.tsne(self.adata, use_rep=use_rep) # UMAP降维 self.logger.log("UMAP降维...", "INFO") if self.config.USE_GPU: rsc.tl.umap(self.adata) else: sc.tl.umap(self.adata) # 聚类可视化 self.logger.log("生成聚类图...", "INFO") fig, axes = plt.subplots(1, 2, figsize=(14, 6)) # tSNE tsne_coords = self.adata.obsm['X_tsne'] clusters = self.adata.obs['leiden'].astype(int) n_clusters = len(np.unique(clusters)) colors = plt.cm.tab20(clusters / n_clusters) axes[0].scatter(tsne_coords[:, 0], tsne_coords[:, 1], c=colors, s=1, alpha=0.6) axes[0].set_xlabel('tSNE 1') axes[0].set_ylabel('tSNE 2') axes[0].set_title('tSNE Clustering') # 添加cluster标签 for cluster_id in np.unique(clusters): cluster_mask = clusters == cluster_id center_x = tsne_coords[cluster_mask, 0].mean() center_y = tsne_coords[cluster_mask, 1].mean() axes[0].text(center_x, center_y, str(cluster_id), fontsize=12, weight='bold', ha='center', va='center', bbox=dict(boxstyle='round,pad=0.3', facecolor='white', alpha=0.7)) # UMAP umap_coords = self.adata.obsm['X_umap'] axes[1].scatter(umap_coords[:, 0], umap_coords[:, 1], c=colors, s=1, alpha=0.6) axes[1].set_xlabel('UMAP 1') axes[1].set_ylabel('UMAP 2') axes[1].set_title('UMAP Clustering') # 添加cluster标签 for cluster_id in np.unique(clusters): cluster_mask = clusters == cluster_id center_x = umap_coords[cluster_mask, 0].mean() center_y = umap_coords[cluster_mask, 1].mean() axes[1].text(center_x, center_y, str(cluster_id), fontsize=12, weight='bold', ha='center', va='center', bbox=dict(boxstyle='round,pad=0.3', facecolor='white', alpha=0.7)) plt.tight_layout() save_figure(fig, "03.cluster.pdf") # 保存聚类结果 cluster_df = pd.DataFrame({ 'cell_id': self.adata.obs_names, 'cluster': self.adata.obs['leiden'] }) cluster_df.to_csv("03.Cluster.txt", sep='\t', index=False) # 寻找marker基因 self.logger.log("寻找各cluster的marker基因(耗时约5-10分钟)...", "INFO") sc.tl.rank_genes_groups( self.adata, 'leiden', method='wilcoxon', key_added='rank_genes_leiden' ) # 提取显著marker markers_df = self._extract_significant_markers('leiden', 'rank_genes_leiden') markers_df.to_csv("03.clusterMarkers.txt", sep='\t', index=False) self.logger.log(f"发现 {len(markers_df)} 个显著marker基因", "INFO") # Marker基因热图 self.logger.log("生成marker基因热图...", "INFO") top_genes = self._get_top_markers_per_cluster('leiden', 'rank_genes_leiden', n=10) # 确保基因存在 top_genes = [g for g in top_genes if g in self.adata.var_names] if len(top_genes) > 0: # 提取表达数据 gene_indices = [self.adata.var_names.get_loc(g) for g in top_genes] expr_data = self.adata.X[:, gene_indices].toarray() if hasattr(self.adata.X, 'toarray') else self.adata.X[:, gene_indices] # 按cluster排序 cluster_order = np.argsort(self.adata.obs['leiden'].values) expr_data_sorted = expr_data[cluster_order, :] # 绘制热图 fig, ax = plt.subplots(figsize=(12, 10)) im = ax.imshow(expr_data_sorted.T, aspect='auto', cmap='RdYlBu_r', interpolation='nearest') ax.set_yticks(range(len(top_genes))) ax.set_yticklabels(top_genes, fontsize=6) ax.set_xlabel('Cells (ordered by cluster)') ax.set_title('Top Marker Genes per Cluster') plt.colorbar(im, ax=ax, label='Expression') plt.tight_layout() save_figure(fig, "03.clusterHeatmap.pdf") else: self.logger.log("没有找到足够的marker基因绘制热图", "WARN") # 保存结果 self.adata.write("step03_adata.h5ad") self.logger.mark_completed("step03") self._list_generated_files(["03.cluster.pdf", "03.Cluster.txt", "03.clusterMarkers.txt", "03.clusterHeatmap.pdf"]) def _extract_significant_markers(self, groupby, key): """提取显著的marker基因""" result = self.adata.uns[key] groups = result['names'].dtype.names markers_list = [] for group in groups: gene_names = result['names'][group] logfcs = result['logfoldchanges'][group] pvals_adj = result['pvals_adj'][group] for i, gene in enumerate(gene_names): if abs(logfcs[i]) > self.config.LOG_FC_FILTER and pvals_adj[i] < self.config.ADJ_PVAL_FILTER: markers_list.append({ 'cluster': group, 'gene': gene, 'avg_log2FC': logfcs[i], 'p_val_adj': pvals_adj[i] }) return pd.DataFrame(markers_list) def _get_top_markers_per_cluster(self, groupby, key, n=10): """获取每个cluster的top N marker基因""" result = self.adata.uns[key] groups = result['names'].dtype.names top_genes = [] for group in groups: top_genes.extend(result['names'][group][:n]) return list(set(top_genes)) # 去重 ####################### STEP 04 ####################### def step04_cell_annotation(self): """步骤04:SingleR细胞类型自动注释""" step_files = [ "04.clusterAnn.txt", "04.cellAnn.txt", "04.cellAnn.pdf", "04.cellMarkers.txt", "step04_adata.h5ad" ] if self.logger.is_completed("step04") and check_files_exist(step_files): self.logger.log("步骤04已完成,跳过", "SKIP") self.adata = sc.read_h5ad("step04_adata.h5ad") return print_banner("STEP 04: 细胞类型自动注释") self.logger.log("开始步骤04:细胞类型注释", "INFO") if self.adata is None: self.adata = sc.read_h5ad("step03_adata.h5ad") # 尝试多种注释方法 annotation_success = False # 方法1: CellTypist if CELLTYPIST_AVAILABLE and not annotation_success: try: self.logger.log("尝试方法1: CellTypist...", "INFO") import celltypist from celltypist import models # 下载模型 model = models.Model.load(model='Immune_All_Low.pkl') # 预测 predictions = celltypist.annotate( self.adata, model=model, majority_voting=True ) self.adata.obs['cell_type'] = predictions.predicted_labels.majority_voting annotation_success = True self.logger.log("✓ CellTypist注释完成", "SUCCESS") except Exception as e: self.logger.log(f"CellTypist失败: {str(e)}", "WARN") # 方法2: 基于marker基因的简单注释 if not annotation_success: self.logger.log("方法2: 基于marker基因注释...", "INFO") # 定义常见细胞类型的marker基因 cell_type_markers = { 'T cells': ['CD3D', 'CD3E', 'CD3G'], 'B cells': ['CD79A', 'CD79B', 'MS4A1'], 'NK cells': ['GNLY', 'NKG7', 'NCAM1'], 'Monocytes': ['CD14', 'LYZ', 'S100A8'], 'Dendritic cells': ['FCER1A', 'CST3'], 'Megakaryocytes': ['PPBP'] } # 为每个cluster分配细胞类型 cluster_annotations = {} for cluster in self.adata.obs['leiden'].unique(): cluster_cells = self.adata.obs['leiden'] == cluster # 计算每种细胞类型的得分 scores = {} for cell_type, markers in cell_type_markers.items(): available_markers = [m for m in markers if m in self.adata.var_names] if available_markers: score = self.adata[cluster_cells, available_markers].X.mean() scores[cell_type] = score # 分配得分最高的类型 if scores: best_type = max(scores, key=scores.get) cluster_annotations[cluster] = best_type else: cluster_annotations[cluster] = f'Unknown_{cluster}' # 应用注释 self.adata.obs['cell_type'] = self.adata.obs['leiden'].map(cluster_annotations) annotation_success = True self.logger.log("✓ 基于marker基因的注释完成", "SUCCESS") # 保存注释结果 cluster_ann = pd.DataFrame({ 'cluster': self.adata.obs['leiden'].unique(), 'cell_type': [self.adata.obs[self.adata.obs['leiden'] == c]['cell_type'].iloc[0] for c in self.adata.obs['leiden'].unique()] }) cluster_ann.to_csv("04.clusterAnn.txt", sep='\t', index=False) cell_ann = pd.DataFrame({ 'cell_id': self.adata.obs_names, 'cell_type': self.adata.obs['cell_type'] }) cell_ann.to_csv("04.cellAnn.txt", sep='\t', index=False) # 统计细胞类型分布 self.logger.log("细胞类型分布:", "INFO") type_counts = self.adata.obs['cell_type'].value_counts() for ct, count in type_counts.items(): pct = 100 * count / len(self.adata) self.logger.log(f" {ct}: {count} cells ({pct:.1f}%)", "INFO") # 可视化 self.logger.log("生成细胞类型注释图...", "INFO") fig, axes = plt.subplots(1, 2, figsize=(14, 6)) # 创建颜色映射 cell_types = self.adata.obs['cell_type'].astype('category') cell_type_codes = cell_types.cat.codes n_types = len(cell_types.cat.categories) colors = plt.cm.tab20(cell_type_codes / max(n_types, 1)) # tSNE tsne_coords = self.adata.obsm['X_tsne'] axes[0].scatter(tsne_coords[:, 0], tsne_coords[:, 1], c=colors, s=1, alpha=0.6) axes[0].set_xlabel('tSNE 1') axes[0].set_ylabel('tSNE 2') axes[0].set_title('Cell Types (tSNE)') # 添加标签 for i, cell_type in enumerate(cell_types.cat.categories): mask = cell_type_codes == i if mask.sum() > 0: center_x = tsne_coords[mask, 0].mean() center_y = tsne_coords[mask, 1].mean() axes[0].text(center_x, center_y, cell_type, fontsize=8, weight='bold', ha='center', va='center', bbox=dict(boxstyle='round,pad=0.3', facecolor='white', alpha=0.7)) # UMAP umap_coords = self.adata.obsm['X_umap'] axes[1].scatter(umap_coords[:, 0], umap_coords[:, 1], c=colors, s=1, alpha=0.6) axes[1].set_xlabel('UMAP 1') axes[1].set_ylabel('UMAP 2') axes[1].set_title('Cell Types (UMAP)') # 添加标签 for i, cell_type in enumerate(cell_types.cat.categories): mask = cell_type_codes == i if mask.sum() > 0: center_x = umap_coords[mask, 0].mean() center_y = umap_coords[mask, 1].mean() axes[1].text(center_x, center_y, cell_type, fontsize=8, weight='bold', ha='center', va='center', bbox=dict(boxstyle='round,pad=0.3', facecolor='white', alpha=0.7)) plt.tight_layout() save_figure(fig, "04.cellAnn.pdf") # 细胞类型marker基因 self.logger.log("寻找细胞类型marker基因...", "INFO") sc.tl.rank_genes_groups( self.adata, 'cell_type', method='wilcoxon', key_added='rank_genes_celltype' ) markers_df = self._extract_significant_markers('cell_type', 'rank_genes_celltype') markers_df.to_csv("04.cellMarkers.txt", sep='\t', index=False) self.logger.log(f"发现 {len(markers_df)} 个细胞类型marker基因", "INFO") # 保存结果 self.adata.write("step04_adata.h5ad") self.logger.mark_completed("step04") self._list_generated_files(["04.clusterAnn.txt", "04.cellAnn.txt", "04.cellAnn.pdf", "04.cellMarkers.txt"]) ####################### STEP 05 ####################### def step05_differential_analysis(self): """步骤05:组间差异基因分析""" if self.logger.is_completed("step05"): self.logger.log("步骤05已完成,跳过", "SKIP") return print_banner("STEP 05: 组间差异基因分析") self.logger.log("开始步骤05:组间差异分析", "INFO") if self.adata is None: self.adata = sc.read_h5ad("step04_adata.h5ad") # 构建分组信息(转换Categorical为字符串) self.adata.obs['group'] = (self.adata.obs['batch'].astype(str) + '_' + self.adata.obs['cell_type'].astype(str)) unique_cell_types = self.adata.obs['cell_type'].unique() self.logger.log(f"检测到 {len(unique_cell_types)} 种细胞类型", "INFO") diff_gene_count = 0 for cell_type in unique_cell_types: # 检查是否有足够的样本 control_group = f'Control_{cell_type}' treat_group = f'Treat_{cell_type}' control_count = sum(self.adata.obs['group'] == control_group) treat_count = sum(self.adata.obs['group'] == treat_group) if control_count > 5 and treat_count > 5: self.logger.log(f"分析 {cell_type}: Control={control_count}, Treat={treat_count}", "INFO") try: # 执行差异分析 sc.tl.rank_genes_groups( self.adata, groupby='group', groups=[treat_group], reference=control_group, method='wilcoxon', key_added=f'rank_genes_{cell_type}' ) # 提取显著差异基因 result = self.adata.uns[f'rank_genes_{cell_type}'] diff_genes = [] gene_names = result['names'][treat_group] logfcs = result['logfoldchanges'][treat_group] pvals_adj = result['pvals_adj'][treat_group] for i, gene in enumerate(gene_names): if abs(logfcs[i]) > self.config.LOG_FC_FILTER and pvals_adj[i] < self.config.ADJ_PVAL_FILTER: diff_genes.append({ 'gene': gene, 'avg_log2FC': logfcs[i], 'p_val_adj': pvals_adj[i] }) if diff_genes: diff_df = pd.DataFrame(diff_genes) safe_name = cell_type.replace(' ', '_').replace('/', '_') output_file = f"05.{safe_name}.diffGene.txt" diff_df.to_csv(output_file, sep='\t', index=False) self.logger.log(f" ✓ {cell_type}: 发现 {len(diff_genes)} 个差异基因", "SUCCESS") diff_gene_count += 1 else: self.logger.log(f" - {cell_type}: 未发现显著差异基因", "INFO") except Exception as e: self.logger.log(f" ✗ {cell_type} 分析出错: {str(e)}", "ERROR") else: self.logger.log(f"跳过 {cell_type}: 样本量不足 (Control={control_count}, Treat={treat_count})", "WARN") self.logger.log(f"完成差异分析,共 {diff_gene_count} 个细胞类型有显著差异", "SUCCESS") # 保存最终结果 self.adata.write("Scanpy.h5ad") self.logger.mark_completed("step05") ####################### STEP 06 ####################### def step06_trajectory_analysis(self): """步骤06:拟时序分析(可选)""" if self.logger.is_completed("step06"): self.logger.log("步骤06已完成,跳过", "SKIP") return print_banner("STEP 06: 拟时序分析(可选)") self.logger.log("开始步骤06:拟时序分析", "INFO") if self.adata is None: self.adata = sc.read_h5ad("step04_adata.h5ad") trajectory_success = False # 方法1: PAGA try: self.logger.log("尝试方法1: PAGA...", "INFO") # 计算PAGA sc.tl.paga(self.adata, groups='leiden') # 可视化 self.logger.log("生成PAGA轨迹图...", "INFO") fig, axes = plt.subplots(1, 2, figsize=(14, 6)) # PAGA图 paga_coords = self.adata.uns['paga']['pos'] connectivities = self.adata.uns['paga']['connectivities'].toarray() # 绘制连接 for i in range(len(paga_coords)): for j in range(i+1, len(paga_coords)): if connectivities[i, j] > 0.1: axes[0].plot([paga_coords[i, 0], paga_coords[j, 0]], [paga_coords[i, 1], paga_coords[j, 1]], 'k-', alpha=connectivities[i, j], linewidth=2) # 绘制节点 axes[0].scatter(paga_coords[:, 0], paga_coords[:, 1], s=200, c=range(len(paga_coords)), cmap='tab20') for i, pos in enumerate(paga_coords): axes[0].text(pos[0], pos[1], str(i), ha='center', va='center', fontsize=10, weight='bold') axes[0].set_title('PAGA Graph') axes[0].axis('off') # UMAP with PAGA umap_coords = self.adata.obsm['X_umap'] clusters = self.adata.obs['leiden'].astype(int) colors = plt.cm.tab20(clusters / len(np.unique(clusters))) axes[1].scatter(umap_coords[:, 0], umap_coords[:, 1], c=colors, s=1, alpha=0.5) axes[1].set_xlabel('UMAP 1') axes[1].set_ylabel('UMAP 2') axes[1].set_title('PAGA with Clusters') plt.tight_layout() save_figure(fig, "06.trajectory.paga.pdf") # 计算伪时间 sc.tl.dpt(self.adata, n_dcs=15) # 保存伪时间 pseudotime_df = pd.DataFrame({ 'cell_id': self.adata.obs_names, 'pseudotime': self.adata.obs['dpt_pseudotime'], 'cluster': self.adata.obs['leiden'] }) pseudotime_df.to_csv("06.pseudotime.txt", sep='\t', index=False) # 可视化伪时间 self.logger.log("生成伪时间图...", "INFO") fig, ax = plt.subplots(figsize=(8, 6)) umap_coords = self.adata.obsm['X_umap'] pseudotime = self.adata.obs['dpt_pseudotime'].values scatter = ax.scatter(umap_coords[:, 0], umap_coords[:, 1], c=pseudotime, s=2, cmap='viridis', alpha=0.8) ax.set_xlabel('UMAP 1') ax.set_ylabel('UMAP 2') ax.set_title('Pseudotime (DPT)') plt.colorbar(scatter, ax=ax, label='Pseudotime') plt.tight_layout() save_figure(fig, "06.pseudotime.pdf") self.logger.log("✓ PAGA拟时序分析完成", "SUCCESS") trajectory_success = True except Exception as e: self.logger.log(f"PAGA失败: {str(e)}", "WARN") # 如果PAGA失败,尝试简单的Diffusion Pseudotime if not trajectory_success: try: self.logger.log("尝试方法2: Diffusion Pseudotime...", "INFO") sc.tl.diffmap(self.adata) sc.tl.dpt(self.adata) fig, ax = plt.subplots(figsize=(8, 6)) umap_coords = self.adata.obsm['X_umap'] pseudotime = self.adata.obs['dpt_pseudotime'].values scatter = ax.scatter(umap_coords[:, 0], umap_coords[:, 1], c=pseudotime, s=2, cmap='viridis', alpha=0.8) ax.set_xlabel('UMAP 1') ax.set_ylabel('UMAP 2') ax.set_title('Pseudotime (DPT)') plt.colorbar(scatter, ax=ax, label='Pseudotime') plt.tight_layout() save_figure(fig, "06.pseudotime.simple.pdf") self.logger.log("✓ 简单伪时间分析完成", "SUCCESS") trajectory_success = True except Exception as e: self.logger.log(f"伪时间分析失败: {str(e)}", "WARN") if not trajectory_success: self.logger.log("拟时序分析未能完成,跳过此步骤", "WARN") self.logger.log("提示: 拟时序分析为可选步骤,不影响主要分析结果", "INFO") self.logger.mark_completed("step06") ####################### 总结报告 ####################### def generate_summary_report(self): """生成分析总结报告""" print("\n\n") print("╔" + "=" * 62 + "╗") print("║" + " " * 25 + "分析完成总结" + " " * 25 + "║") print("╚" + "=" * 62 + "╝\n") # 步骤完成状态 print("【步骤完成状态】") print("─" * 64) steps = { "step01": "数据加载和质控", "step02": "PCA降维和批次校正", "step03": "细胞聚类和Marker基因", "step04": "细胞类型注释", "step05": "组间差异分析", "step06": "拟时序分析(可选)" } for step_id, step_name in steps.items(): status = "✓ 已完成" if step_id in self.logger.completed_steps else "✗ 未完成" print(f"{status}: {step_name}") print() # 数据统计 if self.adata is not None: print("【数据统计】") print("─" * 64) print(f"总细胞数: {self.adata.n_obs}") print(f"总基因数: {self.adata.n_vars}") if 'leiden' in self.adata.obs: print(f"聚类数: {len(self.adata.obs['leiden'].unique())}") if 'cell_type' in self.adata.obs: print("\n细胞类型分布:") type_counts = self.adata.obs['cell_type'].value_counts() for ct, count in type_counts.items(): pct = 100 * count / self.adata.n_obs print(f" - {ct}: {count} cells ({pct:.1f}%)") print() # 生成的文件 print("【生成的文件】") print("─" * 64) output_files = sorted([f for f in os.listdir('.') if f.endswith(('.pdf', '.txt', '.h5ad'))]) for category in ['01', '02', '03', '04', '05', '06']: category_files = [f for f in output_files if f.startswith(category)] if category_files: print(f"\n步骤{category}:") for f in category_files: print(f" - {f}") print("\n【重要结果文件】") print("─" * 64) print(" - Scanpy.h5ad # 完整AnnData对象") print(" - 04.cellAnn.txt # 细胞类型注释") print(" - 03.clusterMarkers.txt # Cluster marker基因") print(" - 04.cellMarkers.txt # 细胞类型marker基因") print(" - 05.*.diffGene.txt # 差异基因列表") print(" - analysis_progress.log # 详细分析日志") print() print("【工作目录】") print("─" * 64) print(f" {os.getcwd()}") print() print("【下一步分析建议】") print("─" * 64) print(" 1. 查看 analysis_progress.log 了解详细过程") print(" 2. 对差异基因进行GO/KEGG富集分析") print(" 3. 使用 Scanpy.h5ad 进行后续定制化分析") print(" 4. 根据生物学意义解释细胞类型和差异基因") print() print("╔" + "=" * 62 + "╗") print("║" + " " * 22 + "分析流程全部完成!" + " " * 22 + "║") print("╚" + "=" * 62 + "╝\n")####################### 主函数 #######################def main(): """主函数""" # 创建分析流程 pipeline = SeuratAnalysisPipeline() # 运行分析 pipeline.run()if __name__ == "__main__": main()