到现在为止,我们都是单向、非交互式的通信。但假如子进程要主动问你问题呢?比如让你输入密码、用户名之类的。
如果你只有一两次输入要处理,communicate 仍是最佳选择。之前用 gpg 加密就示范过,但让我们看看当子进程主动要输入时怎么做。
先写个简单 Python 脚本,让用户输入名字,然后回显出来:
username = input('请输入用户名: ')print(f'你的用户名是 {username}')
接下来我们用 communicate 输入用户名:
列表 13.10 使用 communicate 传标准输入
import asynciofrom asyncio.subprocess import Processasync def main(): program = ['python3', 'listing_13_9.py'] process: Process = await asyncio.create_subprocess_exec(*program, stdout=asyncio .subprocess.PIPE, stdin=asyncio .subprocess.PIPE) stdout, stderr = await process.communicate(b'Zoot') print(stdout) print(stderr)asyncio.run(main())
运行后你会看到输出包含 Please enter a username: 与 Your username is Zoot\n,接着程序就退出了。这是因为程序运行一次输入就结束,不再等待。但如果是一个反复询问输入的应用,就不灵了。
比如这个脚本,它会不停问你输入,直到你说 quit:
user_input = ''while user_input != 'quit': user_input = input('请输入要回显的内容: ') print(user_input)
因为 communicate 会一直等到子进程结束,我们得改用 wait,同时并行读取标准输出和写入标准输入。Process 类提供了 stdin 字段,类型是 StreamWriter。只要我们把标准输入设成 PIPE,就能用它来写入数据。
我们可以并行运行 StreamReader 读输出,StreamWriter 写输入,处理这类交互式程序。下面是例子:我们要向子进程发送几段文本,看看效果。
import asynciofrom asyncio import StreamWriter, StreamReaderfrom asyncio.subprocess import Processasync def consume_and_send(text_list, stdout: StreamReader, stdin: StreamWriter): for text in text_list: line = await stdout.read(2048) print(line) stdin.write(text.encode()) await stdin.drain()async def main(): program = ['python3', 'listing_13_11.py'] process: Process = await asyncio.create_subprocess_exec(*program, stdout=asyncio .subprocess.PIPE, stdin=asyncio .subprocess.PIPE) text_input = ['one\n', 'two\n', 'three\n', 'four\n', 'quit\n'] await asyncio.gather(consume_and_send(text_input, process.stdout, process.stdin), process.wait())asyncio.run(main())
b'Enter text to echo: 'b'one\nEnter text to echo: 'b'two\nEnter text to echo: 'b'three\nEnter text to echo: 'b'four\nEnter text to echo: '
我们目前的程序是个确定性的系统,输出固定、输入时机明确。但现实中很多应用可能随时问你输入,或偶尔才输出一段,也可能突然输出海量内容。我们来改一下之前的回显程序:每次输入后随机重复 1~10 次,中间加半秒延时。
from random import randrangeimport timeuser_input = ''while user_input != 'quit': user_input = input('请输入要回显的内容: ') for i in range(randrange(10)): time.sleep(.5) print(user_input)
如果我们用前面那种写法,它其实还能跑通,因为我们最终还是会发起输入请求。但弊端是:读输出和写输入的逻辑耦合得太紧。随着逻辑越来越复杂,代码会变得难以阅读和维护。
解决方法就是 解耦读写操作。我们分别写两个协程:一个专门读标准输出,一个专门写标准输入。前者收到提示后触发一个事件;后者等事件触发后再写入数据。
这样做之后,我们有两个各自专注的协程,互相不干扰,代码清晰易懂,可维护性大大提升。
import asynciofrom asyncio import StreamWriter, StreamReader, Eventfrom asyncio.subprocess import Processasync def output_consumer(input_ready_event: Event, stdout: StreamReader): while (data := await stdout.read(1024)) != b'': print(data) if data.decode().endswith("Enter text to echo: "): input_ready_event.set()async def input_writer(text_data, input_ready_event: Event, stdin: StreamWriter): for text in text_data: await input_ready_event.wait() stdin.write(text.encode()) await stdin.drain() input_ready_event.clear()async def main(): program = ['python3', 'interactive_echo_random.py'] process: Process = await asyncio.create_subprocess_exec(*program, stdout=asyncio .subprocess.PIPE, stdin=asyncio .subprocess.PIPE) input_ready_event = asyncio.Event() text_input = ['one\n', 'two\n', 'three\n', 'four\n', 'quit\n'] await asyncio.gather(output_consumer(input_ready_event, process.stdout), input_writer(text_input, input_ready_event, process.stdin), process.wait())asyncio.run(main())
上面代码里,output_consumer 协程读取标准输出,一旦碰到 “Enter text to echo:” 就设置一个 input_ready_event 事件。input_writer 协程不断轮询这个事件,一触发就写入数据,并清空事件。这样下一个循环又能等待。
通过这种设计,我们实现了职责分离:一个负责读输出,一个负责写输入,代码更清爽,更容易调试和扩展。
- 我们可以用
asyncio 的 subprocess 模块,通过 create_subprocess_shell 与 create_subprocess_exec 异步启动子进程。尽量用 create_subprocess_exec,因为它跨平台行为更一致。 - 默认情况下,子进程的输出会流向我们的应用标准输出。如需读取或与标准输入/输出互动,必须将它们配置为指向
StreamReader 与 StreamWriter。 - 并发运行大量子进程时,建议使用信号量来控制数量,避免过度占用系统资源。
- 可以用
communicate 协程向子进程的标准输入发送数据,实现一次性输入。