我们现在知道如何创建服务器并处理异步命令行输入。我们可以将这两方面的知识结合起来,创建两个应用程序:第一个是能同时接收多个聊天客户端的聊天服务器,第二个是能连接到服务器并发送和接收聊天消息的聊天客户端。
在开始设计我们的应用之前,先列出一些需求,这有助于我们做出正确的设计决策。首先是服务器的需求:
- 一旦用户连接,他们应该能够向服务器发送聊天消息,每条消息都应该发送给所有已连接的用户。
- 为了防止空闲用户占用资源,如果用户空闲超过一分钟,服务器应该断开他们的连接。
- 当用户启动应用时,客户端应该提示输入用户名,并尝试连接到服务器。
- 连接成功后,用户应看到其他客户端的消息从屏幕顶部向下滚动。
- 用户应该在屏幕底部有一个输入字段。当用户按回车时,输入框中的文本应该被发送到服务器,然后再发送到所有其他已连接的客户端。
考虑到这些需求,让我们先思考一下客户端与服务器之间的通信应该是什么样子的。首先,我们需要从客户端向服务器发送一条包含用户名的消息。我们需要区分连接与消息发送,因此我们将引入一个简单的命令协议来表明我们正在发送用户名。为了保持简单,我们只是发送一个字符串,以名为 CONNECT 的命令开头,后面跟上用户提供的用户名。例如,CONNECT MissIslington 就是我们发送给服务器以连接用户名为 “MissIslington” 的用户的指令。
一旦我们连接成功,我们就直接向服务器发送消息,服务器会将消息发送给所有已连接的客户端(包括我们自己;如需要,你可以优化掉这个,不必多发一份)。为了使应用更健壮,你可能希望考虑增加一个服务器发送回客户端的命令,以确认消息已被接收,但为简洁起见,这里我们略过。
考虑到这一点,我们有足够的信息来开始设计我们的服务器了。我们将创建一个 ChatServer 类,类似于我们之前的部分。一旦客户端连接,我们将等待他们提供带有 CONNECT 命令的用户名。假设他们提供了,我们将创建一个任务来监听来自客户端的消息,并将它们发送给所有其他连接的客户端。为了跟踪连接的客户端,我们将维护一个连接用户名与其 StreamWriter 实例的字典。如果一个连接的用户空闲超过一分钟,我们将断开他们的连接,并从字典中移除他们,同时向其他用户发送他们已离开聊天的消息。
import asyncioimport loggingfrom asyncio import StreamReader, StreamWriterclass ChatServer: def __init__(self): self._username_to_writer = {} async def start_chat_server(self, host: str, port: int): server = await asyncio.start_server(self.client_connected, host, port) async with server: await server.serve_forever() async def client_connected(self, reader: StreamReader, writer: StreamWriter):<span class="fm-combinumeral">❶</span> command = await reader.readline() print(f'CONNECTED {reader} {writer}') command, args = command.split(b' ') if command == b'CONNECT': username = args.replace(b'\n', b'').decode() self._add_user(username, reader, writer) await self._on_connect(username, writer) else: logging.error('从客户端收到了无效命令,断开连接。') writer.close() await writer.wait_closed() def _add_user(self, username: str, reader: <span class="fm-combinumeral">❷</span> StreamReader, writer: StreamWriter): <span class="fm-combinumeral">❷</span> self._username_to_writer[username] = writer asyncio.create_task(self._listen_for_messages(username, reader)) async def _on_connect(self, username: str, writer: StreamWriter): <span class="fm-combinumeral">❸</span> writer.write(f'欢迎!{len(self._username_to_writer)} 位用户正在线上!\n'.encode()) await writer.drain() await self._notify_all(f'{username} 已连接!\n') async def _remove_user(self, username: str): writer = self._username_to_writer[username] del self._username_to_writer[username] try: writer.close() await writer.wait_closed() except Exception as e: logging.exception('关闭客户端写入器时出错,忽略。', exc_info=e) async def _listen_for_messages(self, username: str, reader: StreamReader): <span class="fm-combinumeral">❹</span> try: while (data := await asyncio.wait_for(reader.readline(), 60)) != b'': await self._notify_all(f'{username}: {data.decode()}') await self._notify_all(f'{username} 已离开聊天群聊。\n') except Exception as e: logging.exception('读取客户端数据时出错。', exc_info=e) await self._remove_user(username) async def _notify_all(self, message: str): <span class="fm-combinumeral">❺</span> inactive_users = [] for username, writer in self._username_to_writer.items(): try: writer.write(message.encode()) await writer.drain() except ConnectionError as e: logging.exception('无法写入客户端。', exc_info=e) inactive_users.append(username) [await self._remove_user(username) for username in inactive_users]async def main(): chat_server = ChatServer() await chat_server.start_chat_server('127.0.0.1', 8000)asyncio.run(main())
❶ 等待客户端提供有效的用户名命令;否则,断开连接。❷ 存储用户的流写入器实例,并创建一个任务来监听消息。❸ 一旦用户连接,通知所有其他用户他们已连接。❹ 监听来自客户端的消息,并将其发送给所有其他客户端,最多等待一分钟接收消息。❺ 向所有连接的客户端发送消息,并移除任何已断开连接的用户。
我们的 ChatServer 类将服务器的所有功能封装在一个简洁的接口中。主要入口点是 start_chat_server 协程。该协程在指定的主机和端口上启动服务器,并调用 serve_forever。对于服务器的 client_connected 回调,我们使用 client_connected 协程。该协程等待客户端的第一个数据行,如果收到一个有效的 CONNECT 命令,就调用 _add_user 然后调用 _on_connect;否则,终止连接。
_add_user 函数 [注:此处原文标注 marker-1031643 有误,应为 ❷ 标签] 将用户名和用户的流写入器存储在内部字典中,然后创建一个任务来监听用户的聊天消息。_on_connect 协程向客户端发送欢迎消息,并通知所有其他连接的客户端用户已连接。
当我们调用 _add_user 时,我们创建了一个 asyncio.Task,用于运行 _listen_for_messages 协程。这个协程是应用的核心所在。我们无限循环,读取来自客户端的消息,直到看到空行,这表示客户端已断开连接。一旦我们得到一条消息,就调用 _notify_all 将聊天消息发送给所有已连接的客户端。为了满足要求,如果客户机闲置超过一分钟就应该断开连接,我们将 readline 协程包装在 wait_for 中。如果客户端闲置超过一分钟,这将抛出一个 TimeoutError。在这种情况下,我们有一个宽泛的异常捕获块,捕获 TimeoutError 和任何其他抛出的异常。我们通过从 _username_to_writer 字典中移除客户端来处理任何异常,这样我们就停止向他们发送消息。
我们现在有了一个完整的服务器,但没有客户端,服务器就是无意义的。我们将按照之前编写的命令行数据库客户端的方式实现客户端。我们将创建一个协程来监听来自服务器的消息,并将它们追加到消息存储中,当有新消息时重绘屏幕。我们还会将输入置于屏幕底部,当用户按下回车时,我们将消息发送到聊天服务器。
import asyncioimport osimport loggingimport ttyfrom asyncio import StreamReader, StreamWriterfrom collections import dequefrom chapter_08.listing_8_5 import create_stdin_readerfrom chapter_08.listing_8_7 import *from chapter_08.listing_8_8 import read_linefrom chapter_08.listing_8_9 import MessageStoreasync def send_message(message: str, writer: StreamWriter): writer.write((message + '\n').encode()) await writer.drain()async def listen_for_messages(reader: StreamReader, message_store: MessageStore): <span class="fm-combinumeral">❶</span> while (message := await reader.readline()) != b'': await message_store.append(message.decode()) await message_store.append('服务器已关闭连接。')async def read_and_send(stdin_reader: StreamReader, writer: StreamWriter): <span class="fm-combinumeral">❷</span> while True: message = await read_line(stdin_reader) await send_message(message, writer)async def main(): async def redraw_output(items: deque): save_cursor_position() move_to_top_of_screen() for item in items: delete_line() sys.stdout.write(item) restore_cursor_position() tty.setcbreak(0) os.system('clear') rows = move_to_bottom_of_screen() messages = MessageStore(redraw_output, rows - 1) stdin_reader = await create_stdin_reader() sys.stdout.write('请输入用户名:') username = await read_line(stdin_reader) reader, writer = await asyncio.open_connection('127.0.0.1', 8000) <span class="fm-combinumeral">❸</span> writer.write(f'CONNECT {username}\n'.encode()) await writer.drain() message_listener = asyncio.create_task(listen_for_messages(reader, messages))<span class="fm-combinumeral">❹</span> input_listener = asyncio.create_task(read_and_send(stdin_reader, writer)) try: await asyncio.wait([message_listener, input_listener], return_when=asyncio.FIRST_COMPLETED) except Exception as e: logging.exception(e) writer.close() await writer.wait_closed()asyncio.run(main())
❶ 监听来自服务器的消息,并将它们追加到消息存储中。❷ 读取用户输入,并将它们发送到服务器。❸ 打开与服务器的连接,并发送带有用户名的连接消息。❹ 创建一个任务来监听消息,另一个任务来监听输入;等待其中一个完成。
我们首先向用户询问用户名,一旦获得,就向服务器发送我们的 CONNECT 消息。然后,我们创建两个任务:一个用于监听来自服务器的消息,另一个用于连续读取聊天消息并发送到服务器。然后,我们将这两个任务打包在 asyncio.wait 中,等待任一任务完成。我们这样做是因为服务器可能断开连接,或者输入监听器可能抛出异常。如果我们分别 await 每个任务,我们可能会陷入僵局。例如,如果服务器断开了连接,而我们率先 await 了输入监听器,我们可能找不到停止它的方法。使用 wait 协程可以防止这种问题,因为一旦消息监听器或输入监听器之一完成,我们的应用就会退出。如果我们希望拥有更健壮的逻辑,可以通过检查 wait 返回的 done 集合和 pending 集合来实现。例如,如果输入监听器抛出异常,我们可以取消消息监听器任务。
如果你先运行服务器,再运行几个聊天客户端,你将能够像普通聊天应用一样发送和接收消息。例如,两个用户连接到聊天室,可能产生如下输出:
欢迎!1 位用户正在线上!MissIslington 已连接!SirBedevere 已连接!SirBedevere: 那是你的鼻子吗?MissIslington: 不,是假的!
我们构建了一个聊天服务器和客户端,能够同时处理多个用户,而且仅使用一个线程。这个应用还有改进空间。例如,你可以考虑在发送失败时重试,或设计一个协议来确认客户端已收到消息。让这个应用达到生产级别的健壮性相当复杂,超出了本书的范围,但可能对读者来说是个有趣的练习,因为需要思考诸多可能的故障点。通过运用本例中探索的概念,你将能够创建满足自己需求的健壮客户端和服务器应用。
- 我们学会了如何使用低层级的传输和协议接口来构建一个简单的 HTTP 客户端。这些接口是更高层级 asyncio 流接口的基础,通常不推荐在一般情况下使用。
- 我们学会了如何使用
StreamReader 与 StreamWriter 类来构建网络应用。这些高层级接口是使用 asyncio 流的推荐方式。 - 我们学会了如何使用流来创建非阻塞的命令行应用,即使在后台运行任务时也能保持对用户输入的响应。
- 我们学会了如何使用
start_server 协程来创建服务器。这种方法是推荐在 asyncio 里创建服务器的方式,而不是直接使用套接字。 - 我们学会了如何使用流和服务器创建响应式客户端和服务器应用。利用这些知识,我们可以构建基于网络的应用,如聊天服务器和客户端。