前两篇我们做了异步IO的入门,学会了async/await和事件循环。这篇是收尾,把异步编程的实战模式和进阶特性全部拉通。
01 协程链式调用
协程的一个关键特性是:可以嵌套等待。一个协程await另一个协程,形成一条链。
看这个实战场景——批量获取用户信息,再获取他们的帖子:
import asyncio
import random
import time
async def main():
user_ids = [1, 2, 3]
start = time.perf_counter()
await asyncio.gather(
*(get_user_with_posts(uid) for uid in user_ids)
)
end = time.perf_counter()
print(f"\n总共耗时: {end - start:.2f} 秒")
async def get_user_with_posts(user_id):
user = await fetch_user(user_id)
await fetch_posts(user)
async def fetch_user(user_id):
delay = random.uniform(0.5, 2.0)
print(f"获取用户: user_id={user_id}...")
await asyncio.sleep(delay)
user = {"id": user_id, "name": f"User{user_id}"}
print(f"用户获取完成: user_id={user_id} (耗时 {delay:.1f}s)")
return user
async def fetch_posts(user):
delay = random.uniform(0.5, 2.0)
print(f"获取帖子: {user['name']}...")
await asyncio.sleep(delay)
posts = [f"Post {i} by {user['name']}" for i in range(1, 3)]
print(f"帖子获取完成: {user['name']} 共{len(posts)}篇 (耗时 {delay:.1f}s)")
for post in posts:
print(f" - {post}")
if __name__ == "__main__":
random.seed(444)
asyncio.run(main())
输出大概是这样的:
获取用户: user_id=1...
获取用户: user_id=2...
获取用户: user_id=3...
用户获取完成: user_id=2 (耗时 0.5s)
获取帖子: User2...
用户获取完成: user_id=1 (耗时 1.0s)
获取帖子: User1...
用户获取完成: user_id=3 (耗时 1.2s)
获取帖子: User3...
帖子获取完成: User2 共2篇...
帖子获取完成: User1 共2篇...
帖子获取完成: User3 共2篇...
总共耗时: 2.68 秒
同步版本需要大约7.6秒,异步版本只用了2.68秒。更重要的是:用户1的获取还没完成时,用户2的帖子已经在拉了。 这就是异步并发的效果,链式调用也没影响并发度。
02 异步队列模式:生产者-消费者
链式调用适合步骤清晰的场景,但有时候生产者和消费者是松耦合的——你不知道什么时候会有新任务进来。
asyncio.Queue就是为这种场景设计的:
import asyncio
import random
import time
async def main():
queue = asyncio.Queue()
user_ids = [1, 2, 3]
start = time.perf_counter()
await asyncio.gather(
producer(queue, user_ids),
*(consumer(queue) for _ in user_ids),
)
end = time.perf_counter()
print(f"\n总共耗时: {end - start:.2f} 秒")
async def producer(queue, user_ids):
async def fetch_user(user_id):
delay = random.uniform(0.5, 2.0)
print(f"生产者: 获取用户 user_id={user_id}...")
await asyncio.sleep(delay)
user = {"id": user_id, "name": f"User{user_id}"}
print(f"生产者: 获取完成 user_id={user_id} (耗时 {delay:.1f}s)")
await queue.put(user)
await asyncio.gather(*(fetch_user(uid) for uid in user_ids))
for _ in range(len(user_ids)):
await queue.put(None) # 哨兵:告诉消费者该结束了
async def consumer(queue):
while True:
user = await queue.get()
if user is None:
break
delay = random.uniform(0.5, 2.0)
print(f"消费者: 正在获取 {user['name']} 的帖子...")
await asyncio.sleep(delay)
posts = [f"Post {i} by {user['name']}" for i in range(1, 3)]
print(f"消费者: {user['name']} 共{len(posts)}篇 (耗时 {delay:.1f}s)")
for post in posts:
print(f" - {post}")
if __name__ == "__main__":
random.seed(444)
asyncio.run(main())
模式解读:
- 生产者负责生成数据,完成后往队列里放一个
None(哨兵值,也叫毒丸) asyncio.Queue是线程安全的,生产者和消费者之间不需要额外加锁
这个模式在爬虫、消息处理、流式数据处理中非常常见。
03 async for:异步迭代器
有时候你需要逐条处理异步产生的数据——比如从数据库流式读取大量记录。
async for就是为这个场景设计的:
import asyncio
async def powers_of_two(stop=10):
exponent = 0
while exponent < stop:
yield 2 ** exponent
exponent += 1
await asyncio.sleep(0.2) # 模拟异步工作
async def main():
g = []
async for i in powers_of_two(5): # 异步迭代
g.append(i)
print(g)
# 异步推导式
f = [j async for j in powers_of_two(5) if not (j // 3 % 5)]
print(f)
asyncio.run(main())
注意:async for并不会让迭代并发。它只是让你能在每次迭代之间让出控制权给事件循环。真正要并发,还得靠gather()。
04 async with:异步上下文管理器
文件操作、网络连接、数据库会话——这些场景都有"打开-使用-关闭"的模式。在异步代码里,async with确保资源被正确释放。
看一个用aiohttp并发检查网站状态的例子:
import asyncio
import aiohttp
async def check(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
print(f"{url}: 状态码 -> {response.status}")
async def main():
websites = [
"https://www.python.org",
"https://github.com",
"https://stackoverflow.com",
]
await asyncio.gather(*(check(url) for url in websites))
asyncio.run(main())
async with aiohttp.ClientSession() → 异步创建HTTP会话
async with session.get(url) → 异步发送请求并等待响应
两个async with嵌套在一起,确保了无论是成功还是出错,连接都会被正确回收。
05 create_task vs gather vs as_completed
这三个函数是并发调度的三把利器:
asyncio.create_task() —— 把一个协程包装成Task,扔到后台运行:
task = asyncio.create_task(my_coro())
# 此时 my_coro 已经在后台跑了
result = await task # 等结果
有个坑要注意:如果你create_task()之后不await它,并且主协程先结束了,后台的Task会被取消。必须await你创建的Task才能等到它完成。
asyncio.gather() —— 把多个协程打包成一个Future,并发执行,按原顺序返回结果:
results = await asyncio.gather(task1, task2, task3)
结果的顺序跟传入的顺序一致,不管哪个先完成。
asyncio.as_completed() —— 协程完成的顺序就是返回的顺序,先跑完的先处理:
for task in asyncio.as_completed([task1, task2]):
result = await task
print(f"拿到结果: {result}")
适合需要"谁先完成先处理谁"的场景。
06 异步异常处理(Python 3.11+)
多个协程并发跑,可能各自抛不同类型的异常。从Python 3.11开始,可以用ExceptionGroup统一处理:
import asyncio
async def coro_a():
await asyncio.sleep(1)
raise ValueError("协程A出错了")
async def coro_b():
await asyncio.sleep(2)
raise TypeError("协程B出错了")
async def main():
results = await asyncio.gather(
coro_a(), coro_b(),
return_exceptions=True # 先把异常当结果返回
)
exceptions = [e for e in results if isinstance(e, Exception)]
if exceptions:
raise ExceptionGroup("并发任务异常", exceptions)
try:
asyncio.run(main())
except* ValueError as ve:
print(f"处理值错误: {ve.exceptions}")
except* TypeError as te:
print(f"处理类型错误: {te.exceptions}")
用except*可以分别捕获不同类型的异常,各自处理各自的。这个特性在复杂的异步服务里非常实用。
07 异步生态一览
有了asyncio这把锤子,你还得找钉子。以下是与异步生态深度集成的第三方库:
Web框架:
- Quart —— Flask的异步"亲戚",API几乎一样
HTTP客户端:
- aiohttp —— 老牌异步HTTP库,客户端+服务端二合一
数据库:
- Databases —— SQLAlchemy兼容的异步数据库层
- Tortoise ORM —— 轻量级异步ORM,熟悉Django ORM的话上手很快
其他:
- websockets —— WebSocket服务端/客户端
- pytest-asyncio —— 测试异步代码的标配
总结
三篇文章下来,你已经掌握了Python异步IO的完整知识图谱:
| 篇 |
核心内容 |
| 第1篇 |
概念、并发模型对比、什么时候用异步IO |
| 第2篇 |
协程、async/await、事件循环、asyncio REPL |
| 第3篇 |
链式调用、队列、async for/with、create_task/gather、异常处理、生态 |
说实话,asyncio并不简单,它的回调、future、协议、传输等底层概念还是挺深的。但对于日常开发来说,学会async/await + gather + 队列 这三个核心点,你已经能搞定90%的异步场景了。
异步IO的精髓不在一行行代码的理解,而在于一种思维方式的转变——别等,别堵,别阻塞。想通了这一点,你的Python编程能力就上了一个台阶。
关注「Bug与灵光」,追更Python全系列教程。如果有帮助,点个在看、分享给需要的朋友,下篇见。