当我们构建服务器时,比如我们的回显服务器,会创建一个服务器套接字,将其绑定到端口,并等待传入连接。虽然这可行,但 asyncio 让我们可以在更高的抽象层次上创建服务器,这意味着我们可以完全不用操心管理套接字。以这种方式创建服务器简化了我们编写的代码,因此,使用这些高级别接口来创建和管理服务器是推荐的做法。
我们可以通过 asyncio.start_server 协程来创建一个服务器。这个协程接受若干可选参数来配置诸如加密(SSL)等事项,但主要关注的参数是 host、port 以及 client_connected_cb。host 和 port 与我们之前见过的相同:它们是服务器套接字监听连接的地址。更有趣的部分是 client_connected_cb,它要么是一个回调函数,要么是一个协程,每当有客户端连接到服务器时都会运行。该回调接收一个 StreamReader 和一个 StreamWriter 作为参数,这使我们能够读取和写入连接到服务器的客户端数据。
当我们 awaitstart_server 时,它会返回一个 AbstractServer 对象。这个类缺乏许多我们可能需要的方法,除了 serve_forever,它会无限期地运行服务器直到终止。这个类本身也是一个异步上下文管理器。这意味着我们可以使用 async with 语法来使用它的实例,从而确保在退出时能正确关闭服务器。
为了理解如何创建服务器,让我们再次创建一个回显服务器,但让它更先进一些。除了回显输出外,我们还将显示有多少其他客户端已连接。我们还将在客户端断开连接时显示相关信息。为了管理这些,我们将创建一个名为 ServerState 的类来管理在线用户数。当有用户连接时,我们会将他们添加到服务器状态中,并通知其他客户端他们已连接。
import asyncioimport loggingfrom asyncio import StreamReader, StreamWriterclass ServerState: def __init__(self): self._writers = [] async def add_client(self, reader: StreamReader, writer: StreamWriter): <span class="fm-combinumeral">❶</span> self._writers.append(writer) await self._on_connect(writer) asyncio.create_task(self._echo(reader, writer)) async def _on_connect(self, writer: StreamWriter): <span class="fm-combinumeral">❷</span> writer.write(f'欢迎!{len(self._writers)} 位用户正在线上!\n'.encode()) await writer.drain() await self._notify_all('新用户已连接!\n') async def _echo(self, reader: StreamReader, writer: StreamWriter): <span class="fm-combinumeral">❸</span> try: while (data := await reader.readline()) != b'': writer.write(data) await writer.drain() self._writers.remove(writer) await self._notify_all(f'客户端已断开连接。{len(self._writers)} 位用户正在线上!\n') except Exception as e: logging.exception('读取客户端数据时出错。', exc_info=e) self._writers.remove(writer) async def _notify_all(self, message: str): <span class="fm-combinumeral">❹</span> for writer in self._writers: try: writer.write(message.encode()) await writer.drain() except ConnectionError as e: logging.exception('无法写入客户端。', exc_info=e) self._writers.remove(writer)async def main(): server_state = ServerState() async def client_connected(reader: StreamReader, writer: <span class="fm-combinumeral">❺</span> StreamWriter) -> None: await server_state.add_client(reader, writer) server = await asyncio.start_server(client_connected, '127.0.0.1', 8000) <span class="fm-combinumeral">❻</span> async with server: await server.serve_forever()asyncio.run(main())
❶ 将客户端添加到服务器状态,并创建一个回显任务。❷ 在新连接时,通知客户端有多少用户在线,并通知其他用户有新用户连接。❸ 处理用户输入回显,当客户端断开连接时通知其他用户。❹ 辅助方法,向所有其他用户发送消息。如果发送失败,则移除该用户。❺ 当客户端连接时,将其添加到服务器状态。❻ 启动服务器,并开始永久服务。
当用户连接到我们的服务器时,client_connected 回调会响应,并传递给该用户的读取器和写入器,这反过来会调用服务器状态的 add_client 协程。在 add_client 协程中,我们存储 StreamWriter,以便向所有已连接的客户端发送消息,并在客户端断开连接时将其移除。然后我们调用 _on_connect,向客户端发送一条消息,告知他们有多少其他用户在线。在 _on_connect 中,我们还通知任何其他已连接的客户端有新用户连接。
_echo 协程与我们过去做过的类似,但不同之处在于,当用户断开连接时,我们通知其他已连接的客户端有人已断开连接。运行此代码后,你应该会得到一个功能正常的回显服务器,每个独立的客户端都能知道何时有新用户连接或断开连接。
我们现在看到如何创建一个比以往更先进的 asyncio 服务器。接下来,我们将在此基础上进一步学习,创建一个聊天服务器和聊天客户端——这将会更加高级。