有时候,我们需要等待某个外部事件发生才能继续。例如,等待缓冲区填满后再处理数据,等待设备连接到应用,或者等待初始化完成。我们也可能有多个任务在等待数据,而数据还未就绪。Event 对象为我们提供了机制,可以在等待特定事件发生时保持空闲。
Event 类内部维护一个标志位,表示事件是否已发生。我们可以通过两个方法控制这个标志位:set 和 clear。set 方法将内部标志位设为 True,并通知所有等待的观察者事件已发生。clear 将内部标志位设为 False,此时所有等待该事件的协程将再次被阻塞。
有了这两个方法,我们可以管理内部状态,但如何阻塞直到事件发生呢?Event 类有一个协程方法叫 wait。当我们 await 这个协程时,它会阻塞,直到有人调用事件对象的 set。一旦 set 被调用,所有后续的 wait 调用将不再阻塞,会立刻返回。如果我们调用 clear 之后再次调用 set,那么 wait 的调用将再次阻塞,直到下一次 set。
我们来创建一个简单的例子,看看事件的实际用法。假设有两个任务依赖于某个事件的发生。我们将让它们等待,直到我们触发事件。
import asyncioimport functoolsfrom asyncio import Eventdef trigger_event(event: Event): event.set()async def do_work_on_event(event: Event): print('Waiting for event...') await event.wait() print('Performing work!') await asyncio.sleep(1) print('Finished work!') event.clear()async def main(): event = asyncio.Event() asyncio.get_running_loop().call_later(5.0, functools.partial(trigger_event, event)) await asyncio.gather(do_work_on_event(event), do_work_on_event(event))asyncio.run(main())
在上面的代码中,我们创建了一个 do_work_on_event 协程,它接收一个事件,并首先调用其 wait 协程。这将阻塞,直到有人调用事件的 set 方法来表明事件发生了。我们还创建了一个简单的 trigger_event 函数,用于设置一个给定的事件。在主协程中,我们创建了一个事件对象,并用 call_later 在5秒后触发该事件。然后我们用 gather 调用 do_work_on_event 两次,这会为我们创建两个并发任务。你会看到这两个任务会空闲5秒钟,直到我们触发事件,之后它们开始工作,输出如下:
Waiting for event...Waiting for event...Triggering event!Performing work!Performing work!Finished work!Finished work!
这展示了基本用法:等待事件会阻塞一个或多个协程,直到我们触发事件,之后它们才能继续工作。接下来,我们看一个更真实的例子。假设你正在构建一个接收客户端文件上传的 API。由于网络延迟和缓冲,文件上传可能需要一段时间才能完成。考虑到这个约束,我们希望我们的 API 有一个协程能阻塞,直到文件完全上传。调用者可以等待所有数据到达后,再进行任何操作。
我们可以用事件来实现。创建一个协程监听上传的数据,并将其存储在内部缓冲区中。一旦文件结束,我们就触发事件,表示上传已完成。我们再创建一个协程方法来获取文件内容,它将等待事件被设置。一旦事件被设置,我们就可以返回完整的上传数据。我们来创建一个名为 FileUpload 的类来实现这个接口:
import asynciofrom asyncio import StreamReader, StreamWriterclass FileUpload: def __init__(self, reader: StreamReader, writer: StreamWriter): self._reader = reader self._writer = writer self._finished_event = asyncio.Event() self._buffer = b'' self._upload_task = None def listen_for_uploads(self): self._upload_task = asyncio.create_task(self._accept_upload()) async def _accept_upload(self): while data := await self._reader.read(1024): self._buffer = self._buffer + data self._finished_event.set() self._writer.close() await self._writer.wait_closed() async def get_contents(self): await self._finished_event.wait() return self._buffer
现在,我们来创建一个文件上传服务器来测试这个接口。假设每次成功上传后,我们都想将内容打印到标准输出。当客户端连接时,我们会创建一个 FileUpload 对象,并调用 listen_for_uploads。然后,我们再创建一个单独的任务,去等待 get_contents 的结果。
import asynciofrom asyncio import StreamReader, StreamWriterfrom chapter_11.listing_11_11 import FileUploadclass FileServer: def __init__(self, host: str, port: int): self.host = host self.port = port self.upload_event = asyncio.Event() async def start_server(self): server = await asyncio.start_server(self._client_connected, self.host, self.port) await server.serve_forever() async def dump_contents_on_complete(self, upload: FileUpload): file_contents = await upload.get_contents() print(file_contents) def _client_connected(self, reader: StreamReader, writer: StreamWriter): upload = FileUpload(reader, writer) upload.listen_for_uploads() asyncio.create_task(self.dump_contents_on_complete(upload))async def main(): server = FileServer('127.0.0.1', 9000) await server.start_server()asyncio.run(main())
在上面的代码中,我们创建了一个 FileServer 类。每当有客户端连接到服务器时,我们就会创建一个第11.11节中定义的 FileUpload 类的实例,它会开始监听来自连接客户端的上传。我们同时创建一个任务来调用 dump_contents_on_complete 协程。这个函数调用 get_contents 协程(只有在上传完成时才会返回),并将文件内容打印到标准输出。
你可以用 netcat 来测试这个服务器。选择一个文件,运行以下命令,将 file 替换为你选择的文件名:
cat file | nc localhost 9000
你就会看到,一旦文件全部上传完毕,其内容就会被打印到标准输出。
需要注意的是,事件有个潜在缺点:它们可能比你的协程响应速度更快。假设你使用单个事件来唤醒多个任务,如生产者-消费者工作流。如果所有工作者都在长时间忙碌,事件在你工作期间运行了,你将永远看不到它。我们来创建一个示例证明这一点。我们创建两个工作协程,每个都做5秒的工作。我们再创建一个每秒触发一次事件的任务,频率超过了消费者的处理能力。
import asynciofrom asyncio import Eventfrom contextlib import suppressasync def trigger_event_periodically(event: Event): while True: print('Triggering event!') event.set() await asyncio.sleep(1)async def do_work_on_event(event: Event): while True: print('Waiting for event...') await event.wait() event.clear() print('Performing work!') await asyncio.sleep(5) print('Finished work!')async def main(): event = asyncio.Event() trigger = asyncio.wait_for(trigger_event_periodically(event), 5.0) with suppress(asyncio.TimeoutError): await asyncio.gather(do_work_on_event(event), do_work_on_event(event), trigger)asyncio.run(main())
运行这段代码,你会看到事件触发,两个工作协程开始并行工作。与此同时,我们不断触发事件。由于工作者正忙,它们在第二次工作结束后才会第二次调用 event.wait(),因此看不见第二轮事件。如果你关心每个事件都必须响应,那就需要使用队列机制,这将在下一章学习。
事件适用于需要在特定事件发生时发出警报的情况,但如果你需要在等待事件的同时独占访问共享资源(比如数据库连接),该怎么办?这时,条件变量(Condition)可以帮助我们解决这类流程。