1. 创始时间与作者
2. 官方资源
Python 官方文档:https://docs.python.org/3/library/asyncio.html
源码地址:https://github.com/python/cpython/tree/main/Lib/asyncio
PEP 3156:https://www.python.org/dev/peps/pep-3156/
教程指南:https://docs.python.org/3/library/asyncio-dev.html
3. 核心功能
4. 应用场景
1. 基础异步编程
import asyncioimport timeasyncdef say_after(delay, message):"""异步函数示例"""await asyncio.sleep(delay)print(message)return f"{message} 完成"async def basic_async_demo():"""基础异步演示"""print(f"开始时间: {time.strftime('%X')}")# 顺序执行result1 = await say_after(2, "Hello")result2 = await say_after(1, "World")print(f"顺序执行结果: {result1}, {result2}")print(f"结束时间: {time.strftime('%X')}")# 并发执行print("\n--- 并发执行 ---")print(f"开始时间: {time.strftime('%X')}")task1 = asyncio.create_task(say_after(2, "Hello"))task2 = asyncio.create_task(say_after(1, "World"))results = await asyncio.gather(task1, task2)print(f"并发执行结果: {results}")print(f"结束时间: {time.strftime('%X')}")# 运行示例asyncio.run(basic_async_demo())2. 异步网络编程
import asyncioimport aiohttpimport timeclass AsyncWebClient:"""异步Web客户端"""def __init__(self):self.session = Noneasync def fetch_url(self, url, session):"""获取URL内容"""try:async with session.get(url, timeout=10) as response:content = await response.read()return {'url': url,'status': response.status,'size': len(content),'success': True }except Exception as e:return {'url': url,'status': 0,'size': 0,'success': False,'error': str(e) }async def download_multiple_urls(self, urls):"""并发下载多个URL"""async with aiohttp.ClientSession() as session:tasks = []for url in urls:task = asyncio.create_task(self.fetch_url(url, session))tasks.append(task)results = await asyncio.gather(*tasks)return resultsasync def controlled_concurrency(self, urls, max_concurrent=3):"""控制并发度的下载"""semaphore = asyncio.Semaphore(max_concurrent)async def bounded_fetch(url):async with semaphore:return await self.fetch_url(url, self.session)async with aiohttp.ClientSession() as session:self.session = sessiontasks = [bounded_fetch(url) for url in urls]results = await asyncio.gather(*tasks)return resultsasync def network_demo():"""网络编程演示"""client = AsyncWebClient()urls = ['https://httpbin.org/delay/1','https://httpbin.org/delay/2','https://httpbin.org/delay/1','https://httpbin.org/json','https://httpbin.org/html' ]print("开始并发下载...")start_time = time.time()# 方法1: 完全并发results1 = await client.download_multiple_urls(urls)# 方法2: 控制并发度results2 = await client.controlled_concurrency(urls, max_concurrent=2)end_time = time.time()print(f"\n下载完成,耗时: {end_time - start_time:.2f}秒")successful_downloads = sum(1 for r in results1 if r['success'])print(f"成功下载: {successful_downloads}/{len(urls)}")for result in results1:status = "成功" if result['success'] else "失败"print(f"URL: {result['url']} - {status} - 大小: {result['size']}")# 运行示例asyncio.run(network_demo())3. 异步Web服务器
import asynciofrom aiohttp import webimport jsonimport timeclass AsyncWebServer:"""异步Web服务器"""def __init__(self):self.app = web.Application()self.setup_routes()self.request_count = 0def setup_routes(self):"""设置路由"""self.app.router.add_get('/', self.handle_root)self.app.router.add_get('/status', self.handle_status)self.app.router.add_get('/api/data', self.handle_api_data)self.app.router.add_post('/api/echo', self.handle_api_echo)async def handle_root(self, request):"""处理根路径请求"""return web.Response(text="欢迎使用异步Web服务器!",content_type='text/html' )async def handle_status(self, request):"""处理状态查询"""self.request_count += 1status_info = {'server': 'Async Web Server','timestamp': time.time(),'request_count': self.request_count,'active_tasks': len(asyncio.all_tasks()) }return web.json_response(status_info)async def handle_api_data(self, request):"""处理API数据请求"""# 模拟数据库查询await asyncio.sleep(0.1)data = {'items': [ {'id': 1, 'name': 'Item 1', 'value': 100}, {'id': 2, 'name': 'Item 2', 'value': 200}, {'id': 3, 'name': 'Item 3', 'value': 300} ],'total': 3,'timestamp': time.time() }return web.json_response(data)async def handle_api_echo(self, request):"""处理回显请求"""try:data = await request.json()response_data = {'echo': data,'received_at': time.time(),'method': request.method }return web.json_response(response_data)except Exception as e:return web.json_response( {'error': str(e)}, status=400 )async def start_server(self, host='localhost', port=8080):"""启动服务器"""runner = web.AppRunner(self.app)await runner.setup()site = web.TCPSite(runner, host, port)await site.start()print(f"服务器启动在 http://{host}:{port}")print("可用端点:")print(" GET / - 欢迎页面")print(" GET /status - 服务器状态")print(" GET /api/data - 获取数据")print(" POST /api/echo - 回显数据")return runnerasync def webserver_demo():"""Web服务器演示"""server = AsyncWebServer()runner = await server.start_server()try:# 保持服务器运行await asyncio.Future() # 永远等待except KeyboardInterrupt:print("\n正在关闭服务器...")finally:await runner.cleanup()# 运行服务器(取消注释以运行)# asyncio.run(webserver_demo())4. 高级异步模式
import asyncioimport randomimport timefrom dataclasses import dataclassfrom typing import List, Optional@dataclassclass TaskResult:"""任务结果"""task_id: intstatus: strresult: Optional[str] = Noneerror: Optional[str] = Noneexecution_time: float = 0.0class AdvancedAsyncPatterns:"""高级异步模式"""def __init__(self, max_concurrent_tasks=5):self.max_concurrent_tasks = max_concurrent_tasksself.completed_tasks = 0self.failed_tasks = 0async def simulated_task(self, task_id: int, duration: float) ->str:"""模拟异步任务"""# 10% 的概率失败if random.random() <0.1:raise Exception(f"任务 {task_id} 随机失败")# 模拟工作await asyncio.sleep(duration)return f"任务 {task_id} 完成,耗时 {duration:.2f}秒"async def run_with_timeout(self, task_id: int, timeout: float = 3.0) ->TaskResult:"""带超时的任务执行"""start_time = time.time()try:duration = random.uniform(1.0, 4.0)result = await asyncio.wait_for(self.simulated_task(task_id, duration), timeout=timeout )execution_time = time.time() -start_timereturn TaskResult(task_id=task_id,status='success',result=result,execution_time=execution_time )except asyncio.TimeoutError:execution_time = time.time() -start_timereturn TaskResult(task_id=task_id,status='timeout',error=f"任务超时 ({timeout}秒)",execution_time=execution_time )except Exception as e:execution_time = time.time() -start_timereturn TaskResult(task_id=task_id,status='failed',error=str(e),execution_time=execution_time )async def producer_consumer_pattern(self, num_tasks: int):"""生产者-消费者模式"""queue = asyncio.Queue(maxsize=10)results = []async def producer():"""生产者协程"""for i in range(num_tasks):await queue.put(i)print(f"生产者: 添加任务 {i} 到队列")await asyncio.sleep(0.1) # 控制生产速度# 发送结束信号for _ in range(self.max_concurrent_tasks):await queue.put(None)async def consumer(consumer_id: int):"""消费者协程"""while True:task_id = await queue.get()if task_id is None:queue.task_done()breakprint(f"消费者 {consumer_id}: 处理任务 {task_id}")result = await self.run_with_timeout(task_id)results.append(result)queue.task_done()await asyncio.sleep(0.05) # 模拟处理时间# 启动生产者和消费者producer_task = asyncio.create_task(producer())consumer_tasks = [asyncio.create_task(consumer(i))for i in range(self.max_concurrent_tasks) ]# 等待所有任务完成await producer_taskawait queue.join()# 取消消费者for task in consumer_tasks:task.cancel()await asyncio.gather(*consumer_tasks, return_exceptions=True)return resultsasync def task_with_retry(self, task_id: int, max_retries: int = 3) ->TaskResult:"""带重试机制的任务"""for attempt in range(max_retries):result = await self.run_with_timeout(task_id)if result.status == 'success':return resultprint(f"任务 {task_id} 第 {attempt + 1} 次尝试失败: {result.error}")await asyncio.sleep(1) # 重试前等待return TaskResult(task_id=task_id,status='failed',error=f"经过 {max_retries} 次重试后仍然失败" )async def run_demo(self):"""运行演示"""print("=== 生产者-消费者模式 ===")start_time = time.time()results = await self.producer_consumer_pattern(20)end_time = time.time()# 分析结果success_count = sum(1 for r in results if r.status == 'success')timeout_count = sum(1 for r in results if r.status == 'timeout')failed_count = sum(1 for r in results if r.status == 'failed')print(f"\n任务执行统计:")print(f"总任务数: {len(results)}")print(f"成功: {success_count}")print(f"超时: {timeout_count}")print(f"失败: {failed_count}")print(f"总耗时: {end_time - start_time:.2f}秒")# 带重试的任务print("\n=== 带重试机制的任务 ===")retry_results = await asyncio.gather(*[self.task_with_retry(i) for i in range(5)] )for result in retry_results:status_icon = "✅" if result.status == 'success' else "❌"print(f"{status_icon} 任务 {result.task_id}: {result.status}")async def advanced_patterns_demo():"""高级模式演示"""patterns = AdvancedAsyncPatterns(max_concurrent_tasks=3)await patterns.run_demo()# 运行示例asyncio.run(advanced_patterns_demo())
5. 底层逻辑与技术原理
核心架构
关键技术
事件循环 (Event Loop):
协程调度和IO事件处理的核心
使用选择器(selector)监听文件描述符
管理定时器和回调
协程 (Coroutine):
async def example():# 挂起点result = await some_async_operation()return result
Future 和 Task:
async/await 语法:
async def:定义异步函数
await:挂起协程,等待异步操作完成
IO多路复用:
在Unix系统使用epoll/kqueue
在Windows系统使用IOCP
统一抽象为selector
事件循环工作流程
# 简化的事件循环伪代码class EventLoop:def run_until_complete(self, future):while not future.done():# 1. 执行就绪的回调self._run_ready_callbacks()# 2. 轮询IO事件events = self._selector.select(timeout)# 3. 处理IO事件for fd, event in events:self._handle_io_event(fd, event)# 4. 执行定时器self._run_timers()# 5. 调度协程self._schedule_coroutines()
6. 安装与配置
基础安装
# asyncio 是 Python 标准库的一部分,无需额外安装# 验证安装python -c"import asyncio; print(asyncio.__doc__)"
环境要求
| 组件 | 最低要求 | 推荐配置 |
|---|
| Python | 3.4+ | 3.7+ |
| 操作系统 | Windows 7+/Linux 2.6+/macOS 10.9+ | 同左 |
| 内存 | 512MB | 4GB+ |
可选依赖安装
# 常用异步库pip install aiohttp # 异步HTTP客户端/服务器pip install aiomysql # 异步MySQL客户端pip install aiopg # 异步PostgreSQL客户端pip install aioredis # 异步Redis客户端pip install websockets # 异步WebSocket
平台特定优化
import asyncioimport platformdef optimize_event_loop():"""优化事件循环配置"""system = platform.system()if system == 'Windows':# Windows 使用 ProactorEventLooploop = asyncio.ProactorEventLoop()asyncio.set_event_loop(loop)print("使用 ProactorEventLoop (Windows)")elif system == 'Linux':# Linux 使用 epolltry:import uvloopasyncio.set_event_loop_policy(uvloop.EventLoopPolicy())print("使用 uvloop (Linux)")except ImportError:loop = asyncio.SelectorEventLoop()asyncio.set_event_loop(loop)print("使用 SelectorEventLoop (Linux)")else:# macOS 和其他系统loop = asyncio.SelectorEventLoop()asyncio.set_event_loop(loop)print("使用 SelectorEventLoop (macOS/其他)")# 应用优化optimize_event_loop()
7. 性能指标
| 操作类型 | 执行时间 | 内存开销 | 并发能力 |
|---|
| 协程创建 | 0.1-1μs | 1-2KB | 数万协程 |
| 上下文切换 | 0.01-0.1μs | 可忽略 | 极高 |
| 网络IO | 0.1-10ms | 可忽略 | 数千连接 |
| 文件IO | 1-100ms | 可忽略 | 受限于系统 |
8. 高级功能使用
1. 自定义事件循环策略
import asyncioimport timefrom typing import List, Dictclass MonitoringEventLoopPolicy(asyncio.DefaultEventLoopPolicy):"""监控事件循环策略"""def __init__(self):super().__init__()self.loop_creation_time = {}self.task_stats = {}def new_event_loop(self):"""创建新的事件循环"""loop = super().new_event_loop()loop_id = id(loop)self.loop_creation_time[loop_id] = time.time()self.task_stats[loop_id] = {'tasks_created': 0,'tasks_completed': 0,'exceptions_caught': 0 }# 添加监控self._add_monitoring_to_loop(loop, loop_id)return loopdef _add_monitoring_to_loop(self, loop, loop_id):"""向事件循环添加监控"""original_create_task = loop.create_taskdef monitored_create_task(coro, name=None):task = original_create_task(coro, name=name)self.task_stats[loop_id]['tasks_created'] += 1# 添加完成回调def task_done_callback(future):self.task_stats[loop_id]['tasks_completed'] += 1if future.exception():self.task_stats[loop_id]['exceptions_caught'] += 1task.add_done_callback(task_done_callback)return taskloop.create_task = monitored_create_taskdef get_stats(self) ->Dict:"""获取统计信息"""return {'total_loops': len(self.loop_creation_time),'task_stats': self.task_stats,'loop_uptime': {loop_id: time.time() -create_timefor loop_id, create_time in self.loop_creation_time.items() } }async def demo_monitoring():"""监控演示"""# 设置自定义事件循环策略policy = MonitoringEventLoopPolicy()asyncio.set_event_loop_policy(policy)async def sample_task(task_id):await asyncio.sleep(1)if task_id%5 == 0:raise Exception(f"任务 {task_id} 故意失败")return f"任务 {task_id} 完成"# 创建多个任务tasks = [sample_task(i) for i in range(10)]results = await asyncio.gather(*tasks, return_exceptions=True)# 显示统计信息stats = policy.get_stats()print("事件循环统计:")print(f"总事件循环数: {stats['total_loops']}")for loop_id, loop_stats in stats['task_stats'].items():print(f"\n事件循环 {loop_id}:")print(f" 创建的任务数: {loop_stats['tasks_created']}")print(f" 完成的任务数: {loop_stats['tasks_completed']}")print(f" 捕获的异常数: {loop_stats['exceptions_caught']}")print(f" 运行时间: {stats['loop_uptime'][loop_id]:.2f}秒")# 运行示例asyncio.run(demo_monitoring())2. 异步上下文管理器
import asyncioimport aiohttpfrom contextlib import asynccontextmanagerfrom typing import AsyncIteratorclass AsyncDatabaseConnection:"""模拟异步数据库连接"""def __init__(self, connection_string):self.connection_string = connection_stringself.is_connected = Falseasync def connect(self):"""连接数据库"""print(f"连接到: {self.connection_string}")await asyncio.sleep(0.5) # 模拟连接时间self.is_connected = Trueprint("连接成功")async def disconnect(self):"""断开连接"""if self.is_connected:print("断开数据库连接")self.is_connected = Falseawait asyncio.sleep(0.1)async def execute_query(self, query):"""执行查询"""if not self.is_connected:raise RuntimeError("数据库未连接")print(f"执行查询: {query}")await asyncio.sleep(0.2) # 模拟查询时间return f"查询结果: {query}"@asynccontextmanagerasync def database_connection(connection_string: str) ->AsyncIterator[AsyncDatabaseConnection]:"""异步上下文管理器"""db = AsyncDatabaseConnection(connection_string)try:await db.connect()yield dbfinally:await db.disconnect()class AsyncResourcePool:"""异步资源池"""def __init__(self, pool_size=5):self.pool_size = pool_sizeself._pool = asyncio.Queue()self._in_use = set()async def __aenter__(self):"""异步上下文管理器入口"""for i in range(self.pool_size):await self._pool.put(f"资源-{i}")return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):"""异步上下文管理器出口"""# 清理资源while not self._pool.empty():await self._pool.get()async def acquire(self) ->str:"""获取资源"""resource = await self._pool.get()self._in_use.add(resource)return resourceasync def release(self, resource: str):"""释放资源"""if resource in self._in_use:self._in_use.remove(resource)await self._pool.put(resource)async def context_manager_demo():"""上下文管理器演示"""# 使用异步上下文管理器print("=== 数据库连接演示 ===")async with database_connection("postgresql://localhost/mydb") asdb:result1 = await db.execute_query("SELECT * FROM users")result2 = await db.execute_query("SELECT COUNT(*) FROM orders")print(result1)print(result2)# 使用资源池print("\n=== 资源池演示 ===")async with AsyncResourcePool(3) aspool:# 并发获取资源tasks = []for i in range(5):task = asyncio.create_task(use_resource(pool, i))tasks.append(task)await asyncio.gather(*tasks)async def use_resource(pool: AsyncResourcePool, user_id: int):"""使用资源"""resource = await pool.acquire()try:print(f"用户 {user_id} 获取资源: {resource}")await asyncio.sleep(1) # 模拟资源使用print(f"用户 {user_id} 使用资源完成: {resource}")finally:await pool.release(resource)# 运行示例asyncio.run(context_manager_demo())
9. 与同类工具对比
| 特性 | asyncio | threading | multiprocessing | concurrent.futures |
|---|
| 并发模型 | 协程/事件循环 | 线程 | 进程 | 线程/进程池 |
| 内存开销 | 极低 | 中等 | 高 | 中等 |
| 性能 | 极高(I/O) | 中等 | 高(CPU) | 中等 |
| 编程复杂度 | 高 | 中等 | 中等 | 低 |
| 适用场景 | I/O密集型 | I/O密集型 | CPU密集型 | 通用并发 |
| GIL影响 | 无 | 受限制 | 无 | 线程池受限制 |
10. 企业级应用案例
高性能Web服务
FastAPI、Sanic等异步Web框架
微服务架构和API网关
实时数据处理
网络爬虫
游戏服务器
金融服务
总结
asyncio 是 Python 异步编程的现代标准,核心价值在于:
高性能:单线程处理数万并发连接
低开销:协程比线程内存开销小几个数量级
现代语法:async/await 语法直观易用
生态丰富:庞大的异步库生态系统
技术亮点:
基于事件循环的协程调度
完整的异步IO支持
丰富的同步原语和工具
跨平台高性能实现
适用场景:
高并发网络应用
I/O密集型服务
实时数据处理
微服务和云原生应用
安装使用:
# 无需安装,直接导入python -c"import asyncio; print('asyncio 模块可用')"学习资源:
官方文档:https://docs.python.org/3/library/asyncio.html
异步编程指南:https://docs.python.org/3/library/asyncio-dev.html
实战教程:https://realpython.com/async-io-python/
asyncio 代表了 Python 并发编程的未来方向,特别适合构建高性能、高并发的网络应用和服务,是现代 Python 开发者必须掌握的核心技术之一。