Python空间转录组学
方案1: 降低聚类分辨率(推荐)
python scrna_simple.py \--input data.h5 \--outdir ./results \--subsample5000 \--n_pcs15 \--n_top_genes1000 \--resolution0.5
为什么?
方案2: 更低的分辨率
python scrna_simple.py \--input data.h5 \--outdir ./results \--subsample5000 \--n_pcs15 \--n_top_genes1000 \--resolution0.3
更少的clusters,更稳定
方案3: 增加细胞数(如果内存够)
python scrna_simple.py \--input data.h5 \--outdir ./results \--subsample8000 \--n_pcs15 \--n_top_genes1000 \--resolution0.8
分辨率选择指南
| 细胞数 | 推荐分辨率 | 预期clusters |
|---|
| 2000 | 0.3-0.5 | 5-10 |
| 5000 | 0.5-0.8 | 8-15 |
| 10000 | 0.8-1.0 | 15-25 |
| 20000+ | 1.0-1.5 | 20-40 |
单细胞分析 - CellMarker数据库版
2 研究方法
2.1 单细胞转录组数据获取与预处理总体流程
本研究基于单细胞 RNA 测序(single-cell RNA-seq)数据构建完整的分析流程,涵盖数据读取、质控过滤、标准化与高变基因筛选、降维与聚类、细胞类型自动注释以及差异表达与 marker 基因挖掘。所有分析均在 Python 环境中完成,主要依托 Scanpy 等单细胞分析工具包实现,并结合 CellMarker 数据库对细胞类型进行自动化注释。
2.2 数据导入与质控指标计算
根据原始数据格式(10x Genomics 的矩阵文件、h5 文件、loom 文件或经预处理的表达矩阵 csv 文件),使用 Scanpy 对表达矩阵进行读取,并将基因名标准化处理以保证其唯一性。数据导入后首先统计总体数据规模,包括细胞数(number of cells)与基因数(number of genes)。
为评估数据质量,本研究计算了每个细胞层面的多维度质控指标,包括:
每个细胞检测到的基因数(n_genes_by_counts);
每个细胞的总 UMI 计数(total_counts);
线粒体基因表达比例(pct_counts_mt):将基因名以 “MT-” 开头的基因定义为线粒体相关基因,计算其在每个细胞总 UMI 中的百分比。
在此基础上,分别绘制基因数、总 UMI 以及线粒体基因比例的分布图(小提琴图与箱线图结合)以及“总 UMI vs. 基因数”“线粒体比例 vs. 基因数”的散点图,用于评估测序饱和度、文库复杂度及潜在低质量细胞。
2.3 低质量细胞与低表达基因过滤
为去除显著低质量细胞与极低表达基因,本研究设置如下过滤标准:
细胞过滤:仅保留检测基因数不少于 200 个的细胞;
基因过滤:仅保留在不少于 3 个细胞中有表达的基因;
对表达矩阵中总表达量为 0 的细胞进一步进行排除。
上述过滤步骤旨在剔除极度稀疏的细胞与信息量不足的基因,为后续的归一化与降维分析提供更稳定的输入。
2.4 表达量归一化与高变基因筛选
质控之后的数据首先进行归一化和对数转换。具体步骤包括:
归一化(library size normalization):对每个细胞的总表达量进行归一化,将每个细胞的总 UMI 归一到 1×10⁴(target_sum=1e4),消除测序深度差异的影响;
对数转换(log1p):在归一化后的表达矩阵基础上进行自然对数转换(log1p),提高数据的对称性并减弱极值的影响;
高变基因筛选:采用 Seurat v3 风格的高变基因识别算法,在全基因集中筛选出表达量均值与变异性均较高的前 2 000 个高变基因(n_top_genes=2000),作为后续降维与聚类分析的特征基因。
为了进一步提高数值稳定性,对筛选出的高变基因进行标准化(z-score 转换),并截断极端值(max_value=10),以减少极端高表达基因对 PCA 的不利影响。
2.5 降维分析与邻近图构建
在高变基因子集上进行主成分分析(principal component analysis, PCA),提取前 20 个主成分(n_pcs=20),用于表征细胞在低维空间中的主要变异结构。通过主成分方差解释度曲线评估各主成分的重要性,并绘制 PC1 与 PC2 的散点分布图。
基于 PCA 结果,采用最近邻算法构建细胞间的邻近关系图。具体步骤包括:以欧氏距离为度量,针对每个细胞寻找其 15 个最近邻(n_neighbors=15),构建稀疏的距离矩阵与邻接矩阵。随后通过高斯核函数(基于距离的指数衰减)将距离转换为权重,得到细胞间的连通性矩阵(connectivity matrix),作为后续非线性降维与聚类的输入。
在邻近图基础上,进一步计算 UMAP(uniform manifold approximation and projection)嵌入,将高维表达结构投射至二维空间,以便于可视化不同细胞群体的空间分布与相互关系。
2.6 基于 Leiden 算法的细胞聚类
利用构建好的邻近图与 UMAP 结果,采用 Leiden 算法对细胞进行无监督聚类。聚类分辨率(resolution)设置为 0.5,以在过度分裂和过度合并之间取得平衡。聚类完成后,对聚类结果进行以下处理:
统计每个聚类(cluster)包含的细胞数量;
对细胞数少于 3 个的极小聚类视为不稳定聚类;
对于这些小聚类,通过其在邻近图中与其他聚类的连接关系,将其合并至与之联系最紧密的较大聚类中;若无法确定最优合并对象,则统一合并至包含细胞数最多的主要聚类。
最终,对聚类标签进行重新编号,确保聚类编号的连续性与可读性,并统计每个聚类的细胞数量分布,用于后续细胞类型注释和下游分析。
2.7 基于 CellMarker 数据库的细胞类型自动注释
为实现对各细胞群体的系统性注释,本研究优先调用 CellMarker 数据库中人类(human)物种的标记基因信息。具体步骤如下:
数据库加载与构建 marker 集合:从 CellMarker 数据库(Excel 文件)中读取 human 工作表,提取每种细胞类型对应的 marker 基因列表。为保证注释的稳定性与简洁性,对每种细胞类型仅保留最多 10 个最常用的 marker 基因,构建“细胞类型→marker 基因集”的映射字典。如果外部数据库不可用,则采用预定义的免疫细胞、内皮细胞、成纤维细胞及上皮细胞等常见细胞类型的 marker 集合作为备用库。
聚类水平的 marker 表达评分:对每一个聚类,在其所属细胞中计算每个 marker 基因的表达情况。对于每个给定的细胞类型,其评分定义为该类型 marker 基因集合的平均“表达占比×平均表达量”,即:
细胞类型判定原则:对每个聚类,按照综合得分对所有候选细胞类型进行排序,选择得分最高的细胞类型作为该聚类的主要候选类型,同时记录得分排名第二、第三的候选类型及其得分,用于后续生物学解释和人工复核。若最高得分超过预设阈值(例如 0.05),则将该细胞类型赋予该聚类;若最高得分低于阈值,则该聚类暂标记为 “Unknown_ClusterID”,并在结果文件中保留候选类型信息。
注释信息整合:将细胞聚类编号映射为对应的细胞类型标签,得到每个细胞的注释结果,并输出包含聚类编号、细胞类型、细胞数、候选类型及得分等信息的注释详情表(annotation_details),用于后续分析或人工校正。
2.8 可视化分析与细胞组成展示
为直观展示分析结果,本研究制作了多种图形可视化:
质控图:包括基因数、UMI 总量和线粒体基因比例的小提琴图/箱线图,以及总 UMI vs. 基因数、线粒体比例 vs. 基因数的散点图,用于总体评估数据质量与筛选策略的合理性;
高变基因分布图:展示基因平均表达与归一化方差(或离散度)的关系,并区分高变基因与非高变基因,以体现特征基因筛选的结果;
PCA 与 UMAP 图:绘制 PC1–PC2 的散点图以及 UMAP 嵌入图,分别以聚类编号或细胞类型进行着色,并在 UMAP 图上标记各聚类/细胞类型的空间中心位置;
细胞类型组成图:统计不同细胞类型对应的细胞数量,绘制水平柱状图展示各细胞类型在样本中的组成比例;
marker 基因特征图:针对选定的 marker 基因,在 UMAP 空间上绘制表达量热度图,以展示其在不同细胞群中的空间表达特征。
所有图形均采用统一的配色与排版风格,以满足高质量期刊对图像分辨率与美观性的要求。
2.9 细胞类型特异性 marker 基因鉴定
在细胞类型注释基础上,进一步挖掘各细胞类型的特异性 marker 基因。具体步骤为:
以细胞类型标签(cell_type)为分组变量,在原始表达矩阵(use_raw=True)上采用 Wilcoxon 秩和检验进行差异表达分析,对每一细胞类型与其他细胞类型进行比较;
对每一细胞类型提取排名前 100 个差异基因,记录其 log₂ 倍数变化(log fold change)、原始 P 值和多重检验校正后的 P 值(pval_adj)等统计指标,整理为全量 marker 基因表并输出;
在差异基因中筛选多重检验校正 P 值小于 0.05 的基因,并按统计显著性与效应量排序,选取前 10 个基因作为各细胞类型的代表性 marker 基因,形成精简的高可信度 marker 基因列表;
基于上述 marker 基因集构建 dot plot 等可视化图形,展示不同细胞类型间在 marker 表达模式上的差异与特异性。
综上,本研究构建了一套从原始单细胞 RNA-seq 数据出发,经系统质控、标准化、降维与聚类、数据库驱动的细胞类型自动注释直至细胞类型特异性 marker 挖掘的完整分析流程,为后续生物学机制解析与临床转化提供了高分辨率的细胞层面图谱基础。
核心优势
使用真实的CellMarker数据库
✅ 60,877条标注记录
✅ 1,715种细胞类型
✅ 来自文献的真实marker
✅ 自动匹配最佳细胞类型
使用方法
标准使用(带数据库)
python scrna_database.py \
--input data.h5 \
--database Cell_marker_Human.xlsx \
--outdir ./results \
--subsample5000 \
--n_pcs15 \
--n_top_genes1000 \
--resolution0.5
不带数据库(使用默认库)
python scrna_database.py \
--input data.h5 \
--outdir ./results \
--subsample5000 \
--resolution0.5
CellMarker数据库说明
数据库内容
Cell_marker_Human.xlsx 包含:
- 60,877 条marker记录
- 1,715 种细胞类型
- 来源: 文献、实验验证
涵盖的细胞类型示例
免疫细胞:
T cell (多个亚型)
B cell (多个亚型)
Monocyte
Macrophage (M1, M2等)
Dendritic cell (多个亚型)
NK cell
Plasma cell
Neutrophil
间质细胞:
Fibroblast (多个亚型)
Endothelial cell (多个亚型)
Pericyte
Smooth muscle cell
上皮细胞:
其他:
Adipocyte
Neuron (多个类型)
肿瘤相关细胞
...等等
新增功能
1. 智能细胞类型匹配
程序会:
读取CellMarker数据库
为每种细胞类型提取marker基因
计算每个cluster与所有细胞类型的匹配得分
自动选择最佳匹配
2. 详细的注释报告
生成 annotation_details.csv:
cluster,n_cells,cell_type,score,second_candidate,second_score,third_candidate,third_score
0,857,T cell,0.856,NK cell,0.234,Monocyte,0.145
1,628,B cell,0.912,Plasma cell,0.456,T cell,0.123
...
包含信息:
最佳匹配细胞类型
匹配得分
第2、第3候选类型
方便验证和调整
3. 识别率统计
自动计算并显示:
✓ 识别率: 15/18 (83.3%)
4. 改进的评分算法
得分=Σ(表达比例×平均表达) /marker数量
更准确地评估细胞类型匹配
输出文件
results/
├── QC/
│ ├── QC1_violin_plots.png # 柔和配色小提琴图
│ └── QC2_scatter_plots.png
├── preprocessing/
│ └── highly_variable_genes.png
├── dimensionality_reduction/
│ ├── PCA1_variance_ratio.png
│ └── PCA2_scatter.png
├── figures/
│ ├── Fig_UMAP_annotated.png # 带细胞类型名称
│ ├── Fig_cell_type_distribution.png
│ └── Fig_marker_expression.png
├── markers/
│ ├── all_markers_by_celltype.csv
│ ├── top_markers_by_celltype.csv
│ └── marker_dotplot.png
├── annotation_details.csv # ⭐ 注释详情(新)
└── cell_type_annotations.csv # 最终注释
完整示例
示例1: PBMC数据
python scrna_database.py \
--input pbmc_10k.h5 \
--database Cell_marker_Human.xlsx \
--outdir ./pbmc_results \
--subsample8000 \
--n_pcs20 \
--n_top_genes2000 \
--resolution0.8
预期识别:
示例2: 肿瘤数据
python scrna_database.py \
--input tumor.h5 \
--database Cell_marker_Human.xlsx \
--outdir ./tumor_results \
--subsample10000 \
--resolution0.6
可能识别:
示例3: 快速测试
python scrna_database.py \
--input data.h5 \
--database Cell_marker_Human.xlsx \
--outdir ./test \
--subsample2000 \
--resolution0.5
🔍 如何验证注释结果
1. 查看注释详情
cat annotation_details.csv
查看:
2. 查看top markers
head -20 markers/top_markers_by_celltype.csv
验证marker基因是否符合细胞类型
3. 查看UMAP图
open figures/Fig_UMAP_annotated.png
检查:
4. 如果识别不准确
情况1: 得分很低
Cluster 5: Unknown (best=T cell, score=0.03)
→ 可能需要调整分辨率,或该cluster确实特殊
情况2: 多个高分候选
Cluster 3: B cell (score=0.45)
候选2: Plasma cell (score=0.43)
→ 可能是中间状态细胞,需要进一步分析
情况3: 全部Unknown → 检查数据质量,或使用更低的resolution
最佳实践
第一次运行
# 使用保守的分辨率python scrna_simple.py \--input data.h5 \--outdir ./results \--subsample5000 \--resolution0.5
如果clusters太少
# 增加分辨率python scrna_simple.py \--input data.h5 \--outdir ./results_fine \--subsample5000 \--resolution0.8
如果clusters太多
# 降低分辨率python scrna_simple.py \--input data.h5 \--outdir ./results_coarse \--subsample5000 \--resolution0.3
立即运行(推荐配置)
python scrna_simple.py \--input data.h5 \--outdir ./results \--subsample5000 \--n_pcs15 \--n_top_genes1000 \--resolution0.5
#!/usr/bin/env python# -*- coding: utf-8 -*-"""Single Cell RNA-seq Analysis - Simple & Stable Version简单、稳定、低内存"""import osimport sysimport argparseimport warningsfrom pathlib import Pathimport numpy as npimport pandas as pd# 环境变量 - 防止线程冲突os.environ['OMP_NUM_THREADS'] = '1'os.environ['OPENBLAS_NUM_THREADS'] = '1'os.environ['MKL_NUM_THREADS'] = '1'os.environ['NUMEXPR_NUM_THREADS'] = '1'os.environ['VECLIB_MAXIMUM_THREADS'] = '1'os.environ['NUMBA_NUM_THREADS'] = '1'os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'import scanpy as scimport matplotlibmatplotlib.use('Agg')import matplotlib.pyplot as pltimport gcwarnings.filterwarnings('ignore')sc.settings.verbosity = 1sc.settings.n_jobs = 1def load_data(input_path, format_type='h5', subsample=None, min_genes=200, min_cells=3): """加载数据""" print(f"\n{'='*60}") print("📂 步骤1: 加载数据") print(f"{'='*60}") print(f"文件: {input_path}") # 加载 if format_type == '10x': adata = sc.read_10x_mtx(input_path, var_names='gene_symbols', cache=True, gex_only=True) elif format_type == 'h5': adata = sc.read_10x_h5(input_path, genome=None, gex_only=True) elif format_type == 'csv': adata = sc.read_csv(input_path, delimiter=',', first_column_names=True) elif format_type == 'loom': adata = sc.read_loom(input_path, sparse=True) # 修复基因名 adata.var_names_make_unique() print(f"✓ 原始数据: {adata.n_obs} cells × {adata.n_vars} genes") # 基础质控 sc.pp.filter_cells(adata, min_genes=min_genes) sc.pp.filter_genes(adata, min_cells=min_cells) # 移除零计数细胞 nonzero = np.asarray(adata.X.sum(axis=1)).flatten() > 0 adata = adata[nonzero, :] print(f"✓ 质控后: {adata.n_obs} cells × {adata.n_vars} genes") # 子采样 if subsample and subsample < adata.n_obs: sc.pp.subsample(adata, n_obs=subsample, random_state=42) print(f"✓ 子采样: {adata.n_obs} cells") # 转为稀疏矩阵 from scipy.sparse import csr_matrix, issparse if not issparse(adata.X): adata.X = csr_matrix(adata.X) return adatadef preprocess(adata, n_top_genes=2000): """预处理""" print(f"\n{'='*60}") print("🔄 步骤2: 预处理") print(f"{'='*60}") # 标准化 print("标准化...") sc.pp.normalize_total(adata, target_sum=1e4) sc.pp.log1p(adata) # 高变基因 print(f"识别高变基因 (top {n_top_genes})...") sc.pp.highly_variable_genes(adata, n_top_genes=n_top_genes, flavor='seurat_v3') n_hvg = adata.var['highly_variable'].sum() print(f"✓ 高变基因: {n_hvg}") # 保存原始数据 adata.raw = adata # 只保留高变基因 adata = adata[:, adata.var['highly_variable']].copy() print(f"✓ 保留: {adata.n_vars} genes") # Scale print("标准化...") sc.pp.scale(adata, max_value=10) gc.collect() return adatadef dimension_reduction(adata, n_pcs=20, n_neighbors=15): """降维""" print(f"\n{'='*60}") print("🔬 步骤3: 降维") print(f"{'='*60}") # PCA print(f"PCA (n_comps={n_pcs})...") sc.tl.pca(adata, n_comps=n_pcs, svd_solver='arpack', random_state=42) # Neighbors - 使用sklearn避免崩溃 print(f"计算邻居 (k={n_neighbors})...") try: from sklearn.neighbors import NearestNeighbors from scipy.sparse import coo_matrix X_pca = adata.obsm['X_pca'] nbrs = NearestNeighbors(n_neighbors=n_neighbors, metric='euclidean', n_jobs=1) nbrs.fit(X_pca) distances, indices = nbrs.kneighbors(X_pca) n_obs = adata.n_obs rows = np.repeat(np.arange(n_obs), n_neighbors) cols = indices.flatten() dists = distances.flatten() # 距离矩阵 dist_matrix = coo_matrix((dists, (rows, cols)), shape=(n_obs, n_obs)).tocsr() # 连接矩阵 sigma = np.median(dists) weights = np.exp(-dists**2 / (2 * sigma**2)) conn_matrix = coo_matrix((weights, (rows, cols)), shape=(n_obs, n_obs)).tocsr() adata.obsp['distances'] = dist_matrix adata.obsp['connectivities'] = conn_matrix adata.uns['neighbors'] = { 'connectivities_key': 'connectivities', 'distances_key': 'distances', 'params': {'n_neighbors': n_neighbors, 'method': 'sklearn'} } print("✓ 使用sklearn方法") except Exception as e: print(f"sklearn失败,使用scanpy: {e}") sc.pp.neighbors(adata, n_neighbors=n_neighbors, n_pcs=n_pcs, random_state=42) # UMAP print("计算UMAP...") try: sc.tl.umap(adata, random_state=42) print("✓ UMAP完成") except Exception as e: print(f"UMAP失败,使用PCA替代: {e}") adata.obsm['X_umap'] = adata.obsm['X_pca'][:, :2] gc.collect() return adatadef cluster(adata, resolution=1.0, min_cluster_size=3): """聚类""" print(f"\n{'='*60}") print("🎯 步骤4: 聚类") print(f"{'='*60}") print(f"Leiden聚类 (resolution={resolution})...") sc.tl.leiden(adata, resolution=resolution, random_state=42, n_iterations=2) # 检查并过滤小cluster cluster_counts = adata.obs['leiden'].value_counts() small_clusters = cluster_counts[cluster_counts < min_cluster_size].index.tolist() if len(small_clusters) > 0: print(f"⚠️ 发现 {len(small_clusters)} 个小cluster (细胞数<{min_cluster_size})") print(f"合并小cluster到最近的大cluster...") # 将小cluster合并到最近的邻居cluster leiden_orig = adata.obs['leiden'].copy() valid_clusters = cluster_counts[cluster_counts >= min_cluster_size].index.tolist() # 对每个小cluster,找到最近的大cluster for small_cl in small_clusters: small_cells = adata.obs['leiden'] == small_cl if small_cells.sum() == 0: continue # 获取这些细胞的邻居 small_indices = np.where(small_cells)[0] neighbor_clusters = [] for idx in small_indices: # 获取邻居细胞的cluster neighbors = adata.obsp['connectivities'][idx].nonzero()[1] neighbor_cls = adata.obs['leiden'].iloc[neighbors].values neighbor_clusters.extend([cl for cl in neighbor_cls if cl in valid_clusters]) # 找到最常见的邻居cluster if neighbor_clusters: from collections import Counter most_common = Counter(neighbor_clusters).most_common(1)[0][0] adata.obs.loc[small_cells, 'leiden'] = most_common else: # 如果没有邻居,合并到最大的cluster largest_cluster = cluster_counts[cluster_counts >= min_cluster_size].idxmax() adata.obs.loc[small_cells, 'leiden'] = largest_cluster # 重新编号cluster unique_clusters = sorted(adata.obs['leiden'].unique()) cluster_map = {old: str(i) for i, old in enumerate(unique_clusters)} adata.obs['leiden'] = adata.obs['leiden'].map(cluster_map).astype('category') adata.obs['Celltype'] = adata.obs['leiden'] n_clusters = len(adata.obs['leiden'].cat.categories) print(f"✓ 最终聚类数: {n_clusters}") # 显示cluster大小分布 cluster_sizes = adata.obs['leiden'].value_counts().sort_index() print(f"✓ Cluster大小: min={cluster_sizes.min()}, max={cluster_sizes.max()}, mean={cluster_sizes.mean():.1f}") return adatadef visualize(adata, outdir, gene_list=None): """可视化""" print(f"\n{'='*60}") print("🎨 步骤5: 可视化") print(f"{'='*60}") viz_dir = Path(outdir) / 'visualizations' viz_dir.mkdir(parents=True, exist_ok=True) plt.switch_backend('Agg') # UMAP聚类图 print("生成UMAP聚类图...") try: fig, ax = plt.subplots(figsize=(8, 6)) sc.pl.umap(adata, color='Celltype', ax=ax, show=False, frameon=False, legend_loc='right margin') plt.tight_layout() plt.savefig(viz_dir / 'umap_clusters.png', dpi=150, bbox_inches='tight') plt.close() print(f"✓ 保存: {viz_dir}/umap_clusters.png") except Exception as e: print(f"⚠️ UMAP图失败: {e}") # 基因表达图 if gene_list: genes = gene_list if isinstance(gene_list, list) else [gene_list] available = [g for g in genes if g in adata.raw.var_names][:10] if available: print(f"生成基因表达图 ({len(available)} genes)...") for gene in available: try: fig, ax = plt.subplots(figsize=(7, 5)) sc.pl.umap(adata, color=gene, ax=ax, show=False, frameon=False, use_raw=True, cmap='Blues') plt.tight_layout() plt.savefig(viz_dir / f'umap_{gene}.png', dpi=150, bbox_inches='tight') plt.close() except Exception as e: print(f"⚠️ {gene} 失败: {e}") print(f"✓ 基因图保存在: {viz_dir}") gc.collect()def find_markers(adata, outdir, n_genes=100): """寻找marker""" print(f"\n{'='*60}") print("🔍 步骤6: 寻找Marker基因") print(f"{'='*60}") marker_dir = Path(outdir) / 'markers' marker_dir.mkdir(parents=True, exist_ok=True) # 检查cluster大小 cluster_sizes = adata.obs['leiden'].value_counts() print(f"Cluster数量: {len(cluster_sizes)}") print(f"每个cluster细胞数: min={cluster_sizes.min()}, max={cluster_sizes.max()}") try: print("计算差异基因...") sc.tl.rank_genes_groups(adata, 'leiden', method='wilcoxon', n_genes=n_genes, use_raw=True) result = adata.uns['rank_genes_groups'] groups = result['names'].dtype.names # 保存Top markers df_names = pd.DataFrame(result['names']) df_names.to_csv(marker_dir / 'top_markers.csv') print(f"✓ 保存: {marker_dir}/top_markers.csv") # 详细统计 records = [] for group in groups: for i in range(len(result['names'][group])): records.append({ 'cluster': group, 'gene': result['names'][group][i], 'score': result['scores'][group][i], 'pval': result['pvals'][group][i], 'pval_adj': result['pvals_adj'][group][i], 'logfoldchange': result['logfoldchanges'][group][i] }) pd.DataFrame(records).to_csv(marker_dir / 'marker_stats.csv', index=False) print(f"✓ 保存: {marker_dir}/marker_stats.csv") # 可视化 print("生成Marker排名图...") try: fig, ax = plt.subplots(figsize=(12, 8)) sc.pl.rank_genes_groups(adata, n_genes=10, ax=ax, show=False) plt.tight_layout() plt.savefig(marker_dir / 'rank_genes.png', dpi=150, bbox_inches='tight') plt.close() print(f"✓ 保存: {marker_dir}/rank_genes.png") except Exception as e: print(f"⚠️ 排名图失败: {e}") gc.collect() return df_names except ValueError as e: if "only contain one sample" in str(e): print(f"⚠️ Marker分析跳过: 某些cluster样本数太少") print(f"提示: 尝试降低分辨率 (--resolution 0.5)") # 保存cluster信息 cluster_info = pd.DataFrame({ 'cluster': cluster_sizes.index, 'n_cells': cluster_sizes.values }) cluster_info.to_csv(marker_dir / 'cluster_sizes.csv', index=False) print(f"✓ 保存cluster大小: {marker_dir}/cluster_sizes.csv") gc.collect() return None else: raise except Exception as e: print(f"⚠️ Marker分析失败: {e}") gc.collect() return Nonedef batch_process(input_dir, pattern, outdir, **kwargs): """批量处理""" print(f"\n{'='*60}") print("📁 批量处理模式") print(f"{'='*60}") input_path = Path(input_dir) files = sorted(input_path.glob(pattern)) print(f"找到 {len(files)} 个文件") results = [] for i, file in enumerate(files, 1): print(f"\n{'#'*60}") print(f"处理 [{i}/{len(files)}]: {file.name}") print(f"{'#'*60}") try: file_outdir = Path(outdir) / file.stem file_outdir.mkdir(parents=True, exist_ok=True) # 分析 adata = load_data(str(file), **kwargs) adata = preprocess(adata, kwargs.get('n_top_genes', 2000)) adata = dimension_reduction(adata, kwargs.get('n_pcs', 20)) adata = cluster(adata, kwargs.get('resolution', 1.0)) visualize(adata, file_outdir, kwargs.get('gene_list')) find_markers(adata, file_outdir) results.append({'file': file.name, 'status': 'success', 'cells': adata.n_obs}) print(f"✓ {file.name} 完成") except Exception as e: print(f"✗ {file.name} 失败: {str(e)}") results.append({'file': file.name, 'status': 'failed', 'error': str(e)}) finally: gc.collect() # 保存批处理结果 df_results = pd.DataFrame(results) df_results.to_csv(Path(outdir) / 'batch_results.csv', index=False) print(f"\n{'='*60}") print("批处理完成!") success = sum(r['status'] == 'success' for r in results) print(f"成功: {success}/{len(results)}") print(f"{'='*60}")def main(): parser = argparse.ArgumentParser( description='单细胞RNA-seq分析 - 简化版', formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""示例: # 标准分析 python scrna_simple.py --input data.h5 --outdir ./results # 低内存模式 python scrna_simple.py --input data.h5 --subsample 5000 --n_pcs 15 # 批量处理 python scrna_simple.py --batch_dir ./data --pattern "*.h5" --outdir ./results """ ) # 输入输出 parser.add_argument('--input', help='输入文件路径') parser.add_argument('--format', choices=['10x', 'h5', 'loom', 'csv'], default='h5', help='输入格式') parser.add_argument('--outdir', default='./output', help='输出目录') # 批量处理 parser.add_argument('--batch_dir', help='批量处理目录') parser.add_argument('--pattern', default='*.h5', help='文件匹配模式') # 参数 parser.add_argument('--subsample', type=int, help='子采样细胞数') parser.add_argument('--n_pcs', type=int, default=20, help='PCA维度') parser.add_argument('--n_top_genes', type=int, default=2000, help='高变基因数') parser.add_argument('--resolution', type=float, default=1.0, help='聚类分辨率') parser.add_argument('--min_genes', type=int, default=200, help='细胞最少基因数') parser.add_argument('--min_cells', type=int, default=3, help='基因最少细胞数') # 可选 parser.add_argument('--gene_list', help='基因列表文件') args = parser.parse_args() # 检查输入 if not args.batch_dir and not args.input: parser.error("需要 --input 或 --batch_dir") # 加载基因列表 gene_list = None if args.gene_list and os.path.exists(args.gene_list): with open(args.gene_list) as f: gene_list = f.read().strip().split() try: # 批量处理 if args.batch_dir: batch_process( args.batch_dir, args.pattern, args.outdir, format_type=args.format, subsample=args.subsample, min_genes=args.min_genes, min_cells=args.min_cells, n_pcs=args.n_pcs, n_top_genes=args.n_top_genes, resolution=args.resolution, gene_list=gene_list ) # 单文件处理 else: print("🚀 单细胞RNA-seq分析") print(f"输入: {args.input}") print(f"输出: {args.outdir}") # 执行分析 adata = load_data( args.input, format_type=args.format, subsample=args.subsample, min_genes=args.min_genes, min_cells=args.min_cells ) adata = preprocess(adata, n_top_genes=args.n_top_genes) adata = dimension_reduction(adata, n_pcs=args.n_pcs) adata = cluster(adata, resolution=args.resolution) visualize(adata, args.outdir, gene_list=gene_list) find_markers(adata, args.outdir) print(f"\n{'='*60}") print(f"✓ 分析完成!") print(f"✓ 结果保存在: {args.outdir}") print(f"{'='*60}\n") except KeyboardInterrupt: print("\n⚠️ 用户中断") sys.exit(1) except Exception as e: print(f"\n✗ 错误: {str(e)}") import traceback traceback.print_exc() sys.exit(1)if __name__ == '__main__': main()
#!/usr/bin/env python# -*- coding: utf-8 -*-"""Single Cell RNA-seq Analysis Pipeline - Final Fixed Version修复: 断点续传时adata加载问题"""import osimport sysimport argparseimport jsonimport warningsfrom pathlib import Pathfrom datetime import datetimeimport numpy as npimport pandas as pd# 环境变量设置os.environ['OMP_NUM_THREADS'] = '1'os.environ['OPENBLAS_NUM_THREADS'] = '1'os.environ['MKL_NUM_THREADS'] = '1'os.environ['NUMEXPR_NUM_THREADS'] = '1'os.environ['VECLIB_MAXIMUM_THREADS'] = '1'os.environ['NUMBA_NUM_THREADS'] = '1'os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'import scanpy as scimport matplotlibmatplotlib.use('Agg')import matplotlib.pyplot as pltfrom tqdm import tqdmimport gcwarnings.filterwarnings('ignore')sc.settings.verbosity = 0sc.settings.n_jobs = 1class CheckpointManager: """断点管理器""" def __init__(self, checkpoint_dir): self.checkpoint_dir = Path(checkpoint_dir) self.checkpoint_dir.mkdir(parents=True, exist_ok=True) self.checkpoint_file = self.checkpoint_dir / 'checkpoint.json' self.data_file = self.checkpoint_dir / 'adata.h5ad' self.state = self._load_state() def _load_state(self): if self.checkpoint_file.exists(): with open(self.checkpoint_file, 'r') as f: return json.load(f) return {'completed_steps': [], 'last_step': None} def save_checkpoint(self, step_name, adata=None): """保存断点""" self.state['completed_steps'].append(step_name) self.state['last_step'] = step_name self.state['timestamp'] = datetime.now().isoformat() with open(self.checkpoint_file, 'w') as f: json.dump(self.state, f, indent=2) if adata is not None: try: adata.write_h5ad(self.data_file, compression='gzip') except: adata.write_h5ad(self.data_file) def load_checkpoint(self): """加载断点数据""" if self.data_file.exists(): try: print(f"📥 加载断点数据...") adata = sc.read_h5ad(self.data_file) print(f"✓ 成功加载: {adata.n_obs} cells × {adata.n_vars} genes") return adata except Exception as e: print(f"⚠️ 加载断点失败: {e}") return None return None def is_completed(self, step_name): return step_name in self.state['completed_steps'] def reset(self): """重置断点""" self.state = {'completed_steps': [], 'last_step': None} if self.checkpoint_file.exists(): self.checkpoint_file.unlink() if self.data_file.exists(): self.data_file.unlink()class ScRNAAnalyzer: def __init__(self, outdir='./analysis_output', low_memory=True, checkpoint_dir=None): self.outdir = Path(outdir) self.outdir.mkdir(parents=True, exist_ok=True) self.low_memory = low_memory self.adata = None if checkpoint_dir is None: checkpoint_dir = self.outdir / '.checkpoints' self.checkpoint = CheckpointManager(checkpoint_dir) sc.settings.n_jobs = 1 sc.settings.max_memory = 8 if low_memory else 16 def _safe_step(self, step_name, func, *args, **kwargs): """安全执行步骤""" if self.checkpoint.is_completed(step_name): print(f"✓ {step_name} 已完成,跳过") return True # 确保adata已加载 if self.adata is None: print(f"⚠️ adata为None,尝试加载断点...") self.adata = self.checkpoint.load_checkpoint() if self.adata is None: raise ValueError(f"无法加载数据,请使用 --reset_checkpoint 重新开始") try: print(f"▶ 执行: {step_name}") result = func(*args, **kwargs) self.checkpoint.save_checkpoint(step_name, self.adata) print(f"✓ {step_name} 完成") gc.collect() return result except Exception as e: print(f"✗ {step_name} 失败: {str(e)}") raise def load_data(self, input_path, format_type='h5', barcode_file=None, subsample=None, min_genes=200, min_cells=3): """加载数据""" step_name = 'load_data' def _load(): print(f"📂 加载数据: {input_path}") if format_type == '10x': self.adata = sc.read_10x_mtx(input_path, var_names='gene_symbols', cache=True, gex_only=True) elif format_type == 'h5': self.adata = sc.read_10x_h5(input_path, genome=None, gex_only=True) elif format_type == 'csv': self.adata = sc.read_csv(input_path, delimiter=',', first_column_names=True) elif format_type == 'loom': self.adata = sc.read_loom(input_path, sparse=True) self.adata.var_names_make_unique() print(f"📊 原始: {self.adata.n_obs} cells × {self.adata.n_vars} genes") # 质控 sc.pp.filter_cells(self.adata, min_genes=min_genes) sc.pp.filter_genes(self.adata, min_cells=min_cells) # 移除零计数 nonzero_mask = np.asarray(self.adata.X.sum(axis=1)).flatten() > 0 self.adata = self.adata[nonzero_mask, :] print(f"📊 质控后: {self.adata.n_obs} cells × {self.adata.n_vars} genes") # Barcode过滤 if barcode_file and os.path.exists(barcode_file): df = pd.read_csv(barcode_file) barcodes = df.iloc[:, 0].tolist() mask = self.adata.obs_names.isin(barcodes) self.adata = self.adata[mask, :] print(f"📊 Barcode过滤: {self.adata.n_obs} cells") # 子采样 if subsample and subsample < self.adata.n_obs: sc.pp.subsample(self.adata, n_obs=subsample, random_state=42) print(f"📊 子采样: {self.adata.n_obs} cells") # 稀疏化 from scipy.sparse import csr_matrix, issparse if not issparse(self.adata.X): self.adata.X = csr_matrix(self.adata.X) return self.adata return self._safe_step(step_name, _load) def preprocess(self, target_sum=1e4, n_top_genes=2000): """预处理""" def _normalize(): print("🔄 标准化...") sc.pp.normalize_total(self.adata, target_sum=target_sum) sc.pp.log1p(self.adata) return self.adata self._safe_step('normalize', _normalize) def _hvg(): print("🔍 识别高变基因...") sc.pp.highly_variable_genes( self.adata, n_top_genes=n_top_genes, flavor='seurat_v3', batch_key=None ) n_hvg = self.adata.var['highly_variable'].sum() print(f"📊 高变基因: {n_hvg}") # 保存原始数据 self.adata.raw = self.adata # 只保留高变基因 self.adata = self.adata[:, self.adata.var['highly_variable']].copy() print(f"📊 保留: {self.adata.n_vars} genes") return self.adata self._safe_step('hvg', _hvg) def _scale(): print("⚖️ 标准化...") sc.pp.scale(self.adata, max_value=10) return self.adata self._safe_step('scale', _scale) return self.adata def dimension_reduction(self, n_pcs=20, n_neighbors=15): """降维""" def _pca(): print(f"🔬 PCA (n_comps={n_pcs})...") sc.tl.pca(self.adata, n_comps=n_pcs, svd_solver='arpack', random_state=42) return self.adata self._safe_step('pca', _pca) def _neighbors(): print(f"🔗 计算邻居 (k={n_neighbors})...") # 确保adata和X_pca存在 if self.adata is None: raise ValueError("adata is None") if 'X_pca' not in self.adata.obsm: raise ValueError("X_pca not found, PCA may have failed") try: # 方法1: scanpy标准方法 sc.pp.neighbors( self.adata, n_neighbors=n_neighbors, n_pcs=n_pcs, method='umap', metric='euclidean', random_state=42 ) print("✓ 使用scanpy方法") except Exception as e: print(f"⚠️ scanpy方法失败: {e}") print("尝试sklearn备用方法...") # 方法2: sklearn备用 from sklearn.neighbors import NearestNeighbors from scipy.sparse import coo_matrix X_pca = self.adata.obsm['X_pca'] nbrs = NearestNeighbors( n_neighbors=n_neighbors, metric='euclidean', algorithm='auto', n_jobs=1 ).fit(X_pca) distances, indices = nbrs.kneighbors(X_pca) # 构建连接矩阵 n_obs = self.adata.n_obs rows = np.repeat(np.arange(n_obs), n_neighbors) cols = indices.flatten() dists = distances.flatten() # 距离矩阵 distances_matrix = coo_matrix( (dists, (rows, cols)), shape=(n_obs, n_obs) ).tocsr() # 连接矩阵 (使用高斯核) sigma = np.median(dists) weights = np.exp(-dists**2 / (2 * sigma**2)) connectivities = coo_matrix( (weights, (rows, cols)), shape=(n_obs, n_obs) ).tocsr() self.adata.obsp['distances'] = distances_matrix self.adata.obsp['connectivities'] = connectivities self.adata.uns['neighbors'] = { 'connectivities_key': 'connectivities', 'distances_key': 'distances', 'params': { 'n_neighbors': n_neighbors, 'method': 'sklearn', 'metric': 'euclidean' } } print("✓ 使用sklearn方法") return self.adata self._safe_step('neighbors', _neighbors) def _umap(): print("🗺️ 计算UMAP...") try: sc.tl.umap(self.adata, random_state=42, n_components=2) print("✓ UMAP完成") except Exception as e: print(f"⚠️ UMAP失败: {e}") print("使用PCA前2维作为替代...") self.adata.obsm['X_umap'] = self.adata.obsm['X_pca'][:, :2] return self.adata self._safe_step('umap', _umap) return self.adata def cluster(self, resolution=1.0, cluster_file=None): """聚类""" def _cluster(): if cluster_file and os.path.exists(cluster_file): print("📍 加载聚类...") df = pd.read_csv(cluster_file, index_col=0) clusters = df.iloc[:, 0].astype(str).astype('category') self.adata.obs['leiden'] = clusters else: print(f"🎯 Leiden聚类 (resolution={resolution})...") sc.tl.leiden( self.adata, resolution=resolution, random_state=42, n_iterations=2 ) self.adata.obs['Celltype'] = self.adata.obs['leiden'] n_clusters = len(self.adata.obs['leiden'].cat.categories) print(f"📊 聚类数: {n_clusters}") return self.adata return self._safe_step('cluster', _cluster) def visualize(self, gene_list=None, max_genes=10): """可视化""" def _viz(): viz_dir = self.outdir / 'visualizations' viz_dir.mkdir(exist_ok=True) print("🎨 生成可视化...") plt.switch_backend('Agg') # UMAP聚类图 try: fig, ax = plt.subplots(figsize=(8, 6)) sc.pl.umap(self.adata, color='Celltype', ax=ax, show=False, frameon=False) plt.tight_layout() plt.savefig(viz_dir / 'umap_clusters.png', dpi=150, bbox_inches='tight') plt.close() print("✓ UMAP聚类图") except Exception as e: print(f"⚠️ UMAP图失败: {e}") # 基因表达图 if gene_list: genes = gene_list if isinstance(gene_list, list) else [gene_list] if hasattr(self.adata, 'raw') and self.adata.raw is not None: available = [g for g in genes if g in self.adata.raw.var_names][:max_genes] else: available = [g for g in genes if g in self.adata.var_names][:max_genes] if available: print(f"🧬 可视化 {len(available)} 个基因...") for gene in available: try: fig, ax = plt.subplots(figsize=(7, 5)) sc.pl.umap( self.adata, color=gene, ax=ax, show=False, frameon=False, use_raw=(hasattr(self.adata, 'raw')) ) plt.tight_layout() plt.savefig(viz_dir / f'umap_gene_{gene}.png', dpi=150, bbox_inches='tight') plt.close() except: pass print(f"✓ 基因表达图") return self.adata return self._safe_step('visualize', _viz) def find_markers(self, method='wilcoxon', n_genes=100): """寻找marker""" def _markers(): marker_dir = self.outdir / 'markers' marker_dir.mkdir(exist_ok=True) print(f"🔍 寻找Marker (method={method})...") sc.tl.rank_genes_groups( self.adata, 'leiden', method=method, n_genes=n_genes, use_raw=(hasattr(self.adata, 'raw')), key_added='rank_genes_groups' ) result = self.adata.uns['rank_genes_groups'] groups = result['names'].dtype.names # Top markers df_names = pd.DataFrame(result['names']) df_names.to_csv(marker_dir / 'top_markers.csv') # 详细统计 records = [] for group in groups: for i in range(len(result['names'][group])): records.append({ 'cluster': group, 'gene': result['names'][group][i], 'score': result['scores'][group][i], 'pval': result['pvals'][group][i], 'pval_adj': result['pvals_adj'][group][i], 'logfoldchange': result['logfoldchanges'][group][i] }) pd.DataFrame(records).to_csv(marker_dir / 'marker_stats.csv', index=False) print(f"💾 保存: marker_stats.csv") # 可视化 try: fig, ax = plt.subplots(figsize=(12, 8)) sc.pl.rank_genes_groups(self.adata, n_genes=10, ax=ax, show=False) plt.tight_layout() plt.savefig(marker_dir / 'rank_genes.png', dpi=150, bbox_inches='tight') plt.close() print("✓ Marker排名图") except: pass return df_names return self._safe_step('find_markers', _markers) def batch_process_directory(self, input_dir, pattern='*.h5', **kwargs): """批量处理""" input_path = Path(input_dir) files = sorted(input_path.glob(pattern)) print(f"📁 找到 {len(files)} 个文件") results = [] for i, file in enumerate(files, 1): print(f"\n{'='*60}") print(f"处理 [{i}/{len(files)}]: {file.name}") print(f"{'='*60}") try: file_outdir = self.outdir / file.stem file_outdir.mkdir(exist_ok=True) # 重置 self.checkpoint = CheckpointManager(file_outdir / '.checkpoints') self.adata = None # 分析 self.load_data(str(file), **kwargs) self.preprocess() self.dimension_reduction() self.cluster() self.visualize() self.find_markers() results.append({'file': file.name, 'status': 'success', 'cells': self.adata.n_obs}) except Exception as e: print(f"✗ 失败: {str(e)}") results.append({'file': file.name, 'status': 'failed', 'error': str(e)}) finally: self.adata = None gc.collect() # 保存结果 pd.DataFrame(results).to_csv(self.outdir / 'batch_results.csv', index=False) print(f"\n{'='*60}") print("批处理完成!") success = sum(r['status'] == 'success' for r in results) print(f"成功: {success}/{len(results)}") print(f"{'='*60}") return resultsdef main(): parser = argparse.ArgumentParser(description='单细胞分析工具 - 最终修复版') parser.add_argument('--input', help='输入文件') parser.add_argument('--format', choices=['10x', 'h5', 'loom', 'csv'], default='h5') parser.add_argument('--outdir', default='./output') parser.add_argument('--batch_dir', help='批量处理目录') parser.add_argument('--pattern', default='*.h5') parser.add_argument('--low_memory', action='store_true', default=True) parser.add_argument('--subsample', type=int) parser.add_argument('--resume', action='store_true') parser.add_argument('--reset_checkpoint', action='store_true') parser.add_argument('--barcode_file') parser.add_argument('--cluster_file') parser.add_argument('--gene_list') parser.add_argument('--n_pcs', type=int, default=20) parser.add_argument('--n_top_genes', type=int, default=2000) parser.add_argument('--resolution', type=float, default=1.0) parser.add_argument('--min_genes', type=int, default=200) parser.add_argument('--min_cells', type=int, default=3) args = parser.parse_args() if not args.batch_dir and not args.input: parser.error("需要 --input 或 --batch_dir") # 初始化 analyzer = ScRNAAnalyzer(outdir=args.outdir, low_memory=args.low_memory) if args.reset_checkpoint: analyzer.checkpoint.reset() print("✓ 断点已重置") # 基因列表 gene_list = None if args.gene_list and os.path.exists(args.gene_list): with open(args.gene_list) as f: gene_list = f.read().strip().split() try: if args.batch_dir: print("🚀 批量处理模式") analyzer.batch_process_directory( args.batch_dir, pattern=args.pattern, format_type=args.format, subsample=args.subsample, min_genes=args.min_genes, min_cells=args.min_cells ) else: print("🚀 单文件处理模式") # 恢复断点 if args.resume: analyzer.adata = analyzer.checkpoint.load_checkpoint() # 执行分析 if analyzer.adata is None: analyzer.load_data( args.input, format_type=args.format, barcode_file=args.barcode_file, subsample=args.subsample, min_genes=args.min_genes, min_cells=args.min_cells ) analyzer.preprocess(n_top_genes=args.n_top_genes) analyzer.dimension_reduction(n_pcs=args.n_pcs) analyzer.cluster(resolution=args.resolution, cluster_file=args.cluster_file) analyzer.visualize(gene_list=gene_list) analyzer.find_markers() print(f"\n{'='*60}") print(f"✓ 完成! 结果: {args.outdir}") print(f"{'='*60}") except KeyboardInterrupt: print("\n⚠️ 中断,进度已保存") sys.exit(1) except Exception as e: print(f"\n✗ 错误: {str(e)}") import traceback traceback.print_exc() sys.exit(1)if __name__ == '__main__': main()