我们尝试了事件、条件和队列。每一种都比上一种更接近目标,但在实际并发环境下仍然会失效。以下是最终有效的可观察模式。
在 Python 中,围绕共享状态协调并发任务是最常见的问题之一 asyncio。标准库提供了 asyncio.Event 和 asyncio.Condition,但它们都存在缺陷,只有在真正的并发压力下才会显现出来。我们在构建Inngest 的 Python SDK[1]时就遇到了这个问题,其中多个异步处理程序需要围绕 WebSocket 连接状态进行协调。
这篇文章逐一分析了每个基本问题,准确地指出了问题所在,并逐步迭代,最终找到了一个能够处理我们提出的每一种情况的解决方案。
场景
想象一下,一个异步 Python 应用程序管理着一个会经历不同状态的连接:
disconnected → connecting → connected → closing → closed
当连接开始关闭时,你的某个并发处理程序需要清除待处理的请求。它必须等待 closing 状态:
state = "disconnected"
async def drain_requests():
# Need to wait until state == "closing"
...
print("draining pending requests")
很简单。我们来看看每个标准库工具是如何处理的。
尝试 1:轮询
最直接的方法:在循环中检查该值。
async def drain_requests():
while state != "closing":
await asyncio.sleep(0.1)
print("draining pending requests")
这种方法可行,但弊端重重:
- • 延迟与效率:睡眠间隔过短会浪费 CPU 周期,睡眠间隔过长则会增加延迟。两者各有优劣。
- • 重复:每个消费者都以相同的权衡重新实现相同的轮询循环。
- • 无事件驱动唤醒:无论是否有任何变化,消费者都会运行。
我们可以做得更好。我们真正想要的是休眠直到状态发生变化,而不是休眠任意时长后再进行检查。
尝试 2:asyncio.Event
asyncio.Event 是标准库对“发生某些事情时叫醒我”的回答:
closing_event = asyncio.Event()
async def drain_requests():
await closing_event.wait()
print("draining pending requests")
无需轮询,避免浪费周期。处理程序会阻塞,直到事件触发。但 Event 是布尔值:要么已设置,要么未设置。我们的连接有五种状态,而 drain_requests 只关心其中一种。当另一个处理程序需要等待 connected 时会发生什么?你需要第二个事件。第三个处理程序等待“未断开连接”状态?则需要逻辑相反的第三个事件。设置器必须了解所有这些状态:
closing_event = asyncio.Event()
connected_event = asyncio.Event()
async def set_state(new_state):
global state
state = new_state
if new_state == "closing":
closing_event.set()
if new_state == "connected":
connected_event.set()
每出现一个新的条件,就需要另一个 Event 对象。事件之间的协调是 bug 的根源。如果漏掉一个 set() 或 clear() 调用,消费者就会永远阻塞。
尝试 3:asyncio.Condition
asyncio.Condition 允许消费者等待任意谓词:
state = "disconnected"
condition = asyncio.Condition()
async def drain_requests():
async with condition:
await condition.wait_for(lambda: state == "closing")
print("draining pending requests")
一个协调点,任意谓词,没有Event对象的泛滥。这好多了。
但它遵循着一种常见的模式。
丢失的更新
Condition 的设计目的是在消费者唤醒时检查当前值。当状态只向前移动时,这没有问题,但当状态转换很快时,这种方法就失效了。当 setter 改变状态时,它会调用 notify_all(),后者会为每个等待的消费者安排唤醒。但在单线程事件循环中,只有当前协程让出后,消费者才会真正运行。如果在此之前值再次改变,消费者会被唤醒,并针对当前值(而不是触发通知的值)重新评估其谓词。谓词会失败,消费者会再次进入睡眠状态,而且可能永远如此。
# Two transitions in quick succession:
await set_state("closing") # notify_all() schedules wakeups
await set_state("closed") # state changes again before consumers run
# drain_requests finally wakes, sees "closed", not "closing".
# Pending requests get silently dropped.
以下是一个可运行的复现示例:
import asyncio
state = "disconnected"
condition = asyncio.Condition()
async def set_state(new_state):
global state
async with condition:
state = new_state
condition.notify_all()
async def drain_requests():
async with condition:
await condition.wait_for(lambda: state == "closing")
print("draining pending requests")
async def main():
task = asyncio.create_task(drain_requests())
await asyncio.sleep(0) # Let drain_requests start waiting
await set_state("closing") # Briefly "closing"...
await set_state("closed") # ...then immediately "closed"
await asyncio.wait_for(task, timeout=1.0)
# TimeoutError: drain_requests never sees "closing"
asyncio.run(main())
原值为"closing",但当drain_requests醒来并检查时,它已经变成了"closed"。中间状态消失了。
这并非人为设定的极端情况。在我们的 SDK 连接管理器中,关闭信号可能在同一个事件循环周期内到达并导致连接关闭。drain_requests 永远不会执行,任何正在进行的工作都会消失。
解决方案:按消费者排队
与其唤醒消费者并询问“当前状态是否符合您的预期?”,不如将每次状态转换缓冲到一个每个消费者的队列中。每个消费者清空自己的队列并单独检查每次状态转换。这样,消费者就不会错过任何状态。
每个消费者都注册自己的 asyncio.Queue。当值发生变化时,设置者会将 (old, new) 推入每个已注册的队列。以下是一个简化版本,用以说明其核心思想:
class ValueWatcher:
def __init__(self, initial_value):
self._value = initial_value
self._watch_queues: list[asyncio.Queue] = []
@property
def value(self):
return self._value
@value.setter
def value(self, new_value):
if new_value == self._value:
return
old_value = self._value
self._value = new_value
# Notify all consumers
for queue in self._watch_queues:
queue.put_nowait((old_value, new_value))
async def wait_for(self, target):
queue = asyncio.Queue()
self._watch_queues.append(queue)
try:
if self._value == target:
return
while True:
old, new = await queue.get()
if new == target:
return
finally:
self._watch_queues.remove(queue)
wait_for 注册一个队列,检查当前值,然后清空转换直到找到匹配项。try/finally 确保即使调用者取消操作,队列也会被注销。
队列会按顺序缓冲并传递每一个中间转换,即使在消费者运行之前该值发生了多次变化。
使其达到生产就绪状态
我们还需要一些功能才能使其达到生产就绪状态。最终版本需要以下功能:
- • 线程安全:
threading.Lock 保护值和队列列表。每个队列都与其事件循环配对,并且 setter 使用 loop.call_soon_threadsafe 而不是直接使用 put_nowait。 - • 原子注册:
wait_for 检查当前值并在同一锁获取过程中注册队列,从而避免了在注册和初始检查之间发生转换的竞争。 - • 完全泛型类型:
Generic[T] 端到端,因此谓词、队列和返回值都会进行类型检查。 - • 基于谓词的匹配:
wait_for、wait_for_not 和 wait_for_not_none 都通过共享的 _wait_for_condition(predicate) 核心进行路由。 - • 超时:每个等待方法都接受一个可选的
timeout,由 asyncio.wait_for 支持。 - • 条件集:
set_if 仅当当前值满足谓词时才原子地设置值,这对于仅应从特定状态发生的状态机转换非常有用。 - • 变化监视:
wait_for_change 等待任何转换,无论值如何,方便记录或对状态变化做出反应。 - • 回调 API :
on_change 和 on_value 用于同步消费者以及异步等待 API。 - • 弹性通知:设置器捕获
RuntimeError(闭环)并抑制回调异常,因此一个故障不会阻塞其他消费者。
完整的实现代码大约 300 行,其中大部分是基于同一核心的文档字符串和便捷方法。欢迎将其复制到您的代码库中!wait_for_not_none 特别有用,因为我们非常重视类型安全:
# Wait until the state is anything other than "disconnected"
await state.wait_for_not("disconnected")
# For Optional values: waits until non-None and narrows the type
ws_watcher = ValueWatcher[Connection | None](None)
ws: Connection = await ws_watcher.wait_for_not_none()
需要注意一点
setter 方法会根据相等性进行去重:如果新值 == 等于当前值,则不会触发通知。这对于枚举、字符串和整数类型都适用,但对可变对象进行原地修改并重新赋值给同一个引用则不会触发消费者(因为 obj == obj 显然等同于 True)。如果只使用不可变值,则不会出现此问题。
总结
核心洞察很简单:asyncio.Condition 询问消费者“当前状态是否符合您的预期?”,而它应该询问“当前状态是否曾经达到过您的预期?”。基于消费者的队列通过缓冲每一次状态转换,而不是仅仅通知最新的状态转换,从而实现了这一点。
Inngest 的 Python SDK[1]中全程使用 ValueWatcher 来协调 WebSocket 连接状态、工作进程生命周期和优雅关闭。如果您正在使用 asyncio 管理共享的可变状态,不妨试试它。
https://www.inngest.com/blog/no-lost-updates-python-asyncio
引用链接
[1] Inngest 的 Python SDK: https://github.com/inngest/inngest-py