在设计需要处理事件或其他类型数据的应用程序时,我们通常需要一种机制来存储这些事件,并将它们分发给一组工作线程。这些工作线程可以并发地根据这些事件执行我们需要的操作,从而相比顺序处理事件节省时间。asyncio 提供了异步队列的实现,让我们能够轻松完成这项任务。我们可以把数据放入队列,然后让多个工作线程同时从队列中拉取数据并处理,只要数据可用就立刻开始。
这通常被称为生产者-消费者工作流。某种东西生成数据或事件,我们需要对其进行处理;而处理这些任务可能耗时很长。队列还能帮助我们在保持用户界面响应性的同时,传输长时间运行的任务。我们将一个任务放进队列,稍后再处理,同时告知用户我们已经在后台启动了这项工作。异步队列还有一个额外好处,就是能限制并发量——每个队列一般都只允许有限数量的工作任务。这一点和第11章中我们看到的信号量很相似。
在本章里,我们将学习如何用 asyncio 队列来处理生产者-消费者工作流。首先通过构建一个模拟杂货店收银队列的例子,让收银员作为我们的消费者,来掌握基础用法。接着把它应用到订单管理的 Web API,演示如何快速响应用户请求,同时让后台队列处理复杂工作。我们还会学习如何按优先级处理任务,这对某个任务虽然后来加入队列但更重要,必须优先处理的情况非常有用。最后,我们会看看后进先出(LIFO)队列,以及异步队列的潜在缺点。
队列是一种先进先出(FIFO)的数据结构。简单说,就是第一个进入队列的元素,也是第一个离开队列的元素。这和你在超市排队结账时的体验其实差不多:你排在队伍末尾,等前面的人都结完账了,才轮到你。当排在你前面的人结完账,你就往前挪一步,后面新来的人得继续在你后面等待。一旦轮到你,你结账走人,整个流程就结束了。
我们描述的这个收银队列是同步的——一个收银员一次只能服务一位顾客。如果我们想利用并发的优势,让队列更像大型超市那样高效运转呢?那就不再是单个收银员,而是多个收银员共用一个队列。每当有收银员空闲,就可以招呼下一位顾客过来结账。这样一来,就形成了多个收银员并行地从队列中引导顾客,同时并行地为顾客结账。
这正是异步队列的核心价值所在:我们将多个待处理的工作项加入队列,然后让多个工作线程在自己有空的时候,从队列中拉取任务来处理。
让我们通过一个超市收银场景来具体感受一下。把我们的工作线程想象成收银员,而“工作项”则是需要结账的顾客。我们为每位顾客创建一份包含商品列表的购物清单,每样商品扫描时间各不相同——比如香蕉要称重还要输入条码,酒类还得经理核对身份证。我们这样模拟现实。
对于超市收银系统,我们将定义几个数据类,用来表示商品,用整数代表收银员检查一件商品所需的时间(单位:秒)。我们还会构建一个顾客类,里面随机安排一些要购买的商品。然后把这些顾客放进一个 asyncio 队列,模拟收银队伍。同时,我们还会创建多个工作协程,充当我们的收银员。这些协程会从队列中拉取顾客,逐个扫描他们商品,用 asyncio.sleep() 模拟扫描过程。
import asynciofrom asyncio import Queuefrom random import randrangefrom typing import Listclass Product: def __init__(self, name: str, checkout_time: float): self.name = name self.checkout_time = checkout_timeclass Customer: def __init__(self, customer_id: int, products: List[Product]): self.customer_id = customer_id self.products = productsasync def checkout_customer(queue: Queue, cashier_number: int): while not queue.empty(): # ❶ customer: Customer = queue.get_nowait() print(f'收银员 {cashier_number} 正在为顾客 {customer.customer_id} 结账') for product in customer.products: # ❷ print(f"收银员 {cashier_number} 正在为顾客 {customer.customer_id} 的 {product.name} 扫描") await asyncio.sleep(product.checkout_time) print(f'收银员 {cashier_number} 已完成为顾客 {customer.customer_id} 的结账') queue.task_done()async def main(): customer_queue = Queue() all_products = [Product('啤酒', 2), Product('香蕉', .5), Product('香肠', .2), Product('尿布', .2)] for i in range(10): # ❸ products = [all_products[randrange(len(all_products))] for _ in range(randrange(10))] customer_queue.put_nowait(Customer(i, products)) cashiers = [asyncio.create_task(checkout_customer(customer_queue, i)) # ❹ for i in range(3)] # ❹ await asyncio.gather(customer_queue.join(), *cashiers)asyncio.run(main())
❶ 如果队列还有顾客,就持续结账。 ❷ 逐一扫描顾客的商品。 ❸ 创建10个带有随机商品的顾客。 ❹ 创建三个“收银员”或工作协程来结账。
上面代码里,我们创建了两个数据类:一个是商品类,一个是超市顾客类。商品类包含商品名和收银员录入所需时间(秒)。顾客类则带有一堆他们要买的东西。我们还定义了一个 checkout_customer 协程函数,它负责处理结账。只要队列里还有顾客,它就会用 queue.get_nowait() 从队列前端拉出一个顾客,再用 asyncio.sleep 模拟扫描商品的时间。当顾客结完账后,我们调用 queue.task_done()。这是告诉队列:“我这个任务完成了”。内部的 Queue 类会为每个从队列获取的任务自动增加一个未完成任务计数器。当你调用 task_done() 时,就相当于说“我搞定了”,队列内部的计数器就会减1(为什么这么设计马上就要讲到 join 时就明白了)。
在主协程 main 中,我们创建了一个可用商品列表,并随机生成10个顾客。我们也创建了3个 checkout_customer 协程工作项,放在 cashiers 列表里,就像有三位真人收银员在为我们虚构的超市干活。最后,我们用 gather 等待所有收银员协程完成,同时也等待 customer_queue.join()。使用 gather 是为了确保如果收银员任务中出现异常,异常信息能冒泡到主协程中。join 协程会阻塞,直到队列为空且所有顾客都被结完账。队列被认为是空的,当内部的待处理任务计数变为零时。所以,务必记得在工作协程里调用 task_done()。如果你忘了,join 协程可能就会得到一个错误的队列状态,永远无法结束。
虽然顾客的商品是随机生成的,但你应该能看到类似下面的输出,说明每个工作协程(收银员)正在并发地从队列中结账:
收银员 0 正在为顾客 0 结账收银员 0 正在为顾客 0 的 香肠 扫描收银员 1 正在为顾客 1 结账收银员 1 正在为顾客 1 的 啤酒 扫描收银员 2 正在为顾客 2 结账收银员 2 正在为顾客 2 的 香蕉 扫描收银员 0 正在为顾客 0 的 香蕉 扫描收银员 2 正在为顾客 2 的 香肠 扫描收银员 0 正在为顾客 0 的 香肠 扫描收银员 2 正在为顾客 2 的 香蕉 扫描收银员 0 已完成为顾客 0 的结账收银员 0 正在为顾客 3 结账
我们的三位收银员同时开始从队列中结账。一旦一个收银员完成当前顾客的结账,就会立即去拉下一个顾客,直到队列为空。
你可能会注意到,向队列添加项目和取出项目的几种方法名字很奇怪:get_nowait、put_nowait。为什么这些方法结尾都加了 nowait 呢?其实在队列操作上,有两种方式获取和取出项目:一种是协程形式、会阻塞的,另一种是非阻塞的、常规方法。get_nowait 与 put_nowait 会立刻执行非阻塞操作,直接返回。那我们为啥还需要一个会阻塞的插入或取出操作呢?
答案就在我们想如何处理队列的上下限。具体来说,就是当队列满了会发生什么,以及队列为空时又会发生什么。
回到我们的超市队列例子,我们来想想为什么之前的模拟和真实世界有点不一样。使用 get、put 的协程版本就能解决这些问题。
- 你不可能刚一开门,就有10位顾客全都涌进来排成一队,然后排空后收银员就歇着了。
- 我们的顾客队列肯定不能无限长——比如新款游戏主机上市了,你是城里唯一一家卖的,瞬间人流爆炸,结果可能有5000个人挤进店里?显然不行。我们得有个办法拒绝部分顾客,或者让他们在店外排队。
针对第一个问题,假设我们想重构应用,让顾客每隔几秒随机产生,以模拟真实的超市流量。但在我们目前的 checkout_customer 实现中,它是循环检查队列是否为空,然后用 get_nowait() 拿出顾客。由于队列有可能为空,我们不能单纯依赖 not queue.empty() 进行循环,因为即使没人排队,收银员也得一直闲着,所以我们需要在工作协程里写 while True:。那在这种情况下,如果调用 get_nowait() 但队列是空的会怎样?很简单,试一下就知道了:新建一个空队列,然后调用 get_nowait():
import asynciofrom asyncio import Queueasync def main(): customer_queue = Queue() customer_queue.get_nowait()asyncio.run(main())
这个方法会抛出一个 asyncio.queues.QueueEmpty 异常。我们当然可以把这个操作包裹在 try...except 里忽略异常,但这行不通——因为队列一旦为空,我们的工作协程就成了“死循环”,不停转圈儿,捕捉空队列异常。这时,我们就该用 get 协程方法了。它会以非占用式的方式阻塞,直到队列里有东西可处理,且不会抛异常。这就相当于工作协程在“干等”,随时准备接收顾客,让自己有活干。
第二个问题:当数千人同时想插队进来怎么办?这就需要考虑队列的边界了。默认情况下,队列是没有上限的,可以无限存放工作项。理论上没问题,但现实中系统都有内存限制,所以给队列设个上限防止内存溢出是个好主意。在这种情况下,我们得想清楚当队列满了时该如何处理。试试看:创建一个最大容量为1的队列,然后尝试用 put_nowait 添加第二个元素:
import asynciofrom asyncio import Queueasync def main(): queue = Queue(maxsize=1) queue.put_nowait(1) queue.put_nowait(2)asyncio.run(main())
在这种情况下,和 get_nowait 一样,put_nowait 会抛出 asyncio.queues.QueueFull 异常。和 get 一样,put 也有一个协程方法。这个方法会阻塞,直到队列里有空位。基于此,我们来改写顾客例子,使用 get、put 的协程版本。
import asynciofrom asyncio import Queuefrom random import randrangeclass Product: def __init__(self, name: str, checkout_time: float): self.name = name self.checkout_time = checkout_timeclass Customer: def __init__(self, customer_id, products): self.customer_id = customer_id self.products = productsasync def checkout_customer(queue: Queue, cashier_number: int): while True: customer: Customer = await queue.get() print(f'收银员 {cashier_number} 正在为顾客 {customer.customer_id} 结账') for product in customer.products: print(f"收银员 {cashier_number} 正在为顾客 {customer.customer_id} 的 {product.name} 扫描") await asyncio.sleep(product.checkout_time) print(f'收银员 {cashier_number} 已完成为顾客 {customer.customer_id} 的结账') queue.task_done()def generate_customer(customer_id: int) -> Customer: # ❶ all_products = [Product('啤酒', 2), Product('香蕉', .5), Product('香肠', .2), Product('尿布', .2)] products = [all_products[randrange(len(all_products))] for _ in range(randrange(10))] return Customer(customer_id, products)async def customer_generator(queue: Queue): # ❷ customer_count = 0 while True: customers = [generate_customer(i) for i in range(customer_count, customer_count + randrange(5))] for customer in customers: print('等待将顾客放入队列...') await queue.put(customer) print('顾客已放入队列!') customer_count = customer_count + len(customers) await asyncio.sleep(1)async def main(): customer_queue = Queue(5) customer_producer = asyncio.create_task(customer_generator(customer_queue)) cashiers = [asyncio.create_task(checkout_customer(customer_queue, i)) for i in range(3)] await asyncio.gather(customer_producer, *cashiers)asyncio.run(main())
❶ 生成一个随机顾客。 ❷ 每秒生成几个随机顾客。
在这个新版本中,我们创建了一个 generate_customer 协程,用来生成带有随机商品列表的顾客。同时,我们还创建了一个 customer_generator 协程函数,它每秒生成1到5个随机顾客,并用 put 将它们加入队列。因为我们使用了协程版的 put,所以如果队列满了,customer_generator 会阻塞,直到队列有空间。具体来说,就是如果队列已经有5个顾客,当“生产者”想加第六个时,队列就会阻塞,让它暂时不能入队,直到有收银员完成结账,释放出空间。我们可以把 customer_generator 看作我们的生产者,因为它在为收银员提供要处理的顾客。
我们也把 checkout_customer 重构为无限循环运行,因为即使队列为空,收银员也得一直待命。同时,我们也改用队列的 get 协程方法,当队列无顾客时会阻塞。在主协程中,我们创建了一个最多容纳5个顾客的队列,启动了3个 checkout_customer 任务并行运行。我们把收银员看作消费者,因为他们从队列中消费顾客来结账。
这段代码会随机生成顾客,但迟早队列会满,使得收银员处理速度赶不上生产者创建的速度。于是我们能看到类似的输出,其中“生产者”在等待,直到有顾客完成结账,才能将新的顾客放进来:
等待将顾客放入队列...收银员 1 正在为顾客 7 的 香肠 扫描收银员 1 正在为顾客 7 的 尿布 扫描收银员 1 正在为顾客 7 的 尿布 扫描收银员 2 已完成为顾客 5 的结账收银员 2 正在为顾客 9 结账收银员 2 正在为顾客 9 的 香蕉 扫描顾客已放入队列!
现在我们已经理解了异步队列的基本原理。不过毕竟我们平时也不会真去建个超市模拟器,下面来看几个真实世界的场景,看看如何把这些技术应用到真正会做的项目中。
队列在网页应用中非常有用,尤其是在处理那些可能耗时很长、可以在后台运行的操作时。如果你在网页请求的主协程里直接运行这个耗时操作,用户会一直卡在页面上等待,最终得到一个缓慢、无响应的页面。
设想你在一个电商公司工作,正面临一个较慢的订单管理系统。处理一个订单可能需要好几秒钟,但我们不想让用户一直在等确认回复。此外,订单系统对高负载处理不好,所以我们希望限制同时发送到它的请求数量。这种情况下,队列能完美解决这两个问题。正如我们之前所见,队列可以设置最大元素数量,超过这个数量要么会阻塞,要么抛出异常。这意味着,如果你有一个设置了上限的队列,那么最多同时运行的消费者任务数量就等于你设定的这个数字。这就自然地控制了并发度。
队列还能解决用户等待时间过长的问题。把元素放到队列里几乎是瞬间完成的,这意味着我们可以立刻通知用户订单已提交,提供一个极其快速的用户体验。当然,在现实世界里,这也带来了背景任务失败却没被用户察觉的风险,因此你需要某种形式的数据持久化和逻辑来应对这种情况。
为了实际体验一下,我们来创建一个简单的 aiohttp 网页应用,用队列来执行后台任务。我们将通过 asyncio.sleep 模拟与一个慢速订单管理系统的交互。在真正的微服务架构中,你可能通过 REST 与 aiohttp 或类似库通信,但为了简化,这里我们直接用 sleep。
我们创建一个 aiohttp 的启动钩子,用来创建队列和一组工作协程,它们将与慢速服务进行交互。我们还会创建一个 HTTP POST /order 接口,用来将订单加入队列(这里我们只是随机生成一个数字,让我们的工作协程 sleep 来模拟慢速服务)。一旦订单加入队列,我们就返回一个 HTTP 200 和一条消息,表示订单已成功提交。
我们还会在 aiohttp 的关闭钩子中加入优雅的关机逻辑,因为应用关闭时,可能还有订单正在处理中。在关闭钩子中,我们会等待所有繁忙的工作者完成工作。
import asynciofrom asyncio import Queue, Taskfrom typing import Listfrom random import randrangefrom aiohttp import webfrom aiohttp.web_app import Applicationfrom aiohttp.web_request import Requestfrom aiohttp.web_response import Responseroutes = web.RouteTableDef()QUEUE_KEY = 'order_queue'TASKS_KEY = 'order_tasks'async def process_order_worker(worker_id: int, queue: Queue): # ❶ while True: print(f'工人 {worker_id}: 等待订单...') order = await queue.get() print(f'工人 {worker_id}: 处理订单 {order}') await asyncio.sleep(order) print(f'工人 {worker_id}: 已处理订单 {order}') queue.task_done()@routes.post('/order')async def place_order(request: Request) -> Response: order_queue = app[QUEUE_KEY] await order_queue.put(randrange(5)) # ❷ return Response(body='订单已提交!')async def create_order_queue(app: Application): # ❸ print('正在创建订单队列和任务。') queue: Queue = asyncio.Queue(10) app[QUEUE_KEY] = queue app[TASKS_KEY] = [asyncio.create_task(process_order_worker(i, queue)) for i in range(5)]async def destroy_queue(app: Application): # ❹ order_tasks: List[Task] = app[TASKS_KEY] queue: Queue = app[QUEUE_KEY] print('等待正在进行的队列任务完成....') try: await asyncio.wait_for(queue.join(), timeout=10) finally: print('已完成所有待处理项,正在取消工人任务...') [task.cancel() for task in order_tasks]app = web.Application()app.on_startup.append(create_order_queue)app.on_shutdown.append(destroy_queue)app.add_routes(routes)web.run_app(app)
❶ 从队列中获取一个订单并处理。 ❷ 把订单加入队列,并立即响应用户。 ❸ 创建一个最多容纳10个元素的队列,以及5个工作协程。 ❹ 等待所有忙碌的任务完成。
在这段代码中,我们首先创建了一个 process_order_worker 协程。它从队列中拉取一个元素(一个整数),然后 sleep 相应的时间来模拟与一个慢速订单管理系统的交互。这个协程会无限循环,不断地从队列中拉取并处理任务。
接下来,我们创建了设置和销毁队列的协程,分别是 create_order_queue 与 destroy_order_queue。创建队列很简单:我们创建一个最大容量为10的 asyncio 队列,然后创建5个工作协程,把它们存进我们的 Application 实例。
销毁队列稍微复杂一点。我们首先用 Queue.join 等待队列处理完所有元素。由于应用正在关闭,不会再接受新的 HTTP 请求,因此不会再有新订单进入队列。这意味着队列里已有的所有任务都会被一个工作协程处理,而当前正在处理的任务也会顺利结束。我们还把 join 包裹在 wait_for 中,并设置了10秒超时。这样做很好,因为你不希望一个失控的任务花太久时间导致应用无法正常关闭。
最后,我们定义了应用程序的路由。我们创建了一个在 /order 地址的 POST 端点。这个端点生成一个随机延迟,并将其加入队列。一旦我们把这个订单加入队列,就立即向用户返回一个 HTTP 200 状态码和一条简短的消息。注意,我们用了协程版本的 put,这意味着如果队列满了,请求会阻塞,直到消息成功入队,这可能需要一些时间。你可能想用 put_nowait 版本,然后返回一个 HTTP 500 错误或其他错误码,让调用方稍后再试。在这里,我们做出了一种权衡,即请求可能需要花费一些时间,以保证订单总是能入队。你的应用可能要求“快速失败”的行为,所以当队列满时返回错误码才是正确的选择。
使用这个队列,只要队列没满,我们的订单接口几乎能即时响应。这为终端用户提供了快速流畅的下单体验——希望能让用户乐于重复购买。
在网页应用中使用 asyncio 队列时,有一点需要注意:队列的故障模式。如果我们的某个 API 实例崩溃了(比如内存不足),或者我们需要重启服务器进行部署,该怎么办?在这种情况下,队列中未处理的订单会全部丢失,因为它们只存在于内存中。有时候丢掉一个队列中的项目没关系,但对于一个顾客订单,这肯定是大问题。
asyncio 队列本身并不具备任务持久化或队列持久性的概念。如果你希望队列里的任务能抵御这类故障,就需要引入某种保存任务的方法,比如数据库。更准确地说,应该使用一个独立于 asyncio 的外部队列,它支持任务持久化。Celery 与 RabbitMQ 都是支持磁盘持久化的任务队列的例子。
当然,使用外部架构队列会带来额外的复杂性。对于支持持久化任务的可靠队列,还需要面对将数据持久化到磁盘带来的性能挑战。要确定最适合你应用的架构,就必须仔细权衡纯内存 asyncio 队列与外部架构组件(如 Celery、RabbitMQ)之间的利弊。
消费者任务也可以是生产者,如果它在处理完任务后又生成了更多需要加入队列的工作。举个例子,一个网络爬虫,访问特定页面上的所有链接。你可以想象一个工作协程负责下载并扫描一个页面的所有链接。当它找到了链接,就可以把它们加入队列。这能让其他可用的工作协程拉取这些链接,然后并行访问它们,并将遇到的任何新链接重新加入队列。
我们来构建这样一个爬虫。我们将创建一个无上限的队列(如果你担心内存溢出,可能想加上上限),用于存放待下载的网址。然后,我们的工作协程会从队列中拉取网址,使用 aiohttp 下载内容。下载完成后,我们会用一个流行的 HTML 解析库 Beautiful Soup 来提取所有 href 链接,并把它们放回队列。
至少对于这个应用,我们不想扫遍整个互联网,所以我们只扫描从根页面出发一定数量的页面。我们称之为“最大深度”;如果最大深度是3,就意味着我们只会跟踪从根页面出发3层以内的链接。
要开始,用以下命令安装 Beautiful Soup 4.9.3 版本:
pip install -Iv beautifulsoup4==4.9.3
我们假设你已经了解 Beautiful Soup。你可以查看文档:https://www.crummy.com/software/BeautifulSoup/bs4/doc(https://www.crummy.com/software/BeautifulSoup/bs4/doc)
我们的计划是创建一个工作协程,从队列中拉取一个页面,用 aiohttp 下载它。完成后,我们用 Beautiful Soup 提取所有形如 <a href="url"> 的链接,再把它们放回队列。
import asyncioimport aiohttpimport loggingfrom asyncio import Queuefrom aiohttp import ClientSessionfrom bs4 import BeautifulSoupclass WorkItem: def __init__(self, item_depth: int, url: str): self.item_depth = item_depth self.url = urlasync def worker(worker_id: int, queue: Queue, session: ClientSession, max_depth: int): print(f'工人 {worker_id}') while True: # ❶ work_item: WorkItem = await queue.get() print(f'工人 {worker_id}: 正在处理 {work_item.url}') await process_page(work_item, queue, session, max_depth) print(f'工人 {worker_id}: 已完成 {work_item.url}') queue.task_done()async def process_page(work_item: WorkItem, queue: Queue, session: ClientSession, max_depth: int): # ❷ try: response = await asyncio.wait_for(session.get(work_item.url), timeout=3) if work_item.item_depth == max_depth: print(f'达到最大深度,' f'跳过 {work_item.url}') else: body = await response.text() soup = BeautifulSoup(body, 'html.parser') links = soup.find_all('a', href=True) for link in links: queue.put_nowait(WorkItem(work_item.item_depth + 1, link['href'])) except Exception as e: logging.exception(f'处理网址 {work_item.url} 时出错')async def main(): # ❸ start_url = 'http:/ /example.com' url_queue = Queue() url_queue.put_nowait(WorkItem(0, start_url)) async with aiohttp.ClientSession() as session: workers = [asyncio.create_task(worker(i, url_queue, session, 3)) for i in range(100)] await url_queue.join() [w.cancel() for w in workers]asyncio.run(main())
❶ 从队列中获取一个网址进行处理,然后开始下载。 ❷ 下载网址内容,解析页面上的所有链接,并将它们放回队列。 ❸ 创建一个队列和100个工作协程来处理网址。
在上面的代码中,我们首先定义了一个 WorkItem 类(❶)。这是一个简单数据类,用来存放一个网址和它的深度。然后我们定义了工作协程,它从队列中拉取一个 WorkItem,并调用 process_page。process_page 协程函数会下载该网址的内容(如果可以的话,可能出现超时或异常,我们只记录并忽略)。然后,它使用 Beautiful Soup 提取所有链接,并将它们放回队列,以便其他工作协程处理。
在我们的主协程 main 中,我们创建了队列,并用第一个 WorkItem 进行初始化。这个例子中,我们硬编码了 example.com,因为它是根页面,所以深度为0。我们创建了一个 aiohttp 会话,并创建了100个工作协程,意味着我们可以并行下载100个网址,最大深度设为3。然后,我们等待队列为空,所有工作协程都完成,使用 Queue.join。一旦队列处理完毕,我们就取消所有工作协程。当你运行这段代码时,你会看到100个工作协程启动,开始从每个下载的网址中寻找链接,输出类似如下内容:
找到 1 个链接来自 http:/ / example .com工人 0: 已完成 http:/ / example .com工人 0: 正在处理 https:/ /www .iana.org/domains/example找到 68 个链接来自 https:/ /www .iana.org/domains/example工人 0: 已完成 https:/ /www .iana.org/domains/example工人 0: 正在处理 /工人 2: 正在处理 /domains工人 3: 正在处理 /numbers工人 4: 正在处理 /protocols工人 5: 正在处理 /about工人 6: 正在处理 /go/rfc2606工人 7: 正在处理 /go/rfc6761工人 8: 正在处理 http:/ /www .icann.org/topics/idn/工人 9: 正在处理 http:/ /www .icann.org/
工作协程将继续下载页面并处理链接,将它们加入队列,直到达到我们指定的最大深度。
至此,我们通过构建一个虚拟的超市收银线、一个订单管理 API 以及一个网络爬虫,已经掌握了异步队列的基础知识。到目前为止,我们的工作协程对队列中的每一个元素都是平等对待的,它们只是按照顺序从前端取出第一个来处理。但如果某些任务想更早被执行,哪怕它在队列的后面呢?让我们来看看优先级队列,看如何做到这一点。