- 使用 async 和 await 管理计算密集型工作
- 使用 MapReduce 解决一个计算密集型问题,结合 asyncio
直到现在,我们一直关注的是使用 asyncio 并发执行 I/O 密集型任务所能获得的性能提升。运行 I/O 密集型任务是 asyncio 的拿手好戏,而到目前为止我们写的代码,必须小心不要在协程中运行任何计算密集型代码。这看起来似乎严重限制了 asyncio,但这个库的功能远不止处理 I/O 密集型任务。
asyncio 提供了一个与 Python multiprocessing 库互操作的 API。这让我们可以使用 async await 语法以及 asyncio API 来处理多个进程。通过这种方式,即使在使用计算密集型代码时,我们也能享受到 asyncio 库带来的好处。这使我们能够为计算密集型任务(如数学运算或数据处理)实现性能提升,从而绕过全局解释器锁(GIL),充分利用多核机器的全部性能。
在本章中,我们将首先学习 multiprocessing 模块,熟悉多进程执行的概念。然后,我们会了解“进程池执行器”及其如何与 asyncio 集成。接着,我们会利用这些知识,使用 MapReduce 解决一个计算密集型问题。我们还会学习如何管理多个进程之间的共享状态,并引入锁的概念以避免并发错误。最后,我们将回顾如何使用 multiprocessing 来提升一个既包含 I/O 又包含计算密集型操作的应用程序的性能,就像我们在第 5 章看到的那样。
在第 1 章中,我们介绍了全局解释器锁(GIL)。GIL 防止超过一个 Python 字节码片段并行运行。这意味着对于除 I/O 密集型任务以外的任何任务,除了少数例外情况,使用多线程并不会带来性能提升,这与 Java 或 C++ 这样的语言不同。看起来我们似乎被卡住了,无法解决 Python 中可并行化的计算密集型任务,但这就是 multiprocessing 库提供的解决方案。
与其让父进程创建线程来并行化任务,不如我们直接创建子进程来处理工作。每个子进程都有自己的 Python 解释器,并且受 GIL 限制,但不再是只有一个解释器,而是有多个,每个都有自己独立的 GIL。假设我们运行在具有多个 CPU 核心的机器上,这就意味着我们可以有效地并行化任何计算密集型负载。即使我们拥有的进程数超过了核心数,操作系统也会使用抢占式多任务调度,让我们的多个任务并发运行。这种设置既是并发的,也是并行的。
要开始使用 multiprocessing 库,我们先来并行运行几个函数。我们将使用一个非常简单的计算密集型函数,从零计数到一个很大的数字,以研究 API 的工作方式以及性能优势。
列表 6.1 使用 multiprocessing 的两个并行进程
import timefrom multiprocessing import Processdef count(count_to: int) -> int: start = time.time() counter = 0 while counter < count_to: counter = counter + 1 end = time.time() print(f'完成计数到 {count_to},耗时 {end-start}') return counterif __name__ == "__main__": start_time = time.time() to_one_hundred_million = Process(target=count, args=(100000000,)) # ❶ to_two_hundred_million = Process(target=count, args=(200000000,)) to_one_hundred_million.start() # ❷ to_two_hundred_million.start() to_one_hundred_million.join() # ❸ to_two_hundred_million.join() end_time = time.time() print(f'完成耗时 {end_time-start_time}')
在上面的代码中,我们创建了一个简单的计数函数,它接收一个整数并逐个循环直到计数到传入的整数。然后我们创建两个进程,一个用于计数到 1 亿,另一个用于计数到 2 亿。Process 类接受两个参数:target 是我们希望在进程中运行的函数名,args 表示传递给该函数的参数元组。然后我们调用每个进程的 start 方法。此方法会立即返回,并开始运行进程。在这个例子中,我们依次启动了两个进程。然后我们调用每个进程的 join 方法。这将导致主进程阻塞,直到每个进程都完成。如果没有这个步骤,我们的程序几乎会立刻退出,终止子进程,因为没有任何东西在等待它们完成。列表 6.1 并发运行了两个计数函数;假设我们运行在至少有两个 CPU 核心的机器上,我们应该能看到性能提升。当这段代码在一台 2.5 GHz 的 8 核机器上运行时,我们得到以下结果:
完成计数到 100000000,耗时 5.3844完成计数到 200000000,耗时 10.6265完成耗时 10.8586
总计数函数耗时略超 16 秒,但应用程序仅耗时不到 11 秒。相比于串行运行,我们节省了大约 5 秒的时间。当然,你运行这段代码时的结果会因机器的不同而有很大差异,但你应该能看到大致类似的结果。
注意,我们在应用中加入了 if __name__ == "__main__":,这是我们以前没有加过的。这是 multiprocessing 库的一个特性;如果你不加这个,可能会收到如下错误:An attempt has been made to start a new process before the current process has finished its bootstrapping phase。发生这种情况的原因是为了防止导入你代码的人意外启动多个进程。
这已经带来了不错的性能提升;然而,这种方式很别扭,因为我们必须为每个启动的进程调用 start 和 join。我们也不知道哪个进程会先完成;如果我们想做类似 asyncio.as_completed 的事,一等结果就处理,那可就无能为力了。join 方法也不会返回目标函数的返回值;事实上,目前还没有办法在不使用共享进程间内存的情况下获取函数的返回值!
这个 API 对于简单的情况是可行的,但显然当我们想要获取返回值或者想一拿到结果就处理时,它就不适用了。幸运的是,进程池提供了一种解决这个问题的方法。