在前几期分享了线程(threading)、进程(multiprocessing)和concurrent.futures,今天我们来聊聊Python异步编程的终极武器——asyncio模块。
先来看一个简单的对比:
# 同步版本 - 逐个下载网页
import requests
import time
def sync_download(url, timeout=10):
    """Fetch ``url`` with a blocking GET request and return the body size.

    Args:
        url: Address of the page to download.
        timeout: Seconds ``requests`` waits for the server before raising.
            ``requests`` applies no timeout unless one is passed, so a
            stalled server would otherwise block the caller indefinitely.

    Returns:
        Number of bytes in the response body.
    """
    response = requests.get(url, timeout=timeout)
    return len(response.content)
urls = ['https://example.com'] * 10
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# elapsed time; time.time() follows the wall clock and can jump if the
# system time is adjusted while the downloads run.
start = time.perf_counter()
results = [sync_download(url) for url in urls]
print(f"同步耗时: {time.perf_counter() - start:.2f}秒")
# 异步版本 - 同时下载网页
import aiohttp
import asyncio
import time
async def async_download(session, url):
    """Download ``url`` through ``session`` and return the body size.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response with an awaitable ``read()`` (the caller in
            this file passes an ``aiohttp.ClientSession``).
        url: Address of the page to download.

    Returns:
        Number of bytes read from the response body.
    """
    async with session.get(url) as response:
        # Awaiting read() suspends this coroutine until the body has arrived,
        # letting the event loop run other downloads in the meantime.
        content = await response.read()
        return len(content)
async def main():
    """Download ten copies of the example page concurrently.

    Returns:
        A list with the byte length of every response, in the same order
        as the requested URLs.
    """
    urls = ['https://example.com'] * 10
    # A single session is shared by all downloads.
    async with aiohttp.ClientSession() as session:
        downloads = [async_download(session, url) for url in urls]
        # gather() runs the coroutines concurrently and returns their
        # results in the order they were passed in.
        results = await asyncio.gather(*downloads)
        return results
# perf_counter() is a monotonic clock meant for measuring elapsed time;
# time.time() follows the wall clock and can jump if the system time changes.
start = time.perf_counter()
results = asyncio.run(main())
print(f"异步耗时: {time.perf_counter() - start:.2f}秒")
在同样的10个网页下载任务中,异步版本通常比同步版本快5-10倍!这是因为asyncio能够在等待网络响应时切换执行其他任务,而不是像同步代码那样"傻等"。让我们深入了解这个强大的异步编程框架。
想象一下餐厅点餐的场景:
asyncio的核心是基于事件循环的单线程并发模型:
`async def` 定义的函数(协程),可以在特定点挂起和恢复。
关键特点:
import asyncio
import time
# 1. Define a coroutine
async def say_hello(name):
    """Print a greeting for ``name``, wait one second, and report completion.

    Args:
        name: Name inserted into the greeting and the returned message.

    Returns:
        The string ``"Greeting for <name> completed"``.
    """
    print(f"Hello, {name}!")
    await asyncio.sleep(1)  # simulate waiting on I/O
    return f"Greeting for {name} completed"
# 2. Run the coroutine
async def main():
    """Show three ways of running the ``say_hello`` coroutine."""
    # Option 1: await the coroutine directly
    result = await say_hello("Alice")
    print(result)

    # Option 2: wrap it in a task, then await the task
    task = asyncio.create_task(say_hello("Bob"))
    result = await task
    print(result)

    # Option 3: run several tasks at the same time
    task1 = asyncio.create_task(say_hello("Charlie"))
    task2 = asyncio.create_task(say_hello("David"))
    results = await asyncio.gather(task1, task2)
    print(results)
# 3. Start the event loop and run main() until it finishes
asyncio.run(main())
import asyncio
import time
async def simple_coroutine(name, delay):
    """Announce the start, wait ``delay`` seconds, then announce the end.

    Args:
        name: Label used in the printed messages and in the result.
        delay: Seconds passed to ``asyncio.sleep``.

    Returns:
        The string ``"<name>的结果"``.
    """
    print(f"[{name}] 开始执行,等待 {delay} 秒")
    await asyncio.sleep(delay)  # stand-in for a real I/O operation
    print(f"[{name}] 执行完成")
    return f"{name}的结果"
async def basic_example():
    """Compare sequential awaiting with concurrent tasks.

    Runs the same two one-second coroutines twice: first awaited one after
    the other, then scheduled together as tasks. Prints the results and the
    elapsed time of each variant.
    """
    print("1. 基本协程示例:")

    # Sequential: the second coroutine starts only after the first returns
    print("顺序执行:")
    # perf_counter() is the monotonic clock intended for measuring durations
    start = time.perf_counter()
    result1 = await simple_coroutine("任务A", 1)
    result2 = await simple_coroutine("任务B", 1)
    print(f"结果: {result1}, {result2}")
    print(f"耗时: {time.perf_counter() - start:.2f}秒")

    # Concurrent: both tasks are scheduled before either is awaited
    print("\n并发执行:")
    start = time.perf_counter()
    task1 = asyncio.create_task(simple_coroutine("任务A", 1))
    task2 = asyncio.create_task(simple_coroutine("任务B", 1))
    results = await asyncio.gather(task1, task2)
    print(f"结果: {results}")
    print(f"耗时: {time.perf_counter() - start:.2f}秒")
# Run the basic example
asyncio.run(basic_example())
async def control_flow_example():
    """Show coroutines combined with ordinary control-flow constructs.

    First doubles a list of numbers concurrently with ``asyncio.gather``,
    then walks an asynchronous iterator using ``async for``.
    """

    async def process_item(item):
        """Return ``item * 2`` after a short simulated delay."""
        print(f"开始处理: {item}")
        await asyncio.sleep(0.1)  # simulated processing time
        result = item * 2
        print(f"处理完成: {item} -> {result}")
        return result

    # A list comprehension builds the coroutines; gather() runs them together
    items = [1, 2, 3, 4, 5]
    tasks = [process_item(item) for item in items]
    results = await asyncio.gather(*tasks)
    print(f"所有结果: {results}")

    # async for loop
    print("\n异步迭代器:")

    class AsyncCounter:
        """Asynchronous iterator producing 0 .. stop-1, pausing before each."""

        def __init__(self, stop):
            self.current = 0
            self.stop = stop

        def __aiter__(self):
            return self

        async def __anext__(self):
            if self.current >= self.stop:
                # Signals ``async for`` that the iteration is finished
                raise StopAsyncIteration
            await asyncio.sleep(0.1)
            value = self.current
            self.current += 1
            return value

    async for num in AsyncCounter(3):
        print(f"异步迭代: {num}")
# Run the control-flow example
asyncio.run(control_flow_example())
async def timeout_and_cancel_example():
    """Demonstrate bounding a coroutine with a timeout and cancelling a task."""

    async def long_running_task(name, duration):
        """Sleep ``duration`` seconds, reporting start, finish and cancellation.

        Returns ``"<name>完成"`` when the sleep completes. If cancelled, the
        cancellation is printed and then re-raised.
        """
        try:
            print(f"[{name}] 开始执行")
            await asyncio.sleep(duration)
            print(f"[{name}] 执行完成")
            return f"{name}完成"
        except asyncio.CancelledError:
            print(f"[{name}] 被取消")
            raise  # propagate so the awaiting code sees the cancellation

    # Timeout control
    print("超时控制:")
    try:
        # The task needs 5 seconds but is only given 2; wait_for() cancels
        # it when the timeout expires and raises TimeoutError.
        await asyncio.wait_for(long_running_task("任务A", 5), timeout=2.0)
    except asyncio.TimeoutError:
        print("任务超时")

    # Task cancellation
    print("\n任务取消:")
    task = asyncio.create_task(long_running_task("任务B", 5))
    await asyncio.sleep(1)  # let the task run for a while
    task.cancel()  # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        print("任务已被取消")
# Run the timeout and cancellation example
asyncio.run(timeout_and_cancel_example())
async def semaphore_example():
    """Cap concurrency with a semaphore: ten one-second tasks, three at a time."""
    # At most 3 coroutines may hold the semaphore at once
    semaphore = asyncio.Semaphore(3)

    async def limited_task(name, duration):
        """Sleep ``duration`` seconds while holding the shared semaphore."""
        async with semaphore:  # acquired here, released when the block exits
            print(f"[{name}] 开始执行 (并发限制)")
            await asyncio.sleep(duration)
            print(f"[{name}] 执行完成")
            return f"{name}结果"

    # Create 10 tasks
    tasks = [limited_task(f"任务{i}", 1) for i in range(10)]
    print("开始执行10个任务(最多同时3个)...")
    # perf_counter() is the monotonic clock intended for measuring durations
    start = time.perf_counter()
    results = await asyncio.gather(*tasks)
    print(f"所有任务完成: {len(results)} 个结果")
    print(f"总耗时: {time.perf_counter() - start:.2f}秒")
# Run the semaphore example
asyncio.run(semaphore_example())
这是asyncio最常见的应用场景。注意:requests是同步(阻塞)库,在协程中直接调用会阻塞整个事件循环,因此这里改用aiohttp:
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
    """Fetch ``url`` and summarise the outcome without ever raising.

    Args:
        session: Client session whose ``get(url, timeout=...)`` returns an
            async context manager yielding the response.
        url: Address to request.

    Returns:
        On success a dict with keys ``url``, ``status``, ``content_length``
        and ``success=True``. If anything raises an ``Exception``, a dict
        with ``url``, ``error`` (the exception text) and ``success=False``.
    """
    try:
        async with session.get(url, timeout=10) as response:
            content = await response.read()
            return {
                'url': url,
                'status': response.status,
                'content_length': len(content),
                'success': True
            }
    except Exception as e:
        # Deliberate best-effort: one failing URL is reported in the result
        # instead of aborting the whole batch.
        return {'url': url, 'error': str(e), 'success': False}
async def concurrent_fetcher(urls, max_concurrent=3):
    """Fetch several URLs concurrently and report each one as it finishes.

    Args:
        urls: Iterable of addresses to request.
        max_concurrent: Value passed as ``limit`` to ``aiohttp.TCPConnector``.

    Returns:
        List of the result dicts produced by ``fetch_url``, in completion
        order (not in the order of ``urls``).
    """
    connector = aiohttp.TCPConnector(limit=max_concurrent)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls]

        results = []
        # as_completed() yields awaitables in the order the tasks finish
        for future in asyncio.as_completed(tasks):
            result = await future
            results.append(result)
            if result['success']:
                print(f"✓ {result['url']} ({result['status']})")
            else:
                print(f"✗ {result['url']}: {result['error']}")
        return results
# URLs requested by the network example
test_urls = [
    'https://httpbin.org/get',
    'https://httpbin.org/html',
    'https://httpbin.org/json',
    'https://example.com',
]
async def network_example():
    """Fetch ``test_urls`` with ``max_concurrent=2`` and print the success count."""
    results = await concurrent_fetcher(test_urls, max_concurrent=2)
    successful = sum(1 for r in results if r['success'])
    print(f"成功: {successful}/{len(results)}")
# Run the network example.
# NOTE(review): aiohttp is imported at module level in this snippet, so a
# missing package already fails at that import; this handler only fires if
# an ImportError is raised while the example itself is running.
try:
    asyncio.run(network_example())
except ImportError:
    print("需要安装 aiohttp: pip install aiohttp")
虽然文件I/O通常由操作系统缓存,但在某些场景下异步文件操作仍有价值:
1. 异步文件操作
import asyncio
import aiofiles
import os
import time
from pathlib import Path
from typing import List
print("=== 异步文件操作 ===")
# Note: requires aiofiles (pip install aiofiles)
async def async_file_operations():
    """Demonstrate concurrent file writes, reads and appends with ``aiofiles``.

    All work happens inside a scratch directory ``async_test`` in the current
    working directory. The directory is removed at the end, also when one of
    the steps raises.
    """
    import shutil  # only needed for the final cleanup

    # 1. Write files asynchronously
    print("1. 异步写文件:")

    async def write_file_async(filename: Path, content: str):
        """Write ``content`` to ``filename`` as UTF-8, replacing old data."""
        async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
            await f.write(content)
        print(f"已写入: {filename}")

    # Create the scratch directory
    test_dir = Path("async_test")
    test_dir.mkdir(exist_ok=True)

    try:
        # Write several files at the same time
        tasks = []
        for i in range(5):
            filename = test_dir / f"file_{i}.txt"
            content = f"这是文件 {i} 的内容\n" * 100
            tasks.append(asyncio.create_task(write_file_async(filename, content)))
        await asyncio.gather(*tasks)

        # 2. Read files asynchronously
        print("\n2. 异步读文件:")

        async def read_file_async(filename: Path) -> str:
            """Return the full UTF-8 text of ``filename``."""
            async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
                return await f.read()

        # Read all five files at the same time
        read_tasks = [
            asyncio.create_task(read_file_async(test_dir / f"file_{i}.txt"))
            for i in range(5)
        ]
        contents = await asyncio.gather(*read_tasks)
        print(f"读取了 {len(contents)} 个文件")
        print(f"总字符数: {sum(len(c) for c in contents)}")

        # 3. Append to a file asynchronously
        print("\n3. 异步追加文件:")

        async def append_to_file_async(filename: Path, content: str):
            """Append ``content`` to ``filename`` as UTF-8."""
            async with aiofiles.open(filename, 'a', encoding='utf-8') as f:
                await f.write(content)

        append_file = test_dir / "appended.txt"
        # Start from a fresh file that holds only the header line
        async with aiofiles.open(append_file, 'w', encoding='utf-8') as f:
            await f.write("开始:\n")

        # Concurrent appends.
        # NOTE: the tasks run concurrently, so the order of the appended
        # lines in the file may vary between runs.
        append_tasks = [
            asyncio.create_task(append_to_file_async(append_file, f"行 {i}\n"))
            for i in range(10)
        ]
        await asyncio.gather(*append_tasks)
        print(f"已追加到: {append_file}")

        # 4. Collect file statistics asynchronously
        print("\n4. 异步文件信息统计:")

        async def get_file_info(filename: str) -> dict:
            """Return name, size in bytes and modification time of a file."""
            # os.stat() is a blocking call; running it in the default
            # executor keeps the event loop free and lets gather() overlap
            # the lookups instead of executing them one after another.
            loop = asyncio.get_running_loop()
            stat = await loop.run_in_executor(None, os.stat, filename)
            return {
                'filename': filename,
                'size': stat.st_size,
                'modified': stat.st_mtime
            }

        # Gather information about every file in the scratch directory
        info_tasks = [get_file_info(str(f)) for f in test_dir.iterdir()]
        file_infos = await asyncio.gather(*info_tasks)

        print("文件统计:")
        total_size = 0
        for info in file_infos:
            print(f" {info['filename']}: {info['size']} 字节")
            total_size += info['size']
        print(f"总大小: {total_size} 字节")
    finally:
        # Cleanup runs even if a step above failed, so no files are left behind
        shutil.rmtree(test_dir)
        print(f"\n已清理测试目录: {test_dir}")
# Run the asynchronous file-operations example.
# NOTE(review): aiofiles is imported at module level in this snippet, so a
# missing package already fails at that import; this handler only fires if
# an ImportError is raised while the example itself is running.
try:
    asyncio.run(async_file_operations())
except ImportError:
    print("需要安装 aiofiles: pip install aiofiles")
    print("跳过异步文件操作示例...")
async def async_directory_walk():
    """Build a small directory tree, scan its top level with ``aiopath``, clean up.

    Creates ``async_walk_test`` in the current working directory with three
    files and three sub-directories (each holding one file), counts the
    top-level files and directories, and removes the tree again, also when
    a step raises.

    Raises:
        ImportError: If ``aiopath`` is not installed.
    """
    print("\n异步目录遍历:")
    # shutil is imported locally: no module-level import of it exists in this
    # file, so using it unimported would raise NameError during cleanup.
    import shutil
    # Imported here so that a missing aiopath raises ImportError when the
    # example is run, before any directory has been created.
    import aiopath

    async def scan_directory(path: Path):
        """Return ``(files, dirs)`` found directly inside ``path``."""
        files = []
        dirs = []
        # Walk the directory with an asynchronous iterator
        async for item in aiopath.AsyncPath(path).iterdir():
            if await item.is_file():
                files.append(item)
            elif await item.is_dir():
                dirs.append(item)
        return files, dirs

    # Create the test directory structure
    test_root = Path("async_walk_test")
    test_root.mkdir(exist_ok=True)

    try:
        # Create some files and sub-directories. The encoding is explicit
        # because the text is non-ASCII and the platform default encoding
        # may not be able to represent it.
        for i in range(3):
            file_path = test_root / f"file_{i}.txt"
            file_path.write_text(f"文件 {i} 内容", encoding="utf-8")
            dir_path = test_root / f"dir_{i}"
            dir_path.mkdir(exist_ok=True)
            sub_file = dir_path / f"sub_file_{i}.txt"
            sub_file.write_text(f"子文件 {i} 内容", encoding="utf-8")

        # Scan the directory
        files, dirs = await scan_directory(test_root)
        print(f"目录: {test_root}")
        print(f"文件数: {len(files)}")
        print(f"子目录数: {len(dirs)}")
    finally:
        # Cleanup runs even if a step above failed
        shutil.rmtree(test_root)
        print(f"已清理: {test_root}")
# Run the asynchronous directory-walk example. aiopath is imported inside
# async_directory_walk(), so a missing package surfaces here as ImportError.
try:
    asyncio.run(async_directory_walk())
except ImportError:
    print("需要安装 aiopath: pip install aiopath")
    print("跳过异步目录遍历示例...")
asyncio不适合CPU密集型任务:这类任务会阻塞事件循环。
Q: 什么时候应该使用asyncio?
A: 高并发的I/O密集型应用,如Web服务器、爬虫、实时通信等。
Q: asyncio和线程哪个更好?
A: asyncio适合高并发的I/O密集型场景;线程适合并发量不大、或必须调用阻塞式(同步)库的场景。注意:受GIL限制,CPython的线程无法并行执行Python字节码,需要利用多核的CPU密集型任务应使用多进程(multiprocessing)。
Q: 如何在asyncio中运行同步代码?
A: 使用asyncio.to_thread()或loop.run_in_executor()在单独的线程中运行。
Q: 如何调试asyncio程序?
A: 开启asyncio的调试模式:asyncio.run(main(), debug=True),或设置环境变量PYTHONASYNCIODEBUG=1;也可以使用专门的调试工具。
Q: asyncio是线程安全的吗?
A: 不是,asyncio对象不是线程安全的,应该在同一个线程中使用。
优先使用async/await而非回调。
通过本文的学习,你应该已经掌握了:
async/await、asyncio.run()、asyncio.create_task()
核心要点回顾:
- asyncio的核心调度器(事件循环)
- async def定义的函数(协程)
- asyncio.create_task()创建任务
- asyncio.gather()并发执行
- async with管理异步资源
性能优势:
下期预告:
掌握了高性能的异步编程,下期我们将学习专业的日志记录系统logging ,让你的程序在生产环境中更加可靠!
往期推荐:
统一并发接口 —— concurrent.futures模块
关注我,免费分享Python学习资料!探索的乐趣,在于分享和发现。别忘了在评论区留下你的想法和问题,我们下期见!