网络连接可能不可靠。用户的连接可能因网络减速而中断,或者网页服务器可能崩溃,导致现有请求陷入困境。在发起这类请求时,我们必须特别小心,不要无限期地等待。这样做可能导致我们的应用程序挂起,永远等待一个可能永远不会到来的结果。这也会导致糟糕的用户体验;如果允许用户发出一个耗时过长的请求,他们不太可能无限期地等待响应。此外,我们可能还想让用户选择是否继续运行一个任务。用户可能主动决定事情耗时太长,或者他们可能想取消自己错误发起的任务。
在我们之前的示例中,如果我们的任务永远运行下去,我们将被困在 await 语句上,没有任何反馈。而且,我们也没有办法在想要时停止它们。asyncio 通过支持任务取消和设置超时,解决了这两种情况。
取消一个任务非常简单。每个任务对象都有一个名为 cancel 的方法,我们可以随时调用它来停止一个任务。取消一个任务会导致该任务在我们 await 它时抛出一个 CancelledError,我们可以根据需要来处理这个异常。
为了说明这一点,假设我们启动了一个长时间运行的任务,但我们不想让它运行超过 5 秒。如果任务在 5 秒内未完成,我们希望停止它,并向用户报告它耗时过长,我们已停止它。我们还希望每秒打印一次状态更新,以向用户提供最新信息,这样他们就不会在几秒钟内无任何信息。
import asynciofrom asyncio import CancelledErrorfrom util import delayasync def main(): long_task = asyncio.create_task(delay(10)) seconds_elapsed = 0 while not long_task.done(): print('Task not finished, checking again in a second.') await asyncio.sleep(1) seconds_elapsed = seconds_elapsed + 1 if seconds_elapsed == 5: long_task.cancel() try: await long_task except CancelledError: print('Our task was cancelled')asyncio.run(main())
在上面的示例中,我们创建了一个需要 10 秒才能运行的任务。然后我们创建了一个 while 循环,检查该任务是否已完成。任务上的 done 方法在任务完成时返回 True,否则返回 False。每秒,我们检查任务是否已完成,并跟踪已经检查了多少秒。如果我们的任务已经运行了 5 秒,我们就取消该任务。然后,我们会继续 awaitlong_task,我们会看到 Our task was cancelled 被打印出来,表明我们捕获到了一个 CancelledError。
关于取消,需要注意的一点是,CancelledError 只能从 await 语句中抛出。这意味着,如果你在任务执行普通 Python 代码时调用 cancel,该代码将继续运行直到完成,直到我们遇到下一个 await 语句(如果存在的话),此时才能抛出 CancelledError。调用 cancel 不会神奇地立即停止任务;它只会停止任务,前提是你正处于 await 点或其下一个 await 点。
2.4.2 设置超时并使用 wait_for 取消
每隔一秒或在某个其他时间间隔检查,然后取消任务,正如我们在前面的例子中所做的那样,并不是处理超时的最佳方式。理想情况下,我们希望能有一个辅助函数,能让我们指定这个超时,并自动处理取消。
asyncio 通过一个叫 asyncio.wait_for 的函数提供了这种功能。这个函数接收一个协程或任务对象,以及一个以秒为单位的超时时间。然后它返回一个协程,我们可以 await 它。如果任务完成所需的时间超过了我们给定的超时时间,就会抛出一个 TimeoutError。一旦达到超时阈值,任务将自动被取消。
为了说明 wait_for 的工作原理,我们来看一个例子,其中我们有一个任务需要 2 秒才能完成,但我们只允许它 1 秒完成。当收到 TimeoutError 时,我们会捕获异常并检查任务是否已被取消。
import asynciofrom util import delayasync def main(): delay_task = asyncio.create_task(delay(2)) try: result = await asyncio.wait_for(delay_task, timeout=1) print(result) except asyncio.exceptions.TimeoutError: print('Got a timeout!') print(f'Was the task cancelled? {delay_task.cancelled()}')asyncio.run(main())
当我们运行上面的示例时,我们的应用程序大约需要 1 秒完成。1 秒后,我们的 wait_for 语句会抛出一个 TimeoutError,我们随后处理它。然后我们会看到原始的 delay 任务被取消,输出如下:
sleeping for 2 second(s)Got a timeout!Was the task cancelled? True
如果任务耗时超过预期,自动取消通常是很好的做法。否则,我们可能会有一个协程无限期地等待,占用资源却永远不会释放。然而,在某些情况下,我们可能希望让协程继续运行。例如,我们可能想在经过一段时间后通知用户某件事比预期耗时更长,但不希望在超时后取消任务。
要做到这一点,我们可以使用 asyncio.shield 函数来包装我们的任务。这个函数会防止传递给它的协程被取消,给它一个“盾牌”,使得取消请求会被忽略。
import asynciofrom util import delayasync def main(): task = asyncio.create_task(delay(10)) try: result = await asyncio.wait_for(asyncio.shield(task), 5) print(result) except TimeoutError: print("Task took longer than five seconds, it will finish soon!") result = await task print(result)asyncio.run(main())
在上面的示例中,我们首先创建了一个任务来包装我们的协程。这与我们第一个取消示例不同,因为我们需要在 except 块中访问该任务。如果我们传入的是一个协程,wait_for 会将其包装在一个任务中,但我们无法引用它,因为它在函数内部。
然后,在一个 try 块中,我们调用 wait_for 并用 shield 包装任务,这将防止任务被取消。在我们的异常块中,我们向用户打印一条有用的消息,让他们知道任务仍在运行,然后我们 await 我们最初创建的任务。这将让它完整地完成,程序的输出将是:
sleeping for 10 second(s)Task took longer than five seconds!finished sleeping for 10 second(s)finished <function delay at 0x10e8cf820> in 10 second(s)
取消和屏蔽是相当棘手的主题,有几个值得注意的情况。我们下面会介绍基础知识,但随着我们进入更复杂的案例,我们会更深入地探讨取消是如何工作的。
我们现在已经介绍了任务和协程的基础知识。这些概念是相互关联的。在下一节中,我们将探讨任务和协程之间的关系,并更深入地理解 asyncio 的结构。