当从零开始开发一个全新的 I/O 密集型应用时,asyncio 可能是一个自然的技术选择。从一开始就,你可以使用与 asyncio 兼容的非阻塞库,比如 asyncpg 和 aiohttp。然而,真正从零开始的项目在现实中是一种奢侈,大多数开发者的工作都是在维护已有代码的基础上进行的,而这些代码往往使用的是阻塞的 I/O 库,比如用于 HTTP 请求的 requests、用于 Postgres 数据库的 psycopg,或者其他各种阻塞库。我们还可能遇到这样的情况:目前还没有适合 asyncio 的库可用。那么,有没有办法在不放弃 asyncio API 的前提下,同时获得并发带来的性能提升呢?
答案是——多线程。因为阻塞的 I/O 操作会释放全局解释器锁(GIL),这使得我们可以在不同的线程中并发地执行 I/O 操作。就像 multiprocessing 库一样,asyncio 也提供了一种机制,让我们可以利用线程池,从而在继续使用 asyncio API(如 gather 和 wait)的同时,享受多线程带来的好处。
在本章中,我们将学习如何将多线程与 asyncio 结合,来在线程中运行像 requests 这样的阻塞型 API。此外,我们还将学习如何像上一章那样同步共享数据,并深入探讨更高级的锁机制,比如可重入锁和死锁。我们还会通过构建一个响应式的图形用户界面(GUI)来运行一个 HTTP 压力测试,来了解如何将 asyncio 与同步代码结合。最后,我们也会看看哪些情况下线程可以用于处理 CPU 密集型任务。
Python 通过 threading 模块让开发者能够创建和管理线程。这个模块暴露了 Thread 类,当你实例化它时,可以传入一个要在独立线程中运行的函数。虽然 Python 解释器在进程中是单线程的,这意味着即使有多个线程在运行,同一时间也只能有一个线程执行 Python 字节码,但全局解释器锁(GIL)只允许一个线程一次执行代码。
这看起来像是在限制我们使用多线程的优势,但实际上,在某些情况下,GIL 会被释放,其中最主要的情况就是进行 I/O 操作。因为底层实现上,Python 会调用操作系统的低级系统调用来执行 I/O,而这些系统调用是在解释器之外进行的,所以不需要运行任何 Python 字节码,此时 GIL 就可以被释放。
为了更好地理解如何在阻塞 I/O 的上下文中创建和运行线程,我们来回顾一下第 3 章中的回显服务器例子。还记得为了处理多个连接,我们需要把套接字切换到非阻塞模式,并使用 select 模块来监听套接字上的事件吗?如果我们在一个遗留代码库中,无法使用非阻塞套接字,该怎么办?我们还能构建一个能同时处理多个客户端的回显服务器吗?
由于套接字的 recv 和 sendall 方法是 I/O 密集型操作,它们会释放 GIL,因此我们可以并发地在不同线程中运行它们。这意味着,我们可以为每个已连接的客户端创建一个线程,然后在这个线程中读写数据。这种模式在像 Apache 这样的网络服务器中很常见,被称为 “每连接一个线程” 模型。
让我们尝试一下这个想法:在主线程中等待连接,一旦有客户端连接,就创建一个线程来为该客户端回显数据。
from threading import Threadimport socketdef echo(client: socket): while True: data = client.recv(2048) print(f'Received {data}, sending!') client.sendall(data)with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server: server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(('127.0.0.1', 8000)) server.listen() while True: connection, _ = server.accept() <span class="fm-combinumeral">❶</span> thread = Thread(target=echo, args=(connection,)) <span class="fm-combinumeral">❷</span> thread.start() <span class="fm-combinumeral">❸</span>
- ❷ 客户端连接后,创建一个线程来运行我们的
echo 函数。
在上面的代码中,我们进入了一个无限循环,监听服务器套接字上的连接。一旦有客户端连接,我们就创建一个新的线程来运行 echo 函数。我们给线程提供了 target 参数,即要运行的 echo 函数,以及 args,这是一个传递给 echo 函数的参数元组。这意味着我们会在线程中调用 echo(connection)。然后,我们启动线程并再次进入循环,等待下一个连接。与此同时,我们创建的线程会一直循环监听来自客户端的数据,收到后立即回传。
你应该能成功地同时连接任意数量的 telnet 客户端,并且消息能正常回显。由于每个 recv 和 sendall 都在各自的客户端线程中运行,这些操作永远不会互相阻塞;它们只会阻塞自己所在的线程。
这解决了阻塞套接字无法同时处理多个客户端的问题,尽管这种方法也有其独特的线程问题。如果你在有客户端连接的情况下按 Ctrl-C 来终止这个进程,你的应用程序能否干净地关闭所有创建的线程?
事实证明,事情并没有想象中那么干净。如果你强行终止应用,你会看到 KeyboardInterrupt 异常在 server.accept() 上抛出,但你的应用会卡住,因为后台线程仍在运行,阻止程序退出。此外,所有已连接的客户端仍然可以发送和接收消息!
不幸的是,Python 中用户创建的线程不会接收 KeyboardInterrupt 异常;只有主线程才会接收。这意味着我们的线程会继续运行,愉快地从客户端读取数据,阻止应用退出。
解决这个问题的方法有几个,具体来说,我们可以使用所谓的 守护线程(daemon threads),或者想出自己的方式来取消或“中断”正在运行的线程。守护线程是一种特殊的线程,适用于长时间运行的后台任务。它们不会阻止应用程序的关闭。事实上,当只剩下守护线程在运行时,应用程序会自动关闭。由于 Python 的主线程不是守护线程,这意味着如果我们把所有的连接线程都设为守护态,那么在收到 KeyboardInterrupt 时,我们的应用就会正常退出。将列表 7.1 的代码修改为使用守护线程非常简单,只需在调用 thread.start() 之前设置 thread.daemon = True 即可。一旦做了这个改动,你的应用就能在按 Ctrl-C 时正确退出。
不过,这样做有个问题:我们无法在线程停止时运行任何清理或关闭逻辑,因为守护线程会突然终止。假设我们在关闭时想要向每个客户端发送一条消息,告知服务器即将关闭。有没有办法让某个异常中断我们的线程,并干净地关闭套接字?如果你从主线程调用套接字的 shutdown 方法,任何正在进行的 recv 调用都会返回 0,而 sendall 会抛出异常。如果从主线程调用 shutdown,这将会中断那些正在阻塞 recv 或 sendall 调用的客户端线程。然后,我们可以在客户端线程中捕获这个异常,并执行任何想要的清理逻辑。
为此,我们将稍微改变创建线程的方式,通过继承 Thread 类本身。这将使我们能够定义自己的线程类,并添加一个 cancel 方法,该方法内部可以关闭客户端套接字。这样,我们对 recv 和 sendall 的调用就会被中断,从而可以退出 while 循环并关闭线程。
Thread 类有一个 run 方法,我们可以重写它。当我们继承 Thread 时,我们会用我们想要线程运行的代码来实现这个方法。在我们的例子中,就是 recv 和 sendall 的回显循环。
from threading import Threadimport socketclass ClientEchoThread(Thread): def __init__(self, client): super().__init__() self.client = client def run(self): try: while True: data = self.client.recv(2048) if not data: <span class="fm-combinumeral">❶</span> raise BrokenPipeError('Connection closed!') print(f'Received {data}, sending!') self.client.sendall(data) except OSError as e: <span class="fm-combinumeral">❷</span> print(f'Thread interrupted by {e} exception, shutting down!') def close(self): if self.is_alive(): <span class="fm-combinumeral">❸</span> self.client.sendall(bytes('Shutting down!', encoding='utf-8')) self.client.shutdown(socket.SHUT_RDWR) <span class="fm-combinumeral">❹</span>with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server: server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(('127.0.0.1', 8000)) server.listen() connection_threads = [] try: while True: connection, addr = server.accept() thread = ClientEchoThread(connection) connection_threads.append(thread) thread.start() except KeyboardInterrupt: print('Shutting down!') [thread.close() for thread in connection_threads] <span class="fm-combinumeral">❺</span>
- ❶ 如果没有数据,抛出异常。这发生在客户端关闭连接或连接被主动关闭时。
- ❷ 捕获异常后,退出
run 方法。这会终止线程。 - ❸ 如果线程处于活跃状态,则关闭连接;如果客户端已经关闭连接,线程可能不再活跃。
- ❺ 在键盘中断时,调用线程的
close 方法,关闭每个客户端连接。
我们首先创建了一个新的类 ClientEchoThread,它继承自 Thread。这个类重写了 run 方法,包含了我们原始 echo 函数的代码,但做了一些修改。首先,我们用 try/except 块包裹了所有内容,并拦截 OSError 异常。这类异常通常由 sendall 等方法在关闭客户端套接字时抛出。我们还检查了 recv 返回的数据是否为 0。这种情况有两种可能:一是客户端关闭了连接(比如有人退出了 telnet),二是我们自己主动关闭了客户端连接。在这种情况下,我们手动抛出一个 BrokenPipeError(它是 OSError 的子类),执行 except 块中的 print 语句,并退出 run 方法,从而关闭线程。
我们还在 ClientEchoThread 类上定义了一个 close 方法。这个方法首先检查线程是否处于活跃状态,然后再关闭客户端连接。什么叫“线程是活跃的”?为什么我们需要做这个检查?当线程的 run 方法正在执行时,该线程就是活跃的;在我们的情况下,只要 run 方法没有抛出任何异常,这个条件就成立。我们需要这个检查,是因为客户端本身可能已经关闭了连接,导致 run 方法中抛出了 BrokenPipeError,而我们调用 close 之前。这意味着调用 sendall 会引发异常,因为连接已经无效了。
最后,在我们的主循环中,也就是监听新连接的地方,我们捕获了 KeyboardInterrupt 异常。一旦捕获到,我们就调用每个已创建线程的 close 方法。这会向客户端发送一条消息(前提是连接仍然有效),并关闭连接。
总的来说,取消正在运行的线程在 Python 中,以及在一般情况下,都是一个棘手的问题,取决于你试图处理的具体关闭场景。你需要特别注意,确保你的线程不会阻塞应用的退出,并找出合适的位置来插入中断点,以便退出线程。
我们现在已经看到了几种手动管理线程的方法,比如创建一个带有 target 函数的线程对象,或者继承 Thread 类并重写 run 方法。现在我们已经掌握了线程的基础知识,接下来就看看如何将它们与 asyncio 结合,来处理流行的阻塞库。