【接上篇】
三、asyncio深度解析:事件循环与协程调度
先来看事件循环机制,以下是一个简化版事件循环的示例代码:
import asyncio
import selectors


class SimpleEventLoop:
    """Simplified, educational event loop (not a real asyncio loop).

    Demonstrates the core mechanism: keep a queue of runnable coroutines
    and advance each one a single step with ``coro.send(None)`` until it
    raises ``StopIteration``.
    """

    def __init__(self):
        self._ready = []      # 就绪任务队列 — queue of runnable coroutines
        self._scheduled = []  # 定时任务 — timer callbacks (unused in this demo)
        # A real loop would register file descriptors here and poll them.
        self._selector = selectors.DefaultSelector()

    def create_task(self, coro):
        """Queue a coroutine for execution and return it.

        BUG FIX: the original wrapped ``coro`` in ``asyncio.Task``, which
        (a) requires a *running* asyncio loop and (b) has no ``.send()``
        method, so ``run_until_complete`` could never drive it. We queue
        the raw coroutine instead.
        """
        self._ready.append(coro)
        return coro

    def run_until_complete(self, coro):
        """Drive queued coroutines to completion; return the final result."""
        self.create_task(coro)
        result = None
        while self._ready or self._scheduled:
            # 执行所有就绪任务 — run every ready coroutine one step
            while self._ready:
                current = self._ready.pop(0)
                try:
                    current.send(None)  # advance the coroutine one step
                    # BUG FIX: the original dropped the coroutine after one
                    # step; re-queue it so multi-step coroutines finish.
                    self._ready.append(current)
                except StopIteration as e:
                    result = e.value
                    print(f'Task completed with result: {e.value}')
            # 处理I/O事件(简化版本) — a real asyncio loop would now poll
            # self._selector for file-descriptor events.
        return result
再来看协程调度原理和状态转换机制:
import asyncio
from enum import Enum


class TaskState(Enum):
    """Coroutine/task lifecycle states (simplified model)."""
    PENDING = 1
    RUNNING = 2
    DONE = 3


async def demonstrate_scheduling():
    """Demonstrate coroutine scheduling state transitions."""
    print("1. 协程开始执行")
    # At the await the coroutine suspends and control returns to the loop.
    await asyncio.sleep(1)
    print("2. 恢复执行,继续运行")
    # Another suspension point.
    future = asyncio.Future()
    # BUG FIX: the original awaited a Future that was never resolved, so the
    # coroutine hung forever. Schedule its resolution before awaiting it.
    asyncio.get_running_loop().call_soon(future.set_result, None)
    await future
    print("3. 协程执行完成")
    return "result"


async def _main():
    """Drive the demo inside a running event loop."""
    # BUG FIX: asyncio.create_task() requires a running event loop; the
    # original called it at module level and raised RuntimeError.
    task = asyncio.create_task(demonstrate_scheduling())
    await task
    # Use the public Task API instead of the private `task._state` attribute.
    state = TaskState.DONE if task.done() else TaskState.PENDING
    print(f"任务状态: {state.name}")


if __name__ == '__main__':
    asyncio.run(_main())
四、同步 vs 异步:Web应用性能对比
测试框架设计
import asyncio
import aiohttp
import requests
import threading
from concurrent.futures import ThreadPoolExecutor
import time
from statistics import mean
import matplotlib.pyplot as plt


class PerformanceTester:
    """Benchmark harness comparing asyncio vs. thread-pool HTTP clients."""

    def __init__(self, url, concurrent_requests=100):
        self.url = url                          # target endpoint
        self.concurrent = concurrent_requests   # number of parallel requests

    async def async_test(self):
        """Run all requests concurrently via aiohttp.

        Returns (elapsed_seconds, response_count).
        """
        async def fetch(session):
            # BUG FIX: the original gathered bare session.get() coroutines
            # and never released the responses, leaking connections. The
            # context manager returns each connection to the pool.
            async with session.get(self.url) as resp:
                await resp.read()
                return resp.status

        async with aiohttp.ClientSession() as session:
            start = time.time()
            statuses = await asyncio.gather(
                *(fetch(session) for _ in range(self.concurrent)))
            elapsed = time.time() - start
        return elapsed, len(statuses)

    def sync_threaded_test(self):
        """Run the same requests on a 50-worker thread pool.

        Returns (elapsed_seconds, response_count).
        """
        def make_request(_):
            # BUG FIX: executor.map() passes one item per call, but the
            # original worker accepted no arguments → TypeError at runtime.
            return requests.get(self.url)

        with ThreadPoolExecutor(max_workers=50) as executor:
            start = time.time()
            results = list(executor.map(make_request, range(self.concurrent)))
            elapsed = time.time() - start
        return elapsed, len(results)

    def run_comparison(self):
        """Run both benchmarks, print a summary, and plot the result."""
        print(f"测试 {self.concurrent} 个并发请求到 {self.url}")
        # BUG FIX: asyncio.get_event_loop() is deprecated outside a running
        # loop; asyncio.run() creates and tears down a fresh loop for us.
        async_time, async_count = asyncio.run(self.async_test())
        sync_time, sync_count = self.sync_threaded_test()
        print(f"\n{'='*50}")
        print(f"异步(asyncio)结果: {async_time:.2f}秒, 处理 {async_count} 请求")
        print(f"多线程同步结果: {sync_time:.2f}秒, 处理 {sync_count} 请求")
        print(f"性能提升: {sync_time/async_time:.1f}倍")
        self.visualize_results(async_time, sync_time)

    def visualize_results(self, async_time, sync_time):
        """Bar-chart the two timings with value labels above each bar."""
        labels = ['asyncio异步', '多线程同步']
        times = [async_time, sync_time]
        plt.figure(figsize=(8, 5))
        bars = plt.bar(labels, times, color=['skyblue', 'lightcoral'])
        plt.ylabel('处理时间 (秒)')
        plt.title(f'并发请求性能对比 ({self.concurrent} 请求)')
        # Print the timing value above each bar.
        for bar, time_val in zip(bars, times):
            plt.text(bar.get_x() + bar.get_width()/2, bar.get_height(),
                     f'{time_val:.2f}s', ha='center', va='bottom')
        plt.tight_layout()
        plt.show()


if __name__ == '__main__':
    # NOTE(review): httpbin.org is a public service; swap in a local test
    # server for stable, network-independent numbers.
    tester = PerformanceTester('http://httpbin.org/delay/1',
                               concurrent_requests=100)
    tester.run_comparison()
性能差异分析
测试结果特征,低并发场景(<100连接):差异不明显;中等并发场景(100-1000连接):异步性能优势开始显现;高并发场景(>1000连接):异步显著优于同步,内存占用更低。
内存使用对比:
同步模型中,每个线程约8MB栈内存,1000线程≈8GB;异步模型中,每个协程约1KB,1000协程≈1MB。
五、实战:构建高性能异步Web应用
异步Web服务器架构
# BUG FIX: the original used asyncio.gather without importing asyncio.
import asyncio

from aiohttp import web
import asyncpg
import aioredis
from datetime import datetime


class AsyncWebService:
    """High-performance asynchronous web service example."""

    def __init__(self):
        self.app = web.Application()
        self.setup_routes()
        self.setup_middleware()

    async def init_db(self):
        """Create the asyncpg connection pool."""
        self.db_pool = await asyncpg.create_pool(
            user='user', password='password', database='database',
            host='localhost', min_size=5, max_size=20)

    async def init_cache(self):
        """Create the Redis connection pool.

        NOTE(review): create_redis_pool is the legacy aioredis 1.x API;
        aioredis 2.x replaced it with `aioredis.from_url` — confirm the
        installed version.
        """
        self.redis = await aioredis.create_redis_pool(
            'redis://localhost', minsize=5, maxsize=20)

    def setup_middleware(self):
        """Install a timing middleware that stamps X-Response-Time."""
        @web.middleware
        async def timing_middleware(request, handler):
            start = datetime.now()
            response = await handler(request)
            elapsed = (datetime.now() - start).total_seconds()
            response.headers['X-Response-Time'] = f'{elapsed:.3f}s'
            return response

        self.app.middlewares.append(timing_middleware)

    def setup_routes(self):
        """Register the HTTP routes."""
        self.app.router.add_get('/api/users/{id}', self.get_user)
        self.app.router.add_post('/api/users', self.create_user)
        self.app.router.add_get('/api/products', self.list_products)

    async def get_user(self, request):
        """Fetch one user, trying the Redis cache before the database."""
        user_id = request.match_info['id']
        # Try the cache first.
        cached = await self.redis.get(f'user:{user_id}')
        if cached:
            # BUG FIX: Redis returns bytes, which web.json_response cannot
            # serialize — decode to str first.
            return web.json_response({'cached': True,
                                      'data': cached.decode()})
        # Cache miss: hit the database.
        # NOTE(review): match_info values are str; if users.id is an integer
        # column asyncpg will reject the $1 type — confirm the schema.
        async with self.db_pool.acquire() as conn:
            user = await conn.fetchrow(
                'SELECT * FROM users WHERE id = $1', user_id)
            if user:
                # Populate the cache with a 5-minute TTL.
                await self.redis.setex(f'user:{user_id}', 300, user['name'])
                return web.json_response(dict(user))
            return web.json_response({'error': 'User not found'}, status=404)

    async def create_user(self, request):
        """Bulk-insert users inside a single transaction."""
        data = await request.json()
        async with self.db_pool.acquire() as conn:
            async with conn.transaction():
                # BUG FIX: the original gathered several conn.execute()
                # coroutines concurrently on ONE connection, which asyncpg
                # forbids ("another operation is in progress"). Use
                # executemany for a single batched round-trip instead.
                await conn.executemany(
                    '''INSERT INTO users (name, email) VALUES ($1, $2)''',
                    [(u['name'], u['email']) for u in data['users']])
        return web.json_response({'status': 'success',
                                  'created': len(data['users'])})

    async def list_products(self, request):
        """Merge product lists from several sources queried concurrently."""
        tasks = [self.fetch_products_from_source('source1'),
                 self.fetch_products_from_source('source2'),
                 self.fetch_products_from_cache()]
        # return_exceptions=True keeps one failing source from aborting the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        products = []
        for result in results:
            if isinstance(result, Exception):
                # Log and skip the failed source without failing the request.
                print(f"查询失败: {result}")
            elif result:
                products.extend(result)
        return web.json_response({'count': len(products),
                                  'products': products})

    async def fetch_products_from_source(self, source):
        """Simulate fetching products from an external API."""
        await asyncio.sleep(0.1)
        return [{'id': 1, 'name': f'Product from {source}'}]

    async def fetch_products_from_cache(self):
        """Placeholder for a real product-cache lookup."""
        return []

    async def startup(self, app):
        """Initialize pools when the application starts."""
        await self.init_db()
        await self.init_cache()
        print("应用启动完成")

    async def cleanup(self, app):
        """Release pools when the application shuts down."""
        await self.db_pool.close()
        self.redis.close()
        await self.redis.wait_closed()
        print("应用关闭完成")

    def run(self):
        """Wire lifecycle hooks and start the aiohttp server."""
        self.app.on_startup.append(self.startup)
        self.app.on_cleanup.append(self.cleanup)
        # access_log=None: disable per-request access logging in production
        # for better throughput.
        web.run_app(self.app, host='0.0.0.0', port=8080, access_log=None)


if __name__ == '__main__':
    service = AsyncWebService()
    service.run()
最佳实践与性能优化:连接池配置与异步批处理
import asyncio


# Optimized database connection-pool configuration
async def get_optimized_db_pool(setup=None):
    """Create an asyncpg pool with production-oriented settings.

    setup: optional per-connection initialization callback.

    BUG FIX: the original passed ``self.setup_connection`` inside a
    module-level function where no ``self`` exists (NameError). The setup
    callback is now an explicit parameter (default None = no setup).
    """
    return await asyncpg.create_pool(
        dsn='postgresql://user:pass@localhost/db',
        min_size=5,                             # minimum pooled connections
        max_size=50,                            # scale with CPU cores / workload
        max_queries=50000,                      # recycle a connection after N queries
        max_inactive_connection_lifetime=300,   # close idle connections after 5 min
        command_timeout=60,                     # per-query timeout (seconds)
        setup=setup)                            # connection initialization hook


# Asynchronous batching pattern
async def batch_process(items, batch_size=100, processor=None):
    """Process *items* in concurrent batches of *batch_size*.

    processor: optional async callable applied to each item; defaults to
    the module-level ``process_item`` for backward compatibility.
    Returns the list of results in input order.
    """
    if processor is None:
        processor = process_item
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        # One task per item in the current batch, run concurrently.
        tasks = [processor(item) for item in batch]
        batch_results = await asyncio.gather(*tasks)
        results.extend(batch_results)
        # Brief pause between batches so we don't exhaust downstream resources.
        await asyncio.sleep(0.001)
    return results
六、总结:选择合适的并发模型
决策矩阵
| 场景特征 | 推荐模型 | 理由 |
| --- | --- | --- |
| CPU密集型计算 | 多进程 | 绕过GIL,真正并行 |
| I/O密集型,并发<1000 | 多线程 | 简单易用,开发成本低 |
| 高并发I/O,连接>1000 | asyncio协程 | 高并发,低内存 |
| 混合型任务 | 混合架构 | 进程处理CPU,协程处理I/O |