🐍异步编程入门 — asyncio 的协程魔法
🕐 预计用时:2-3 小时 | 🎯 目标:掌握 async/await、事件循环、协程概念
📖 今日目录
- asyncio.gather — 并发执行多个协程
- asyncio.create_task — 创建任务
1. 什么是异步编程?
异步编程 = 一个线程,高效处理大量 I/O 操作。核心思想:在等待 I/O 时,去做其他事情。
# 同步(串行):排队等
# 买咖啡 → 等5分钟 → 拿到 → 买面包 → 等3分钟 → 拿到
# 总耗时: 8分钟
# 异步(并发):先去做别的
# 买咖啡 → 下单 → 去买面包 → 下单 → 回来取咖啡 → 取面包
# 总耗时: 5分钟(重叠等待时间)
2. 协程 — async / await
协程(Coroutine)是用 async def 定义的函数,调用时不会立即执行,而是返回一个协程对象。
# 普通函数
def sync_hello():
return "Hello"
# 异步函数(协程)
async def async_hello():
return "Hello"
# 调用普通函数 → 立即执行
result = sync_hello()
print(result) # Hello
# 调用协程 → 返回协程对象(不会立即执行!)
coroutine = async_hello()
print(coroutine) # <coroutine object async_hello at 0x...>
# 必须用 asyncio.run() 来执行协程
import asyncio
result = asyncio.run(async_hello())
print(result) # Hello
# 协程中用 await 等待其他协程
import asyncio
async def fetch_data():
print("⏳ 获取数据...")
await asyncio.sleep(2) # 模拟 I/O 等待
print("✅ 数据获取完成")
return {"name": "张三", "age": 25}
async def main():
print("🚀 程序开始")
data = await fetch_data() # 等待协程完成
print(f"📊 数据: {data}")
print("🏁 程序结束")
asyncio.run(main())
💡 async/await 规则:
• async def 定义协程函数
• 只能在协程内部用 await
• await 等待的是一个"可等待对象"(协程、Task、Future)
• asyncio.run() 是程序入口,一个程序只调用一次
3. asyncio.run() — 启动事件循环
import asyncio
async def say(what, delay):
await asyncio.sleep(delay)
print(f"[{delay}s] {what}")
async def main():
print("开始")
await say("你好", 1)
await say("世界", 2)
print("结束")
# 顺序执行(每个 await 等完才执行下一个)
asyncio.run(main())
# 总耗时: 3秒(1+2)
4. await — 等待异步结果
import asyncio
async def fetch_user(user_id):
"""模拟获取用户信息"""
await asyncio.sleep(1) # 模拟网络请求
return {"id": user_id, "name": f"用户{user_id}"}
async def fetch_order(order_id):
"""模拟获取订单信息"""
await asyncio.sleep(1.5)
return {"order_id": order_id, "amount": 99.9}
async def main():
# 顺序等待
user = await fetch_user(1)
print(f"用户: {user}")
order = await fetch_order("ORD001")
print(f"订单: {order}")
# 总耗时: 2.5秒(1 + 1.5)
asyncio.run(main())
5. asyncio.gather — 并发执行多个协程
gather 是并发执行的核心——同时运行多个协程,等它们全部完成。
import asyncio
import time
async def fetch(name, delay):
print(f"⏳ {name} 开始")
await asyncio.sleep(delay)
print(f"✅ {name} 完成")
return f"{name} 的数据"
async def main():
start = time.time()
# 并发执行!
results = await asyncio.gather(
fetch("用户", 1),
fetch("订单", 2),
fetch("商品", 1.5),
)
elapsed = time.time() - start
print(f"结果: {results}")
print(f"耗时: {elapsed:.1f}秒") # 2.0秒(不是 4.5 秒!)
asyncio.run(main())
# ⏳ 用户 开始
# ⏳ 订单 开始
# ⏳ 商品 开始
# ✅ 用户 完成
# ✅ 商品 完成
# ✅ 订单 完成
# 结果: ['用户 的数据', '订单 的数据', '商品 的数据']
# 耗时: 2.0秒
💡 gather 的威力:3 个各需 1-2 秒的任务,总共只花了 2 秒(取最慢的那个)。同步串行需要 4.5 秒。
6. asyncio.create_task — 创建任务
import asyncio
async def background_task(name, delay):
"""后台任务"""
for i in range(3):
await asyncio.sleep(delay)
print(f"🔄 {name}: 步骤 {i+1}")
return f"{name} 完成"
async def main():
# 创建任务(立即开始执行)
task1 = asyncio.create_task(background_task("任务A", 0.5))
task2 = asyncio.create_task(background_task("任务B", 0.8))
print("主程序继续做其他事...")
await asyncio.sleep(1)
print("主程序等够了,开始收集结果")
# 等待任务完成
result1 = await task1
result2 = await task2
print(f"结果: {result1}, {result2}")
asyncio.run(main())
create_task vs gather
import asyncio
async def work(n):
await asyncio.sleep(n)
return n
async def main():
# create_task: 逐个创建,灵活控制
t1 = asyncio.create_task(work(1))
t2 = asyncio.create_task(work(2))
# 可以在中间做其他事
print("等待期间做别的事...")
r1 = await t1 # 先等 t1
r2 = await t2 # 再等 t2
# gather: 一步到位,简洁
results = await asyncio.gather(work(1), work(2), work(3))
return results
7. 异步上下文管理器
import asyncio
class AsyncDB:
"""模拟异步数据库连接"""
async def __aenter__(self):
"""异步进入"""
print("🔌 连接数据库...")
await asyncio.sleep(0.5)
print("✅ 已连接")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步退出"""
print("🔌 断开数据库...")
await asyncio.sleep(0.2)
print("✅ 已断开")
async def query(self, sql):
await asyncio.sleep(0.3)
return f"查询结果: {sql}"
async def main():
async with AsyncDB() as db:
result = await db.query("SELECT * FROM users")
print(result)
asyncio.run(main())
# 🔌 连接数据库...
# ✅ 已连接
# 查询结果: SELECT * FROM users
# 🔌 断开数据库...
# ✅ 已断开
8. 异步迭代器
import asyncio
class AsyncCounter:
"""异步计数器"""
def __init__(self, start, end):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
await asyncio.sleep(0.1) # 模拟异步操作
value = self.current
self.current += 1
return value
async def main():
async for num in AsyncCounter(0, 5):
print(f"数字: {num}")
asyncio.run(main())
# 数字: 0
# 数字: 1
# 数字: 2
# 数字: 3
# 数字: 4
9. 实战:异步 HTTP 请求
import asyncio
import time
# 模拟异步 HTTP 请求(实际用 aiohttp 库)
async def fetch_url(url, delay=1):
"""模拟异步请求"""
print(f"🌐 请求: {url}")
await asyncio.sleep(delay) # 模拟网络延迟
status = 200
print(f"✅ 响应: {url} ({status})")
return {"url": url, "status": status, "time": delay}
async def main():
urls = [
"https://api.example.com/users",
"https://api.example.com/orders",
"https://api.example.com/products",
"https://api.example.com/stats",
"https://api.example.com/logs",
]
# 并发请求所有 URL
start = time.time()
results = await asyncio.gather(*[fetch_url(url) for url in urls])
elapsed = time.time() - start
print(f"\n📊 请求 {len(urls)} 个 URL")
print(f"⏱️ 总耗时: {elapsed:.1f}秒(串行需要 5 秒)")
for r in results:
print(f" {r['url']}: {r['status']}")
asyncio.run(main())
10. 实战:异步爬虫
import asyncio
import time
async def fetch_page(url, delay=0.5):
"""模拟抓取网页"""
await asyncio.sleep(delay)
return f"<html>{url} 的内容</html>"
async def parse_page(html, url):
"""模拟解析网页"""
await asyncio.sleep(0.1)
title = url.split("/")[-1]
return {"title": title, "length": len(html)}
async def save_result(result):
"""模拟保存结果"""
await asyncio.sleep(0.05)
print(f" 💾 保存: {result['title']} ({result['length']} 字符)")
async def crawl_single(url):
"""单个页面的抓取→解析→保存"""
html = await fetch_page(url)
result = await parse_page(html, url)
await save_result(result)
return result
async def main():
urls = [f"https://example.com/page{i}" for i in range(10)]
start = time.time()
results = await asyncio.gather(*[crawl_single(url) for url in urls])
elapsed = time.time() - start
print(f"\n📊 抓取 {len(results)} 个页面 | 耗时: {elapsed:.1f}秒")
asyncio.run(main())
11. 三种并发方式对比
💡 选型口诀:
• 少量 I/O → 多线程(简单直接)
• CPU 计算 → 多进程(绕过 GIL)
• 大量 I/O(上万并发)→ asyncio(最高效)
12. 今日小结
| | |
|---|
| | async def func(): |
| | result = await coroutine |
| | await asyncio.gather(*coros) |
| | asyncio.create_task(coro) |
| | asyncio.run(main()) |
| | async def __aenter__ |
| | async def __anext__ |
核心要点
- ✅
async def 定义协程,调用返回协程对象(不会立即执行) - ✅
asyncio.run() 是程序入口,只调用一次 - ✅
asyncio.gather() 并发执行多个协程,取最慢的耗时 - ✅ asyncio 适合大量 I/O 并发,CPU 密集型用多进程
- ✅ 实际项目中用
aiohttp 做异步 HTTP 请求
🎯 练习建议:
1. 用 asyncio.gather 并发请求 20 个 URL,统计成功率
2. 实现一个异步生产者-消费者模型(用 asyncio.Queue)
3. 对比同步和异步爬取同一组网页的速度差异
📚 Day38 完成!明天学习网络基础 — socket 编程