从高层面看,传输(transport)就是一种与任意数据流通信的抽象。当我们与一个 socket 通讯,或者与标准输入这类数据流打交道时,我们面对的都是一组熟悉的操作:从源读取数据或向源写入数据,完成后关闭连接。socket 非常契合我们对传输抽象的定义——即我们可以读写它,完成后就关闭。简单来说,传输提供了向/从一个源发送和接收数据的规范。根据不同的数据源,传输有多种实现方式。我们主要关心的是 ReadTransport、WriteTransport 以及 Transport,虽然还有用于处理 UDP 连接和子进程通信的其他类型。图 8.1 展示了传输的类继承关系。
传输负责数据的传输,而连接的生命周期管理则由“协议”负责。当我们的客户端连接服务器后,我们需要发送请求,然后处理收到的响应。这些操作都属于协议的职责范畴。注意,这里的“协议”指的是 Python 类,而不是像 HTTP 这样的网络协议。传输负责数据传输,并在事件发生时调用协议的方法,比如连接建立或数据准备就绪,如图 8.2 所示。
为理解传输和协议如何协同工作,我们来构建一个应用,执行一次简单的 HTTP GET 请求。首先,我们需要定义一个继承自 asyncio.Protocol 的类。我们将实现基类中的几个方法来发起请求、接收数据以及处理连接错误。
第一个需要实现的协议方法是 connection_made。当底层的 socket 成功连接到服务器时,传输会调用此方法。该方法接受一个 Transport 作为参数,我们可用它与服务器通信。在这里,我们立刻使用 transport 发送 HTTP 请求。
第二个需要实现的方法是 data_received。当传输接收到数据时,会调用此方法,并将数据以字节形式传给我们。由于该方法可能被多次调用,我们需要创建一个内部缓冲区来存储数据。
接下来的问题是:我们怎么知道响应已经结束?为此,我们需要实现一个叫 eof_received 的方法。当收到 “文件末尾”(EOF)信号时,该方法会被调用,对于套接字而言,这通常发生在服务器关闭连接时。一旦调用 eof_received,我们可以确保 data_received 再也不会被调用。eof_received 方法返回一个布尔值,决定如何关闭传输(在此例中,是关闭客户端套接字)。返回 False 表示传输将自行关闭,而返回 True 则意味着由我们自己写的协议来负责关闭。本例中,因为我们不需要在关闭时执行特殊逻辑,所以应该返回 False,这样就不必手动处理关闭传输。
到现在为止,我们只有办法把数据存进内部缓冲区。那么,如何让消费者获取到请求结果呢?我们可以创建一个内部的 Future 来持有完成后的结果。然后,在 eof_received 方法中,我们将这个 future 的结果设置为响应体。最后,我们定义一个名为 get_response 的协程,它将 await 这个 future。
让我们把上述思路实现成自己的协议,命名为 HTTPGetClientProtocol。
列表 8.1 使用传输和协议运行一个 HTTP 请求
import asynciofrom asyncio import Transport, Future, AbstractEventLoopfrom typing import Optionalclass HTTPGetClientProtocol(asyncio.Protocol): def __init__(self, host: str, loop: AbstractEventLoop): self._host: str = host self._future: Future = loop.create_future() self._transport: Optional[Transport] = None self._response_buffer: bytes = b'' async def get_response(self): <span class="fm-combinumeral">❶</span> return await self._future def _get_request_bytes(self) -> bytes: <span class="fm-combinumeral">❷</span> request = f"GET / HTTP/1.1\r\n" \ f"Connection: close\r\n" \ f"Host: {self._host}\r\n\r\n" return request.encode() def connection_made(self, transport: Transport): print(f'连接到 {self._host}') self._transport = transport self._transport.write(self._get_request_bytes()) <span class="fm-combinumeral">❸</span> def data_received(self, data): print(f'收到数据!') self._response_buffer = self._response_buffer + data <span class="fm-combinumeral">❹</span> def eof_received(self) -> Optional[bool]: self._future.set_result(self._response_buffer.decode()) <span class="fm-combinumeral">❺</span> return False def connection_lost(self, exc: Optional[Exception]) -> None: <span class="fm-combinumeral">❻</span> if exc is None: print('连接未出错关闭。') else: self._future.set_exception(exc)
❶ 等待内部的 future,直到从服务器收到响应。❷ 创建 HTTP 请求。❸ 建立连接后,用 transport 发送请求。❹ 一收到数据,就存到内部缓冲区。❺ 连接关闭时,用缓冲区完成 future。❻ 如果连接正常关闭,什么都不做;否则,用异常完成 future。
现在我们实现了协议,下面用它来发起一个真实的请求。为此,我们需要学习一个新的协程方法——create_connection,它是 asyncio 事件循环上的一个方法。这个方法会创建一个到指定主机的套接字连接,并将其包装在一个适当的传输中。除了主机和端口外,它还接受一个 协议工厂。协议工厂就是一个函数,用来创建协议实例;本例中,就是我们刚刚创建的 HTTPGetClientProtocol 类的实例。当我们调用这个协程时,返回的是两个对象:一个是协程创建的传输,另一个是工厂创建的协议实例。
import asynciofrom asyncio import AbstractEventLoopfrom chapter_08.listing_8_1 import HTTPGetClientProtocolasync def make_request(host: str, port: int, loop: AbstractEventLoop) -> str: def protocol_factory(): return HTTPGetClientProtocol(host, loop) _, protocol = await loop.create_connection(protocol_factory, host=host, port=port) return await protocol.get_response()async def main(): loop = asyncio.get_running_loop() result = await make_request('www.example.com', 80, loop) print(result)asyncio.run(main())
我们首先定义了一个 make_request 方法,它接收目标主机和端口,以及服务器的响应。在方法内部,我们创建了一个内层的协议工厂函数,用于创建新的 HTTPGetClientProtocol。然后,我们调用 create_connection 并传入主机和端口,它会返回一个传输和由工厂创建的协议。我们不会用到传输,所以忽略它,但我们必须保留协议实例,因为我们需要调用它的 get_response 协程。因此,我们将协议保存在 protocol 变量中。最后,我们 await 协议的 get_response,这会一直等待直到收到服务器的响应。在 main 协程中,我们 awaitmake_request 并打印结果。运行后,你应该能看到类似如下的输出(为简洁省略了部分内容):
连接到 www.example.com收到数据!HTTP/1.1 200 OKAge: 193241Cache-Control: max-age=604800Content-Type: text/html; charset=UTF-8连接未出错关闭。
我们学会了如何使用传输和协议。但这些接口是低层级的,因此不建议在实际项目中直接使用。接下来,我们看看更高层次的抽象——流(streams),它在传输和协议的基础上进行了扩展。