1. Origin and Author
The concurrent.futures module was designed by Brian Quinlan, proposed in PEP 3148, and added to the standard library in Python 3.2 (released in 2011).
2. Official Resources
Python official documentation: https://docs.python.org/3/library/concurrent.futures.html
Source code: https://github.com/python/cpython/tree/main/Lib/concurrent/futures
PEP 3148: https://www.python.org/dev/peps/pep-3148/
3. Core Features
4. Use Cases
1. Basic Thread Pool Usage
```python
import concurrent.futures
import urllib.request
import time


def download_url(url):
    """Download the content at a URL."""
    print(f"Starting download: {url}")
    start_time = time.time()
    with urllib.request.urlopen(url, timeout=10) as response:
        content = response.read()
        size = len(content)
    end_time = time.time()
    print(f"Finished: {url}, size: {size} bytes, elapsed: {end_time - start_time:.2f}s")
    return size


def basic_threadpool_demo():
    """Basic thread pool demo."""
    urls = [
        'http://www.python.org',
        'http://www.google.com',
        'http://www.github.com',
        'http://www.stackoverflow.com',
        'http://www.wikipedia.org',
    ]

    # Use a thread pool executor
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # Method 1: submit - submit tasks one by one
        print("=== submit method ===")
        future_to_url = {
            executor.submit(download_url, url): url
            for url in urls
        }

        # Collect results
        results = []
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
                results.append((url, result))
                print(f"URL: {url}, result: {result}")
            except Exception as e:
                print(f"URL: {url} raised an exception: {e}")

        print(f"Downloaded {len(results)} URLs in total")


if __name__ == '__main__':
    basic_threadpool_demo()
```
2. Process Pool Parallel Computation
```python
import concurrent.futures
import math
import os
import time


def is_prime(n):
    """Check whether a number is prime."""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def count_primes_in_range(start_end):
    """Count the primes in the given range."""
    start, end = start_end
    count = 0
    for num in range(start, end + 1):
        if is_prime(num):
            count += 1
    return count


def process_pool_demo():
    """Process pool demo."""
    # Ranges to compute
    ranges = [
        (1, 50000),
        (50001, 100000),
        (100001, 150000),
        (150001, 200000),
    ]

    # Note: there is no concurrent.futures.cpu_count(); use os.cpu_count() instead.
    print(f"Counting primes in parallel on {os.cpu_count()} CPU cores")
    start_time = time.time()

    # Use a process pool executor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Method 1: map - preserves input order
        print("=== map method ===")
        results_map = list(executor.map(count_primes_in_range, ranges))

        # Method 2: submit + as_completed
        print("=== submit + as_completed method ===")
        future_to_range = {
            executor.submit(count_primes_in_range, r): r
            for r in ranges
        }
        results_async = []
        for future in concurrent.futures.as_completed(future_to_range):
            r = future_to_range[future]
            result = future.result()
            results_async.append(result)
            print(f"Range {r} finished, {result} primes found")

    end_time = time.time()
    total_primes_map = sum(results_map)
    total_primes_async = sum(results_async)

    print("\nResults:")
    print(f"Total primes (map): {total_primes_map}")
    print(f"Total primes (as_completed): {total_primes_async}")
    print(f"Total elapsed: {end_time - start_time:.2f} s")


if __name__ == '__main__':
    process_pool_demo()
```
3. Advanced Task Scheduling and Callbacks
```python
import concurrent.futures
import time
import random
import threading


class AdvancedTaskManager:
    """Advanced task manager."""

    def __init__(self, max_workers=None):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        self.completed_tasks = 0
        self.failed_tasks = 0
        self.lock = threading.Lock()
        self.results = []

    def complex_task(self, task_id, duration):
        """Simulate a complex task."""
        print(f"Task {task_id} started, expected duration {duration:.1f}s")
        # Simulate the work
        time.sleep(duration)
        # Simulate random failures
        if random.random() < 0.1:  # 10% failure rate
            raise Exception(f"Task {task_id} failed")
        result = f"Task {task_id} finished, result: {task_id * 100}"
        print(f"Task {task_id} succeeded")
        return result

    def task_completed_callback(self, future, task_id):
        """Completion callback."""
        try:
            result = future.result()
            with self.lock:
                self.completed_tasks += 1
                self.results.append((task_id, result, 'success'))
            print(f"Callback: task {task_id} completed successfully")
        except Exception as e:
            with self.lock:
                self.failed_tasks += 1
                self.results.append((task_id, str(e), 'failed'))
            print(f"Callback: task {task_id} failed: {e}")

    def submit_tasks(self, num_tasks):
        """Submit a batch of tasks."""
        futures = []
        for i in range(num_tasks):
            # Random task duration
            duration = random.uniform(1, 5)
            # Submit the task
            future = self.executor.submit(self.complex_task, i, duration)
            # Attach a completion callback
            future.add_done_callback(
                lambda f, task_id=i: self.task_completed_callback(f, task_id)
            )
            futures.append(future)
        return futures

    def wait_for_completion(self, futures, timeout=None):
        """Wait for the tasks to finish."""
        # Block with wait() until all futures complete
        done, not_done = concurrent.futures.wait(
            futures,
            timeout=timeout,
            return_when=concurrent.futures.ALL_COMPLETED
        )
        print("\nCompletion status:")
        print(f"Done: {len(done)}")
        print(f"Not done: {len(not_done)}")
        return done, not_done

    def get_statistics(self):
        """Return summary statistics."""
        with self.lock:
            total = self.completed_tasks + self.failed_tasks
            return {
                'completed': self.completed_tasks,
                'failed': self.failed_tasks,
                'total': total,
                'success_rate': self.completed_tasks / total * 100 if total > 0 else 0
            }

    def shutdown(self):
        """Shut down the executor."""
        self.executor.shutdown(wait=True)


def advanced_demo():
    """Advanced features demo."""
    manager = AdvancedTaskManager(max_workers=3)
    try:
        # Submit tasks
        print("Submitting tasks...")
        futures = manager.submit_tasks(10)

        # Wait for completion
        print("Waiting for tasks to finish...")
        done, not_done = manager.wait_for_completion(futures, timeout=30)

        # Show statistics
        stats = manager.get_statistics()
        print("\nFinal statistics:")
        print(f"Succeeded: {stats['completed']}")
        print(f"Failed: {stats['failed']}")
        print(f"Success rate: {stats['success_rate']:.1f}%")

        # Show results
        print("\nTask results:")
        for task_id, result, status in manager.results:
            print(f"Task {task_id}: {status} - {result}")
    finally:
        manager.shutdown()


if __name__ == '__main__':
    advanced_demo()
```
4. Performance Comparison
```python
import concurrent.futures
import time
import math
from typing import Callable, List


# Functions given to ProcessPoolExecutor must be defined at module level so they can be pickled.
def cpu_intensive_work(n: int) -> float:
    """CPU-bound work."""
    return sum(math.sqrt(i) for i in range(n))


def io_intensive_work(duration: float) -> str:
    """I/O-bound work."""
    time.sleep(duration)
    return f"I/O work finished after {duration}s"


def run_sequential(tasks: List, work_func: Callable):
    """Run the tasks sequentially."""
    start_time = time.time()
    results = [work_func(task) for task in tasks]
    end_time = time.time()
    return results, end_time - start_time


def run_threadpool(tasks: List, work_func: Callable, max_workers: int):
    """Run the tasks with a thread pool."""
    start_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(work_func, tasks))
    end_time = time.time()
    return results, end_time - start_time


def run_processpool(tasks: List, work_func: Callable, max_workers: int):
    """Run the tasks with a process pool."""
    start_time = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(work_func, tasks))
    end_time = time.time()
    return results, end_time - start_time


def performance_comparison():
    """Performance comparison."""
    # CPU-bound task test
    print("=== CPU-bound tasks ===")
    cpu_tasks = [1000000] * 8  # 8 identical CPU-bound tasks

    _, seq_time = run_sequential(cpu_tasks, cpu_intensive_work)
    _, thread_time = run_threadpool(cpu_tasks, cpu_intensive_work, 4)
    _, process_time = run_processpool(cpu_tasks, cpu_intensive_work, 4)

    print(f"Sequential:   {seq_time:.2f}s")
    print(f"Thread pool:  {thread_time:.2f}s")
    print(f"Process pool: {process_time:.2f}s")
    print(f"Process pool speedup: {seq_time / process_time:.2f}x")

    # I/O-bound task test
    print("\n=== I/O-bound tasks ===")
    io_tasks = [0.5] * 8  # 8 identical I/O-bound tasks

    _, seq_time_io = run_sequential(io_tasks, io_intensive_work)
    _, thread_time_io = run_threadpool(io_tasks, io_intensive_work, 8)
    _, process_time_io = run_processpool(io_tasks, io_intensive_work, 8)

    print(f"Sequential:   {seq_time_io:.2f}s")
    print(f"Thread pool:  {thread_time_io:.2f}s")
    print(f"Process pool: {process_time_io:.2f}s")
    print(f"Thread pool speedup: {seq_time_io / thread_time_io:.2f}x")

    # Summary
    print("\n=== Summary ===")
    print("CPU-bound tasks:")
    print(" - Process pools perform best (they bypass the GIL)")
    print(" - Thread pools are limited by the GIL and stay close to sequential speed")
    print("\nI/O-bound tasks:")
    print(" - Thread pools perform best (low creation overhead)")
    print(" - Process pools have higher start-up cost but still beat sequential execution")


if __name__ == '__main__':
    performance_comparison()
```
5. Underlying Logic and Technical Principles
Core Architecture
Key Techniques
Executor design pattern:
```python
# Unified executor interface (simplified sketch)
class Executor:
    def submit(self, fn, *args, **kwargs) -> Future: ...
    def map(self, fn, *iterables, timeout=None) -> Iterator: ...
    def shutdown(self, wait=True): ...
```
Future objects: represent the result of an asynchronous computation and expose result(), exception(), done(), cancel(), and add_done_callback()
Thread pool implementation:
Built on the threading module
Worker threads are reused, reducing creation overhead
Thread-safe task queue
Process pool implementation:
Built on the multiprocessing module
Uses inter-process communication (queues, pipes)
Separate memory spaces, bypassing the GIL
Task scheduling strategy:
FIFO: first-in, first-out scheduling (see the sketch after this list)
Work stealing: used by some other concurrency frameworks; the standard library pools use a simple shared FIFO queue
Load balancing: idle workers pull from the shared queue, so tasks are distributed automatically
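A minimal sketch of the FIFO behaviour described above: with `max_workers=1`, submitted tasks start strictly in submission order, because the standard thread pool pulls work from a single shared FIFO queue (the printed timestamps will vary by machine):

```python
import concurrent.futures
import time


def tagged_task(i):
    """Print when each task actually starts running."""
    print(f"task {i} started at {time.perf_counter():.3f}")
    time.sleep(0.2)
    return i


if __name__ == '__main__':
    # With a single worker, tasks begin strictly in submission (FIFO) order.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        futures = [executor.submit(tagged_task, i) for i in range(5)]
        print([f.result() for f in futures])
```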
Future State Machine
```python
# Internal Future states (see CPython's Lib/concurrent/futures/_base.py)
PENDING = 'PENDING'                                # waiting to run
RUNNING = 'RUNNING'                                # currently running
CANCELLED = 'CANCELLED'                            # cancelled before it started
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'  # cancelled and waiters notified
FINISHED = 'FINISHED'                              # finished, with either a result or an exception
```
Note that there is no separate exception state: a call that raises still ends in FINISHED, with the exception stored on the Future and re-raised by result().
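The states themselves are internal, but they can be observed through the public Future API (`running()`, `done()`, `cancel()`, `result()`). A short sketch of the typical transitions; exact output depends on timing:

```python
import concurrent.futures
import time


def slow_square(x):
    time.sleep(0.5)
    return x * x


if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        f = executor.submit(slow_square, 3)
        print("running:", f.running(), "done:", f.done())   # PENDING or RUNNING
        print("result:", f.result())                        # blocks until FINISHED
        print("running:", f.running(), "done:", f.done())   # FINISHED

        # A future still queued behind another task can usually be cancelled.
        blocker = executor.submit(slow_square, 4)
        queued = executor.submit(slow_square, 5)
        print("cancelled:", queued.cancel())                 # normally True: it never started
        print("blocker result:", blocker.result())
```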
6. Installation and Configuration
Basic Installation
```bash
# concurrent.futures is part of the Python standard library; no installation needed
# Verify availability
python -c "import concurrent.futures; print(concurrent.futures.__doc__)"
```
Environment Requirements
| Component | Minimum | Recommended |
|---|---|---|
| Python | 3.2+ | 3.6+ |
| Operating system | Windows/Linux/macOS | Same as minimum |
| Memory | 512 MB | 4 GB+ |
| CPU | Multi-core | Multi-core with multithreading |
Platform-Optimized Configuration
```python
import concurrent.futures
import os


def optimize_executor_config():
    """Recommend executor configurations per workload type."""
    # System information
    cpu_count = os.cpu_count()
    print(f"CPU cores: {cpu_count}")

    # Recommended configuration by workload type
    config = {
        'cpu_intensive': {
            'executor': 'ProcessPoolExecutor',
            'max_workers': cpu_count,
            'description': 'CPU-bound work: use a process pool to bypass the GIL'
        },
        'io_intensive': {
            'executor': 'ThreadPoolExecutor',
            'max_workers': min(32, (cpu_count or 1) * 5),
            'description': 'I/O-bound work: use a thread pool to keep overhead low'
        },
        'mixed_workload': {
            'executor': 'ThreadPoolExecutor',
            'max_workers': min(32, (cpu_count or 1) * 3),
            'description': 'Mixed workload: balance CPU and I/O'
        }
    }
    return config


if __name__ == '__main__':
    config = optimize_executor_config()
    for workload, settings in config.items():
        print(f"{workload}: {settings}")
```
7. Performance Metrics
| Operation | Typical time | Memory overhead | Typical use |
|---|---|---|---|
| Thread creation | 0.1-1 ms | 1-8 MB | I/O-bound tasks |
| Process creation | 10-100 ms | 10-50 MB | CPU-bound tasks |
| Task submission | 0.01-0.1 ms | Negligible | High-frequency scheduling |
| Result retrieval | 0.001-0.01 ms | Negligible | Real-time result handling |
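These figures are rough, hardware-dependent estimates. The sketch below shows one way to measure submission and result-retrieval overhead on your own machine; the trivial task, worker count, and task count are arbitrary choices:

```python
import concurrent.futures
import time


def noop():
    """Trivial task used to isolate scheduling overhead."""
    return None


def measure_overhead(n_tasks=1000):
    """Measure average submit and result-retrieval latency for trivial tasks."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        start = time.perf_counter()
        futures = [executor.submit(noop) for _ in range(n_tasks)]
        submit_time = time.perf_counter() - start

        start = time.perf_counter()
        for f in futures:
            f.result()
        result_time = time.perf_counter() - start

    print(f"average submit overhead:  {submit_time / n_tasks * 1000:.4f} ms")
    print(f"average result retrieval: {result_time / n_tasks * 1000:.4f} ms")


if __name__ == '__main__':
    measure_overhead()
```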
8. Advanced Usage
1. Custom Executor
```python
import concurrent.futures
import os
import queue
import threading
import time


class CustomThreadPoolExecutor(concurrent.futures.Executor):
    """A custom thread pool executor."""

    def __init__(self, max_workers=None, thread_name_prefix=''):
        if max_workers is None:
            max_workers = min(32, (os.cpu_count() or 1) * 4)
        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = thread_name_prefix

    def submit(self, fn, *args, **kwargs):
        """Submit a task."""
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new tasks after shutdown')
            # Create the Future
            f = concurrent.futures.Future()
            # Put the task on the queue
            self._work_queue.put((f, fn, args, kwargs))
            # Make sure enough worker threads are running
            self._start_worker_if_needed()
            return f

    def _start_worker_if_needed(self):
        """Start a new worker thread if needed."""
        if len(self._threads) < self._max_workers:
            t = threading.Thread(
                target=self._worker,
                name=f'{self._thread_name_prefix}_{len(self._threads)}'
            )
            t.daemon = True
            t.start()
            self._threads.add(t)

    def _worker(self):
        """Worker thread main loop."""
        while True:
            try:
                # Fetch a task (with a timeout so the shutdown flag is checked periodically)
                future, fn, args, kwargs = self._work_queue.get(timeout=0.1)
            except queue.Empty:
                # Exit once the executor is shut down and the queue is drained
                if self._shutdown:
                    break
                continue

            # A dequeued task must run; otherwise its Future would never complete
            if not future.set_running_or_notify_cancel():
                continue
            try:
                result = fn(*args, **kwargs)
                future.set_result(result)
            except Exception as e:
                future.set_exception(e)

    def shutdown(self, wait=True):
        """Shut down the executor."""
        with self._shutdown_lock:
            self._shutdown = True
        if wait:
            for t in self._threads:
                t.join()


def custom_executor_demo():
    """Custom executor demo."""
    def task(n):
        time.sleep(1)
        return n * n

    with CustomThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        results = [f.result() for f in futures]
        print(f"Custom executor results: {results}")


if __name__ == '__main__':
    custom_executor_demo()
```
2. Error Handling and Retry Mechanism
```python
import concurrent.futures
import time
import random
from functools import wraps


def retry_on_failure(max_retries=3, delay=1):
    """Retry decorator."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise e
                    print(f"Attempt {attempt + 1} failed: {e}, retrying in {delay}s...")
                    time.sleep(delay)
            return None
        return wrapper
    return decorator


class RobustTaskManager:
    """A robust task manager."""

    def __init__(self, max_workers=None):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    @retry_on_failure(max_retries=3, delay=1)
    def unreliable_task(self, task_id):
        """An unreliable task (simulated random failures)."""
        # Simulate a 30% failure rate
        if random.random() < 0.3:
            raise Exception(f"Task {task_id} failed at random")
        time.sleep(random.uniform(0.5, 2))
        return f"Task {task_id} completed successfully"

    def execute_with_timeout(self, task_id, timeout=3):
        """Run a task with a timeout."""
        future = self.executor.submit(self.unreliable_task, task_id)
        try:
            return future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            future.cancel()
            return f"Task {task_id} timed out"
        except Exception as e:
            return f"Task {task_id} failed: {e}"

    def batch_execute_with_fallback(self, tasks):
        """Run a batch of tasks with a fallback strategy."""
        results = []
        with self.executor:  # note: this shuts the executor down afterwards
            # Submit all tasks
            future_to_task = {
                self.executor.submit(self.unreliable_task, task): task
                for task in tasks
            }
            for future in concurrent.futures.as_completed(future_to_task):
                task = future_to_task[future]
                try:
                    result = future.result()
                    results.append((task, result, 'success'))
                except Exception as e:
                    # Fallback strategy: substitute a default value
                    fallback_result = f"Task {task} handled by fallback"
                    results.append((task, fallback_result, 'fallback'))
                    print(f"Task {task} failed, using fallback: {e}")
        return results


def robust_demo():
    """Robustness demo."""
    manager = RobustTaskManager(max_workers=3)

    # Single-task timeout test
    print("=== Single-task timeout test ===")
    for i in range(3):
        print(manager.execute_with_timeout(i, timeout=2))

    # Batch execution test
    print("\n=== Batch execution test ===")
    tasks = list(range(10))
    results = manager.batch_execute_with_fallback(tasks)

    success_count = sum(1 for _, _, status in results if status == 'success')
    fallback_count = sum(1 for _, _, status in results if status == 'fallback')
    print(f"Succeeded: {success_count}")
    print(f"Fell back: {fallback_count}")
    print(f"Success rate: {success_count / len(results) * 100:.1f}%")


if __name__ == '__main__':
    robust_demo()
```
9. Comparison with Similar Tools
| Feature | concurrent.futures | threading | multiprocessing | asyncio |
|---|---|---|---|---|
| Abstraction level | High | Medium | Medium | High |
| Programming model | Mixed sync/async | Synchronous | Synchronous | Asynchronous |
| Unit of execution | Threads/processes | Threads | Processes | Coroutines |
| GIL impact | Thread pools are limited | Limited | None | None (single-threaded) |
| Memory overhead | Medium | Low | High | Low |
| Best for | General-purpose concurrency | I/O-bound work | CPU-bound work | I/O-bound work |
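One concrete bridge between the two high-level options in the table is asyncio's `loop.run_in_executor()`, which runs a blocking function on a `ThreadPoolExecutor` (or `ProcessPoolExecutor`) and exposes it as an awaitable. A minimal sketch:

```python
import asyncio
import concurrent.futures
import time


def blocking_io(n):
    """A blocking call we do not want to run on the event loop thread."""
    time.sleep(0.5)
    return f"blocking task {n} done"


async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        # run_in_executor wraps the pool's work as asyncio-awaitable futures,
        # so blocking calls can be awaited alongside native coroutines.
        tasks = [loop.run_in_executor(pool, blocking_io, i) for i in range(4)]
        print(await asyncio.gather(*tasks))


if __name__ == '__main__':
    asyncio.run(main())
```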
10. Enterprise Application Scenarios
Web Services
Concurrent request handling in Django and Flask applications
Asynchronous database queries and external API calls (see the sketch below)
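A framework-agnostic sketch of the pattern behind these bullets: a request handler fans several blocking calls out to a shared `ThreadPoolExecutor` and aggregates the results. The service names are made up and the external calls are simulated with `time.sleep`; in a real Django/Flask application the pool would normally be a module-level singleton created once per process:

```python
import concurrent.futures
import time

# Created once per process; reused across requests.
POOL = concurrent.futures.ThreadPoolExecutor(max_workers=8)


def call_external_api(name):
    """Stand-in for a slow database query or external API call."""
    time.sleep(0.3)
    return {name: "ok"}


def handle_request():
    """Fan out blocking calls for one request and merge the responses."""
    services = ("user-service", "billing-service", "recommendation-service")
    futures = {POOL.submit(call_external_api, svc): svc for svc in services}
    response = {}
    for future in concurrent.futures.as_completed(futures, timeout=2):
        response.update(future.result())
    return response


if __name__ == '__main__':
    print(handle_request())
```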
Data Processing
Parallel processing of pandas DataFrames
Large-scale data cleaning and transformation (see the sketch below)
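A simplified, pandas-free sketch of the chunk-and-map pattern these bullets describe; the record format and cleaning rules are invented for illustration, and with pandas you would partition a DataFrame and apply the same idea:

```python
import concurrent.futures


def clean_chunk(chunk):
    """Normalize one chunk of records (stand-in for a pandas transformation)."""
    return [{"name": rec["name"].strip().title(), "value": float(rec["value"])}
            for rec in chunk]


def parallel_clean(records, n_chunks=4):
    """Split records into chunks and clean the chunks in parallel."""
    size = max(1, len(records) // n_chunks)
    chunks = [records[i:i + size] for i in range(0, len(records), size)]
    with concurrent.futures.ProcessPoolExecutor() as executor:
        cleaned = list(executor.map(clean_chunk, chunks))
    return [rec for chunk in cleaned for rec in chunk]


if __name__ == '__main__':
    raw = [{"name": f"  item {i} ", "value": str(i)} for i in range(1000)]
    print(parallel_clean(raw)[:3])
```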
Machine Learning
Parallel model training with scikit-learn
Hyperparameter search and cross-validation (see the sketch below)
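scikit-learn already parallelizes many estimators internally via `n_jobs`, but the same idea can be expressed directly with a process pool. A toy sketch in which the objective function is a made-up stand-in for "train and cross-validate a model":

```python
import concurrent.futures
import itertools
import math


def evaluate(params):
    """Toy objective standing in for cross-validated model training."""
    lr, reg = params
    # An arbitrary smooth score; a real project would fit and score a model here.
    return params, -((math.log10(lr) + 2) ** 2) - (reg - 0.1) ** 2


def grid_search():
    grid = list(itertools.product([1e-3, 1e-2, 1e-1], [0.0, 0.1, 0.5]))
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(evaluate, grid))
    best_params, best_score = max(results, key=lambda item: item[1])
    print(f"best params: {best_params}, score: {best_score:.4f}")


if __name__ == '__main__':
    grid_search()
```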
Web Crawling
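A minimal crawling sketch, assuming a simple fetch-and-report workflow; the URL list is illustrative, and a real crawler would add rate limiting, deduplication, and politeness rules:

```python
import concurrent.futures
import urllib.request

URLS = [
    "https://www.python.org",
    "https://peps.python.org",
    "https://docs.python.org/3/",
]


def fetch(url):
    """Download one page and return its size in bytes."""
    with urllib.request.urlopen(url, timeout=10) as resp:
        return url, len(resp.read())


def crawl(urls, max_workers=8):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fetch, url): url for url in urls}
        for future in concurrent.futures.as_completed(futures):
            url = futures[future]
            try:
                _, size = future.result()
                print(f"{url}: {size} bytes")
            except Exception as exc:
                print(f"{url}: failed ({exc})")


if __name__ == '__main__':
    crawl(URLS)
```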
Financial Analysis
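A hedged sketch of per-instrument analysis run in parallel: the price series are synthetic random walks and the statistics are toy examples, but the fan-out pattern is the same one used for real market data:

```python
import concurrent.futures
import random
import statistics


def analyze_symbol(args):
    """Compute toy return/volatility statistics for one instrument's price series."""
    symbol, prices = args
    returns = [(b - a) / a for a, b in zip(prices, prices[1:])]
    return symbol, statistics.mean(returns), statistics.pstdev(returns)


def portfolio_report(price_data):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for symbol, mean_ret, vol in executor.map(analyze_symbol, price_data.items()):
            print(f"{symbol}: mean return {mean_ret:+.4%}, volatility {vol:.4%}")


if __name__ == '__main__':
    # Synthetic random-walk prices; real analysis would load market data instead.
    random.seed(0)
    data = {
        sym: [100 + sum(random.gauss(0, 1) for _ in range(i)) for i in range(1, 252)]
        for sym in ("AAA", "BBB", "CCC")
    }
    portfolio_report(data)
```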
Summary
concurrent.futures is the modern interface for concurrent programming in Python. Its core value lies in:
Unified abstraction: one programming interface for both threads and processes
Simple and easy to use: a high-level API that hides the underlying complexity
Flexible and powerful: supports multiple concurrency modes and scheduling strategies
Forward compatible: provides a smooth transition toward asynchronous programming
Technical highlights:
The Future pattern provides uniform handling of asynchronous results
The executor interface supports multiple concurrency backends
Rich mechanisms for task scheduling and result collection
Built-in error handling and timeout control
Typical scenarios:
Applications that need a simple concurrency solution
Mixed workloads (CPU + I/O)
Rapid prototyping and proof-of-concept work
Teaching and small projects
Installation and usage:
```bash
# No installation required; just import it
python -c "import concurrent.futures; print('concurrent.futures module is available')"
```
Learning resources:
Official documentation: https://docs.python.org/3/library/concurrent.futures.html
PEP 3148: https://www.python.org/dev/peps/pep-3148/
Hands-on tutorial: https://realpython.com/python-concurrency/
As the modern entry point to concurrent programming in Python, the concurrent.futures module gives developers a simple yet powerful tool for a wide range of concurrency scenarios, and it is one every Python developer should master.