我们现在知道了如何创建和管理多个线程来处理阻塞工作。这种方法的缺点是,我们必须逐个创建并跟踪线程。我们希望能够在不手动管理它们的情况下,使用我们学过的所有 asyncio 基础 API 来等待线程的结果。就像第 6 章中的进程池一样,我们可以使用 线程池 来以这种方式管理线程。在本节中,我们将介绍一个流行的阻塞式 HTTP 客户端库,并展示如何使用线程与 asyncio 并发地运行网络请求。
requests 库是 Python 中一个流行的 HTTP 客户端库,自称为“人类的 HTTP”。你可以在 https://requests.readthedocs.io/en/master/(https://requests.readthedocs.io/en/master/) 查看最新的文档。使用它,你可以像之前用 aiohttp 一样向网络服务器发出 HTTP 请求。我们将使用最新版本(截至本文写作时,版本为 2.24.0)。你可以通过运行以下 pip 命令来安装这个库:
pip install -Iv requests==2.24.0
安装好库后,我们就可以开始进行基本的 HTTP 请求了。让我们先向 example.com 发送几个请求,获取状态码,就像我们之前用 aiohttp 做的一样。
import requestsdef get_status_code(url: str) -> int: response = requests.get(url) return response.status_codeurl = 'https:// www .example .com'print(get_status_code(url))print(get_status_code(url))
上述代码依次执行了两次 HTTP GET 请求。运行后,你应该能看到两个 200 的输出。我们这里没有创建 HTTP 会话,就像我们在 aiohttp 中做的那样,但该库支持在需要保持跨请求的持久化 cookie 时使用会话。
requests 库是阻塞的,这意味着每次调用 requests.get 都会阻止当前线程执行其他 Python 代码,直到请求完成。这对如何在 asyncio 中使用这个库有影响。如果你尝试单独在一个协程或任务中使用这个库,它会阻塞整个事件循环,直到请求完成。如果一个请求耗时 2 秒,你的应用在这 2 秒内什么都做不了。为了正确地使用这个库与 asyncio,我们必须将这些阻塞操作放在一个 线程 中运行。
与进程池执行器类似,concurrent.futures 库提供了 Executor 抽象类的一个实现,用于处理线程,名为 ThreadPoolExecutor。与进程池维护一组工作进程不同,线程池执行器会创建并维护一组线程,我们可以向其中提交工作。
虽然进程池默认会为机器上每个可用的 CPU 核心创建一个工作进程,但确定要创建多少个工作线程则更复杂一些。内部公式为 min(32, os.cpu_count() + 4)。这导致工作线程的最大上限为 32,最小下限为 5。上限设为 32 是为了避免在拥有大量 CPU 核心的机器上创建过多线程(记住,线程是资源密集型的)。下限设为 5 是因为在 1-2 核心的小型机器上,仅启动少数几个线程不太可能显著提升性能。对于 I/O 密集型任务,通常创建比可用核心数稍多的线程是有意义的。例如,在一个 8 核机器上,上述公式意味着我们将创建 12 个线程。虽然只有 8 个线程可以并发运行,但我们还可以有其他线程暂停,等待 I/O 完成,让操作系统在 I/O 完成时恢复它们。
让我们将列表 7.3 的例子改造成使用线程池并发运行 1000 次 HTTP 请求。我们将对结果进行计时,以了解其带来的性能提升。
import timeimport requestsfrom concurrent.futures import ThreadPoolExecutordef get_status_code(url: str) -> int: response = requests.get(url) return response.status_codestart = time.time()with ThreadPoolExecutor() as pool: urls = ['https:// www .example .com' for _ in range(1000)] results = pool.map(get_status_code, urls) for result in results: print(result)end = time.time()print(f'finished requests in {end - start:.4f} second(s)')
在一个 8 核、网速快的机器上,这段代码使用默认的线程数可以在 8-9 秒内完成。很容易写出同步版本来理解线程带来的影响,如下所示:
start = time.time()urls = ['https:// www .example .com' for _ in range(1000)]for url in urls: print(get_status_code(url))end = time.time()print(f'finished requests in {end - start:.4f} second(s)')
运行这段代码可能需要超过 100 秒!这使得我们的多线程代码比同步代码快了 10 倍以上,性能提升相当明显。
虽然这显然是一次改进,但你可能会记得第 4 章中关于 aiohttp 时,我们能在不到 1 秒内并发完成 1000 次请求。为什么这次慢这么多?记住,我们工作线程的最大数量被限制在 32(即 CPU 核心数加 4),这意味着默认情况下我们最多只能并发运行 32 次请求。我们可以尝试通过传入 max_workers=1000 来绕过这个问题,如下所示:
with ThreadPoolExecutor(max_workers=1000) as pool: urls = ['https:// www .example .com' for _ in range(1000)] results = pool.map(get_status_code, urls) for result in results: print(result)
这种方法可以带来一些改进,因为我们现在为每次请求都有一个线程。然而,这仍然无法接近我们基于协程的代码的速度。这是因为线程存在资源开销。线程是在操作系统级别创建的,创建成本远高于协程。此外,线程在操作系统层面还有上下文切换的开销。在上下文切换时保存和恢复线程状态会消耗掉一部分使用线程所获得的性能增益。
在确定特定问题应使用多少线程时,最好从小规模开始(通常以可用的 CPU 核心数加几个作为起点),进行测试和基准测试,然后逐步增加线程数量。你通常会发现一个“甜蜜点”,之后运行时间会趋于平稳,甚至可能随着线程数量的增加而下降。这个甜蜜点通常相对于你想要发起的请求数量来说是一个相对较低的数字(明确一点,为 1000 次请求创建 1000 个线程可能并不是资源的最佳利用方式)。
7.2.3 与 asyncio 结合使用的线程池执行器
使用 asyncio 事件循环与线程池执行器并没有比使用 ProcessPoolExecutors 复杂多少。这就是拥有抽象的 Executor 基类的美妙之处,我们只需要更改一行代码,就能用同样的代码来运行线程或进程。让我们将运行 1000 次 HTTP 请求的例子改编为使用 asyncio.gather 而不是 pool.map。
import functoolsimport requestsimport asynciofrom concurrent.futures import ThreadPoolExecutorfrom util import async_timeddef get_status_code(url: str) -> int: response = requests.get(url) return response.status_code@async_timed()async def main(): loop = asyncio.get_running_loop() with ThreadPoolExecutor() as pool: urls = ['https:// www .example .com' for _ in range(1000)] tasks = [loop.run_in_executor(pool, functools.partial(get_status_code, url)) for url in urls] results = await asyncio.gather(*tasks) print(results)asyncio.run(main())
我们像之前一样创建线程池,但不再使用 map,而是通过调用 get_status_code 函数并使用 loop.run_in_executor 来创建一个任务列表。有了任务列表后,我们可以用 asyncio.gather 或我们之前学到的其他 asyncio API 来等待它们完成。
内部,loop.run_in_executor 会调用线程池执行器的 submit 方法。这会将我们传递的每个函数放入队列中。池中的工作线程会从队列中拉取任务,运行每个工作项直到完成。这种方法在不使用 asyncio 的情况下并不会带来性能优势,但在我们等待 await asyncio.gather 完成时,其他代码可以运行。
阅读 asyncio 文档时,你可能会注意到 run_in_executor 方法的 executor 参数可以是 None。在这种情况下,run_in_executor 会使用事件循环的 默认执行器。什么是默认执行器?可以把它看作是整个应用程序的可复用的单例执行器。默认执行器始终默认为 ThreadPoolExecutor,除非我们用 loop.set_default_executor 方法设置了自定义执行器。这意味着我们可以简化列表 7.5 的代码,如下所示。
import functoolsimport requestsimport asynciofrom util import async_timeddef get_status_code(url: str) -> int: response = requests.get(url) return response.status_code@async_timed()async def main(): loop = asyncio.get_running_loop() urls = ['https:// www .example .com' for _ in range(1000)] tasks = [loop.run_in_executor(None, functools.partial(get_status_code, url)) for url in urls] results = await asyncio.gather(*tasks) print(results)asyncio.run(main())
在上面的代码中,我们省略了创建自己的 ThreadPoolExecutor 并在上下文管理器中使用它的步骤,而是直接传入 None 作为执行器。第一次调用 run_in_executor 时,asyncio 会为我们创建并缓存一个默认的线程池执行器。后续每次调用 run_in_executor 都会复用之前创建的默认执行器,这意味着执行器对事件循环来说是全局的。这个池的关闭方式也与之前不同。之前,我们创建的线程池执行器在退出上下文管理器的 with 块时会关闭。而使用默认执行器时,它不会关闭,直到事件循环关闭,这通常发生在我们的应用结束时。在需要使用线程时使用默认线程池执行器简化了事情,但我们可以做得更简单吗?
在 Python 3.9 中,引入了 asyncio.to_thread 协程,进一步简化了将工作提交到默认线程池执行器的过程。它接受一个要在线程中运行的函数和一组要传递给该函数的参数。以前,我们需要使用 functools.partial 来传递参数,这使得代码稍微复杂一些。现在,to_thread 协程会在线程池执行器中运行函数及其参数,并且当前运行的事件循环。这让我们可以进一步简化线程代码。使用 to_thread 协程可以省去 functools.partial 和 asyncio.get_running_loop 的调用,大大减少了代码行数。
import requestsimport asynciofrom util import async_timeddef get_status_code(url: str) -> int: response = requests.get(url) return response.status_code@async_timed()async def main(): urls = ['https:// www .example .com' for _ in range(1000)] tasks = [asyncio.to_thread(get_status_code, url) for url in urls] results = await asyncio.gather(*tasks) print(results)asyncio.run(main())
到目前为止,我们只看到了如何在线程中运行阻塞代码。将线程与 asyncio 结合的强大之处在于,我们可以在等待线程完成的同时运行其他代码。为了看看如何在运行线程的同时运行其他代码,我们回到第 6 章的例子,即定期输出一个长时间运行任务的状态。