事件适用于简单的通知,但对于更复杂的用例呢?设想你需要获取一个共享资源,这需要一个锁,同时要等待一组复杂事实成立才能继续,或只唤醒一定数量的任务而非全部。条件变量在这种场景下非常有用。它们是目前为止最复杂的同步原语,因此你很可能很少用到它们。
条件(Condition)将锁和事件的功能融合于一体,相当于包装了两者的行为。我们首先获取条件的锁,从而获得对任何共享资源的独占访问权,安全地修改所需的状态。然后,我们使用 wait 或 wait_for 协程等待特定事件。这些协程会释放锁并阻塞,直到事件发生;一旦发生,它们会重新获取锁,给予我们独占访问权。
可能有点绕,我们来做一个示例。创建两个工作协程,每个都尝试获取条件锁,然后等待事件通知。几秒钟后,我们触发条件,这将唤醒两个工作协程,允许它们开始工作。
import asynciofrom asyncio import Conditionasync def do_work(condition: Condition): while True: print('Waiting for condition lock...') async with condition: print('Acquired lock, releasing and waiting for condition...') await condition.wait() print('Condition event fired, re-acquiring lock and doing work...') await asyncio.sleep(1) print('Work finished, lock released.')async def fire_event(condition: Condition): while True: await asyncio.sleep(5) print('About to notify, acquiring condition lock...') async with condition: print('Lock acquired, notifying all workers.') condition.notify_all() print('Notification finished, releasing lock.')async def main(): condition = Condition() asyncio.create_task(fire_event(condition)) await asyncio.gather(do_work(condition), do_work(condition))asyncio.run(main())
运行后,你会看到两个工作协程立即开始,并在等待 fire_event 协程调用 notify_all 时被阻塞。一旦 fire_event 调用 notify_all,工作协程就会醒来,然后执行工作。
条件还有一个额外的协程方法 wait_for。它不等待别人通知,而是接受一个谓词(一个无参数返回布尔值的函数),并持续阻塞直到该谓词返回 True。当某个共享资源的状态必须变为真时,这非常有用。
举个例子,假设我们正在创建一个类来封装数据库连接并执行查询。底层连接一次不能运行多个查询,且数据库连接可能在有人尝试查询时还未初始化。这种共享资源和需要等待的事件组合,正好适合使用 Condition。我们用一个模拟数据库连接类来演示。这个类运行查询,但只有在正确初始化连接后才执行。然后,我们用这个模拟连接类,尝试在连接初始化完成前并发运行两个查询。
import asynciofrom enum import Enumclass ConnectionState(Enum): WAIT_INIT = 0 INITIALIZING = 1 INITIALIZED = 2class Connection: def __init__(self): self._state = ConnectionState.WAIT_INIT self._condition = asyncio.Condition() async def initialize(self): await self._change_state(ConnectionState.INITIALIZING) print('initialize: Initializing connection...') await asyncio.sleep(3) # simulate connection startup time print('initialize: Finished initializing connection') await self._change_state(ConnectionState.INITIALIZED) async def execute(self, query: str): async with self._condition: print('execute: Waiting for connection to initialize') await self._condition.wait_for(self._is_initialized) print(f'execute: Running {query}!!!') await asyncio.sleep(3) # simulate a long query async def _change_state(self, state: ConnectionState): async with self._condition: print(f'change_state: State changing from {self._state} to {state}') self._state = state self._condition.notify_all() def _is_initialized(self): if self._state is not ConnectionState.INITIALIZED: print(f'_is_initialized: Connection not finished initializing, state is {self._state}') return False print(f'_is_initialized: Connection is initialized!') return Trueasync def main(): connection = Connection() query_one = asyncio.create_task(connection.execute('select * from table')) query_two = asyncio.create_task(connection.execute('select * from other_table')) asyncio.create_task(connection.initialize()) await query_one await query_twoasyncio.run(main())
在上面的代码中,我们创建了一个连接类,它包含一个条件对象,并将内部状态初始化为 WAIT_INIT,表示正在等待初始化。我们还创建了 Connection 类的一些方法。第一个是 initialize,它模拟创建数据库连接。此方法在首次调用时调用 _change_state 将状态设为 INITIALIZING,连接初始化后,再将状态设为 INITIALIZED。在 _change_state 方法中,我们设置内部状态,然后调用条件的 notify_all 方法,这会唤醒所有正在等待条件的任务。
在 execute 方法中,我们用 async with 块获取条件对象,然后调用 wait_for,传入一个检查状态是否为 INITIALIZED 的谓词。这将一直阻塞,直到数据库连接完全初始化,防止我们意外在连接存在之前发出查询。然后,在主协程中,我们创建连接类,创建两个任务来运行查询,接着创建一个任务来初始化连接。运行代码后,你会看到以下输出,表明查询正确等待初始化任务完成后才运行:
execute: Waiting for connection to initialize_is_initialized: Connection not finished initializing, state is ConnectionState.WAIT_INITexecute: Waiting for connection to initialize_is_initialized: Connection not finished initializing, state is ConnectionState.WAIT_INITchange_state: State changing from ConnectionState.WAIT_INIT to ConnectionState.INITIALIZINGinitialize: Initializing connection..._is_initialized: Connection not finished initializing, state is ConnectionState.INITIALIZING_is_initialized: Connection not finished initializing, state is ConnectionState.INITIALIZINGinitialize: Finished initializing connectionchange_state: State changing from ConnectionState.INITIALIZING to ConnectionState.INITIALIZED_is_initialized: Connection is initialized!execute: Running select * from table!!!_is_initialized: Connection is initialized!execute: Running select * from other_table!!!
条件变量适用于需要访问共享资源,并在某个状态为真时才开始工作的情形。这是一个相对复杂的应用场景,因此你很可能在 asyncio 代码中不会经常遇到或需要使用条件变量。
- 我们学习了单线程并发错误及其与多线程/多进程并发错误的区别。
- 我们知道如何使用
asyncio 锁来防止并发错误并同步协程。由于 asyncio 的单线程特性,这种情况较少见,但在共享状态可能在 await 期间改变时仍需使用。 - 我们学习了如何用信号量控制对有限资源的访问并限制并发度,这在流量整形中有用。
- 我们知道如何用事件来触发动作,例如初始化或唤醒工作协程。
- 我们知道如何用条件变量等待一个动作,并在该动作发生后获取对共享资源的访问。