导语在 Python 开发中,想要提升程序运行效率,并发编程是绕不开的话题。面对 I/O 密集型和 CPU 密集型任务,我们该如何优雅地实现多线程与多进程? 今天,我们来彻底搞懂 Python 标准库中的并发神器——concurrent.futures。
concurrent.futures 是 Python 标准库中用于异步执行任务的高层级接口,提供了线程池和进程池两种并行执行方式。它极大地简化了并发编程的复杂度,让我们无需手动管理线程/进程的创建与销毁。
一、 核心基类:Executor 执行器
Executor 是抽象基类,提供了 submit()、map()、shutdown() 三个核心方法。
1. submit():提交单个任务
用于向执行器提交单个任务,返回一个 Future 对象,代表异步执行的结果。
import concurrent.futuresdef task(x, y): return x ** y# 使用 with 语句自动管理资源with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: future = executor.submit(task, 2, 10) # 阻塞等待结果返回 print(f"结果: {future.result()}") # 输出: 结果: 1024
2. map():并行映射
与内置的 map() 函数类似,但它是并行执行的。返回一个迭代器。(注:chunksize 参数仅对进程池有效,用于分块提交任务以提升性能)
import concurrent.futuresimport mathnumbers = [1, 4, 9, 16, 25]with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: results = executor.map(math.sqrt, numbers) for num, result in zip(numbers, results): print(f"sqrt({num}) = {result}")
3. shutdown():关闭执行器
用于释放执行器资源。如果使用 with 语句,会自动调用此方法。
wait=True:等待所有任务完成后再关闭。cancel_futures=True:取消所有尚未执行的任务(Python 3.9+)。
二、 ThreadPoolExecutor:线程池
适用场景:I/O 密集型任务(如网络爬虫、API请求、文件读写)。原理:I/O 等待时,线程会被挂起,此时 CPU 可以切换去执行其他线程。
1. 核心参数
max_workers:最大线程数。默认为 min(32, os.process_cpu_count() + 4)。thread_name_prefix:线程名称前缀,方便调试。
2. 实战:并发网络请求与异常处理
结合 as_completed() 可以按任务完成的先后顺序获取结果,非常适合处理带有异常的网络请求。
import concurrent.futuresimport urllib.requesturls = [ 'https://httpbin.org/delay/1', 'https://httpbin.org/status/500', # 模拟失败请求 'https://httpbin.org/get']def fetch_url(url): with urllib.request.urlopen(url, timeout=10) as response: return f"{url}: 成功"with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # 建立 Future 到 URL 的映射 future_to_url = {executor.submit(fetch_url, url): url for url in urls} # as_completed 会在 future 完成时立即 yield for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: result = future.result() print(f"✅ {result}") except Exception as exc: print(f"❌ {url} 发生异常: {exc}")
三、 ProcessPoolExecutor:进程池
适用场景:CPU 密集型任务(如大量数学计算、图像处理、数据清洗)。原理:利用多核 CPU 优势,完美突破 Python GIL(全局解释器锁) 的限制。
1. 核心参数
max_workers:最大进程数。默认为 CPU 核心数 cpu_count()。max_tasks_per_child:每个工作进程最大任务数,达到后进程会重启,有效防止内存泄漏。
2. 实战:并行计算素数
import concurrent.futuresimport mathdef is_prime(n): if n < 2: return False if n == 2: return True if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return Truenumbers = [104729, 104723, 99991, 100003]with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: results = executor.map(is_prime, numbers) for num, prime in zip(numbers, results): print(f"{num} 是素数: {prime}")
(💡 提示:Python 3.14+ 新增了 terminate_workers() 和 kill_workers() 方法,用于优雅终止或强制杀死工作进程。)
四、 Future 对象:掌控异步状态
Future 封装了异步执行的任务,可以把它看作是一张“提货券”。
1. 获取结果与检查状态
result(timeout=None):阻塞获取结果。可设置超时时间。done():任务是否完成(正常结束、异常或取消均返回 True)。cancel():尝试取消任务。注意:只有排队中尚未开始执行的任务才能被取消,正在运行的无法取消。
2. 异步回调:add_done_callback()
任务完成后自动触发回调函数,无需阻塞主线程等待。
import concurrent.futuresdef task(x): if x < 0: raise ValueError("负数不能计算平方根") return x ** 0.5def callback(future): try: print(f"✅ 成功: {future.result()}") except Exception as e: print(f"❌ 失败: {e}")with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: f1 = executor.submit(task, 16) f1.add_done_callback(callback) # 注册回调 f2 = executor.submit(task, -4) f2.add_done_callback(callback)
五、 高级控制:模块级函数
1. wait():灵活的等待策略
与 as_completed 不同,wait 可以一次性返回“已完成”和“未完成”的 Future 集合,并支持三种返回时机:
FIRST_COMPLETED:任意一个任务完成。FIRST_EXCEPTION:任意一个任务抛出异常。ALL_COMPLETED:所有任务完成(默认)。
done, not_done = concurrent.futures.wait( futures, timeout=2, return_when=concurrent.futures.FIRST_COMPLETED)print(f"已完成: {len(done)} 个, 未完成: {len(not_done)} 个")
六、 避坑指南:异常处理
在并发编程中,异常处理尤为重要,否则容易导致线程/进程卡死。
CancelledError:当调用 future.result() 获取一个已被取消的任务时抛出。TimeoutError:当 future.result(timeout=5) 超时未返回时抛出。BrokenExecutor:当底层工作线程/进程意外死亡(如 Segmentation Fault)导致执行器损坏时抛出。此时执行器无法再提交新任务。
七、 实战总结与技巧
1. 线程池 vs 进程池,到底怎么选?
任务场景 | 推荐方案 | 核心原因 |
|---|
网络请求、文件读写、数据库查询 | ThreadPoolExecutor
| I/O 等待时不占用 CPU,线程切换成本低。 |
数学计算、图像处理、大数据清洗 | ProcessPoolExecutor
| 突破 GIL 限制,真正利用多核 CPU 并行计算。 |
混合型任务 | 视瓶颈而定 | 找出耗时最长的环节(I/O 还是 CPU)进行针对性优化。 |
2. 永远使用 with 语句
使用 with 语句可以确保无论代码是否发生异常,执行器都能被正确 shutdown(),避免资源泄漏。
3. ⚠️ 警惕 Future 死锁
绝对不要在有限的工作线程中,让一个 Future 去等待另一个 Future 的结果。
# ❌ 错误示范:会导致死锁executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)def wait_on_future(): f = executor.submit(lambda: 42) return f.result() # 唯一的工作线程被占用,这里永远等不到 f 完成f = executor.submit(wait_on_future)f.result() # 程序永久卡死!
解决办法:确保 max_workers 足够大,或者重构代码,避免 Future 之间的嵌套等待。
💡 结语concurrent.futures 提供了极其优雅的 API,让我们能像写同步代码一样编写异步并发程序。掌握线程池与进程池的适用场景,避开死锁陷阱,你的 Python 程序性能将迎来质的飞跃!
如果觉得这篇文章对你有帮助,欢迎点赞、在看、分享三连支持!你在并发编程中遇到过哪些坑?欢迎在评论区留言交流~ 👇
💡 给公众号运营者的排版建议:
- 代码块高亮:使用 Markdown 排版工具(如 Md2All 或 墨滴)时,请选择一套适合代码高亮的主题(如 GitHub 或 Vue 主题),确保 Python 代码在公众号中带有颜色高亮。
- 重点标注:文中的
⚠️ 和 💡 等 Emoji 在公众号中会正常显示,能有效缓解长文带来的视觉疲劳。 - 表格渲染:本文已将容易在手机端溢出的参数表格优化为列表,但保留了最后的“场景对比表格”,排版工具会自动将其渲染为精美的 Markdown 表格。