🕐 预计用时:3-4 小时 | 🎯 目标:用 Day26-31 所学知识完成两个实战项目
场景:你下载了一堆文件,名字乱七八糟(IMG_001.jpg、photo(2).png、新建文件夹(3).txt……),想统一重命名。
import os
import re
from pathlib import Path


def _collect_files(directory, ext_filter=None):
    """Return the regular files directly inside *directory*, sorted by path.

    Prints an error and returns None when *directory* is not a directory.
    *ext_filter* is matched case-insensitively and may be given with or
    without the leading dot (".jpg" or "jpg").
    """
    target = Path(directory)
    if not target.is_dir():
        print(f"❌ 目录不存在: {directory}")
        return None

    files = sorted(p for p in target.iterdir() if p.is_file())
    if ext_filter:
        wanted = ext_filter.lower()
        if not wanted.startswith("."):
            wanted = f".{wanted}"
        files = [p for p in files if p.suffix.lower() == wanted]
    return files


def _target_taken(source, target):
    """Return True when *target* exists and is a different file than *source*.

    The same-file test keeps case-only renames working on case-insensitive
    file systems, where the "new" name already resolves to the source file.
    """
    return target.exists() and not target.samefile(source)


def rename_sequential(directory, prefix="file", ext_filter=None, start=1):
    """Rename files to ``<prefix>_<number><ext>`` in sorted order.

    Numbers are zero-padded to the width of the largest number in the batch.
    A file whose target name is already used by another file is skipped, so
    nothing is ever overwritten.

    Args:
        directory: Target directory.
        prefix: File name prefix.
        ext_filter: Only process files with this extension (e.g. ".jpg").
        start: First sequence number.

    Returns:
        A list of ``(old_path, new_path)`` string tuples for the renames that
        were performed; an empty list when the directory does not exist or
        no file matches.
    """
    files = _collect_files(directory, ext_filter)
    if files is None:
        return []
    if not files:
        print("⚠️ 没有找到匹配的文件")
        return []

    print(f"📂 目录: {Path(directory)}")
    print(f"📄 找到 {len(files)} 个文件")
    print()

    # Width of the largest sequence number that will be generated.
    padding = len(str(len(files) + start - 1))
    renamed = []
    for number, filepath in enumerate(files, start=start):
        new_name = f"{prefix}_{str(number).zfill(padding)}{filepath.suffix}"
        if new_name == filepath.name:
            # Already carries its target name (e.g. the tool was re-run).
            continue
        new_path = filepath.with_name(new_name)
        # Never overwrite an existing file.
        if _target_taken(filepath, new_path):
            print(f"⚠️ 跳过 {filepath.name} → {new_name}(目标已存在)")
            continue
        filepath.rename(new_path)
        renamed.append((str(filepath), str(new_path)))
        print(f"✅ {filepath.name} → {new_name}")

    print(f"\n完成: 重命名 {len(renamed)} 个文件")
    return renamed


# Usage example: create a test directory with some files
test_dir = Path("test_rename")
test_dir.mkdir(exist_ok=True)
for name in ["photo 1.jpg", "IMG_002.jpg", "picture(3).jpg", "新建文件.jpg"]:
    (test_dir / name).touch()

# Rename by sequence number
rename_sequential(test_dir, prefix="photo", ext_filter=".jpg")
# Result:
# ✅ IMG_002.jpg → photo_1.jpg
# ✅ photo 1.jpg → photo_2.jpg
# ✅ picture(3).jpg → photo_3.jpg
# ✅ 新建文件.jpg → photo_4.jpg


def rename_regex(directory, pattern, replacement, ext_filter=None, dry_run=True):
    """Batch-rename files by applying a regex substitution to each file stem.

    The extension is never touched. The same safety checks run in preview
    and in execute mode, so the preview lists exactly what an execution
    would do: a file is skipped when the new stem would be empty, when the
    target already exists, or when an earlier file in the same run already
    claimed the target name.

    Args:
        directory: Target directory.
        pattern: Regular expression pattern.
        replacement: Replacement text (may contain group references).
        ext_filter: Only process files with this extension.
        dry_run: True = preview only, nothing is renamed.

    Returns:
        A list of ``(old_name, new_name)`` tuples; empty when the directory
        does not exist.
    """
    files = _collect_files(directory, ext_filter)
    if files is None:
        return []

    print(f"{'🔍 预览模式' if dry_run else '⚡ 执行模式'}")
    print(f"规则: '{pattern}' → '{replacement}'")
    print("-" * 50)

    regex = re.compile(pattern)  # compiled once, outside the loop
    renamed = []
    claimed = set()  # target names already handed out during this run
    for filepath in files:
        new_stem = regex.sub(replacement, filepath.stem)
        if new_stem == filepath.stem:
            continue  # unchanged, nothing to do
        if not new_stem:
            # An empty stem would leave a bare ".ext" hidden file.
            print(f"⚠️ 跳过 {filepath.name}(新文件名为空)")
            continue

        new_name = new_stem + filepath.suffix
        new_path = filepath.with_name(new_name)
        if new_name in claimed or _target_taken(filepath, new_path):
            print(f"⚠️ 跳过 {filepath.name}(目标已存在)")
            continue

        if not dry_run:
            filepath.rename(new_path)
        claimed.add(new_name)
        renamed.append((filepath.name, new_name))
        print(f"{'🔍' if dry_run else '✅'} {filepath.name} → {new_name}")

    action = "将被" if dry_run else "已"
    print(f"\n{action}重命名: {len(renamed)} 个文件")
    return renamed


# Usage examples
# 1. Replace whitespace with underscores
rename_regex("test_rename", r"\s+", "_")
# 2. Remove parentheses and their content
rename_regex("test_rename", r"\(.*?\)", "")
# 3. Normalize date format: 2024.01.15 → 2024-01-15
rename_regex("test_rename", r"(\d{4})\.(\d{2})\.(\d{2})", r"\1-\2-\3")
# 4. Remove Chinese characters
rename_regex("test_rename", r"[\u4e00-\u9fff]+", "")


# 5. Convert everything to lower case (needs a dedicated function)
def rename_lowercase(directory, ext_filter=None, dry_run=True):
    """Convert file names (including the extension) to lower case.

    A file is skipped when the lower-cased name is already used by a
    different file, so nothing is overwritten.

    Args:
        directory: Target directory.
        ext_filter: Only process files with this extension.
        dry_run: True = preview only, nothing is renamed.

    Returns:
        A list of ``(old_name, new_name)`` tuples; empty when the directory
        does not exist.
    """
    files = _collect_files(directory, ext_filter)
    if files is None:
        return []

    renamed = []
    claimed = set()  # target names already handed out during this run
    for filepath in files:
        new_name = filepath.name.lower()
        if new_name == filepath.name:
            continue
        new_path = filepath.with_name(new_name)
        if new_name in claimed or _target_taken(filepath, new_path):
            print(f"⚠️ 跳过 {filepath.name}(目标已存在)")
            continue
        if not dry_run:
            filepath.rename(new_path)
        claimed.add(new_name)
        renamed.append((filepath.name, new_name))
        print(f"{'🔍' if dry_run else '✅'} {filepath.name} → {new_name}")
    return renamed


# ⚠️ dry_run mode matters! Batch renaming cannot be undone by these
# functions, so always preview first (dry_run=True) and only execute
# (dry_run=False) after checking the preview.
import argparse
import json
import os
import re
import time
from datetime import datetime
from pathlib import Path


def rename_by_date(directory, date_format="%Y%m%d_%H%M%S", ext_filter=None, dry_run=True):
    """Rename files after their modification time.

    Files whose timestamps format to the same text get ``_1``, ``_2``, ...
    appended. A file whose target name already belongs to a different file
    on disk is skipped, so nothing is overwritten.

    Args:
        directory: Target directory.
        date_format: ``strftime`` format used to build the new file stem.
        ext_filter: Only process files with this extension.
        dry_run: True = preview only, nothing is renamed.

    Returns:
        A list of ``(old_name, new_name)`` tuples; empty when the directory
        does not exist.
    """
    target = Path(directory)
    if not target.is_dir():
        print(f"❌ 目录不存在: {directory}")
        return []

    files = sorted(p for p in target.iterdir() if p.is_file())
    if ext_filter:
        files = [p for p in files if p.suffix.lower() == ext_filter.lower()]

    # Several files can share one formatted timestamp (e.g. same second).
    name_count = {}

    print(f"{'🔍 预览模式' if dry_run else '⚡ 执行模式'}")
    print("-" * 50)

    renamed = []
    for filepath in files:
        modified = datetime.fromtimestamp(filepath.stat().st_mtime)
        base_name = modified.strftime(date_format)

        # Disambiguate duplicates with a numeric suffix: _1, _2, ...
        if base_name in name_count:
            name_count[base_name] += 1
            base_name = f"{base_name}_{name_count[base_name]}"
        else:
            name_count[base_name] = 0

        new_name = f"{base_name}{filepath.suffix}"
        new_path = filepath.with_name(new_name)
        # Path.rename() may silently replace an existing file; refuse to
        # clobber anything that is not the source file itself.
        if new_path.exists() and not new_path.samefile(filepath):
            print(f"⚠️ 跳过 {filepath.name}(目标已存在)")
            continue

        if not dry_run:
            filepath.rename(new_path)
        renamed.append((filepath.name, new_name))
        print(f"{'🔍' if dry_run else '✅'} {filepath.name} → {new_name}")

    print(f"\n{'将被' if dry_run else '已'}重命名: {len(renamed)} 个文件")
    return renamed


# Usage example
rename_by_date("test_rename", date_format="%Y-%m-%d_%H%M%S", dry_run=True)
# Result:
# 🔍 IMG_002.jpg → 2024-01-15_143025.jpg
# 🔍 photo 1.jpg → 2024-01-15_143026.jpg
# 🔍 picture(3).jpg → 2024-01-15_143027.jpg
# 🔍 新建文件.jpg → 2024-01-15_143028.jpg


class BatchRenamer:
    """Batch rename tool with undo support.

    Every executed batch is appended to a JSON history file stored inside
    the target directory; ``undo()`` reverts the most recent batch.
    """

    def __init__(self, directory):
        self.directory = Path(directory)
        self.history_file = self.directory / ".rename_history.json"
        self.history = self._load_history()

    def _load_history(self):
        """Load the rename history, or return an empty list if none exists."""
        if not self.history_file.exists():
            return []
        try:
            text = self.history_file.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            # History written by an older version using the locale encoding.
            text = self.history_file.read_text()
        return json.loads(text)

    def _write_history(self):
        """Persist the in-memory history as UTF-8 JSON."""
        # ensure_ascii=False emits raw non-ASCII file names, so the encoding
        # must be pinned instead of depending on the platform default.
        self.history_file.write_text(
            json.dumps(self.history, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    def _save_history(self, rename_list):
        """Append one batch of renames to the history (used by undo)."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "changes": rename_list,
        }
        self.history.append(entry)
        self._write_history()

    def list_files(self, ext_filter=None):
        """Return the sorted regular files in the directory.

        The tool's own history file is excluded so that a rename strategy
        can never rename it.
        """
        files = sorted(
            p for p in self.directory.iterdir()
            if p.is_file() and p != self.history_file
        )
        if ext_filter:
            files = [p for p in files if p.suffix.lower() == ext_filter.lower()]
        return files

    def preview(self, rename_func, **kwargs):
        """Compute and print the renames *rename_func* would produce.

        Args:
            rename_func: Callable ``(old_name, **kwargs) -> new_name``.
            **kwargs: Forwarded to *rename_func*. ``ext_filter`` also
                restricts which files are considered. An integer ``index``
                is treated as the starting number and advanced by one for
                each file, so numbering strategies yield distinct names.

        Returns:
            A list of ``(old_name, new_name)`` tuples (empty if nothing
            would change).
        """
        files = self.list_files(kwargs.get("ext_filter"))
        numbered = isinstance(kwargs.get("index"), int)

        changes = []
        for offset, filepath in enumerate(files):
            call_kwargs = dict(kwargs)
            if numbered:
                call_kwargs["index"] = kwargs["index"] + offset
            new_name = rename_func(filepath.name, **call_kwargs)
            if new_name and new_name != filepath.name:
                changes.append((filepath.name, new_name))

        if not changes:
            print("⚠️ 没有需要重命名的文件")
            return []

        print(f"🔍 预览({len(changes)} 个文件将被重命名):")
        print("-" * 60)
        for old, new in changes:
            print(f" {old}")
            print(f" → {new}")
        print("-" * 60)
        return changes

    def execute(self, changes):
        """Perform the renames in *changes* and record them for undo.

        Entries whose source is missing, whose target already exists, or
        whose rename fails are skipped with a warning, so one bad entry
        cannot abort the batch and lose the history of earlier renames.

        Args:
            changes: Iterable of ``(old_name, new_name)`` tuples, as
                returned by ``preview()``.
        """
        if not changes:
            return

        rename_list = []
        for old_name, new_name in changes:
            old_path = self.directory / old_name
            new_path = self.directory / new_name
            if not old_path.exists():
                print(f"⚠️ 跳过 {old_name}(文件不存在)")
                continue
            if new_path.exists() and not new_path.samefile(old_path):
                print(f"⚠️ 跳过 {old_name}(目标已存在)")
                continue
            try:
                old_path.rename(new_path)
            except OSError as exc:
                print(f"⚠️ 跳过 {old_name}({exc})")
                continue
            rename_list.append({"old": old_name, "new": new_name})
            print(f"✅ {old_name} → {new_name}")

        # Save the history
        if rename_list:
            self._save_history(rename_list)
            print(f"\n完成: 重命名 {len(rename_list)} 个文件(已保存历史,可撤销)")

    def undo(self):
        """Revert the most recent batch of renames."""
        if not self.history:
            print("⚠️ 没有可撤销的操作")
            return

        last = self.history.pop()
        print(f"⏪ 撤销 {last['timestamp']} 的操作:")
        # Revert in reverse order: a batch such as A→B followed by C→A can
        # only be restored by undoing C→A first.
        for item in reversed(last["changes"]):
            current = self.directory / item["new"]
            original = self.directory / item["old"]
            if not current.exists():
                print(f" ⚠️ {item['new']} 不存在,跳过")
            elif original.exists() and not original.samefile(current):
                print(f" ⚠️ {item['old']} 已存在,跳过")
            else:
                current.rename(original)
                print(f" ✅ {item['new']} → {item['old']}")

        # Update the history file
        self._write_history()


# ---- Rename strategy functions ----
def sequential_name(old_name, prefix="file", index=0, **kwargs):
    """Return ``<prefix>_<index, 3 digits><ext>``, keeping the extension."""
    ext = os.path.splitext(old_name)[1]
    return f"{prefix}_{index:03d}{ext}"


def replace_pattern(old_name, pattern="", replacement="", **kwargs):
    """Apply a regex substitution to the whole file name (extension included)."""
    return re.sub(pattern, replacement, old_name)


def add_prefix(old_name, prefix="", **kwargs):
    """Prepend *prefix* to the file name."""
    return f"{prefix}{old_name}"


def add_suffix(old_name, suffix="", **kwargs):
    """Insert *suffix* between the file stem and its extension."""
    name, ext = os.path.splitext(old_name)
    return f"{name}{suffix}{ext}"


def lowercase_name(old_name, **kwargs):
    """Return the file name in lower case."""
    return old_name.lower()


# ---- Usage example ----
# Create test files
test_dir = Path("test_rename")
test_dir.mkdir(exist_ok=True)
for name in ["Photo A.jpg", "IMG_001.jpg", "Picture (2).jpg", "新建文件.jpg"]:
    (test_dir / name).touch()

renamer = BatchRenamer("test_rename")

# Option 1: preview, then execute after confirming
changes = renamer.preview(replace_pattern, pattern=r"\s+", replacement="_")
if changes:
    renamer.execute(changes)

# Option 2: undo
renamer.undo()

# Option 3: sequential numbering
changes = renamer.preview(lambda name, **kw: f"photo_{kw['index']:03d}{os.path.splitext(name)[1]}", index=1)
# Note: preview() treats index=1 as the starting number and advances it for
# each file, so the lambda above produces photo_001, photo_002, ...

# 💡 How the undo feature works:
# 1. Each executed batch records its (old name → new name) pairs in a JSON
#    file.
# 2. Undo reads the most recent record and applies the renames in reverse.
# 3. This is the "operation log" pattern, the same idea databases and game
#    saves use.
场景:服务器每天产出大量日志文件,需要分析错误频率、请求量、响应时间等指标。
# Typical web server log format (Apache/Nginx style)
# 192.168.1.1 - - [15/Jan/2024:10:30:15 +0800] "GET /api/users HTTP/1.1" 200 1234
# 192.168.1.2 - - [15/Jan/2024:10:30:16 +0800] "POST /api/login HTTP/1.1" 401 567
# 192.168.1.1 - - [15/Jan/2024:10:30:17 +0800] "GET /api/orders HTTP/1.1" 500 0
# We use a simplified format (easier to demonstrate)
# [2024-01-15 10:30:15] [INFO] GET /api/users 200 12ms
# [2024-01-15 10:30:16] [WARN] POST /api/login 401 5ms
# [2024-01-15 10:30:17] [ERROR] GET /api/orders 500 150ms - DatabaseError: Connection refused
import csv
import random
import re
import time
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from html import escape
from pathlib import Path

# Format: [2024-01-15 10:30:15] [INFO] GET /api/users 200 12ms - ErrorMsg
_LOG_LINE_RE = re.compile(
    r"\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] \[(\w+)\] (\w+) ([^\s]+) (\d+) (\d+)ms(?: - (.+))?"
)

# Static head of the HTML report (document header, CSS and title).
_HTML_HEAD = """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>日志分析报告</title>
<style>
body { font-family: sans-serif; max-width: 780px; margin: 0 auto; padding: 20px; }
h1 { text-align: center; color: #1a1a1a; }
table { width: 100%; border-collapse: collapse; margin: 16px 0; }
th { background: #07c160; color: white; padding: 10px; text-align: left; }
td { padding: 8px 12px; border: 1px solid #e0e0e0; }
tr:nth-child(even) { background: #f9f9f9; }
</style>
</head>
<body>
<h1>📊 日志分析报告</h1>"""


def generate_test_log(filename, num_lines=500, seed=None):
    """Write a file of random log lines in the simplified format.

    Args:
        filename: Output file path.
        num_lines: Number of log lines to generate.
        seed: Optional seed for reproducible output. When None, the shared
            ``random`` module state is used.
    """
    rng = random if seed is None else random.Random(seed)
    levels = ["INFO", "INFO", "INFO", "INFO", "WARN", "ERROR"]  # INFO is more likely
    methods = ["GET", "POST", "PUT", "DELETE"]
    paths = ["/api/users", "/api/login", "/api/orders", "/api/products", "/api/cart", "/api/pay"]
    status_codes = [200, 200, 200, 201, 301, 400, 401, 403, 404, 500]
    errors = [
        "DatabaseError: Connection refused",
        "TimeoutError: Request timeout",
        "ValueError: Invalid parameter",
        "KeyError: 'user_id'",
        "PermissionError: Access denied",
    ]

    base_time = datetime(2024, 1, 15, 0, 0, 0)
    lines = []
    for _ in range(num_lines):
        # Random but strictly increasing timestamps
        base_time += timedelta(seconds=rng.randint(1, 30))
        level = rng.choice(levels)
        method = rng.choice(methods)
        path = rng.choice(paths)
        status = rng.choice(status_codes)
        response_time = rng.randint(1, 500)
        line = f"[{base_time.strftime('%Y-%m-%d %H:%M:%S')}] [{level}] {method} {path} {status} {response_time}ms"
        if level == "ERROR":
            line += f" - {rng.choice(errors)}"
        lines.append(line)

    with open(filename, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
    print(f"✅ 生成 {num_lines} 行日志 → {filename}")


generate_test_log("app.log", 500)


def parse_log_line(line):
    """Parse one log line.

    Returns:
        A dict with the keys ``timestamp`` (datetime), ``level``, ``method``,
        ``path``, ``status`` (int), ``response_time`` (int, milliseconds)
        and ``error`` (str or None), or None when the line does not match
        the expected format or carries an impossible date/time.
    """
    match = _LOG_LINE_RE.match(line)
    if not match:
        return None
    stamp, level, method, path, status, elapsed, error = match.groups()
    try:
        timestamp = datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        # Digits matched the pattern but are not a real date (e.g. month 13).
        return None
    return {
        "timestamp": timestamp,
        "level": level,
        "method": method,
        "path": path,
        "status": int(status),
        "response_time": int(elapsed),
        "error": error or None,
    }


def _load_entries(filepath):
    """Parse a log file; return ``(entries, number_of_unparseable_lines)``."""
    entries = []
    parse_errors = 0
    with open(filepath, "r", encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line:
                continue
            entry = parse_log_line(line)
            if entry:
                entries.append(entry)
            else:
                parse_errors += 1
    return entries, parse_errors


def analyze_log(filepath):
    """Analyze a log file and print a text report.

    Args:
        filepath: Path of the log file.

    Returns:
        A summary dict with the keys ``total``, ``levels``, ``statuses``,
        ``paths``, ``avg_response_time`` and ``errors``, or None when no
        line could be parsed.
    """
    logs, parse_errors = _load_entries(filepath)
    if not logs:
        print("⚠️ 没有解析到有效日志")
        return

    # ---- Statistics ----
    # 1. Log levels
    level_counter = Counter(log["level"] for log in logs)
    # 2. Status codes
    status_counter = Counter(log["status"] for log in logs)
    # 3. Request paths
    path_counter = Counter(log["path"] for log in logs)
    # 4. HTTP methods
    method_counter = Counter(log["method"] for log in logs)
    # 5. Response times
    response_times = [log["response_time"] for log in logs]
    avg_response = sum(response_times) / len(response_times)
    max_response = max(response_times)
    min_response = min(response_times)
    # 6. Requests per hour of day
    hourly = Counter(log["timestamp"].hour for log in logs)
    # 7. Error details
    errors = [log for log in logs if log["level"] == "ERROR"]
    error_types = Counter(log["error"] for log in errors if log["error"])
    # min/max instead of first/last so unsorted logs report the right range
    timestamps = [log["timestamp"] for log in logs]
    first_seen, last_seen = min(timestamps), max(timestamps)

    # ---- Report ----
    print("=" * 60)
    print(f"📊 日志分析报告: {filepath}")
    print("=" * 60)

    print(f"\n📝 基本信息:")
    print(f" 总行数: {len(logs)}")
    print(f" 解析失败: {parse_errors} 行")
    print(f" 时间范围: {first_seen} ~ {last_seen}")

    print(f"\n📊 日志级别:")
    for level, count in level_counter.most_common():
        pct = count / len(logs) * 100
        bar = "█" * int(pct / 2)
        print(f" {level:8s} {count:4d} ({pct:5.1f}%) {bar}")

    print(f"\n📡 HTTP 方法:")
    for method, count in method_counter.most_common():
        print(f" {method:8s} {count:4d}")

    print(f"\n🔢 状态码分布:")
    for status, count in status_counter.most_common():
        emoji = "✅" if status < 400 else "⚠️" if status < 500 else "❌"
        print(f" {emoji} {status} {count:4d}")

    print(f"\n🛤️ 热门路径 TOP 5:")
    for path, count in path_counter.most_common(5):
        print(f" {path:30s} {count:4d} 次")

    print(f"\n⏱️ 响应时间:")
    print(f" 平均: {avg_response:.1f}ms")
    print(f" 最快: {min_response}ms")
    print(f" 最慢: {max_response}ms")
    # Response time distribution
    fast = sum(1 for t in response_times if t < 100)
    medium = sum(1 for t in response_times if 100 <= t < 300)
    slow = sum(1 for t in response_times if t >= 300)
    print(f" <100ms: {fast} | 100-300ms: {medium} | >=300ms: {slow}")

    print(f"\n🕐 每小时请求量:")
    for hour in sorted(hourly):
        count = hourly[hour]
        bar = "█" * (count // 2)
        print(f" {hour:02d}:00 {count:3d} {bar}")

    if errors:
        print(f"\n❌ 错误详情 (共 {len(errors)} 条):")
        for error_type, count in error_types.most_common(5):
            print(f" {error_type}: {count} 次")

    print("\n" + "=" * 60)

    return {
        "total": len(logs),
        "levels": dict(level_counter),
        "statuses": dict(status_counter),
        "paths": dict(path_counter),
        "avg_response_time": avg_response,
        "errors": len(errors),
    }


# Usage
result = analyze_log("app.log")


def export_report_csv(log_filepath, output_csv="log_report.csv"):
    """Export per-hour request statistics as a CSV file.

    One row is written per calendar hour: request count, error count and
    average response time in milliseconds.

    Args:
        log_filepath: Path of the log file to analyze.
        output_csv: Path of the CSV file to write.
    """
    logs, _ = _load_entries(log_filepath)

    # Aggregate per hour
    hourly_stats = defaultdict(lambda: {"requests": 0, "errors": 0, "total_time": 0})
    for log in logs:
        hour = log["timestamp"].strftime("%Y-%m-%d %H:00")
        bucket = hourly_stats[hour]
        bucket["requests"] += 1
        bucket["total_time"] += log["response_time"]
        if log["level"] == "ERROR":
            bucket["errors"] += 1

    # Write the CSV
    with open(output_csv, "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["时间段", "请求数", "错误数", "平均响应时间(ms)"])
        for hour in sorted(hourly_stats):
            stats = hourly_stats[hour]
            avg_time = stats["total_time"] / stats["requests"] if stats["requests"] > 0 else 0
            writer.writerow([hour, stats["requests"], stats["errors"], f"{avg_time:.1f}"])
    print(f"✅ 报告已导出: {output_csv}")


export_report_csv("app.log")


def export_report_html(log_filepath, output_html="log_report.html"):
    """Export the analysis report as a standalone HTML page.

    Every value taken from the log is HTML-escaped, because log content
    (request paths, error messages) is untrusted input. When no line can
    be parsed, a warning is printed and no file is written.

    Args:
        log_filepath: Path of the log file to analyze.
        output_html: Path of the HTML file to write.
    """
    logs, _ = _load_entries(log_filepath)
    if not logs:
        print("⚠️ 没有解析到有效日志")
        return

    # Statistics
    level_counter = Counter(log["level"] for log in logs)
    status_counter = Counter(log["status"] for log in logs)
    path_counter = Counter(log["path"] for log in logs)
    response_times = [log["response_time"] for log in logs]
    error_types = Counter(
        log["error"] for log in logs if log["level"] == "ERROR" and log["error"]
    )
    timestamps = [log["timestamp"] for log in logs]

    # Build the HTML
    parts = [
        _HTML_HEAD,
        f"<p>总日志: {len(logs)} 条 | 时间范围: {min(timestamps)} ~ {max(timestamps)}</p>",
        "<h2>日志级别分布</h2>",
        "<table>",
        "<tr><th>级别</th><th>数量</th><th>占比</th></tr>",
    ]
    for level, count in level_counter.most_common():
        pct = count / len(logs) * 100
        parts.append(f"<tr><td>{escape(level)}</td><td>{count}</td><td>{pct:.1f}%</td></tr>")
    parts += [
        "</table>",
        "<h2>状态码分布</h2>",
        "<table>",
        "<tr><th>状态码</th><th>数量</th></tr>",
    ]
    for status, count in status_counter.most_common():
        parts.append(f"<tr><td>{status}</td><td>{count}</td></tr>")
    parts += [
        "</table>",
        "<h2>热门路径 TOP 5</h2>",
        "<table>",
        "<tr><th>路径</th><th>请求次数</th></tr>",
    ]
    for path, count in path_counter.most_common(5):
        parts.append(f"<tr><td>{escape(path)}</td><td>{count}</td></tr>")
    parts += [
        "</table>",
        "<h2>响应时间</h2>",
        "<table>",
        "<tr><th>指标</th><th>值</th></tr>",
        f"<tr><td>平均</td><td>{sum(response_times) / len(response_times):.1f}ms</td></tr>",
        f"<tr><td>最快</td><td>{min(response_times)}ms</td></tr>",
        f"<tr><td>最慢</td><td>{max(response_times)}ms</td></tr>",
        "</table>",
    ]
    if error_types:
        parts += [
            "<h2>错误类型 TOP 5</h2>",
            "<table>",
            "<tr><th>错误</th><th>次数</th></tr>",
        ]
        for error, count in error_types.most_common(5):
            parts.append(f"<tr><td>{escape(error)}</td><td>{count}</td></tr>")
        parts.append("</table>")
    parts.append("</body></html>")

    Path(output_html).write_text("\n".join(parts), encoding="utf-8")
    print(f"✅ HTML 报告已导出: {output_html}")


export_report_html("app.log")


def tail_log(filepath, callback, interval=1):
    """Follow a log file in real time (similar to ``tail -f``).

    Only lines appended after the call starts are reported. A line is
    handed to *callback* once it is terminated by a newline, so a line the
    writer has only partly flushed is never parsed in two halves.

    Args:
        filepath: Path of the log file.
        callback: Function called with each parsed log entry (a dict).
        interval: Polling interval in seconds.
    """
    print(f"👁️ 开始监控: {filepath}")
    print("按 Ctrl+C 停止\n")

    with open(filepath, "r", encoding="utf-8") as f:
        # Jump to the end of the file (whence=2)
        f.seek(0, 2)
        pending = ""  # text of a line whose newline has not arrived yet
        try:
            while True:
                chunk = f.readline()
                if not chunk:
                    time.sleep(interval)
                    continue
                pending += chunk
                if not pending.endswith("\n"):
                    continue  # incomplete line; wait for the rest
                line, pending = pending.strip(), ""
                if not line:
                    continue
                entry = parse_log_line(line)
                if entry:
                    callback(entry)
        except KeyboardInterrupt:
            print("\n⏹️ 监控已停止")


def alert_handler(entry):
    """Print an alert for one entry: ERROR level, slow request, or 5xx status.

    Only the first matching condition is reported, in that order.
    """
    if entry["level"] == "ERROR":
        print(f"🚨 [{entry['timestamp']}] ERROR: {entry['path']} - {entry['error']}")
    elif entry["response_time"] > 300:
        print(f"🐢 [{entry['timestamp']}] 慢请求: {entry['path']} ({entry['response_time']}ms)")
    elif entry["status"] >= 500:
        print(f"💥 [{entry['timestamp']}] 服务器错误: {entry['status']} {entry['path']}")


# Real-time monitoring (uncomment to run)
# tail_log("app.log", alert_handler)

# 💡 How tail -f works: keep reading whatever is appended to the file.
# f.seek(0, 2) moves the file pointer to the end, then readline() is called
# repeatedly to pick up new lines. This is the classic log-monitoring
# pattern.
f"{prefix}_{i:03d}{ext}" | ||
re.sub(pattern, repl, name) | ||
datetime.fromtimestamp(mtime) | ||
dry_run=True | ||
re.match(pattern, line) | ||
Counter(log["level"]) | ||
path_counter.most_common(5) | ||
sum(times)/len(times) | ||
Counter(timestamp.hour) | ||
csv.writer | ||
Path.write_text() | ||
f.seek(0,2) + readline() |
🎯 扩展练习:
1. 给重命名工具添加"递归模式"——连子目录的文件一起处理
2. 给日志分析器添加"异常检测"——找出响应时间突增的时间段
3. 写一个"日志清洗器"——把敏感信息(IP、手机号)替换成 ***
4. 把两个工具打包成命令行程序(用 argparse)
📚 Day32 完成!第二阶段数据处理基础已全部结束 ✅
明天进入 Day33:迭代器与生成器 — 惰性求值的魔法