题 3:实现一个装饰器,用于统计函数运行时间
@timerdef slow_func():time.sleep(2)slow_func()# 输出: "函数 slow_func 运行时间: 2.00s"
基础实现(推荐用于日常调试)
import timefrom functools import wrapsdef timer(func): @wraps(func) def wrapper(*args, **kwargs): start = time.perf_counter() # 使用高精度计时器 result = func(*args, **kwargs) end = time.perf_counter() print(f"{func.__name__} 执行耗时: {end - start:.6f} 秒") return result return wrapper# 使用示例@timerdef slow_func(): time.sleep(1) # 模拟耗时操作slow_func() # 输出类似:slow_func 执行耗时: 1.001234 秒
关键说明
@wraps(func):保留原函数的元信息(如函数名、文档字符串),避免调试困难
time.perf_counter():推荐用于性能测量,精度高且不受系统时钟调整影响
*args, **‌kwargs:确保装饰器适用于任意参数的函数
进阶版本(支持自定义时间单位)
若需支持毫秒、微秒等单位,可使用带参数的装饰器:
import timefrom functools import wrapsfrom typing import Callable, Anydef timer(unit: str = 'seconds') -> Callable: def decorator(func: Callable) -> Callable: @wraps(func) def wrapper(*args, **kwargs) -> Any: start = time.perf_counter() result = func(*args, **kwargs) end = time.perf_counter() elapsed = end - start if unit == 'milliseconds': elapsed *= 1000 unit_str = '毫秒' elif unit == 'microseconds': elapsed *= 1000000 unit_str = '微秒' else: unit_str = '秒' print(f"{func.__name__} 执行耗时: {elapsed:.6f}{unit_str}") return result return wrapper return decorator# 使用示例@timer('milliseconds')def slow_func(): time.sleep(0.1)slow_func() # 输出:slow_func 执行耗时: 100.123456 毫秒
题 4:数据库查询与聚合
假设你连接了 MySQL 数据库 company,表结构如下:
employee(id INT, name VARCHAR(50), dept VARCHAR(50), salary FLOAT)
请用 Python 写出:
查询每个部门的平均工资;
将结果按平均工资降序输出;
使用 pymysql 或 mysql.connector 实现。
推断的 employee 表结构(基于常见案例)
empno:员工编号(主键)
ename:员工姓名
job:职位
mgr:直接上级编号(自引用)
hiredate:入职日期
sal:月薪
comm:奖金(可能为 NULL)
deptno:部门编号(外键)
常见查询与聚合操作示例
1. 基本聚合查询
- 统计员工总数:
SELECTCOUNT(*) FROM employee;
- 计算平均工资(忽略 NULL):
SELECTAVG(sal) FROM employee;
- 查询最高/最低工资:
SELECTMAX(sal), MIN(sal) FROM employee;
- 按部门分组统计平均工资:
SELECT deptno, AVG(sal) AS avg_salary
FROM employee
GROUPBY deptno;
2. 带条件的聚合(HAVING)
- 查询平均工资高于 2000 的部门:
SELECT deptno, AVG(sal) AS avg_salary
FROM employee
GROUPBY deptno
HAVINGAVG(sal) >2000;
3. 多表关联查询(假设存在 dept 表)
显示员工姓名及其所在部门名称:
SELECT e.ename, d.dname
FROM employee e
INNERJOIN dept d ON e.deptno = d.deptno;
查询每个部门的员工人数及部门位置:
SELECT d.deptno, d.dname, d.loc, COUNT(e.empno) AS emp_count
FROM dept d
LEFTJOIN employee e ON d.deptno = e.deptno
GROUPBY d.deptno;
题5:实现一个自定义上下文管理器
要求能安全打开文件并自动关闭:
with FileManager("test.txt", "w") as f:f.write("hello")# 即使抛出异常,也能确保文件关闭要实现一个自定义上下文管理器来安全地打开文件并自动关闭,我们可以使用Python的contextlib模块中的contextmanager装饰器。下面是一个简单的例子,展示了如何创建一个名为FileManage的类,该类使用上下文管理协议(即__enter__和__exit__方法)来管理文件的打开和关闭。
示例代码
python
import contextlibclass FileManage: def __init__(self, filename, mode='r'): self.filename = filename self.mode = mode self.file = None def __enter__(self): # 在进入上下文时打开文件 self.file = open(self.filename, self.mode) return self.file def __exit__(self, exc_type, exc_value, traceback): # 在退出上下文时关闭文件 if self.file: self.file.close() # 返回False表示异常应该被重新抛出 return False# 使用FileManage类with FileManage('example.txt') as file: # 在这个块中,file是一个打开的文件对象 print(file.read())# 文件在这里自动关闭,不需要手动调用close()
解释
__init__方法:初始化时接收文件名和模式(默认为读取模式'r')。
__enter__方法:当进入with语句块时调用,这里打开文件并返回文件对象。这使得在with块内部可以通过as file获取到文件对象。
__exit__方法:当退出with语句块时调用,这里关闭文件。该方法还接受三个参数(异常类型、异常值、追踪信息),用于处理异常情况。如果文件成功关闭,返回False表示不抑制异常;如果需要捕获异常并处理,可以返回True。
注意事项
在使用with语句块时,如果在块内部发生异常,并且你想捕获并处理该异常而不让它在外部抛出,可以在__exit__方法中捕获异常并返回True。例如:python
def __exit__(self, exc_type, exc_value, traceback): if self.file: self.file.close() if exc_type is not None: # 处理异常... print(f"An error occurred: {exc_value}") return True # 抑制异常的进一步传播 return False # 正常退出时不抑制异常
确保在__exit__方法中检查文件是否已打开,以避免在文件未打开时调用.close()引发错误。这通过在进入方法中初始化self.file = None并在打开文件后设置它来实现
题 6:多线程任务分发
假设有一个函数:
def process_data(x):time.sleep(1)return x * x
要求使用 ThreadPoolExecutor 并发处理 [1,2,3,4,5],并打印所有结果
在Python中,实现多线程任务分发通常有多种方式,例如使用threading模块或concurrent.futures模块。下面将展示如何使用这两种方法来实现你的需求:process_data(x)函数接收一个参数x,休眠1秒后返回结果。
方法1:使用threading模块
python
import threadingimport timedef process_data(x): time.sleep(1) return xdef worker(x): result = process_data(x) print(f"Processed {x}: {result}")# 创建线程列表threads = []# 创建并启动多个线程for i in range(5): # 假设我们想要处理5个数据项 t = threading.Thread(target=worker, args=(i,)) threads.append(t) t.start()# 等待所有线程完成for t in threads: t.join()
方法2:使用concurrent.futures模块(推荐用于更高级的并发控制)
python
from concurrent.futures import ThreadPoolExecutorimport timedef process_data(x): time.sleep(1) return xdef main(): with ThreadPoolExecutor(max_workers=5) as executor: # 最大工作线程数为5 futures = [executor.submit(process_data, i) for i in range(5)] # 提交任务到线程池 for future in futures: result = future.result() # 获取结果 print(f"Processed {future.args[0]}: {result}")if __name__ == "__main__": main()
方法3:使用concurrent.futures的异步方式(Python 3.5+)
python
from concurrent.futures import as_completed, ThreadPoolExecutorimport timedef process_data(x): time.sleep(1) return xdef main(): with ThreadPoolExecutor(max_workers=5) as executor: # 最大工作线程数为5 futures = [executor.submit(process_data, i) for i in range(5)] # 提交任务到线程池 for future in as_completed(futures): # 按完成顺序处理结果 result = future.result() # 获取结果,此时已经是异步的,不需要等待所有任务完成即可获取结果 print(f"Processed {future.args[0]}: {result}")if __name__ == "__main__": main()
总结:
使用threading模块适合简单的多线程任务。
使用concurrent.futures.ThreadPoolExecutor提供了更高级的并发控制,例如可以限制最大工作线程数,并且可以更方便地管理多个异步任务。
使用concurrent.futures.as_completed可以在任务完成时立即处理结果,适用于需要按完成顺序处理结果的场景。