传输和协议是底层接口,适合需要对数据发送和接收过程有完全控制的场景。比如,如果你在设计一个网络库或网页框架,可能会考虑使用传输和协议。但对于大多数应用来说,我们并不需要这么细粒度的控制,如果直接使用它们,往往要写一大堆重复代码。
因此,asyncio 的设计者们创建了更高层级的 流(streams) 接口。这个接口将传输和协议的标准用例封装成了两个易于理解和使用的类:StreamReader 和 StreamWriter。顾名思义,它们分别负责从流中读取和向流中写入。使用这两个类来开发网络应用,是如今在 asyncio 里推荐的方式。
为了弄懂如何使用这些接口,我们拿之前的发送 HTTP GET 请求的例子来改造一下,把它改成使用流的方式。不再直接实例化 StreamReader 和 StreamWriter,asyncio 提供了一个库级的协程函数 open_connection,它可以帮我们创建好这两个对象。这个协程接收一个我们要连接的主机和端口,返回一个包含 StreamReader 与 StreamWriter 的元组。我们的计划是使用 StreamWriter 发送请求,用 StreamReader 读取响应。StreamReader 的方法很容易理解,我们有一个方便的 readline 协程,会一直等,直到收到一行数据。也可以用 read 协程来等待指定数量的字节到达。
StreamWriter 稍微复杂一些。它有个 write 方法,我们预期如此,但它不是协程。在内部,流写入器会立即尝试将数据写入套接字的输出缓冲区,但这个缓冲区可能已满。如果套接字的写缓冲区满了,数据就会被暂存在一个内部队列里,稍后才能进入缓冲区。这带来了潜在问题:调用 write 不一定立刻就把数据发出去。想象网络连接很慢,每秒只能发送 1KB,但你的应用却每秒产生 1MB 的数据。这时,应用的写缓冲区会以远快于发送的速度填满,最终触发机器的内存限制,导致程序崩溃。
那么,我们如何确保所有数据都被正确发送?解决这个问题的方法是使用一个叫 drain 的协程方法。它会阻塞,直到队列中所有数据都已发送到套接字,保证了在继续执行前数据已全部写出。最佳实践是,在每次调用 write 后,都要 awaitdrain。技术上讲,每个 write 后不强制要求 drain,但养成这个习惯有助于避免潜在的错误。
import asynciofrom asyncio import StreamReaderfrom typing import AsyncGeneratorasync def read_until_empty(stream_reader: StreamReader) -> AsyncGenerator[str, None]: while response := await stream_reader.readline(): <span class="fm-combinumeral">❶</span> yield response.decode()async def main(): host: str = 'www.example.com' request: str = f"GET / HTTP/1.1\r\n" \ f"Connection: close\r\n" \ f"Host: {host}\r\n\r\n" stream_reader, stream_writer = await asyncio.open_connection('www.example.com', 80) try: stream_writer.write(request.encode()) <span class="fm-combinumeral">❷</span> await stream_writer.drain() responses = [response asyncfor response inread_until_empty(stream_reader)] <span class="fm-combinumeral">❸</span> print(''.join(responses)) finally: stream_writer.close() <span class="fm-combinumeral">❹</span> await stream_writer.wait_closed()asyncio.run(main())
❶ 读取一行并解码,直到没有更多内容。❷ 写入 HTTP 请求,并排空写入器。❸ 读取每一行,存储到列表中。❹ 关闭写入器,并等待它完全关闭。
在上面的代码中,我们首先创建了一个便捷的异步生成器,用于从 StreamReader 读取所有行,将其解码为字符串,直到无数据可读。然后在 main 协程中,我们打开与 example.com 的连接,过程中创建了 StreamReader 与 StreamWriter。接着我们用 write 和 drain 方法写入请求。请求发出后,我们用异步生成器获取每一行响应,并存入 responses 列表。最后,通过调用 close 来关闭 StreamWriter 实例,并 await wait_closed 协程。为什么要同时调用方法和协程呢?原因是调用 close 时会发生一系列异步事件,比如注销套接字并调用底层传输的 connection_lost。这些操作都在事件循环的后续迭代中完成,意味着调用 close 后连接并不会立即关闭。如果你想在继续之前等待连接关闭,或担心关闭期间可能出现异常,那么调用 wait_closed 是最佳实践。
我们现在已经通过构建网络请求掌握了流接口的基本知识。这些类的用途不仅限于网络应用。接下来,我们将看到如何利用流读取器来创建非阻塞的命令行应用。