我们前面介绍的队列示例都按先进先出(FIFO)的顺序处理项目。谁先来,谁先被处理。在很多情况下,这很好用,无论是在软件工程还是日常生活中。
然而,在某些应用场景中,让所有任务被视为同等重要并非理想。想象一下,我们正在构建一个数据处理流水线,每个任务都是一个耗时几分钟的查询。假设有两个任务在同一时间到达。第一个是低优先级的数据查询,第二个却是关键性的数据更新,必须尽快处理。在普通的队列中,第一个任务会先处理,导致更重要的第二个任务不得不等着。想象一下第一个任务需要好几个小时,或者如果所有工作协程都忙,第二个任务可能会等待很长时间。
我们可以通过使用优先级队列来解决这个问题,让工作协程优先处理最重要的任务。内部上,优先级队列是基于堆(使用 heapq 模块)而非简单的 Python 列表实现的,这与普通队列不同。要创建一个 asyncio 优先级队列,只需创建一个 asyncio.PriorityQueue 实例。
我们不必深入讨论数据结构的细节,但堆是一种二叉树,其特性是每个父节点的值都小于其所有子节点的值(参见图12.1)。这不同于通常用于排序和搜索问题的二叉查找树,后者的特性是左孩子小于父节点,右孩子大于父节点。我们利用的是堆的特性:最顶层的节点永远是树中最小的元素。如果我们始终把最小值当作最高优先级,那么高优先级的节点就总是在队列最前面。
图12.1 左边是一个满足堆特性的二叉树;右边是一个不满足堆特性的二叉查找树
我们不太可能将纯整数这样的工作项放入队列,因此我们需要一种方法来构造一个带有合理优先级规则的工作项。一种方法是使用元组,其中第一个元素是表示优先级的整数,第二个是任意的任务数据。默认队列实现会根据元组的第一个值决定优先级,数值越小优先级越高。我们来看一个使用元组作为工作项的示例,看看优先级队列是如何工作的。
import asynciofrom asyncio import Queue, PriorityQueuefrom typing import Tupleasync def worker(queue: Queue): while not queue.empty(): work_item: Tuple[int, str] = await queue.get() print(f'正在处理工作项 {work_item}') queue.task_done()async def main(): priority_queue = PriorityQueue() work_items = [(3, '最低优先级'), (2, '中等优先级'), (1, '最高优先级')] worker_task = asyncio.create_task(worker(priority_queue)) for work in work_items: priority_queue.put_nowait(work) await asyncio.gather(priority_queue.join(), worker_task)asyncio.run(main())
在上面的代码中,我们创建了三个工作项:一个高优先级,一个中等优先级,一个低优先级。然后,我们以相反的优先级顺序将它们加入优先级队列,即先插入最低优先级项,最后插入最高优先级项。在普通队列中,这会导致最低优先级项最先被处理,但如果你运行这段代码,你会发现输出如下:
正在处理工作项 (1, '最高优先级')正在处理工作项 (2, '中等优先级')正在处理工作项 (3, '最低优先级')
这表明我们是按照优先级顺序处理工作项,而不是它们被插入队列的顺序。元组适用于简单情况,但如果你的工作项包含大量数据,元组可能会变得混乱不堪。有没有办法创建一个类,使其在堆上按我们期望的方式工作呢?当然可以,而且最简洁的方式是使用 dataclass(当然,如果 dataclass 不可用,你也可以实现 __lt__、__le__、__gt__、__ge__ 等“魔术”方法)。
import asynciofrom asyncio import Queue, PriorityQueuefrom dataclasses import dataclass, field@dataclass(order=True)class WorkItem: priority: int data: str = field(compare=False)async def worker(queue: Queue): while not queue.empty(): work_item: WorkItem = await queue.get() print(f'正在处理工作项 {work_item}') queue.task_done()async def main(): priority_queue = PriorityQueue() work_items = [WorkItem(3, '最低优先级'), WorkItem(2, '中等优先级'), WorkItem(1, '最高优先级')] worker_task = asyncio.create_task(worker(priority_queue)) for work in work_items: priority_queue.put_nowait(work) await asyncio.gather(priority_queue.join(), worker_task)asyncio.run(main())
在上述代码中,我们创建了一个 dataclass,并将 ordered 设置为 True。然后我们添加一个优先级整数和一个字符串数据字段,并将其排除在比较之外。这意味着当我们把工作项加入队列时,它们只会按优先级字段进行排序。运行上述代码,我们可以看到它按正确的顺序被处理:
正在处理工作项 WorkItem(priority=1, data='最高优先级')正在处理工作项 WorkItem(priority=2, data='中等优先级')正在处理工作项 WorkItem(priority=3, data='最低优先级')
现在我们已经掌握了优先级队列的基础知识,让我们回到前面提到的订单管理 API 的例子中。假设我们有一些“高级会员”客户,他们在我们的电商平台上有很高的消费额。我们想要确保他们的订单总是能第一时间得到处理,以保证他们获得最佳体验。让我们修改之前例子,使用优先级队列来处理这些用户。
import asynciofrom asyncio import Queue, Taskfrom dataclasses import field, dataclassfrom enum import IntEnumfrom typing import Listfrom random import randrangefrom aiohttp import webfrom aiohttp.web_app import Applicationfrom aiohttp.web_request import Requestfrom aiohttp.web_response import Responseroutes = web.RouteTableDef()QUEUE_KEY = 'order_queue'TASKS_KEY = 'order_tasks'class UserType(IntEnum): POWER_USER = 1 NORMAL_USER = 2@dataclass(order=True) # ❶class Order: user_type: UserType order_delay: int = field(compare=False)async def process_order_worker(worker_id: int, queue: Queue): while True: print(f'工人 {worker_id}: 等待订单...') order = await queue.get() print(f'工人 {worker_id}: 正在处理订单 {order}') await asyncio.sleep(order.order_delay) print(f'工人 {worker_id}: 已处理订单 {order}') queue.task_done()@routes.post('/order')async def place_order(request: Request) -> Response: body = await request.json() user_type = UserType.POWER_USER if body['power_user'] == 'True' else UserType.NORMAL_USER order_queue = app[QUEUE_KEY] await order_queue.put(Order(user_type, randrange(5))) # ❷ return Response(body='订单已提交!')async def create_order_queue(app: Application): print('正在创建订单队列和任务。') queue: Queue = asyncio.PriorityQueue(10) app[QUEUE_KEY] = queue app[TASKS_KEY] = [asyncio.create_task(process_order_worker(i, queue)) for i in range(5)]async def destroy_queue(app: Application): order_tasks: List[Task] = app[TASKS_KEY] queue: Queue = app[QUEUE_KEY] print('等待正在进行的队列任务完成....') try: await asyncio.wait_for(queue.join(), timeout=10) finally: print('已完成所有待处理项,正在取消工人任务...') [task.cancel() for task in order_tasks]app = web.Application()app.on_startup.append(create_order_queue)app.on_shutdown.append(destroy_queue)app.add_routes(routes)web.run_app(app)
❶ 一个用于表示工作项的订单类,其优先级基于用户类型。 ❷ 将请求解析为一个订单。
上述代码看起来与我们之前交互慢速订单管理系统的 API 非常相似,唯一的区别是,我们使用了优先级队列,并创建了一个 Order 类来表示传入的订单。当我们收到一个订单时,我们现在期望其负载包含一个“高权限用户”标志,设置为 True 表示高级用户,False 表示普通用户。你可以用 cURL 这样调用这个端点:
curl -X POST -d '{"power_user":"False"}' localhost:8080/order
传入所需的高级用户值。如果一个用户是高级用户,他们的订单将始终在任何可用的工作人员之前被处理,优先于普通用户。
在优先级队列中,一个有趣但可能让人困惑的边界情况是:如果连续添加两个具有相同优先级的工作项,会发生什么?它们是否会按插入顺序被工作人员处理?让我们做一个简单的例子来测试一下。
import asynciofrom asyncio import Queue, PriorityQueuefrom dataclasses import dataclass, field@dataclass(order=True)class WorkItem: priority: int data: str = field(compare=False)async def worker(queue: Queue): while not queue.empty(): work_item: WorkItem = await queue.get() print(f'正在处理工作项 {work_item}') queue.task_done()async def main(): priority_queue = PriorityQueue() work_items = [WorkItem(3, '最低优先级'), WorkItem(3, '最低优先级第二'), WorkItem(3, '最低优先级第三'), WorkItem(2, '中等优先级'), WorkItem(1, '最高优先级')] worker_task = asyncio.create_task(worker(priority_queue)) for work in work_items: priority_queue.put_nowait(work) await asyncio.gather(priority_queue.join(), worker_task)asyncio.run(main())
在上述代码中,我们先向队列中加入了三个低优先级的任务。我们可能会期望它们按插入顺序被处理,但当我们运行代码时,实际的行为并不是这样:
正在处理工作项 WorkItem(priority=1, data='最高优先级')正在处理工作项 WorkItem(priority=2, data='中等优先级')正在处理工作项 WorkItem(priority=3, data='最低优先级第三')正在处理工作项 WorkItem(priority=3, data='最低优先级第二')正在处理工作项 WorkItem(priority=3, data='最低优先级')
原来,我们是按与插入相反的顺序处理低优先级任务。这是因为底层的 heapsort 算法不是稳定排序算法,相同的元素不保证插入顺序。在优先级相等的情况下顺序可能不是问题,但如果你在乎这一点,就需要添加一个打破平局的键,来获得你想要的顺序。一种简单的方法是为工作项添加一个序列号,尽管还有许多其他方法可以做到这一点。
import asynciofrom asyncio import Queue, PriorityQueuefrom dataclasses import dataclass, field@dataclass(order=True)class WorkItem: priority: int order: int data: str = field(compare=False)async def worker(queue: Queue): while not queue.empty(): work_item: WorkItem = await queue.get() print(f'正在处理工作项 {work_item}') queue.task_done()async def main(): priority_queue = PriorityQueue() work_items = [WorkItem(3, 1, '最低优先级'), WorkItem(3, 2, '最低优先级第二'), WorkItem(3, 3, '最低优先级第三'), WorkItem(2, 4, '中等优先级'), WorkItem(1, 5, '最高优先级')] worker_task = asyncio.create_task(worker(priority_queue)) for work in work_items: priority_queue.put_nowait(work) await asyncio.gather(priority_queue.join(), worker_task)asyncio.run(main())
在之前的代码中,我们在 WorkItem 类中添加了一个 order 字段。然后,当我们插入工作项时,会加入一个代表插入顺序的整数。当优先级相同时,排序将基于这个字段。在我们的例子中,这为我们实现了低优先级项目插入顺序的预期排序:
正在处理工作项 WorkItem(priority=1, order=5, data='最高优先级')正在处理工作项 WorkItem(priority=2, order=4, data='中等优先级')正在处理工作项 WorkItem(priority=3, order=1, data='最低优先级')正在处理工作项 WorkItem(priority=3, order=2, data='最低优先级第二')正在处理工作项 WorkItem(priority=3, order=3, data='最低优先级第三')
我们现在已经学会了如何按先进先出队列顺序和优先级队列顺序处理工作项。那么,如果我们想先处理最近添加的工作项呢?接下来,我们看看如何用一个后进先出(LIFO)队列来实现这一点。