Python 多线程 vs 多进程,一次说清楚(附完整代码)
写了三年 Python,有没有人和我一样——每次遇到"慢"的问题,第一反应是加个 threading,跑完发现……还是这么慢。
然后搜一圈,满屏都是"GIL 锁"、"IO 密集型"、"CPU 密集型"这些词,看完更懵了。
今天就一次说清楚:什么时候用多线程,什么时候用多进程,怎么用才不翻车。
先说结论,省你时间
| 场景 | 推荐方案 | 原因 |
|---|
| 爬虫、请求接口、读写文件 | 多线程 threading | IO 期间线程可切换,GIL 不是瓶颈 |
| 数据计算、图像处理、压缩 | 多进程 multiprocessing | 绕过 GIL,真正并行 |
| 要简单、懒得管线程 | concurrent.futures | 统一接口,几行搞定 |
| 异步 IO 高并发 | asyncio(另说) | 协程,不是今天的主题 |
记住这张表,90% 的场景你都能做出正确选择。
为什么多线程有时候没用?聊聊 GIL
Python(CPython 实现)有个机制叫 全局解释器锁(GIL),简单说就是:同一时刻,只有一个线程能执行 Python 字节码。
所以你开了 8 个线程做 CPU 计算,其实还是一个核在跑,轮流用,反而比单线程慢(切换有开销)。
但 IO 操作(网络请求、读文件)不占 Python 字节码时间,线程等 IO 的时候会释放 GIL,让其他线程跑——所以 IO 密集型任务,多线程是有效的。
多线程实战:批量下载文件
用 threading 手动管线程,适合理解原理:
import threading
import time
import urllib.request
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]
def download(url, idx):
print(f"[线程{idx}] 开始下载 {url}")
try:
urllib.request.urlopen(url, timeout=10)
print(f"[线程{idx}] 下载完成")
except Exception as e:
print(f"[线程{idx}] 失败: {e}")
# 单线程串行
start = time.time()
for i, url in enumerate(urls):
download(url, i)
print(f"串行耗时: {time.time() - start:.2f}s")
# 多线程并行
start = time.time()
threads = []
for i, url in enumerate(urls):
t = threading.Thread(target=download, args=(url, i))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"多线程耗时: {time.time() - start:.2f}s")
效果对比:- 串行:约 4 秒(一个一个等)
-
- 多线程:约 1 秒(同时发请求)
-
更优雅的写法:ThreadPoolExecutor
手动管 Thread 太费劲,用线程池,5 行搞定:
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
urls = [f"https://httpbin.org/delay/1" for _ in range(8)]
def fetch(url):
urllib.request.urlopen(url, timeout=10)
return f"完成: {url}"
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(fetch, url): url for url in urls}
for future in as_completed(futures):
try:
print(future.result())
except Exception as e:
print(f"失败: {e}")
max_workers=4 限制最大并发数,防止同时发 100 个请求把对方服务器搞挂(也防止自己被封 IP)。
多进程实战:批量数据处理
CPU 密集型任务,换 ProcessPoolExecutor:
from concurrent.futures import ProcessPoolExecutor
import time
def heavy_compute(n):
"""模拟 CPU 密集计算:计算 n 的阶乘"""
result = 1
for i in range(1, n + 1):
result *= i
return len(str(result)) # 返回结果位数
numbers = [50000, 60000, 70000, 80000, 90000, 100000]
# 单进程
start = time.time()
results = [heavy_compute(n) for n in numbers]
print(f"单进程耗时: {time.time() - start:.2f}s, 结果: {results}")
# 多进程(必须在 if __name__ == '__main__': 块内)
if __name__ == '__main__':
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(heavy_compute, numbers))
print(f"多进程耗时: {time.time() - start:.2f}s, 结果: {results}")
注意:多进程代码必须放在 if __name__ == '__main__': 里,否则 Windows 上会报错(Linux/Mac 不受影响,但养成习惯)。
效果对比(4 核机器):
实战场景:多进程处理大 CSV 文件
import multiprocessing
import pandas as pd
from pathlib import Path
def process_chunk(chunk_file):
"""处理单个分块文件"""
df = pd.read_csv(chunk_file)
# 模拟耗时处理
df['new_col'] = df['value'].apply(lambda x: x ** 2 + x * 3)
result_file = chunk_file.parent / f"result_{chunk_file.name}"
df.to_csv(result_file, index=False)
return str(result_file)
def split_csv(filepath, chunk_size=10000):
"""将大 CSV 切分为小块"""
df = pd.read_csv(filepath)
chunks = []
for i, start in enumerate(range(0, len(df), chunk_size)):
chunk = df.iloc[start:start + chunk_size]
chunk_file = Path(f"/tmp/chunk_{i}.csv")
chunk.to_csv(chunk_file, index=False)
chunks.append(chunk_file)
return chunks
if __name__ == '__main__':
# 切分文件
chunks = split_csv("big_data.csv", chunk_size=10000)
# 多进程处理
with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
results = pool.map(process_chunk, chunks)
print(f"处理完成,共 {len(results)} 个结果文件")
for r in results:
print(f" - {r}")
进程间通信:Queue
多进程之间内存不共享,要传数据用 Queue:
import multiprocessing
import time
def producer(queue, items):
for item in items:
queue.put(item)
print(f"生产: {item}")
time.sleep(0.1)
queue.put(None) # 结束信号
def consumer(queue):
while True:
item = queue.get()
if item is None:
break
print(f"消费: {item * 2}")
if __name__ == '__main__':
q = multiprocessing.Queue()
data = list(range(10))
p = multiprocessing.Process(target=producer, args=(q, data))
c = multiprocessing.Process(target=consumer, args=(q,))
p.start()
c.start()
p.join()
c.join()
常见坑,踩过才知道
坑1:多进程的函数必须可 pickleLambda、嵌套函数、类方法在多进程下经常报 can't pickle 错误——把处理函数写成模块级别的普通函数就行。
坑2:线程不安全的操作多线程写同一个变量要加锁:
import threading
counter = 0
lock = threading.Lock()
def increment():
global counter
with lock: # 加锁保护
counter += 1
threads = [threading.Thread(target=increment) for _ in range(1000)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"最终值: {counter}") # 应该是 1000,不加锁可能小于 1000
坑3:进程数不是越多越好进程创建有开销,任务太少开 16 个进程反而比 4 个慢。一般设 cpu_count() 或 cpu_count() - 1 就够了。
选型决策树
任务是否涉及大量等待(网络/IO)?
├── 是 → 多线程 ThreadPoolExecutor
│ 并发数建议 10~50,视接口限制
└── 否(纯 CPU 计算)→ 多进程 ProcessPoolExecutor
进程数建议 cpu_count() 或 cpu_count()-1
总结
| 多线程 | 多进程 |
|---|
| 内存 | 共享 | 独立 |
| 通信 | 直接(要加锁) | Queue/Pipe |
| GIL 影响 | 有(CPU 任务无效) | 无(真并行) |
| 适合 | IO 密集型 | CPU 密集型 |
| 启动开销 | 小 | 较大 |
并发不是银弹,选对工具才是关键。IO 等待多就用线程,CPU 算得猛就用进程,高并发网络请求就上 asyncio——别乱用,乱用比串行还慢。
如果这篇帮你搞清楚了 GIL 和并发选型,点个在看 让更多人看到。有踩过的坑,评论区聊聊👇
几行代码,专注分享能用上的 Python 技巧。