一、高级路径操作
1. 目录和文件操作
from pathlib import Path
import shutil
import os


class FileSystemOperations:
    """File and directory helpers that work relative to a base directory.

    Every path argument is joined onto ``base_dir`` with the ``/`` operator.
    No containment check is performed: an absolute argument replaces the base
    and ``..`` components are not rejected.
    """

    def __init__(self, base_dir):
        """Store ``base_dir`` (str or Path) as the root for all operations."""
        self.base_dir = Path(base_dir)

    def create_directory(self, dir_path):
        """Create a directory, including missing parents, and return its path."""
        full_path = self.base_dir / dir_path
        full_path.mkdir(parents=True, exist_ok=True)
        print(f"创建目录: {full_path}")
        return full_path

    def delete_directory(self, dir_path):
        """Delete a directory and everything below it; no-op if it is absent."""
        full_path = self.base_dir / dir_path
        if full_path.exists():
            shutil.rmtree(full_path)
            print(f"删除目录: {full_path}")

    def create_file(self, file_path, content=""):
        """Write ``content`` (UTF-8) to a file, creating parent directories.

        Returns:
            The full path of the written file.
        """
        full_path = self.base_dir / file_path
        # Make sure the parent directory exists before writing.
        full_path.parent.mkdir(parents=True, exist_ok=True)
        full_path.write_text(content, encoding='utf-8')
        print(f"创建文件: {full_path}")
        return full_path

    def delete_file(self, file_path):
        """Delete a file or symbolic link; no-op if nothing is there."""
        full_path = self.base_dir / file_path
        # exists() follows symlinks and is False for a dangling link, so the
        # link itself has to be tested as well or it would never be removed.
        if full_path.is_symlink() or full_path.exists():
            full_path.unlink()
            print(f"删除文件: {full_path}")

    def copy_file(self, source, destination):
        """Copy a file (with metadata, via ``shutil.copy2``)."""
        src = self.base_dir / source
        dst = self.base_dir / destination
        dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, dst)
        print(f"复制文件: {src} -> {dst}")

    def move_file(self, source, destination):
        """Move or rename a file, creating the destination's parents."""
        src = self.base_dir / source
        dst = self.base_dir / destination
        dst.parent.mkdir(parents=True, exist_ok=True)
        # Pass plain strings: shutil.move only handles Path sources reliably
        # from Python 3.9 on.
        shutil.move(str(src), str(dst))
        print(f"移动文件: {src} -> {dst}")


# Usage example
# fs = FileSystemOperations("./myproject")
# fs.create_directory("data/raw")
# fs.create_file("data/raw/sample.txt", "Hello World")
# fs.copy_file("data/raw/sample.txt", "backup/sample.txt")
2. 文件搜索和遍历
from pathlib import Path
from datetime import datetime


class FileSearcher:
    """Search helpers rooted at a single directory."""

    def __init__(self, root_dir):
        """Store ``root_dir`` (str or Path) as the search root."""
        self.root_dir = Path(root_dir)

    def find_by_pattern(self, pattern="*"):
        """Return entries directly matching a glob pattern (non-recursive)."""
        return list(self.root_dir.glob(pattern))

    def find_recursive(self, pattern="*"):
        """Return entries matching a glob pattern anywhere below the root."""
        return list(self.root_dir.rglob(pattern))

    def find_by_extension(self, extension):
        """Return entries with the given extension, searched recursively.

        Args:
            extension: Extension with or without the leading dot
                (``"py"`` and ``".py"`` are equivalent).
        """
        # Without normalising, ".py" would produce the pattern "*..py".
        bare = extension.lstrip(".")
        return list(self.root_dir.rglob(f"*.{bare}"))

    def find_by_name(self, filename):
        """Return entries whose name matches ``filename``, recursively."""
        return list(self.root_dir.rglob(filename))

    def find_recent_files(self, days=7):
        """Return files modified within the last ``days`` days."""
        cutoff_time = datetime.now().timestamp() - (days * 24 * 3600)
        return [
            file_path
            for file_path in self.root_dir.rglob("*")
            if file_path.is_file() and file_path.stat().st_mtime > cutoff_time
        ]

    def find_large_files(self, min_size_mb=10):
        """Return ``(path, size_in_bytes)`` for files larger than the limit."""
        min_size = min_size_mb * 1024 * 1024
        large_files = []
        for file_path in self.root_dir.rglob("*"):
            if not file_path.is_file():
                continue
            # Stat once so the reported size is the one that was compared.
            size = file_path.stat().st_size
            if size > min_size:
                large_files.append((file_path, size))
        return large_files

    def get_directory_tree(self, max_depth=None):
        """Return the directory tree as text, one entry per line.

        Args:
            max_depth: Deepest directory level to include (the root is
                level 0); ``None`` means unlimited.
        """
        tree = []

        def _build_tree(current_dir, depth=0):
            if max_depth is not None and depth > max_depth:
                return
            indent = " " * depth
            tree.append(f"{indent}📁 {current_dir.name}/")
            # List the directory once and reuse it for both passes.
            entries = sorted(current_dir.glob("*"))
            # Files first ...
            for entry in entries:
                if entry.is_file():
                    tree.append(f"{indent} 📄 {entry.name}")
            # ... then recurse into sub-directories.
            for entry in entries:
                if entry.is_dir():
                    _build_tree(entry, depth + 1)

        _build_tree(self.root_dir)
        return "\n".join(tree)


# Usage example
# searcher = FileSearcher("./project")
#
# # Find all Python files
# py_files = searcher.find_by_extension("py")
# print(f"找到 {len(py_files)} 个 Python 文件")
#
# # Find recently modified files
# recent = searcher.find_recent_files(days=3)
# print(f"最近3天修改的文件: {len(recent)} 个")
#
# # Show the directory tree
# print(searcher.get_directory_tree(max_depth=3))
二、路径安全与最佳实践
1. 防止路径遍历攻击
from pathlib import Path


class SecurePathHandler:
    """Path helpers that refuse to leave an allowed base directory."""

    @staticmethod
    def safe_join(base_dir, user_path):
        """Join a user-supplied path onto ``base_dir`` and verify containment.

        Args:
            base_dir: Root directory that access is restricted to.
            user_path: Path supplied by the user.

        Returns:
            The resolved full path, guaranteed to be ``base_dir`` itself or
            below it.

        Raises:
            ValueError: If the resolved path is outside ``base_dir``.
        """
        base = Path(base_dir).resolve()
        full_path = (base / Path(user_path)).resolve()
        # Compare path components, not string prefixes: a plain startswith()
        # would accept a sibling such as "/srv/data_evil" for base "/srv/data".
        try:
            full_path.relative_to(base)
        except ValueError:
            raise ValueError(
                f"路径访问被拒绝: {user_path} 不在允许的目录内"
            ) from None
        return full_path

    @staticmethod
    def safe_read(base_dir, user_path):
        """Read a UTF-8 text file located inside ``base_dir``.

        Raises:
            PermissionError: If ``user_path`` escapes ``base_dir``.
            FileNotFoundError: If the target is not an existing file.
        """
        # Only the containment check is translated to PermissionError; a
        # UnicodeDecodeError from reading is also a ValueError and must not be
        # reported as an access violation.
        try:
            safe_path = SecurePathHandler.safe_join(base_dir, user_path)
        except ValueError as e:
            raise PermissionError(f"拒绝访问: {e}") from e
        if not safe_path.is_file():
            raise FileNotFoundError(f"文件不存在: {user_path}")
        return safe_path.read_text(encoding='utf-8')

    @staticmethod
    def safe_write(base_dir, user_path, content):
        """Write UTF-8 text to a file inside ``base_dir``, creating parents.

        Returns:
            The resolved path that was written.

        Raises:
            PermissionError: If ``user_path`` escapes ``base_dir``.
        """
        try:
            safe_path = SecurePathHandler.safe_join(base_dir, user_path)
        except ValueError as e:
            raise PermissionError(f"拒绝写入: {e}") from e
        safe_path.parent.mkdir(parents=True, exist_ok=True)
        safe_path.write_text(content, encoding='utf-8')
        return safe_path


# Usage example
base = "/home/user/data"
try:
    # Normal access
    path1 = SecurePathHandler.safe_join(base, "docs/file.txt")
    print(f"安全路径: {path1}")
    # Path traversal attempt
    path2 = SecurePathHandler.safe_join(base, "../../etc/passwd")
    print(f"恶意路径: {path2}")
except ValueError as e:
    print(f"安全拦截: {e}")
2. 临时文件处理
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path


class TempFileManager:
    """Helpers for creating temporary files and directories."""

    @staticmethod
    def create_temp_file(suffix=".txt", prefix="temp_"):
        """Create an empty temporary file and return its path.

        The file is created with ``delete=False``, so it is NOT removed
        automatically; the caller is responsible for deleting it.
        """
        with tempfile.NamedTemporaryFile(
            suffix=suffix, prefix=prefix, delete=False
        ) as tmp:
            tmp_path = Path(tmp.name)
        print(f"创建临时文件: {tmp_path}")
        return tmp_path

    @staticmethod
    def create_temp_dir():
        """Create a temporary directory and return its path.

        ``mkdtemp`` is used instead of ``TemporaryDirectory`` because the
        latter deletes the directory when its ``with`` block exits, which
        would hand the caller a path that no longer exists. The caller is
        responsible for removing the directory (e.g. ``shutil.rmtree``).
        """
        tmp_path = Path(tempfile.mkdtemp())
        print(f"创建临时目录: {tmp_path}")
        return tmp_path

    @staticmethod
    @contextmanager
    def temp_file_context(suffix=".txt"):
        """Context manager yielding a temporary file path, removed on exit.

        Args:
            suffix: File name suffix for the temporary file.

        Yields:
            Path of an existing, empty, closed temporary file.
        """
        fd, raw_path = tempfile.mkstemp(suffix=suffix)
        # Only the path is needed; close the descriptor right away so the
        # file can be reopened or deleted on every platform.
        os.close(fd)
        tmp = Path(raw_path)
        try:
            yield tmp
        finally:
            if tmp.exists():
                tmp.unlink()


# Usage example
# with TempFileManager.temp_file_context() as tmp:
#     tmp.write_text("临时内容")
#     print(f"临时文件: {tmp}")
# # The file is deleted automatically when the with-block exits
3. 符号链接处理
from pathlib import Path


class SymlinkHandler:
    """Helpers for creating, resolving and finding symbolic links."""

    @staticmethod
    def create_symlink(target, link_name):
        """Create (or replace) a symbolic link pointing at ``target``.

        Args:
            target: Path the link should point to; it is resolved first.
            link_name: Location of the link itself.

        Returns:
            The link path.
        """
        target_path = Path(target).resolve()
        link_path = Path(link_name)
        # exists() follows the link and is False when it dangles, so test
        # is_symlink() too; otherwise symlink_to() fails with FileExistsError.
        if link_path.is_symlink() or link_path.exists():
            link_path.unlink()
        link_path.symlink_to(target_path)
        print(f"创建符号链接: {link_path} -> {target_path}")
        return link_path

    @staticmethod
    def resolve_link(link_path):
        """Return the resolved target of a symlink, or the path unchanged."""
        path = Path(link_path)
        if not path.is_symlink():
            print(f"不是符号链接: {path}")
            return path
        real_path = path.resolve()
        print(f"链接: {path} -> {real_path}")
        return real_path

    @staticmethod
    def find_symlinks(root_dir):
        """Recursively collect the symbolic links below ``root_dir``.

        Returns:
            A list of dicts with the keys ``'link'`` (the link path),
            ``'target'`` (its resolved path) and ``'is_broken'`` (True when
            the target does not exist).
        """
        return [
            {
                'link': item,
                'target': item.resolve(),
                'is_broken': not item.exists(),
            }
            for item in Path(root_dir).rglob("*")
            if item.is_symlink()
        ]


# Usage example
# # Create a symbolic link
# SymlinkHandler.create_symlink("/path/to/target", "./my_link")
#
# # Resolve a symbolic link
# real = SymlinkHandler.resolve_link("./my_link")
#
# # Find all symbolic links
# links = SymlinkHandler.find_symlinks("./project")
三、实际应用示例
1. 项目路径配置管理
from datetime import datetime
from pathlib import Path
from typing import Optional, Union
import json


class ProjectPaths:
    """Central place for a project's standard directories and files."""

    # Single source of truth for the sub-directories created under the root.
    _DIR_NAMES = ('data', 'logs', 'output', 'temp', 'config')

    def __init__(self, root_dir: Optional[Union[str, Path]] = None):
        """Resolve the project root and create the standard sub-directories.

        Args:
            root_dir: Project root. When omitted, the directory containing
                this file is used.
        """
        if root_dir is None:
            # Assumes this file lives directly in the project root.
            root_dir = Path(__file__).parent
        self.root = Path(root_dir).resolve()
        self._ensure_directories()

    def _ensure_directories(self):
        """Create every standard sub-directory that does not exist yet."""
        for dir_name in self._DIR_NAMES:
            (self.root / dir_name).mkdir(parents=True, exist_ok=True)

    @property
    def data_dir(self):
        """Directory for data files."""
        return self.root / "data"

    @property
    def logs_dir(self):
        """Directory for log files."""
        return self.root / "logs"

    @property
    def output_dir(self):
        """Directory for output files."""
        return self.root / "output"

    @property
    def temp_dir(self):
        """Directory for temporary files."""
        return self.root / "temp"

    @property
    def config_dir(self):
        """Directory for JSON configuration files."""
        return self.root / "config"

    def get_data_file(self, filename):
        """Return the path of ``filename`` inside the data directory."""
        return self.data_dir / filename

    def get_log_file(self, name):
        """Return today's log path, ``<name>_<YYYYMMDD>.log``, in the logs dir."""
        timestamp = datetime.now().strftime("%Y%m%d")
        return self.logs_dir / f"{name}_{timestamp}.log"

    def get_output_file(self, filename):
        """Return the path of ``filename`` inside the output directory."""
        return self.output_dir / filename

    def save_config(self, config_name, data):
        """Serialise ``data`` as UTF-8 JSON to ``<config_name>.json``.

        Returns:
            The path of the written configuration file.
        """
        config_path = self.config_dir / f"{config_name}.json"
        with open(config_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        return config_path

    def load_config(self, config_name):
        """Load and return the JSON content of ``<config_name>.json``.

        Raises:
            FileNotFoundError: If the configuration file does not exist.
        """
        config_path = self.config_dir / f"{config_name}.json"
        # Open directly instead of checking exists() first: the file could
        # disappear between the check and the open.
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            raise FileNotFoundError(f"配置文件不存在: {config_path}") from None


# Usage example
# paths = ProjectPaths()
#
# # Look up the various paths
# print(f"项目根目录: {paths.root}")
# print(f"数据目录: {paths.data_dir}")
# print(f"日志目录: {paths.logs_dir}")
#
# # Save a configuration
# paths.save_config("settings", {"debug": True, "timeout": 30})
#
# # Load a configuration
# config = paths.load_config("settings")
# print(config)
2. 批量文件处理器
from collections import defaultdict
from pathlib import Path
import shutil
from datetime import datetime
import hashlib


class BatchFileProcessor:
    """Bulk operations over the files below an input directory."""

    # Read size used when hashing, so large files are never loaded whole.
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self, input_dir, output_dir):
        """Store both directories and create ``output_dir`` if needed."""
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def find_files(self, pattern="*"):
        """Return all entries below ``input_dir`` matching a glob pattern."""
        return list(self.input_dir.rglob(pattern))

    @staticmethod
    def _unique_target(target_dir, name):
        """Return a path in ``target_dir`` for ``name`` that is not taken yet."""
        candidate = target_dir / name
        stem, suffix = Path(name).stem, Path(name).suffix
        counter = 1
        while candidate.exists():
            candidate = target_dir / f"{stem}_{counter}{suffix}"
            counter += 1
        return candidate

    def _move_into(self, file_path, target_dir):
        """Move ``file_path`` into ``target_dir`` without overwriting anything."""
        target_dir.mkdir(parents=True, exist_ok=True)
        # A file that already sits in its target directory stays where it is
        # (relevant when output_dir lies inside input_dir).
        if file_path.parent.resolve() == target_dir.resolve():
            return
        # Files with the same name from different sub-directories would
        # otherwise silently overwrite each other.
        target_path = self._unique_target(target_dir, file_path.name)
        shutil.move(str(file_path), str(target_path))
        print(f"移动: {file_path} -> {target_path}")

    @classmethod
    def _hash_file(cls, file_path):
        """Return the SHA-256 hex digest of a file, read in chunks."""
        digest = hashlib.sha256()
        with open(file_path, 'rb') as handle:
            for chunk in iter(lambda: handle.read(cls._CHUNK_SIZE), b""):
                digest.update(chunk)
        return digest.hexdigest()

    def organize_by_date(self):
        """Move files into ``output_dir/<year>/<month>`` by modification time."""
        for file_path in self.find_files("*"):
            if file_path.is_file():
                mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
                target_dir = self.output_dir / str(mtime.year) / str(mtime.month)
                self._move_into(file_path, target_dir)

    def organize_by_extension(self):
        """Move files into ``output_dir/<extension>``.

        Files without an extension go to ``no_extension``.
        """
        for file_path in self.find_files("*"):
            if file_path.is_file():
                ext = file_path.suffix[1:] if file_path.suffix else "no_extension"
                self._move_into(file_path, self.output_dir / ext)

    def rename_files(self, pattern, replacement):
        """Replace the substring ``pattern`` in file names with ``replacement``.

        ``pattern`` is a literal substring, not a glob. Files whose new name
        is already taken are left untouched.

        Raises:
            ValueError: If ``pattern`` is empty.
        """
        if not pattern:
            raise ValueError("pattern 不能为空")
        for file_path in self.find_files("*"):
            # Match on the name itself; using the substring as a glob would
            # only find files named exactly ``pattern``.
            if not file_path.is_file() or pattern not in file_path.name:
                continue
            new_name = file_path.name.replace(pattern, replacement)
            new_path = file_path.parent / new_name
            if new_path.exists():
                print(f"跳过(目标已存在): {file_path.name} -> {new_name}")
                continue
            file_path.rename(new_path)
            print(f"重命名: {file_path.name} -> {new_name}")

    def calculate_checksums(self):
        """Return ``{str(path): sha256_hex}`` for every file below the input."""
        return {
            str(file_path): self._hash_file(file_path)
            for file_path in self.find_files("*")
            if file_path.is_file()
        }

    def find_duplicates(self):
        """Find files with identical content.

        Returns:
            A list of dicts with the keys ``'hash'``, ``'size'`` and
            ``'files'`` (the paths sharing that content), one per group of
            two or more identical files.
        """
        # Group by size first: only files of equal size can be identical,
        # which avoids hashing most files.
        size_map = defaultdict(list)
        for file_path in self.find_files("*"):
            if file_path.is_file():
                size_map[file_path.stat().st_size].append(file_path)

        duplicates = []
        for size, files in size_map.items():
            if len(files) < 2:
                continue
            hash_map = defaultdict(list)
            for candidate in files:
                hash_map[self._hash_file(candidate)].append(candidate)
            for hash_val, duplicate_files in hash_map.items():
                if len(duplicate_files) > 1:
                    duplicates.append({
                        'hash': hash_val,
                        'size': size,
                        'files': duplicate_files,
                    })
        return duplicates


# Usage example
# processor = BatchFileProcessor("./raw_data", "./organized_data")
#
# # Organise files by extension
# processor.organize_by_extension()
#
# # Bulk rename
# processor.rename_files("old_", "new_")
#
# # Find duplicate files
# duplicates = processor.find_duplicates()
# for dup in duplicates:
#     print(f"重复文件 (大小: {dup['size']} 字节):")
#     for f in dup['files']:
#         print(f" - {f}")
3. 日志文件轮转
from pathlib import Path
from datetime import datetime
import gzip
import shutil


class LogRotator:
    """Size-based log rotation with gzip-compressed, numbered backups."""

    def __init__(self, log_dir, max_size_mb=10, backup_count=5):
        """Configure the rotator and create ``log_dir`` if needed.

        Args:
            log_dir: Directory containing the log files.
            max_size_mb: Size in MiB at which a log is rotated.
            backup_count: Highest backup number that is kept.
        """
        self.log_dir = Path(log_dir)
        self.max_size = max_size_mb * 1024 * 1024
        self.backup_count = backup_count
        self.log_dir.mkdir(parents=True, exist_ok=True)

    def rotate_log(self, log_file):
        """Rotate one log file if it has reached the size limit.

        Backups are named ``<log_file>.<n>.gz`` with ``1`` the most recent.
        The current log is compressed to backup 1 and then truncated.
        """
        log_path = self.log_dir / log_file
        # Skip anything that is not a regular file (missing, or a directory
        # whose name happens to end in ".log").
        if not log_path.is_file():
            return
        if log_path.stat().st_size < self.max_size:
            return

        # Drop the oldest backup so the shift below never hits an existing
        # name.
        oldest = self.log_dir / f"{log_file}.{self.backup_count}.gz"
        if oldest.exists():
            oldest.unlink()

        # Shift the remaining backups up by one, highest number first.
        for i in range(self.backup_count - 1, 0, -1):
            old = self.log_dir / f"{log_file}.{i}.gz"
            new = self.log_dir / f"{log_file}.{i+1}.gz"
            if old.exists():
                old.rename(new)

        # Compress the current log into backup 1.
        backup = self.log_dir / f"{log_file}.1.gz"
        with open(log_path, 'rb') as f_in, gzip.open(backup, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

        # Truncate rather than delete, so the log file keeps existing.
        log_path.write_text("")
        print(f"日志轮转: {log_file} -> {backup.name}")

    def rotate_all(self):
        """Rotate every ``*.log`` file directly inside the log directory."""
        for log_file in self.log_dir.glob("*.log"):
            self.rotate_log(log_file.name)

    def clean_old_logs(self, days=30):
        """Delete files in the log directory not modified for ``days`` days."""
        cutoff = datetime.now().timestamp() - (days * 24 * 3600)
        for log_file in self.log_dir.glob("*"):
            # unlink() cannot remove directories; leave them alone.
            if not log_file.is_file():
                continue
            if log_file.stat().st_mtime < cutoff:
                log_file.unlink()
                print(f"删除旧日志: {log_file}")


# Usage example
# rotator = LogRotator("./logs", max_size_mb=5, backup_count=3)
# rotator.rotate_all()
# rotator.clean_old_logs(days=7)
四、pathlib vs os.path 对比
| 操作 | os.path | pathlib |
|---|---|---|
| 路径拼接 | os.path.join('a', 'b') | Path('a') / 'b' |
| 获取目录 | os.path.dirname(path) | Path(path).parent |
| 获取文件名 | os.path.basename(path) | Path(path).name |
| 获取扩展名 | os.path.splitext(path)[1] | Path(path).suffix |
| 检查存在 | os.path.exists(path) | Path(path).exists() |
| 绝对路径 | os.path.abspath(path) | Path(path).resolve() |
| 遍历目录 | os.walk() | Path().rglob() |
五、总结与最佳实践
路径操作核心要点
| 要点 | 说明 |
|---|---|
| 使用 pathlib | |
| 避免硬编码路径 | |
| 跨平台兼容 | |
| 安全第一 | |
| 路径规范化 | |
最佳实践示例
from pathlib import Path
from typing import Union
import os


class PathBestPractices:
    """Small collection of recommended path-handling helpers."""

    @staticmethod
    def get_project_root() -> Path:
        """Return the project root, taken as the parent of this file's directory."""
        return Path(__file__).parent.parent

    @staticmethod
    def ensure_dir(path: Union[str, Path]) -> Path:
        """Create ``path`` (and missing parents) if needed and return it."""
        path = Path(path)
        path.mkdir(parents=True, exist_ok=True)
        return path

    @staticmethod
    def get_relative_path(path: Union[str, Path]) -> Path:
        """Return ``path`` relative to the project root.

        Raises:
            ValueError: If ``path`` is not located below the project root.
        """
        # Resolve both sides: comparing a resolved path with an unresolved
        # root fails when the project is reached through a symlink or a
        # relative ``__file__``.
        root = PathBestPractices.get_project_root().resolve()
        return Path(path).resolve().relative_to(root)

    @staticmethod
    def safe_file_operation(file_path: Union[str, Path], mode: str = 'r'):
        """Open a file, creating parent directories for writing modes.

        Text modes are opened as UTF-8; binary modes are opened without an
        encoding.

        Args:
            file_path: File to open.
            mode: Mode string as accepted by the built-in ``open``.

        Returns:
            The open file object; the caller is responsible for closing it.

        Raises:
            FileNotFoundError: If a read mode is requested and the file does
                not exist.
        """
        path = Path(file_path)
        # Any mode that may create the file needs its parent directory.
        if 'w' in mode or 'a' in mode or 'x' in mode:
            path.parent.mkdir(parents=True, exist_ok=True)
        # Reading requires the file to exist.
        if 'r' in mode and not path.exists():
            raise FileNotFoundError(f"文件不存在: {path}")
        # open() rejects an encoding argument in binary mode.
        if 'b' in mode:
            return open(path, mode)
        return open(path, mode, encoding='utf-8')


# Usage example; guarded so that importing this module does not create
# directories or files.
if __name__ == "__main__":
    root = PathBestPractices.get_project_root()
    print(f"项目根目录: {root}")

    # Create the directory structure
    PathBestPractices.ensure_dir(root / "data" / "raw")
    PathBestPractices.ensure_dir(root / "data" / "processed")
    PathBestPractices.ensure_dir(root / "output")

    # Safe file operation
    with PathBestPractices.safe_file_operation(root / "output" / "result.txt", 'w') as f:
        f.write("Hello World")
📝 学习检查清单