当我们看到并发运行一些长时间任务能带来的性能提升时,我们可能会忍不住想在应用程序的各个地方都使用协程和任务。虽然这取决于你正在编写的程序,但仅仅将函数标记为 async 并将它们包装在任务中,可能并不会提高应用程序的性能。在某些情况下,这甚至可能降低应用程序的性能。
在尝试将应用程序变为异步时,会出现两个主要错误。第一个是尝试在任务或协程中运行计算密集型代码,而没有使用多进程;第二个是没有使用多线程的情况下使用阻塞的 I/O 密集型 API。
我们可能有一些执行计算密集型操作的函数,比如遍历一个大字典或进行数学计算。当有多个这样的函数,且有可能并发运行时,我们可能会想到在单独的任务中运行它们。从概念上讲,这是一个好主意,但请记住,asyncio 采用单线程并发模型。这意味着我们仍然受到单线程和全局解释器锁的限制。
为了证明这一点,让我们尝试并发运行一些计算密集型函数。
import asynciofrom util import delay@async_timed()async def cpu_bound_work() -> int: counter = 0 for i in range(100000000): counter = counter + 1 return counter@async_timed()async def main(): task_one = asyncio.create_task(cpu_bound_work()) task_two = asyncio.create_task(cpu_bound_work()) await task_one await task_twoasyncio.run(main())
当我们运行上面的示例时,我们会看到,尽管创建了两个任务,但我们的代码仍然按顺序执行。首先,我们运行任务 1,然后运行任务 2,这意味着我们的总运行时间将是两次 cpu_bound_work 调用时间的总和:
starting <function main at 0x10a8f6c10> with args () {}starting <function cpu_bound_work at 0x10a8c0430> with args () {}finished <function cpu_bound_work at 0x10a8c0430> in 4.6750 second(s)starting <function cpu_bound_work at 0x10a8c0430> with args () {}finished <function cpu_bound_work at 0x10a8c0430> in 4.6680 second(s)finished <function main at 0x10a8f6c10> in 9.3434 second(s)
从上面的输出来看,我们可能会误以为使用 async 和 await 没有任何缺点。毕竟,它最终花费的时间与顺序执行相同。然而,这样做我们可能会遇到性能下降的情况。特别是当有其他协程或任务包含 await 表达式时。考虑创建两个计算密集型任务,同时还有一个长时间运行的任务,比如我们的 delay 协程。
import asynciofrom util import async_timed, delay@async_timed()async def cpu_bound_work() -> int: counter = 0 for i in range(100000000): counter = counter + 1 return counter@async_timed()async def main(): task_one = asyncio.create_task(cpu_bound_work()) task_two = asyncio.create_task(cpu_bound_work()) delay_task = asyncio.create_task(delay(4)) await task_one await task_two await delay_taskasyncio.run(main())
运行上面的示例,我们可能会期望其运行时间与示例 2.18 相同。毕竟,delay_task 不是应该与计算密集型工作并发运行吗?在这种情况下不会,因为我们首先创建了两个计算密集型任务,这实际上阻止了事件循环运行其他任何东西。这意味着我们应用程序的运行时间将是两个 cpu_bound_work 任务完成时间之和加上 delay 任务的 4 秒。
如果我们需要执行计算密集型工作,但仍想使用 async / await 语法,我们可以做到。为此,我们仍然需要使用多进程,并告诉 asyncio 在一个进程池中运行我们的任务。我们将在第 6 章中学习如何做到这一点。
我们还可能想通过将现有的 I/O 密集型库包装在协程中来使用它们。然而,这会产生与计算密集型操作相同的问题。这些 API 会阻塞 main 线程。因此,当我们在一个协程中运行阻塞的 API 调用时,我们实际上是在阻塞事件循环线程,这意味着我们停止了任何其他协程或任务的执行。阻塞的 API 调用的例子包括 requests 库或 time.sleep。一般来说,任何执行 I/O 但不是协程或执行耗时的 CPU 操作的函数都可以被视为阻塞的。
作为一个例子,让我们尝试使用 requests 库并发地获取 www.example.com 的状态码三次,看看运行时会发生什么。我们期望这个应用程序的运行时间大约等于获取一次状态码所需的时间。
import asyncioimport requestsfrom util import async_timed@async_timed()async def get_example_status() -> int: return requests.get('http://www.example.com').status_code@async_timed()async def main(): task_1 = asyncio.create_task(get_example_status()) task_2 = asyncio.create_task(get_example_status()) task_3 = asyncio.create_task(get_example_status()) await task_1 await task_2 await task_3asyncio.run(main())
当我们运行上面的示例时,我们会看到类似以下的输出。请注意,main 协程的总运行时间大致是所有任务获取状态码所需时间的总和,这意味着我们并没有获得任何并发优势:
starting <function main at 0x1102e6820> with args () {}starting <function get_example_status at 0x1102e6700> with args () {}finished <function get_example_status at 0x1102e6700> in 0.0839 second(s)starting <function get_example_status at 0x1102e6700> with args () {}finished <function get_example_status at 0x1102e6700> in 0.0441 second(s)starting <function get_example_status at 0x1102e6700> with args () {}finished <function get_example_status at 0x1102e6700> in 0.0419 second(s)finished <function main at 0x1102e6820> in 0.1702 second(s)
这再次是因为 requests 库是阻塞的,这意味着它会阻塞其运行的任何线程。由于 asyncio 只有一个线程,requests 库会阻塞事件循环,使其无法并发执行任何操作。
总的来说,你现在使用的大多数 API 都是阻塞的,无法开箱即用地与 asyncio 一起使用。你需要使用支持协程并利用非阻塞套接字的库。这意味着,如果你使用的库不返回协程,而你又不在自己的协程中使用 await,那么你很可能正在进行一个阻塞调用。
在上面的例子中,我们可以使用像 aiohttp 这样的库,它使用非阻塞套接字并返回协程,以获得正确的并发性。我们将在第 4 章稍后介绍这个库。
如果你想使用 requests 库,你仍然可以使用 async 语法,但你需要明确告诉 asyncio 使用多线程,通过一个线程池执行器。我们将在第 7 章中学习如何做到这一点。
我们现在已经看到了使用 asyncio 时需要注意的一些事项,并构建了一些简单的应用程序。到目前为止,我们还没有手动创建或配置事件循环,而是依赖于便捷的方法来为我们完成。接下来,我们将学习如何创建事件循环,这将使我们能够访问底层的 asyncio 功能和事件循环配置属性。