#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''使用方法:python geo_sc_pipeline_cellmarker.py \ --clinical clinical.csv \ --h5_dir ./h5_files \ --outdir ./sc_out \ --tissue_filter Endometrium \ --tissue_mode tissue_type \ --seed 42'''
import os
import argparse
import warnings
from typing import Dict, List, Optional

import numpy as np
import pandas as pd
import scanpy as sc
import anndata as ad

warnings.filterwarnings("ignore")
# Force CPU-only execution: strip any GPU / distributed-backend configuration
# inherited from the environment before torch / scvi are imported.
os.environ.pop("CUDA_VISIBLE_DEVICES", None)
os.environ.pop("PL_TORCH_DISTRIBUTED_BACKEND", None)

# Seed every RNG before the heavy ML packages are imported so that any
# import-time initialisation they perform is deterministic too.
import random
import torch


def set_all_seeds(seed: int = 0) -> None:
    """Seed python / numpy / torch RNGs (and cuDNN flags) for reproducibility."""
    random.seed(seed)
    np.random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    # Trade cuDNN autotuning speed for bitwise-reproducible kernels.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


# Seed before importing scvi / scrublet so their module-level RNG use is fixed.
set_all_seeds(0)

import scvi
import scrublet as scr
import statsmodels.api as sm
import gseapy as gp
import requests

CELL_MARKER_HUMAN_URL = "http://www.bio-bigdata.center/CellMarker_download_files/file/Cell_marker_Human.xlsx"


# -----------------------------
# Clinical + h5 mapping
# -----------------------------
def read_clinical(clinical_path: str) -> pd.DataFrame:
    """Load the clinical sheet (csv or tab-delimited) and validate that it has
    a `sample` column (used to match .h5 filenames) and a `type` column
    restricted to control/treat (case-insensitive)."""
    if clinical_path.endswith(".csv"):
        df = pd.read_csv(clinical_path)
    else:
        df = pd.read_table(clinical_path)
    if "type" not in df.columns:
        raise ValueError("clinical 文件必须包含列: type (control/treat)")
    if "sample" not in df.columns:
        raise ValueError("clinical 文件必须包含列: sample (用于匹配.h5文件名)")
    df["type"] = df["type"].astype(str).str.lower()
    if not set(df["type"].unique()).issubset({"control", "treat"}):
        raise ValueError(f"type列只能包含control/treat,你的type值为: {df['type'].unique()}")
    return df


def list_h5_files(h5_dir: str) -> List[str]:
    """Return the sorted list of *.h5 paths in `h5_dir`; raise if none exist."""
    files = [os.path.join(h5_dir, fn) for fn in os.listdir(h5_dir) if fn.endswith(".h5")]
    if len(files) == 0:
        raise FileNotFoundError(f"{h5_dir} 下找不到 .h5 文件")
    return sorted(files)


def match_sample_to_h5(clinical_df: pd.DataFrame, h5_files: List[str]) -> Dict[str, str]:
    """Map each clinical sample id to exactly one h5 file whose basename
    contains the sample id; raise on zero or ambiguous matches."""
    mapping = {}
    for s in clinical_df["sample"].astype(str):
        hits = [f for f in h5_files if s in os.path.basename(f)]
        if len(hits) == 1:
            mapping[s] = hits[0]
        elif len(hits) == 0:
            raise ValueError(f"sample={s} 在h5_dir里找不到匹配文件(文件名需包含sample)")
        else:
            raise ValueError(f"sample={s} 匹配到多个h5文件: {hits},请保证唯一")
    return mapping


# -----------------------------
# QC + doublets
# -----------------------------
def add_qc_metrics(adata: ad.AnnData) -> None:
    """Flag mitochondrial genes (human `MT-` prefix) and compute scanpy QC metrics in place."""
    adata.var["mt"] = adata.var_names.str.startswith("MT-")
    sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], inplace=True)


def per_sample_qc_filter(adata: ad.AnnData, min_genes=300, min_umis=500, max_mt=20.0) -> ad.AnnData:
    """Return a copy keeping cells with >= min_genes genes, >= min_umis UMIs
    and <= max_mt percent mitochondrial counts."""
    add_qc_metrics(adata)
    keep = (
        (adata.obs["n_genes_by_counts"] >= min_genes)
        & (adata.obs["total_counts"] >= min_umis)
        & (adata.obs["pct_counts_mt"] <= max_mt)
    )
    return adata[keep].copy()


def run_scrublet(adata: ad.AnnData, expected_doublet_rate=0.06) -> ad.AnnData:
    """Run Scrublet on raw counts; store per-cell `doublet_score` and boolean
    `predicted_doublet` in .obs. Returns the same AnnData (modified in place)."""
    scrub = scr.Scrublet(adata.X, expected_doublet_rate=expected_doublet_rate)
    scores, preds = scrub.scrub_doublets()
    adata.obs["doublet_score"] = scores
    adata.obs["predicted_doublet"] = preds.astype(bool)
    return adata


def drop_doublets(adata: ad.AnnData) -> ad.AnnData:
    """Return a copy without cells flagged as doublets; no-op if scrublet never ran."""
    if "predicted_doublet" not in adata.obs.columns:
        return adata
    return adata[~adata.obs["predicted_doublet"]].copy()


def read_one_10x_h5(h5_path: str, sample_id: str, group: str) -> ad.AnnData:
    """Read one 10x .h5 matrix, uniquify gene names, and annotate sample/type."""
    a = sc.read_10x_h5(h5_path)
    a.var_names_make_unique()
    a.obs["sample"] = sample_id
    a.obs["type"] = group
    return a


# -----------------------------
# scVI integration + clustering
# -----------------------------
def integrate_scvi(adata: ad.AnnData, n_hvg=3000, latent_dim=30, max_epochs=200, seed=0) -> ad.AnnData:
    """Select HVGs (seurat_v3 on raw counts, batch-aware), train an scVI model
    with `sample` as batch key, and store the latent space in obsm['X_scVI'].

    Note: raw counts are preserved in layers['counts'] before HVG subsetting.
    """
    sc.pp.filter_genes(adata, min_cells=3)
    adata.layers["counts"] = adata.X.copy()
    sc.pp.highly_variable_genes(
        adata,
        n_top_genes=n_hvg,
        flavor="seurat_v3",
        batch_key="sample",
        subset=True,
    )
    # BUG FIX: SCVI.train() has no `seed` keyword — passing one raises
    # TypeError. scvi-tools is seeded globally via scvi.settings.seed.
    scvi.settings.seed = seed
    scvi.model.SCVI.setup_anndata(adata, layer="counts", batch_key="sample")
    model = scvi.model.SCVI(adata, n_latent=latent_dim)
    model.train(max_epochs=max_epochs, plan_kwargs={"lr": 1e-3}, accelerator="cpu", devices=1)
    adata.obsm["X_scVI"] = model.get_latent_representation()
    return adata


def cluster_and_umap(adata: ad.AnnData, neighbors_k=15, resolution=0.5) -> ad.AnnData:
    """kNN graph on the scVI latent space, then UMAP + leiden clustering."""
    sc.pp.neighbors(adata, use_rep="X_scVI", n_neighbors=neighbors_k)
    sc.tl.umap(adata)
    sc.tl.leiden(adata, resolution=resolution, key_added="leiden")
    return adata


def find_markers(adata: ad.AnnData, groupby="leiden", method="wilcoxon") -> pd.DataFrame:
    """Normalize + log1p in place, then rank marker genes per group.

    Side effect: adata.X is replaced by log-normalized values (the raw counts
    remain available in layers['counts']).
    """
    sc.pp.normalize_total(adata, target_sum=1e4)
    sc.pp.log1p(adata)
    sc.tl.rank_genes_groups(adata, groupby=groupby, method=method)
    return sc.get.rank_genes_groups_df(adata, group=None)


# -----------------------------
# CellMarker download + parse (YOUR FORMAT)
# -----------------------------
def download_file(url: str, out_path: str, force=False, timeout=60) -> str:
    """Download `url` to `out_path` in 1 MB chunks, skipping if cached (unless force)."""
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    if os.path.exists(out_path) and (not force):
        return out_path
    r = requests.get(url, stream=True, timeout=timeout)
    r.raise_for_status()
    with open(out_path, "wb") as f:
        for chunk in r.iter_content(chunk_size=1024 * 1024):
            if chunk:
                f.write(chunk)
    return out_path


def load_cellmarker_human_markers_from_xlsx(
    xlsx_path: str,
    tissue_filter: Optional[str] = None,
    tissue_mode: str = "tissue_type",  # tissue_type or tissue_class
    min_genes_per_celltype: int = 5,
    max_genes_per_celltype: int = 80,
) -> Dict[str, List[str]]:
    """Parse the CellMarker human Excel into {cell type -> marker gene list}.

    Expected columns: species, tissue_class, tissue_type, cell_name, Symbol.
    Uses `cell_name` as the cell-type label and upper-cased `Symbol` as the
    gene symbol. `tissue_filter` does a case-insensitive substring match on
    the column chosen by `tissue_mode`. Cell types with fewer than
    `min_genes_per_celltype` usable genes are dropped; longer lists are
    truncated (alphabetically) to `max_genes_per_celltype`.
    """
    df = pd.read_excel(xlsx_path)  # default first sheet
    needed = {"species", "cell_name", "Symbol", "tissue_class", "tissue_type"}
    missing = needed - set(df.columns)
    if missing:
        raise ValueError(f"CellMarker表缺少列: {missing},现有列: {df.columns.tolist()}")
    df = df[df["species"].astype(str).str.lower() == "human"].copy()
    # tissue filter
    if tissue_filter:
        tf = tissue_filter.strip().lower()
        if tissue_mode not in {"tissue_type", "tissue_class"}:
            raise ValueError("--tissue_mode 只能是 tissue_type 或 tissue_class")
        df[tissue_mode] = df[tissue_mode].astype(str)
        df = df[df[tissue_mode].str.lower().str.contains(tf, na=False)].copy()
    df["cell_name"] = df["cell_name"].astype(str).str.strip()
    df["Symbol"] = df["Symbol"].astype(str).str.strip().str.upper()
    # BUG FIX: astype(str) converts missing Symbol cells to "nan" -> "NAN",
    # which previously survived the empty-string filter and entered marker
    # lists as a bogus gene symbol.
    df = df[(df["cell_name"] != "") & (~df["Symbol"].isin(["", "NAN", "NONE"]))]
    marker_dict: Dict[str, List[str]] = {}
    for ct, sub in df.groupby("cell_name"):
        genes = sub["Symbol"].dropna().unique().tolist()
        genes = sorted(set(genes))
        if len(genes) < min_genes_per_celltype:
            continue
        if len(genes) > max_genes_per_celltype:
            genes = genes[:max_genes_per_celltype]
        marker_dict[str(ct)] = genes
    return marker_dict


# -----------------------------
# Auto annotation by CellMarker (cluster-level)
# -----------------------------
def score_celltypes_per_cell(adata: ad.AnnData, marker_dict: Dict[str, List[str]], prefix="cm_") -> List[str]:
    """Compute sc.tl.score_genes per cell type (>= 3 markers present in
    var_names); scores land in .obs as `{prefix}{celltype}` columns.
    Returns the list of score column names created."""
    # ensure log1p exists
    if "log1p" not in adata.uns_keys():
        sc.pp.normalize_total(adata, target_sum=1e4)
        sc.pp.log1p(adata)
    score_cols = []
    for ct, genes in marker_dict.items():
        valid = [g for g in genes if g in adata.var_names]
        if len(valid) < 3:
            continue
        col = f"{prefix}{ct}"
        sc.tl.score_genes(adata, gene_list=valid, score_name=col, use_raw=False)
        score_cols.append(col)
    return score_cols


def assign_cluster_celltypes(
    adata: ad.AnnData,
    score_cols: List[str],
    cluster_key="leiden",
    prefix="cm_",
    out_cluster_key="celltype_cluster",
    out_cell_key="celltype_auto",
) -> ad.AnnData:
    """Label each cluster with the cell type whose mean score is highest
    within the cluster; both output obs columns carry the cluster-level label.
    Falls back to 'unknown' everywhere when no score columns exist."""
    if len(score_cols) == 0:
        adata.obs[out_cluster_key] = "unknown"
        adata.obs[out_cell_key] = "unknown"
        return adata
    df = adata.obs[[cluster_key] + score_cols].copy()
    mean_scores = df.groupby(cluster_key)[score_cols].mean()
    best_col = mean_scores.idxmax(axis=1)
    cluster2ct = best_col.apply(lambda x: x[len(prefix):] if x.startswith(prefix) else x).to_dict()
    adata.obs[out_cluster_key] = adata.obs[cluster_key].map(cluster2ct).astype("category")
    adata.obs[out_cell_key] = adata.obs[cluster_key].map(cluster2ct).astype("category")
    return adata


# -----------------------------
# Proportion shift
# -----------------------------
def compute_celltype_proportions(adata: ad.AnnData, celltype_key="celltype_auto") -> pd.DataFrame:
    """Per-sample cell-type composition: n_cells, per-sample total, and proportion."""
    df = adata.obs[["sample", "type", celltype_key]].copy()
    tab = (
        df.groupby(["sample", "type", celltype_key])
        .size()
        .reset_index(name="n_cells")
    )
    totals = tab.groupby(["sample"])["n_cells"].sum().rename("n_total")
    tab = tab.merge(totals, on="sample")
    tab["prop"] = tab["n_cells"] / tab["n_total"]
    return tab


def mannwhitneyu_by_celltype(prop_df: pd.DataFrame, celltype_col="celltype_auto") -> pd.DataFrame:
    """Two-sided Mann-Whitney U test of treat vs control proportions per cell
    type; cell types with < 2 samples per arm are skipped."""
    from scipy.stats import mannwhitneyu
    out = []
    for ct in prop_df[celltype_col].unique():
        d = prop_df[prop_df[celltype_col] == ct]
        c = d[d["type"] == "control"]["prop"].values
        t = d[d["type"] == "treat"]["prop"].values
        if len(c) >= 2 and len(t) >= 2:
            _, p = mannwhitneyu(t, c, alternative="two-sided")
            out.append([ct, len(c), len(t), np.mean(c), np.mean(t), p])
    return pd.DataFrame(out, columns=["celltype", "n_control", "n_treat", "mean_control", "mean_treat", "p_mannwhitneyu"])


def beta_regression(prop_df: pd.DataFrame, celltype_col="celltype_auto") -> pd.DataFrame:
    """Per-cell-type group effect on composition.

    NOTE: despite the name this is NOT a beta regression — it is an OLS fit of
    logit-transformed proportions on a treat indicator (hence the output file
    name `prop_stats_logit_ols.csv`). Proportions are clipped away from {0, 1}
    before the logit. The name is kept for backward compatibility.
    """
    out = []
    eps = 1e-4
    for ct in prop_df[celltype_col].unique():
        d = prop_df[prop_df[celltype_col] == ct].copy()
        if d["type"].nunique() < 2:
            continue
        y = np.clip(d["prop"].values, eps, 1 - eps)
        y_logit = np.log(y / (1 - y))
        X = (d["type"].values == "treat").astype(int)
        X = sm.add_constant(X)
        model = sm.OLS(y_logit, X).fit()
        out.append([ct, model.params[1], model.pvalues[1], model.rsquared])
    return pd.DataFrame(out, columns=["celltype", "coef_treat", "p_value", "r2"])


# -----------------------------
# Enrichment
# -----------------------------
def run_enrichr(gene_list: List[str], library="KEGG_2021_Human", outdir="enrichr_out") -> pd.DataFrame:
    """Run gseapy Enrichr on `gene_list` against one library; returns the results table."""
    os.makedirs(outdir, exist_ok=True)
    enr = gp.enrichr(gene_list=gene_list, gene_sets=library, organism="Human", outdir=outdir, no_plot=True)
    return enr.results


# -----------------------------
# Main
# -----------------------------
def main():
    """CLI entry point: QC -> scrublet -> scVI -> leiden -> CellMarker
    annotation -> proportion statistics -> plots, writing all outputs to --outdir."""
    parser = argparse.ArgumentParser("GEO scRNA pipeline: QC + scrublet + scVI + CellMarker auto annotation")
    parser.add_argument("--clinical", required=True)
    parser.add_argument("--h5_dir", required=True)
    parser.add_argument("--outdir", default="sc_out")
    parser.add_argument("--min_genes", type=int, default=300)
    parser.add_argument("--min_umis", type=int, default=500)
    parser.add_argument("--max_mt", type=float, default=20.0)
    parser.add_argument("--doublet_rate", type=float, default=0.06)
    parser.add_argument("--hvg", type=int, default=3000)
    parser.add_argument("--latent", type=int, default=30)
    parser.add_argument("--epochs", type=int, default=200)
    parser.add_argument("--resolution", type=float, default=0.5)
    # CellMarker options
    parser.add_argument("--cellmarker_cache_dir", default="cellmarker_db")
    parser.add_argument("--tissue_filter", default=None, help='e.g. "Endometrium" or "Uterus" (recommended)')
    parser.add_argument("--tissue_mode", default="tissue_type", choices=["tissue_type", "tissue_class"])
    parser.add_argument("--cm_min_genes", type=int, default=5)
    parser.add_argument("--cm_max_genes", type=int, default=80)
    parser.add_argument("--seed", type=int, default=0, help="随机种子")
    args = parser.parse_args()

    os.makedirs(args.outdir, exist_ok=True)

    # Re-seed with the user-chosen seed (module import already seeded with 0).
    set_all_seeds(args.seed)

    # clinical + h5 mapping
    clinical = read_clinical(args.clinical)
    h5_files = list_h5_files(args.h5_dir)
    mapping = match_sample_to_h5(clinical, h5_files)

    # per-sample QC + scrublet
    adatas = []
    qc_rows = []
    for _, row in clinical.iterrows():
        sample = str(row["sample"])
        group = str(row["type"]).lower()
        h5 = mapping[sample]
        print(f"[Load] {sample} ({group}) <- {os.path.basename(h5)}")
        a = read_one_10x_h5(h5, sample, group)
        n0 = a.n_obs
        a = per_sample_qc_filter(a, args.min_genes, args.min_umis, args.max_mt)
        n1 = a.n_obs
        a = run_scrublet(a, args.doublet_rate)
        d0 = int(a.obs["predicted_doublet"].sum())
        a = drop_doublets(a)
        n2 = a.n_obs
        qc_rows.append([sample, group, n0, n1, d0, n2])
        adatas.append(a)
    qc_df = pd.DataFrame(qc_rows, columns=["sample", "type", "cells_raw", "cells_after_qc", "doublets_removed", "cells_final"])
    qc_df.to_csv(os.path.join(args.outdir, "qc_summary.csv"), index=False)

    # merge (keys mirror the per-sample obs['sample'] values;
    # .iloc[0] — positional access, not deprecated label-based [0])
    adata = ad.concat(adatas, join="outer", label="sample",
                      keys=[a.obs["sample"].iloc[0] for a in adatas], fill_value=0)
    adata.obs["type"] = adata.obs["type"].astype(str).str.lower()
    adata.write_h5ad(os.path.join(args.outdir, "merged_after_qc.h5ad"))

    # scVI (scvi.settings.seed is set inside integrate_scvi from args.seed)
    print("[scVI] Running integration...")
    adata = integrate_scvi(adata, n_hvg=args.hvg, latent_dim=args.latent, max_epochs=args.epochs, seed=args.seed)

    # cluster
    print("[Clustering] Running clustering and UMAP...")
    adata = cluster_and_umap(adata, neighbors_k=15, resolution=args.resolution)

    # markers
    print("[Markers] Finding cluster markers...")
    markers = find_markers(adata, groupby="leiden")
    markers.to_csv(os.path.join(args.outdir, "cluster_markers.csv"), index=False)

    # download + parse CellMarker
    cm_cache = os.path.join(args.outdir, args.cellmarker_cache_dir)
    os.makedirs(cm_cache, exist_ok=True)
    cm_xlsx = os.path.join(cm_cache, "Cell_marker_Human.xlsx")
    print(f"[CellMarker] downloading/caching: {cm_xlsx}")
    download_file(CELL_MARKER_HUMAN_URL, cm_xlsx, force=False)
    marker_dict = load_cellmarker_human_markers_from_xlsx(
        cm_xlsx,
        tissue_filter=args.tissue_filter,
        tissue_mode=args.tissue_mode,
        min_genes_per_celltype=args.cm_min_genes,
        max_genes_per_celltype=args.cm_max_genes,
    )
    print(f"[CellMarker] usable celltypes: {len(marker_dict)}")

    # score + assign cluster celltypes
    print("[CellMarker] Scoring cell types...")
    score_cols = score_celltypes_per_cell(adata, marker_dict, prefix="cm_")
    adata = assign_cluster_celltypes(adata, score_cols, cluster_key="leiden", prefix="cm_",
                                     out_cluster_key="celltype_cluster", out_cell_key="celltype_auto")

    # export mapping
    ct_map = adata.obs[["leiden", "celltype_cluster"]].drop_duplicates().sort_values("leiden")
    ct_map.to_csv(os.path.join(args.outdir, "cluster_to_celltype.csv"), index=False)

    # proportions + stats
    print("[Statistics] Computing cell type proportions...")
    prop = compute_celltype_proportions(adata, celltype_key="celltype_auto")
    prop.to_csv(os.path.join(args.outdir, "celltype_proportions.csv"), index=False)
    prop_u = mannwhitneyu_by_celltype(prop, celltype_col="celltype_auto")
    prop_u.to_csv(os.path.join(args.outdir, "prop_stats_mannwhitneyu.csv"), index=False)
    prop_b = beta_regression(prop, celltype_col="celltype_auto")
    prop_b.to_csv(os.path.join(args.outdir, "prop_stats_logit_ols.csv"), index=False)

    # save
    adata.write_h5ad(os.path.join(args.outdir, "final_scvi_cellmarker_annotated.h5ad"))

    # plots
    sc.settings.figdir = args.outdir
    print("[Plotting] Creating UMAP plots...")
    sc.pl.umap(adata, color=["type", "sample", "leiden", "celltype_auto"],
               wspace=0.4, show=False, save="_overview.png")
    print("=" * 50)
    print("DONE. Results saved in:", args.outdir)
    print("=" * 50)


if __name__ == "__main__":
    main()