现在,我们已经创建了一个能处理多个并发连接的回显服务器,并且还能正确记录错误并处理异常时的清理。如果我们需要关闭应用怎么办?难道不能让我们允许任何正在进行的消息完成后再关闭吗?我们可以通过向应用添加自定义关闭逻辑来实现这一点,这允许任何进行中的任务有几秒钟的时间来完成发送它们可能想发送的消息。虽然这不会是一个生产级别的实现,但我们会学习关于关闭以及取消所有运行任务的核心概念。
Windows 不支持信号。因此,本节仅适用于基于 Unix 的系统。Windows 使用不同的系统来处理此事,但在编写本书时,该系统与 Python 不兼容。要了解如何使此代码跨平台运行,请参阅 Stack Overflow 上的以下回答:https://stackoverflow.com/questions/35772001(https://stackoverflow.com/questions/35772001)
信号是基于 Unix 的操作系统中用于异步通知进程发生了操作系统级事件的一种概念。虽然听起来非常底层,但你可能对某些信号很熟悉。例如,一个常见的信号是 SIGINT,即“中断信号”。当按下 CTRL-C 杀死命令行应用程序时,就会触发此信号。在 Python 中,我们通常通过捕获 KeyboardInterrupt 异常来处理它。另一个常见信号是 SIGTERM,即“终止信号”。当运行 kill 命令来停止特定进程的执行时,就会触发此信号。
为了实现自定义关闭逻辑,我们将在应用中为 SIGINT 和 SIGTERM 信号实现监听器。然后,在这些监听器中,我们将实现逻辑,允许我们的任何 echo 任务有几秒钟的时间来完成。
我们如何在应用中监听信号?asyncio 事件循环允许我们直接使用 add_signal_handler 方法监听我们指定的任何事件。这与使用 signal.signal 函数在 signal 模块中设置的信号处理器不同,因为 add_signal_handler 可以安全地与事件循环交互。这个函数接收我们想要监听的信号和一个在应用接收到该信号时调用的函数。为了演示这一点,让我们看看如何添加一个信号处理器,用于取消所有当前运行的任务。asyncio 有一个便利函数,名为 asyncio.all_tasks,它返回一个包含所有运行任务的集合。
import asyncio, signalfrom asyncio import AbstractEventLoopfrom typing import Setfrom util.delay_functions import delaydef cancel_tasks(): print('Got a SIGINT!') tasks: Set[asyncio.Task] = asyncio.all_tasks() print(f'Cancelling {len(tasks)} task(s).') [task.cancel() for task in tasks]async def main(): loop: AbstractEventLoop = asyncio.get_running_loop() loop.add_signal_handler(signal.SIGINT, cancel_tasks) await delay(10)asyncio.run(main())
当我们运行这个应用时,我们会看到延迟协程立即开始并等待 10 秒。如果我们在这 10 秒内按下 CTRL-C,我们应该会看到打印出 got a SIGINT!,然后是一条消息,说明我们正在取消任务。我们还应该看到从 asyncio.run(main()) 抛出的 CancelledError,因为我们已经取消了该任务。
在最初的问题陈述中,我们希望给回显服务器的 echo 任务几秒钟时间来继续运行,然后再关闭。一种方法是将所有 echo 任务包装在 wait_for 中,然后 await 这些包装后的任务。这些任务在超时后会抛出 TimeoutError,然后我们可以终止应用。
你可能会注意到,我们的关闭处理程序是一个普通的 Python 函数,因此我们不能在其中运行任何 await 语句。这对我们构成了一个问题,因为我们的提议解决方案涉及 await。一个可能的解决方案是创建一个协程来执行我们的关闭逻辑,然后在我们的关闭处理程序中将其包装成一个任务:
async def await_all_tasks(): tasks = asyncio.all_tasks() [await task for task in tasks]async def main(): loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(await_all_tasks()))
这种方法可行,但缺点是,如果 await_all_tasks 中的某处抛出异常,我们会留下一个失败的孤儿任务,并出现“异常未被检索”的警告。那么,有更好的方法吗?
我们可以用抛出自定义异常来阻止主协程运行。然后,当我们运行主协程时,可以捕获该异常并运行任何关闭逻辑。为此,我们需要自己创建一个事件循环,而不是使用 asyncio.run。这是因为当发生异常时,asyncio.run 会取消所有运行的任务,这意味着我们无法将 echo 任务包装在 wait_for 中:
class GracefulExit(SystemExit): passdef shutdown(): raise GracefulExit()loop = asyncio.get_event_loop()loop.add_signal_handler(signal.SIGINT, shutdown)try: loop.run_until_complete(main())except GracefulExit: loop.run_until_complete(close_echo_tasks(echo_tasks))finally: loop.close()
async def close_echo_tasks(echo_tasks: List[asyncio.Task]): waiters = [asyncio.wait_for(task, 2) for task in echo_tasks] for task in waiters: try: await task except asyncio.exceptions.TimeoutError: # 我们期望这里抛出超时错误 pass
在 close_echo_tasks 中,我们接收一个 echo 任务列表,并将它们全部包装在一个 wait_for 任务中,超时时间为 2 秒。这意味着任何 echo 任务都有 2 秒的时间完成。完成后,我们遍历所有这些包装后的任务并 await 它们。我们捕获任何 TimeoutErrors,因为我们期望在 2 秒后从任务中抛出此错误。将所有这些部分组合起来,带有关闭逻辑的回显服务器如下所示。
import asynciofrom asyncio import AbstractEventLoopimport socketimport loggingimport signalfrom typing import Listasync def echo(connection: socket, loop: AbstractEventLoop) -> None: try: while data := await loop.sock_recv(connection, 1024): print('got data!') if data == b'boom\r\n': raise Exception("Unexpected network error") await loop.sock_sendall(connection, data) except Exception as ex: logging.exception(ex) finally: connection.close()echo_tasks = []async def connection_listener(server_socket, loop): while True: connection, address = await loop.sock_accept(server_socket) connection.setblocking(False) print(f"Got a connection from {address}") echo_task = asyncio.create_task(echo(connection, loop)) echo_tasks.append(echo_task)class GracefulExit(SystemExit): passdef shutdown(): raise GracefulExit()async def close_echo_tasks(echo_tasks: List[asyncio.Task]): waiters = [asyncio.wait_for(task, 2) for task in echo_tasks] for task in waiters: try: await task except asyncio.exceptions.TimeoutError: # 我们期望这里抛出超时错误 passasync def main(): server_socket = socket.socket() server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_address = ('127.0.0.1', 8000) server_socket.setblocking(False) server_socket.bind(server_address) server_socket.listen() for signame in {'SIGINT', 'SIGTERM'}: loop.add_signal_handler(getattr(signal, signame), shutdown) await connection_listener(server_socket, loop)loop = asyncio.new_event_loop()try: loop.run_until_complete(main())except GracefulExit: loop.run_until_complete(close_echo_tasks(echo_tasks))finally: loop.close()
假设至少有一个客户端连接,如果我们用 CTRL-C 停止这个应用,或者向我们的进程发出 kill 命令,我们的关闭逻辑就会执行。我们会看到应用等待 2 秒,同时允许 echo 任务有时间完成,然后再停止运行。
有几个原因使得这并不是一个生产级别的关闭。第一个是,我们在等待 echo 任务完成时没有关闭连接监听器。这意味着,在我们关闭的过程中,可能会有新的连接进来,然后我们就无法再添加 2 秒的关闭时间。另一个问题是,在我们的关闭逻辑中,我们 await 每个要关闭的 echo 任务,并且只捕获 TimeoutExceptions。这意味着,如果其中一个任务抛出了除这个以外的任何东西,我们会捕获该异常,并忽略任何后续任务可能发生的异常。在第4章中,我们将看到一些 asyncio 方法,可以更优雅地处理一组 awaitables 的失败。
尽管我们的应用并不完美,只是一个玩具示例,但我们已经用 asyncio 构建了一个功能齐全的服务器。这个服务器可以在单个线程中同时处理许多用户。相比之下,我们之前看到的阻塞方法需要转向多线程来处理多个客户端,这增加了复杂性和更高的资源利用率。
在本章中,我们学习了阻塞和非阻塞套接字,并更深入地探讨了 asyncio 事件循环的工作原理。我们还创建了第一个 asyncio 应用,一个高并发的回显服务器。我们探讨了如何处理任务中的错误,并为应用添加了自定义关闭逻辑。
- 我们学会了如何用阻塞套接字创建简单应用。阻塞套接字在等待数据时会阻塞整个线程。这阻止了我们实现并发,因为我们一次只能从一个客户端获取数据。
- 我们学会了如何用非阻塞套接字构建应用。这些套接字总是立即返回,要么带回数据(如果有),要么抛出一个异常,表示没有数据。这些套接字让我们实现了并发,因为它们的方法从不阻塞,会立即返回。
- 我们学会了如何使用
selectors 模块以高效的方式监听套接字上的事件。这个库让我们注册我们想跟踪的套接字,并在非阻塞套接字准备好数据时告诉我们。 - 如果我们将
select 放在一个无限循环中,我们就复制了 asyncio 事件循环的核心功能。我们注册我们感兴趣的套接字,并无限循环,一旦套接字有数据可供处理,就运行我们想要的任何代码。 - 我们学会了如何使用 asyncio 的事件循环方法来构建使用非阻塞套接字的应用。这些方法接收一个套接字并返回一个协程,我们可以将其用在
await 表达式中。这会挂起我们的父协程,直到套接字有数据。在幕后,这使用了 selectors 库。 - 我们看到了如何使用任务来实现 asyncio 回显服务器的并发,允许多个客户端同时发送和接收数据。我们还研究了如何处理这些任务中的错误。
- 我们学会了如何为 asyncio 应用添加自定义关闭逻辑。在我们的案例中,我们决定当服务器关闭时,给它几秒钟时间,让任何剩余的客户端完成发送数据。掌握了这些知识,我们就可以为应用在关闭时添加任何必要的逻辑。