很多应用一辈子都用不着离开 Python 的世界。我们从其他 Python 库和模块中调用代码,或者使用 multiprocessing、multithreading 来并发执行代码。但并不是所有想交互的东西都写成 Python。比如你可能有个用 C++、Go、Rust 或其他语言写的现成应用,它们性能更好,或干脆就是现成的,不用再重写一遍。你也可能想用系统自带的命令行工具,像 GREP 用来搜大文件、cURL 做 HTTP 请求,或者其他一大堆随手可用的应用。
在标准 Python 里,我们可以用 subprocess 模块来启动不同程序作为独立进程。跟大多数其他模块一样,标准的 subprocess API 是阻塞式的,这使得它和 asyncio 不兼容,除非配合多线程或多进程。好在 asyncio 提供了一个模仿 subprocess 模块的模块,让我们可以用协程异步地创建和管理子进程。
这一章,我们就来学习如何用 asyncio 创建和管理子进程的基础知识——以一个用别种语言写的程序为例。还会教你怎么处理输入输出,读取标准输出,以及把数据从你的主程序发给子进程。
假设你想扩展一个已有的 Python Web API 功能。你们公司另一个团队已经用命令行工具做了一个批量处理功能,但问题是这个工具是用 Rust 写的。既然这东西已经存在了,自然不想再去用 Python 重新造轮子。那有没有办法,在现有的 Python API 里直接用上它的功能呢?
好在这玩意儿有命令行接口,我们就可以通过子进程来复用它。我们把它当命令行程序调起来,运行在一个独立的子进程中。然后读取它的返回结果,该拿来就拿,再也不用自己重写了。
那问题来了:怎么创建一个子进程并运行它?asyncio 本身就提供了两个协程函数来创建子进程:asyncio.create_subprocess_shell 与 asyncio.create_subprocess_exec。这两个函数都会返回一个 Process 实例,它有几个方法可以用来等待进程结束、终止进程等。既然两样都差不多,为什么要两个?什么时候该用哪个?
create_subprocess_shell 会在系统安装的 shell(比如 zsh、bash)里运行子进程。通常来说,除非必须用 shell 特性,否则你应该首选 create_subprocess_exec。因为用 shell 会埋坑:不同机器可能装的 shell 不同,或者同一个 shell 配置也不一样。这样就很难保证程序在各种环境里行为一致。
我们先用最简单的例子试试,写个 asyncio 程序运行一个命令行程序。这次我们用 ls 命令来列出当前目录的内容,做个测试(虽然实际项目里不会这么干)。如果你用的是 Windows,就把 ls -l 换成 cmd /c dir。
import asynciofrom asyncio.subprocess import Processasync def main(): process: Process = await asyncio.create_subprocess_exec('ls', '-l') print(f'进程 PID 是: {process.pid}') status_code = await process.wait() print(f'状态码: {status_code}')asyncio.run(main())
上面这段代码里,我们用 create_subprocess_exec 启动了 ls -l 命令。你也可以在后面加更多参数。这里我们传了 -l,能显示更多文件信息,比如是谁创建的。创建完进程后,我们打印出进程号,然后调用 wait 协程。这个协程会一直等,直到子进程结束。一结束就返回状态码,正常应该是 0。
默认情况下,子进程的输出会直接发送到我们自己的标准输出。所以运行时你会看到类似这样的输出(根据你目录内容略有不同):
进程 PID 是: 54438total 8drwxr-xr-x 4 matthewfowler staff 128 Dec 23 15:20 .drwxr-xr-x 25 matthewfowler staff 800 Dec 23 14:52 ..-rw-r--r-- 1 matthewfowler staff 0 Dec 23 14:52 __init__.py-rw-r--r-- 1 matthewfowler staff 293 Dec 23 15:20 basics.py状态码: 0
注意,wait 协程会一直阻塞,直到子进程结束。但谁也不知道进程要跑多久,甚至有可能永远跑不完。如果担心“逃逸进程”,就得用 asyncio.wait_for 加个超时。不过有个坑:wait_for 超时后会终止它所在的协程任务,并不会杀死子进程本身,只停止那个等待的任务。
所以我们得换个更靠谱的方式关掉超时的进程。幸运的是,Process 有两个方法能派上用场:terminate 与 kill。terminate 发送 SIGTERM 信号,kill 发送 SIGKILL 信号。这两个方法都不是协程,也是非阻塞的——它们只是发信号。如果你还想知道终止后的状态码,或者要等清理工作完成,还是得再次调用 wait。
我们来试一下用 sleep 命令(在 Windows 上换成 cmd /c timeout 3)搞个长时间运行的子进程,然后强制终止它:
import asynciofrom asyncio.subprocess import Processasync def main(): process: Process = await asyncio.create_subprocess_exec('sleep', '3') print(f'进程 PID 是: {process.pid}') try: status_code = await asyncio.wait_for(process.wait(), timeout=1.0) print(status_code) except asyncio.TimeoutError: print('等待超时,正在终止...') process.terminate() status_code = await process.wait() print(status_code)asyncio.run(main())
在这个例子中,我们创建了个要睡 3 秒的子进程,外层套了 wait_for,超时时间 1 秒。1 秒后 wait_for 会抛出 TimeoutError,在 except 块里我们手动终止进程,再等它结束并打印状态码。输出大概如下:
进程 PID 是: 54709等待超时,正在终止...-15
一点小提醒:except 块里的 wait 也有可能等很久,如果担心这个问题,可以也用 wait_for 包一层。
前面的例子中,子进程的标准输出直接输出到了我们应用的控制台。但如果不想这样呢?也许你想对输出做点处理,又或者输出根本无关紧要,完全可忽略。
create_subprocess_exec 协程有个 stdout 参数,允许你指定输出去哪儿。这个参数接收一个枚举值,你可以选择:重定向到自己程序的标准输出、用 StreamReader 接收、或者直接丢进 /dev/null 忽略掉。
假设我们要同时运行多个子进程,并且显示它们各自的输出。为了区分是哪个进程的输出,我们可以给每个输出加个前缀,说明来源命令。下面我们用 ls -la 来试试。
import asynciofrom asyncio import StreamReaderfrom asyncio.subprocess import Processasync def write_output(prefix: str, stdout: StreamReader): while line := await stdout.readline(): print(f'[{prefix}]: {line.rstrip().decode()}')async def main(): program = ['ls', '-la'] process: Process = await asyncio.create_subprocess_exec(*program, stdout=asyncio .subprocess.PIPE) print(f'进程 PID 是: {process.pid}') stdout_task = asyncio.create_task(write_output(' '.join(program), process.stdout)) return_code, _ = await asyncio.gather(process.wait(), stdout_task) print(f'进程返回: {return_code}')asyncio.run(main())
这里我们先定义了一个 write_output 协程,逐行读取输出流,并在每行前面加上前缀。在主协程里,我们创建了子进程,并设置 stdout 为 PIPE,让它把输出发给 StreamReader。然后我们启了一个任务来运行 write_output,并与 process.wait() 并发运行。当你运行这个程序,会发现输出都带上了命令前缀:
进程 PID 是: 56925[ls -la]: total 32[ls -la]: drwxr-xr-x 7 matthewfowler staff 224 Dec 23 09:07 .[ls -la]: drwxr-xr-x 25 matthewfowler staff 800 Dec 23 14:52 ..[ls -la]: -rw-r--r-- 1 matthewfowler staff 0 Dec 23 14:52 __init__.py进程返回: 0
然而用管道传输数据时,死锁风险特别高。尤其是当子进程产生大量输出而你没及时读取时,很容易卡住。来演示一下:
import sys[sys.stdout.buffer.write(b'Hello there!!\n') for _ in range(1000000)]sys.stdout.flush()
这段代码连续向标准输出缓冲区写 100 万次 Hello there!!,然后一次性刷新。我们看看用管道接住这个程序但不读取会发生什么。
import asynciofrom asyncio.subprocess import Processasync def main(): program = ['python3', 'listing_13_4.py'] process: Process = await asyncio.create_subprocess_exec(*program, stdout=asyncio .subprocess.PIPE) print(f'进程 PID 是: {process.pid}') return_code = await process.wait() print(f'进程返回: {return_code}')asyncio.run(main())
如果你运行这个例子,会看到刚打出进程号,就再也没动静了,程序永久卡住,只能强行关闭。如果没遇到,就增大循环次数,肯定会出现问题。
这么简单个程序为啥会死锁?关键在于 StreamReader 缓冲区的行为。当缓冲区满了,往里写数据就会阻塞,直到缓冲区有空位。可这时子进程还在拼命写输出,于是它就依赖 StreamReader 释放空间。但你压根没去读,缓冲区永远不空,死锁就此形成。
之前我们是靠一边等 wait 一边读输出避免了这种问题——只要缓冲区填满,我们就能实时读走,不让子进程卡住。所以用管道时,务必及时消费数据,别让缓冲区被堵住。
当然还有更绝的解法:别用 wait,改用 communicate 这个协程。它在子进程结束后才返回,而且会自动持续读取标准输出和错误输出,避免死锁。来改成用 communicate 修复刚才的问题:
import asynciofrom asyncio.subprocess import Processasync def main(): program = ['python3', 'listing_13_4.py'] process: Process = await asyncio.create_subprocess_exec(*program, stdout=asyncio .subprocess.PIPE) print(f'进程 PID 是: {process.pid}') stdout, stderr = await process.communicate() print(stdout) print(stderr) print(f'进程返回: {process.returncode}')asyncio.run(main())
这次运行你会发现所有输出瞬间刷出来,然后是 None —— 因为你没往标准输出写任何东西。内部 communicate 会创建几个后台任务不停地读取标准输出和标准错误,彻底避免死锁。但这也有个致命缺点:你无法在输出过程中即时响应。如果需要实时处理输出,比如看到某个关键字就终止,或触发新任务,那就得老老实实用 wait,但一定要记得及时读取输出,不然照样卡住。
还有一个问题是:communicate 会把所有标准输出和标准输入的数据全部缓存在内存里。如果子进程输出巨量数据,很可能撑爆内存。下节我们会讲怎么解决这些问题。
现在咱们掌握了创建、终止、读取输出的基本操作,下面开始实战:并发运行多个程序。
想象一下,你要加密内存中的多段文本,出于安全考虑,决定用 Twofish 加密算法。可是 hashlib 没支持这个算法,只好另想办法。我们可以用 gpg(GNU Privacy Guard,PGP 的免费替代品)命令行工具。官网下载地址:https://gnupg.org/download/(https://gnupg.org/download/)
首先定义好加密命令。用 gpg 可以设密码,选算法,然后把待加密的文本“喂”进去。例如要加密 "encrypt this!",命令是:
echo 'encrypt this!' | gpg -c --batch --passphrase 3ncryptm3 --cipher-algo TWOFISH
?Q+??/??*??C??H`??`)R??u??7þ_{f{R;n?FE .?b5??(?i??????o\k?b<????`%
命令行上没问题,但你在用 create_subprocess_exec 时就废了,因为没法用 | 管道(create_subprocess_shell 才支持)。那怎么传入要加密的文本?别急,communicate 与 wait 不仅能传标准输出和错误,还能传标准输入。
communicate 协程在启动时还能指定输入数据。只要你把标准输入设成 PIPE,这些数据就会被传给子进程。太棒了!我们可以直接传字符串过去。
现在试试用随机文本加密 100 条,全都并发执行:
import asyncioimport randomimport stringimport timefrom asyncio.subprocess import Processasync def encrypt(text: str) -> bytes: program = ['gpg', '-c', '--batch', '--passphrase', '3ncryptm3', '--cipher-algo', 'TWOFISH'] process: Process = await asyncio.create_subprocess_exec(*program, stdout=asyncio .subprocess.PIPE, stdin=asyncio .subprocess.PIPE) stdout, stderr = await process.communicate(text.encode()) return stdoutasync def main(): text_list = [''.join(random.choice(string.ascii_letters) for _ in range(1000)) for _ in range(100)] s = time.time() tasks = [asyncio.create_task(encrypt(text)) for text in text_list] encrypted_text = await asyncio.gather(*tasks) e = time.time() print(f'总耗时: {e - s}') print(encrypted_text)asyncio.run(main())
这里的 encrypt 协程创建了 gpg 进程,并用 communicate 把待加密的文本传进去。为简化,我们只返回标准输出结果,也没处理异常——实际项目里肯定得加点容错逻辑。主函数里我们生成 100 个随机 1000 字符长的文本,对每个创建一个 encrypt 任务,并用 gather 并发运行。最后打印总时间和加密结果。
对比一下:把 await 加在 asyncio.create_task 前面,去掉 gather,那就是串行运行。你会发现并发版本明显快多了。
但你现在只加密了 100 个。要是变成几千甚至上万个呢?现在的代码是一口气创建 100 个进程,这在资源有限的机器上几乎等于自杀。不仅内存吃光,上下文切换的开销也会爆炸。
更糟的是,gpg 本身依赖共享状态(比如加密种子文件),你一旦并发开太多,很可能看到类似下面的报错:
gpg: waiting for lock on `/Users/matthewfowler/.gnupg/random_seed'...
不光进程太多,而且还被锁住,全在那儿傻等。那怎么办?当然是限制并发数。这时就该登场“信号量”了。既然任务是计算密集型,我们限制最大并发数等于你机器的核数,就很合理了。
试试看:用限定了核数的信号量,来加密 1000 个文本,看看效果:
import asyncioimport randomimport stringimport timeimport osfrom asyncio import Semaphorefrom asyncio.subprocess import Processasync def encrypt(sem: Semaphore, text: str) -> bytes: program = ['gpg', '-c', '--batch', '--passphrase', '3ncryptm3', '--cipher-algo', 'TWOFISH'] async with sem: process: Process = await asyncio.create_subprocess_exec(*program, stdout=asyncio .subprocess.PIPE, stdin=asyncio .subprocess.PIPE) stdout, stderr = await process.communicate(text.encode()) return stdoutasync def main(): text_list = [''.join(random.choice(string.ascii_letters) for _ in range(1000)) for _ in range(1000)] semaphore = Semaphore(os.cpu_count()) s = time.time() tasks = [asyncio.create_task(encrypt(semaphore, text)) for text in text_list] encrypted_text = await asyncio.gather(*tasks) e = time.time() print(f'总耗时: {e - s}')asyncio.run(main())
相比无限并发的版本,现在不但性能提升了,内存占用也降下来。你可能会觉得这不就跟第 6 章的 ProcessPoolExecutor 一样吗?没错!其实内部 ProcessPoolExecutor 就用了信号量来控制最大并发数。
我们现在已经掌握了创建、终止、并发运行子进程的基础知识。下一节,我们来看看怎么和子进程进行更互动式的通信。