前言
文件操作是程序与外部世界交互的基础,而异常处理则是保证程序健壮性的关键机制。本文将深入讲解Python中的文件读写、路径操作和异常处理,从基础用法到底层原理,帮助你编写出既功能完善又稳定可靠的代码。
一、文件读写基础
1.1 打开文件
Python使用内置的open()函数打开文件:
# Basic syntax: open the file, read it, then close it explicitly.
file = open('data.txt', 'r', encoding='utf-8')
content = file.read()
file.close()

# Mode flags
# 'r' - read (default)
# 'w' - write (truncates an existing file)
# 'a' - append
# 'x' - exclusive creation (fails if the file already exists)
# 'b' - binary mode
# '+' - read and write

# Common combinations
# 'rb' - read a binary file
# 'w+' - read and write (truncates)
# 'a+' - read and append
1.2 读取文件
# Method 1: read the whole file in one call
with open('data.txt', 'r', encoding='utf-8') as f:
    content = f.read()
    print(content)

# Method 2: iterate line by line (suitable for large files)
with open('data.txt', 'r', encoding='utf-8') as f:
    for line in f:
        print(line.strip())  # strip() removes the trailing newline

# Method 3: read every line into a list
with open('data.txt', 'r', encoding='utf-8') as f:
    lines = f.readlines()

# Method 4: read a fixed number of bytes (binary file)
with open('image.png', 'rb') as f:
    chunk = f.read(1024)  # read at most 1024 bytes
1.3 写入文件
# Write text line by line
data = ["第一行", "第二行", "第三行"]
with open('output.txt', 'w', encoding='utf-8') as f:
    for line in data:
        f.write(line + '\n')

# writelines() does not add newlines, so append them ourselves
with open('output.txt', 'w', encoding='utf-8') as f:
    f.writelines(line + '\n' for line in data)

# Append mode
with open('log.txt', 'a', encoding='utf-8') as f:
    f.write("新的日志条目\n")
二、with上下文管理器
2.1 为什么需要with语句
传统文件操作的问题:
# ❌ Easy to forget to close the file
f = open('data.txt', 'r')
data = f.read()
# If an exception is raised here, the file is never closed!
f.close()

# ❌ try/finally is safe but verbose
f = open('data.txt', 'r')
try:
    data = f.read()
finally:
    f.close()
2.2 with语句的优势
# ✅ Concise and safe
with open('data.txt', 'r', encoding='utf-8') as f:
    data = f.read()
# The file is closed automatically, even if the with block raises
2.3 上下文管理协议
with语句的背后是上下文管理协议,需要实现两个特殊方法:
class ManagedFile:
    """Custom context manager that opens a file on entry and closes it on exit.

    Args:
        filename: Path of the file to open.
        mode: Mode string passed to ``open()``.
        encoding: Text encoding passed to ``open()``; ``None`` keeps the
            platform default and is required for binary modes.
    """

    def __init__(self, filename, mode='r', encoding=None):
        self.filename = filename
        self.mode = mode
        self.encoding = encoding
        self.file = None

    def __enter__(self):
        """Open the file and return the file object bound by ``as``."""
        print(f"打开文件: {self.filename}")
        self.file = open(self.filename, self.mode, encoding=self.encoding)
        return self.file

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the file; report any exception and let it propagate."""
        if self.file:
            self.file.close()
            print(f"关闭文件: {self.filename}")
        # Returning True would mark the exception as handled (suppressed).
        # Returning False (or nothing) lets the exception keep propagating.
        if exc_type:
            print(f"发生异常: {exc_val}")
        return False


# Using the custom context manager
with ManagedFile('test.txt', 'w') as f:
    f.write("Hello, World!")
2.4 contextlib简化实现
import time
from contextlib import contextmanager


@contextmanager
def managed_file(filename, mode='r', encoding=None):
    """Context manager built with a decorator: open a file, always close it.

    Args:
        filename: Path of the file to open.
        mode: Mode string passed to ``open()``.
        encoding: Text encoding passed to ``open()`` (``None`` = default).

    Yields:
        The open file object.
    """
    f = open(filename, mode, encoding=encoding)
    try:
        yield f
    finally:
        f.close()


# A more practical example: a timing context manager
@contextmanager
def timer(name="操作"):
    """Print how long the with block took, even when the block raises.

    Args:
        name: Label printed in front of the elapsed time.
    """
    # perf_counter() is monotonic, so clock adjustments cannot skew the result.
    start = time.perf_counter()
    try:
        yield
    finally:
        # Without try/finally an exception in the block would skip the report.
        elapsed = time.perf_counter() - start
        print(f"{name} 耗时: {elapsed:.4f}秒")


# Usage (guarded so that importing this snippet does not write files or sleep)
if __name__ == "__main__":
    with managed_file('test.txt', 'w') as f:
        f.write("Hello!")

    with timer("数据处理"):
        time.sleep(1)
三、路径操作:pathlib vs os.path
3.1 pathlib模块(推荐)
Python 3.4+引入的pathlib提供了面向对象的路径操作:
from pathlib import Path

# Create a path object
p = Path('/home/user/documents')

# Join path components with the / operator
file_path = p / 'report' / '2024' / 'data.txt'
print(file_path)  # /home/user/documents/report/2024/data.txt

# Common attributes
print(file_path.name)    # data.txt (file name)
print(file_path.stem)    # data (name without extension)
print(file_path.suffix)  # .txt (extension)
print(file_path.parent)  # /home/user/documents/report/2024 (parent directory)
print(file_path.parts)   # ('/', 'home', 'user', ...)

# Path checks
print(file_path.exists())       # does it exist?
print(file_path.is_file())      # is it a regular file?
print(file_path.is_dir())       # is it a directory?
print(file_path.is_absolute())  # is it an absolute path?
3.2 文件系统操作
from pathlib import Path

p = Path('example.txt')

# File operations
p.write_text('Hello, World!', encoding='utf-8')  # write text
content = p.read_text(encoding='utf-8')          # read text
p.write_bytes(b'Binary data')                    # write bytes
bytes_content = p.read_bytes()                   # read bytes

# Directory operations
dir_path = Path('my_folder')
dir_path.mkdir(parents=True, exist_ok=True)  # create the directory (and parents)

# Iterate over the directory
for item in dir_path.iterdir():
    print(item)

# Recursive traversal
for py_file in dir_path.rglob('*.py'):
    print(py_file)

# Rename, move and delete.
# A Path object is immutable: rename()/replace() do not update `p`, they
# return the new path (Python 3.8+). Rebind `p`, otherwise the next call
# would target the old, no-longer-existing name.
p = p.rename('new_name.txt')
backup_dir = Path('backup')
backup_dir.mkdir(exist_ok=True)            # the target directory must exist
p = p.replace(backup_dir / 'new_name.txt')  # move, overwriting any existing file
p.unlink()                                  # delete the file
dir_path.rmdir()                            # delete the (empty) directory
3.3 pathlib vs os.path 对比
| pathlib | os.path |
|---|---|
| p / 'file.txt' | os.path.join(p, 'file.txt') |
| p.name | os.path.basename(p) |
| p.parent | os.path.dirname(p) |
| p.suffix | os.path.splitext(p)[1] |
| p.resolve() | os.path.realpath(p) |
| p.exists() | os.path.exists(p) |
| p.read_text() | open(p).read() |
# Example of where pathlib shines
from pathlib import Path
import os

# Find every Python file and count the total number of lines
total_lines = 0
for py_file in Path('.').rglob('*.py'):
    total_lines += len(py_file.read_text(encoding='utf-8').splitlines())
print(f"Python文件总行数: {total_lines}")

# The equivalent code using os.path (more verbose)
total_lines = 0
for root, dirs, files in os.walk('.'):
    for file in files:
        if file.endswith('.py'):
            filepath = os.path.join(root, file)
            with open(filepath, 'r', encoding='utf-8') as f:
                total_lines += len(f.readlines())
四、异常处理机制
4.1 try/except/else/finally 结构
def divide(a, b):
    """Divide ``a`` by ``b``, demonstrating try/except/else/finally.

    Returns:
        The quotient, or ``None`` when the division raised an exception.
    """
    try:
        # Code that may raise
        result = a / b
    except ZeroDivisionError:
        # Handle one specific exception
        print("错误:除数不能为零")
        return None
    except TypeError as e:
        # Bind the exception object to inspect it
        print(f"类型错误: {e}")
        return None
    except Exception as e:
        # Catch every other exception (use sparingly)
        print(f"未知错误: {e}")
        return None
    else:
        # Runs only when no exception was raised
        print("计算成功")
        return result
    finally:
        # Runs whether or not an exception was raised
        print("操作完成")


# Try it out
print(divide(10, 2))    # normal case
print(divide(10, 0))    # ZeroDivisionError
print(divide(10, 'a'))  # TypeError
4.2 异常捕获的最佳实践
# ✅ Catch the specific exception
try:
    value = int(user_input)
except ValueError:
    print("请输入有效的数字")

# ❌ Do not catch everything (it hides bugs)
try:
    value = int(user_input)
except:  # catches every exception, including KeyboardInterrupt
    pass

# ✅ Inspect the exception details
try:
    risky_operation()
except ValueError as e:
    print(f"错误信息: {e}")
    import traceback
    traceback.print_exc()  # print the full stack trace

# ✅ Catch several exception types together
try:
    process_data()
except (ValueError, TypeError) as e:
    print(f"数据错误: {e}")
4.3 自定义异常
class ValidationError(Exception):
    """Raised when data fails validation."""
    pass


class NotFoundError(Exception):
    """Raised when a resource cannot be found.

    Attributes are set from the constructor arguments:
        resource: Human-readable resource kind.
        resource_id: Identifier that was looked up.
    """

    def __init__(self, resource, resource_id):
        self.resource = resource
        self.resource_id = resource_id
        super().__init__(f"{resource} '{resource_id}' 未找到")


# Using the custom exceptions
def get_user(user_id):
    """Return the user with ``user_id`` or raise ``NotFoundError``.

    NOTE(review): ``database`` is not defined in this snippet; it is assumed
    to be provided by the surrounding application.
    """
    user = database.find_user(user_id)
    if user is None:
        raise NotFoundError("用户", user_id)
    return user


def validate_age(age):
    """Raise ``ValidationError`` unless ``age`` is an int in the range 0-150."""
    # bool is a subclass of int, so True/False must be rejected explicitly.
    if isinstance(age, bool) or not isinstance(age, int) or age < 0 or age > 150:
        raise ValidationError("年龄必须在0-150之间")


# Catching the custom exceptions (guarded: needs a real `database` object)
if __name__ == "__main__":
    try:
        user = get_user(123)
        validate_age(user.age)
    except NotFoundError as e:
        print(e)
    except ValidationError as e:
        print(f"验证失败: {e}")
4.4 异常链和raise from
import json


class ConfigError(Exception):
    """Raised when the configuration file is missing or malformed."""


def read_config(filename):
    """Load a JSON configuration file.

    Args:
        filename: Path of the JSON file.

    Returns:
        The decoded JSON document.

    Raises:
        ConfigError: The file does not exist or is not valid JSON. The
            original exception is chained and available as ``__cause__``.
    """
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError as e:
        # Keep the original exception and add context
        raise ConfigError(f"配置文件不存在: {filename}") from e
    except json.JSONDecodeError as e:
        raise ConfigError(f"配置文件格式错误: {filename}") from e


# Inspect the exception chain
try:
    config = read_config('app.json')
except ConfigError as e:
    print(f"当前异常: {e}")
    print(f"原始异常: {e.__cause__}")
五、底层原理
5.1 文件描述符
操作系统通过文件描述符(File Descriptor)管理打开的文件:
import os

# Get the file descriptor; `with` guarantees the descriptor is released
with open('test.txt', 'r') as f:
    print(f.fileno())  # prints the descriptor number (e.g. 3)

    # File descriptors are a limited system resource.
    # /proc/<pid>/fd only exists on Linux, so skip the count elsewhere.
    fd_dir = f'/proc/{os.getpid()}/fd'
    if os.path.isdir(fd_dir):
        print(f"当前进程打开的文件数: {len(os.listdir(fd_dir))}")
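为什么必须关闭文件?因为每个进程可打开的文件描述符数量有限(Linux下可用 ulimit -n 查看),不关闭文件会造成描述符泄漏,最终导致"Too many open files"错误;此外,写入缓冲区中的数据在关闭(或flush)之前可能尚未真正写入磁盘。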
5.2 异常继承体系
Python异常类的层次结构:
BaseException
├── SystemExit          # sys.exit()引发
├── KeyboardInterrupt   # Ctrl+C
├── GeneratorExit       # 生成器关闭
└── Exception           # 常规异常的基类
    ├── ArithmeticError
    │   └── ZeroDivisionError
    ├── LookupError
    │   ├── IndexError
    │   └── KeyError
    ├── TypeError
    ├── ValueError
    ├── RuntimeError
    │   └── RecursionError
    └── OSError
        ├── FileNotFoundError
        ├── PermissionError
        └── TimeoutError
# Catch a family of related exceptions through their common base class
try:
    operation()
except OSError as e:
    # Catches every operating-system related error
    # (FileNotFoundError, PermissionError, TimeoutError, ...)
    print(f"系统错误: {e}")
5.3 上下文管理器的底层实现
# Equivalent expansion of the with statement.
# EXPR, VAR and BLOCK are placeholders; this is illustrative pseudo-code.
with EXPR as VAR:
    BLOCK

# is equivalent to
mgr = (EXPR)
exit = type(mgr).__exit__          # looked up on the type, not the instance
value = type(mgr).__enter__(mgr)
exc = True                         # True means "no exception seen so far"
try:
    VAR = value
    BLOCK
except:
    # The bare except is intentional: __exit__ must see every exception.
    exc = False
    if not exit(mgr, *sys.exc_info()):
        raise                      # __exit__ returned a falsy value: propagate
finally:
    if exc:
        exit(mgr, None, None, None)  # normal exit path
六、实战项目
6.1 日志记录系统
import json
from datetime import datetime
from pathlib import Path
from enum import Enum


class LogLevel(Enum):
    """Log severities, declared from least to most severe."""
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


class Logger:
    """Simple file logger that stores one JSON object per line.

    Args:
        log_file: Path of the log file; its parent directory is created.
        level: Minimum ``LogLevel`` that gets written.
    """

    def __init__(self, log_file='app.log', level=LogLevel.INFO):
        self.log_file = Path(log_file)
        self.level = level
        # Declaration order of the enum doubles as the severity order.
        self.levels = list(LogLevel)
        # Make sure the log directory exists
        self.log_file.parent.mkdir(parents=True, exist_ok=True)

    def _should_log(self, level):
        """Return True when ``level`` is at least as severe as ``self.level``."""
        return self.levels.index(level) >= self.levels.index(self.level)

    def _write_log(self, level, message, **kwargs):
        """Append one entry; extra keyword arguments become extra JSON fields."""
        if not self._should_log(level):
            return

        entry = {
            'timestamp': datetime.now().isoformat(),
            'level': level.value,
            'message': message
        }
        entry.update(kwargs)

        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(entry, ensure_ascii=False) + '\n')

    def debug(self, message, **kwargs):
        self._write_log(LogLevel.DEBUG, message, **kwargs)

    def info(self, message, **kwargs):
        self._write_log(LogLevel.INFO, message, **kwargs)

    def warning(self, message, **kwargs):
        self._write_log(LogLevel.WARNING, message, **kwargs)

    def error(self, message, **kwargs):
        self._write_log(LogLevel.ERROR, message, **kwargs)

    def critical(self, message, **kwargs):
        self._write_log(LogLevel.CRITICAL, message, **kwargs)

    def read_logs(self, level=None, limit=None):
        """Read entries back from the log file.

        Args:
            level: When given, only entries with exactly this ``LogLevel``.
            limit: When truthy, keep only the last ``limit`` matching entries;
                ``None`` or ``0`` returns all of them.

        Returns:
            A list of entry dicts; empty when the log file does not exist.
        """
        logs = []
        try:
            # Open directly instead of checking exists() first: the file
            # could disappear between the check and the open.
            with open(self.log_file, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue  # tolerate blank lines instead of crashing
                    entry = json.loads(line)
                    if level is None or entry['level'] == level.value:
                        logs.append(entry)
        except FileNotFoundError:
            return []

        if limit:
            logs = logs[-limit:]

        return logs

    def clear(self):
        """Delete the log file; a missing file is not an error."""
        # missing_ok (Python 3.8+) avoids the exists()-then-unlink() race.
        self.log_file.unlink(missing_ok=True)


# Usage example (guarded so importing this snippet does not write log files)
if __name__ == "__main__":
    logger = Logger('logs/myapp.log', LogLevel.DEBUG)
    logger.info("应用启动", version="1.0.0")
    logger.warning("内存使用率较高", usage="85%")
    logger.error("数据库连接失败", retry=3)

    print("\n最近的日志:")
    for log in logger.read_logs(limit=5):
        print(f"[{log['level']}] {log['timestamp']}: {log['message']}")
6.2 配置文件读写
import io
import json
import configparser
from pathlib import Path


class ConfigError(Exception):
    """Raised when a configuration file cannot be loaded or saved."""


class ConfigManager:
    """Configuration manager supporting JSON, INI/CFG and YAML files.

    The format is chosen from the file extension. YAML support needs the
    third-party ``yaml`` package, which is imported only when required.

    Args:
        config_path: Path of the configuration file; loaded immediately if
            it exists.
    """

    def __init__(self, config_path):
        self.config_path = Path(config_path)
        self.config = {}
        self._load()

    def _load(self):
        """Load the file, picking the parser from the file extension.

        Raises:
            ConfigError: The file is malformed, unreadable or has an
                unsupported extension.
        """
        if not self.config_path.exists():
            self.config = {}
            return

        suffix = self.config_path.suffix.lower()

        try:
            if suffix == '.json':
                with open(self.config_path, 'r', encoding='utf-8') as f:
                    self.config = json.load(f)

            elif suffix in ('.ini', '.cfg'):
                parser = configparser.ConfigParser()
                parser.read(self.config_path, encoding='utf-8')
                self.config = {s: dict(parser[s]) for s in parser.sections()}

            elif suffix in ('.yaml', '.yml'):
                import yaml
                with open(self.config_path, 'r', encoding='utf-8') as f:
                    # safe_load() returns None for an empty document.
                    self.config = yaml.safe_load(f) or {}
            else:
                raise ValueError(f"不支持的配置文件格式: {suffix}")

        except json.JSONDecodeError as e:
            raise ConfigError(f"JSON解析错误: {e}") from e
        except Exception as e:
            raise ConfigError(f"加载配置失败: {e}") from e

    def get(self, key, default=None):
        """Return the value at ``key`` (dot-separated path) or ``default``."""
        keys = key.split('.')
        value = self.config

        for k in keys:
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default

        return value

    def set(self, key, value):
        """Set the value at ``key`` (dot-separated), creating nested dicts."""
        keys = key.split('.')
        config = self.config

        for k in keys[:-1]:
            if k not in config:
                config[k] = {}
            config = config[k]

        config[keys[-1]] = value

    def save(self):
        """Write the configuration back to ``config_path``.

        The content is serialised in memory first, so an unsupported format
        or a serialisation error never truncates an existing file.

        Raises:
            ConfigError: The file extension is not a supported format.
        """
        suffix = self.config_path.suffix.lower()

        if suffix == '.json':
            text = json.dumps(self.config, indent=2, ensure_ascii=False)
        elif suffix in ('.ini', '.cfg'):
            parser = configparser.ConfigParser()
            for section, values in self.config.items():
                # configparser only accepts string option values.
                parser[section] = {k: str(v) for k, v in values.items()}
            buffer = io.StringIO()
            parser.write(buffer)
            text = buffer.getvalue()
        elif suffix in ('.yaml', '.yml'):
            import yaml
            text = yaml.safe_dump(self.config, allow_unicode=True)
        else:
            raise ConfigError(f"不支持的配置文件格式: {suffix}")

        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.config_path, 'w', encoding='utf-8') as f:
            f.write(text)

    def __getitem__(self, key):
        return self.config[key]

    def __setitem__(self, key, value):
        self.config[key] = value


# Usage example (guarded so importing this snippet does not write app.json)
if __name__ == "__main__":
    config = ConfigManager('app.json')
    config.set('database.host', 'localhost')
    config.set('database.port', 3306)
    config.set('app.debug', True)
    config.save()

    print(f"数据库主机: {config.get('database.host')}")
    print(f"调试模式: {config.get('app.debug', False)}")
6.3 目录遍历工具
from pathlib import Path
from collections import defaultdict
import hashlib


class DirectoryAnalyzer:
    """Directory analysis tool: statistics, duplicates, large files, tree view.

    Args:
        root_path: Directory to analyse.

    Raises:
        FileNotFoundError: ``root_path`` does not exist.
        NotADirectoryError: ``root_path`` exists but is not a directory.
    """

    def __init__(self, root_path):
        self.root = Path(root_path)
        if not self.root.exists():
            raise FileNotFoundError(f"目录不存在: {root_path}")
        if not self.root.is_dir():
            raise NotADirectoryError(f"不是目录: {root_path}")

    def get_statistics(self):
        """Return file/directory counts, total size and a per-extension count."""
        stats = {
            'total_files': 0,
            'total_dirs': 0,
            'total_size': 0,
            'extensions': defaultdict(int)
        }

        for item in self.root.rglob('*'):
            if item.is_file():
                stats['total_files'] += 1
                stats['total_size'] += item.stat().st_size
                stats['extensions'][item.suffix.lower()] += 1
            elif item.is_dir():
                stats['total_dirs'] += 1

        return stats

    def find_duplicates(self):
        """Find duplicate files by content hash.

        Returns:
            A dict mapping MD5 hex digest to the list of paths sharing it,
            containing only digests shared by two or more files.
        """
        # Files of different sizes cannot be identical, so group by size
        # first and hash only the files that share a size with another file.
        by_size = defaultdict(list)
        for file_path in self.root.rglob('*'):
            if file_path.is_file():
                by_size[file_path.stat().st_size].append(file_path)

        hashes = defaultdict(list)
        for candidates in by_size.values():
            if len(candidates) < 2:
                continue
            for file_path in candidates:
                hashes[self._hash_file(file_path)].append(file_path)

        # Keep only the groups that really contain duplicates
        return {h: paths for h, paths in hashes.items() if len(paths) > 1}

    def _hash_file(self, file_path, block_size=65536):
        """Return the MD5 hex digest of a file, reading it in blocks."""
        hasher = hashlib.md5()
        with open(file_path, 'rb') as f:
            for block in iter(lambda: f.read(block_size), b''):
                hasher.update(block)
        return hasher.hexdigest()

    def find_large_files(self, min_size_mb=100):
        """Return ``(path, size_in_bytes)`` tuples of files >= ``min_size_mb``.

        The result is sorted by size, largest first.
        """
        min_bytes = min_size_mb * 1024 * 1024
        large_files = []

        for file_path in self.root.rglob('*'):
            if file_path.is_file():
                size = file_path.stat().st_size
                if size >= min_bytes:
                    large_files.append((file_path, size))

        return sorted(large_files, key=lambda x: x[1], reverse=True)

    def generate_tree(self, max_depth=None):
        """Return a textual directory tree.

        Args:
            max_depth: Number of levels below the root to show; ``None``
                means unlimited.
        """
        lines = [str(self.root)]
        self._tree_recursive(self.root, '', lines, 0, max_depth)
        return '\n'.join(lines)

    def _tree_recursive(self, path, prefix, lines, depth, max_depth):
        """Append the entries below ``path`` to ``lines``, directories first."""
        if max_depth is not None and depth >= max_depth:
            return

        try:
            items = sorted(path.iterdir(), key=lambda x: (x.is_file(), x.name))
        except PermissionError:
            # An unreadable directory is listed by its parent but not expanded.
            return

        for i, item in enumerate(items):
            is_last = (i == len(items) - 1)
            connector = '└── ' if is_last else '├── '
            lines.append(prefix + connector + item.name)

            # Do not descend into symlinked directories: a link that points
            # at an ancestor would otherwise recurse forever.
            if item.is_dir() and not item.is_symlink():
                # The continuation must be as wide as the 4-character
                # connector so that nested levels line up.
                extension = '    ' if is_last else '│   '
                self._tree_recursive(item, prefix + extension, lines, depth + 1, max_depth)


# Usage example (guarded so importing this snippet does not scan the disk)
if __name__ == "__main__":
    analyzer = DirectoryAnalyzer('.')

    print("=" * 50)
    print("目录统计")
    print("=" * 50)
    stats = analyzer.get_statistics()
    print(f"文件总数: {stats['total_files']}")
    print(f"目录总数: {stats['total_dirs']}")
    print(f"总大小: {stats['total_size'] / 1024 / 1024:.2f} MB")

    print("\n文件类型分布:")
    for ext, count in sorted(stats['extensions'].items(), key=lambda x: -x[1])[:5]:
        print(f" {ext or '(无扩展名)'}: {count}")

    print("\n" + "=" * 50)
    print("目录树(前2层)")
    print("=" * 50)
    print(analyzer.generate_tree(max_depth=2))
七、常见陷阱与最佳实践
7.1 陷阱1:忘记指定编码
# ❌ Wrong: relies on the system default encoding (may be gbk on Windows)
with open('text.txt', 'r') as f:
    content = f.read()

# ✅ Correct: always specify the encoding
with open('text.txt', 'r', encoding='utf-8') as f:
    content = f.read()

# ✅ Handling a file whose encoding is unknown
try:
    with open('text.txt', 'r', encoding='utf-8') as f:
        content = f.read()
except UnicodeDecodeError:
    # Fall back to gbk; errors='ignore' drops bytes that still fail to decode
    with open('text.txt', 'r', encoding='gbk', errors='ignore') as f:
        content = f.read()
7.2 陷阱2:路径分隔符硬编码
# ❌ Wrong: hard-coded path separators
path = 'folder\\file.txt'  # Windows
path = 'folder/file.txt'   # Unix

# ✅ Correct: use pathlib or os.path
from pathlib import Path
path = Path('folder') / 'file.txt'

# or
import os
path = os.path.join('folder', 'file.txt')
7.3 陷阱3:裸except捕获
# ❌ Dangerous: swallows every exception, including KeyboardInterrupt
try:
    process()
except:
    pass

# ✅ Correct: catch the specific exception
try:
    process()
except ValueError as e:
    logger.error(f"处理失败: {e}")

# ✅ If you really must catch everything, at least log it and re-raise
try:
    process()
except Exception:
    logger.exception("处理失败")
    raise
7.4 陷阱4:文件操作中的竞态条件
import os

# ❌ Dangerous: check-then-act leaves a window for a race condition
if os.path.exists('file.txt'):
    with open('file.txt', 'r') as f:  # the file may be deleted after the check
        content = f.read()

# ✅ Correct: just attempt the operation and handle the exception
try:
    with open('file.txt', 'r') as f:
        content = f.read()
except FileNotFoundError:
    content = ''
7.5 最佳实践总结
八、本章小结
核心知识点
- 文件读写
- with语句
- pathlib
- 异常处理:try/except/else/finally结构,捕获具体异常
- 自定义异常
底层原理要点
- 文件描述符
- 上下文管理协议
- 异常继承体系:BaseException → Exception → 具体异常
- 竞态条件
九、课后练习
基础练习
文件复制工具:编写一个函数,实现文件的复制,要求显示复制进度(百分比)
文本搜索工具:编写程序,在指定目录的所有文本文件中搜索包含特定关键字的文件
异常处理练习:为以下代码添加适当的异常处理:
import json


def read_and_parse(filename):
    # Exercise starting point: exception handling is deliberately missing.
    with open(filename) as f:
        data = json.load(f)
    return data['value'] / data['count']
进阶练习
文件监控器:编写一个程序,监控指定目录,当有新文件创建时自动打印文件信息
CSV处理器:实现一个CSV文件处理器,支持读取、过滤、转换和保存
安全的临时文件:使用tempfile模块创建临时文件,确保程序退出时自动清理
挑战练习
实现文件锁:使用fcntl(Unix)或msvcrt(Windows)实现跨平台的文件锁机制
内存映射文件:使用mmap模块实现大文件的高效处理
参考资源
💡 学习建议:文件操作和异常处理是编写健壮程序的基础。建议在实际项目中多使用pathlib,它比传统的os.path更加直观和安全。同时,要养成良好的异常处理习惯,捕获具体异常而非泛泛处理。
本文是《Python全栈修炼之路》系列第8篇,系列文章持续更新中,欢迎关注!