asyncio 的锁与 multiprocessing 及 multithreading 模块中的锁非常相似。我们获取一个锁,执行临界区内的工作,完成后再释放锁,让其他感兴趣方获得。主要区别在于:asyncio 锁是可等待对象,在阻塞时会暂停协程的执行。这意味着当协程被阻塞等待获取锁时,其他代码可以运行。此外,asyncio 锁还是异步上下文管理器,最佳实践是使用 async with 语法。
为了熟悉锁的工作原理,我们看一个简单例子:两个协程共享一个锁。我们获取锁,这样其他协程就无法在临界区内运行,直到锁被释放。
import asynciofrom asyncio import Lockfrom util import delayasync def a(lock: Lock): print('Coroutine a waiting to acquire the lock') async with lock: print('Coroutine a is in the critical section') await delay(2) print('Coroutine a released the lock')async def b(lock: Lock): print('Coroutine b waiting to acquire the lock') async with lock: print('Coroutine b is in the critical section') await delay(2) print('Coroutine b released the lock')async def main(): lock = Lock() await asyncio.gather(a(lock), b(lock))asyncio.run(main())
运行上述代码,你会看到协程 a 先获取了锁,协程 b 被迫等待,直到 a 释放锁。一旦 a 释放锁,b 就能进入临界区工作,得到以下输出:
Coroutine a waiting to acquire the lockCoroutine a is in the critical sectionsleeping for 2 second(s)Coroutine b waiting to acquire the lockfinished sleeping for 2 second(s)Coroutine a released the lockCoroutine b is in the critical sectionsleeping for 2 second(s)finished sleeping for 2 second(s)Coroutine b released the lock
这里我们用了 async with 语法。当然,也可以直接使用 acquire 与 release 方法:
await lock.acquire()try: print('In critical section')finally: lock.release()
但强烈建议尽可能使用 async with 语法。
需要注意的是,我们是在 main 协程中创建锁的。由于锁在各个协程之间是共享的,你可能会想把它定义为全局变量,以避免每次都传参。比如:
lock = Lock()# 协程定义async def main(): await asyncio.gather(a(), b())
但如果这么做,很快就会发现程序崩溃,报错提示“多个事件循环”。原因有点玄妙:asyncio 的大多数对象都提供一个可选的 loop 参数,允许指定特定的事件循环。如果未提供,asyncio 会尝试获取当前运行的事件循环,如果没有,则创建一个新的。上述情况中,创建 Lock 时会创建一个新事件循环(因为脚本刚开始运行时还没有循环),随后 asyncio.run(main()) 创建了第二个事件循环。当试图使用锁时,我们就混用了两个独立的事件循环,导致崩溃。
这个行为很诡异,以至于从 Python 3.10 开始,事件循环参数将被移除,这种混乱行为也会消失。但在那之前,使用全局 asyncio 变量时需要格外小心。
现在我们了解了基本原理,来看看如何用锁来修复第11.3节中的那个问题:在意外关闭套接字前发送消息。解决办法是在两个地方使用锁:一是用户断开连接时,二是发送消息时。这样,如果断开发生在消息发送期间,系统会等到所有消息都发送完毕后再关闭套接字。
import asynciofrom asyncio import Lockclass MockSocket: def __init__(self): self.socket_closed = False async def send(self, msg: str): if self.socket_closed: raise Exception('Socket is closed!') print(f'Sending: {msg}') await asyncio.sleep(1) print(f'Sent: {msg}') def close(self): self.socket_closed = Trueuser_names_to_sockets = {'John': MockSocket(), 'Terry': MockSocket(), 'Graham': MockSocket(), 'Eric': MockSocket()}async def user_disconnect(username: str, user_lock: Lock): print(f'{username} disconnected!') async with user_lock: print(f'Removing {username} from dictionary') socket = user_names_to_sockets.pop(username) socket.close()async def message_all_users(user_lock: Lock): print('Creating message tasks') async with user_lock: messages = [socket.send(f'Hello {user}') for user, socket in user_names_to_sockets.items()] await asyncio.gather(*messages)async def main(): user_lock = Lock() await asyncio.gather(message_all_users(user_lock), user_disconnect('Eric', user_lock))asyncio.run(main())
Creating message tasksEric disconnected!Sending: Hello JohnSending: Hello TerrySending: Hello GrahamSending: Hello EricSent: Hello JohnSent: Hello TerrySent: Hello GrahamSent: Hello EricRemoving Eric from dictionary
我们首先获取锁并创建消息任务。在此期间,埃里克断开了连接,disconnect 代码尝试获取锁。由于 message_all_users 仍在持有锁,我们必须等待它完成才能运行 disconnect 的代码。这确保了所有消息发送完毕后再关闭套接字,完美解决了原来的问题。
你很可能很少需要在 asyncio 代码中使用锁,因为它的单线程特性已经避开了很多并发问题。即便出现竞态条件,有时也能重构代码,避免在协程暂停期间修改状态(例如使用不可变对象)。当无法重构时,锁可以强制修改以期望的顺序进行。理解了如何用锁防止并发错误后,让我们看看如何用同步机制来实现在 asyncio 应用中新增的功能。