传统上,当需要用 Python 获取用户输入时,我们会用 input 函数。这个函数会暂停执行,直到用户输入内容并按下回车。但如果想在等待输入的同时让代码在后台运行怎么办?比如,我们可能希望用户能并发启动多个耗时任务,例如长时间运行的 SQL 查询。又或者在命令行聊天应用中,用户应能一边接收别人的消息,一边输入新消息。
由于 asyncio 是单线程的,所以在 asyncio 应用中使用 input 意味着会阻塞事件循环,直到用户输入,从而导致整个应用停滞。即使使用任务来启动后台操作也无济于事。为了演示这一点,让我们尝试创建一个应用,让用户输入一个睡眠时间。我们希望能同时运行多个睡眠任务,还能继续接受用户输入,所以我们会询问用户想要睡多久,然后在一个循环里创建 delay 任务。
import asynciofrom util import delayasync def main(): while True: delay_time = input('请输入要休眠的时间:') asyncio.create_task(delay(int(delay_time)))asyncio.run(main())
如果代码能按预期工作,输入一个数字后,我们期望看到 "sleeping for n second(s)" 被打印出来,几分钟后再看到 "finished sleeping for n second(s)"。但实际上,我们只看到提示输入,看不到任何输出。这是因为代码中没有 await,所以任务永远没有机会在事件循环中运行。我们可以用 await asyncio.sleep(0) 来“hack”解决,这相当于“交出事件循环”,任务就能运行起来(这个概念将在第 14 章详细介绍)。即便如此,因为它仍会阻塞整个线程,input 调用依然会阻止所有后台任务的完成。
真正理想的情况是,input 函数能变成一个协程,这样我们就能写成 delay_time = await input('请输入要休眠的时间:')。如果能做到这点,我们的任务就能正常调度并持续运行,同时我们也在等待用户输入。可惜,input 没有对应的协程版本,所以我们得另辟蹊径。
这时候,协议和流读取器就能派上用场了。回想一下,流读取器有个 readline 协程,正是我们所需要的。如果我们能找到一种方法把流读取器连接到标准输入,就可以用这个协程来获取用户输入。
asyncio 提供了事件循环的一个协程方法 connect_read_pipe,它可以将一个协议连接到一个类似文件的对象,这几乎是我们想要的。该方法接受一个 协议工厂 和一个 管道(pipe)。协议工厂就是一个创建协议实例的函数。而“管道”是一个类似文件的对象,定义了诸如 read、write 这类方法。connect_read_pipe 协程会将管道连接到协议工厂创建的协议上,把管道中的数据发给协议。
说到标准终端输入,sys.stdin 正好符合我们对类似文件对象的要求,可以传给 connect_read_pipe。一旦调用这个协程,我们就会得到一个元组,里面是协议工厂创建的协议和一个 ReadTransport。接下来的问题是:我们该在工厂里创建什么协议?又该如何将其与一个拥有 readline 协程的 StreamReader 连接?
asyncio 提供了一个名为 StreamReaderProtocol 的实用类,用于将流读取器实例与协议连接起来。当我们实例化这个类时,需要传入一个流读取器实例。这个协议类随后会委托给所创建的流读取器,从而让我们能用它来从标准输入读取数据。将这些组件整合起来,我们就能创建一个在等待用户输入时不会阻塞事件循环的命令行应用。
对于使用 Windows 的用户很不幸,在 Windows 上 无法与 一起使用。这是由于微软实现文件描述符的方式导致的一个未修复的缺陷。要想在 Windows 上运行,你需要使用第 7 章探讨的技术,在单独的线程中调用 。你可以在 https://bugs.python.org/issue26832 查看更多信息。
由于我们将在本章余下部分反复使用这个异步的标准输入读取器,我们不妨把它单独放在一个文件 listing_8_5.py 里。之后就可以在章节其他地方导入它。
import asynciofrom asyncio import StreamReaderimport sysasync def create_stdin_reader() -> StreamReader: stream_reader = asyncio.StreamReader() protocol = asyncio.StreamReaderProtocol(stream_reader) loop = asyncio.get_running_loop() await loop.connect_read_pipe(lambda: protocol, sys.stdin) return stream_reader
在这段代码中,我们创建了一个可重用的协程 create_stdin_reader,它会创建一个 StreamReader 用于异步读取标准输入。我们首先创建一个流读取器实例,并将其传给流读取器协议。然后调用 connect_read_pipe,传入一个返回之前创建的流读取器协议的 lambda 函数。我们还将 sys.stdin 作为参数传入,以便将标准输入连接到我们的流读取器协议上。因为不打算用返回的 transport 与 protocol,所以直接忽略。现在我们可以用这个函数异步读取标准输入,构建应用了。
import asynciofrom chapter_08.listing_8_5 import create_stdin_readerfrom util import delayasync def main(): stdin_reader = await create_stdin_reader() while True: delay_time = await stdin_reader.readline() asyncio.create_task(delay(int(delay_time)))asyncio.run(main())
在主协程中,我们调用 create_stdin_reader,然后无限循环,使用 readline 协程等待用户输入。当用户按回车键后,这个协程就会交付输入的内容。拿到输入后,我们将其转为整数(需要注意的是,真实应用中应加入处理非法输入的代码,否则现在传入字符串会导致程序崩溃),并创建一个 delay 任务。运行这段代码后,你就可以并发运行多个 delay 任务,同时继续输入命令行指令。例如,分别输入 5 秒、4 秒和 3 秒,你应该能看到如下输出:
5sleeping for 5 second(s)4sleeping for 4 second(s)3sleeping for 3 second(s)finished sleeping for 5 second(s)finished sleeping for 4 second(s)finished sleeping for 3 second(s)
这样看起来是可行的,但这种方法有一个致命缺陷。如果我们在输入时,恰好有消息从延迟任务输出到控制台,会发生什么呢?为了测试,我们输入 3 秒的延迟,然后快速地猛按 1 键。你会看到类似这样的结果:
3sleeping for 3 second(s)111111finished sleeping for 3 second(s)11
在我们输入的过程中,延迟任务的消息打印了出来,打断了我们的输入行,并强制它显示在下一行。此外,输入缓冲区现在只剩下 11,意味着如果此时按回车,我们会创建一个针对这个时长的任务,丢失掉前面的输入。这是因为,默认情况下,终端运行在 “精简模式”(cooked mode)。在这种模式下,终端会自动将用户输入回显到标准输出,并处理特殊按键,如回车和 Ctrl-C。问题是,delay 协程在写入标准输出的同时,终端也在回显输出,导致了竞争条件。
此外,标准输出的写入位置是屏幕上的单一位置,这被称为 光标(cursor),就像你在文字处理器里看到的那样。当你输入时,光标停在键盘输入输出的那行。这意味着,其他协程输出的消息也会出现在同一行,造成奇怪的行为。
为解决这些问题,我们需要两种方案的结合。第一种是将输入的回显从终端移到我们自己的 Python 应用中。这样一来,即使在回显用户输入时,也不会同时写入其他协程的输出,避免了竞争条件。第二种是写入消息时移动光标,确保输出消息不会写在输入行上。这可以通过修改终端设置和使用转义序列来实现。
由于我们的终端处于“精简模式”,它会帮我们在 readline 之外处理输入回显。那么,如何将这个处理流程纳入我们的应用,从而避免之前遇到的竞争条件呢?
答案是切换到 “原始模式”(raw mode)。在原始模式下,终端不再帮我们缓冲、预处理或回显输入。每一次按键都会直接发送到应用。接下来的事由我们自己决定,我们可以自行决定是否回显和预处理。这虽然意味着要多做一些工作,但也赋予了我们对写入标准输出的精细控制,正好可以用来规避竞争条件。
当然,Python 还允许进入一种叫 “短语模式”(cbreak mode),它行为类似于原始模式,但能为我们保留对如 Ctrl-C 这类按键的解释,减轻了部分工作量。我们可以通过 tty 模块和 setcbreak 函数进入短语模式:
import ttyimport systty.setcbreak(sys.stdin)
进入 cbreak 模式后,我们就需要重新思考应用的设计了。readline 协程将不再生效,因为在原始模式下它不会自动回显输入。取而代之的,我们应该一次读取一个字符,并存储在自己的内部缓冲区中,同时逐个回显输入。我们之前创建的标准输入流读取器有一个 read 方法,它接受要读取的字节数。调用 read(1) 会一次读取一个字符,我们可以将它存入缓冲区并回显到标准输出。
现在,我们有两个拼图的关键部分:进入 cbreak 模式和一次读取一个字符并回显。接下来,我们需要考虑如何展示 delay 协程的输出,确保它不会干扰我们的输入。
让我们定下几点需求,让应用更友好,解决输出覆盖输入行的问题。这些需求也将指导我们的实现:
考虑到这些要求,我们如何显示 delay 协程的输出呢?考虑到我们想要在消息超出屏幕行数时实现向上滚动的效果,直接用 print 输出会变得很麻烦。因此,我们采取另一种做法:维护一个双端队列(deque),里面存放我们要写入标准输出的消息。我们将双端队列的最大元素数设为终端屏幕的行数。当双端队列满了时,队列尾部的元素会被丢弃,这就自然形成了滚动效果。每当有新消息添加到双端队列,我们就移动到屏幕顶部,重绘每一条消息。这样做就能获得所需的滚动效果,而无需记录标准输出的完整状态信息。这使得应用的运行流程如图 8.3 所示。
- 将光标移至屏幕底部,当按键被按下时,将其添加到内部缓冲区,并回显到标准输出。
- 当用户按下回车键时,创建一个
delay 任务。不再直接写入标准输出,而是将消息追加到一个最大元素数等于控制台行数的双端队列中。 - 一旦消息进入双端队列,我们就重绘屏幕输出。首先将光标移至屏幕左上角,然后打印双端队列中的所有消息。完成后,将光标恢复到原来的位置。
为实现这个应用,我们需要先学习如何移动光标。我们可以使用 ANSI 转义码(escape codes) 来实现。这些是特殊的代码,可以写入标准输出,执行如更改文本颜色、移动光标、删除行等操作。转义序列通常以转义码开头;在 Python 中,可以通过打印 \033 到控制台来实现。我们常用的许多转义序列都以 控制序列引入符(control sequence introducers) 开始,其格式为 \033[。为了更好地理解,我们来看如何将光标移动到当前行下方的第 5 行:
sys.stdout.write('\033[5E')
这个转义序列以控制序列引入符开头,后面跟着 5E。其中 5 代表从当前光标行向下移动的行数,而 E 是“向下移动指定行数”的代码。转义序列虽然简洁,但阅读起来有点难懂。在下一个清单中,我们会创建几个名称清晰的函数,说明每个转义码的作用,并在后续的清单中导入它们。如果你想了解更多关于 ANSI 转义序列及其工作原理的信息,维基百科的相关文章非常全面,地址是:https://en.wikipedia.org/wiki/ANSI_escape_code(https://en.wikipedia.org/wiki/ANSI_escape_code)。
我们来思考一下,为了满足我们的需求,需要如何移动光标。首先,我们需要将光标移到屏幕底部以接受用户输入。其次,当用户按回车后,我们需要清除他们输入的文本。为了从屏幕顶部打印协程输出消息,我们需要能够移动到屏幕的第一行。我们还需要保存和恢复光标的当前位置,因为在打字时,协程可能会输出消息,这时我们必须将光标移回正确的位置。我们可以使用以下转义码函数来实现:
import sysimport shutildef save_cursor_position(): sys.stdout.write('\0337')def restore_cursor_position(): sys.stdout.write('\0338')def move_to_top_of_screen(): sys.stdout.write('\033[H')def delete_line(): sys.stdout.write('\033[2K')def clear_line(): sys.stdout.write('\033[2K\033[0G')def move_back_one_char(): sys.stdout.write('\033[1D')def move_to_bottom_of_screen() -> int: _, total_rows = shutil.get_terminal_size() input_row = total_rows - 1 sys.stdout.write(f'\033[{input_row}E') return total_rows
现在我们有了一个可复用的函数集来移动光标,接下来实现一个用于逐字符读取标准输入的可复用协程。我们将使用 read 协程来完成这项工作。一旦读取到一个字符,我们将其写入标准输出,同时存入内部缓冲区。因为我们还想处理用户按 Delete 键的情况,所以我们还要监听 Delete 键。当用户按下时,我们从缓冲区中删除该字符,并从标准输出中删除。
import sysfrom asyncio import StreamReaderfrom collections import dequefrom chapter_08.listing_8_7 import move_back_one_char, clear_lineasync def read_line(stdin_reader: StreamReader) -> str: def erase_last_char(): <span class="fm-combinumeral">❶</span> move_back_one_char() sys.stdout.write(' ') move_back_one_char() delete_char = b'\x7f' input_buffer = deque() while (input_char := await stdin_reader.read(1)) != b'\n': if input_char == delete_char: <span class="fm-combinumeral">❷</span> if len(input_buffer) > 0: input_buffer.pop() erase_last_char() sys.stdout.flush() else: input_buffer.append(input_char) <span class="fm-combinumeral">❸</span> sys.stdout.write(input_char.decode()) sys.stdout.flush() clear_line() return b''.join(input_buffer).decode()
❶ 用于从标准输出中删除前一个字符的便捷函数❷ 如果输入字符是退格键,则移除最后一个字符。❸ 如果输入字符不是退格键,则将其追加到缓冲区并回显。
我们的协程接收一个已连接到标准输入的流读取器。然后,我们定义了一个便捷函数来从标准输出中删除前一个字符,因为在用户按 Delete 键时需要用到。然后,我们进入一个 while 循环,逐字符读取,直到用户按回车。如果用户按了 Delete 键,我们从缓冲区和标准输出中移除最后一个字符。否则,我们将字符追加到缓冲区并回显。当用户按回车后,我们清空输入行并返回缓冲区的内容。
接下来,我们需要定义一个用来存储我们想打印到标准输出的消息的队列。由于我们希望在添加消息时重新绘制输出,我们将创建一个包装 deque 的类,并接收一个可等待的回调函数。我们传入的回调将负责重新绘制输出。我们还会为这个类添加一个 append 协程方法,该方法将向 deque 追加项目,并使用当前 deque 中的项调用回调。
from collections import dequefrom typing import Callable, Deque, Awaitableclass MessageStore: def __init__(self, callback: Callable[[Deque], Awaitable[None]], max_size: int): self._deque = deque(maxlen=max_size) self._callback = callback async def append(self, item): self._deque.append(item) await self._callback(self._deque)
现在,我们拥有了构建应用所需的所有零件。我们将重写 delay 协程,使其将消息添加到消息存储中。然后,在主协程中,我们将创建一个帮助函数,用于将双端队列中的消息重新绘制到标准输出。这个函数就是我们传给 MessageStore 的回调。接着,我们将使用之前实现的 read_line 协程来接受用户输入,当用户按下回车时,创建一个延迟任务。
import asyncioimport osimport ttyfrom collections import dequefrom chapter_08.listing_8_5 import create_stdin_readerfrom chapter_08.listing_8_7 import *from chapter_08.listing_8_8 import read_linefrom chapter_08.listing_8_9 import MessageStoreasync def sleep(delay: int, message_store: MessageStore): await message_store.append(f'Starting delay {delay}') <span class="fm-combinumeral">❶</span> await asyncio.sleep(delay) await message_store.append(f'Finished delay {delay}')async def main(): tty.setcbreak(sys.stdin) os.system('clear') rows = move_to_bottom_of_screen() async def redraw_output(items: deque): <span class="fm-combinumeral">❷</span> save_cursor_position() move_to_top_of_screen() for item in items: delete_line() print(item) restore_cursor_position() messages = MessageStore(redraw_output, rows - 1) stdin_reader = await create_stdin_reader() while True: line = await read_line(stdin_reader) delay_time = int(line) asyncio.create_task(sleep(delay_time, messages))asyncio.run(main())
❶ 将输出消息添加到消息存储。❷ 回调函数,将光标移至屏幕顶部;重绘输出,再将光标移回。
运行这段代码,你将能够创建延迟,同时在输入时也能看到内容写入控制台,即使你在敲击输入。虽然它比第一次尝试更复杂,但我们构建的应用避免了早期面临的向标准输出写入的问题。
我们构建的这个应用对于 delay 协程是有效的,但现实一点说,能否应用于更真实的情况?我们刚才定义的这些部件足够健壮,足以通过重用它们来创建更有用的应用。例如,我们可以思考如何创建一个命令行数据库客户端。某些查询可能需要很长时间执行,但你可能想在执行的同时运行其他查询或取消正在进行的查询。使用我们刚构建的代码,就可以实现这种类型的客户端。我们来使用第 5 章的电商业务产品数据库来构建这样一个客户端,其中我们创建了一个包含服装品牌、产品和商品编码的模式。我们将创建一个连接池来连接数据库,并重用之前例子中的代码来接受和运行查询。目前,我们会在控制台输出查询的基本信息——比如返回的行数。
import asyncioimport asyncpgimport osimport ttyfrom collections import dequefrom asyncpg.pool import Poolfrom chapter_08.listing_8_5 import create_stdin_readerfrom chapter_08.listing_8_7 import *from chapter_08.listing_8_8 import read_linefrom chapter_08.listing_8_9 import MessageStoreasync def run_query(query: str, pool: Pool, message_store: MessageStore): async with pool.acquire() as connection: try: result = await connection.fetchrow(query) await message_store.append(f'从查询中获取了 {len(result)} 行: {query}') except Exception as e: await message_store.append(f'查询失败: {e} 从: {query}')async def main(): tty.setcbreak(0) os.system('clear') rows = move_to_bottom_of_screen() async def redraw_output(items: deque): save_cursor_position() move_to_top_of_screen() for item in items: delete_line() print(item) restore_cursor_position() messages = MessageStore(redraw_output, rows - 1) stdin_reader = await create_stdin_reader() async with asyncpg.create_pool(host='127.0.0.1', port=5432, user='postgres', password='password', database='products', min_size=6, max_size=6) as pool: while True: query = await read_line(stdin_reader) asyncio.create_task(run_query(query, pool, messages))asyncio.run(main())
我们的代码几乎和之前一样,唯一的区别是从 delay 协程变成了 run_query 协程。它不再是随意休眠一段任意时间,而是运行用户输入的查询,而这个查询可能花费任意长的时间。这使我们能够在其他查询仍在运行的同时,从命令行发出新的查询,并且在输入新查询时仍能查看已完成查询的输出。
现在我们知道了如何创建能够处理输入,同时让其他代码执行并在控制台上写入的命令行客户端。接下来,我们将学习如何使用更高级的 asyncio API 来创建服务器。