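- A Comprehensive Tutorial on Python Multithreading and Multiprocessing (with detailed examples, code, and expected output)
- Python's two main concurrency tools are the threading and multiprocessing modules. This tutorial works from the basics to more advanced usage, using practical examples throughout.
Background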
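1. Concurrency Basics and the GIL (Global Interpreter Lock)
Why is concurrency needed?
- I/O-bound tasks (such as network requests and file reads/writes): most of the time is spent waiting, so concurrency can improve throughput significantly.
- CPU-bound tasks (such as numerical computation and image processing): these need true parallel execution.
GIL (Global Interpreter Lock):
- CPython (the standard Python implementation) has a global lock that allows only one thread at a time to execute Python bytecode.
- This means multiple threads cannot use multiple CPU cores for truly parallel computation, so CPU-bound tasks gain little from threading.
- However, I/O operations and some C extensions (such as parts of NumPy and Pandas) release the GIL, so multithreading remains effective for I/O-bound tasks.
Conclusion:
- I/O-bound → prefer multithreading (lightweight, and sharing memory is easy).
- CPU-bound → prefer multiprocessing (sidesteps the GIL; each process has its own interpreter and its own GIL).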
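The rule of thumb above can be captured in a small helper. This is only a minimal sketch: `run_concurrently`, its `cpu_bound` flag, and the `word_count` stand-in task are hypothetical names introduced here for illustration; only `ThreadPoolExecutor` and `ProcessPoolExecutor` come from the standard library.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def run_concurrently(func, items, cpu_bound=False, max_workers=4):
    """Apply func to every item, choosing the executor by workload type.

    Hypothetical helper: threads for I/O-bound work, processes for
    CPU-bound work. Results come back in input order.
    """
    executor_cls = ProcessPoolExecutor if cpu_bound else ThreadPoolExecutor
    with executor_cls(max_workers=max_workers) as executor:
        return list(executor.map(func, items))


def word_count(text):
    # Stand-in task; a real I/O-bound task would read a file or call a server.
    return len(text.split())


if __name__ == "__main__":
    lines = ["a b c", "hello world", "one"]
    print(run_concurrently(word_count, lines))                  # thread pool
    print(run_concurrently(word_count, lines, cpu_bound=True))  # process pool
```

Both calls should return the same list; only the kind of worker differs. The process-based call sits under the `if __name__ == "__main__":` guard because worker processes may re-import the main module on some platforms.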
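Multithreading suits I/O-bound tasks
- An I/O-bound program spends most of its time waiting on external resources (network responses, disk reads and writes, and so on), so the CPU sits idle for most of the run. With multiple threads, while one thread waits on I/O the others keep running, which greatly improves throughput.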
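The overlap of waiting time can be seen offline before making any network requests. The sketch below assumes `time.sleep` as a stand-in for an I/O wait (it releases the GIL while sleeping); `fake_io` and `timed` are hypothetical helper names, not part of any library.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(seconds):
    """Simulate an I/O wait; time.sleep releases the GIL while waiting."""
    time.sleep(seconds)
    return seconds


def timed(label, func):
    # Run func once, print how long it took, and return (result, elapsed).
    start = time.perf_counter()
    result = func()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.2f} s")
    return result, elapsed


delays = [0.2, 0.2, 0.2, 0.2]


def sequential_run():
    # One wait after another: the waits add up.
    return [fake_io(d) for d in delays]


def threaded_run():
    # Four threads: the waits overlap.
    with ThreadPoolExecutor(max_workers=4) as executor:
        return list(executor.map(fake_io, delays))


sequential, t_seq = timed("sequential", sequential_run)
threaded, t_thr = timed("threaded", threaded_run)
```

The sequential run cannot finish in less than the sum of the delays, while the threaded run should take roughly as long as the single longest delay.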
    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed

    import requests
    from tqdm import tqdm


    def fetch(url):
        """Request a URL and return it together with the HTTP status code."""
        resp = requests.get(url, timeout=10)
        return url, resp.status_code


    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1.5",
        "https://httpbin.org/delay/3",
    ] * 2
1. Single-threaded requests, with tqdm tracking progress

    def single_thread(urls):
        start = time.time()
        for url in tqdm(urls, total=len(urls), desc="Progress"):
            url, code = fetch(url)
            # Alternative: code = fetch(url)[1]
            # print(f"{url} returned status code: {code}")
        print(f"Total time: {time.time() - start:.2f} s")  # baseline for the threaded versions


    single_thread(urls)
    # Progress: 100%|██████████| 8/8 [00:25<00:00, 3.21s/it]
    # Total time: 25.68 s
2. Multithreading, method 1: ThreadPoolExecutor with submit() + as_completed() + tqdm()

    def multi_thread_submit(urls):
        """Use submit() + as_completed() + tqdm()."""
        start = time.time()
        with ThreadPoolExecutor(max_workers=20) as executor:
            futures = [executor.submit(fetch, url) for url in urls]
            # as_completed() yields each future as soon as it finishes.
            for future in tqdm(as_completed(futures), total=len(futures), desc="Progress"):
                url, code = future.result()
                # print(f"{url} returned status code: {code}")
        print(f"Total time: {time.time() - start:.2f} s")  # usually several times faster than the single-threaded run


    multi_thread_submit(urls)
    # Progress: 100%|██████████| 8/8 [00:04<00:00, 1.80it/s]
    # Total time: 4.46 s
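Because `as_completed()` yields futures in completion order rather than submission order, it is often useful to map each future back to its input and to handle exceptions where `result()` is called. The following is a minimal offline sketch of that pattern; `slow_task` and the `jobs` dictionary are hypothetical stand-ins for real network calls.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_task(name, seconds):
    """Hypothetical stand-in for a network call; the 'bad' job fails."""
    time.sleep(seconds)
    if name == "bad":
        raise ValueError(f"task {name} failed")
    return f"{name} done"


jobs = {"slow": 0.3, "fast": 0.1, "bad": 0.2}

finished = []  # (name, outcome) pairs, in completion order
with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to the job name that produced it.
    future_to_name = {
        executor.submit(slow_task, name, seconds): name
        for name, seconds in jobs.items()
    }
    for future in as_completed(future_to_name):
        name = future_to_name[future]
        try:
            outcome = future.result()  # re-raises the worker's exception here
        except ValueError as exc:
            outcome = f"error: {exc}"
        finished.append((name, outcome))

for name, outcome in finished:
    print(name, "->", outcome)
```

An exception raised inside a worker stays stored on its future until `result()` is called, so one failed task does not stop the others from being collected.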
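3. Multithreading, method 2: ThreadPoolExecutor with map()

    def multi_thread_map(urls):
        """Use map(); results come back in input order."""
        start = time.time()
        with ThreadPoolExecutor(max_workers=20) as executor:
            results = list(executor.map(fetch, urls))
            # To pass two arguments, give map() one iterable per parameter:
            #   executor.map(func, urls, range(len(urls)))
            # Wrapping them in zip() passes a single tuple per call instead,
            # so func must unpack it:
            #   executor.map(func, zip(urls, range(len(urls))))
        for url, code in results:
            # print(f"{url} returned status code: {code}")
            pass
        print(f"Total time: {time.time() - start:.2f} s")  # usually several times faster than the single-threaded run


    multi_thread_map(urls)
    # Total time: 4.12 s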
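The two ways of handing more than one argument to `map()` can be compared without any network access. In this sketch, `label` and `label_pair` are hypothetical functions introduced for illustration, and the URLs are used only as strings.

```python
from concurrent.futures import ThreadPoolExecutor


def label(url, index):
    """Hypothetical two-argument task: tag each URL with its position."""
    return f"{index}: {url}"


def label_pair(pair):
    # Variant that receives one tuple and unpacks it itself.
    url, index = pair
    return label(url, index)


urls = ["https://httpbin.org/delay/1", "https://httpbin.org/delay/2"]

with ThreadPoolExecutor(max_workers=2) as executor:
    # One iterable per parameter: map() calls label(urls[i], i).
    by_iterables = list(executor.map(label, urls, range(len(urls))))
    # zip() yields tuples, so the function receives a single argument.
    by_tuples = list(executor.map(label_pair, zip(urls, range(len(urls)))))

print(by_iterables)
print(by_tuples)
```

Both forms produce the same list. Passing separate iterables keeps the worker function's signature natural, while the `zip()` form requires a function written to accept a tuple.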
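Multiprocessing suits CPU-bound tasks
- A CPU-bound task spends most of its time on pure computation and needs to make real use of multiple CPU cores. Multiprocessing sidesteps the GIL: each process has its own interpreter and can be scheduled on a different core, which gives true parallelism.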
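For a workload that is actually CPU-bound, the difference between threads and processes can be measured with a sketch like the one below. It assumes naive trial-division prime counting as the workload; `count_primes` and `time_executor` are hypothetical names, and the timings depend on core count and process start-up cost, so no figures are quoted here.

```python
import math
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def count_primes(limit):
    """Count primes below limit by trial division (pure Python, CPU-bound)."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, math.isqrt(n) + 1)):
            count += 1
    return count


def time_executor(executor_cls, limits):
    # Run the same workload under the given executor class and time it.
    start = time.perf_counter()
    with executor_cls(max_workers=4) as executor:
        results = list(executor.map(count_primes, limits))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    limits = [50_000] * 4
    thread_results, thread_time = time_executor(ThreadPoolExecutor, limits)
    process_results, process_time = time_executor(ProcessPoolExecutor, limits)
    print(f"same results: {thread_results == process_results}")
    print(f"threads:   {thread_time:.2f} s")
    print(f"processes: {process_time:.2f} s")
```

On a multi-core machine with a standard GIL-enabled CPython build, the process version is expected to finish sooner, because the threads take turns holding the GIL while the processes run side by side.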
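    import time
    from concurrent.futures import ProcessPoolExecutor, as_completed
    from multiprocessing.pool import Pool

    import requests
    from tqdm import tqdm


    # The same fetch() workload as in the threading examples is reused here,
    # so the process-based APIs can be compared with the thread-based ones.
    def fetch(url):
        """Request a URL and return it together with the HTTP status code."""
        resp = requests.get(url, timeout=10)
        return url, resp.status_code


    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1.5",
        "https://httpbin.org/delay/3",
    ] * 2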
1. Single-process requests, with tqdm tracking progress

    def single_process(urls):
        start = time.time()
        for url in tqdm(urls, total=len(urls), desc="Progress"):
            url, code = fetch(url)
            # Alternative: code = fetch(url)[1]
            # print(f"{url} returned status code: {code}")
        print(f"Total time: {time.time() - start:.2f} s")  # baseline for the multiprocess versions


    # single_process(urls)
    # Progress: 100%|██████████| 8/8 [00:25<00:00, 3.21s/it]
    # Total time: 25.68 s
2. Multiprocessing, method 1: Pool.apply_async()

    def multi_process_pool(urls):
        """Use Pool.apply_async()."""
        start = time.time()
        pool = Pool(processes=20)
        results = []
        for url in urls:
            results.append(pool.apply_async(fetch, args=(url,)))
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for all worker processes to finish
        for result in results:
            url, code = result.get()
            # print(f"{url} returned status code: {code}")
        print(f"Total time: {time.time() - start:.2f} s")  # usually several times faster than the single-process run


    # multi_process_pool(urls)
    # Total time: 4.68 s
3. Multiprocessing, method 2: ProcessPoolExecutor with submit() + as_completed() + tqdm()

    def multi_process_submit(urls):
        """Use submit() + as_completed() + tqdm()."""
        start = time.time()
        with ProcessPoolExecutor(max_workers=20) as executor:
            futures = [executor.submit(fetch, url) for url in urls]
            # as_completed() yields each future as soon as it finishes.
            for future in tqdm(as_completed(futures), total=len(futures), desc="Progress"):
                url, code = future.result()
                # print(f"{url} returned status code: {code}")
        print(f"Total time: {time.time() - start:.2f} s")  # usually several times faster than the single-process run


    # multi_process_submit(urls)
    # Progress: 100%|██████████| 8/8 [00:04<00:00, 1.80it/s]
    # Total time: 4.46 s
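4. Multiprocessing, method 3: ProcessPoolExecutor with map()

    def multi_process_map(urls):
        """Use map(); results come back in input order."""
        start = time.time()
        with ProcessPoolExecutor(max_workers=20) as executor:
            results = list(executor.map(fetch, urls))
            # To pass two arguments, give map() one iterable per parameter:
            #   executor.map(func, urls, range(len(urls)))
            # Wrapping them in zip() passes a single tuple per call instead,
            # so func must unpack it:
            #   executor.map(func, zip(urls, range(len(urls))))
        for url, code in results:
            # print(f"{url} returned status code: {code}")
            pass
        print(f"Total time: {time.time() - start:.2f} s")  # usually several times faster than the single-process run


    # The guard keeps worker processes from re-running this call when they
    # import the main module (required with the "spawn" start method).
    if __name__ == "__main__":
        multi_process_map(urls)
    # Total time: 4.12 s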