之前我们看到,当我们直接调用一个协程时,我们并没有把它放到事件循环上运行。相反,我们得到了一个协程对象,然后需要要么用 await 关键字处理它,要么把它传给 asyncio.run 来运行并获取一个值。仅凭这些工具,我们可以写出异步代码,但无法实现任何并发。要并发运行协程,我们需要引入任务。
任务是围绕协程的包装器,它会尽快安排协程在事件循环上运行。这种调度和执行是以非阻塞的方式进行的,意味着一旦我们创建了一个任务,就可以立即执行其他代码,而任务仍在运行。这与使用 await 关键字的行为形成鲜明对比,await 是阻塞式的,意味着我们暂停整个协程,直到 await 表达式的结果返回。
正因为我们可以创建任务并立即将其安排在事件循环上运行,所以我们可以几乎同时执行多个任务。当这些任务包装着长时间运行的操作时,它们的等待时间会并发发生。为了说明这一点,让我们创建两个任务并尝试同时运行它们。
通过使用 asyncio.create_task 函数来创建任务。当我们调用这个函数时,我们传入一个协程来运行,它会立即返回一个任务对象。一旦我们有了任务对象,就可以将其放入 await 表达式,一旦完成,就能提取返回值。
import asynciofrom util import delayasync def main(): sleep_for_three = asyncio.create_task(delay(3)) print(type(sleep_for_three)) result = await sleep_for_three print(result)asyncio.run(main())
在上面的代码中,我们创建了一个需要 3 秒才能完成的任务。我们还打印出了任务的类型,即 <class '_asyncio.Task'>,以表明它与协程不同。
另一个需要注意的地方是,我们的打印语句是在任务启动后立即执行的。如果我们只是对 delay 协程使用 await,我们会在输出消息前等待 3 秒。
一旦我们打印完消息,就对任务 sleep_for_three 应用了 await 表达式。这会暂停我们的 main 协程,直到我们从任务中获得结果。
非常重要的一点是,我们通常需要在应用程序的某个地方对任务使用 await。在第 2.8 个示例中,如果我们没有使用 await,任务会被安排运行,但几乎会立即被停止并“清理”,因为 asyncio.run 关闭了事件循环。在应用程序中对任务使用 await 也会影响异常的处理方式,这将在第 3 章中讨论。现在我们已经看到了如何创建任务并允许其他代码并发运行,我们可以学习如何同时运行多个长时间运行的操作了。
既然任务是立即创建并尽快安排运行的,这就允许我们并发运行许多长时间运行的任务。我们可以通过连续启动多个使用长时间运行协程的任务来实现这一点。
import asynciofrom util import delayasync def main(): sleep_for_three = asyncio.create_task(delay(3)) sleep_again = asyncio.create_task(delay(3)) sleep_once_more = asyncio.create_task(delay(3)) await sleep_for_three await sleep_again await sleep_once_moreasyncio.run(main())
在上面的示例中,我们启动了三个任务,每个都耗时 3 秒。每次调用 create_task 都会立即返回,因此我们很快就到达了 await sleep_for_three 语句。之前我们提到过,任务是“尽可能快”地安排运行的。通常这意味着,第一次遇到 await 语句后,任何待处理的任务都会运行,因为 await 触发了事件循环的一次迭代。
由于我们遇到了 await sleep_for_three,所有三个任务都会开始运行,并并发执行任何睡眠操作。这意味着第 2.9 个示例中的程序大约需要 3 秒就能完成。我们可以将并发性可视化,如图 2.3 所示,注意到所有三个任务都在同一时间运行它们的睡眠协程。
请注意,在图 2.3 中,标有 RUN delay(3) 的任务中的代码(例如一些打印语句)并不会与其他任务并发运行;只有睡眠协程是并发运行的。如果我们按顺序运行这些延迟操作,应用程序的运行时间将超过 9 秒。通过这种方式并发运行,我们把应用的总运行时间缩短了三倍!
注意 随着我们添加更多的任务,这种优势会进一步放大;如果我们启动了 10 个这样的任务,仍然只需要大约 3 秒,性能提升了十倍。
执行这些长时间运行的操作并发运行是 asyncio 真正闪耀的地方,它能极大地提升我们应用程序的性能,但这并不是全部。在第 2.9 个示例中,我们的应用程序在等待 3 秒完成 delay 协程时处于空闲状态。在我们的代码等待时,我们可以执行其他代码。举个例子,假设我们想在运行一些长时间任务时,每秒打印一条状态消息。
import asynciofrom util import delayasync def hello_every_second(): for i in range(2): await asyncio.sleep(1) print("I'm running other code while I'm waiting!")async def main(): first_delay = asyncio.create_task(delay(3)) second_delay = asyncio.create_task(delay(3)) await hello_every_second() await first_delay await second_delay
在上述代码中,我们创建了两个任务,每个任务都需要 3 秒才能完成。在这些任务等待时,我们的应用程序是空闲的,这给了我们运行其他代码的机会。在这种情况下,我们运行了一个协程 hello_every_second,它每秒打印一次消息,共打印两次。在我们的两个任务运行时,我们会看到消息被输出,结果如下:
sleeping for 3 second(s)sleeping for 3 second(s)I'm running other code while I'm waiting!I'm running other code while I'm waiting!finished sleeping for 3 second(s)finished sleeping for 3 second(s)
首先,我们启动了两个睡 3 秒的任务;然后,当这两个任务处于空闲状态时,我们开始看到 I'm running other code while I'm waiting! 每秒被打印一次。这意味着即使在运行耗时操作时,我们的应用程序仍然可以执行其他任务。
任务的一个潜在问题是,它们可能需要无限长的时间才能完成。我们可能会发现自己想在任务花费太长时间时停止它。任务支持这种用例,通过取消功能。