那天晚上下班挺晚的,大概十一点多,我在公司楼下等外卖,手机那边我们组小李还在吐槽:“哥,我这个脚本要跑一个小时,老板说让‘搞搞并行’,你说这玩意儿不是一句for就完了吗,还并行啥啊?”
我就让他把代码甩过来看了一眼,典型的“一个 for 把一堆活全干了”的那种:
defprocess_file(path: str) -> dict:
# 模拟处理一下
import time, hashlib
time.sleep(0.5)
return {
"path": path,
"md5": hashlib.md5(path.encode()).hexdigest(),
}
paths = [f"file_{i}.txt"for i in range(20)]
results = [process_file(p) for p in paths]
print(results)
20 个文件,每个 0.5 秒,串行就是 10 秒往上,这还只是测试环境。你要是真在服务器上扫几千个日志文件,可能人都下班了还没跑完。
我当时就跟他说,其实你这活儿,改“一行”基本就多核飞起来了,只是那一行前后要搭点简单骨架,不然 Python 不认识你在干嘛。
说人话的并行:到底在“多核”哪里省时间的
简单先说清一个事,免得等会儿概念打架:
这俩在 Python 里走的路不一样:
所以那天我先问小李一句:“你这个 process_file 主要是在干嘛?读磁盘 + sleep?” 他说差不多,读一堆日志,里面还有点 HTTP 请求。那就好办,多线程一行就能搞。
一行线程池并行:把 for 换成 map 就行
我直接把他那段改成这样(核心就一行,别急看解释):
from concurrent.futures import ThreadPoolExecutor
defprocess_file(path: str) -> dict:
import time, hashlib
time.sleep(0.5)
return {
"path": path,
"md5": hashlib.md5(path.encode()).hexdigest(),
}
paths = [f"file_{i}.txt"for i in range(20)]
with ThreadPoolExecutor(max_workers=8) as pool:
results = list(pool.map(process_file, paths)) # ← 关键的一行并行
print(results)
你看,原来是:
results = [process_file(p) for p in paths]
现在变成:
results = list(pool.map(process_file, paths))
逻辑没变:还是“对每个 path 调一次 process_file,再把结果收集成 list”。 但这次是池子里 8 个线程一起干活,而不是一个人干 20 次。
如果你比较强迫症,非要“一行版本”的话,在脚本里这么写也是合法的(虽然可读性差一点):
from concurrent.futures import ThreadPoolExecutor; results = list(ThreadPoolExecutor(max_workers=8).map(process_file, paths))
这真就物理上“一行 Python 代码并行”了,只是一般不推荐这样写,日后同事看到会想揍你。
我给小李做了个简单测速,他听完就不吵吵了
我当时一边等外卖一边敲了个小 demo 发给他,你有电脑可以自己跑一下:
import time
from concurrent.futures import ThreadPoolExecutor
defwork(x: int) -> int:
time.sleep(0.5) # 模拟 I/O 等待
return x * x
numbers = list(range(20))
# 串行版本
start = time.perf_counter()
serial = [work(n) for n in numbers]
print("serial cost:", time.perf_counter() - start)
# 并行版本(关键一行)
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as pool:
parallel = list(pool.map(work, numbers))
print("parallel cost:", time.perf_counter() - start)
print(serial == parallel)
你大概能看到类似这样的输出(时间别抠毫秒级,就个量级感觉):
结果还是一样的 serial == parallel,但时间直接砍好几倍。
关键加速就来自那一行:
parallel = list(pool.map(work, numbers))
我跟小李说,你只要记住一个心法就行:
“原来写 for 的地方,能改成 pool.map,就已经是并行思维了。”
其他什么线程池、上下文管理器,那些就是外围包装。
等等,有 CPU 算得很猛的版本吗?
说完 I/O 那种,我又想起来前两天有个兄弟做图片批量缩放,用 Pillow,直接 for 循环,CPU 打满一核,另外几核摸鱼。那种就不能再用线程池了,要换成进程池。
我直接给他发了一个这样的版本:
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from PIL import Image
defresize_image(path: str) -> str:
img = Image.open(path)
img = img.resize((256, 256))
out = Path("output") / Path(path).name
out.parent.mkdir(parents=True, exist_ok=True)
img.save(out)
return str(out)
if __name__ == "__main__": # 注意这个!
images = [str(p) for p in Path("images").glob("*.jpg")]
with ProcessPoolExecutor() as pool:
results = list(pool.map(resize_image, images)) # 这里一行多进程并行
print(results)
这里就不搞“一行极限压缩写法”了,主要想让你看到:
ThreadPoolExecutor 换成 ProcessPoolExecutor;if __name__ == "__main__":,尤其在 Windows 上,不然会无限创建子进程,电脑要被你搞炸;你真想写成物理一行,也可以这么搞(别在生产这么写就行):
from concurrent.futures import ProcessPoolExecutor; from pathlib import Path; from PIL import Image
defresize_image(path: str) -> str:
img = Image.open(path); img = img.resize((256, 256)); out = Path("output") / Path(path).name; out.parent.mkdir(parents=True, exist_ok=True); img.save(out); return str(out)
if __name__ == "__main__":
images = [str(p) for p in Path("images").glob("*.jpg")]
results = list(ProcessPoolExecutor().map(resize_image, images))
print(results)
核心思路还是不变:把“要处理的一堆任务”变成一个可迭代,然后一行 Executor().map(func, iterable) 抛给池子。
中间插一句:GIL 这事到底要不要管它
我知道你脑子里可能蹦一个词:GIL。 小李当时也是,上来就说:“Python 有 GIL,多线程不就没用了吗?”
我就跟他解释了半天,大概意思:
所以:
time.sleep、requests.get、read() 这类,线程池很香;这俩你脑子里有个简单判断就够了,不用把 GIL 想得太神秘。
“一行并行”的几个坑,早说免得你踩
我跟小李那天语音聊了十几分钟,顺便把几个常见坑提前说了下,你也可以顺着记一记:
函数必须能被序列化
ProcessPoolExecutor),你的 func 不能是 lambda、不能是嵌套在函数里的函数,得是顶层定义的;全局变量别乱依赖
map。任务太少别上池子
max_workers = min(32, cpu_count() * 5) 之类,别夸张。进度条要自己配合
你要边算边看进度,可以配合 tqdm,比如:
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
with ThreadPoolExecutor(max_workers=8) as pool:
results = list(tqdm(pool.map(process_file, paths), total=len(paths)))
这也是典型的一行并行 + 一行进度条,结合起来体验挺好。
异常别吞
map 里面如果有任务抛异常,再去取结果的时候会在那一项抛出来;as_completed,但那就不止一行了。最后我真给小李整了个“一行并行脚本模板”,他现在逢人就用
那天外卖到了,我走回家的路上,顺手给他丢了个最简化版本,他之后新写的那种“批量处理脚本”基本都照这个抄一遍就行了,你也可以当模板用:
# 假设这是 cpu_bound.py
from concurrent.futures import ProcessPoolExecutor
import time
from math import sqrt
defheavy(x: int) -> float:
# 模拟重计算
s = 0
for i in range(50_000):
s += sqrt((x + i) % 1000)
return s
if __name__ == "__main__":
tasks = list(range(16))
start = time.perf_counter()
with ProcessPoolExecutor() as pool:
results = list(pool.map(heavy, tasks)) # ← 一行多进程并行
print("cost:", time.perf_counter() - start)
print(results)
你要把它改成 I/O 版本,就把 ProcessPoolExecutor 换成 ThreadPoolExecutor,把 heavy 里面那一堆循环换成 time.sleep(0.5) 或 requests.get(url) 就完事。
整篇啰里八嗦说了这么多,其实就想让你脑子里形成一个非常具体的画面:
以后你看到那种“for 循环处理一大堆相互独立的小任务”的 Python 代码,就先在心里默念一句: “能不能把它改成
Executor().map(func, tasks)那一行?”
能改,大概率就能并行。 至于到底用线程池还是进程池,就看你那段逻辑是在“算”,还是在“等”。
行了,我先去冲杯咖啡,等会儿想起来再跟你唠唠分布式那种“多机器一起干活”的版……
-END-
我为大家打造了一份RPA教程,完全免费:songshuhezi.com/rpa.html