一提到 Python 异步编程,很多人第一反应是:async/await 我会写,但 asyncio 里的 Event Loop、Task、gather 这些东西到底是怎么运转的?为什么有时候写完性能没提升,有时候还比同步更慢?这篇文章就来彻底讲清楚。
为什么需要异步?
先说一个现实:Python 的 GIL(全局解释器锁) 决定了同一时刻只有一个线程在执行 Python 字节码。这意味着多线程在 CPU 密集型场景下效果有限。
但 Web 服务的瓶颈往往不在 CPU,而在 I/O 等待——等数据库返回结果、等第三方 API 响应、等文件读写完成。在这段等待时间里,线程什么也没干,白白浪费了。
异步的思路是:等待期间去做别的事,等 I/O 完成了再回来继续。用一个比喻:
同步 = 你在奶茶店盯着店员做完你的单才走
异步 = 你拿了号码牌去旁边刷手机,叫号了再回来取
asyncio 的核心概念
事件循环(Event Loop)
事件循环是 asyncio 的心脏,本质上是一个 while True 的调度器:
# 简化版事件循环原理(伪代码)while True: ready_tasks = get_ready_tasks() # 找到可以执行的任务 for task in ready_tasks: task.run_until_next_await() # 执行到下一个 await 暂停点 wait_for_io_events() # 等待 I/O 事件(epoll/kqueue)
实际使用时,你不需要手动管理事件循环:
import asyncioasync def main(): print("Hello asyncio")# Python 3.7+ 的标准写法asyncio.run(main())
协程(Coroutine)
用 async def 定义的函数就是协程函数,调用它返回的是协程对象,不会立刻执行:
async def fetch_data(): print("开始请求") await asyncio.sleep(1) # 模拟 I/O 等待 print("请求完成") return {"data": "ok"}# 注意:这里只是创建了协程对象,还没执行coro = fetch_data()print(type(coro)) # <class 'coroutine'># 必须放入事件循环才会执行asyncio.run(fetch_data())
Task:让协程并发跑起来
协程默认是顺序执行的。要让多个协程真正并发,需要把它们包装成 Task:
import asyncioimport timeasync def fetch_user(user_id: int) -> dict: """模拟查询用户信息,耗时 1 秒""" await asyncio.sleep(1) return {"id": user_id, "name": f"用户{user_id}"}async def main_sequential(): """顺序执行:总耗时 ≈ 3 秒""" start = time.time() u1 = await fetch_user(1) # 等 1 秒 u2 = await fetch_user(2) # 再等 1 秒 u3 = await fetch_user(3) # 再等 1 秒 print(f"顺序执行耗时: {time.time() - start:.1f}s") # 约 3.0sasync def main_concurrent(): """并发执行:总耗时 ≈ 1 秒""" start = time.time() # gather 会同时启动所有协程 u1, u2, u3 = await asyncio.gather( fetch_user(1), fetch_user(2), fetch_user(3), ) print(f"并发执行耗时: {time.time() - start:.1f}s") # 约 1.0sasyncio.run(main_sequential())asyncio.run(main_concurrent())
这就是为什么 FastAPI 比 Flask 快的核心原因——同样处理 100 个需要等 DB 的请求,FastAPI 可以在等待期间处理其他请求。
实战:用 aiohttp 并发抓取数据
下面是一个真实场景:并发请求多个接口,汇总结果。
import asyncioimport aiohttpimport time# 模拟需要请求的 URL 列表URLS = [ "https://httpbin.org/delay/1", # 每个请求延迟 1 秒 "https://httpbin.org/delay/1", "https://httpbin.org/delay/1", "https://httpbin.org/delay/1", "https://httpbin.org/delay/1",]async def fetch_one(session: aiohttp.ClientSession, url: str, idx: int) -> dict: """请求单个 URL""" async with session.get(url) as response: data = await response.json() print(f" ✓ 请求 {idx+1} 完成,状态: {response.status}") return dataasync def fetch_all(urls: list[str]) -> list[dict]: """并发请求所有 URL""" # 使用单个 Session 复用连接(重要!) connector = aiohttp.TCPConnector(limit=10) # 最大并发连接数 async with aiohttp.ClientSession(connector=connector) as session: tasks = [ fetch_one(session, url, i) for i, url in enumerate(urls) ] # gather 并发执行,任意一个出错时其他仍继续 results = await asyncio.gather(*tasks, return_exceptions=True) return resultsasync def main(): print(f"开始并发请求 {len(URLS)} 个接口...") start = time.time() results = await fetch_all(URLS) elapsed = time.time() - start success = sum(1 for r in results if not isinstance(r, Exception)) print(f"\n完成!成功 {success}/{len(URLS)},耗时 {elapsed:.1f}s") # 5 个各需 1 秒的请求,并发只需约 1 秒asyncio.run(main())
最佳实践 & 常见坑
✅ 1. 不要在异步代码里调同步阻塞操作
import asyncioimport time# ❌ 错误:time.sleep 会阻塞整个事件循环async def bad_example(): time.sleep(2) # 这会冻结所有其他协程!# ✅ 正确:用 asyncio.sleep 让出控制权async def good_example(): await asyncio.sleep(2) # 等待期间其他协程可以运行
常见的需要替换成异步版本的库:
requeststime.sleepopen()pymysql / psycopg2 → aiomysql / asyncpg
✅ 2. 用 TaskGroup 管理一组任务(Python 3.11+)
# 新写法更优雅,任意一个 Task 出错会取消整个组async def main(): async with asyncio.TaskGroup() as tg: task1 = tg.create_task(fetch_user(1)) task2 = tg.create_task(fetch_user(2)) task3 = tg.create_task(fetch_user(3)) # 走到这里说明所有任务都成功完成 print(task1.result(), task2.result(), task3.result())
✅ 3. 给任务设置超时
async def main(): try: # 超过 5 秒则抛出 TimeoutError result = await asyncio.wait_for(fetch_data(), timeout=5.0) except asyncio.TimeoutError: print("请求超时,使用缓存数据") result = get_cached_data()
✅ 4. CPU 密集任务用 ProcessPoolExecutor
import asynciofrom concurrent.futures import ProcessPoolExecutordef cpu_heavy_task(n: int) -> int: """纯 CPU 计算,不适合 asyncio""" return sum(i * i for i in range(n))async def main(): loop = asyncio.get_event_loop() with ProcessPoolExecutor() as pool: # 把 CPU 密集任务放到子进程,不阻塞事件循环 result = await loop.run_in_executor(pool, cpu_heavy_task, 10_000_000) print(f"计算结果: {result}")
⚠️ 5. asyncio 不等于"更快"
异步只在 I/O 密集场景有优势。如果你的代码主要是 CPU 计算(矩阵运算、图像处理),用 asyncio 反而可能更慢(额外的调度开销)。
总结
asyncio 的本质是单线程内的协作式多任务:
- I/O 完成后,被挂起的协程继续从
await 处恢复
记住三个核心:
asyncio.run()asyncio.gather()- 不要在 async 函数里用同步阻塞库
掌握这三点,日常的异步开发场景基本都能覆盖。如果要深入,可以继续研究 asyncio.Queue(生产者消费者模式)和 asyncio.Semaphore(限制并发数),这两个在爬虫和批量任务处理中非常实用。