#!/usr/bin/env python3import reimport sys# ================== 参数 ==================GFF_FILE = "genes.gff3"ZJ_FASTA = "ZJ_genome.fasta"S14_FASTA = "S14_genome.fasta"UPSTREAM = 3000GENE_LIST_FILE = "genes_of_interest.txt"# ==========================================MOTIF = re.compile(r'CA..TG', re.IGNORECASE)def load_fasta(fasta_path): """读取 FASTA,返回 {seqid: 序列大写字符串}""" genome = {} with open(fasta_path) as f: seqid = None seq_lines = [] for line in f: line = line.strip() if not line or line.startswith('#'): continue if line.startswith('>'): if seqid: genome[seqid] = ''.join(seq_lines).upper() seqid = line[1:].split()[0] seq_lines = [] else: seq_lines.append(line) if seqid: genome[seqid] = ''.join(seq_lines).upper() return genomedef parse_gff_first_cds(gff_path, target_genes): """ 解析 GFF,返回字典 {gene_id: (seqid, cds_start, cds_end, strand)} 只取每个基因的第一个 CDS(最靠近转录起始位点) """ # 第一步:读取所有 mRNA 行,建立 mRNA_id -> (gene_id, seqid, strand) mrna_to_gene = {} # 第二步:读取所有 CDS 行,收集每个 mRNA 的所有 CDS 坐标 cds_by_mrna = {} # {mrna_id: [(start, end, seqid, strand), ...]} with open(gff_path) as f: for line in f: if line.startswith('#') or not line.strip(): continue parts = line.strip().split('\t') if len(parts) < 9: continue seqid, source, type_, start, end, score, strand, phase, attrs = parts start = int(start) end = int(end) # 提取 ID 和 Parent attrs_dict = {} for attr in attrs.split(';'): if '=' in attr: k, v = attr.split('=', 1) attrs_dict[k] = v if type_ == 'mRNA': mrna_id = attrs_dict.get('ID') gene_id = attrs_dict.get('Parent') if mrna_id and gene_id: mrna_to_gene[mrna_id] = (gene_id, seqid, strand) elif type_ == 'CDS': mrna_id = attrs_dict.get('Parent') if mrna_id: cds_by_mrna.setdefault(mrna_id, []).append((start, end, seqid, strand)) # 构建每个基因的 CDS 列表(通过 mRNA 映射) gene_cds = {} # {gene_id: [(start, end, seqid, strand), ...]} for mrna_id, cds_list in cds_by_mrna.items(): if mrna_id not in mrna_to_gene: continue gene_id, seqid, strand = mrna_to_gene[mrna_id] if gene_id not in target_genes: continue # 为每个 CDS 加上链信息(每个 CDS 的链应该与 mRNA 一致,但以防万一也保存) for start, end, _, _ in cds_list: gene_cds.setdefault(gene_id, []).append((start, end, seqid, strand)) # 对每个基因的 CDS 排序,取第一个 CDS(正链取最小 start,负链取最大 end) first_cds = {} for gene_id, cds_list in gene_cds.items(): if not cds_list: continue # 提取链信息(所有 CDS 应该同链) strand = cds_list[0][3] if strand == '+': # 正链:按 start 升序,取第一个 cds_list.sort(key=lambda x: x[0]) start, end, seqid, _ = cds_list[0] else: # 负链:按 end 降序(最大 end 最靠近 5' 端) cds_list.sort(key=lambda x: x[1], reverse=True) start, end, seqid, _ = cds_list[0] first_cds[gene_id] = (seqid, start, end, strand) return first_cdsdef get_promoter(genome_dict, seqid, cds_start, cds_end, strand, up): """提取上游 up bp 的序列,返回(序列, 区域描述)""" chrom = genome_dict.get(seqid) if chrom is None: raise ValueError(f"Sequence {seqid} not found in genome") if strand == '+': start = max(1, cds_start - up) end = cds_start - 1 if start > end: start = 1 end = cds_start - 1 seq = chrom[start-1:end] region = f"{seqid}:{start}-{end}" else: start = cds_end + 1 end = min(len(chrom), cds_end + up) if start > end: end = len(chrom) start = cds_end + 1 seq = chrom[start-1:end] # 反向互补 comp = str.maketrans('ACGTacgt', 'TGCAtgca') seq = seq.translate(comp)[::-1].upper() region = f"{seqid}:{start}-{end}(revComp)" return seq, regiondef count_motif(seq, pattern): return len(pattern.findall(seq.upper()))def main(): print("读取目标基因列表...") with open(GENE_LIST_FILE) as f: target_genes = {line.strip() for line in f if line.strip()} print(f"共有 {len(target_genes)} 个基因待处理。") print("加载基因组序列...") genome_zj = load_fasta(ZJ_FASTA) genome_s14 = load_fasta(S14_FASTA) print("基因组加载完成。") print("从 GFF 中提取第一个 CDS 位置...") cds_dict = parse_gff_first_cds(GFF_FILE, target_genes) print(f"成功找到 {len(cds_dict)} 个基因的 CDS。") if not cds_dict: print("错误:没有找到任何目标基因的 CDS,请检查基因ID是否正确。") sys.exit(1) results = [] for i, gene in enumerate(target_genes, 1): if gene not in cds_dict: print(f"警告:基因 {gene} 未找到 CDS,跳过。") continue seqid, cds_s, cds_e, strand = cds_dict[gene] print(f"处理 {gene} ({i}/{len(target_genes)}) ...") seq_zj, reg_zj = get_promoter(genome_zj, seqid, cds_s, cds_e, strand, UPSTREAM) cnt_zj = count_motif(seq_zj, MOTIF) seq_s14, reg_s14 = get_promoter(genome_s14, seqid, cds_s, cds_e, strand, UPSTREAM) cnt_s14 = count_motif(seq_s14, MOTIF) results.append({ 'gene': gene, 'scaffold': seqid, 'strand': strand, 'CDS_start': cds_s, 'CDS_end': cds_e, 'ZJ_promoter_region': reg_zj, 'ZJ_CANNTG_count': cnt_zj, 'S14_promoter_region': reg_s14, 'S14_CANNTG_count': cnt_s14 }) # 保存结果 import pandas as pd df = pd.DataFrame(results) out_csv = "CANNTG_counts.csv" df.to_csv(out_csv, index=False) print(f"\n完成!结果已保存至 {out_csv}") print("\n前5行预览:") print(df.head())if __name__ == "__main__": main()