在第四章的结尾我们给了一个练习题,让处理一份含有脏数据的电子订单,并分类统计出销售额。数据如下:
[ {"id": 1, "product": "笔记本", "category": "电子设备", "price": "¥4589.15", "quantity": 2}, {"id": 2, "product": "鼠标", "category": "电子设备", "price": 123.0, "quantity": 5}, {"id": 3, "product": "书桌椅", "category": "家具", "price": "199.99", "quantity": 1}, {"id": 4, "product": "办公笔记本", "category": "文具", "price": "¥25.00", "quantity": None}, # 数量缺失 {"id": 5, "product": "键盘", "category": "电子设备", "price": "¥1,000", "quantity": -3}, # 负数数量 {"id": 6, "product": "Python从入门到放弃", "category": "书籍", "price": "invalid_price", "quantity": 10} # 价格错误]我写了几个函数用于实现数据的处理:
第一个函数是clean_order,用于处理单条订单数据,清洗掉有问题的数据,将数据的结果统一起来。
第二个函数是batch_process,用于批量处理订单数据,这会调用clean_order函数,并将最终的处理结果返回。
第三个函数是statistical_result,用于整合统计结果。
第四个函数是execute_time,这是利用装饰器做的一个程序运行计时器。
还有第五个函数run_analysis,用于将这些函数的运行编排起来,方便调用。
最后写一个程序运行的入口,执行程序。
具体实现代码如下:
import timedef clean_order(order: dict) -> dict | None: """ 清洗单条订单 :param order: 原始订单记录 :type order: dict :return: 清洗后的订单记录 :rtype: dict """ # 获取价格数据并转换为字符串,设置默认值为0 price_str = str(order.get('price', 0)) # 移除掉¥和,并且清除首尾空格 cleaned_price_str = price_str.replace('¥', '').replace(',', '').strip() # 转换为float类型 try: price = float(cleaned_price_str) except ValueError: return None # 处理数量 quantity = order.get('quantity', 0) if quantity is None or quantity < 0: quantity = 0 # 计算总价 total = price * quantity # 返回处理好的新的对象 return { "id": order.get('id'), "product": order.get('product'), "category": order.get('category'), "price": price, "quantity": quantity, "total": total }def batch_process(orders: list, filter_func = None) -> list: """ 批量处理订单 :param orders: 原始订单列表数据 :type orders: list :param filter_func: 过滤函数 :return: 处理好的订单列表数据 :rtype: list """ if filter_func is None: filter_func = lambda x : x is not None # 清洗和过滤数据 cleaned_data = map(clean_order, orders) valid_data = filter(filter_func, cleaned_data) return list(valid_data)def statistical_result(orders: list) -> dict: """ 统计分类和销售额 :param orders: 清洗后的订单数据 :type orders: list :return: 统计结果 :rtype: dict """ results = {} for order in orders: category = order.get('category', 'Unknown') total = order.get('total') if category in results: results[category] += total else: results[category] = total #金额保留2位小数 return {k : round(v, 2) for k, v in results.items()}# 装饰器,计时方法def execute_time(func): """ 记录执行时间 :param func: 被装饰的函数 """ def wrapper(*args, **kwargs): start = time.time() res = func(*args, **kwargs) end = time.time() print(f"执行时间:{end - start:.4f} 秒") return res return wrapper@execute_timedef run_analysis(orders): """ 将分析功能串联起来,便于统计运行时间 :param orders: 订单数据 """ batch_data = batch_process(orders) print('---清洗后数据---') for order in batch_data: print(order) res = statistical_result(batch_data) print('---分类统计结果---') for category, total in res.items(): print(f'{category}:{total}元')# 运行程序的入口if __name__ == "__main__": raw_data = [ {"id": 1, "product": "笔记本", "category": "电子设备", "price": "¥4589.15", "quantity": 2}, {"id": 2, "product": "鼠标", "category": "电子设备", "price": 123.0, "quantity": 5}, {"id": 3, "product": "书桌椅", "category": "家具", "price": "199.99", "quantity": 1}, {"id": 4, "product": "办公笔记本", "category": "文具", "price": "¥25.00", "quantity": None}, {"id": 5, "product": "键盘", "category": "电子设备", "price": "¥1,000", "quantity": -3}, {"id": 6, "product": "Python从入门到放弃", "category": "书籍", "price": "invalid_price", "quantity": 10} ] run_analysis(raw_data)在我的机器上运行结果如下:
---清洗后数据---{'id': 1, 'product': '笔记本', 'category': '电子设备', 'price': 4589.15, 'quantity': 2, 'total': 9178.3}{'id': 2, 'product': '鼠标', 'category': '电子设备', 'price': 123.0, 'quantity': 5, 'total': 615.0}{'id': 3, 'product': '书桌椅', 'category': '家具', 'price': 199.99, 'quantity': 1, 'total': 199.99}{'id': 4, 'product': '办公笔记本', 'category': '文具', 'price': 25.0, 'quantity': 0, 'total': 0.0}{'id': 5, 'product': '键盘', 'category': '电子设备', 'price': 1000.0, 'quantity': 0, 'total': 0.0}---分类统计结果---电子设备:9793.3元家具:199.99元文具:0.0元执行时间:0.0009 秒你在练习时遇到了哪些问题呢?我们可以一起来讨论。