后进先出(LIFO)队列在计算机科学领域通常被称为栈。我们可以把它想象成一叠扑克筹码:下注时,你从栈顶拿筹码(或者说“弹出”它们),当你希望赢手牌时,你又把筹码放回栈顶(或者说“压入”它们)。这种队列适用于你想让工作协程优先处理最近添加的项目。
我们不会构建一个过于复杂的例子,只是为了演示工作协程处理元素的顺序。至于何时使用 LIFO 队列,取决于你的应用需要按什么顺序处理队列中的项目。你需要优先处理最近插入的项目吗?如果是,那么就应该使用 LIFO 队列。
import asynciofrom asyncio import Queue, LifoQueuefrom dataclasses import dataclass, field@dataclass(order=True)class WorkItem: priority: int order: int data: str = field(compare=False)async def worker(queue: Queue): while not queue.empty(): work_item: WorkItem = await queue.get() # ❶ print(f'正在处理工作项 {work_item}') queue.task_done()async def main(): lifo_queue = LifoQueue() work_items = [WorkItem(3, 1, '最低优先级第一'), WorkItem(3, 2, '最低优先级第二'), WorkItem(3, 3, '最低优先级第三'), WorkItem(2, 4, '中等优先级'), WorkItem(1, 5, '最高优先级')] worker_task = asyncio.create_task(worker(lifo_queue)) for work in work_items: lifo_queue.put_nowait(work) # ❷ await asyncio.gather(lifo_queue.join(), worker_task)asyncio.run(main())
❶ 从队列中获取一个项目,或者说“弹出”它,从栈中。 ❷ 将一个项目放入队列,或者说“压入”它,到栈中。
在上面的代码中,我们创建了一个 LIFO 队列和一组工作项。然后我们依次将它们插入队列,拉出来并进行处理。运行后,你会看到以下输出:
正在处理工作项 WorkItem(priority=1, order=5, data='最高优先级')正在处理工作项 WorkItem(priority=2, order=4, data='中等优先级')正在处理工作项 WorkItem(priority=3, order=3, data='最低优先级第三')正在处理工作项 WorkItem(priority=3, order=2, data='最低优先级第二')正在处理工作项 WorkItem(priority=3, order=1, data='最低优先级第一')
注意,我们是以与插入顺序相反的顺序处理队列中的项目。因为这是一个栈,所以这就说得通了,因为我们是优先处理最近添加的工作项。
我们现在已经见识了 asyncio 队列库提供的所有类型的队列。使用这些队列有没有什么坑?我们能不能在任何时候都随便用?我们将在第13章中讨论这个问题。
asyncio 队列是任务队列,适用于生产数据的协程与负责处理这些数据的协程之间的工作流。- 队列将数据生成与数据处理解耦,因为生产者可以将项目放入队列,多个工作协程可以独立并行地处理它们。
- 我们可以使用优先级队列来赋予某些任务相对于其他任务更高的优先级。这对于某些工作比其他工作更重要并且总是需要优先处理的情况非常有用。
asyncio 队列不是分布式的,不是持久的,也不是可靠的。如果你需要这些特性,就需要转向一个独立的架构组件,比如 Celery、RabbitMQ。