Why asyncio? A few familiar pain points:

- Slow responses under high concurrency: when 1,000 users hit your API at once, synchronous code handles them one at a time, and everyone else waits in line.
- Inefficient crawlers that waste hours: fetching 100 pages with the synchronous requests library at 2 seconds per page takes 200 seconds, most of it spent waiting on the network.
- Database queries that block the main thread: while a query runs, the whole thread is stalled and cannot serve other users, so server utilization stays low.
- Connection limits in real-time apps: a thread-per-connection WebSocket server needs 1,000 threads for 1,000 connections and quickly exhausts system resources.
- Memory blow-ups on large files: reading a multi-GB file into memory in one go crashes the program with an OOM error instead of streaming it.

asyncio exists to solve exactly these problems. It provides a complete asynchronous programming framework that lets a single thread handle thousands of concurrent connections, getting the most out of your hardware. This article covers asyncio's core principles, recent features, practical use cases, and common pitfalls, so you can leave synchronous bottlenecks behind.

asyncio entered the standard library in Python 3.4 to provide infrastructure for asynchronous IO: single-threaded concurrent code built on the philosophy of "non-blocking IO plus an event loop", combining coroutines and the event loop for efficient concurrency. Its key characteristics:

- Single-threaded concurrency: avoids the lock contention and context-switch overhead of multithreading
- Non-blocking IO: a task yields control as soon as it hits an IO operation, so other tasks keep running
- Callback-driven: an event-driven programming model; results of asynchronous operations are delivered via callbacks
- Structured concurrency: Python 3.11+ adds TaskGroup for safer concurrency control
The asyncio module is made up of several core components, grouped by function below:

| Category | Purpose | Representative APIs |
| --- | --- | --- |
| Coroutines and tasks | Define and schedule concurrent units of work | create_task(), gather(), TaskGroup |
| Event loop | Create, run, and manage the event loop | asyncio.run(), get_event_loop() |
| Streams | High-level asynchronous network IO | open_connection(), start_server() |
| Synchronization primitives | Coordinate access between coroutines | Lock, Semaphore, Event |
- High-concurrency network applications: web servers, API gateways, real-time communication servers
- IO-bound workloads: web crawlers, batch file processing, bulk database operations
- Real-time data processing: WebSocket servers, live monitoring systems
Where asyncio does not fit: pure CPU-bound work such as image processing or heavy numerical computation, which is better served by multiprocessing or dedicated numerical libraries.

Coroutines are asyncio's basic building block: you define them with async def and run them with await.

```python
import asyncio

async def say_hello(name: str) -> str:
    """A minimal coroutine.

    Args:
        name: the name to greet
    Returns:
        a greeting string
    """
    await asyncio.sleep(1)  # simulate slow IO without blocking the loop
    return f"Hello, {name}!"

async def main():
    """Entry coroutine."""
    # Await a single coroutine
    greeting = await say_hello("Python developer")
    print(greeting)  # Hello, Python developer!

    # Run several coroutines concurrently
    tasks = [say_hello("Alice"), say_hello("Bob"), say_hello("Charlie")]
    results = await asyncio.gather(*tasks)
    print(results)  # ['Hello, Alice!', 'Hello, Bob!', 'Hello, Charlie!']

if __name__ == "__main__":
    asyncio.run(main())
```
- async def defines a coroutine function; calling it returns a coroutine object without running it
- await is only legal inside a coroutine function; it waits for another coroutine or async operation to finish
- asyncio.run() is the recommended entry point since Python 3.7; it creates and manages the event loop for you
2.2 Task scheduling: create_task() and TaskGroup

A Task is the event loop's basic unit of scheduling; create one with create_task().

```python
import asyncio
import time

async def download_file(filename: str, size: int):
    """Simulate downloading a file.

    Args:
        filename: file name
        size: file size in MB
    """
    print(f"Downloading {filename} ({size}MB)...")
    await asyncio.sleep(size * 0.1)  # simulate transfer time
    print(f"Finished {filename}")
    return {"filename": filename, "size": size}

async def main_classic_style():
    """Task scheduling with create_task() (Python 3.7+)."""
    start_time = time.time()

    # Create the tasks; they start running right away
    task1 = asyncio.create_task(download_file("report.pdf", 10))
    task2 = asyncio.create_task(download_file("data.csv", 5))
    task3 = asyncio.create_task(download_file("image.jpg", 8))

    # Await each one in turn
    result1 = await task1
    result2 = await task2
    result3 = await task3

    end_time = time.time()
    print(f"Total download time: {end_time - start_time:.2f}s")
    print(f"Results: {[result1, result2, result3]}")
    # About 1.0 s total (tasks run concurrently) instead of 2.3 s serially

# Python 3.11+: TaskGroup (structured concurrency)
async def main_taskgroup_style():
    """Task scheduling with TaskGroup (Python 3.11+)."""
    start_time = time.time()

    async with asyncio.TaskGroup() as tg:
        # TaskGroup tracks the tasks and handles exceptions and cancellation
        tg.create_task(download_file("report.pdf", 10))
        tg.create_task(download_file("data.csv", 5))
        tg.create_task(download_file("image.jpg", 8))
    # Leaving the block waits for every task to finish

    end_time = time.time()
    print(f"TaskGroup total download time: {end_time - start_time:.2f}s")

if __name__ == "__main__":
    print("=== create_task scheduling ===")
    asyncio.run(main_classic_style())
    print("\n=== TaskGroup scheduling ===")
    asyncio.run(main_taskgroup_style())
```
- Exception propagation: if one task fails, the others are cancelled automatically, preventing resource leaks
- Automatic waiting: no need to call gather() or await each task individually
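The cancellation behavior described above can be observed directly. The sketch below (task names are invented for illustration; requires Python 3.11+ for TaskGroup) shows a sibling failure cancelling a slow task long before its own sleep would finish:

```python
import asyncio

async def fails_fast() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("boom")

async def slow_worker() -> str:
    # Would take 10 s on its own; the sibling's failure cancels it almost at once
    await asyncio.sleep(10)
    return "never reached"

async def main() -> tuple[bool, float]:
    loop = asyncio.get_running_loop()
    start = loop.time()
    failed = False
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fails_fast())
            tg.create_task(slow_worker())
    except Exception:  # an ExceptionGroup wrapping the RuntimeError
        failed = True
    return failed, loop.time() - start

failed, elapsed = asyncio.run(main())
print(failed, elapsed)
```

If the whole group had to wait for slow_worker, the run would take 10 seconds; the elapsed time staying far under that is the structured-concurrency guarantee at work.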
2.3 The event loop: asyncio.run() and the lower-level API

The event loop is asyncio's heart: it schedules the execution of every coroutine.

```python
import asyncio
import time

async def worker(name: str, delay: float):
    """Worker coroutine."""
    print(f"Worker {name} starting, expected {delay}s")
    await asyncio.sleep(delay)
    print(f"Worker {name} done")
    return f"Result from {name}"

# Option 1: the recommended, simple way (Python 3.7+)
async def simple_main():
    """Let asyncio.run() manage the event loop."""
    tasks = [worker("A", 1.5), worker("B", 1.0), worker("C", 2.0)]
    results = await asyncio.gather(*tasks)
    print(f"All tasks done: {results}")

# Option 2: manage the loop by hand (only when you need finer control)
def manual_event_loop():
    """Manual event-loop management for special cases."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Run several tasks concurrently with gather
        tasks = [worker("manual-A", 1.0), worker("manual-B", 0.5)]
        results = loop.run_until_complete(asyncio.gather(*tasks))
        print(f"Manual-mode results: {results}")

        def blocking_io():
            print("Running blocking IO...")
            time.sleep(1)
            return "blocking IO done"

        # Run a synchronous function in the default thread pool
        future = loop.run_in_executor(None, blocking_io)
        result = loop.run_until_complete(future)
        print(f"Thread-pool result: {result}")
    finally:
        loop.close()  # clean up

if __name__ == "__main__":
    print("=== Simple (recommended) ===")
    asyncio.run(simple_main())
    print("\n=== Manual loop management ===")
    manual_event_loop()
```
- Prefer asyncio.run(): it covers most scenarios and manages the loop's lifecycle automatically
- Never mix blocking synchronous calls into async code: they stall the entire event loop
- Use run_in_executor() for unavoidable blocking work: it offloads the call to a thread pool
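Since Python 3.9 there is also asyncio.to_thread(), a thin wrapper over run_in_executor with the default pool. A minimal sketch (the blocking function and file names are invented stand-ins):

```python
import asyncio
import time

def blocking_read(path: str) -> str:
    # Stands in for any synchronous call you cannot rewrite (legacy SDK, file IO...)
    time.sleep(0.2)
    return f"read {path}"

async def main() -> list[str]:
    # The three blocking calls run in worker threads concurrently,
    # so the total is about 0.2 s rather than about 0.6 s
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_read, p) for p in ("a.txt", "b.txt", "c.txt"))
    )

results = asyncio.run(main())
print(results)
```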
2.4 Synchronization primitives: Lock, Semaphore, Event

```python
import asyncio

# 1. Lock: protect a shared resource
async def use_lock(lock: asyncio.Lock, resource_id: int):
    """Guard shared-resource access with a lock."""
    async with lock:  # acquired and released automatically
        print(f"Coroutine {resource_id} holds the lock, using shared resource")
        await asyncio.sleep(0.5)
        print(f"Coroutine {resource_id} releasing the lock")
    return resource_id

# 2. Semaphore: cap the level of concurrency
async def limited_concurrent(sem: asyncio.Semaphore, task_id: int):
    """Throttle concurrency with a semaphore."""
    async with sem:
        print(f"Task {task_id} acquired the semaphore, running")
        await asyncio.sleep(1)
        print(f"Task {task_id} releasing the semaphore")
    return task_id

# 3. Event: signalling between coroutines
async def wait_for_event(event: asyncio.Event, waiter_id: int):
    """Coroutine that waits for an event."""
    print(f"Waiter {waiter_id} waiting for the event")
    await event.wait()  # block until the event is set
    print(f"Waiter {waiter_id} saw the event fire")

async def trigger_event(event: asyncio.Event):
    """Coroutine that fires the event."""
    await asyncio.sleep(2)
    print("Trigger: setting the event")
    event.set()  # wakes every waiter

async def main_sync_primitives():
    """Demonstrate the synchronization primitives."""
    print("=== 1. Lock ===")
    lock = asyncio.Lock()
    await asyncio.gather(*[use_lock(lock, i) for i in range(3)])

    print("\n=== 2. Semaphore ===")
    sem = asyncio.Semaphore(2)  # at most 2 tasks at once
    await asyncio.gather(*[limited_concurrent(sem, i) for i in range(5)])

    print("\n=== 3. Event ===")
    event = asyncio.Event()
    # Three waiters and one trigger
    waiters = [wait_for_event(event, i) for i in range(3)]
    await asyncio.gather(*waiters, trigger_event(event))

if __name__ == "__main__":
    asyncio.run(main_sync_primitives())
```
- Lock: serialize access to shared state such as an in-memory cache or counter
- Semaphore: throttle API call rates, cap database connection pool usage
- Event: wait for system initialization to finish, signal many coroutines to start at once
Asynchronous context managers work with the async with statement and can run asynchronous code on entry and exit.

```python
import asyncio
import aiofiles  # third-party: pip install aiofiles

class AsyncDatabaseConnection:
    """Async database connection as a context manager (simulated)."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection = None

    async def __aenter__(self):
        """Open the connection on entry."""
        print(f"Connecting to database: {self.connection_string}")
        await asyncio.sleep(0.5)  # simulate the handshake
        self.connection = {"status": "connected",
                           "connection_string": self.connection_string}
        return self  # return the manager so query() is available inside the block

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the connection on exit."""
        if self.connection:
            print("Closing database connection")
            await asyncio.sleep(0.2)  # simulate teardown
            self.connection = None

    async def query(self, sql: str):
        """Run a query."""
        if not self.connection:
            raise RuntimeError("database not connected")
        print(f"Running query: {sql}")
        await asyncio.sleep(0.3)  # simulate query execution
        return [{"id": 1, "name": "test row"}]

async def async_context_example():
    """Async context manager in use."""
    async with AsyncDatabaseConnection("postgresql://localhost/mydb") as db:
        print(f"Connection state: {db.connection}")
        results = await db.query("SELECT * FROM users")
        print(f"Query results: {results}")
    # The connection is closed automatically once the block exits
    print("Context exited; connection closed")

async def async_file_operations():
    """Async file IO with the aiofiles library."""
    # Async write
    async with aiofiles.open("test_async.txt", mode="w") as f:
        await f.write("written asynchronously\n")
        await f.write("second line\n")

    # Async read
    async with aiofiles.open("test_async.txt", mode="r") as f:
        content = await f.read()
        print(f"File contents:\n{content}")

    # Async line-by-line read
    async with aiofiles.open("test_async.txt", mode="r") as f:
        async for line in f:
            print(f"line: {line.strip()}")

async def main_advanced():
    print("=== Async context managers ===")
    await async_context_example()
    print("\n=== Async file IO ===")
    await async_file_operations()

if __name__ == "__main__":
    asyncio.run(main_advanced())
```
- Exception safety: __aexit__ runs even when the body of the async with block raises
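For simple cases you can skip the class and use contextlib.asynccontextmanager instead of writing __aenter__/__aexit__ by hand. A minimal sketch with a hypothetical resource (the fake_connection name and dict fields are invented for illustration):

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def fake_connection(dsn: str):
    # Async setup before yield, async teardown after
    conn = {"dsn": dsn, "open": True}
    try:
        yield conn
    finally:
        await asyncio.sleep(0)  # e.g. flush buffers, send a goodbye frame
        conn["open"] = False    # runs even if the body raises

async def main() -> dict:
    async with fake_connection("postgresql://localhost/demo") as conn:
        assert conn["open"]  # resource is live inside the block
    return conn

conn = asyncio.run(main())
print(conn)
```

The try/finally around the yield gives the same exception-safety guarantee as __aexit__.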
Asynchronous iterators work with the async for statement and can await inside each iteration step.

```python
import asyncio
from typing import AsyncIterator, List

class AsyncDataFetcher:
    """Async iterator that fetches one URL per step."""

    def __init__(self, urls: List[str]):
        self.urls = urls
        self.current_index = 0

    def __aiter__(self) -> "AsyncDataFetcher":
        """Return the async iterator itself."""
        return self

    async def __anext__(self) -> str:
        """Fetch the next item."""
        if self.current_index >= len(self.urls):
            raise StopAsyncIteration
        url = self.urls[self.current_index]
        print(f"Fetching: {url}")
        await asyncio.sleep(0.5)  # simulate the network request
        self.current_index += 1
        return f"data from {url}"

# Async generators (Python 3.6+)
async def async_generator(start: int, end: int) -> AsyncIterator[int]:
    """Async generator example."""
    for i in range(start, end):
        await asyncio.sleep(0.1)  # async work before each yield
        yield i

async def process_large_dataset() -> AsyncIterator[dict]:
    """Stream a large dataset page by page."""
    page_size = 100
    total_records = 1000
    for page in range(0, total_records, page_size):
        await asyncio.sleep(0.2)  # simulate a paged database query
        start_id = page + 1
        end_id = min(page + page_size, total_records)
        for record_id in range(start_id, end_id + 1):
            yield {"id": record_id, "data": f"record {record_id}"}

async def main_async_iteration():
    print("=== Async iterator ===")
    urls = [
        "https://api.example.com/data1",
        "https://api.example.com/data2",
        "https://api.example.com/data3",
        "https://api.example.com/data4",
    ]
    async for data in AsyncDataFetcher(urls):
        print(f"Got: {data}")

    print("\n=== Async generator ===")
    async for number in async_generator(1, 6):
        print(f"Generated: {number}")

    print("\n=== Streaming a large dataset ===")
    count = 0
    async for record in process_large_dataset():
        count += 1
        if count <= 5:   # show only the first 5 records
            print(f"Record: {record}")
        if count >= 50:  # stop early for the demo
            break
    print(f"Processed {count} records")

if __name__ == "__main__":
    asyncio.run(main_async_iteration())
```
The following techniques can noticeably improve the performance of asyncio applications.

```python
import asyncio
import time
from functools import wraps

# Tip 1: reuse connections (HTTP client pooling)
async def http_connection_pooling():
    """Reuse TCP connections via an aiohttp connection pool."""
    import aiohttp  # third-party: pip install aiohttp

    urls = [f"https://httpbin.org/delay/{i % 3}" for i in range(10)]

    # Pool with caps on total and per-host connections
    connector = aiohttp.TCPConnector(limit=5, limit_per_host=3)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [
            asyncio.create_task(
                session.get(url, timeout=aiohttp.ClientTimeout(total=10))
            )
            for url in urls
        ]
        responses = await asyncio.gather(*tasks)
        for resp in responses:
            if resp.status == 200:
                data = await resp.text()
                print(f"Fetched {len(data)} bytes")
            resp.close()

# Tip 2: batch work to cut down on context switches
async def batch_processing():
    """Await once per batch instead of once per item."""
    # ❌ Not recommended: one await per item
    # async def process_single(item):
    #     await asyncio.sleep(0.01)
    #     return item * 2

    # ✅ Recommended: process whole batches
    async def process_batch(items):
        await asyncio.sleep(0.05)  # assume a fixed per-batch cost
        return [item * 2 for item in items]

    all_items = list(range(100))
    batch_size = 10
    results = []
    for i in range(0, len(all_items), batch_size):
        results.extend(await process_batch(all_items[i:i + batch_size]))
    print(f"Batch processing done: {len(results)} results")
    return results

# Tip 3: bound waits with asyncio.timeout() (Python 3.11+)
async def timeout_control():
    """Stop tasks from waiting forever."""
    async def slow_task(duration: float):
        await asyncio.sleep(duration)
        return f"done after {duration}s"

    try:
        async with asyncio.timeout(2.0):
            return await slow_task(3.0)  # needs 3 s, so it times out
    except TimeoutError:
        print("Task timed out and was cancelled")
        return None

# Tip 4: cache coroutine results (memoization)
def async_cache(maxsize=128):
    """Cache decorator for async functions."""
    cache = {}

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key = (args, frozenset(kwargs.items()))
            if key in cache:
                print(f"Cache hit: {func.__name__}{args}")
                return cache[key]
            result = await func(*args, **kwargs)
            if len(cache) >= maxsize:
                cache.pop(next(iter(cache)))  # crude eviction: drop the oldest key
            cache[key] = result
            return result
        return wrapper
    return decorator

@async_cache(maxsize=10)
async def expensive_operation(x: int):
    """Simulate an expensive computation or IO call."""
    await asyncio.sleep(1.0)
    return x * x

async def main_performance():
    print("=== Connection pooling ===")
    await http_connection_pooling()
    print("\n=== Batching ===")
    await batch_processing()
    print("\n=== Timeouts ===")
    await timeout_control()
    print("\n=== Coroutine caching ===")
    print(f"First call: {await expensive_operation(5)}")   # executes
    print(f"Second call: {await expensive_operation(5)}")  # cache hit

if __name__ == "__main__":
    asyncio.run(main_performance())
```
- Reuse connections: pool HTTP and database connections instead of creating and tearing them down per request
A typical real-world use: building an API server (e.g. with FastAPI) on top of asyncio to handle high concurrency.

```python
"""
Scenario: an e-commerce platform needs a product-query API that handles
thousands of concurrent requests per second, each of which queries the
database, calls the recommendation system, and writes an access log.
"""
import asyncio
import time
from typing import List, Dict
from datetime import datetime

class AsyncDatabase:
    """Simulated async database client."""

    @staticmethod
    async def query_products(product_ids: List[int]) -> List[Dict]:
        """Look up product records."""
        print(f"DB query: {len(product_ids)} products")
        await asyncio.sleep(0.1)  # simulate query latency
        return [
            {"id": pid, "name": f"Product {pid}", "price": pid * 10, "stock": 100}
            for pid in product_ids
        ]

    @staticmethod
    async def query_reviews(product_id: int) -> List[Dict]:
        """Look up product reviews."""
        await asyncio.sleep(0.05)
        return [
            {"id": 1, "user": "User A", "rating": 5, "comment": "Great"},
            {"id": 2, "user": "User B", "rating": 4, "comment": "Good"},
        ]

class RecommendationSystem:
    """Simulated async recommendation client."""

    @staticmethod
    async def get_recommendations(user_id: int, product_id: int) -> List[int]:
        """Fetch related-product recommendations."""
        await asyncio.sleep(0.08)
        return [product_id + i for i in range(1, 4)]

class AsyncLogger:
    """Async access logging."""

    @staticmethod
    async def log_access(user_id: int, endpoint: str, duration: float):
        await asyncio.sleep(0.01)  # simulate an async log write
        print(f"[{datetime.now()}] user {user_id} hit {endpoint} in {duration:.3f}s")

async def get_product_details(product_id: int, user_id: int) -> Dict:
    """Product-detail endpoint: fan out to several data sources."""
    start_time = time.time()
    db = AsyncDatabase()
    recsys = RecommendationSystem()

    # TaskGroup (Python 3.11+) guarantees all three queries finish (or all cancel)
    async with asyncio.TaskGroup() as tg:
        product_task = tg.create_task(db.query_products([product_id]))
        reviews_task = tg.create_task(db.query_reviews(product_id))
        recommendations_task = tg.create_task(
            recsys.get_recommendations(user_id, product_id)
        )

    products = product_task.result()
    reviews = reviews_task.result()
    recommendations = recommendations_task.result()

    if not products:
        return {"error": "product not found"}
    product = products[0]

    duration = time.time() - start_time
    # Log asynchronously without blocking the response; keeping a reference
    # stops the task from being garbage-collected before it runs
    log_task = asyncio.create_task(
        AsyncLogger.log_access(user_id, f"/products/{product_id}", duration)
    )

    return {
        "product": product,
        "reviews": reviews,
        "recommendations": recommendations,
        "response_time": f"{duration:.3f}s",
    }

async def batch_get_products(product_ids: List[int], user_id: int) -> Dict:
    """Batch endpoint: one products query plus concurrent review queries."""
    start_time = time.time()
    db = AsyncDatabase()

    products = await db.query_products(product_ids)
    review_tasks = [
        asyncio.create_task(db.query_reviews(pid)) for pid in product_ids
    ]
    reviews_list = await asyncio.gather(*review_tasks)

    result = {
        product["id"]: {
            "product": product,
            "reviews": reviews_list[i] if i < len(reviews_list) else [],
        }
        for i, product in enumerate(products)
    }

    duration = time.time() - start_time
    await AsyncLogger.log_access(
        user_id, f"/products/batch/{len(product_ids)}", duration
    )
    return {
        "data": result,
        "total_products": len(products),
        "response_time": f"{duration:.3f}s",
    }

async def demo_web_api():
    """Demonstrate the web-API scenario."""
    print("=== Single product queries ===")
    # Simulate several concurrent requests
    requests = [
        asyncio.create_task(get_product_details(user_id * 10, user_id))
        for user_id in range(1, 6)
    ]
    results = await asyncio.gather(*requests)
    for i, result in enumerate(results):
        print(f"User {i + 1} got product {result.get('product', {}).get('id', 'N/A')}")

    print("\n=== Batch product query ===")
    batch_result = await batch_get_products([101, 102, 103, 104, 105], 1001)
    print(f"Batch result: {batch_result['total_products']} products")
    print(f"Response time: {batch_result['response_time']}")

if __name__ == "__main__":
    asyncio.run(demo_web_api())
```
- Fast responses: querying several data sources concurrently cuts response time sharply
"""场景描述:需要从多个新闻网站抓取最新文章,要求高效、稳定、支持重试机制,同时遵守robots.txt规则,避免对目标网站造成压力。"""import asyncioimport aiohttpimport aiofilesfrom typing import List, Dict, Optionalfrom urllib.parse import urlparseimport timeclassAsyncWebCrawler:"""异步网络爬虫"""def__init__(self, max_concurrent: int = 10, timeout: float = 10.0, retry_count: int = 3):""" 初始化爬虫 Args: max_concurrent: 最大并发数 timeout: 请求超时时间(秒) retry_count: 重试次数 """ self.max_concurrent = max_concurrent self.timeout = timeout self.retry_count = retry_count self.semaphore = asyncio.Semaphore(max_concurrent)# 统计信息 self.stats = {"total_requests": 0,"successful": 0,"failed": 0,"total_bytes": 0,"start_time": None }asyncdeffetch_url(self, session: aiohttp.ClientSession, url: str, headers: Optional[Dict] = None) -> Optional[str]:"""抓取单个URL""" self.stats["total_requests"] += 1for attempt in range(self.retry_count):try:asyncwith self.semaphore: # 控制并发数 print(f"抓取: {url} (尝试 {attempt + 1}/{self.retry_count})")asyncwith session.get( url, headers=headers, timeout=aiohttp.ClientTimeout(total=self.timeout) ) as response:if response.status == 200: content = await response.text() self.stats["successful"] += 1 self.stats["total_bytes"] += len(content.encode('utf-8')) print(f"成功: {url} ({len(content)} 字节)")return contentelse: print(f"HTTP错误 {response.status}: {url}")except asyncio.TimeoutError: print(f"超时: {url} (尝试 {attempt + 1})")except aiohttp.ClientError as e: print(f"客户端错误: {url} - {e}")except Exception as e: print(f"未知错误: {url} - {e}")# 如果不是最后一次尝试,等待后重试if attempt < self.retry_count - 1:await asyncio.sleep(2 ** attempt) # 指数退避 self.stats["failed"] += 1 print(f"最终失败: {url}")returnNoneasyncdefcrawl_urls(self, urls: List[str], output_dir: str = "crawled_data") -> Dict:"""批量抓取URL列表""" self.stats["start_time"] = time.time()# 确保输出目录存在import os os.makedirs(output_dir, exist_ok=True)# 设置合理的HTTP头部 headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36","Accept": 
"text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8","Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8","Accept-Encoding": "gzip, deflate", }asyncwith aiohttp.ClientSession(headers=headers) as session: tasks = []for url in urls: task = asyncio.create_task( self.fetch_url(session, url) ) tasks.append(task)# 等待所有任务完成 results = await asyncio.gather(*tasks)# 保存结果到文件 save_tasks = []for i, (url, content) in enumerate(zip(urls, results)):if content:# 生成安全文件名 parsed = urlparse(url) filename = f"{parsed.netloc.replace('.', '_')}_{i}.html" filepath = os.path.join(output_dir, filename) save_task = asyncio.create_task( self.save_content(filepath, content, url) ) save_tasks.append(save_task)# 等待所有保存任务完成await asyncio.gather(*save_tasks)# 计算总耗时 duration = time.time() - self.stats["start_time"] self.stats["duration"] = durationreturn self.statsasyncdefsave_content(self, filepath: str, content: str, url: str):"""异步保存内容到文件"""try:asyncwith aiofiles.open(filepath, 'w', encoding='utf-8') as f:await f.write(f"<!-- Source URL: {url} -->\n")await f.write(content) print(f"已保存: {filepath}")except Exception as e: print(f"保存文件失败 {filepath}: {e}")asyncdefextract_links(self, html: str, base_url: str) -> List[str]:"""从HTML中提取链接(简化版)"""# 这里可以使用beautifulsoup4,为简化使用字符串查找import re# 简单的链接提取正则 pattern = r'href=["\'](https?://[^"\']+)["\']' links = re.findall(pattern, html)# 过滤和去重 unique_links = [] seen = set()for link in links:if link notin seen and link.startswith('http'): seen.add(link) unique_links.append(link) print(f"从 {base_url} 提取到 {len(unique_links)} 个链接")return unique_linksasyncdefdemo_crawler():"""爬虫演示"""# 测试URL列表(实际使用时应替换为真实URL) test_urls = ["https://httpbin.org/delay/1","https://httpbin.org/delay/2", "https://httpbin.org/delay/1","https://httpbin.org/status/200","https://httpbin.org/status/404","https://httpbin.org/bytes/1024","https://httpbin.org/bytes/2048","https://httpbin.org/bytes/4096", ]# 模拟新闻网站URL(这里使用httpbin模拟) news_urls = [f"https://httpbin.org/delay/{i%3}"for i in 
range(12)] print("=== 异步爬虫演示 ===") crawler = AsyncWebCrawler( max_concurrent=5, timeout=5.0, retry_count=2 )# 开始抓取 stats = await crawler.crawl_urls(news_urls, "crawled_news")# 输出统计信息 print("\n=== 爬虫统计 ===") print(f"总请求数: {stats['total_requests']}") print(f"成功: {stats['successful']}") print(f"失败: {stats['failed']}") print(f"总数据量: {stats['total_bytes'] / 1024:.2f} KB") print(f"总耗时: {stats['duration']:.2f}秒")# 计算平均速度if stats['duration'] > 0: req_per_sec = stats['total_requests'] / stats['duration'] data_per_sec = stats['total_bytes'] / stats['duration'] / 1024 print(f"平均请求速度: {req_per_sec:.2f} 请求/秒") print(f"平均数据速度: {data_per_sec:.2f} KB/秒")# 演示链接提取(模拟) print("\n=== 链接提取演示 ===")# 读取一个保存的文件并提取链接import osif os.path.exists("crawled_news"): files = os.listdir("crawled_news")if files: sample_file = os.path.join("crawled_news", files[0])asyncwith aiofiles.open(sample_file, 'r', encoding='utf-8') as f: content = await f.read()# 提取链接 links = await crawler.extract_links(content, "httpbin.org")if links: print(f"示例链接(前3个):")for link in links[:3]: print(f" - {link}")if __name__ == "__main__": asyncio.run(demo_crawler())
- Connection reuse: a shared ClientSession reuses TCP connections
- Async file IO: aiofiles keeps file writes from blocking the event loop
"""场景描述:需要监控服务器集群的健康状态,实时收集指标(CPU、内存、网络),在异常时触发告警,同时支持历史数据查询和分析。"""import asyncioimport randomimport timefrom typing import Dict, List, Optionalfrom datetime import datetime, timedeltafrom dataclasses import dataclassimport json@dataclassclassServerMetric:"""服务器监控指标""" server_id: str timestamp: datetime cpu_percent: float memory_percent: float network_in_mbps: float network_out_mbps: float disk_usage_percent: float@dataclass classAlert:"""告警信息""" alert_id: str server_id: str alert_type: str # "high_cpu", "high_memory", "network_anomaly" severity: str # "warning", "critical" metric_value: float threshold: float timestamp: datetime message: strclassRealTimeMonitor:"""实时监控系统"""def__init__(self, alert_thresholds: Optional[Dict] = None, check_interval: float = 5.0):""" 初始化监控器 Args: alert_thresholds: 告警阈值配置 check_interval: 检查间隔(秒) """ self.check_interval = check_interval# 默认告警阈值 self.alert_thresholds = alert_thresholds or {"high_cpu": 80.0, # CPU使用率超过80%"high_memory": 85.0, # 内存使用率超过85%"high_disk": 90.0, # 磁盘使用率超过90%"network_spike": 100.0# 网络流量突增超过100Mbps }# 模拟的服务器列表 self.servers = [f"web-server-{i:03d}"for i in range(1, 11) ] + [f"db-server-{i:03d}"for i in range(1, 6) ] + [f"cache-server-{i:03d}"for i in range(1, 4) ]# 状态存储 self.metrics_history: Dict[str, List[ServerMetric]] = { server: [] for server in self.servers } self.active_alerts: Dict[str, Alert] = {} self.alert_counter = 0# 监控任务 self.monitoring_task: Optional[asyncio.Task] = None self.is_running = Falseasyncdefcollect_metrics(self, server_id: str) -> ServerMetric:"""收集单个服务器的指标(模拟)"""# 模拟数据收集的延迟await asyncio.sleep(random.uniform(0.1, 0.3))# 模拟正常和异常情况 is_healthy = random.random() > 0.1# 90%时间健康if is_healthy: cpu = random.uniform(10.0, 60.0) memory = random.uniform(20.0, 70.0) network_in = random.uniform(5.0, 50.0) network_out = random.uniform(5.0, 40.0) disk = random.uniform(30.0, 80.0)else:# 10%时间可能有问题 cpu = random.uniform(70.0, 99.0) memory = random.uniform(75.0, 98.0) network_in = random.uniform(60.0, 200.0) 
network_out = random.uniform(50.0, 180.0) disk = random.uniform(85.0, 99.0) metric = ServerMetric( server_id=server_id, timestamp=datetime.now(), cpu_percent=cpu, memory_percent=memory, network_in_mbps=network_in, network_out_mbps=network_out, disk_usage_percent=disk )return metricasyncdefanalyze_metrics(self, metric: ServerMetric) -> List[Alert]:"""分析指标并生成告警""" alerts = []# 检查CPUif metric.cpu_percent > self.alert_thresholds["high_cpu"]: severity = "critical"if metric.cpu_percent > 90.0else"warning" alert = Alert( alert_id=f"alert-{self.alert_counter:06d}", server_id=metric.server_id, alert_type="high_cpu", severity=severity, metric_value=metric.cpu_percent, threshold=self.alert_thresholds["high_cpu"], timestamp=metric.timestamp, message=f"服务器 {metric.server_id} CPU使用率过高: {metric.cpu_percent:.1f}%" ) alerts.append(alert) self.alert_counter += 1# 检查内存if metric.memory_percent > self.alert_thresholds["high_memory"]: severity = "critical"if metric.memory_percent > 95.0else"warning" alert = Alert( alert_id=f"alert-{self.alert_counter:06d}", server_id=metric.server_id, alert_type="high_memory", severity=severity, metric_value=metric.memory_percent, threshold=self.alert_thresholds["high_memory"], timestamp=metric.timestamp, message=f"服务器 {metric.server_id} 内存使用率过高: {metric.memory_percent:.1f}%" ) alerts.append(alert) self.alert_counter += 1# 检查网络突增 total_network = metric.network_in_mbps + metric.network_out_mbpsif total_network > self.alert_thresholds["network_spike"]: alert = Alert( alert_id=f"alert-{self.alert_counter:06d}", server_id=metric.server_id, alert_type="network_anomaly", severity="warning", metric_value=total_network, threshold=self.alert_thresholds["network_spike"], timestamp=metric.timestamp, message=f"服务器 {metric.server_id} 网络流量突增: {total_network:.1f} Mbps" ) alerts.append(alert) self.alert_counter += 1return alertsasyncdefprocess_alerts(self, alerts: List[Alert]):"""处理告警"""for alert in alerts:# 记录活跃告警 self.active_alerts[alert.alert_id] = alert# 模拟告警处理if 
alert.severity == "critical": print(f"🚨 CRITICAL告警: {alert.message}")# 这里可以集成邮件、短信、钉钉等通知else: print(f"⚠️ WARNING告警: {alert.message}")asyncdefcleanup_old_alerts(self, max_age_hours: int = 24):"""清理旧告警""" cutoff_time = datetime.now() - timedelta(hours=max_age_hours) to_remove = []for alert_id, alert in self.active_alerts.items():if alert.timestamp < cutoff_time: to_remove.append(alert_id)for alert_id in to_remove:del self.active_alerts[alert_id]if to_remove: print(f"清理了 {len(to_remove)} 个旧告警")asyncdefmonitor_loop(self):"""监控主循环""" print(f"开始监控 {len(self.servers)} 台服务器...") self.is_running = Truetry:while self.is_running: cycle_start = time.time()# 并发收集所有服务器的指标 tasks = [ asyncio.create_task(self.collect_metrics(server))for server in self.servers ] metrics = await asyncio.gather(*tasks)# 处理每个指标for metric in metrics:# 保存历史数据 self.metrics_history[metric.server_id].append(metric)# 保留最近100条记录if len(self.metrics_history[metric.server_id]) > 100: self.metrics_history[metric.server_id] = \ self.metrics_history[metric.server_id][-100:]# 分析指标并生成告警 alerts = await self.analyze_metrics(metric)if alerts:await self.process_alerts(alerts)# 定期清理旧告警if random.random() < 0.2: # 20%概率执行清理await self.cleanup_old_alerts()# 输出周期统计 cycle_duration = time.time() - cycle_start print(f"监控周期完成: {cycle_duration:.2f}秒, "f"活跃告警: {len(self.active_alerts)}个")# 等待下一个周期await asyncio.sleep(self.check_interval)except asyncio.CancelledError: print("监控任务被取消")finally: self.is_running = Falseasyncdefstart_monitoring(self):"""启动监控"""if self.monitoring_task isNoneor self.monitoring_task.done(): self.monitoring_task = asyncio.create_task(self.monitor_loop())asyncdefstop_monitoring(self):"""停止监控""" self.is_running = Falseif self.monitoring_task andnot self.monitoring_task.done(): self.monitoring_task.cancel()try:await self.monitoring_taskexcept asyncio.CancelledError:passasyncdefget_server_stats(self, server_id: str) -> Dict:"""获取服务器统计信息"""if server_id notin self.metrics_history:return {"error": "服务器不存在"} metrics = 
self.metrics_history[server_id]ifnot metrics:return {"server_id": server_id, "message": "暂无数据"}# 计算统计信息 latest = metrics[-1] cpu_values = [m.cpu_percent for m in metrics] memory_values = [m.memory_percent for m in metrics] stats = {"server_id": server_id,"latest_metric": {"timestamp": latest.timestamp.isoformat(),"cpu_percent": latest.cpu_percent,"memory_percent": latest.memory_percent,"network_in_mbps": latest.network_in_mbps,"network_out_mbps": latest.network_out_mbps,"disk_usage_percent": latest.disk_usage_percent },"statistics": {"cpu_avg": sum(cpu_values) / len(cpu_values),"cpu_max": max(cpu_values),"memory_avg": sum(memory_values) / len(memory_values),"memory_max": max(memory_values),"total_metrics": len(metrics),"monitoring_period_minutes": len(metrics) * self.check_interval / 60 } }return statsasyncdefdemo_monitoring():"""监控系统演示""" print("=== 实时监控系统演示 ===")# 创建监控器 monitor = RealTimeMonitor( alert_thresholds={"high_cpu": 75.0, # 降低阈值以便更容易触发"high_memory": 80.0,"high_disk": 85.0,"network_spike": 80.0 }, check_interval=3.0# 更短的检查间隔 )# 启动监控await monitor.start_monitoring()# 让监控运行一段时间 print("监控运行中,持续15秒...")await asyncio.sleep(15)# 停止监控await monitor.stop_monitoring()# 展示统计信息 print("\n=== 监控统计 ===")# 随机选择几台服务器展示统计 sample_servers = random.sample(monitor.servers, min(3, len(monitor.servers)))for server_id in sample_servers: stats = await monitor.get_server_stats(server_id) print(f"\n服务器: {server_id}")if"latest_metric"in stats: latest = stats["latest_metric"] print(f" 最新指标:") print(f" 时间: {latest['timestamp']}") print(f" CPU: {latest['cpu_percent']:.1f}%") print(f" 内存: {latest['memory_percent']:.1f}%") print(f" 网络流入: {latest['network_in_mbps']:.1f} Mbps") print(f" 网络流出: {latest['network_out_mbps']:.1f} Mbps")if"statistics"in stats: stat = stats["statistics"] print(f" 统计信息:") print(f" 平均CPU: {stat['cpu_avg']:.1f}%") print(f" 最高CPU: {stat['cpu_max']:.1f}%") print(f" 监控周期: {stat['monitoring_period_minutes']:.1f} 分钟")# 展示活跃告警 print(f"\n=== 活跃告警 ===") print(f"总活跃告警数: 
{len(monitor.active_alerts)}")if monitor.active_alerts:for alert_id, alert in list(monitor.active_alerts.items())[:3]: print(f"\n告警ID: {alert.alert_id}") print(f" 服务器: {alert.server_id}") print(f" 类型: {alert.alert_type}") print(f" 严重程度: {alert.severity}") print(f" 指标值: {alert.metric_value:.1f}") print(f" 阈值: {alert.threshold}") print(f" 时间: {alert.timestamp}") print(f" 消息: {alert.message}")else: print("暂无活跃告警")asyncdefstress_test_monitor():"""压力测试:模拟大量服务器""" print("\n=== 监控系统压力测试 ===")# 创建包含100台服务器的监控器 large_monitor = RealTimeMonitor( alert_thresholds={"high_cpu": 85.0}, check_interval=2.0 )# 替换为大量服务器 large_monitor.servers = [f"stress-server-{i:04d}"for i in range(100)] large_monitor.metrics_history = { server: [] for server in large_monitor.servers }# 运行10秒压力测试await large_monitor.start_monitoring()await asyncio.sleep(10)await large_monitor.stop_monitoring()# 统计 total_metrics = sum(len(metrics) for metrics in large_monitor.metrics_history.values()) print(f"压力测试结果:") print(f" 服务器数量: {len(large_monitor.servers)}") print(f" 总指标收集次数: {total_metrics}") print(f" 活跃告警数: {len(large_monitor.active_alerts)}") print(f" 平均每秒处理指标数: {total_metrics / 10:.1f}")if __name__ == "__main__":# 运行演示 asyncio.run(demo_monitoring())# 可选:运行压力测试# asyncio.run(stress_test_monitor())
Problem: a coroutine calls a synchronous blocking function (time.sleep(), a blocking database query), stalling the entire event loop.

```python
import asyncio
import time

async def bad_example():
    """Wrong: a synchronous blocking call inside a coroutine."""
    print("Task starting")
    time.sleep(5)  # ❌ synchronous block; freezes the whole event loop
    print("Task done")

async def main():
    tasks = [bad_example() for _ in range(3)]
    await asyncio.gather(*tasks)  # effectively runs serially

# Result: about 15 seconds total instead of the expected 5
```
Solutions:

```python
import asyncio
import time

# Fix 1: use the async equivalent
async def correct_example1():
    """Right: a non-blocking wait."""
    print("Task starting")
    await asyncio.sleep(5)  # ✅ yields the loop while waiting
    print("Task done")

# Fix 2: push the synchronous function onto a thread pool
def blocking_io_operation():
    """Simulated synchronous blocking IO."""
    time.sleep(5)
    return "operation finished"

async def correct_example2():
    """Right: use run_in_executor."""
    print("Starting blocking operation")
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,  # default thread pool
        blocking_io_operation,
    )
    print(f"Result: {result}")

# Fix 3: swap blocking libraries for async ones
#   requests  -> aiohttp or httpx
#   sqlite3   -> aiosqlite
#   psycopg2  -> asyncpg

async def demo_solutions():
    """Demonstrate the fixes."""
    print("=== Fix 1: async function ===")
    start = time.time()
    await asyncio.gather(*[correct_example1() for _ in range(3)])
    print(f"Total: {time.time() - start:.2f}s")  # about 5 s

    print("\n=== Fix 2: thread pool ===")
    start = time.time()
    await correct_example2()
    print(f"Total: {time.time() - start:.2f}s")  # about 5 s

if __name__ == "__main__":
    asyncio.run(demo_solutions())
```
- For synchronous calls you cannot avoid, use run_in_executor (or asyncio.to_thread on Python 3.9+)
Problem: a coroutine function is called without await, so it never actually runs.

```python
import asyncio

async def fetch_data():
    """Coroutine that fetches data."""
    await asyncio.sleep(1)
    return {"data": "sample"}

async def bad_example():
    """Wrong: missing await."""
    result = fetch_data()  # ❌ result is a coroutine object, not the value
    print(f"Result: {result}")  # <coroutine object fetch_data at 0x...>

    # A subtler variant: forgetting await inside a comprehension
    data_list = [fetch_data() for _ in range(3)]  # ❌ a list of coroutine objects
    print(f"Data list: {data_list}")

async def main():
    await bad_example()
```
Solutions:

```python
import asyncio
import inspect
import warnings

async def fetch_data():
    """Coroutine that fetches data."""
    await asyncio.sleep(1)
    return {"data": f"data-{id(asyncio.current_task())}"}

async def correct_example1():
    """Right: explicit await."""
    print("Fetching data")
    result = await fetch_data()  # ✅ correct use of await
    print(f"Got: {result}")
    return result

async def correct_example2():
    """Right: batch with create_task + gather."""
    print("Fetching in bulk")
    tasks = [asyncio.create_task(fetch_data()) for _ in range(3)]
    results = await asyncio.gather(*tasks)
    print(f"Bulk results: {results}")
    return results

async def correct_example3():
    """Right: comprehensions build coroutines, gather awaits them."""
    print("Comprehension + gather")
    # ❌ wrong: data_list = [fetch_data() for _ in range(3)]
    # ✅ right: build the coroutines, then gather
    tasks = [fetch_data() for _ in range(3)]
    data_list = await asyncio.gather(*tasks)
    print(f"Results: {data_list}")
    return data_list

# Dev aid: flag coroutines that are being passed around unawaited
def warn_unawaited_coroutine(coro):
    """Warn about an unawaited coroutine (development helper)."""
    caller = inspect.stack()[1]  # who called us
    warnings.warn(
        f"coroutine '{coro.__name__}' created at "
        f"{caller.filename}:{caller.lineno} without await - usually a bug!",
        category=RuntimeWarning,
        stacklevel=2,
    )
    return coro

async def demo_await_solutions():
    """Demonstrate the fixes."""
    print("=== Fix 1: explicit await ===")
    await correct_example1()
    print("\n=== Fix 2: batch gather ===")
    await correct_example2()
    print("\n=== Fix 3: comprehensions done right ===")
    await correct_example3()

    print("\n=== Dev aid: unawaited-coroutine warning ===")
    coro = fetch_data()  # simulate the mistake
    warn_unawaited_coroutine(coro)
    await coro  # still await it so the demo leaves no dangling coroutine

if __name__ == "__main__":
    asyncio.run(demo_await_solutions())
```
- Annotate return types: for `async def func() -> ResultType`, the annotation is the *result* type; callers that forget `await` get an `Awaitable[ResultType]`, which type checkers like mypy flag
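To make the annotation point concrete, here is a small sketch (the `fetch_user` name is illustrative) showing that the return annotation names the result, and that an un-awaited call can also be detected at runtime:

```python
import asyncio


async def fetch_user(user_id: int) -> dict:
    """The annotation is the result type (dict), not Awaitable[dict]."""
    await asyncio.sleep(0)
    return {"id": user_id}


async def main() -> None:
    result = await fetch_user(1)   # a dict, as annotated
    assert isinstance(result, dict)

    unawaited = fetch_user(2)      # forgot await: a coroutine object
    assert asyncio.iscoroutine(unawaited)
    unawaited.close()              # avoid a "never awaited" RuntimeWarning


if __name__ == "__main__":
    asyncio.run(main())
```

A static checker sees `fetch_user(2)` used as a `dict` and reports the missing `await` before the code ever runs.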
Problem: an exception is raised inside a coroutine but never caught; the task fails silently, which makes debugging difficult.

```python
import asyncio


async def risky_operation():
    """Coroutine that may fail."""
    await asyncio.sleep(0.5)
    if True:  # simulate a failure condition
        raise ValueError("simulated business exception")
    return "success"


async def bad_example1():
    """Wrong: the exception is silently swallowed."""
    try:
        task = asyncio.create_task(risky_operation())
        # The task is never awaited, so its exception never propagates
        print("Task created, but never awaited")
    except Exception as e:
        print(f"Caught exception: {e}")  # this line never runs!


async def bad_example2():
    """Wrong: mishandling exceptions with gather."""
    tasks = [risky_operation() for _ in range(3)]
    # ❌ if any task fails, the whole gather fails
    try:
        results = await asyncio.gather(*tasks)
        print(f"Results: {results}")
    except ValueError as e:
        print(f"A task failed: {e}")  # you only ever see one exception


async def main():
    print("=== Bad example 1 ===")
    await bad_example1()
    await asyncio.sleep(1)  # give the task time to run

    print("\n=== Bad example 2 ===")
    await bad_example2()
```
```python
# Note: the except* syntax below requires Python 3.11+ even to parse this file.
import asyncio
import sys


async def risky_operation(id: int):
    """Coroutine that may fail, tagged with an ID."""
    await asyncio.sleep(0.5)
    # Simulate: tasks with an even ID fail
    if id % 2 == 0:
        raise ValueError(f"business exception in task {id}")
    return f"task {id} succeeded"


async def correct_example1():
    """Correct: make sure every task is awaited."""
    print("Method 1: await every task")
    task = asyncio.create_task(risky_operation(1))
    try:
        result = await task  # ✅ await the task
        print(f"Task result: {result}")
    except ValueError as e:
        print(f"Task failed: {e}")


async def correct_example2():
    """Correct: use gather's return_exceptions parameter."""
    print("\nMethod 2: collect every result with return_exceptions")
    tasks = [risky_operation(i) for i in range(5)]
    # ✅ collect all results, exceptions included
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")


async def correct_example3():
    """Correct: automatic exception handling with TaskGroup (Python 3.11+)."""
    print("\nMethod 3: structured concurrency with TaskGroup")
    try:
        async with asyncio.TaskGroup() as tg:
            # Create several tasks
            task1 = tg.create_task(risky_operation(10))
            task2 = tg.create_task(risky_operation(11))
            task3 = tg.create_task(risky_operation(12))  # even, will fail
            print("All tasks created, waiting for completion...")
    except* ValueError as eg:  # exception-group handling
        print(f"Caught an exception group with {len(eg.exceptions)} exception(s):")
        for exc in eg.exceptions:
            print(f"  - {exc}")
    else:
        print("All tasks completed successfully")


async def correct_example4():
    """Correct: monitor tasks with a done callback."""
    print("\nMethod 4: monitor tasks with add_done_callback")

    def monitor_task(task: asyncio.Task, task_id: int):
        """Attach a completion-status callback to a task."""
        def done_callback(future):
            if future.cancelled():
                print(f"Task {task_id} was cancelled")
            elif future.exception():
                print(f"Task {task_id} raised: {future.exception()}")
            else:
                print(f"Task {task_id} succeeded: {future.result()}")
        task.add_done_callback(done_callback)
        return task

    # Create and monitor several tasks
    tasks = []
    for i in range(3):
        task = asyncio.create_task(risky_operation(i))
        tasks.append(monitor_task(task, i))

    # Wait for all of them
    await asyncio.gather(*tasks, return_exceptions=True)


async def demo_exception_solutions():
    """Demonstrate the exception-handling solutions."""
    await correct_example1()
    await correct_example2()
    # Guard the TaskGroup demo on Python version
    if sys.version_info >= (3, 11):
        await correct_example3()
    await correct_example4()


if __name__ == "__main__":
    asyncio.run(demo_exception_solutions())
```
- Use return_exceptions: collect every task's result instead of aborting on the first failure
- Use TaskGroup: Python 3.11+ provides safer, structured concurrency control
Problem: resources created during async operations (database connections, HTTP sessions, file handles) are never closed properly, which leaks them.

```python
import asyncio


async def bad_example():
    """Wrong: the resource is never closed."""
    # Simulate a database connection
    conn = {"status": "connected"}
    try:
        await asyncio.sleep(1)
        print("Running query...")
        return "query result"
    finally:
        # ❌ wrong: the connection is never closed
        # we should call: await conn.close()
        print("Forgot to close the connection!")
        # The connection leaks


async def bad_example2():
    """Wrong: not using an async context manager."""
    # Simulate a file operation
    file = open("temp.txt", "w")
    try:
        file.write("test data")
        await asyncio.sleep(0.5)  # simulate async work
    finally:
        file.close()  # ✅ closed, but synchronously
        # In async code, synchronous IO can block the event loop
```
```python
import asyncio
from contextlib import asynccontextmanager


# Solution 1: an async context manager
class AsyncDatabaseConnection:
    """An asynchronous database connection."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self._is_closed = False

    async def connect(self):
        """Open the connection."""
        print(f"Connecting to database: {self.connection_string}")
        await asyncio.sleep(0.3)
        self._is_closed = False
        return self

    async def close(self):
        """Close the connection."""
        if not self._is_closed:
            print("Closing database connection")
            await asyncio.sleep(0.2)
            self._is_closed = True

    async def query(self, sql: str):
        """Run a query."""
        if self._is_closed:
            raise RuntimeError("connection is closed")
        print(f"Running query: {sql}")
        await asyncio.sleep(0.5)
        return [{"id": 1, "result": "data"}]

    async def __aenter__(self):
        """Enter the context."""
        return await self.connect()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit the context."""
        await self.close()


# Solution 2: the contextlib.asynccontextmanager decorator
@asynccontextmanager
async def managed_http_session():
    """Context manager that owns an HTTP session."""
    import aiohttp
    session = aiohttp.ClientSession()
    try:
        print("Creating HTTP session")
        yield session
    finally:
        print("Closing HTTP session")
        await session.close()


# Solution 3: a resource pool (connection reuse)
class ConnectionPool:
    """A connection pool that reuses resources."""

    def __init__(self, max_size: int = 10):
        self.max_size = max_size
        self.pool = asyncio.Queue(maxsize=max_size)
        self._in_use = set()
        # Pre-fill the pool
        for i in range(max_size):
            self.pool.put_nowait(f"connection-{i}")

    async def acquire(self):
        """Take a connection from the pool."""
        conn = await self.pool.get()
        self._in_use.add(conn)
        print(f"Acquired {conn}, in use: {len(self._in_use)}")
        return conn

    async def release(self, conn):
        """Return a connection to the pool."""
        if conn in self._in_use:
            self._in_use.remove(conn)
            await self.pool.put(conn)
            print(f"Released {conn}, available: {self.pool.qsize()}")
        else:
            print(f"Warning: releasing a connection that was not in use: {conn}")


async def correct_example1():
    """Correct: use an async context manager."""
    print("=== Async context manager ===")
    async with AsyncDatabaseConnection("postgresql://localhost/db") as conn:
        results = await conn.query("SELECT * FROM users")
        print(f"Query results: {results}")
        # The connection closes automatically on exit
    print("Context exited, resources cleaned up")


async def correct_example2():
    """Correct: use the contextlib decorator."""
    print("\n=== contextlib decorator ===")
    async with managed_http_session() as session:
        async with session.get("https://httpbin.org/get") as response:
            data = await response.text()
            print(f"Fetched data, length: {len(data)}")
    print("HTTP session closed")


async def correct_example3():
    """Correct: reuse resources through a pool."""
    print("\n=== Connection pool ===")
    pool = ConnectionPool(max_size=3)

    async def worker(worker_id: int):
        """Worker coroutine that borrows from the pool."""
        conn = await pool.acquire()
        try:
            print(f"Worker {worker_id} using {conn}")
            await asyncio.sleep(1)  # simulate work
            return f"worker {worker_id} done"
        finally:
            await pool.release(conn)

    # More workers than pool slots
    tasks = [asyncio.create_task(worker(i)) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(f"All work finished: {results}")
    print(f"Pool state: {pool.pool.qsize()} available of {pool.max_size}")


async def correct_example4():
    """Correct: per-task cleanup with finally."""
    print("\n=== Task-level resource cleanup ===")

    async def monitored_task(task_id: int):
        """Task with resource tracking."""
        resources = []
        try:
            # Simulate acquiring a resource
            resource = f"resource-{task_id}"
            resources.append(resource)
            print(f"Task {task_id} acquired: {resource}")
            await asyncio.sleep(1)
            return f"task {task_id} result"
        finally:
            # Clean up every resource, even on failure
            for res in resources:
                print(f"Task {task_id} releasing: {res}")

    task = asyncio.create_task(monitored_task(100))
    try:
        result = await task
        print(f"Task finished: {result}")
    except Exception as e:
        print(f"Task failed: {e}")
        # The finally block runs even when the task fails


async def demo_resource_solutions():
    """Demonstrate the resource-management solutions."""
    await correct_example1()
    await correct_example2()
    await correct_example3()
    await correct_example4()


if __name__ == "__main__":
    asyncio.run(demo_resource_solutions())
```
- Always use context managers: async with guarantees resources are released
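When one coroutine must hold a dynamic number of resources at once, `contextlib.AsyncExitStack` generalizes `async with`: resources are entered one by one and are closed in reverse order on exit, even if an exception occurs. A toy sketch (the `resource` manager is illustrative):

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

opened: list[str] = []
closed: list[str] = []


@asynccontextmanager
async def resource(name: str):
    """A toy async resource that records its open/close order."""
    opened.append(name)
    try:
        yield name
    finally:
        closed.append(name)


async def main() -> None:
    # AsyncExitStack owns all three resources and unwinds them
    # in reverse order when the block exits.
    async with AsyncExitStack() as stack:
        for name in ("db", "cache", "http"):
            await stack.enter_async_context(resource(name))
        print("open:", opened)
    print("closed (reverse order):", closed)


if __name__ == "__main__":
    asyncio.run(main())
```

This is handy when the set of resources is only known at runtime, for example one connection per configured backend.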
Problem: async call stacks are more complex than synchronous ones, so when an error occurs it is hard to trace back to its source.

```python
import asyncio


async def deep_function():
    """Deep function that raises."""
    await asyncio.sleep(0.1)
    raise ValueError("deep exception")


async def middle_function():
    """Middle layer."""
    await asyncio.sleep(0.1)
    return await deep_function()


async def top_function():
    """Top layer."""
    await asyncio.sleep(0.1)
    return await middle_function()


async def bad_example():
    """Wrong: not enough debugging information."""
    try:
        result = await top_function()
        return result
    except ValueError as e:
        print(f"Caught exception: {e}")
        # ❌ wrong: the traceback is never logged, so the source is lost;
        # nothing tells us it was deep_function that raised


async def main():
    await bad_example()
```
```python
import asyncio
import json
import logging
import random
import time
import traceback
from datetime import datetime
from functools import wraps

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# Solution 1: richer exception handling
async def deep_function_enhanced():
    """Enhanced deep function."""
    try:
        await asyncio.sleep(0.1)
        raise ValueError("deep exception")
    except Exception as e:
        # Log detailed context before re-raising
        logger.error(f"deep_function failed: {e}")
        logger.debug(f"Traceback:\n{traceback.format_exc()}")
        raise


# Solution 2: a decorator that adds debug info automatically
def debug_async(func):
    """Debugging decorator for async functions."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        func_name = func.__name__
        logger.debug(f"Entering async function: {func_name}")
        try:
            result = await func(*args, **kwargs)
            logger.debug(f"Async function done: {func_name}")
            return result
        except Exception as e:
            logger.error(f"Async function failed: {func_name}, error: {e}")
            logger.debug(f"Full traceback:\n{traceback.format_exc()}")
            raise
    return wrapper


@debug_async
async def middle_function_enhanced():
    """Middle layer, wrapped by the decorator."""
    await asyncio.sleep(0.1)
    return await deep_function_enhanced()


@debug_async
async def top_function_enhanced():
    """Top layer, wrapped by the decorator."""
    await asyncio.sleep(0.1)
    return await middle_function_enhanced()


# Solution 3: asyncio debug mode
async def debug_mode_example():
    """Use asyncio's debug mode."""
    # Enable debug mode on the running loop; it reports, among other things:
    # - warnings for never-awaited coroutines
    # - slow callbacks and resource leaks
    asyncio.get_running_loop().set_debug(True)
    print("asyncio debug mode enabled")

    async def task_with_slow_callback():
        """Task whose callbacks debug mode can time."""
        await asyncio.sleep(0.01)

    tasks = [asyncio.create_task(task_with_slow_callback()) for _ in range(3)]
    await asyncio.gather(*tasks)


# Solution 4: structured logging
class StructuredLogger:
    """A structured logger."""

    def __init__(self, component: str):
        self.component = component

    async def log_operation(self, operation: str, **kwargs):
        """Log an operation as structured data."""
        log_data = {
            "timestamp": datetime.now().isoformat(),
            "component": self.component,
            "operation": operation,
            "task_id": id(asyncio.current_task()),
            **kwargs,
        }
        logger.info(f"structured log: {json.dumps(log_data)}")
        return log_data


async def structured_logging_example():
    """Structured logging in action."""
    db_logger = StructuredLogger("database")

    async def query_database():
        """Run a query with structured logging."""
        start_time = time.time()
        await db_logger.log_operation(
            "query_start",
            sql="SELECT * FROM users",
            params={"limit": 100},
        )
        try:
            await asyncio.sleep(0.5)  # simulate the query
            result = [{"id": 1, "name": "test"}]
            await db_logger.log_operation(
                "query_success",
                duration=time.time() - start_time,
                row_count=len(result),
            )
            return result
        except Exception as e:
            await db_logger.log_operation(
                "query_failure",
                duration=time.time() - start_time,
                error=str(e),
                error_type=type(e).__name__,
            )
            raise

    try:
        results = await query_database()
        print(f"Query results: {results}")
    except Exception as e:
        print(f"Query failed: {e}")


# Solution 5: task tracing and monitoring
async def task_tracing_example():
    """Task tracing in action."""

    class TaskTracer:
        """Tracks task lifecycles."""

        def __init__(self):
            self.active_tasks = {}
            self.completed_tasks = []

        def register_task(self, task: asyncio.Task, name: str):
            """Register a task for tracing."""
            task_id = id(task)
            self.active_tasks[task_id] = {
                "name": name,
                "start_time": time.time(),
                "task": task,
            }
            # Record completion via a callback
            task.add_done_callback(self._task_completed)
            logger.info(f"Registered task: {name} (ID: {task_id})")
            return task_id

        def _task_completed(self, task: asyncio.Task):
            """Completion callback."""
            task_id = id(task)
            if task_id in self.active_tasks:
                task_info = self.active_tasks.pop(task_id)
                duration = time.time() - task_info["start_time"]
                status = (
                    "cancelled" if task.cancelled()
                    else "failed" if task.exception()
                    else "completed"
                )
                self.completed_tasks.append({
                    **task_info,
                    "duration": duration,
                    "status": status,
                    "end_time": time.time(),
                })
                logger.info(
                    f"Task finished: {task_info['name']}, "
                    f"status: {status}, duration: {duration:.3f}s"
                )

        def get_stats(self):
            """Summary statistics."""
            return {
                "active": len(self.active_tasks),
                "completed": len(self.completed_tasks),
                "total_duration": sum(t["duration"] for t in self.completed_tasks),
                "avg_duration": (
                    sum(t["duration"] for t in self.completed_tasks)
                    / len(self.completed_tasks)
                    if self.completed_tasks else 0
                ),
            }

    tracer = TaskTracer()

    async def traced_task(name: str, duration: float):
        """A task that registers itself with the tracer."""
        task = asyncio.current_task()
        tracer.register_task(task, name)
        await asyncio.sleep(duration)
        if random.random() < 0.3:  # 30% chance of simulated failure
            raise RuntimeError(f"task {name} simulated failure")
        return f"task {name} done"

    tasks = [
        asyncio.create_task(traced_task(f"job-{i}", random.uniform(0.5, 1.5)))
        for i in range(5)
    ]
    try:
        results = await asyncio.gather(*tasks, return_exceptions=True)
        print(f"All task results: {results}")
    finally:
        stats = tracer.get_stats()
        print(f"Task-tracing stats: {stats}")


async def demo_debugging_solutions():
    """Demonstrate the debugging solutions."""
    print("=== Enhanced exception handling ===")
    try:
        await top_function_enhanced()
    except ValueError as e:
        print(f"Caught enhanced exception: {e}")
        # The log now contains the full traceback

    print("\n=== Structured logging ===")
    await structured_logging_example()

    print("\n=== Task tracing ===")
    await task_tracing_example()

    print("\n=== Debug mode ===")
    await debug_mode_example()


if __name__ == "__main__":
    asyncio.run(demo_debugging_solutions())
```
- Performance profiling: regularly check for slow callbacks and resource usage
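Debug mode can also be switched on for the whole program via `asyncio.run(..., debug=True)` (or the `PYTHONASYNCIODEBUG=1` environment variable), and `loop.slow_callback_duration` tunes the slow-callback threshold. A minimal sketch:

```python
import asyncio
import time


async def slow_step() -> None:
    # A synchronous sleep blocks the loop; in debug mode, steps longer
    # than loop.slow_callback_duration are logged as slow callbacks.
    time.sleep(0.15)


async def main() -> None:
    loop = asyncio.get_running_loop()
    assert loop.get_debug()              # enabled via asyncio.run(debug=True)
    loop.slow_callback_duration = 0.05   # tighten the threshold (default 0.1s)
    await slow_step()


if __name__ == "__main__":
    # debug=True also warns about never-awaited coroutines and logs
    # the tracebacks of unretrieved task exceptions.
    asyncio.run(main(), debug=True)
```

Running this prints an "Executing ... took 0.150 seconds" warning from the loop, pointing straight at the blocking call.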
Having worked through this article, you now have the core knowledge and practical skills for the asyncio module:
- Async fundamentals: how the event loop, coroutines, and tasks cooperate
- Modern features: TaskGroup structured concurrency and timeout control in Python 3.11+
- Practical applications: building high-performance web APIs, efficient crawlers, and real-time monitoring systems
- Troubleshooting: the common pitfalls of async programming and their solutions
- Performance tuning: connection reuse, batching, caching, and other optimizations
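The batching and connection-reuse ideas above are usually paired with a concurrency cap: an `asyncio.Semaphore` keeps a crawler or API client from opening too many connections at once. A small sketch (the URLs and the simulated fetch are placeholders):

```python
import asyncio


async def fetch(url: str, sem: asyncio.Semaphore) -> str:
    """Simulated fetch; the semaphore caps how many run concurrently."""
    async with sem:
        await asyncio.sleep(0.05)  # stand-in for the real network call
        return f"done: {url}"


async def main(limit: int = 3) -> list[str]:
    sem = asyncio.Semaphore(limit)  # at most `limit` fetches in flight
    urls = [f"https://example.com/{i}" for i in range(10)]
    return await asyncio.gather(*(fetch(u, sem) for u in urls))


if __name__ == "__main__":
    results = asyncio.run(main())
    print(len(results))  # 10
```

With a limit of 3, the ten fetches complete in roughly four batches instead of hammering the server with ten simultaneous connections.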
- 《Python高级编程(第3版)》: comprehensive coverage of advanced Python features, including async programming
- Fluent Python, 2nd Edition: Luciano Ramalho's classic, with a dedicated chapter on asyncio
- Asyncio in Practice: an English-language book focused on hands-on asyncio
- FastAPI:https://github.com/tiangolo/fastapi
- aiohttp:https://github.com/aio-libs/aiohttp
- uvicorn:https://github.com/encode/uvicorn
- Real Python's asyncio tutorial: https://realpython.com/async-io-python/
- Official asyncio documentation: https://docs.python.org/3/library/asyncio.html
- The Python Discord server: https://discord.gg/python
- Stack Overflow:https://stackoverflow.com/questions/tagged/python-asyncio
- Hands-on configuration of static type checkers (mypy, Pyright)
Stay tuned for the next article, "A Deep Dive into Python's typing Module: From Type Hints to Static Checking, Building Robust and Maintainable Python Code"!
A final tip: when using asyncio in real projects, stick to the practices covered above.
I hope this article helps you go further, and more steadily, on your async-programming journey! If you have any questions or suggestions, feel free to share them in the comments. 🙂