#!/usr/bin/env python3
"""Split abundance table by groups in metadata.

For each group column, subsets are saved in a subdirectory named after the
column. Missing samples (abundance-only or empty group values) are logged.
"""
import argparse
import os
import sys

import pandas as pd


def parse_sep(sep_str):
    """Convert 'tsv' or 'csv' to actual separator.

    Args:
        sep_str: 'tsv' or 'csv' (case-insensitive).

    Returns:
        The corresponding field separator character.

    Raises:
        ValueError: for any other value.
    """
    normalized = sep_str.lower()
    if normalized == 'tsv':
        return '\t'
    if normalized == 'csv':
        return ','
    raise ValueError(f"Unsupported separator: {sep_str}. Use 'tsv' or 'csv'.")


def read_table(file_path, sep, layout):
    """Read abundance table, coercing every cell to numeric (non-numeric -> 0).

    Args:
        file_path: path to the table; the first column becomes the index.
        sep: field separator.
        layout: 'features_samples' (samples in columns) or
            'samples_features' (samples in rows).

    Returns:
        A fully numeric DataFrame whose sample-axis labels are strings,
        so they can be matched against the (string) metadata index.

    Raises:
        ValueError: if *layout* is not one of the two accepted values.
    """
    df = pd.read_csv(file_path, sep=sep, index_col=0)
    # Coerce once; cells that fail coercion (and pre-existing NaN) become NaN.
    # FIX: the original re-ran pd.to_numeric per column just to build the
    # warning list below -- reuse the single coerced frame instead.
    coerced = df.apply(pd.to_numeric, errors='coerce')
    non_numeric_cols = [col for col in df.columns if coerced[col].isna().any()]
    if non_numeric_cols:
        print(f"Warning: Columns with non‑numeric values: {non_numeric_cols}")
        print(" Non‑numeric values have been converted to 0.")
    numeric_df = coerced.fillna(0)
    # Only the sample axis needs string labels for metadata matching.
    if layout == 'features_samples':
        numeric_df.columns = numeric_df.columns.astype(str)
    elif layout == 'samples_features':
        numeric_df.index = numeric_df.index.astype(str)
    else:
        raise ValueError(f"Invalid layout: {layout}. Choose 'features_samples' or 'samples_features'.")
    return numeric_df


def remove_zero_rows_columns(df, axis):
    """Remove rows (axis=0) or columns (axis=1) that sum to zero.

    Args:
        df: numeric DataFrame.
        axis: 0 to drop all-zero rows, 1 to drop all-zero columns.

    Returns:
        Tuple of (filtered DataFrame, boolean keep-mask along that axis).

    Raises:
        ValueError: if *axis* is neither 0 nor 1.
    """
    if axis == 0:
        keep = df.sum(axis=1) != 0
        return df.loc[keep], keep
    if axis == 1:
        keep = df.sum(axis=0) != 0
        return df.loc[:, keep], keep
    raise ValueError("axis must be 0 (rows) or 1 (columns)")


def safe_filename(value):
    """Convert a group value to a safe filename component.

    Replaces '/', '\\' and spaces with underscores; other characters pass
    through unchanged.
    """
    return str(value).replace('/', '_').replace('\\', '_').replace(' ', '_')


def _build_parser():
    """Build the command-line interface (extracted from main for readability)."""
    parser = argparse.ArgumentParser(
        description="Split abundance table by groups in metadata. For each group column, "
                    "subsets are saved in a subdirectory named after the column. "
                    "Missing samples (abundance-only or empty group values) are logged but not saved.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""Examples:
  # Split by a single group column
  python split_abundance.py -i abundance.tsv -m metadata.tsv -g group1 -o ./output

  # Split by two independent group columns
  # Results will be in ./output/group1/ and ./output/group2/
  python split_abundance.py -i abundance.csv -m metadata.csv -g group1,group2 \\
      --layout samples_features --output_sep csv

  # Remove zero rows/columns
  python split_abundance.py -i abundance.tsv -m metadata.csv -g Condition \\
      --meta_sep csv --zero_remove
""")
    parser.add_argument('-i', '--input', required=True,
                        help="Path to abundance table file.")
    parser.add_argument('-m', '--metadata', required=True,
                        help="Path to metadata file (first column = sample names).")
    parser.add_argument('-s', '--sep', default='tsv', choices=['tsv', 'csv'],
                        help="Separator for abundance table (tsv or csv). Default: tsv")
    parser.add_argument('--meta_sep', default=None, choices=['tsv', 'csv'],
                        help="Separator for metadata file. Default: same as --sep.")
    parser.add_argument('--output_sep', default=None, choices=['tsv', 'csv'],
                        help="Separator for output files. Default: same as input abundance separator.")
    parser.add_argument('--layout', default='features_samples',
                        choices=['features_samples', 'samples_features'],
                        help="Layout of input abundance table. Default: features_samples")
    parser.add_argument('--output_layout', default=None,
                        choices=['features_samples', 'samples_features'],
                        help="Layout for output files. Default: same as input.")
    parser.add_argument('-g', '--groups', required=True,
                        help="Comma-separated column names in metadata to split by. "
                             "Each column is processed independently.")
    parser.add_argument('-o', '--output_dir', default='.',
                        help="Base output directory. Subdirectories named after group columns are created inside.")
    parser.add_argument('--prefix', default='',
                        help="Prefix for output filenames (optional).")
    parser.add_argument('--zero_remove', action='store_true',
                        help="Remove rows (features) and columns (samples) that sum to zero.")
    return parser


def main():
    """CLI entry point: read tables, match samples, write per-group subsets."""
    parser = _build_parser()
    args = parser.parse_args()

    # Resolve separators/layouts; metadata and output default to the
    # abundance table's settings.
    abund_sep = parse_sep(args.sep)
    meta_sep = parse_sep(args.meta_sep) if args.meta_sep else abund_sep
    out_sep = parse_sep(args.output_sep) if args.output_sep else abund_sep
    out_ext = 'tsv' if out_sep == '\t' else 'csv'
    out_layout = args.output_layout if args.output_layout else args.layout

    group_cols = [g.strip() for g in args.groups.split(',')]
    if not group_cols:
        parser.error("At least one group column must be specified.")

    os.makedirs(args.output_dir, exist_ok=True)

    # Read abundance table
    print(f"Reading abundance table from {args.input} ...")
    abund = read_table(args.input, abund_sep, args.layout)
    print(f"Abundance table shape: {abund.shape}")

    # Read metadata; index is cast to str to match read_table's sample labels.
    print(f"Reading metadata from {args.metadata} ...")
    meta = pd.read_csv(args.metadata, sep=meta_sep, index_col=0)
    meta.index = meta.index.astype(str)

    # Check group columns exist
    missing_cols = [c for c in group_cols if c not in meta.columns]
    if missing_cols:
        parser.error(f"Group column(s) not found in metadata: {missing_cols}")

    # Get sample names from abundance
    if args.layout == 'features_samples':
        abund_samples = set(abund.columns)
        abund_is_samples_columns = True
    else:
        abund_samples = set(abund.index)
        abund_is_samples_columns = False
    meta_samples = set(meta.index)

    # Determine common samples and one-sided leftovers
    common_samples = abund_samples & meta_samples
    abund_only = abund_samples - common_samples
    meta_only = meta_samples - common_samples

    # Print sample matching statistics
    print(f"\nSample matching:")
    print(f" Abundance table samples: {len(abund_samples)}")
    print(f" Metadata samples: {len(meta_samples)}")
    print(f" Common samples: {len(common_samples)}")
    if abund_only:
        print(f" Samples only in abundance (will be logged as missing): {len(abund_only)}")
    if meta_only:
        print(f" Samples only in metadata (will be ignored): {len(meta_only)}")

    if not common_samples:
        print("\nERROR: No common samples between abundance table and metadata. Cannot proceed.")
        sys.exit(1)

    # FIX: sort the common samples so output row/column order is deterministic
    # across runs; iterating a set directly follows hash order, which varies
    # with PYTHONHASHSEED.
    common_sorted = sorted(common_samples)
    if abund_is_samples_columns:
        abund = abund.loc[:, common_sorted]
    else:
        abund = abund.loc[common_sorted, :]
    meta = meta.loc[common_sorted]

    # Process each group column independently
    for group_col in group_cols:
        print(f"\nProcessing group column: {group_col}")
        col_dir = os.path.join(args.output_dir, group_col)
        os.makedirs(col_dir, exist_ok=True)

        # Common samples whose value in this column is empty or NaN
        missing_mask = meta[group_col].isna() | (meta[group_col] == '')
        missing_samples_meta = meta.index[missing_mask].tolist() if missing_mask.any() else []

        # Combine all missing samples: abundance-only + metadata missing.
        # abund_only is a set, so membership tests below are O(1)
        # (the original used a list, making each test O(n)).
        all_missing = abund_only | set(missing_samples_meta)
        if all_missing:
            print(f" Missing samples (no file generated):")
            for samp in sorted(all_missing):
                if samp in abund_only:
                    reason = "Sample not found in metadata"
                else:
                    reason = f"Value for '{group_col}' is missing (empty or NaN)"
                print(f" {samp}: {reason}")

        # Normal samples: common samples with non-missing value in this column
        valid_samples = meta.index[~missing_mask].tolist()
        if not valid_samples:
            print(f" No valid samples found for column '{group_col}'. Skipping.")
            continue

        # Subset abundance to valid samples
        if abund_is_samples_columns:
            abund_valid = abund.loc[:, valid_samples]
        else:
            abund_valid = abund.loc[valid_samples, :]

        # Unique group values among valid samples
        group_vals = meta.loc[valid_samples, group_col].astype(str)
        unique_vals = group_vals.unique()
        print(f" Found {len(unique_vals)} unique groups.")

        for val in unique_vals:
            val_str = safe_filename(val)
            # Select samples belonging to this group value
            group_samples = group_vals.index[group_vals == val].tolist()

            # Extract subset
            if abund_is_samples_columns:
                sub = abund_valid.loc[:, group_samples].copy()
            else:
                sub = abund_valid.loc[group_samples, :].copy()

            if args.zero_remove:
                sub, _ = remove_zero_rows_columns(sub, axis=0)
                sub, _ = remove_zero_rows_columns(sub, axis=1)
                if sub.empty:
                    print(f" Group '{val}': Subtable empty after zero removal, skipping.")
                    continue

            # Transpose only when input and output layouts differ
            if args.layout != out_layout:
                sub = sub.T

            out_filename = f"{args.prefix}{val_str}.{out_ext}"
            out_path = os.path.join(col_dir, out_filename)
            sub.to_csv(out_path, sep=out_sep)
            print(f" Saved {out_path} ({sub.shape[0]} rows, {sub.shape[1]} columns)")

    print("\nDone.")


if __name__ == "__main__":
    main()