3.5 在 asyncio 事件循环中构建回显服务器
使用 select 通常对于大多数应用来说太底层了。我们可能希望在等待套接字数据时在后台运行代码,或者希望定时运行后台任务。如果仅用 selectors,我们很可能需要自己构建事件循环,而 asyncio 已经为我们提供了一个精心实现的版本。此外,协程和任务在 selectors 之上提供了抽象,使得我们的代码更容易实现和维护,因为我们根本不需要考虑 selectors。
现在,我们对 asyncio 事件循环的工作原理有了更深的理解,让我们回到上一节构建的回显服务器,并再次用协程和任务来重建它。我们仍然使用底层套接字来实现这一点,但我们将使用 asyncio 基础的 API,这些 API 返回协程来管理它们。我们还会为回显服务器添加一些额外功能,以演示几个关键概念,说明 asyncio 是如何工作的。
鉴于套接字是一个相对低级的概念,处理它们的方法在 asyncio 事件循环本身。我们主要会用到三个协程:sock_accept、sock_recv 和 sock_sendall。它们类似于我们之前使用的套接字方法,只不过它们接受一个套接字作为参数,并返回一个协程,我们可以 await 它,直到我们有数据可以处理。
我们先来看 sock_accept。这个协程类似于我们在第一次实现中看到的 socket.accept 方法。这个方法返回一个元组(一个存储有序值序列的数据结构),包含一个套接字连接和一个客户端地址。我们传入我们感兴趣的套接字,然后可以 await 它返回的协程。一旦这个协程完成,我们就能得到连接和地址。这个套接字必须是非阻塞的,并且应该已经绑定到一个端口:
connection, address = await loop.sock_accept(socket)
sock_recv 和 sock_sendall 的调用方式与 sock_accept 类似。它们都接收一个套接字,然后我们可以 await 结果。sock_recv 会 await 直到套接字有我们可以处理的字节。sock_sendall 接收一个套接字和我们想要发送的数据,会等待直到所有我们想发送的数据都已发送,并在成功时返回 None:
data = await loop.sock_recv(socket)success = await loop.sock_sendall(socket, data)
有了这些基础构件,我们就能将之前的方法转换为使用协程和任务的版本。
在第2章中,我们介绍了协程和任务。那么,在我们的回显服务器中,什么时候应该只用协程,什么时候应该将协程包装成任务?让我们分析一下我们希望应用的行为来做出决定。
我们先来看看我们希望应用如何监听连接。当我们监听连接时,由于 socket.accept 只会给我们一个客户端连接,我们一次只能处理一个连接。在后台,如果有多个连接同时到达,传入的连接会被存储在一个称为“队列”的地方,但这里我们不深入讨论其工作原理。
由于我们不需要并发处理多个连接,一个无限循环的单个协程是合理的。这将允许其他代码在我们等待连接时并发运行。我们定义一个名为 listen_for_connections 的协程,它会无限循环并监听任何传入的连接:
async def listen_for_connections(server_socket: socket, loop: AbstractEventLoop): while True: connection, address = await loop.sock_accept(server_socket) connection.setblocking(False) print(f"Got a connection from {address}")
现在,我们有了一个监听连接的协程,那么如何处理已连接客户端的数据读写呢?这应该是协程,还是一个包装在任务中的协程?在这种情况下,我们会有多个连接,每个连接都可能随时发送数据给我们。我们不希望一个连接的数据等待阻塞另一个连接,所以我们需要并发地从多个客户端读写数据。由于我们需要同时处理多个连接,为每个连接创建一个任务来读写数据是合理的。每次我们获得一个连接时,我们都会创建一个任务来同时从该连接读取数据和向其写入数据。
我们创建一个名为 echo 的协程,负责处理来自连接的数据。这个协程会无限循环,监听来自客户端的数据。一旦收到数据,它就会将其发送回客户端。
然后,在 listen_for_connections 中,我们会为每个获得的连接创建一个新任务,包装我们的 echo 协程。有了这两个协程的定义,我们现在就有了构建 asyncio 回显服务器所需的一切。
列表 3.8 构建一个 asyncio 回显服务器
import asyncioimport socketfrom asyncio import AbstractEventLoopasync def echo(connection: socket, loop: AbstractEventLoop) -> None: while data := await loop.sock_recv(connection, 1024): # ❶ await loop.sock_sendall(connection, data) # ❷async def listen_for_connection(server_socket: socket, loop: AbstractEventLoop): while True: connection, address = await loop.sock_accept(server_socket) connection.setblocking(False) print(f"Got a connection from {address}") asyncio.create_task(echo(connection, loop)) # ❸async def main(): server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_address = ('127.0.0.1', 8000) server_socket.setblocking(False) server_socket.bind(server_address) server_socket.listen() await listen_for_connection(server_socket, asyncio.get_event_loop()) # ❹asyncio.run(main())
- ❸ 每当获得一个连接时,就创建一个
echo 任务来监听客户端数据。
前面列表的架构如图 3.5 所示。我们有一个协程 listen_for_connection 在监听连接。一旦客户端连接,我们的协程就会为每个客户端生成一个 echo 任务,然后监听数据并将其写回客户端。
图 3.5 监听连接的协程为每个连接生成一个任务。
当我们运行这个应用时,我们能够同时连接多个客户端,并并发地向它们发送数据。在后台,这全部使用了 selectors,所以我们的 CPU 利用率保持低位。
我们现在已经完全使用 asyncio 构建了一个功能齐全的回显服务器!那么,我们的实现有没有错误?事实上,我们设计的这个回显服务器在 echo 任务失败时确实存在问题,我们需要处理。
网络连接往往不可靠,我们的应用代码中可能会遇到意外的异常。如果读取或写入客户端失败并抛出异常,我们的应用会如何表现?为了测试这一点,让我们修改 echo 的实现,当客户端发送特定关键字时抛出异常:
async def echo(connection: socket, loop: AbstractEventLoop) -> None: while data := await loop.sock_recv(connection, 1024): if data == b'boom\r\n': raise Exception("Unexpected network error") await loop.sock_sendall(connection, data)
现在,每当客户端发送“boom”时,我们就会抛出一个异常,任务就会崩溃。那么,当我们连接一个客户端并向服务器发送这条消息时会发生什么?我们会看到一个带有警告的追踪信息,如下所示:
Task exception was never retrievedfuture: <Task finished name='Task-2' coro=<echo() done, defined at asyncio_echo.py:5> exception=Exception('Unexpected network error')>Traceback (most recent call last): File "asyncio_echo.py", line 9, in echo raise Exception("Unexpected network error")Exception: Unexpected network error
这里的重要部分是 Task exception was never retrieved。这是什么意思?当异常在任务中抛出时,任务被视为已完成,其结果是一个异常。这意味着异常不会向上抛出到调用栈。此外,我们这里也没有清理。如果抛出异常,我们无法对任务失败做出反应,因为我们从未检索到异常。
为了让异常到达我们,我们必须在 await 表达式中使用任务。当我们 await 一个失败的任务时,异常会在我们执行 await 的地方被抛出,追踪信息也会反映这一点。如果我们不在应用的某个地方 await 一个任务,我们就有风险永远看不到任务抛出的异常。虽然在示例中我们看到了异常输出,这可能会让我们觉得问题不大,但我们的应用可能会以微妙的方式改变,导致我们永远看不到这条消息。
为了演示这一点,假设我们没有在 listen_for_connections 中忽略 echo 任务,而是将它们保存在一个列表中,如下所示:
tasks = []async def listen_for_connection(server_socket: socket, loop: AbstractEventLoop): while True: connection, address = await loop.sock_accept(server_socket) connection.setblocking(False) print(f"Got a connection from {address}") tasks.append(asyncio.create_task(echo(connection, loop)))
人们可能会期望这与之前的行为相同。如果我们发送“boom”消息,我们会看到异常打印出来,以及“未检索到任务异常”的警告。然而,事实并非如此,因为我们实际上直到强制终止应用前都不会看到任何输出!
这是因为我们保留了对任务的引用。asyncio 只有在任务被垃圾回收时才会打印失败任务的消息和追踪信息。这是因为它无法判断该任务是否会在应用的其他地方被 await,从而在那里引发异常。由于这些复杂性,我们要么必须 await 我们任务,要么处理任务可能抛出的所有异常。那么,我们如何在回显服务器中做到这一点呢?
解决这个问题的第一步是将 echo 协程中的代码包裹在 try/catch 语句中,记录异常,并关闭连接:
import loggingasync def echo(connection: socket, loop: AbstractEventLoop) -> None: try: while data := await loop.sock_recv(connection, 1024): print('got data!') if data == b'boom\r\n': raise Exception("Unexpected network error") await loop.sock_sendall(connection, data) except Exception as ex: logging.exception(ex) finally: connection.close()
这将解决异常导致服务器抱怨任务异常未被检索的直接问题,因为我们在协程内部处理了它。它还会在 finally 块中正确关闭套接字,因此在发生故障时,我们不会留下悬空的未关闭异常。
重要的是要注意,这种实现将正确关闭应用关闭时任何开放的客户端连接。为什么会这样?在第2章中,我们提到 asyncio.run 在应用关闭时会取消我们剩余的所有任务。我们还了解到,当我们取消一个任务时,每次尝试 await 它时都会引发 CancelledError。
这里的关键是注意异常被抛出的位置。如果我们的任务正在等待 await loop.sock_recv 之类的语句,而我们取消了该任务,那么 CancelledError 就会从 await loop.sock_recv 行抛出。这意味着在上述情况下,finally 块将被执行,因为我们是在取消任务时在 await 表达式上抛出了异常。如果我们把异常块改为捕获和记录这些异常,你将看到每个创建的任务都会出现一个 CancelledError。
我们现在已经处理了 echo 任务失败时的直接问题。如果我们还想在应用关闭时提供一些错误清理或遗留任务的清理,该怎么办?我们可以通过 asyncio 的信号处理器来实现。