#!/usr/bin/env python3# -*- coding: utf-8 -*-import argparseimport osimport reimport shutilimport subprocessimport sysfrom pathlib import Path# -----------------------------# helpers# -----------------------------def run(cmd, cwd: Path | None = None, stdin: str | None = None, env: dict | None = None) -> str: """Run a command, raise with stdout/stderr on failure.""" cmd_str = " ".join(map(str, cmd)) print(f"\n[RUN] {cmd_str}") p = subprocess.run( list(map(str, cmd)), cwd=str(cwd) if cwd else None, input=stdin, text=True, capture_output=True, env=env, ) if p.returncode != 0: print("\n[STDOUT]\n" + p.stdout) print("\n[STDERR]\n" + p.stderr, file=sys.stderr) raise RuntimeError(f"Command failed ({p.returncode}): {cmd_str}") return p.stdoutdef which_or_die(exe: str): if shutil.which(exe) is None: raise SystemExit(f"找不到可执行文件: {exe}。请先安装并确保在 PATH 里。")def parse_groups_from_make_ndx_output(text: str) -> dict[str, int]: """ Parse group table printed by gmx make_ndx at startup. Typical lines: 13 SOL : 34008 atoms """ groups = {} for line in text.splitlines(): m = re.match(r"^\s*(\d+)\s+(\S+)\s*:\s*\d+\s+atoms", line) if m: gid = int(m.group(1)) gname = m.group(2) groups[gname] = gid return groupsdef get_group_id(gmx: str, gro_file: Path, group_name: str, workdir: Path) -> int: """ Get group id by running make_ndx once (quit immediately) and parsing stdout. """ tmp_ndx = workdir / "_tmp.ndx" out = run([gmx, "make_ndx", "-f", gro_file, "-o", tmp_ndx], cwd=workdir, stdin="q\n") try: groups = parse_groups_from_make_ndx_output(out) if group_name not in groups: raise KeyError return groups[group_name] finally: if tmp_ndx.exists(): tmp_ndx.unlink()def gro_read_atoms(gro: Path): lines = gro.read_text().splitlines() title = lines[0] natoms = int(lines[1].strip()) atoms = lines[2:2 + natoms] box = lines[2 + natoms] if len(lines) > 2 + natoms else "0 0 0" return title, atoms, boxdef gro_write(out: Path, title: str, atoms: list[str], box: str): out.write_text("\n".join([title, f"{len(atoms):5d}", *atoms, box]) + "\n")def merge_gro(protein_gro: Path, ligand_gro: Path, out_gro: Path): _, p_atoms, _ = gro_read_atoms(protein_gro) _, l_atoms, _ = gro_read_atoms(ligand_gro) # keep atom lines as-is, just concatenate; box will be reset by editconf later gro_write(out_gro, "Protein+Ligand", p_atoms + l_atoms, "0 0 0")def extract_protein_pdb_from_complex(complex_pdb: Path, protein_pdb: Path): """ Keep ATOM records only. This is a common way to strip ligand HETATM lines before pdb2gmx, because pdb2gmx needs residues known to the force field. """ out_lines = [] for line in complex_pdb.read_text().splitlines(): if line.startswith("ATOM"): out_lines.append(line) elif line.startswith("TER"): out_lines.append(line) out_lines.append("END") protein_pdb.write_text("\n".join(out_lines) + "\n")def detect_ligand_resname_from_gro(lig_gro: Path) -> str: _, atoms, _ = gro_read_atoms(lig_gro) if not atoms: raise RuntimeError("ligand gro 为空,无法识别配体 resname") # gro format: resid(5) resname(5) atomname(5) atomnr(5) x y z # resname in columns 6-10 (0-based 5:10) resname = atoms[0][5:10].strip() return resnamedef detect_moleculetype_name_from_itp(itp: Path) -> str: txt = itp.read_text().splitlines() for i, line in enumerate(txt): if line.strip().lower().startswith("[ moleculetype"): # next non-empty, non-comment line contains name for j in range(i+1, min(i+20, len(txt))): l = txt[j].strip() if not l or l.startswith(";") or l.startswith("#"): continue # first token return l.split()[0] raise RuntimeError("无法在 itp 中找到 [ moleculetype ] 名称")def patch_topology(protein_top: Path, ligand_itp: Path, posre_lig_itp: Path, out_top: Path, ligand_molname: str, ligand_count: int = 1): """ Insert ligand include + optional posre include, and add ligand to [ molecules ]. """ lines = protein_top.read_text().splitlines() # 1) include ligand itp near the end but before [ system ] or [ molecules ] insert_idx = None for i, l in enumerate(lines): if re.match(r"^\s*\[\s*system\s*\]\s*$", l, flags=re.I) or re.match(r"^\s*\[\s*molecules\s*\]\s*$", l, flags=re.I): insert_idx = i break if insert_idx is None: insert_idx = len(lines) include_block = [ "; --- Ligand topology ---", f'#include "{ligand_itp.name}"', "", "; --- Position restraints for ligand (enabled when -DPOSRES is used) ---", "#ifdef POSRES", f'#include "{posre_lig_itp.name}"', "#endif", "", ] lines = lines[:insert_idx] + include_block + lines[insert_idx:] # 2) add ligand to [ molecules ] mol_idx = None for i, l in enumerate(lines): if re.match(r"^\s*\[\s*molecules\s*\]\s*$", l, flags=re.I): mol_idx = i break if mol_idx is None: # append section if missing lines += ["", "[ molecules ]", "; Compound #mols"] mol_idx = len(lines) - 1 # find where molecules section ends (blank line or EOF or new section) end_idx = len(lines) for i in range(mol_idx + 1, len(lines)): if re.match(r"^\s*\[", lines[i]): end_idx = i break # insert ligand line near the top of molecules list (after comment lines) insert_mol_line_at = mol_idx + 1 while insert_mol_line_at < end_idx and (lines[insert_mol_line_at].strip().startswith(";") or lines[insert_mol_line_at].strip() == ""): insert_mol_line_at += 1 ligand_line = f"{ligand_molname:<16s}{ligand_count}" # avoid duplicates if not any(re.match(rf"^\s*{re.escape(ligand_molname)}\s+\d+", l) for l in lines[mol_idx:end_idx]): lines.insert(insert_mol_line_at, ligand_line) out_top.write_text("\n".join(lines) + "\n")def mdrun_gpu_args(gmx: str) -> list[str]: """ Conservative GPU flags; users can tune later. We try to include options that exist on modern GROMACS, but keep minimal. """ # Always safe baseline: args = ["-nb", "gpu"] # Try to detect whether -pme and -bonded and -update exist (by checking help output). help_txt = run([gmx, "mdrun", "-h"]) if " -pme " in help_txt: args += ["-pme", "gpu"] if " -bonded " in help_txt: args += ["-bonded", "gpu"] if " -update " in help_txt: args += ["-update", "gpu"] return args# -----------------------------# main pipeline# -----------------------------def main(): ap = argparse.ArgumentParser(description="GPU GROMACS protein-ligand MD pipeline (PDB+MOL2 -> full run)") ap.add_argument("--complex", required=True, type=Path, help="Docked complex PDB (protein+ligand coordinates)") ap.add_argument("--ligand", required=True, type=Path, help="Ligand MOL2 (docked pose coordinates)") ap.add_argument("--outdir", default=Path("work"), type=Path, help="Working directory") ap.add_argument("--gmx", default="gmx", help="GROMACS wrapper binary name/path (default: gmx)") ap.add_argument("--ff", default="amber99sb-ildn", help="Force field for pdb2gmx (AMBER recommended with ACPYPE)") ap.add_argument("--water", default="tip3p", help="Water model for pdb2gmx (e.g. tip3p, spc, spce)") ap.add_argument("--acpype", default="acpype", help="acpype executable name/path") ap.add_argument("--mdpdir", default=Path("mdp"), type=Path, help="Directory containing mdp files") ap.add_argument("--gpu", action="store_true", help="Use GPU flags for mdrun stages") ap.add_argument("--maxwarn", default="3", help="Maxwarn for grompp steps") ap.add_argument("--box_d", default="1.0", help="Box distance in nm (editconf -d)") ap.add_argument("--box_type", default="cubic", help="Box type (editconf -bt)") args = ap.parse_args() which_or_die(args.gmx) which_or_die(args.acpype) outdir: Path = args.outdir.resolve() outdir.mkdir(parents=True, exist_ok=True) # Copy inputs for provenance inputs_dir = outdir / "inputs" inputs_dir.mkdir(exist_ok=True) complex_pdb = inputs_dir / args.complex.name ligand_mol2 = inputs_dir / args.ligand.name shutil.copy2(args.complex, complex_pdb) shutil.copy2(args.ligand, ligand_mol2) mdpdir = args.mdpdir.resolve() required_mdps = [ "ref.ions.mdp", "ref.em_steep.mdp", "ref.em_cg.mdp", "ref.nvt.mdp", "ref.npt.mdp", "ref.md.mdp" ] for f in required_mdps: if not (mdpdir / f).exists(): raise SystemExit(f"缺少 mdp 文件: {mdpdir/f}") # ---------- A) Build protein topology ---------- protein_pdb = outdir / "protein_only.pdb" extract_protein_pdb_from_complex(complex_pdb, protein_pdb) # pdb2gmx: produce processed.gro + topol.top + posre.itp + (protein.itp) processed_gro = outdir / "protein.gro" protein_top = outdir / "topol.top" posre_itp = outdir / "posre.itp" run( [args.gmx, "pdb2gmx", "-f", protein_pdb, "-o", processed_gro, "-p", protein_top, "-i", posre_itp, "-ff", args.ff, "-water", args.water, "-ignh"], cwd=outdir, stdin=None ) # ---------- B) Ligand topology via ACPYPE ---------- lig_build = outdir / "ligand_acpype" lig_build.mkdir(exist_ok=True) # acpype output folder: <basename>.acpype lig_base = "LIG" run([args.acpype, "-i", ligand_mol2, "-b", lig_base, "-c", "bcc"], cwd=lig_build) acpype_dir = None for d in lig_build.iterdir(): if d.is_dir() and d.name.endswith(".acpype"): acpype_dir = d break if acpype_dir is None: raise RuntimeError("没找到 acpype 输出目录 (*.acpype),请检查 acpype 是否成功运行。") # Common acpype outputs: # LIG_GMX.itp, LIG_GMX.gro (names vary slightly by version) lig_itp = None lig_gro = None for f in acpype_dir.iterdir(): if f.suffix == ".itp" and "GMX" in f.name.upper(): lig_itp = f if f.suffix == ".gro" and "GMX" in f.name.upper(): lig_gro = f if lig_itp is None or lig_gro is None: raise RuntimeError("没找到 lig_GMX.itp / lig_GMX.gro(acpype 输出),请检查 acpype 版本/输出文件名。") # Copy ligand files to outdir root lig_itp2 = outdir / "lig.itp" lig_gro2 = outdir / "lig.gro" shutil.copy2(lig_itp, lig_itp2) shutil.copy2(lig_gro, lig_gro2) lig_resname = detect_ligand_resname_from_gro(lig_gro2) lig_molname = detect_moleculetype_name_from_itp(lig_itp2) print(f"[INFO] Ligand resname = {lig_resname}, moleculetype = {lig_molname}") # ---------- C) Merge coordinates & topology ---------- merge_gmx_gro = outdir / "merge_gmx.gro" merge_gro(processed_gro, lig_gro2, merge_gmx_gro) # ligand position restraint posre_lig = outdir / "posre_lig.itp" # choose "System" group from lig.gro (usually 0); parse robustly: lig_sys_gid = get_group_id(args.gmx, lig_gro2, "System", outdir) run([args.gmx, "genrestr", "-f", lig_gro2, "-o", posre_lig, "-fc", "1000", "1000", "1000"], cwd=outdir, stdin=f"{lig_sys_gid}\n" ) merge_top = outdir / "merge_topol.top" patch_topology(protein_top, lig_itp2, posre_lig, merge_top, ligand_molname=lig_molname, ligand_count=1) # ---------- D) Your 1~17 pipeline ---------- # 1) add box rep_newbox = outdir / "rep_newbox.gro" run([args.gmx, "editconf", "-f", merge_gmx_gro, "-o", rep_newbox, "-c", "-d", args.box_d, "-bt", args.box_type], cwd=outdir) # 2) solvate (use SPC216 by default here; if you prefer TIP3P box, adjust -cs) rep_solv = outdir / "rep_solv.gro" run([args.gmx, "solvate", "-cp", rep_newbox, "-cs", "spc216.gro", "-o", rep_solv, "-p", merge_top], cwd=outdir) # 3) grompp ions ions_tpr = outdir / "ions.tpr" run([args.gmx, "grompp", "-f", mdpdir / "ref.ions.mdp", "-c", rep_solv, "-p", merge_top, "-o", ions_tpr, "-maxwarn", str(args.maxwarn)], cwd=outdir) # 4) genion neutralize (choose SOL group) solv_ions = outdir / "solv_ions.gro" # Determine SOL group id from make_ndx on rep_solv.gro sol_gid = get_group_id(args.gmx, rep_solv, "SOL", outdir) run([args.gmx, "genion", "-s", ions_tpr, "-o", solv_ions, "-p", merge_top, "-pname", "NA", "-nname", "CL", "-neutral"], cwd=outdir, stdin=f"{sol_gid}\n" ) # 5) EM steep em_tpr = outdir / "em.tpr" run([args.gmx, "grompp", "-f", mdpdir / "ref.em_steep.mdp", "-c", solv_ions, "-p", merge_top, "-o", em_tpr, "-maxwarn", str(args.maxwarn)], cwd=outdir) mdrun_args = [args.gmx, "mdrun", "-v", "-deffnm", "em"] if args.gpu: mdrun_args += mdrun_gpu_args(args.gmx) run(mdrun_args, cwd=outdir) # 5b) EM cg em2_tpr = outdir / "em_cg.tpr" run([args.gmx, "grompp", "-f", mdpdir / "ref.em_cg.mdp", "-c", outdir / "em.gro", "-p", merge_top, "-o", em2_tpr, "-maxwarn", str(args.maxwarn)], cwd=outdir) mdrun_args = [args.gmx, "mdrun", "-v", "-deffnm", "em_cg"] if args.gpu: mdrun_args += mdrun_gpu_args(args.gmx) run(mdrun_args, cwd=outdir) # 6) index: Protein + Ligand group (non-interactive using make_ndx commands) # First, get current group ids from em_cg.gro em_gro = outdir / "em_cg.gro" out_ndx = outdir / "index.ndx" out0 = run([args.gmx, "make_ndx", "-f", em_gro, "-o", out_ndx], cwd=outdir, stdin="q\n") gmap = parse_groups_from_make_ndx_output(out0) if "Protein" not in gmap: raise RuntimeError("index groups 中找不到 Protein") protein_gid = gmap["Protein"] # ligand group in make_ndx is often the residue name (e.g. LIG/MOL/UNK) # We try by resname first: ligand_gid = gmap.get(lig_resname) if ligand_gid is None: # fallback: try "Other" ligand_gid = gmap.get("Other") if ligand_gid is None: raise RuntimeError(f"index groups 中找不到配体组(resname={lig_resname}),请检查配体是否并入坐标文件。") # now rebuild index with combined group via interactive commands # example: "1 | 13" then "q" cmds = f"{protein_gid} | {ligand_gid}\nq\n" run([args.gmx, "make_ndx", "-f", em_gro, "-o", out_ndx], cwd=outdir, stdin=cmds) # 7) NVT nvt_tpr = outdir / "nvt.tpr" run([args.gmx, "grompp", "-f", mdpdir / "ref.nvt.mdp", "-c", em_gro, "-r", em_gro, "-p", merge_top, "-n", out_ndx, "-o", nvt_tpr, "-maxwarn", str(args.maxwarn)], cwd=outdir) mdrun_args = [args.gmx, "mdrun", "-v", "-deffnm", "nvt"] if args.gpu: mdrun_args += mdrun_gpu_args(args.gmx) run(mdrun_args, cwd=outdir) # 8) NPT npt_tpr = outdir / "npt.tpr" run([args.gmx, "grompp", "-f", mdpdir / "ref.npt.mdp", "-c", outdir / "nvt.gro", "-r", outdir / "nvt.gro", "-t", outdir / "nvt.cpt", "-p", merge_top, "-n", out_ndx, "-o", npt_tpr, "-maxwarn", str(args.maxwarn)], cwd=outdir) mdrun_args = [args.gmx, "mdrun", "-v", "-deffnm", "npt"] if args.gpu: mdrun_args += mdrun_gpu_args(args.gmx) run(mdrun_args, cwd=outdir) # 9) Production MD md_tpr = outdir / "md_0_1.tpr" run([args.gmx, "grompp", "-f", mdpdir / "ref.md.mdp", "-c", outdir / "npt.gro", "-r", outdir / "npt.gro", "-t", outdir / "npt.cpt", "-p", merge_top, "-n", out_ndx, "-o", md_tpr, "-maxwarn", str(args.maxwarn)], cwd=outdir) mdrun_args = [args.gmx, "mdrun", "-v", "-deffnm", "md_0_1"] if args.gpu: mdrun_args += mdrun_gpu_args(args.gmx) run(mdrun_args, cwd=outdir) # 10) Trajectory fixing mdw_xtc = outdir / "mdw.xtc" md_xtc = outdir / "md.xtc" # choose System sys_gid = get_group_id(args.gmx, outdir / "md_0_1.gro", "System", outdir) run([args.gmx, "trjconv", "-s", md_tpr, "-f", outdir / "md_0_1.xtc", "-o", mdw_xtc, "-pbc", "whole"], cwd=outdir, stdin=f"{sys_gid}\n" ) # center: Protein as center, output System prot_gid = get_group_id(args.gmx, outdir / "md_0_1.gro", "Protein", outdir) run([args.gmx, "trjconv", "-s", md_tpr, "-f", mdw_xtc, "-o", md_xtc, "-pbc", "mol", "-center"], cwd=outdir, stdin=f"{prot_gid}\n{sys_gid}\n" ) # 11) RMSD backbone backbone_gid = get_group_id(args.gmx, outdir / "md_0_1.gro", "Backbone", outdir) run([args.gmx, "rms", "-s", md_tpr, "-f", md_xtc, "-o", outdir / "pic.rmsd.xvg"], cwd=outdir, stdin=f"{backbone_gid}\n{backbone_gid}\n" ) # 12) RMSD (protein+ligand) using index2 index2 = outdir / "index2.ndx" # create Protein+Ligand again for md_0_1.gro out1 = run([args.gmx, "make_ndx", "-f", outdir / "md_0_1.gro", "-o", index2], cwd=outdir, stdin="q\n") gmap2 = parse_groups_from_make_ndx_output(out1) p2 = gmap2.get("Protein") l2 = gmap2.get(lig_resname) or gmap2.get("Other") if p2 is None or l2 is None: raise RuntimeError("无法为 RMSD2 创建 Protein+Ligand 组,请检查配体 resname / groups。") run([args.gmx, "make_ndx", "-f", outdir / "md_0_1.gro", "-o", index2], cwd=outdir, stdin=f"{p2} | {l2}\nq\n" ) # we need group id of the newly created one; easiest: parse from second make_ndx output by listing groups out2 = run([args.gmx, "make_ndx", "-f", outdir / "md_0_1.gro", "-o", index2], cwd=outdir, stdin="q\n") gmap3 = parse_groups_from_make_ndx_output(out2) # The newly created group name is usually "Protein_<RES>" or something; fallback: take the max id if gmap3: protein_lig_gid = max(gmap3.values()) else: protein_lig_gid = p2 # fallback (won't be correct but avoids crash) run([args.gmx, "rms", "-s", md_tpr, "-f", md_xtc, "-n", index2, "-o", outdir / "pic.rmsd2.xvg"], cwd=outdir, stdin=f"{protein_lig_gid}\n{protein_lig_gid}\n" ) # 13) RMSF (protein) run([args.gmx, "rmsf", "-s", md_tpr, "-f", md_xtc, "-res", "-o", outdir / "pic.rmsf.xvg"], cwd=outdir, stdin=f"{prot_gid}\n" ) # 14) gyrate backbone run([args.gmx, "gyrate", "-s", md_tpr, "-f", md_xtc, "-o", outdir / "pic.gyrate.xvg"], cwd=outdir, stdin=f"{backbone_gid}\n" ) # 15) sasa protein run([args.gmx, "sasa", "-s", md_tpr, "-f", md_xtc, "-surface", "-o", outdir / "pic.area.xvg"], cwd=outdir, stdin=f"{prot_gid}\n" ) # 16) hbond protein vs ligand group # ligand group id (resname) from make_ndx on md_0_1.gro out_hb = run([args.gmx, "make_ndx", "-f", outdir / "md_0_1.gro", "-o", outdir / "_hb.ndx"], cwd=outdir, stdin="q\n") ghb = parse_groups_from_make_ndx_output(out_hb) lig_gid_hb = ghb.get(lig_resname) or ghb.get("Other") if lig_gid_hb is None: raise RuntimeError("hbond: 找不到配体 group") run([args.gmx, "hbond", "-s", md_tpr, "-f", md_xtc, "-num", outdir / "pic.hbnum.xvg"], cwd=outdir, stdin=f"{prot_gid}\n{lig_gid_hb}\n" ) # cleanup (outdir / "_hb.ndx").unlink(missing_ok=True) # 17) export frames for animation (remove water) # Using trjconv with index: choose Protein then non-water (try group name "non-Water") # If non-Water missing, user can edit. out_anim = outdir / "delWater.pdb" nonwater_gid = get_group_id(args.gmx, outdir / "md_0_1.gro", "non-Water", outdir) run([args.gmx, "trjconv", "-f", md_xtc, "-s", md_tpr, "-o", out_anim, "-pbc", "mol", "-n", out_ndx, "-dt", "100", "-center", "-ur", "compact"], cwd=outdir, stdin=f"{prot_gid}\n{nonwater_gid}\n" ) print("\n 全流程完成!输出都在:", outdir)if __name__ == "__main__": main()