import osimport refrom pathlib import Pathfrom typing import List, Tupleimport shutilfrom datetime import datetimeclass SASFileProcessor: """SAS文件处理器,用于批量替换文件中的代码""" def __init__(self, root_dir: str, backup: bool = True): """ 初始化处理器 Args: root_dir: 要处理的根目录 backup: 是否在修改前创建备份 """ self.root_dir = Path(root_dir) self.backup = backup self.processed_files = [] self.modified_files = [] self.errors = [] # 定义替换规则 self.replacements = [ # 规则1:替换 PGM 为 *this is a test ( r'PGM;', '*this is a test;' ), # 规则2:替换 %macro cc; 为 %macro zz; ( r'%macro cc;', '%macro zz;' ) ] def find_sas_files(self) -> List[Path]: """查找所有.sas文件""" sas_files = list(self.root_dir.rglob("*.sas")) print(f"找到 {len(sas_files)} 个SAS文件") return sas_files def create_backup(self, file_path: Path) -> Path: """创建文件备份""" backup_dir = self.root_dir / "backup" / datetime.now().strftime("%Y%m%d_%H%M%S") backup_dir.mkdir(parents=True, exist_ok=True) # 保持相对路径结构 relative_path = file_path.relative_to(self.root_dir) backup_path = backup_dir / relative_path backup_path.parent.mkdir(parents=True, exist_ok=True) # 复制文件 shutil.copy2(file_path, backup_path) return backup_path def process_file(self, file_path: Path) -> Tuple[bool, str]: """ 处理单个SAS文件 Returns: (是否修改, 修改后的内容或错误信息) """ try: # 读取文件内容 with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() original_content = content # 应用所有替换规则 for pattern, replacement in self.replacements: content = re.sub(pattern, replacement, content, flags=re.IGNORECASE | re.DOTALL) # 检查内容是否被修改 if content != original_content: # 创建备份(如果需要) if self.backup: backup_path = self.create_backup(file_path) print(f"已备份: {backup_path}") # 写回文件 with open(file_path, 'w', encoding='utf-8') as f: f.write(content) return True, "已修改" else: return False, "无需修改" except Exception as e: return False, f"错误: {str(e)}" def preview_changes(self, file_path: Path, num_lines: int = 5): """ 预览文件中的修改(不实际修改文件) """ try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() print(f"\n预览文件: {file_path}") print("-" * 60) # 查找匹配的行 lines = content.split('\n') for i, line in enumerate(lines, 1): for pattern, replacement in self.replacements: # 简化模式用于预览 simple_pattern = pattern.replace(r'\s*', ' ').replace(r'\(', '(').replace(r'\)', ')') if re.search(pattern.replace(r'\s*', '.*?'), line, re.IGNORECASE): print(f"行 {i}: {line.strip()}") print(f"将替换为: {replacement}") print() print("-" * 60) except Exception as e: print(f"预览错误: {e}") def run(self, preview_only: bool = False): """ 运行批量处理 Args: preview_only: 是否只预览不修改 """ print(f"开始处理目录: {self.root_dir}") print(f"模式: {'预览模式'if preview_only else'修改模式'}") print("=" * 60) # 查找所有SAS文件 sas_files = self.find_sas_files() if not sas_files: print("没有找到SAS文件") return # 处理每个文件 for file_path in sas_files: self.processed_files.append(file_path) if preview_only: self.preview_changes(file_path) else: modified, message = self.process_file(file_path) if modified: self.modified_files.append(file_path) print(f"✓ {file_path.name}: {message}") else: print(f" {file_path.name}: {message}") # 输出统计信息 self.print_summary(preview_only) def print_summary(self, preview_only: bool): """输出处理统计""" print("\n" + "=" * 60) print("处理完成!") print(f"总文件数: {len(self.processed_files)}") if not preview_only: print(f"修改文件数: {len(self.modified_files)}") print(f"错误文件数: {len(self.errors)}") if self.modified_files: print("\n已修改的文件:") for file in self.modified_files: print(f" - {file.relative_to(self.root_dir)}") if self.errors: print("\n处理错误的文件:") for error in self.errors: print(f" - {error}") if self.backup and not preview_only: backup_dir = self.root_dir / "backup" if backup_dir.exists(): print(f"\n备份文件保存在: {backup_dir}")def main(): """主函数""" import argparse # 设置命令行参数 parser = argparse.ArgumentParser(description='批量处理SAS文件中的代码替换') parser.add_argument('directory', nargs='?', default='.', help='要处理的目录路径 (默认: 当前目录)') parser.add_argument('--preview', '-p', action='store_true', help='预览模式,不实际修改文件') parser.add_argument('--no-backup', '-nb', action='store_true', help='不创建备份') parser.add_argument('--file', '-f', help='只处理指定的单个文件') args = parser.parse_args() # 创建处理器 processor = SASFileProcessor( root_dir=args.directory, backup=not args.no_backup ) # 如果指定了单个文件 if args.file: file_path = Path(args.directory) / args.file if file_path.exists(): if args.preview: processor.preview_changes(file_path) else: modified, message = processor.process_file(file_path) print(f"{file_path.name}: {message}") else: print(f"文件不存在: {file_path}") else: # 批量处理 processor.run(preview_only=args.preview) # 更简单的版本 - 如果你不需要复杂的功能def simple_sas_replace(): """简单的SAS文件替换函数""" import os # 设置要处理的目录 target_dir = "." # 当前目录 # 定义替换规则 replacements = [ # (模式, 替换文本) (r'pgm;', '*this is a test;'), (r'%macro cc;', '%macro zz') ] # 遍历所有.sas文件 for root, dirs, files in os.walk(target_dir): for file in files: if file.endswith('.sas'): file_path = os.path.join(root, file) try: # 读取文件 with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() original = content # 应用替换 for pattern, replacement in replacements: content = re.sub(pattern, replacement, content, flags=re.IGNORECASE | re.DOTALL) # 如果内容有变化,写回文件 if content != original: # 创建备份 backup_path = file_path + '.bak' with open(backup_path, 'w', encoding='utf-8') as f: f.write(original) # 写入新内容 with open(file_path, 'w', encoding='utf-8') as f: f.write(content) print(f"已修改: {file_path}") except Exception as e: print(f"处理文件出错 {file_path}: {e}")if __name__ == "__main__": # 使用命令行版本 # main() # 或者直接运行简单版本 # simple_sas_replace() # 示例:预览模式 processor = SASFileProcessor(root_dir="你的文件位置") processor.run(preview_only=True) # 示例:实际修改(请谨慎使用!) # processor = SASFileProcessor(root_dir=".") # processor.run(preview_only=False)