有一件可能不太显而易见的事:asyncio 在概念上和 async/await 语法及协程是分开的。甚至,协程的类定义都不在 asyncio 的库模块里!
协程和 async/await 语法本身是独立于它们能否被执行的概念。虽然我们已经有了默认的事件循环实现(即 asyncio),但现在我们可以用任何事件循环实现,甚至可以自己造一个。在前一节,我们看到了如何用性能更好的 uvloop 替换 asyncio。现在,我们来看看如何构建一个简单版本的事件循环,专门处理非阻塞的套接字。
在 2016 年的 Python 3.5 版本引入 async/await 语法之前,协程和生成器的关系是很清晰的。让我们用旧语法写一个睡眠 1 秒的简单协程,看看背后的原理。
import asyncio@asyncio.coroutinedef coroutine(): print('Sleeping!') yield from asyncio.sleep(1) print('Finished!')asyncio.run(coroutine())
这里,我们不再使用 async 关键字,而是用 @asyncio.coroutine 装饰器来声明这是一个协程函数。我们也不再用 await,而是用熟悉的 yield from 语法。现在看来,async/await 只不过是这种构造的语法糖。
请注意,基于生成器的协程在未来的 Python 3.10 版本中将被彻底移除。你可能在老代码里见到它们,但以后千万别再用这种风格写新的异步代码了。
那为什么生成器在单线程并发模型中这么合理呢?还记得协程在遇到阻塞操作时需要暂停执行,以便让其他协程有机会运行。而生成器在碰到 yield 点时会暂停执行,相当于在半路被打断。这意味着,如果有两个生成器,我们可以交替运行它们。先让第一个生成器运行到 yield 点(也就是协程术语里的 await 点),然后让第二个运行到它自己的 yield 点,如此反复,直到两个生成器都耗尽为止。下面我们用一个极简的例子来演示生成器的交替执行。
from typing import Generatordef generator(start: int, end: int): for i in range(start, end): yield ione_to_five = generator(1, 5)five_to_ten = generator(5, 10)def run_generator_step(gen: Generator[int, None, None]): # ❶ try: return gen.send(None) except StopIteration as si: return si.valuewhile True: # ❷ one_to_five_result = run_generator_step(one_to_five) five_to_ten_result = run_generator_step(five_to_ten) print(one_to_five_result) print(five_to_ten_result) if one_to_five_result is None and five_to_ten_result is None: break
上面的例子中,我们创建了一个简单的生成器,用来从一个起始整数递增到一个结束整数,途中不断 yield 值。然后我们创建了两个实例:一个是从 1 到 4,另一个是从 5 到 9。
我们还定义了一个便利方法 run_generator_step 来处理运行生成器的一步。生成器类有一个 send 方法,它可以将生成器推进到下一个 yield 语句,中间的所有代码都会被执行。调用 send 之后,我们就可以认为生成器被暂停了,直到下次调用 send。这就允许我们运行其他生成器的代码。send 方法可以接收一个值作为参数,传给生成器。这里我们不需要,所以传了个 None。当生成器达到末尾时,它会抛出 StopIteration 异常,该异常包含了生成器返回的值,我们在这里把它返回了。最后,我们创建了一个循环,一步一步运行每个生成器。这相当于把两个生成器交错执行,最终输出如下:
想象一下,我们不是 yield 数字,而是 yield 到某个慢速操作上。一旦慢操作完成,我们就能恢复生成器,从上次中断的地方继续执行,同时让其他未暂停的生成器也能运行其他代码。这正是事件循环的核心机制。我们持续追踪那些因慢操作而暂停执行的生成器。一旦一个生成器暂停,其他生成器就可以继续运行。当慢操作完成时,我们通过再次调用 send 来唤醒之前的生成器,让它前进到下一个 yield 点。
如前所述,async 和 await 只是生成器语法的糖衣。我们可以通过创建一个协程实例并调用 send 来证明这一点。我们来写一个例子,包含两个只打印简单消息的协程,还有一个调用前两者的第三个协程。然后,我们用生成器的 send 方法来看看是怎么调用这些协程的。
async def say_hello(): print('Hello!')async def say_goodbye(): print('Goodbye!')async def meet_and_greet(): await say_hello() await say_goodbye()coro = meet_and_greet()coro.send(None)
Hello!Goodbye!Traceback (most recent call last): File "chapter_14/listing_14_7.py", line 16, in <module> coro.send(None)StopIteration
调用 send 会立刻执行 meet_and_greet 里的所有代码。因为没有真正需要“暂停”的地方(即使在 await 语句里,代码也是立即运行的),所以整个过程很快就结束了。
那我们该怎么让协程在慢操作上暂停并等待恢复呢?为此,我们先来定义一个自定义的可等待对象,这样我们就可以使用 await 语法,而不是生成器风格的语法。
怎么定义可等待对象?它们在底层是如何工作的?我们可以通过在一个类上实现 __await__ 方法来定义一个可等待对象,但这个方法具体应该怎么实现?它应该返回什么?
__await__ 方法的唯一要求是返回一个迭代器,但这个要求本身并没有太大帮助。我们能不能在事件循环的背景下,让“迭代器”这个概念变得有意义?为了搞清楚这是如何工作的,我们将实现自己的 asyncio.Future 的版本,叫做 CustomFuture,然后用它来构建我们自己的事件循环实现。
回忆一下,Future 是一个包装着未来可能出现的值的对象,它有两个状态:完成和未完成。想象我们处于一个无限的事件循环中,想通过迭代器来检查一个 Future 是否完成了。如果操作已经完成了,我们只需返回结果,迭代器也就结束了。如果没有完成,我们需要某种方式告诉它“我还没完,稍后再检查我”。在这种情况下,迭代器可以直接 yield 自己!
这就是我们将在 CustomFuture 类中实现 __await__ 方法的方式。如果结果还没出来,我们的迭代器就返回 self 本身;如果结果已经有了,我们就返回结果,迭代器也就完成了。如果还没有完成,我们就 yield self。如果结果没出来,下一次尝试推进迭代器时,__await__ 里的代码会重新执行。在这个实现中,我们还添加了一个方法,用来向 Future 添加一个回调函数,当值被设置时,这个回调函数就会运行。后面在实现事件循环时,我们还需要这个功能。
class CustomFuture: def __init__(self): self._result = None self._is_finished = False self._done_callback = None def result(self): return self._result def is_finished(self): return self._is_finished def set_result(self, result): self._result = result self._is_finished = True if self._done_callback: self._done_callback(result) def add_done_callback(self, fn): self._done_callback = fn def __await__(self): if not self._is_finished: yield self return self.result()
在上面的代码中,我们定义了 CustomFuture 类,其中包含了 __await__ 方法,以及设置结果、获取结果和添加回调的方法。__await__ 方法会检查 Future 是否已经完成。如果完成了,我们就直接返回结果,迭代器就结束了。如果没完成,我们就返回 self,意味着这个迭代器会一直无限地返回自身,直到值被设置为止。从生成器的角度看,这意味着我们可以不停地调用 __await__,直到有人给我们设置好值。
下面我们用一个小例子来感受一下这种工作流程。我们创建一个自定义 Future,并在几次迭代后设置它的值,每次都调用 __await__。
from listing_14_8 import CustomFuturefuture = CustomFuture()i = 0while True: try: print('Checking future...') gen = future.__await__() gen.send(None) print('Future is not done...') if i == 1: print('Setting future value...') future.set_result('Finished!') i = i + 1 except StopIteration as si: print(f'Value is: {si.value}') break
在上面的例子中,我们创建了一个自定义 Future,然后进入一个循环,调用 await 方法并尝试推进迭代器。如果 Future 完成了,就会抛出 StopIteration 异常,里面包含 Future 的结果。否则,我们的迭代器就会返回 Future 本身,我们继续循环。在这个例子中,我们在几轮迭代后设置了值,输出如下:
Checking future...Future is not done...Checking future...Future is not done...Setting future value...Checking future...Value is: Finished!
这个例子只是为了强化我们对可等待对象工作原理的理解,真实代码里一般不会这么写,因为通常会有其他东西来设置 Future 的值。接下来,我们把它扩展一下,用在套接字和 selector 模块上。
在第 3 章,我们学过一点 selector 模块,它允许我们注册回调函数,当套接字发生某个事件(如新连接或有数据可读)时,就运行这些回调。现在,我们将利用我们自定义的 Future 类来与 selectors 交互,当套接字事件发生时,就设置 Future 的结果。
记得,selectors 让我们能注册回调函数,当套接字发生读或写事件时就运行。这个概念完美契合我们之前构建的 Future。我们可以注册 set_result 方法作为读事件的回调。当我们想异步等待套接字的结果时,我们创建一个新的 Future,把这个 Future 的 set_result 方法注册到 selector 模块中,指定这个套接字的读事件,然后返回这个 Future。然后我们就可以 await 它,当 selector 为我们调用回调时,我们就能拿到结果了。
为了亲眼见证这一切,我们来写一个应用,监听一个非阻塞套接字上的连接。一旦接到连接,我们就返回它,然后让应用程序终止。
import functoolsimport selectorsimport socketfrom listing_14_8 import CustomFuturefrom selectors import BaseSelectordef accept_connection(future: CustomFuture, connection: socket): # ❶ print(f'We got a connection from {connection}!') future.set_result(connection)async def sock_accept(sel: BaseSelector, sock) -> socket: # ❷ print('Registering socket to listen for connections') future = CustomFuture() sel.register(sock, selectors.EVENT_READ, functools.partial(accept_connection, future)) print('Pausing to listen for connections...') connection: socket = await future return connectionasync def main(sel: BaseSelector): sock = socket.socket() sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(('127.0.0.1', 8000)) sock.listen() sock.setblocking(False) print('Waiting for socket connection!') connection = await sock_accept(sel, sock) # ❸ print(f'Got a connection {connection}!')selector = selectors.DefaultSelector()coro = main(selector)while True: # ❹ try: state = coro.send(None) events = selector.select() for key, mask in events: print('Processing selector events...') callback = key.data callback(key.fileobj) except StopIteration as si: print('Application finished!') break
- ❶:当客户端连接时,把连接套接字设置到
Future 中。 - ❷:注册
accept_connection 函数到 selector,并暂停等待客户端连接。 - ❹:无限循环,对主协程调用
send。每当一个 selector 事件发生时,就运行注册的回调。
在上面的代码中,我们首先定义了一个 accept_connection 函数。它接收一个 CustomFuture 以及一个客户端套接字。我们打印一条消息表示有了一个套接字,然后把该套接字作为 Future 的结果设置进去。然后我们定义 sock_accept 函数;这个函数接收一个服务器套接字和一个 selector,并注册 accept_connection(绑定到一个 CustomFuture)作为服务器套接字读事件的回调。然后我们 await 这个 Future,暂停等待连接的到来,最后返回它。
接着我们定义了一个主协程函数。在这个函数里,我们创建一个服务器套接字,然后 awaitsock_accept 协程,直到我们收到连接,之后打印一条消息并终止。这样我们就构建了一个“最小可行的事件循环”。我们创建了主协程的实例,传入一个 selector,然后无限循环。在循环里,我们首先调用 send,将主协程推进到第一个 await 语句,然后调用 selector.select,它会阻塞直到有客户端连接。然后我们调用任何注册的回调;在这种情况下,总是 accept_connection。一旦有人连接,我们再次调用 send,这会将所有协程再次推进,让我们的应用得以完成。如果你运行以下代码,并通过 Telnet 连接,你应该能看到类似下面的输出:
Waiting for socket connection!Registering socket to listen for connectionsPausing to listen for connections...Processing selector events...We got a connection from <socket.socketfd=4,family=AddressFamily.AF_INET,type=SocketKind.SOCK_STREAM,proto=0,laddr=('127.0.0.1', 8000)>!Got a connection <socket.socketfd=4,family=AddressFamily.AF_INET,type=SocketKind.SOCK_STREAM,proto=0,laddr=('127.0.0.1', 8000)>!Application finished!
我们现在已经用 async / await 关键字构建了一个基本的异步应用,而完全没有依赖 asyncio!我们结尾的 while 循环就是一个极简的事件循环,它生动地展示了 asyncio 事件循环的核心思想。当然,我们还不能同时做太多事情,因为我们缺乏创建 tasks 的能力。
tasks 是 Future 和 coroutine 的结合体。当它所包裹的 coroutine 结束时,其 Future 就完成了。通过继承,我们可以用 Future 包装一个 coroutine,方法是继承我们的 CustomFuture 类并写一个接受 coroutine 的构造函数,但我们仍然需要一个方法来运行这个 coroutine。我们可以通过构建一个称为 step 的方法来实现,它会调用 coroutine 的 send 方法,并跟踪结果,本质上就是每调用一次就运行一步 coroutine。
在实现这个方法时,我们必须考虑到 send 也可能返回其他 Future。为了处理这种情况,我们需要使用 send 返回的任何 Future 的 add_done_callback 方法。我们会注册一个回调,当 Future 完成时,它会调用 send 方法,把 Future 的结果传给 task 的 coroutine。
from chapter_14.listing_14_8 import CustomFutureclass CustomTask(CustomFuture): def __init__(self, coro, loop): super(CustomTask, self).__init__() self._coro = coro self._loop = loop self._current_result = None self._task_state = None loop.register_task(self) # ❶ def step(self): # ❷ try: if self._task_state is None: self._task_state = self._coro.send(None) if isinstance(self._task_state, CustomFuture): # ❸ self._task_state.add_done_callback(self._future_done) except StopIteration as si: self.set_result(si.value) def _future_done(self, result): # ❹ self._current_result = result try: self._task_state = self._coro.send(self._current_result) except StopIteration as si: self.set_result(si.value)
- ❸:如果协程产生了
Future,就添加一个完成回调。
在上面的代码中,我们继承了 CustomFuture,创建了一个构造函数,接收一个 coroutine 和一个事件循环,通过调用 loop.register_task 把任务注册到循环中。然后,在 step 方法里,我们调用 coroutine 的 send 方法,如果 coroutine 产生了 CustomFuture,我们就添加一个 done 回调。此时,我们的 done 回调会拿到 Future 的结果,然后把它发送给被封装的 coroutine,从而推动其完成。
现在,我们已经知道如何运行协程,并实现了 Future 与 Task,因此我们拥有了构建事件循环所需的所有基本构件。那么,我们的事件循环接口需要具备哪些方法来构建一个异步套接字应用呢?我们需要以下几个方法,各司其职:
- 我们需要一个方法来接收主入口协程,类似于
asyncio.run。 - 我们需要方法来接收连接、接收数据和关闭套接字。这些方法需要注册和注销套接字到
selector。 - 我们需要一个方法来注册
CustomTask;这其实就是我们之前在 CustomTask 构造函数里使用的那个方法。
首先,我们谈谈主入口点;我们将称这个方法为 run。这是我们事件循环的核心引擎。这个方法接收一个主入口协程,调用其 send,在一个无限循环中跟踪生成器的结果。如果主协程产生了 Future,我们会添加一个 done 回调来跟踪 Future 完成后的结果。一旦完成,我们就会调用 step 方法来运行任何已注册的任务,然后调用 selector 等待任意套接字事件触发。一旦事件触发,我们就会运行相关的回调并触发循环的下一次迭代。如果在任何时刻主协程抛出了 StopIteration 异常,我们知道应用已经结束,可以退出并返回异常内的值。
接着,我们需要协程方法来接收套接字连接和从客户端套接字接收数据。我们的策略是:创建一个 CustomFuture 实例,回调函数将设置其结果,并将这个回调注册到 selector 以在读事件时触发。然后我们 await 这个 Future。
最后,我们需要一个方法来注册任务到事件循环。这个方法只需要接收一个任务并将其添加到列表中。在每次事件循环迭代中,我们都会调用 step 方法来运行所有已注册的任务,如果它们准备好了,就推进它们。实现这一切,我们就得到了一个“最小可行”的事件循环。
import functoolsimport selectorsfrom typing import Listfrom chapter_14.listing_14_11 import CustomTaskfrom chapter_14.listing_14_8 import CustomFutureclass EventLoop: _tasks_to_run: List[CustomTask] = [] def __init__(self): self.selector = selectors.DefaultSelector() self.current_result = None def _register_socket_to_read(self, sock, callback): # ❶ future = CustomFuture() try: self.selector.get_key(sock) except KeyError: sock.setblocking(False) self.selector.register(sock, selectors.EVENT_READ, functools.partial(callback, future)) else: self.selector.modify(sock, selectors.EVENT_READ, functools.partial(callback, future)) return future def _set_current_result(self, result): self.current_result = result async def sock_recv(self, sock): # ❷ print('Registering socket to listen for data...') return await self._register_socket_to_read(sock, self.recieved_data) async def sock_accept(self, sock): # ❸ print('Registering socket to accept connections...') return await self._register_socket_to_read(sock, self.accept_connection) def sock_close(self, sock): self.selector.unregister(sock) sock.close() def register_task(self, task): # ❹ self._tasks_to_run.append(task) def recieved_data(self, future, sock): data = sock.recv(1024) future.set_result(data) def accept_connection(self, future, sock): result = sock.accept() future.set_result(result) def run(self, coro): # ❺ self.current_result = coro.send(None) while True: try: if isinstance(self.current_result, CustomFuture): self.current_result.add_done_callback(self._set_current_result) if self.current_result.result() is not None: self.current_result = coro.send(self.current_result.result()) else: self.current_result = coro.send(self.current_result) except StopIteration as si: return si.value for task in self._tasks_to_run: task.step() self._tasks_to_run = [task for task in self._tasks_to_run if not task.is_finished()] events = self.selector.select() print('Selector has an event, processing...') for key, mask in events: callback = key.data callback(key.fileobj)
- ❶:将套接字注册到
selector 上,用于读取事件。 - ❺:运行主协程直到其结束,每次迭代都执行任何待处理的任务。
我们首先定义了一个 _register_socket_to_read 便利方法。这个方法接收一个套接字和一个回调函数,并将它们注册到 selector。如果套接字还没有注册,就进行注册;如果已经注册了,就更新回调。回调的第一个参数必须是一个 Future,在这里我们创建一个新的 Future,并将其部分应用于回调函数。最后,我们返回绑定到回调的 Future,这意味着调用我们方法的用户现在可以 await 它,并暂停执行,直到回调完成。
然后我们定义了两个协程方法来接收套接字数据和接受新的客户端连接,分别是 sock_recv 与 sock_accept。这些方法调用了我们刚刚定义的 _register_socket_to_read 便利方法,传递了处理数据和新连接的回调函数(这些方法只是将数据设置到一个 Future)。
最后,我们构建了 run 方法。这个方法接收主入口协程,调用 send 来推进它,使其到达第一个挂起点,并保存 send 的结果。然后我们启动一个无限循环,首先检查主协程当前的结果是否是 CustomFuture;如果是,我们就注册一个回调来存储结果,然后可以将结果送回主协程(如果需要)。如果结果不是 CustomFuture,我们就直接将其送回协程。在控制了主协程的流程之后,我们通过调用 step 方法来运行任何已注册的任务。一旦我们运行完任务,就从任务列表中移除已完成的任务。
最后,我们调用 selector.select,它会阻塞,直到注册的套接字上发生了任何事件。一旦我们有套接字事件或一组事件,我们就循环处理它们,调用我们之前在 _register_socket_to_read 中注册的回调。在我们的实现中,任何套接字事件都会触发事件循环的一次迭代。我们现在已经实现了 EventLoop 类,准备好在没有 asyncio 支持的情况下,构建我们的第一个异步应用了!
现在,我们有了事件循环,就来构建一个非常简单的服务器应用,用于记录来自已连接客户端的消息。我们将创建一个服务器套接字,并编写一个协程函数,用来在无限循环中监听连接。一旦有了连接,我们就会创建一个任务,用于并发地从该客户端读取数据,直到它断开连接。这看起来和我们在第 3 章构建的差不多,唯一的不同是这次我们用的是自己的事件循环,而不是 asyncio 内置的那个。
import socketfrom chapter_14.listing_14_11 import CustomTaskfrom chapter_14.listing_14_12 import EventLoopasync def read_from_client(conn, loop: EventLoop): # ❶ print(f'Reading data from client {conn}') try: while data := await loop.sock_recv(conn): print(f'Got {data} from client!') finally: loop.sock_close(conn)async def listen_for_connections(sock, loop: EventLoop): # ❷ while True: print('Waiting for connection...') conn, addr = await loop.sock_accept(sock) CustomTask(read_from_client(conn, loop), loop) print(f'I got a new connection from {sock}!')async def main(loop: EventLoop): server_socket = socket.socket() server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(('127.0.0.1', 8000)) server_socket.listen() server_socket.setblocking(False) await listen_for_connections(server_socket, loop)event_loop = EventLoop() # ❸event_loop.run(main(event_loop))
- ❷:监听客户端连接,当有客户端连接时,创建一个任务来读取数据。
在上面的代码中,我们首先定义了一个协程函数,用于在循环中从客户端读取数据,并在接收到时打印结果。我们还定义了一个协程函数,用于在无限循环中监听来自服务器套接字的客户端连接,并在有新连接时创建一个 CustomTask 来并发监听该客户端的数据。在我们的主协程中,我们创建了一个服务器套接字,并调用了 listen_for_connections 协程函数。然后,我们创建了事件循环实现的实例,并将 main 协程传给 run 方法。
运行这段代码,你应该能通过 Telnet 用多个客户端同时连接,并向服务器发送消息。例如,两个客户端连接并发送一些测试消息,输出可能看起来像这样:
Waiting for connection...Registering socket to accept connections...Selector has an event, processing...I got a new connection from <socket.socketfd=4,family=AddressFamily.AF_INET,type=SocketKind.SOCK_STREAM,proto=0,laddr=('127.0.0.1', 8000)>!Waiting for connection...Registering socket to accept connections...Reading data from client <socket.socketfd=7,family=AddressFamily.AF_INET,type=SocketKind.SOCK_STREAM,proto=0,laddr=('127.0.0.1', 8000), raddr=('127.0.0.1', 58641)>!Registering socket to listen for data...Selector has an event, processing...Got b'test from client one!\r\n' from client!Registering socket to listen for data...Selector has an event, processing...I got a new connection from <socket.socketfd=4,family=AddressFamily.AF_INET,type=SocketKind.SOCK_STREAM,proto=0,laddr=('127.0.0.1', 8000)>!Waiting for connection...Registering socket to accept connections...Reading data from client <socket.socketfd=8,family=AddressFamily.AF_INET,type=SocketKind.SOCK_STREAM,proto=0,laddr=('127.0.0.1', 8000), raddr=('127.0.0.1', 58645)>!Registering socket to listen for data...Selector has an event, processing...Got b'test from client two!\r\n' from client!Registering socket to listen for data...
在上述输出中,一个客户端连接,触发 selector 恢复 listen_for_connections 从 loop.sock_accept 的暂停点继续执行。这同时也注册了客户端连接,当我们为 read_from_client 创建任务时。第一个客户端发送消息 test from client one!,这又触发了 selector 调用任何回调。在这种情况下,我们推进了 read_from_client 任务,将客户端的消息输出到控制台。然后,第二个客户端连接,同样的过程再次发生。
虽然这还不是生产级别的事件循环(我们根本没有很好地处理异常,且仅允许套接字事件触发事件循环迭代,还有很多不足之处),但它让你对事件循环和 Python 异步编程的内部工作机制有了一定了解。一个值得一试的练习是,将这里学到的概念拓展开来,构建一个可用于生产的事件循环。或许,你能创造下一代异步的 Python 框架。
- 我们可以通过检查可调用参数是否为协程,来设计能够同时处理协程和普通函数的 API。
- 当你需要在协程之间传递状态,但又希望这些状态独立于你的参数时,可以使用上下文本地变量。
- asyncio 的
sleep 协程可以用来强制触发事件循环的一次迭代。当你需要在没有自然 await 点的情况下触发事件循环做一些事时,这很有用。 - asyncio 只是 Python 标准的事件循环实现。其他实现存在,比如
uvloop,我们可以随时更换它们,同时仍然使用 async/await 语法。我们也可以自己创建事件循环,设计具有不同特性的系统,更好地满足我们的需求。