在前面的例子中,我们手动创建了进程,并调用了它们的 start 和 join 方法来运行和等待它们。我们指出了这种方法的几个问题,从代码质量到无法访问进程返回的结果。multiprocessing 模块提供了一个名为“进程池”的 API 来解决这个问题。
进程池的概念类似于我们在第 5 章看到的连接池。不同之处在于,这里不是一组数据库连接,而是创建一组 Python 进程,我们可以用来并行运行函数。当我们有一个需要在进程中运行的计算密集型函数时,我们可以直接要求池来为我们运行它。在后台,这将在一个可用的进程中执行此函数,运行它并返回该函数的返回值。为了看看进程池是如何工作的,让我们创建一个简单的进程池,并用它运行几个“你好世界”风格的函数。
from multiprocessing import Pooldef say_hello(name: str) -> str: return f'嗨,{name}'if __name__ == "__main__": with Pool() as process_pool: # ❶ hi_jeff = process_pool.apply(say_hello, args=('Jeff',)) # ❷ hi_john = process_pool.apply(say_hello, args=('John',)) print(hi_jeff) print(hi_john)
- ❷ 在一个独立的进程中运行
say_hello 函数,并获取结果。
在上面的代码中,我们使用 with Pool()as process_pool 创建了一个进程池。这是一个上下文管理器,因为一旦我们完成对池的操作,就需要适当地关闭我们创建的 Python 进程。如果不这样做,就有泄漏进程的风险,可能导致资源利用率问题。当我们实例化这个池时,它会自动创建等于我们运行机器上 CPU 核心数的 Python 进程。你可以在 Python 中通过运行 multiprocessing.cpu_count() 函数来确定你的 CPU 核心数。当你调用 Pool() 时,可以将 processes 参数设置为你想要的任意整数。默认值通常是一个不错的起点。
接下来,我们使用进程池的 apply 方法在独立的进程中运行我们的 say_hello 函数。这个方法看起来和我们之前使用 Process 类时相似,其中我们传递了一个目标函数和一个参数元组。不同之处在于,我们不需要自己启动进程或调用 join。我们还能得到函数的返回值,这在之前的例子中是做不到的。运行这段代码,你应该会看到以下输出:
这管用,但有个问题。apply 方法会阻塞,直到我们的函数完成。这意味着,如果每次调用 say_hello 都需要 10 秒,那么我们整个程序的运行时间将会是大约 20 秒,因为我们是顺序运行的,这完全抵消了并行运行的意义。我们可以用进程池的 apply_async 方法来解决这个问题。
在前面的例子中,每次调用 apply 都会阻塞直到我们的函数完成。如果我们要构建一个真正的并行工作流,这行不通。为了绕过这个问题,我们可以改用 apply_async 方法。此方法会立即返回一个 AsyncResult,并在后台启动进程。一旦我们有了 AsyncResult,就可以使用其 get 方法来阻塞并获取函数调用的结果。让我们把我们的 say_hello 例子改编一下,使用异步结果。
from multiprocessing import Pooldef say_hello(name: str) -> str: return f'嗨,{name}'if __name__ == "__main__": with Pool() as process_pool: hi_jeff = process_pool.apply_async(say_hello, args=('Jeff',)) hi_john = process_pool.apply_async(say_hello, args=('John',)) print(hi_jeff.get()) print(hi_john.get())
当我们调用 apply_async 时,我们的两个 say_hello 调用会立即在不同的进程中启动。然后,当我们调用 get 方法时,我们的父进程会阻塞,直到每个进程返回一个值。这使得事情可以并发运行,但如果 hi_jeff 花了 10 秒,而 hi_john 只花了 1 秒呢?在这种情况下,由于我们首先调用 get 在 hi_jeff,我们的程序会阻塞 10 秒,才能打印出 hi_john 消息,尽管我们 1 秒后就已经准备好了。如果我们希望一完成就响应,那就遇到了问题。我们真正想要的,是像 asyncio 里的 as_completed 这样。接下来,我们来看看如何将进程池执行器与 asyncio 结合使用,以解决这个问题。