嘿,学习搭子!咱们一起闯过了高级数据结构 Collections 模块的关卡,是不是感觉自己的工具箱又丰富了不少?但你想过没有,Python 真正的强大之处,不仅在于那些高级数据结构,更在于它"开箱即用"的标准库!今天咱们就来探索 Python 的"瑞士军刀"——常用内置模块!我会像之前一样,带你一步步拆解,让你掌握这些在日常开发中无处不在的核心工具!
- 需要操作文件或目录,却不知道如何高效处理路径、遍历文件夹?
- 想要获取程序的命令行参数,或者了解Python解释器的信息?
- 需要处理时间日期,但总是被时区、格式化搞得头晕?
- 要进行JSON数据解析,或者生成随机数、进行数学计算?
- 想要用正则表达式处理文本,或者使用高阶函数简化代码?
如果你的答案是"是的,我经常需要这些功能!",那今天这篇文章就是为你量身定做的。咱们要系统学习Python最常用的内置模块:os、sys、datetime、math、random、json、re、itertools、functools。通过设计自动备份脚本、日志分析工具、数据处理管道等实际案例,你将掌握如何用Python标准库解决真实问题。
学习目标:学完本章,你不仅能理解常用内置模块的核心功能,还能根据场景选择最合适的工具,编写出更专业、高效、易维护的代码。预期成果:掌握内置模块的应用场景,能独立完成文件操作、数据处理、系统交互等常见任务,并为后续学习第三方库生态打下坚实基础。
os模块是Python与操作系统交互的接口,提供了丰富的文件、目录操作功能。
import os
import os.path as op

# 1. Path operations
current_dir = os.getcwd()  # current working directory of the process
print(f"当前目录: {current_dir}")

# Join components with the platform separator so the path is portable.
config_path = op.join("config", "app", "settings.json")
print(f"配置文件路径: {config_path}")

# Report whether the path exists.
if op.exists(config_path):
    print("配置文件存在")
else:
    print("配置文件不存在")

# 2. Directory operations
# Create the directory together with any missing parents; exist_ok=True makes
# the call a no-op when the directory is already there.
os.makedirs("logs/2024/01", exist_ok=True)

# Walk the tree rooted at the current directory and list every .py file.
for root, dirs, files in os.walk("."):
    for file in files:
        if file.endswith(".py"):
            full_path = op.join(root, file)
            print(f"Python文件: {full_path}")

# 3. File information
file_path = "example.txt"
if op.exists(file_path):
    file_size = op.getsize(file_path)  # size in bytes
    modified_time = op.getmtime(file_path)  # seconds since the epoch
    print(f"文件大小: {file_size}字节")
    print(f"修改时间: {modified_time}")
def batch_rename_files(directory, pattern, new_name_format):
    """Rename the files in ``directory`` whose names match ``pattern``.

    A file is renamed only when its name matches ``pattern`` (anchored at the
    start, as with ``re.match``) and also contains an 8-digit date followed by
    ``_`` and a 3-digit sequence number, e.g. ``IMG_20240101_001.jpg``.

    Args:
        directory: Directory whose entries are examined (not recursive).
        pattern: Regular expression selecting candidate file names.
        new_name_format: ``str.format`` template receiving ``date``, ``seq``
            and ``ext`` (the extension including the leading dot).

    Returns:
        None. Progress and a final count are printed.
    """
    import re

    if not op.exists(directory):
        print(f"目录不存在: {directory}")
        return

    renamed_count = 0
    # Sort the listing so files are processed in a deterministic order.
    for filename in sorted(os.listdir(directory)):
        if not re.match(pattern, filename):
            continue

        # Pull the date and sequence number out of the name.
        match = re.search(r'(\d{8})_(\d{3})', filename)
        if not match:
            continue

        date_str = match.group(1)
        seq_num = match.group(2)

        # Build the new name, preserving the original extension.
        ext = op.splitext(filename)[1]
        new_filename = new_name_format.format(
            date=date_str,
            seq=seq_num,
            ext=ext,
        )

        old_path = op.join(directory, filename)
        new_path = op.join(directory, new_filename)

        # os.rename may silently replace an existing file; never destroy
        # data that is already there.
        if op.exists(new_path):
            print(f"跳过: {filename} -> {new_filename} (目标已存在)")
            continue

        os.rename(old_path, new_path)
        print(f"重命名: {filename} -> {new_filename}")
        renamed_count += 1

    print(f"完成!共重命名 {renamed_count} 个文件")


# Usage example
# batch_rename_files("photos", r"IMG_\d{8}_\d{3}\.jpg", "vacation_{date}_{seq}{ext}")
sys模块提供了与Python解释器紧密相关的功能,包括命令行参数、模块搜索路径等。
import sys

# 1. Command-line arguments
print(f"脚本名称: {sys.argv[0]}")
print(f"参数列表: {sys.argv[1:]}")


# Example: dispatch on the first command-line argument
def process_arguments():
    """Handle the command given as the first command-line argument.

    Prints a usage line and exits with status 1 when no command was given;
    otherwise prints a message for ``help``, ``run`` or an unknown command.
    """
    if len(sys.argv) < 2:
        print("使用方法: python script.py <command> [options]")
        sys.exit(1)

    command = sys.argv[1]
    if command == "help":
        print("可用命令: help, run, test")
    elif command == "run":
        print("运行程序...")
    else:
        print(f"未知命令: {command}")


# 2. Module search path
print("Python模块搜索路径:")
for path in sys.path:
    print(f" {path}")

# Add a custom directory; guarded so re-running does not add duplicates.
if "/opt/my_modules" not in sys.path:
    sys.path.append("/opt/my_modules")

# 3. Standard streams
sys.stdout.write("这是标准输出\n")
sys.stderr.write("这是错误输出\n")

# Reading from standard input:
# user_input = sys.stdin.readline().strip()

# 4. Interpreter / platform information
print(f"Python版本: {sys.version}")
print(f"平台: {sys.platform}")
print(f"默认编码: {sys.getdefaultencoding()}")

# 5. Exiting the program
# sys.exit(0)  # success
# sys.exit(1)  # failure
class CommandLineTool:
    """Minimal command-line tool that dispatches ``sys.argv[1]`` to a handler.

    ``self.commands`` maps a command name to a bound method; every handler
    receives the list of arguments that follow the command name.
    """

    def __init__(self):
        self.commands = {
            'help': self.show_help,
            'version': self.show_version,
            'process': self.process_data,
        }

    def show_help(self, args):
        """Print the names of all registered commands (``args`` is unused)."""
        print("可用命令:")
        for cmd in self.commands:
            print(f" {cmd}")

    def show_version(self, args):
        """Print the tool version and the interpreter version."""
        print("工具版本: 1.0.0")
        print(f"Python版本: {sys.version}")

    def process_data(self, args):
        """Process the file named by ``args[0]``; print usage if it is missing."""
        if not args:
            print("使用方法: process <filename>")
            return

        filename = args[0]
        print(f"处理文件: {filename}")
        # Actual processing logic goes here...

    def run(self):
        """Dispatch the command found in ``sys.argv``.

        Exits with status 1 (after printing help) when no command is given
        or the command is not registered.
        """
        if len(sys.argv) < 2:
            self.show_help([])
            sys.exit(1)

        command = sys.argv[1]
        args = sys.argv[2:]

        handler = self.commands.get(command)
        if handler is None:
            print(f"错误: 未知命令 '{command}'")
            self.show_help([])
            sys.exit(1)

        handler(args)


# Usage example
# if __name__ == "__main__":
#     tool = CommandLineTool()
#     tool.run()
datetime模块提供了丰富的时间日期处理功能,支持日期计算、格式化、时区转换等。
from datetime import datetime, date, time, timedelta, timezone

# pytz is only needed for the optional conversion example further down, so a
# missing installation must not stop the rest of the script from running.
try:
    import pytz  # third-party: pip install pytz
except ImportError:
    pytz = None

# 1. Current local time
now = datetime.now()
print(f"当前时间: {now}")
print(f"格式化: {now.strftime('%Y-%m-%d %H:%M:%S')}")

# 2. Building a specific moment
new_year = datetime(2024, 1, 1, 0, 0, 0)
print(f"新年: {new_year}")

# 3. Arithmetic with timedelta
tomorrow = now + timedelta(days=1)
last_week = now - timedelta(weeks=1)
print(f"明天: {tomorrow.date()}")
print(f"上周: {last_week.date()}")

# Difference between two datetimes is a timedelta
delta = tomorrow - now
print(f"距离明天还有: {delta}")
print(f"小时数: {delta.total_seconds() / 3600:.1f}")

# 4. Time zones
# Use a timezone-aware "now"; datetime.utcnow() returns a naive value and is
# deprecated in recent Python versions.
utc_now = datetime.now(timezone.utc)
print(f"UTC时间: {utc_now}")

# Converting to another zone with pytz (optional dependency):
# beijing_tz = pytz.timezone('Asia/Shanghai')
# beijing_time = utc_now.astimezone(beijing_tz)
# print(f"北京时间: {beijing_time}")

# 5. Parsing a string
date_str = "2024-01-15 14:30:00"
parsed_date = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
print(f"解析的时间: {parsed_date}")

# 6. Component attributes
print(f"年: {now.year}")
print(f"月: {now.month}")
print(f"日: {now.day}")
print(f"小时: {now.hour}")
print(f"星期: {now.weekday()}")  # 0 = Monday, 6 = Sunday
class TaskScheduler:
    """Simple in-memory scheduler for one-shot tasks.

    Tasks are stored in ``self.tasks`` as dicts with the keys ``name``,
    ``execute_time``, ``callback`` and ``executed``, kept sorted by
    ``execute_time``.
    """

    def __init__(self):
        self.tasks = []

    def add_task(self, name, execute_time, callback):
        """Register ``callback`` to run once ``execute_time`` has passed.

        Args:
            name: Label used in printed messages and returned results.
            execute_time: ``datetime`` compared against ``datetime.now()``.
            callback: Zero-argument callable.
        """
        self.tasks.append({
            'name': name,
            'execute_time': execute_time,
            'callback': callback,
            'executed': False,
        })
        # Keep the list ordered so the earliest task always comes first.
        self.tasks.sort(key=lambda task: task['execute_time'])
        print(f"添加任务: {name} ({execute_time})")

    def check_and_execute(self):
        """Run every task that is due and return the names that succeeded.

        A task whose callback raises is reported and left pending, so it is
        tried again on the next call. Successfully executed tasks are
        removed from ``self.tasks``.
        """
        now = datetime.now()
        executed_tasks = []

        for task in self.tasks:
            if task['executed'] or task['execute_time'] > now:
                continue
            try:
                print(f"执行任务: {task['name']}")
                task['callback']()
                task['executed'] = True
                executed_tasks.append(task['name'])
            except Exception as e:
                # Boundary around arbitrary user callbacks: report and go on.
                print(f"任务执行失败 {task['name']}: {e}")

        # Drop the tasks that have completed.
        self.tasks = [t for t in self.tasks if not t['executed']]
        return executed_tasks

    def upcoming_tasks(self, n=5):
        """Return up to ``n`` pending tasks whose time is still in the future.

        Each entry is a dict with ``name``, ``execute_time`` and
        ``time_left`` (remaining hours formatted to one decimal place).
        """
        now = datetime.now()
        upcoming = []

        for task in self.tasks:
            if task['executed']:
                continue
            time_diff = (task['execute_time'] - now).total_seconds()
            if time_diff > 0:
                upcoming.append({
                    'name': task['name'],
                    'execute_time': task['execute_time'],
                    'time_left': f"{time_diff/3600:.1f}小时",
                })
                if len(upcoming) >= n:
                    break

        return upcoming


# Usage example
# scheduler = TaskScheduler()
# scheduler.add_task("备份数据库", datetime.now() + timedelta(hours=1), backup_db)
# scheduler.add_task("发送报告", datetime.now() + timedelta(hours=2), send_report)
math模块提供了丰富的数学函数,包括三角函数、对数函数、幂函数等。
import math

# 1. Constants and basic functions
print(f"圆周率: {math.pi}")
print(f"自然常数: {math.e}")
print(f"向上取整: {math.ceil(3.2)}")  # 4
print(f"向下取整: {math.floor(3.8)}")  # 3
print(f"绝对值: {math.fabs(-5.3)}")  # 5.3

# 2. Powers and logarithms
print(f"2的3次方: {math.pow(2, 3)}")  # 8.0
print(f"平方根: {math.sqrt(16)}")  # 4.0
print(f"e的平方: {math.exp(2)}")  # e^2
print(f"以e为底的对数: {math.log(10)}")  # ln(10)
print(f"以10为底的对数: {math.log10(100)}")  # 2.0

# 3. Trigonometry (arguments are in radians)
angle_rad = math.radians(30)  # 30 degrees expressed in radians
print(f"30度的正弦: {math.sin(angle_rad):.3f}")
print(f"30度的余弦: {math.cos(angle_rad):.3f}")
print(f"30度的正切: {math.tan(angle_rad):.3f}")

# 4. Degree / radian conversion
print(f"π弧度 = {math.degrees(math.pi)}度")  # 180.0
print(f"90度 = {math.radians(90)}弧度")  # pi / 2

# 5. Integer helpers
print(f"阶乘5: {math.factorial(5)}")  # 120
print(f"最大公约数(12, 18): {math.gcd(12, 18)}")  # 6

# 6. Floating-point classification
print(f"判断无穷大: {math.isinf(float('inf'))}")  # True
print(f"判断NaN: {math.isnan(float('nan'))}")  # True
print(f"判断有限数: {math.isfinite(100)}")  # True
class ScientificCalculator:
    """Collection of stateless geometry and statistics helpers."""

    @staticmethod
    def calculate_circle(radius):
        """Return the diameter, area and circumference of a circle.

        Args:
            radius: Circle radius; must be greater than zero.

        Returns:
            Dict with ``radius``, ``diameter``, ``area`` and
            ``circumference`` (the last two rounded to 4 decimals).

        Raises:
            ValueError: If ``radius`` is not positive.
        """
        if radius <= 0:
            raise ValueError("半径必须大于0")

        area = math.pi * radius ** 2
        circumference = 2 * math.pi * radius
        diameter = 2 * radius

        return {
            'radius': radius,
            'diameter': diameter,
            'area': round(area, 4),
            'circumference': round(circumference, 4),
        }

    @staticmethod
    def calculate_triangle(a, b, c):
        """Return the area and interior angles of a triangle with sides a, b, c.

        Returns:
            Dict with ``sides``, ``area`` (4 decimals) and ``angles`` in
            degrees (2 decimals), where angle ``A`` is opposite side ``a``.

        Raises:
            ValueError: If the sides violate the triangle inequality
                (this also rejects zero or negative lengths).
        """
        # The two shortest sides must together exceed the longest one.
        sides = sorted([a, b, c])
        if sides[0] + sides[1] <= sides[2]:
            raise ValueError("无效的三角形边长")

        # Area via Heron's formula. Rounding can make the radicand a tiny
        # negative number for very flat triangles, so clamp it at zero.
        s = (a + b + c) / 2
        area = math.sqrt(max(0.0, s * (s - a) * (s - b) * (s - c)))

        def angle_opposite(opposite, side1, side2):
            # Law of cosines; clamp to [-1, 1] because rounding can push the
            # ratio just outside acos's domain.
            cosine = (side1 ** 2 + side2 ** 2 - opposite ** 2) / (2 * side1 * side2)
            return math.degrees(math.acos(max(-1.0, min(1.0, cosine))))

        angle_a = angle_opposite(a, b, c)
        angle_b = angle_opposite(b, a, c)
        angle_c = 180 - angle_a - angle_b  # interior angles sum to 180 degrees

        return {
            'sides': {'a': a, 'b': b, 'c': c},
            'area': round(area, 4),
            'angles': {
                'A': round(angle_a, 2),
                'B': round(angle_b, 2),
                'C': round(angle_c, 2),
            },
        }

    @staticmethod
    def statistics(data):
        """Return basic descriptive statistics for a non-empty sequence.

        The variance is the population variance (divides by ``n``).

        Raises:
            ValueError: If ``data`` is empty.
        """
        if not data:
            raise ValueError("数据不能为空")

        n = len(data)
        mean = sum(data) / n
        variance = sum((x - mean) ** 2 for x in data) / n
        std_dev = math.sqrt(variance)

        sorted_data = sorted(data)
        middle = n // 2
        if n % 2 == 0:
            # Even count: average the two central values.
            median = (sorted_data[middle - 1] + sorted_data[middle]) / 2
        else:
            median = sorted_data[middle]

        return {
            'count': n,
            'mean': round(mean, 4),
            'median': round(median, 4),
            'variance': round(variance, 4),
            'std_dev': round(std_dev, 4),
            'min': min(data),
            'max': max(data),
        }


# Usage example
# calc = ScientificCalculator()
# print(calc.calculate_circle(5))
# print(calc.calculate_triangle(3, 4, 5))
# print(calc.statistics([1, 2, 3, 4, 5]))
random模块提供了各种随机数生成功能,支持随机选择、打乱、采样等。
import random
import string

# 1. Basic random numbers
print(f"0-1随机浮点数: {random.random()}")
print(f"1-10随机整数: {random.randint(1, 10)}")
print(f"1-10随机浮点数: {random.uniform(1, 10):.3f}")

# 2. Working with sequences
items = ['apple', 'banana', 'orange', 'grape']
print(f"随机选择一个: {random.choice(items)}")
print(f"随机选择3个(可重复): {random.choices(items, k=3)}")
print(f"随机选择2个(不重复): {random.sample(items, 2)}")

# shuffle() works in place, so shuffle a copy to keep ``items`` intact.
shuffled = items.copy()
random.shuffle(shuffled)
print(f"打乱后: {shuffled}")

# 3. Seeding (reproducible sequences)
random.seed(42)  # fix the seed
print(f"固定种子随机数1: {random.random()}")
print(f"固定种子随机数2: {random.random()}")

random.seed(42)  # same seed again -> the sequence restarts
print(f"相同种子随机数1: {random.random()}")  # equals the first value above

# 4. Probability distributions
# Normal distribution (mean 0, standard deviation 1)
normals = [random.normalvariate(0, 1) for _ in range(5)]
print(f"正态分布样本: {[round(x, 3) for x in normals]}")

# Exponential distribution (rate 1.0)
exponentials = [random.expovariate(1.0) for _ in range(3)]
print(f"指数分布样本: {[round(x, 3) for x in exponentials]}")

# 5. Further examples
# Random alphanumeric string. NOTE: ``random`` is not suitable for passwords
# or tokens; use the ``secrets`` module for anything security-sensitive.
random_str = ''.join(random.choices(string.ascii_letters + string.digits, k=10))
print(f"随机字符串: {random_str}")

# Random boolean
random_bool = random.choice([True, False])
print(f"随机布尔: {random_bool}")
class LotterySystem:
    """Ticket-based lottery.

    ``participants`` holds one entry per ticket (a name repeated
    ``ticket_count`` times), ``prizes`` holds one entry per prize item, and
    ``winners`` maps a name to the list of prizes won in the last draw.
    """

    def __init__(self):
        self.participants = []
        self.prizes = []
        self.winners = {}

    def add_participant(self, name, ticket_count=1):
        """Add ``ticket_count`` tickets for ``name``."""
        self.participants.extend([name] * ticket_count)
        print(f"添加参与者: {name} (票数: {ticket_count})")

    def add_prize(self, prize_name, quantity=1):
        """Add ``quantity`` items of ``prize_name`` to the prize pool."""
        self.prizes.extend([prize_name] * quantity)
        print(f"添加奖品: {prize_name} (数量: {quantity})")

    def draw_lottery(self):
        """Shuffle the tickets and hand out prizes in the order they were added.

        Each ticket wins at most one prize; a participant holding several
        tickets can therefore win several prizes.

        Returns:
            Dict mapping winner name to the list of prizes won, or ``None``
            when there are no participants or no prizes.
        """
        if not self.participants:
            print("错误: 没有参与者")
            return

        if not self.prizes:
            print("错误: 没有奖品")
            return

        # Shuffle the tickets in place.
        random.shuffle(self.participants)

        # zip stops at the shorter list, so no more than
        # min(len(prizes), len(tickets)) prizes are awarded.
        self.winners = {}
        for winner, prize in zip(self.participants, self.prizes):
            self.winners.setdefault(winner, []).append(prize)

        # Show the result.
        print("\n🎉 抽奖结果 🎉")
        print("=" * 30)
        for winner, prizes in self.winners.items():
            print(f"{winner}: {', '.join(prizes)}")

        return self.winners

    def statistics(self):
        """Return counts and ratios describing the current lottery setup.

        ``win_probability`` is the chance that a single ticket wins; it is
        capped at 1 because surplus prizes cannot be awarded.
        """
        total_participants = len(set(self.participants))
        total_tickets = len(self.participants)
        total_prizes = len(self.prizes)

        return {
            'unique_participants': total_participants,
            'total_tickets': total_tickets,
            'total_prizes': total_prizes,
            'ticket_distribution': total_tickets / total_participants if total_participants > 0 else 0,
            'win_probability': min(1.0, total_prizes / total_tickets) if total_tickets > 0 else 0,
        }


# Usage example
# lottery = LotterySystem()
# lottery.add_participant("张三", 3)
# lottery.add_participant("李四", 2)
# lottery.add_participant("王五", 1)
# lottery.add_prize("一等奖", 1)
# lottery.add_prize("二等奖", 2)
# lottery.add_prize("参与奖", 5)
# winners = lottery.draw_lottery()
# print(lottery.statistics())
json模块提供了JSON数据的编码(序列化)和解码(反序列化)功能。
import json
from datetime import datetime  # used by the "complex types" example below

# 1. Basic encoding and decoding
data = {
    "name": "Python学习搭子",
    "age": 30,
    "skills": ["Python", "数据分析", "Web开发"],
    "active": True,
    "score": 95.5,
}

# Encode to a JSON string; ensure_ascii=False keeps non-ASCII text readable.
json_str = json.dumps(data, ensure_ascii=False, indent=2)
print("JSON字符串:")
print(json_str)

# Decode the JSON string back into Python objects.
parsed_data = json.loads(json_str)
print(f"\n解码后类型: {type(parsed_data)}")
print(f"姓名: {parsed_data['name']}")

# 2. Files
# Write a JSON file
with open("data.json", "w", encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

# Read it back
with open("data.json", "r", encoding="utf-8") as f:
    loaded_data = json.load(f)
print(f"\n从文件加载: {loaded_data['name']}")


# 3. Custom encoder
class Person:
    """Plain object used to demonstrate a custom JSON encoder."""

    def __init__(self, name, age):
        self.name = name
        self.age = age


class PersonEncoder(json.JSONEncoder):
    """JSONEncoder that serialises ``Person`` as a ``{"name", "age"}`` object."""

    def default(self, obj):
        if isinstance(obj, Person):
            return {"name": obj.name, "age": obj.age}
        # Let the base class raise TypeError for anything unsupported.
        return super().default(obj)


person = Person("张三", 25)
person_json = json.dumps(person, cls=PersonEncoder, ensure_ascii=False)
print(f"\n自定义编码: {person_json}")

# 4. Types json cannot encode natively
complex_data = {
    # Left as a datetime object so that the default= hook below is exercised.
    "timestamp": datetime.now(),
    "data": [1, 2, 3, 4, 5],
    "metadata": {
        "version": "1.0",
        "author": "Python学习搭子",
    },
}


def custom_serializer(obj):
    """``default=`` hook: encode datetimes as ISO-8601 strings.

    Raises:
        TypeError: For any other type, as the json module expects.
    """
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")


# ensure_ascii=False matches the other dumps() calls, so the author name is
# printed as text and not as \uXXXX escapes.
complex_json = json.dumps(
    complex_data, default=custom_serializer, ensure_ascii=False, indent=2
)
print(f"\n复杂数据JSON:\n{complex_json}")
class ConfigManager:
    """JSON-backed configuration store with dotted-key access.

    The configuration is loaded when the object is created. Values read from
    the file are merged over built-in defaults; if the file does not exist
    it is created with the defaults.
    """

    def __init__(self, config_path="config.json"):
        self.config_path = config_path
        self.config = self._load_config()

    def _load_config(self):
        """Load the configuration file and merge it over the defaults.

        Returns:
            The merged configuration dict. The defaults alone are returned
            when the file is missing (it is then created), is not valid
            JSON, or does not contain a JSON object at the top level.
        """
        default_config = {
            "app": {
                "name": "MyApp",
                "version": "1.0.0",
                "debug": False,
            },
            "database": {
                "host": "localhost",
                "port": 3306,
                "username": "root",
                "password": "",
            },
            "logging": {
                "level": "INFO",
                "file": "app.log",
                "max_size": "10MB",
            },
        }

        try:
            if os.path.exists(self.config_path):
                with open(self.config_path, "r", encoding="utf-8") as f:
                    loaded_config = json.load(f)

                # A list or scalar at the top level cannot be merged.
                if not isinstance(loaded_config, dict):
                    print("配置文件格式错误: 顶层必须是JSON对象")
                    return default_config

                # Values from the file take precedence over the defaults.
                return self._merge_configs(default_config, loaded_config)

            # No file yet: write the defaults so the user can edit them.
            self._save_config(default_config)
            return default_config
        except json.JSONDecodeError as e:
            print(f"配置文件格式错误: {e}")
            return default_config

    def _merge_configs(self, default, loaded):
        """Return a new dict with ``loaded`` recursively merged over ``default``.

        Nested dicts are merged key by key; any other value in ``loaded``
        replaces the one in ``default``. Neither argument is modified.
        """
        import copy

        # Deep copies keep the merge from writing into the caller's nested
        # dicts and keep the result from aliasing either input.
        merged = copy.deepcopy(default)

        def deep_merge(target, update):
            for key, value in update.items():
                if isinstance(target.get(key), dict) and isinstance(value, dict):
                    deep_merge(target[key], value)
                else:
                    target[key] = copy.deepcopy(value)

        deep_merge(merged, loaded)
        return merged

    def _save_config(self, config):
        """Write ``config`` to ``self.config_path``; report failures by printing."""
        try:
            with open(self.config_path, "w", encoding="utf-8") as f:
                json.dump(config, f, ensure_ascii=False, indent=2)
            print(f"配置文件已保存: {self.config_path}")
        except (OSError, TypeError, ValueError) as e:
            # OSError: file system problem. TypeError / ValueError: the data
            # cannot be encoded as JSON.
            print(f"保存配置文件失败: {e}")

    def get(self, key, default=None):
        """Return the value at dotted path ``key`` (e.g. ``"app.name"``).

        ``default`` is returned when any segment of the path is missing or
        an intermediate value cannot be indexed.
        """
        value = self.config
        try:
            for part in key.split("."):
                value = value[part]
            return value
        except (KeyError, TypeError):
            return default

    def set(self, key, value):
        """Set the value at dotted path ``key`` and save the configuration.

        Missing intermediate levels are created as empty dicts.
        """
        keys = key.split(".")
        config = self.config

        # Walk down to the parent of the final key.
        for k in keys[:-1]:
            if k not in config:
                config[k] = {}
            config = config[k]

        config[keys[-1]] = value

        self._save_config(self.config)
        print(f"配置已更新: {key} = {value}")

    def show_config(self):
        """Print the current configuration as indented JSON."""
        print("\n当前配置:")
        print("=" * 40)
        print(json.dumps(self.config, ensure_ascii=False, indent=2))


# Usage example
# config = ConfigManager()
# print(f"应用名称: {config.get('app.name')}")
# config.set('database.host', '127.0.0.1')
# config.show_config()
re模块提供了正则表达式功能,用于复杂的文本匹配、搜索和替换。
import re

# 1. Basic matching
text = "今天是2024年1月15日,天气晴朗。"

# Find every run of digits
pattern = r'\d+'
matches = re.findall(pattern, text)
print(f"找到的数字: {matches}")  # ['2024', '1', '15']

# 2. search vs. match
# search() returns the first match anywhere in the string
match = re.search(r'\d{4}', text)
if match:
    print(f"找到年份: {match.group()}")  # 2024
    print(f"位置: {match.start()}-{match.end()}")  # 3-7

# match() only succeeds at the beginning of the string
if re.match(r'今天', text):
    print("文本以'今天'开头")

# 3. Substitution
new_text = re.sub(r'\d+', '数字', text)
print(f"替换后: {new_text}")  # every digit run replaced by the word

# 4. Extracting groups
email_text = "联系邮箱: user@example.com, 备用邮箱: admin@test.org"
email_pattern = r'([\w\.-]+)@([\w\.-]+)\.(\w+)'

for match in re.finditer(email_pattern, email_text):
    print(f"邮箱: {match.group()}")
    print(f"用户名: {match.group(1)}")
    print(f"域名: {match.group(2)}")
    print(f"后缀: {match.group(3)}")
    print()

# 5. Splitting
csv_data = "张三,30,北京,工程师"
items = re.split(r',', csv_data)
print(f"CSV分割: {items}")  # ['张三', '30', '北京', '工程师']

# 6. Flags
multi_line_text = """第一行
第二行
第三行"""

# MULTILINE makes ^ and $ match at every line boundary
matches = re.findall(r'^第.*行$', multi_line_text, re.MULTILINE)
print(f"多行匹配: {matches}")  # ['第一行', '第二行', '第三行']

# IGNORECASE matches regardless of letter case
text_en = "Hello World, HELLO PYTHON"
matches = re.findall(r'hello', text_en, re.IGNORECASE)
print(f"忽略大小写匹配: {matches}")  # ['Hello', 'HELLO']
class LogAnalyzer:
    """Regex-based analyser that summarises the contents of a text log.

    ``self.patterns`` maps a category name to the regular expression used
    to find it.
    """

    def __init__(self):
        self.patterns = {
            'error': r'ERROR.*',
            'warning': r'WARNING.*',
            'info': r'INFO.*',
            'timestamp': r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}',
            'ip_address': r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
            'url': r'https?://[^\s]+',
            'email': r'[\w\.-]+@[\w\.-]+\.\w+',
        }

    def analyze_file(self, filepath):
        """Read ``filepath`` as UTF-8 and analyse its contents.

        Returns:
            The result dict of ``analyze_content``, or ``None`` (after
            printing a message) when the file does not exist.
        """
        if not os.path.exists(filepath):
            print(f"文件不存在: {filepath}")
            return None

        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()

        return self.analyze_content(content)

    def analyze_content(self, content):
        """Analyse log text and return a dict of findings.

        Keys produced: ``<level>_count`` and ``<level>_examples`` for
        error / warning / info, ``timestamp_count``, ``first_timestamp`` and
        ``last_timestamp`` (only when a timestamp was found), ``unique_ips``,
        ``ip_examples``, ``url_count``, ``url_examples``, ``error_patterns``.
        """
        results = {}

        # Count each log level (case-insensitive) and keep the first 5 hits.
        level_matches = {}
        for level in ('error', 'warning', 'info'):
            found = re.findall(self.patterns[level], content, re.IGNORECASE)
            level_matches[level] = found
            results[f'{level}_count'] = len(found)
            results[f'{level}_examples'] = found[:5]

        # Timestamps
        timestamps = re.findall(self.patterns['timestamp'], content)
        results['timestamp_count'] = len(timestamps)
        if timestamps:
            results['first_timestamp'] = timestamps[0]
            results['last_timestamp'] = timestamps[-1]

        # IP addresses; sorted so the examples are deterministic.
        unique_ips = set(re.findall(self.patterns['ip_address'], content))
        results['unique_ips'] = len(unique_ips)
        results['ip_examples'] = sorted(unique_ips)[:5]

        # URLs
        urls = re.findall(self.patterns['url'], content)
        results['url_count'] = len(urls)
        results['url_examples'] = urls[:5]

        # Error frequency by type. The error matches from above are reused
        # so the content is not scanned a second time.
        error_patterns = {}
        for line in level_matches['error']:
            # The error type is the text between "ERROR:" and the next colon
            # (e.g. "ConnectionError", "Timeout").
            match = re.search(r'ERROR:\s*([^:]+)', line)
            if match:
                error_type = match.group(1).strip()
                error_patterns[error_type] = error_patterns.get(error_type, 0) + 1

        results['error_patterns'] = error_patterns
        return results

    def generate_report(self, results):
        """Format the dict returned by ``analyze_content`` as a text report.

        Returns:
            The report as one newline-joined string, or a short placeholder
            message when ``results`` is empty or ``None``.
        """
        if not results:
            return "无分析结果"

        report = [
            "=" * 50,
            "日志分析报告",
            "=" * 50,
            "\n📊 日志级别统计:",
            f" ERROR: {results.get('error_count', 0)} 条",
            f" WARNING: {results.get('warning_count', 0)} 条",
            f" INFO: {results.get('info_count', 0)} 条",
            "\n🕐 时间范围:",
            f" 第一条: {results.get('first_timestamp', 'N/A')}",
            f" 最后一条: {results.get('last_timestamp', 'N/A')}",
            "\n🌐 网络信息:",
            f" 唯一IP地址: {results.get('unique_ips', 0)} 个",
            "\n🔍 错误模式分析:",
        ]

        for error_type, count in results.get('error_patterns', {}).items():
            report.append(f" {error_type}: {count} 次")

        if results.get('error_examples'):
            report.append("\n📝 ERROR示例:")
            for i, example in enumerate(results['error_examples'], 1):
                # Add the ellipsis only when the example was really cut.
                suffix = "..." if len(example) > 80 else ""
                report.append(f" {i}. {example[:80]}{suffix}")

        return "\n".join(report)


# Usage example
# analyzer = LogAnalyzer()
# results = analyzer.analyze_file("app.log")
# if results:
#     print(analyzer.generate_report(results))
itertools模块提供了创建和使用迭代器的各种函数,用于高效处理循环和组合问题。
import itertools# 1. 无限迭代器print("无限迭代器示例:")# 从10开始的计数counter = itertools.count(start=10, step=2)print(f"计数: {next(counter)}, {next(counter)}, {next(counter)}") # 10, 12, 14# 循环迭代cycler = itertools.cycle(['A', 'B', 'C'])print(f"循环: {next(cycler)}, {next(cycler)}, {next(cycler)}") # A, B, C# 重复元素repeater = itertools.repeat('Python', 3)print(f"重复: {list(repeater)}") # ['Python', 'Python', 'Python']# 2. 有限迭代器print("\n有限迭代器示例:")# 累积计算numbers = [1, 2, 3, 4, 5]print(f"累积和: {list(itertools.accumulate(numbers))}") # [1, 3, 6, 10, 15]# 成对组合pairs = list(itertools.pairwise(numbers))print(f"成对组合: {pairs}") # [(1, 2), (2, 3), (3, 4), (4, 5)]# 3. 组合迭代器print("\n组合迭代器示例:")# 排列(顺序重要)items = ['A', 'B', 'C']permutations = list(itertools.permutations(items, 2))print(f"排列 (AB != BA): {permutations}")# [('A', 'B'), ('A', 'C'), ('B', 'A'), ('B', 'C'), ('C', 'A'), ('C', 'B')]# 组合(顺序不重要)combinations = list(itertools.combinations(items, 2))print(f"组合 (AB = BA): {combinations}") # [('A', 'B'), ('A', 'C'), ('B', 'C')]# 笛卡尔积product = list(itertools.product([1, 2], ['A', 'B']))print(f"笛卡尔积: {product}") # [(1, 'A'), (1, 'B'), (2, 'A'), (2, 'B')]# 4. 分组操作print("\n分组操作示例:")data = [ ('苹果', '水果'), ('香蕉', '水果'), ('胡萝卜', '蔬菜'), ('土豆', '蔬菜'), ('鸡肉', '肉类')]# 按类别分组(需要先排序)data.sort(key=lambda x: x[1])for category, group in itertools.groupby(data, key=lambda x: x[1]): items = [item[0] for item in group]print(f"{category}: {', '.join(items)}")# 5. 链式操作print("\n链式操作示例:")chain1 = [1, 2, 3]chain2 = ['A', 'B', 'C']chain3 = (True, False)chained = list(itertools.chain(chain1, chain2, chain3))print(f"链式连接: {chained}") # [1, 2, 3, 'A', 'B', 'C', True, False]# 压缩操作(按布尔值过滤)values = [1, 2, 3, 4, 5]selectors = [True, False, True, False, True]compressed = list(itertools.compress(values, selectors))print(f"压缩过滤: {compressed}") # [1, 3, 5]
class DataPipeline:
    """Iterator-based helpers for batching, windowing and combining data."""

    @staticmethod
    def batch_processor(data, batch_size=10):
        """Yield consecutive lists of at most ``batch_size`` items from ``data``.

        The final batch may be shorter; nothing is yielded for empty input.
        """
        it = iter(data)
        while True:
            batch = list(itertools.islice(it, batch_size))
            if not batch:
                return
            yield batch

    @staticmethod
    def sliding_window(data, window_size=3):
        """Yield overlapping tuples of ``window_size`` consecutive items.

        Nothing is yielded when ``data`` has fewer than ``window_size`` items.

        Raises:
            ValueError: If ``window_size`` is less than 1 (raised when the
                generator is first advanced).
        """
        from collections import deque

        if window_size < 1:
            raise ValueError("窗口大小必须大于0")

        it = iter(data)
        # A bounded deque drops the oldest item automatically on append,
        # which avoids shifting a list with pop(0) on every step.
        window = deque(itertools.islice(it, window_size), maxlen=window_size)
        if len(window) < window_size:
            return
        yield tuple(window)

        for item in it:
            window.append(item)
            yield tuple(window)

    @staticmethod
    def combinations_filter(data, min_size=1, max_size=3):
        """Return all combinations of ``data`` with ``min_size``..``max_size`` items.

        ``max_size`` is capped at ``len(data)``. Combinations are ordered by
        size, then in ``itertools.combinations`` order.
        """
        upper = min(max_size, len(data))
        return [
            combo
            for r in range(min_size, upper + 1)
            for combo in itertools.combinations(data, r)
        ]

    @staticmethod
    def cartesian_product(dict_of_lists):
        """Yield one dict per combination of the values in ``dict_of_lists``.

        Every yielded dict has the same keys as the input, each mapped to
        one element of the corresponding list.
        """
        keys = list(dict_of_lists.keys())
        values = list(dict_of_lists.values())

        for product in itertools.product(*values):
            yield dict(zip(keys, product))

    @staticmethod
    def pipeline_example():
        """Print a short demonstration of each helper in this class."""
        # Test data
        numbers = range(1, 101)

        # 1. Batching
        print("批量处理(每批10个):")
        for i, batch in enumerate(DataPipeline.batch_processor(numbers, 10), 1):
            print(f" 批次{i}: {batch[:5]}...")  # show only the first 5 items
            if i >= 3:  # show only 3 batches
                break

        # 2. Sliding window
        print("\n滑动窗口(窗口大小5):")
        sample_data = list(range(1, 11))
        for i, window in enumerate(DataPipeline.sliding_window(sample_data, 5), 1):
            print(f" 窗口{i}: {window}")
            if i >= 3:  # show only 3 windows
                break

        # 3. Combination filter
        print("\n组合过滤器:")
        small_set = ['A', 'B', 'C', 'D']
        combos = DataPipeline.combinations_filter(small_set, 2, 3)
        print(f" 2-3个元素的组合数: {len(combos)}")
        print(f" 前5个组合: {combos[:5]}")

        # 4. Cartesian product
        print("\n笛卡尔积(参数组合):")
        param_grid = {
            'learning_rate': [0.01, 0.1, 1.0],
            'batch_size': [32, 64, 128],
            'optimizer': ['SGD', 'Adam'],
        }

        param_count = 0
        for params in DataPipeline.cartesian_product(param_grid):
            param_count += 1
            if param_count <= 3:  # show only 3 combinations
                print(f" 组合{param_count}: {params}")

        print(f" 总参数组合数: {param_count}")


# Usage example
# DataPipeline.pipeline_example()
functools模块提供了用于高阶函数和函数式编程的工具,包括装饰器、偏函数等。
import functools


# 1. lru_cache: memoize results so the naive recursion runs in linear time.
@functools.lru_cache(maxsize=128)
def fibonacci(n):
    """Return the n-th Fibonacci number (fibonacci(0) == 0, fibonacci(1) == 1)."""
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)


print("斐波那契数列(使用缓存):")
for i in range(10):
    print(f" fib({i}) = {fibonacci(i)}")

# Hit/miss counters of the cache.
print(f"\n缓存信息: {fibonacci.cache_info()}")


# 2. partial: pre-fill some arguments of an existing function.
def power(base, exponent):
    """Return ``base`` raised to ``exponent``."""
    return base ** exponent


# Square function (exponent fixed to 2).
square = functools.partial(power, exponent=2)
print("\n平方函数:")
print(f" square(5) = {square(5)}")  # 25
print(f" square(10) = {square(10)}")  # 100

# Cube function (exponent fixed to 3).
cube = functools.partial(power, exponent=3)
print(f" cube(3) = {cube(3)}")  # 27


# 3. wraps: keep the decorated function's metadata.
def log_decorator(func):
    """Decorator that prints the call arguments and the result of ``func``."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(f"调用函数: {func.__name__}")
        print(f"参数: args={args}, kwargs={kwargs}")
        result = func(*args, **kwargs)
        print(f"结果: {result}")
        return result

    return wrapper


@log_decorator
def add(a, b):
    # This docstring is printed below as part of the demo output, so it is
    # kept verbatim.
    """加法函数"""
    return a + b


print("\n装饰器示例:")
print(f" 函数名: {add.__name__}")  # add (not wrapper)
print(f" 文档: {add.__doc__}")  # 加法函数
print(f" 调用结果: {add(3, 5)}")

# 4. reduce: fold a sequence into a single value.
numbers = [1, 2, 3, 4, 5]

# Product of all elements.
product = functools.reduce(lambda x, y: x * y, numbers)
print("\nreduce示例:")
print(f" 列表: {numbers}")
print(f" 乘积: {product}")  # 120

# Maximum, written as a fold for illustration (max() is the idiomatic way).
max_value = functools.reduce(lambda x, y: x if x > y else y, numbers)
print(f" 最大值: {max_value}")  # 5


# 5. total_ordering: derive the remaining comparisons from __eq__ and __lt__.
@functools.total_ordering
class Student:
    """A student compared and ordered by ``score`` only."""

    def __init__(self, name, score):
        self.name = name
        self.score = score

    def __eq__(self, other):
        # NotImplemented lets Python fall back to its default comparison
        # instead of failing with AttributeError on foreign types.
        if not isinstance(other, Student):
            return NotImplemented
        return self.score == other.score

    def __lt__(self, other):
        if not isinstance(other, Student):
            return NotImplemented
        return self.score < other.score

    def __repr__(self):
        return f"Student({self.name}, {self.score})"


print("\ntotal_ordering示例:")
students = [
    Student("张三", 85),
    Student("李四", 92),
    Student("王五", 78),
]
sorted_students = sorted(students)
print(f" 排序后: {sorted_students}")
print(f" 张三 < 李四? {students[0] < students[1]}")
print(f" 王五 >= 张三? {students[2] >= students[0]}")
class FunctionalTools:
    """Small functional-programming toolkit: memoize, compose, pipe, curry."""

    @staticmethod
    def memoize(func):
        """Return a caching wrapper around ``func``.

        Results are cached per (positional args, sorted keyword args). The
        cache dict is exposed as ``wrapper.cache``. Calls whose arguments are
        unhashable cannot be cached and are forwarded to ``func`` directly.
        """
        cache = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Sorting makes f(a=1, b=2) and f(b=2, a=1) share one entry.
            key = (args, tuple(sorted(kwargs.items())))
            try:
                return cache[key]
            except KeyError:
                pass
            except TypeError:
                # Unhashable argument (e.g. a list): skip caching.
                return func(*args, **kwargs)
            result = cache[key] = func(*args, **kwargs)
            return result

        wrapper.cache = cache
        return wrapper

    @staticmethod
    def compose(*functions):
        """Right-to-left composition: ``compose(f, g, h)(x) == f(g(h(x)))``."""

        def composed(arg):
            return functools.reduce(
                lambda acc, fn: fn(acc), reversed(functions), arg
            )

        return composed

    @staticmethod
    def pipe(*functions):
        """Left-to-right composition: ``pipe(f, g, h)(x) == h(g(f(x)))``."""

        def piped(arg):
            return functools.reduce(lambda acc, fn: fn(acc), functions, arg)

        return piped

    @staticmethod
    def curry(func):
        """Return a curried version of ``func``.

        Arguments may be supplied over several calls; ``func`` is invoked as
        soon as the number of collected arguments reaches the number of its
        positional parameters.
        """
        import inspect

        # inspect.signature also handles functools.partial objects and other
        # callables that have no __code__ attribute.
        positional_kinds = (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        )
        arity = sum(
            param.kind in positional_kinds
            for param in inspect.signature(func).parameters.values()
        )

        @functools.wraps(func)
        def curried(*args, **kwargs):
            if len(args) + len(kwargs) >= arity:
                return func(*args, **kwargs)

            def more(*more_args, **more_kwargs):
                # Merge what was collected so far with the new arguments.
                return curried(*args, *more_args, **{**kwargs, **more_kwargs})

            return more

        return curried

    @staticmethod
    def demonstrate():
        """Print a short demonstration of every tool in this class."""
        print("函数式编程工具演示:")
        print("=" * 50)

        # 1. Memoization
        @FunctionalTools.memoize
        def expensive_calculation(n):
            print(f" 计算 expensive_calculation({n})...")
            return n * n

        print("\n1. 记忆化(相同参数不重复计算):")
        print(f" 第一次: {expensive_calculation(5)}")
        print(f" 第二次(从缓存): {expensive_calculation(5)}")
        print(f" 新参数: {expensive_calculation(6)}")

        # 2. Function composition
        def double(x):
            return x * 2

        def increment(x):
            return x + 1

        def square(x):
            return x * x

        composed = FunctionalTools.compose(double, increment, square)
        print("\n2. 函数组合: compose(double, increment, square)(3)")
        # double(increment(square(3))) = double(increment(9)) = double(10) = 20
        print(f" 结果: {composed(3)}")

        # 3. Pipe
        piped = FunctionalTools.pipe(double, increment, square)
        print("\n3. 管道: pipe(double, increment, square)(3)")
        # square(increment(double(3))) = square(increment(6)) = square(7) = 49
        print(f" 结果: {piped(3)}")

        # 4. Currying
        @FunctionalTools.curry
        def multiply_three(a, b, c):
            return a * b * c

        print("\n4. 柯里化: multiply_three(2)(3)(4)")
        step1 = multiply_three(2)
        step2 = step1(3)
        result = step2(4)
        print(f" 结果: {result}")  # 24
        # All arguments can also be passed at once.
        print(f" 直接调用: {multiply_three(2, 3, 4)}")  # 24


# Usage example
# FunctionalTools.demonstrate()
#!/usr/bin/env python3
"""Automatic backup script.

Copies the configured source directories into a timestamped backup
directory, optionally compresses it into a zip archive, writes a log file,
keeps a JSON backup history and prunes the oldest backups.
"""
import os
import sys
import json
import shutil
import hashlib
import datetime
import argparse
import fnmatch
from pathlib import Path

# Name of the history file stored inside the backup directory.
HISTORY_FILENAME = "backup_history.json"
# Prefix shared by every backup directory/archive created by this script.
BACKUP_PREFIX = "backup_"


class AutoBackup:
    """Backup runner driven by a JSON configuration file."""

    def __init__(self, config_path="backup_config.json"):
        self.config = self._load_config(config_path)
        # Start from the history already on disk so earlier runs are not
        # overwritten when the history file is saved again.
        self.backup_history = self._load_history()

    def _load_config(self, config_path):
        """Return the default configuration updated with the user's file.

        A missing file yields the defaults; an unreadable or invalid file
        prints a warning and also yields the defaults.
        """
        default_config = {
            "source_dirs": ["./important_data"],
            "backup_dir": "./backups",
            "max_backups": 10,
            "compress": True,
            "log_file": "./backup.log",
            "exclude_patterns": [".git", "__pycache__", "*.tmp"],
        }
        try:
            if os.path.exists(config_path):
                with open(config_path, "r", encoding="utf-8") as f:
                    user_config = json.load(f)
                # User values override the defaults key by key.
                default_config.update(user_config)
        except (json.JSONDecodeError, OSError, TypeError, ValueError) as e:
            print(f"警告: 配置文件加载失败,使用默认配置 ({e})")
        return default_config

    def _load_history(self):
        """Return the saved backup history, or an empty list if unavailable."""
        history_file = os.path.join(self.config["backup_dir"], HISTORY_FILENAME)
        try:
            with open(history_file, "r", encoding="utf-8") as f:
                history = json.load(f)
        except (OSError, json.JSONDecodeError):
            return []
        return history if isinstance(history, list) else []

    def _is_excluded(self, name):
        """Return True if ``name`` matches any configured exclude pattern.

        Patterns are shell-style globs matched against the bare file or
        directory name, so ".git" excludes ".git" but not ".github".
        """
        return any(
            fnmatch.fnmatch(name, pattern)
            for pattern in self.config["exclude_patterns"]
        )

    def _calculate_md5(self, filepath):
        """Return the hex MD5 digest of the file, read in 4 KiB chunks."""
        hash_md5 = hashlib.md5()
        with open(filepath, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def _scan_files(self, source_dir):
        """Walk ``source_dir`` and describe every file that is not excluded.

        Returns a list of dicts with the keys ``path``, ``size``,
        ``modified``, ``hash`` and ``relative_path`` (relative to
        ``source_dir``). Unreadable files are reported and skipped.
        """
        files_info = []
        for root, dirs, files in os.walk(source_dir):
            # Prune in place so os.walk does not descend into excluded dirs.
            dirs[:] = [d for d in dirs if not self._is_excluded(d)]
            for file in files:
                if self._is_excluded(file):
                    continue
                full_path = os.path.join(root, file)
                try:
                    files_info.append({
                        'path': full_path,
                        'size': os.path.getsize(full_path),
                        'modified': os.path.getmtime(full_path),
                        'hash': self._calculate_md5(full_path),
                        'relative_path': os.path.relpath(full_path, source_dir),
                    })
                except OSError as e:
                    print(f"警告: 无法读取文件 {full_path}: {e}")
        return files_info

    def _create_backup_name(self):
        """Return a backup name based on the current local time."""
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        return f"backup_{timestamp}"

    def _log_message(self, message, level="INFO"):
        """Print a timestamped log entry and append it to the log file."""
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"[{timestamp}] [{level}] {message}"
        print(log_entry)
        try:
            with open(self.config["log_file"], "a", encoding="utf-8") as f:
                f.write(log_entry + "\n")
        except OSError as e:
            print(f"警告: 无法写入日志文件: {e}")

    def _cleanup_old_backups(self):
        """Delete the oldest backups until at most ``max_backups`` remain."""
        try:
            backup_path = Path(self.config["backup_dir"])
            if not backup_path.exists():
                return
            # Only entries created by this script count as backups; the
            # history file lives in the same directory and must be kept.
            backups = sorted(
                (
                    entry for entry in backup_path.iterdir()
                    if entry.name.startswith(BACKUP_PREFIX)
                    and entry.name != HISTORY_FILENAME
                ),
                key=os.path.getmtime,
            )
            excess = max(len(backups) - self.config["max_backups"], 0)
            for old_backup in backups[:excess]:
                try:
                    if old_backup.is_file():
                        old_backup.unlink()
                    else:
                        shutil.rmtree(old_backup)
                    self._log_message(f"删除旧备份: {old_backup.name}")
                except OSError as e:
                    self._log_message(
                        f"删除备份失败 {old_backup.name}: {e}", "WARNING"
                    )
        except OSError as e:
            self._log_message(f"清理备份时出错: {e}", "ERROR")

    def run_backup(self):
        """Run one backup and return True on success, False otherwise."""
        self._log_message("开始备份操作")

        # 1. Keep only the source directories that exist.
        valid_sources = []
        for source_dir in self.config["source_dirs"]:
            if os.path.exists(source_dir):
                valid_sources.append(source_dir)
            else:
                self._log_message(f"源目录不存在: {source_dir}", "WARNING")
        if not valid_sources:
            self._log_message("错误: 没有有效的源目录", "ERROR")
            return False

        # 2. Create the backup directory.
        backup_dir = self.config["backup_dir"]
        backup_name = self._create_backup_name()
        full_backup_path = os.path.join(backup_dir, backup_name)
        try:
            os.makedirs(full_backup_path, exist_ok=True)
            self._log_message(f"创建备份目录: {full_backup_path}")
        except OSError as e:
            self._log_message(f"创建备份目录失败: {e}", "ERROR")
            return False

        # 3. Copy the files.
        # NOTE(review): files from different source directories that share a
        # relative path are written to the same destination and overwrite
        # each other; confirm whether per-source subdirectories are wanted.
        total_files = 0
        total_size = 0
        for source_dir in valid_sources:
            self._log_message(f"备份源目录: {source_dir}")
            for file_info in self._scan_files(source_dir):
                source_path = file_info['path']
                dest_path = os.path.join(
                    full_backup_path, file_info['relative_path']
                )
                try:
                    os.makedirs(os.path.dirname(dest_path), exist_ok=True)
                    shutil.copy2(source_path, dest_path)
                    total_files += 1
                    total_size += file_info['size']
                except OSError as e:
                    self._log_message(
                        f"复制文件失败 {source_path}: {e}", "WARNING"
                    )

        # 4. Compress the backup if enabled.
        backup_file = full_backup_path
        compressed = False
        if self.config["compress"]:
            try:
                shutil.make_archive(
                    full_backup_path,
                    'zip',
                    root_dir=backup_dir,
                    base_dir=backup_name,
                )
                # The archive replaces the uncompressed directory.
                shutil.rmtree(full_backup_path)
                backup_file = f"{full_backup_path}.zip"
                compressed = True
                self._log_message(f"备份已压缩: {backup_file}")
            except OSError as e:
                self._log_message(f"压缩备份失败: {e}", "ERROR")

        # 5. Record this backup in the history.
        self.backup_history.append({
            'timestamp': datetime.datetime.now().isoformat(),
            'name': backup_name,
            'source_dirs': valid_sources,
            'file_count': total_files,
            'total_size': total_size,
            # Reflects what actually happened, not just the config flag.
            'compressed': compressed,
            'path': backup_file,
        })

        # 6. Prune old backups.
        self._cleanup_old_backups()

        # 7. Persist the history.
        history_file = os.path.join(backup_dir, HISTORY_FILENAME)
        try:
            with open(history_file, "w", encoding="utf-8") as f:
                json.dump(self.backup_history, f, indent=2, ensure_ascii=False)
        except OSError as e:
            self._log_message(f"保存备份历史失败: {e}", "WARNING")

        self._log_message(
            f"备份完成: {total_files} 个文件,"
            f"总大小: {total_size / (1024*1024):.2f} MB"
        )
        return True


def main():
    """Command-line entry point."""
    parser = argparse.ArgumentParser(description="自动备份工具")
    parser.add_argument("--config", default="backup_config.json",
                        help="配置文件路径")
    parser.add_argument("--list", action="store_true",
                        help="列出备份历史")
    parser.add_argument("--restore", metavar="BACKUP_NAME",
                        help="恢复指定备份")
    args = parser.parse_args()

    if args.restore:
        # Restoring is not implemented; fail loudly instead of silently
        # running a new backup. parser.error exits with status 2.
        parser.error("恢复功能尚未实现")

    backup = AutoBackup(args.config)

    if args.list:
        # The history is read from the configured backup directory.
        history = backup.backup_history
        if history:
            print(f"{'备份名称':<25}{'时间':<25}{'文件数':<8}{'大小':<12}")
            print("-" * 70)
            for item in history[-10:]:  # ten most recent entries
                size_mb = item['total_size'] / (1024*1024)
                print(f"{item['name']:<25}{item['timestamp']:<25} "
                      f"{item['file_count']:<8}{size_mb:<8.2f} MB")
        else:
            print("没有备份历史")
        return

    # Run the backup.
    success = backup.run_backup()
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""Data processing and analysis pipeline.

Reads CSV, JSON or plain-text files, then cleans, transforms and analyses
the records and writes a Markdown report and an export file.
"""
import os
import sys
import json
import csv
import re
import math
import statistics
import itertools
import functools
from datetime import datetime, timedelta
from collections import defaultdict, Counter
from typing import List, Dict, Any, Optional, Tuple


class DataPipeline:
    """Chainable data pipeline: every processing step returns ``self``."""

    # Values treated as "missing" by clean_data. A tuple (not a set) so that
    # unhashable values such as lists can still be tested with ``in``.
    _EMPTY_VALUES = (None, "", "null", "NULL", "Null")

    def __init__(self, config=None):
        self.config = config or {}
        self.data = []
        self.results = {}

    @staticmethod
    def read_csv(filepath, delimiter=",", encoding="utf-8"):
        """Return the rows of a CSV file as dicts; [] if reading fails.

        All values are strings, as produced by csv.DictReader.
        """
        try:
            # newline="" lets the csv module handle embedded line breaks.
            with open(filepath, "r", encoding=encoding, newline="") as f:
                data = list(csv.DictReader(f, delimiter=delimiter))
            print(f"成功读取CSV文件: {filepath} ({len(data)} 行)")
            return data
        except (OSError, csv.Error) as e:
            print(f"读取CSV文件失败 {filepath}: {e}")
            return []

    @staticmethod
    def read_json(filepath, encoding="utf-8"):
        """Return the content of a JSON file as a list; [] if reading fails.

        A top-level value that is not a list is wrapped in a one-item list.
        """
        try:
            with open(filepath, "r", encoding=encoding) as f:
                data = json.load(f)
            print(f"成功读取JSON文件: {filepath}")
            return data if isinstance(data, list) else [data]
        except (OSError, json.JSONDecodeError) as e:
            print(f"读取JSON文件失败 {filepath}: {e}")
            return []

    @staticmethod
    def read_text(filepath, delimiter=None, encoding="utf-8"):
        """Return the non-blank lines of a text file; [] if reading fails.

        With ``delimiter`` each line is split into a list of parts,
        otherwise the stripped line itself is returned.
        """
        try:
            data = []
            with open(filepath, "r", encoding=encoding) as f:
                for raw_line in f:
                    line = raw_line.strip()
                    if not line:
                        continue
                    data.append(line.split(delimiter) if delimiter else line)
            print(f"成功读取文本文件: {filepath} ({len(data)} 行)")
            return data
        except OSError as e:
            print(f"读取文本文件失败 {filepath}: {e}")
            return []

    def load_data(self, filepaths, file_type="auto"):
        """Load every file in ``filepaths`` and replace ``self.data``.

        ``file_type`` is "auto" (choose by file extension, falling back to
        text), "csv", "json" or "text". Missing files and unsupported types
        are reported and skipped.
        """
        readers = {
            "csv": self.read_csv,
            "json": self.read_json,
            "text": self.read_text,
        }
        loaded_data = []
        for filepath in filepaths:
            if not os.path.exists(filepath):
                print(f"文件不存在: {filepath}")
                continue
            if file_type == "auto":
                extension = os.path.splitext(filepath)[1].lstrip(".").lower()
                reader = readers.get(extension, self.read_text)
            else:
                reader = readers.get(file_type)
                if reader is None:
                    print(f"不支持的文件类型: {file_type}")
                    continue
            loaded_data.extend(reader(filepath))
        self.data = loaded_data
        print(f"总共加载 {len(self.data)} 条数据")
        return self

    def filter_data(self, condition_func):
        """Keep only the records for which ``condition_func`` is truthy."""
        self.data = list(filter(condition_func, self.data))
        print(f"过滤后剩余 {len(self.data)} 条数据")
        return self

    def transform_data(self, transform_func):
        """Replace every record with ``transform_func(record)``."""
        self.data = list(map(transform_func, self.data))
        return self

    def clean_data(self):
        """Strip string values and drop missing fields and non-dict records.

        Strings are stripped *before* the emptiness test so that
        whitespace-only values are removed as well.
        """
        if not self.data:
            return self
        cleaned = []
        for item in self.data:
            if not isinstance(item, dict):
                continue
            cleaned_item = {}
            for key, value in item.items():
                if isinstance(value, str):
                    value = value.strip()
                if value not in self._EMPTY_VALUES:
                    cleaned_item[key] = value
            cleaned.append(cleaned_item)
        self.data = cleaned
        print(f"数据清洗完成,{len(self.data)} 条有效数据")
        return self

    def analyze_data(self):
        """Compute summary statistics into ``self.results``.

        A field is numeric when at least one record holds an int or float
        (bool excluded) for it; every other field is treated as categorical.
        Values read from CSV are strings, so convert them with
        ``transform_data`` first if numeric statistics are wanted.
        """
        if not self.data:
            print("没有数据可分析")
            return self

        self.results['basic_stats'] = {
            'total_records': len(self.data),
            'first_record': self.data[0],
            'last_record': self.data[-1],
        }

        records = [item for item in self.data if isinstance(item, dict)]
        if records:
            # Union of all keys in first-seen order: records may differ in
            # shape, e.g. after clean_data dropped empty fields.
            fields = list(dict.fromkeys(key for item in records for key in item))
            self.results['fields'] = fields

            # Numeric fields
            numeric_fields = []
            for field in fields:
                values = [
                    item[field] for item in records
                    if isinstance(item.get(field), (int, float))
                    and not isinstance(item.get(field), bool)
                ]
                if not values:
                    continue
                numeric_fields.append(field)
                self.results[f'{field}_stats'] = {
                    'count': len(values),
                    'mean': statistics.mean(values),
                    'median': statistics.median(values),
                    # stdev needs at least two data points.
                    'std': statistics.stdev(values) if len(values) > 1 else 0,
                    'min': min(values),
                    'max': max(values),
                }
            self.results['numeric_fields'] = numeric_fields

            # Categorical fields: ten most common values each.
            categorical_fields = []
            for field in fields:
                if field in numeric_fields:
                    continue
                counter = Counter(str(item.get(field, '')) for item in records)
                categorical_fields.append(field)
                self.results[f'{field}_distribution'] = dict(
                    counter.most_common(10)
                )
            self.results['categorical_fields'] = categorical_fields

        print(f"数据分析完成,生成 {len(self.results)} 项结果")
        return self

    def generate_report(self, report_file="data_report.md"):
        """Write the analysis results to ``report_file`` as Markdown."""
        if not self.results:
            print("没有分析结果可报告")
            return self

        results = self.results
        lines = [
            "# 数据分析报告",
            f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "",
            "## 1. 基础信息",
        ]
        if 'basic_stats' in results:
            lines.append(f"- 总记录数: {results['basic_stats']['total_records']}")
        lines.append("")

        if 'fields' in results:
            lines.append("## 2. 字段信息")
            lines.append(f"- 总字段数: {len(results['fields'])}")
            if 'numeric_fields' in results:
                lines.append(f"- 数值字段: {', '.join(results['numeric_fields'])}")
            if 'categorical_fields' in results:
                lines.append(
                    f"- 分类字段: {', '.join(results['categorical_fields'])}"
                )
            lines.append("")

        if 'numeric_fields' in results:
            lines.append("## 3. 数值字段统计")
            for field in results['numeric_fields']:
                stats = results.get(f'{field}_stats')
                if stats is None:
                    continue
                lines.extend([
                    f"### {field}",
                    f"- 有效值数量: {stats['count']}",
                    f"- 平均值: {stats['mean']:.4f}",
                    f"- 中位数: {stats['median']:.4f}",
                    f"- 标准差: {stats['std']:.4f}",
                    f"- 最小值: {stats['min']:.4f}",
                    f"- 最大值: {stats['max']:.4f}",
                    "",
                ])

        if 'categorical_fields' in results:
            lines.append("## 4. 分类字段分布")
            total = results['basic_stats']['total_records']
            for field in results['categorical_fields']:
                dist = results.get(f'{field}_distribution')
                if dist is None:
                    continue
                lines.append(f"### {field} (Top 10)")
                for value, count in dist.items():
                    percentage = (count / total) * 100
                    lines.append(f"- {value}: {count} ({percentage:.1f}%)")
                lines.append("")

        try:
            with open(report_file, "w", encoding="utf-8") as f:
                f.write("\n".join(lines))
            print(f"报告已生成: {report_file}")
        except OSError as e:
            print(f"生成报告失败: {e}")
        return self

    def export_data(self, output_file, format="csv"):
        """Write ``self.data`` to ``output_file`` as "csv" or "json".

        For dict records the CSV header is the union of all keys; fields
        missing from a record are written as empty strings.
        """
        if not self.data:
            print("没有数据可导出")
            return self
        try:
            if format == "csv":
                with open(output_file, "w", newline="", encoding="utf-8") as f:
                    if isinstance(self.data[0], dict):
                        fieldnames = list(dict.fromkeys(
                            key
                            for row in self.data if isinstance(row, dict)
                            for key in row
                        ))
                        writer = csv.DictWriter(
                            f, fieldnames=fieldnames, restval=""
                        )
                        writer.writeheader()
                        writer.writerows(self.data)
                    else:
                        # A bare string would be split into one column per
                        # character, so wrap it as a single-cell row.
                        csv.writer(f).writerows(
                            [row] if isinstance(row, str) else row
                            for row in self.data
                        )
            elif format == "json":
                with open(output_file, "w", encoding="utf-8") as f:
                    json.dump(self.data, f, ensure_ascii=False, indent=2)
            else:
                print(f"不支持的导出格式: {format}")
                return self
            print(f"数据已导出到: {output_file} ({format}格式)")
        except OSError as e:
            print(f"导出数据失败: {e}")
        return self


def main():
    """Demo: analyse a small sales data set end to end."""
    pipeline = DataPipeline()

    print("=" * 60)
    print("数据处理管道示例 - 销售数据分析")
    print("=" * 60)

    # Dummy data standing in for a real input file.
    sample_data = [
        {"date": "2024-01-01", "product": "A", "quantity": 10, "price": 100},
        {"date": "2024-01-01", "product": "B", "quantity": 5, "price": 200},
        {"date": "2024-01-02", "product": "A", "quantity": 8, "price": 100},
        {"date": "2024-01-02", "product": "C", "quantity": 3, "price": 150},
        {"date": "2024-01-03", "product": "B", "quantity": 7, "price": 200},
        {"date": "2024-01-03", "product": "C", "quantity": 4, "price": 150},
    ]

    # Write the data to a temporary CSV file for the demo.
    import tempfile
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", delete=False, newline="", encoding="utf-8"
    ) as f:
        writer = csv.DictWriter(
            f, fieldnames=["date", "product", "quantity", "price"]
        )
        writer.writeheader()
        writer.writerows(sample_data)
        temp_file = f.name

    try:
        (pipeline
         .load_data([temp_file], "csv")
         .clean_data()
         # CSV values are strings: convert them so they count as numeric.
         .transform_data(lambda x: {
             **x,
             "quantity": int(x["quantity"]),
             "price": float(x["price"]),
             "total": int(x["quantity"]) * float(x["price"]),
         })
         .analyze_data()
         .generate_report("sales_report.md")
         .export_data("sales_processed.csv", "csv"))

        # Show part of the results.
        print("\n分析结果摘要:")
        if 'basic_stats' in pipeline.results:
            print(f"- 总记录数: {pipeline.results['basic_stats']['total_records']}")
        for field in pipeline.results.get('numeric_fields', []):
            stats = pipeline.results.get(f'{field}_stats')
            if stats is not None:
                print(f"- {field}: 平均={stats['mean']:.2f}, 最大={stats['max']}")
    finally:
        # Remove the temporary file.
        if os.path.exists(temp_file):
            os.unlink(temp_file)

    print("\n数据处理管道示例完成!")


if __name__ == "__main__":
    main()
- 1. os模块中,哪个函数用于安全地拼接路径? A) os.path.join() B) os.concat_path() C) os.merge_path() D) os.path.concat()
- 2. sys.argv[0] 表示什么? A) 第一个命令行参数 B) 脚本名称 C) 参数个数 D) Python解释器路径
- 3. datetime模块中,哪个函数可以将字符串转换为datetime对象? A) datetime.parse() B) datetime.strptime() C) datetime.from_string() D) datetime.decode()
- 4. math模块中,math.isinf() 用于判断什么? A) 是否为整数 B) 是否为无穷大 C) 是否为NaN D) 是否为有限数
- 5. random模块中,哪个函数可以从序列中随机选择不重复的多个元素? A) random.choice() B) random.choices() C) random.sample() D) random.selection()
- 6. json模块中,哪个参数可以确保中文字符正确显示? A) ensure_ascii=False B) encoding='utf-8' C) unicode=True D) ascii=False
- 7. re模块中,哪个函数用于替换匹配的文本? A) re.replace() B) re.sub() C) re.exchange() D) re.swap()
- 8. itertools模块中,哪个函数用于生成排列? A) itertools.combinations() B) itertools.permutations() C) itertools.arrangements() D) itertools.sequences()
- 9. functools模块中,@lru_cache装饰器的主要作用是什么? A) 记录函数调用 B) 缓存函数结果 C) 柯里化函数 D) 偏函数应用
- 10. 哪个模块提供了处理命令行参数的更高级功能? A) os B) sys C) argparse D) getopt
答案:1.A 2.B 3.B 4.B 5.C 6.A 7.B 8.B 9.B 10.C
任务要求:实现一个完整的系统监控工具,具体要求如下:
- • 文件系统监控:监控指定目录的文件变化(新增、修改、删除)
- 3. 实现步骤:步骤1:配置文件管理
# 实现ConfigManager类,支持:
# 1. 加载JSON配置文件
# 2. 提供默认配置
# 3. 配置验证
步骤2:文件系统监控器
# 实现FileMonitor类,支持:
# 1. 递归监控目录变化
# 2. 检测文件新增、修改、删除
# 3. 计算文件哈希值(用于检测内容变化)
步骤3:系统资源监控器
# 实现ResourceMonitor类,支持:
# 1. 监控CPU使用率
# 2. 监控内存使用情况
# 3. 设置阈值告警
步骤4:日志管理器
# 实现LogManager类,支持:
# 1. 结构化日志记录
# 2. 日志轮转(按大小或时间)
# 3. 日志分析(提取关键信息)
步骤5:报告生成器
# 实现ReportGenerator类,支持:
# 1. 生成HTML格式报告
# 2. 包含图表和数据表格
# 3. 支持邮件发送报告
步骤6:主程序集成
# 实现SystemMonitor主类,集成所有模块:
# 1. 读取配置
# 2. 启动监控线程
# 3. 处理信号(优雅退出)
# 4. 定时生成报告
- • 实现实时Web仪表板(使用http.server或第三方库)
import os
import sys
import json
import time
import threading
import signal
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Optional


# The five classes below are placeholders for the exercise. SystemMonitor
# calls them with arguments and methods (see its __init__ and worker loops),
# so it cannot be constructed until they are implemented.

class ConfigManager:
    """Configuration manager (to be implemented)."""
    pass


class FileMonitor:
    """File-system monitor (to be implemented)."""
    pass


class ResourceMonitor:
    """System resource monitor (to be implemented)."""
    pass


class LogManager:
    """Log manager (to be implemented)."""
    pass


class ReportGenerator:
    """Report generator (to be implemented)."""
    pass


class SystemMonitor:
    """Main class that wires the monitors together and runs them in threads."""

    def __init__(self, config_path="monitor_config.json"):
        self.config_manager = ConfigManager(config_path)
        self.config = self.config_manager.load_config()
        self.file_monitor = FileMonitor(self.config['monitor_dirs'])
        self.resource_monitor = ResourceMonitor()
        self.log_manager = LogManager(self.config['log_file'])
        self.report_generator = ReportGenerator()
        self.running = False
        self.threads = []
        # Set by stop(); lets sleeping workers wake up immediately instead of
        # finishing a full time.sleep(interval).
        self._stop_event = threading.Event()

    def start(self):
        """Start the file, resource and report worker threads."""
        self.running = True
        self._stop_event.clear()
        workers = (
            self._monitor_files,
            self._monitor_resources,
            self._generate_reports,
        )
        for target in workers:
            # Daemon threads do not keep the process alive on exit.
            worker = threading.Thread(target=target, daemon=True)
            worker.start()
            self.threads.append(worker)
        print("系统监控已启动")

    def stop(self):
        """Signal all workers to stop and wait up to 5 seconds for each."""
        self.running = False
        self._stop_event.set()
        for thread in self.threads:
            thread.join(timeout=5)
        print("系统监控已停止")

    def _monitor_files(self):
        """Poll for file changes and log them until stopped."""
        while self.running:
            changes = self.file_monitor.check_changes()
            if changes:
                self.log_manager.log_file_changes(changes)
            # Interruptible sleep: returns early once stop() sets the event.
            self._stop_event.wait(self.config['file_check_interval'])

    def _monitor_resources(self):
        """Poll resource statistics and log them until stopped."""
        while self.running:
            stats = self.resource_monitor.get_stats()
            self.log_manager.log_resource_stats(stats)
            self._stop_event.wait(self.config['resource_check_interval'])

    def _generate_reports(self):
        """Generate a report after every report interval until stopped."""
        # wait() returns True when the stop event is set, which ends the
        # loop; on timeout it returns False and the next report is produced.
        while self.running and not self._stop_event.wait(
            self.config['report_interval']
        ):
            report = self.report_generator.generate_report(
                self.log_manager.get_recent_logs()
            )
            self.log_manager.log_report_generated(report)


def main():
    """Entry point: start monitoring and exit cleanly on SIGINT/SIGTERM."""
    monitor = SystemMonitor()

    def signal_handler(signum, frame):
        print(f"\n收到信号 {signum},准备退出...")
        monitor.stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    monitor.start()

    # Keep the main thread alive; the workers are daemon threads.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        monitor.stop()


if __name__ == "__main__":
    main()
通过今天的实战,咱们一起掌握了Python最常用的内置模块:
- 3. datetime模块:时间日期处理全方位解决方案
- 8. itertools模块:迭代器工具与高效循环
- 9. functools模块:函数式编程与高阶函数工具
- • ✅ 数据处理:高效处理时间、数学、随机、JSON数据
记住:熟练掌握Python内置模块,是成为Python高手的第一步。在实际开发中,先查看标准库是否有现成解决方案,避免重复造轮子。