我们的应用所需的资源通常是有限的。我们可能只允许有限数量的数据库连接并发使用;可能不想过度占用有限的处理器;或者在使用某个 API 时,根据订阅套餐,仅允许少量并发请求。甚至自己内部的 API 也可能担心因高负载而被压垮,相当于对自身发动分布式拒绝服务攻击。
这时,信号量(Semaphore)就派上用场了。信号量的工作方式类似于锁:我们可以获取和释放它。主要区别在于,它可以被多次获取,直到达到我们指定的上限。内部,信号量维护一个计数器:每次获取信号量,计数器减一;每次释放,计数器加一。当计数器归零时,任何进一步的获取请求都会被阻塞,直到有人调用 release 增加计数。类比来说,锁其实就是一种极限为1的特殊信号量。
要亲眼见证信号量的作用,我们来搭建一个简单示例:我们只想让两个任务同时运行,但总共有四个任务要运行。为此,我们创建一个极限为2的 semaphore 并在协程中获取它。
import asynciofrom asyncio import Semaphoreasync def operation(semaphore: Semaphore): print('Waiting to acquire semaphore...') async with semaphore: print('Semaphore acquired!') await asyncio.sleep(2) print('Semaphore released!')async def main(): semaphore = Semaphore(2) await asyncio.gather(*[operation(semaphore) for _ in range(4)])asyncio.run(main())
在主协程中,我们创建了一个极限为2的信号量,表示最多可以成功获取两次,之后的获取请求将被阻塞。我们创建了四个并发调用 operation:这个协程用 async with 块获取信号量,用 sleep 模拟阻塞工作。运行后,你会看到如下输出:
Waiting to acquire semaphore...Semaphore acquired!Waiting to acquire semaphore...Semaphore acquired!Waiting to acquire semaphore...Waiting to acquire semaphore...Semaphore released!Semaphore released!Semaphore acquired!Semaphore acquired!Semaphore released!Semaphore released!
因为信号量只允许两次获取而不阻塞,前两个任务顺利获得了信号量,而后两个任务则在等待前两个任务释放信号量。一旦前两个任务的工作完成并释放信号量,后两个任务就可以获取信号量并开始执行它们的工作。
让我们把这个模式应用到一个真实场景。想象你是一家资金紧张的小公司,刚与第三方 REST API 供应商合作。他们的无限制查询合同价格昂贵,但提供一个每秒仅支持10次并发请求的计划,性价比更高。如果同时发起超过10次请求,他们的 API 会返回 429 (Too Many Requests) 状态码。你可以发送一批请求,遇到 429 就重试,但这效率低下,会给供应商的服务器额外压力,大概率会惹恼他们网站的可靠性工程师。更好的做法是创建一个极限为10的信号量,每次发出请求时都先获取它。这样做能确保你任何时候最多只有10个请求在飞行中。
我们用 aiohttp 库演示如何实现。我们要向一个示例接口发起1000次请求,但用信号量将并发请求数限制在10次以内。注意,aiohttp 本身也有连接数限制(默认最多100个),通过调整这个限制也可以达到类似效果。
import asynciofrom asyncio import Semaphorefrom aiohttp import ClientSessionasync def get_url(url: str, session: ClientSession, semaphore: Semaphore): print('Waiting to acquire semaphore...') async with semaphore: print('Acquired semaphore, requesting...') response = await session.get(url) print('Finished requesting') return response.statusasync def main(): semaphore = Semaphore(10) async with ClientSession() as session: tasks = [get_url('https://www.example.com', session, semaphore) for _ in range(1000)] await asyncio.gather(*tasks)asyncio.run(main())
虽然输出因外部延迟因素而不确定,但你应该能看到类似下面的结果:
Acquired semaphore, requesting...Acquired semaphore, requesting...Acquired semaphore, requesting...Acquired semaphore, requesting...Acquired semaphore, requesting...Finished requestingFinished requestingAcquired semaphore, requesting...Acquired semaphore, requesting...
每次请求完成后,信号量就会被释放,使得原本在等待信号量的任务可以开始运行。这意味着任何时候最多只有10个请求在运行,当一个请求完成,就能启动新的请求。
这解决了并发请求数过多的问题,但上面的代码有点“爆发性”(bursty),意味着可能瞬间突发10个请求,造成流量尖峰。如果你关心向目标 API 发送的请求量,不希望有太大波动,那么就需要结合“漏桶”或“令牌桶”这类流量整形算法。
信号量的一个特性是:允许释放次数超过获取次数。如果我们一直使用 async with 块,这种情况就不会发生,因为每次 acquire 都自动配对一个 release。但如果我们需要更细粒度地控制获取和释放(比如有些分支代码可以提前释放),就可能出现问题。举个例子,看下面的代码:一个普通协程用 async with 块获取并释放信号量,而另一个协程在此期间主动调用了 release。
import asynciofrom asyncio import Semaphoreasync def acquire(semaphore: Semaphore): print('Waiting to acquire') async with semaphore: print('Acquired') await asyncio.sleep(5) print('Releasing')async def release(semaphore: Semaphore): print('Releasing as a one off!') semaphore.release() print('Released as a one off!')async def main(): semaphore = Semaphore(2) print("Acquiring twice, releasing three times...") await asyncio.gather(acquire(semaphore), acquire(semaphore), release(semaphore)) print("Acquiring three times...") await asyncio.gather(acquire(semaphore), acquire(semaphore), acquire(semaphore))asyncio.run(main())
在上面的代码中,我们创建了一个拥有两个许可的信号量。然后运行了两次 acquire 和一次 release,总共调用了三次 release。第一次 gather 调用似乎正常运行,输出如下:
Acquiring twice, releasing three times...Waiting to acquireAcquiredWaiting to acquireAcquiredReleasing as a one off!Released as a one off!ReleasingReleasing
但第二次调用 acquire 三次却出了问题,一次性获取了三次锁!我们无意中增加了信号量的可用许可数量:
Acquiring three times...Waiting to acquireAcquiredWaiting to acquireAcquiredWaiting to acquireAcquiredReleasingReleasingReleasing
为了解决这类问题,asyncio 提供了 BoundedSemaphore。它的行为与之前的信号量几乎一样,关键区别在于:如果 release 被调用的次数多于 acquire,它会抛出 ValueError: BoundedSemaphore released too many times 异常。来看一个简单的例子:
import asynciofrom asyncio import BoundedSemaphoreasync def main(): semaphore = BoundedSemaphore(1) await semaphore.acquire() semaphore.release() semaphore.release()asyncio.run(main())
运行这段代码,第二次 release 会抛出 ValueError,提示你释放次数超了。将第11.8节中的代码改成使用 BoundedSemaphore,也会看到类似结果。如果你在手动调用 acquire 和 release,并且动态增加许可数是一种错误,那么使用 BoundedSemaphore 是明智的选择,它能及时抛出异常提醒你犯了错。
至此,我们已学会如何用信号量限制并发度,这在需要约束应用内并发度的场景下非常有用。asyncio 的同步原语不仅能控制并发,还能在特定事件发生时通知任务。接下来,我们看看如何用 Event 同步原语实现这一功能。