当我们在应用中使用多个线程或多个进程时,必须警惕非原子操作引发的竞态条件。即使只是并发地递增一个整数,也可能导致微妙且难以复现的错误。然而当我们使用 asyncio 时,通常只有一个线程在运行(除非与多线程/多进程交互),这是否意味着我们就不需要担心竞态条件了?其实情况没那么简单。
虽然 asyncio 的单线程特性消除了某些在多线程或多进程应用中常见的并发错误,但这并不完全消除所有风险。尽管你在 asyncio 中不太需要频繁使用同步机制,但依然存在一些场景需要用到这些原语。asyncio 的同步原语可以帮助我们防范单线程并发模型下特有的问题。
同步原语并不仅限于防止并发错误,还有其他用途。例如,你可能在使用某个供应商的 API 时,合同规定只能同时发起少数几次请求;或者你担心自己的应用向某个接口发送太多请求造成过载。又或者你的工作流中有多个工人,需要在新数据可用时立即通知他们。
在本章中,我们将通过几个例子来了解如何在 asyncio 代码中引入竞态条件,并学习如何用锁等并发原语解决这些问题。我们还会学习如何用信号量限制并发度,控制对共享资源(如数据库连接池)的访问。最后,我们会探讨事件和条件变量——它们能帮助我们在特定事件发生时通知任务,或在时机合适时获取共享资源。
在之前关于多进程和多线程的章节中,我们曾提到:当多个进程或线程共享数据时,需要警惕竞态条件。这是因为某个线程或进程在读取数据时,可能被另一个线程或进程正在修改,从而导致状态不一致,造成数据损坏。
这种数据损坏部分源于某些操作是非原子性的——看似是一个操作,实际上包含多个步骤。第6章举过的例子就是整数自增:先读取当前值,再加一,然后重新赋值给变量。这为其他线程和进程提供了在不一致状态下读取数据的机会。
在单线程并发模型中,我们可以避免由非原子操作引发的竞态条件。asyncio 的单线程模型保证在同一时间只有一行代码在执行。这意味着,即使操作本身是非原子的,它也能完整运行完毕,而不会被其他协程读取到不一致的状态。
为了验证这一点,我们来重现一下第7章中多线程环境下实现共享计数器时出现的竞态条件。这次我们不用多线程,而是用多个协程。重复1000次,断言最终结果是正确的。
import asynciocounter: int = 0async def increment(): global counter await asyncio.sleep(0.01) counter = counter + 1async def main(): global counter for _ in range(1000): tasks = [asyncio.create_task(increment()) for _ in range(100)] await asyncio.gather(*tasks) print(f'Counter is {counter}') assert counter == 100 counter = 0asyncio.run(main())
在上面的代码中,我们创建了一个 increment 协程,它将全局计数器加一,并增加1毫秒延迟以模拟慢速操作。在主协程中,我们创建100个任务并发调用 increment,然后用 gather 同时运行它们。接着断言计数器应该等于期望的值(因为运行了100次递增任务),理想情况下应该是100。运行这段代码,你会发现结果总是100,即便整数递增是非原子的。如果换成多线程,这个断言迟早会失败。
这是否意味着单线程模型就完全杜绝了竞态条件?可惜并不是。我们虽然避免了单一非原子操作引发的错误,但仍面临“多个操作顺序出错”带来的问题。要看到这个问题的本质,我们来让 asyncio 看起来把整数递增操作拆成了非原子步骤。
实现方法是复制底层行为:读取全局值 → 递增临时变量 → 再写回。核心思路是:如果我们的协程在 await 暂停期间,其他代码修改了状态,那恢复后可能会进入不一致状态。
import asynciocounter: int = 0async def increment(): global counter temp_counter = counter temp_counter = temp_counter + 1 await asyncio.sleep(0.01) counter = temp_counterasync def main(): global counter for _ in range(1000): tasks = [asyncio.create_task(increment()) for _ in range(100)] await asyncio.gather(*tasks) print(f'Counter is {counter}') assert counter == 100 counter = 0asyncio.run(main())
现在,increment 协程先读取计数器到临时变量,再递增它。然后我们 await asyncio.sleep 来模拟慢速操作,暂时挂起协程,之后才将结果写回全局计数器。运行这段代码,你会发现它立刻崩溃并抛出断言错误,而且计数器永远只变成 1!每个协程都先读取计数器(初始值为0),存到临时变量,然后睡觉。由于是单线程,每次读取都是串行进行的,所以每个协程都把临时变量设成 0,递增成 1。一旦睡眠结束,所有协程都把计数器设成 1,意味着尽管有100个协程在执行,计数器始终只增长到 1。注意,如果你移除 await 表达式,一切会按正确顺序运行,因为在 await 点暂停时没有机会修改应用状态。
这确实是个极简且略显不现实的例子。为了更好理解何时会出现这类问题,我们来构造一个稍微复杂一点的竞态条件。假设你正在开发一个服务器,用于向已连接的用户发送消息。在这个服务器中,我们维护一个用户名到套接字的字典,用来向这些用户发送消息。当用户断开连接时,会触发回调,从字典中移除该用户并关闭其套接字。因为我们在断开时关闭了套接字,尝试发送任何消息都会报错。但如果用户在发送消息过程中断开连接,会发生什么?假设期望的行为是:只要用户在开始发送消息时还在线,就应该收到这条消息。
为了测试这一点,我们来实现一个模拟套接字。这个模拟套接字有一个 send 协程和一个 close 方法。send 协程会模拟通过慢速网络发送消息,并检查一个标志位:如果套接字已关闭,就抛出异常。
import asyncioclass MockSocket: def __init__(self): self.socket_closed = False async def send(self, msg: str): if self.socket_closed: raise Exception('Socket is closed!') print(f'Sending: {msg}') await asyncio.sleep(1) print(f'Sent: {msg}') def close(self): self.socket_closed = Trueuser_names_to_sockets = {'John': MockSocket(), 'Terry': MockSocket(), 'Graham': MockSocket(), 'Eric': MockSocket()}
接下来,我们创建一个字典,包含几位已连接的用户,并为每位用户创建模拟套接字。我们试着向每个用户发送消息,并手动触发其中一个用户的断开连接,看看会发生什么。
async def user_disconnect(username: str): print(f'{username} disconnected!') socket = user_names_to_sockets.pop(username) socket.close()async def message_all_users(): print('Creating message tasks') messages = [socket.send(f'Hello {user}') for user, socket in user_names_to_sockets.items()] await asyncio.gather(*messages)async def main(): await asyncio.gather(message_all_users(), user_disconnect('Eric'))asyncio.run(main())
Creating message tasksEric disconnected!Sending: Hello JohnSending: Hello TerrySending: Hello GrahamTraceback (most recent call last): File 'chapter_11/listing_11_3.py', line 45, in <module> asyncio.run(main()) File "asyncio/runners.py", line 44, in run return loop.run_until_complete(main) File "python3.9/asyncio/base_events.py", line 642, in run_until_complete return future.result() File 'chapter_11/listing_11_3.py', line 42, in main await asyncio.gather(message_all_users(), user_disconnect('Eric')) File 'chapter_11/listing_11_3.py', line 37, in message_all_users await asyncio.gather(*messages) File 'chapter_11/listing_11_3.py', line 11, in send raise Exception('Socket is closed!')Exception: Socket is closed!
在这个例子中,我们先创建了消息任务,然后 await,暂时挂起 message_all_users 协程。这就给了 user_disconnect('Eric') 机会执行,它会关闭埃里克的套接字并将他从 user_names_to_sockets 字典中移除。完成后,message_all_users 协程恢复执行,开始发送消息。但由于埃里克的套接字已被关闭,我们看到了异常,他也没收到我们想发的消息。另外,我们还修改了 user_names_to_sockets 字典。如果后续代码依赖于埃里克仍然在字典里,可能会引发异常或其他错误。
这就是在单线程并发模型中常见的陷阱。你在 await 处遇到暂停点,另一个协程跑起来修改了共享状态,当你恢复时,这种变化是意料之外的。与多线程并发错误的关键区别在于:在多线程应用中,竞态条件可能在任何修改可变状态的地方发生。而在单线程并发模型中,只有在 await 点修改可变状态时才会出现问题。现在我们理解了单线程模型下的并发错误类型,接下来看看如何用 asyncio 的锁来规避它们。