3.4 使用 selectors 模块构建套接字事件循环
操作系统提供了高效的 API,可以监控套接字的传入数据和其他事件。虽然具体的 API 取决于操作系统(如 kqueue、epoll、IOCP 是几种常见的),但这些 I/O 通知系统都遵循相似的概念。我们提供一个想要监控的套接字列表,而不是不断地检查每个套接字是否有数据,操作系统会明确地告诉我们哪些套接字有数据。
由于这是在硬件级别实现的,监控期间使用的 CPU 资源非常少,从而实现了高效的资源利用。这些通知系统正是 asyncio 实现并发的核心。理解它们是如何工作的,能让我们一窥 asyncio 事件循环底层的运作机制。
这些事件通知系统因操作系统而异。幸运的是,Python 的 selectors 模块是抽象化的,我们可以获得适用于我们运行环境的正确事件。这让我们的代码能够在不同的操作系统间移植。
这个库暴露了一个名为 BaseSelector 的抽象基类,它有多个针对不同事件通知系统的实现。它还包含一个 DefaultSelector 类,它会自动选择最适用于我们系统的实现。
BaseSelector 类有两个关键概念。首先是 注册。当我们有一个感兴趣的套接字想获取通知时,我们会将它注册到选择器,并告知它我们关心的事件类型。这些事件包括读和写。反过来,我们也可以注销不再关心的套接字。
第二个主要概念是 选择。select 方法会阻塞,直到某个事件发生,一旦发生,调用就会返回一个已准备好处理的套接字列表以及触发它的事件。它还支持超时,超时后会返回一个空的事件集。
有了这些基础构件,我们就能创建一个不会压榨我们 CPU 的非阻塞回显服务器。一旦创建了服务器套接字,我们就用默认选择器注册它,让它监听来自客户端的连接。然后,每当有人连接到我们的服务器套接字时,我们就会用选择器注册客户端的连接套接字,以监视任何发送的数据。如果从一个不是我们服务器套接字的套接字收到数据,我们知道这是来自已发送数据的客户端。然后我们接收数据并将其写回客户端。我们还会添加一个超时来演示,即使在等待时,我们也能执行其他代码。
列表 3.7 使用 selectors 构建非阻塞服务器
import selectorsimport socketfrom selectors import SelectorKeyfrom typing import List, Tupleselector = selectors.DefaultSelector()server_socket = socket.socket()server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ('127.0.0.1', 8000)server_socket.setblocking(False)server_socket.bind(server_address)server_socket.listen()selector.register(server_socket, selectors.EVENT_READ)while True: events: List[Tuple[SelectorKey, int]] = selector.select(timeout=1) # ❶ if len(events) == 0: # ❷ print('No events, waiting a bit more!') for event, _ in events: event_socket = event.fileobj # ❸ if event_socket == server_socket: # ❹ connection, address = server_socket.accept() connection.setblocking(False) print(f"I got a connection from {address}") selector.register(connection, selectors.EVENT_READ) # ❺ else: data = event_socket.recv(1024) # ❻ print(f"I got some data: {data}") event_socket.send(data)
- ❷ 如果没有事件发生,就打印信息。这发生在超时发生时。
- ❸ 获取事件对应的套接字,存储在
fileobj 字段中。 - ❹ 如果事件套接字与服务器套接字相同,说明这是一个连接请求。
- ❻ 如果事件套接字不是服务器套接字,就从客户端接收数据并回显。
当我们运行列表 3.7 时,除非收到连接事件,否则大约每秒会打印一次 “No events, waiting a bit more!”。一旦收到连接,我们就会注册该连接以监听读事件。然后,如果客户端发送数据,我们的选择器会返回一个事件,表明有数据准备就绪,我们可以用 socket.recv 读取。
这是一个完全正常的回显服务器,支持多个客户端。这个服务器没有任何阻塞问题,因为我们只在有数据可处理时才读或写。而且,由于我们使用了操作系统高效的事件通知系统,其 CPU 利用率非常低(见图 3.4)。
图 3.4 使用 selectors 的回显服务器的 CPU 图。此方法的利用率始终在 0% 到 1% 之间浮动。
我们所构建的,大致相当于 asyncio 事件循环在后台做的事情的一部分。在这种情况下,重要的事件是套接字接收数据。我们事件循环的每一次迭代,以及 asyncio 事件循环的每一次迭代,都是由套接字事件发生或超时触发的。在 asyncio 事件循环中,当其中任何一个事件发生时,正在等待运行的协程就会运行,直到它们完成或遇到下一个 await 语句。当我们在一个使用非阻塞套接字的协程中遇到 await 时,它会将该套接字注册到系统的选择器中,并记录该协程暂停等待结果。我们可以将其转换为伪代码,以演示这一概念:
paused = []ready = []while True: paused, new_sockets = run_ready_tasks(ready) selector.register(new_sockets) timeout = calculate_timeout() events = selector.select(timeout) ready = process_events(events)
我们运行所有准备好运行的协程,直到它们在 await 语句处暂停,并将它们存储在 paused 数组中。我们还会跟踪运行这些协程时需要监视的新套接字,并将它们注册到选择器中。然后,我们计算调用 select 时所需的超时时间。虽然这个超时计算有些复杂,但通常是在查看我们计划在特定时间或持续时间内运行的任务。一个例子就是 asyncio.sleep。然后我们调用 select 并等待任何套接字事件或超时。一旦发生任一情况,我们就会处理这些事件,并将其转换为一组准备好运行的协程列表。
虽然我们构建的事件循环仅限于套接字事件,但它展示了使用选择器注册我们关心的套接字,并且只有在我们想要处理的事情发生时才被唤醒的核心思想。在本书的末尾,我们将更深入地探讨如何构建自定义事件循环。
现在,我们已经理解了让 asyncio 正常运转的大部分机制。然而,如果我们只使用 selectors 来构建应用,我们最终还是会不得不实现自己的事件循环来达到与 asyncio 相同的功能。为了了解如何用 asyncio 实现这一点,让我们回顾一下我们学到的知识,并将其转化为 async / await 代码,利用已经为我们实现好的事件循环。