当前位置:首页>python>Python asyncio模块深度解析:掌握异步编程核心,让你的程序性能飙升10倍!

Python asyncio模块深度解析:掌握异步编程核心,让你的程序性能飙升10倍!

  • 2026-04-12 10:35:54
Python asyncio模块深度解析:掌握异步编程核心,让你的程序性能飙升10倍!
处理高并发请求时程序响应缓慢:当1000个用户同时访问你的API接口,同步代码只能逐个处理,后面的用户需要排队等待,体验极差。
爬虫效率低下浪费大量时间:使用requests库同步抓取100个网页,每个网页耗时2秒,总时间就是200秒,大部分时间都在等待网络响应。
数据库查询阻塞主线程:在Web应用中执行数据库查询时,整个线程都被阻塞,无法同时处理其他用户请求,服务器资源利用率极低。
实时通信应用连接数受限:使用同步代码开发WebSocket服务器,每个连接都需要一个线程,1000个连接就需要1000个线程,系统资源很快耗尽。
处理大文件时内存爆炸:一次性读取几个GB的大文件到内存,程序因OOM(内存溢出)崩溃,无法实现流式处理。
asyncio模块正是为解决这些痛点而生!它提供了完整的异步编程框架,让你用单线程就能处理成千上万的并发连接,真正发挥出硬件的全部潜力。通过本文,你将掌握asyncio的核心原理、最新特性、实战应用场景和常见问题解决方案,彻底告别同步编程的性能瓶颈。
一、模块概述
1.1 模块定位与设计哲学
asyncio是Python 3.4版本引入的标准库模块,旨在为异步IO操作提供基础设施,支持编写单线程并发代码。它的设计哲学是"非阻塞IO+事件循环",通过协程(Coroutine)和事件循环(Event Loop)的巧妙组合,实现高效的并发处理。
核心设计理念
  • 单线程并发:避免多线程带来的锁竞争和上下文切换开销
  • 非阻塞IO:遇到IO操作时立即让出控制权,不阻塞其他任务执行
  • 回调驱动:基于事件驱动的编程模型,通过回调函数处理异步结果
  • 结构化并发:Python 3.11+引入TaskGroup,提供更安全的并发控制
1.2 模块结构全景图
asyncio模块包含多个核心组件,下面是其主要功能分类:
功能类别
核心组件
主要作用
关键API
协程管理
Coroutine
定义可暂停的异步函数
async defawait
任务调度
Task
包装协程进行并发调度
create_task()TaskGroup
事件循环
EventLoop
调度所有协程的执行
asyncio.run()get_event_loop()
同步原语
Lock/Semaphore
控制并发访问共享资源
Lock()Semaphore()
网络通信
Streams
提供TCP/UDP异步通信
open_connection()start_server()
时间控制
Timeouts
设置异步操作超时
timeout()wait_for()
队列管理
Queue
实现生产者-消费者模式
Queue()PriorityQueue()
1.3 适用场景说明
asyncio特别适合以下场景:
  1. 高并发网络应用:Web服务器、API网关、实时通信服务器
  2. IO密集型任务:网络爬虫、文件批量处理、数据库批量操作
  3. 微服务架构:服务间异步通信、消息队列消费
  4. 实时数据处理:WebSocket服务器、实时监控系统
  5. 定时任务调度:周期性的数据同步、状态检查
不适合的场景:纯计算密集型任务(如图像处理、复杂数学运算),这类任务更适合使用多进程(multiprocessing)或专门的数值计算库。
二、核心函数解析
2.1 协程基础:async/await语法
协程是asyncio的基础构建块,使用async def定义,通过await调用。
import asyncio# 定义协程函数asyncdefsay_hello(name: str) -> str:"""简单的协程函数示例    Args:        name: 要问候的名字    Returns:        问候语字符串    """await asyncio.sleep(1)  # 模拟耗时操作,非阻塞等待returnf"Hello, {name}!"asyncdefmain():"""主协程函数"""# 调用协程    greeting = await say_hello("Python开发者")    print(greeting)  # 输出: Hello, Python开发者!# 同时执行多个协程    tasks = [        say_hello("Alice"),        say_hello("Bob"),        say_hello("Charlie")    ]# 使用gather并发执行    results = await asyncio.gather(*tasks)    print(results)  # 输出: ['Hello, Alice!', 'Hello, Bob!', 'Hello, Charlie!']# 运行入口if __name__ == "__main__":    asyncio.run(main())
关键点
  • async def定义的是协程函数,调用时返回协程对象,不会立即执行
  • await只能在协程函数内部使用,用于等待另一个协程或异步操作完成
  • asyncio.run()是Python 3.7+推荐的运行入口,负责创建和管理事件循环
2.2 任务调度:create_task()和TaskGroup
任务(Task)是事件循环调度的基本单位,可以通过create_task()创建。
import asyncioimport timeasyncdefdownload_file(filename: str, size: int):"""模拟下载文件的协程    Args:        filename: 文件名        size: 文件大小(MB)    """    print(f"开始下载 {filename} ({size}MB)...")await asyncio.sleep(size * 0.1)  # 模拟下载时间    print(f"下载完成 {filename}")return {"filename": filename, "size": size}asyncdefmain_old_style():"""旧式任务调度(Python 3.7之前)"""    start_time = time.time()# 创建任务但不立即等待    task1 = asyncio.create_task(download_file("report.pdf"10))    task2 = asyncio.create_task(download_file("data.csv"5))    task3 = asyncio.create_task(download_file("image.jpg"8))# 分别等待任务完成    result1 = await task1    result2 = await task2    result3 = await task3    end_time = time.time()    print(f"总下载时间: {end_time - start_time:.2f}秒")    print(f"下载结果: {[result1, result2, result3]}")# 实际耗时约1.0秒(三个任务并发执行),而不是2.3秒(串行执行)# Python 3.11+ 新特性:TaskGroup(结构化并发)asyncdefmain_new_style():"""新式任务调度(Python 3.11+)"""    start_time = time.time()asyncwith asyncio.TaskGroup() as tg:# 使用TaskGroup创建任务,自动管理异常和取消        task1 = tg.create_task(download_file("report.pdf"10))        task2 = tg.create_task(download_file("data.csv"5))        task3 = tg.create_task(download_file("image.jpg"8))# TaskGroup退出时会自动等待所有任务完成    end_time = time.time()    print(f"TaskGroup总下载时间: {end_time - start_time:.2f}秒")# 运行比较if __name__ == "__main__":    print("=== 旧式任务调度 ===")    asyncio.run(main_old_style())    print("\n=== 新式TaskGroup调度 ===")    asyncio.run(main_new_style())
TaskGroup的优势
  1. 异常传播:一个任务失败会自动取消其他任务,防止资源泄漏
  2. 自动等待:不需要显式调用gather()或分别await每个任务
  3. 结构化:代码更清晰,逻辑更安全
2.3 事件循环:asyncio.run()和底层API
事件循环是asyncio的心脏,负责调度所有协程的执行。
import asyncioimport threadingasyncdefworker(name: str, delay: float):"""工作协程"""    print(f"Worker {name} 开始工作,预计耗时{delay}秒")await asyncio.sleep(delay)    print(f"Worker {name} 完成工作")returnf"Result from {name}"# 方式1:推荐的简单方式(Python 3.7+)asyncdefsimple_main():"""使用asyncio.run()简化的事件循环管理"""    tasks = [        worker("A"1.5),        worker("B"1.0),        worker("C"2.0)    ]    results = await asyncio.gather(*tasks)    print(f"所有任务完成: {results}")# 方式2:手动管理事件循环(需要更精细控制时)defmanual_event_loop():"""手动管理事件循环(适用于特殊场景)"""# 获取或创建事件循环    loop = asyncio.new_event_loop()    asyncio.set_event_loop(loop)try:# 运行多个任务        tasks = [            worker("手动-A"1.0),            worker("手动-B"0.5)        ]# 使用gather并发执行        results = loop.run_until_complete(asyncio.gather(*tasks))        print(f"手动模式结果: {results}")# 在事件循环中运行同步函数(线程池)import concurrent.futuresdefblocking_io():            print("执行阻塞IO操作...")            time.sleep(1)return"阻塞IO完成"# 将同步函数放到线程池中运行        future = loop.run_in_executor(None, blocking_io)        result = loop.run_until_complete(future)        print(f"线程池执行结果: {result}")finally:# 清理资源        loop.close()if __name__ == "__main__":    print("=== 简单方式(推荐) ===")    asyncio.run(simple_main())    print("\n=== 手动管理事件循环 ===")    manual_event_loop()
事件循环最佳实践
  1. 优先使用asyncio.run():适用于大多数场景,自动管理循环生命周期
  2. 避免在异步代码中混合同步阻塞调用:会阻塞整个事件循环
  3. 使用run_in_executor()处理阻塞操作:将同步函数放到线程池执行
2.4 同步原语:Lock、Semaphore、Event
在并发环境中,有时需要协调多个协程的执行顺序。
import asyncio# 1. Lock(互斥锁):保护共享资源asyncdefuse_lock(lock: asyncio.Lock, resource_id: int):"""使用锁保护共享资源访问"""asyncwith lock:  # 自动获取和释放锁        print(f"协程 {resource_id} 获得锁,访问共享资源")await asyncio.sleep(0.5)        print(f"协程 {resource_id} 释放锁")return resource_id# 2. Semaphore(信号量):限制并发数量asyncdeflimited_concurrent(sem: asyncio.Semaphore, task_id: int):"""使用信号量控制并发数"""asyncwith sem:        print(f"任务 {task_id} 获得信号量,开始执行")await asyncio.sleep(1)        print(f"任务 {task_id} 释放信号量")return task_id# 3. Event(事件):协程间通信asyncdefwait_for_event(event: asyncio.Event, waiter_id: int):"""等待事件的协程"""    print(f"等待者 {waiter_id} 开始等待事件")await event.wait()  # 等待事件被设置    print(f"等待者 {waiter_id} 检测到事件已触发")asyncdeftrigger_event(event: asyncio.Event):"""触发事件的协程"""await asyncio.sleep(2)    print("触发器:设置事件")    event.set()  # 设置事件,唤醒所有等待者asyncdefmain_sync_primitives():"""演示同步原语的使用"""    print("=== 1. Lock演示 ===")    lock = asyncio.Lock()    lock_tasks = [use_lock(lock, i) for i in range(3)]await asyncio.gather(*lock_tasks)    print("\n=== 2. Semaphore演示 ===")    sem = asyncio.Semaphore(2)  # 最多同时2个并发    sem_tasks = [limited_concurrent(sem, i) for i in range(5)]await asyncio.gather(*sem_tasks)    print("\n=== 3. Event演示 ===")    event = asyncio.Event()# 创建3个等待者和1个触发器    waiters = [wait_for_event(event, i) for i in range(3)]    trigger = trigger_event(event)await asyncio.gather(*waiters, trigger)if __name__ == "__main__":    asyncio.run(main_sync_primitives())
同步原语使用场景
  • Lock:保护数据库连接、文件写入等共享资源
  • Semaphore:限制API调用频率、控制数据库连接池大小
  • Event:等待系统初始化完成、通知多个协程开始执行
三、高级用法
3.1 异步上下文管理器
异步上下文管理器使用async with语句,可以在进入和退出时执行异步操作。
import asyncioimport aiofilesclassAsyncDatabaseConnection:"""异步数据库连接上下文管理器"""def__init__(self, connection_string: str):        self.connection_string = connection_string        self.connection = Noneasyncdef__aenter__(self):"""进入上下文时建立连接"""        print(f"连接到数据库: {self.connection_string}")await asyncio.sleep(0.5)  # 模拟连接建立        self.connection = {"status""connected""connection_string": self.connection_string}return self.connectionasyncdef__aexit__(self, exc_type, exc_val, exc_tb):"""退出上下文时关闭连接"""if self.connection:            print("关闭数据库连接")await asyncio.sleep(0.2)  # 模拟连接关闭            self.connection = Noneasyncdefquery(self, sql: str):"""执行查询"""ifnot self.connection:raise RuntimeError("数据库未连接")        print(f"执行查询: {sql}")await asyncio.sleep(0.3)  # 模拟查询执行return [{"id"1"name""测试数据"}]asyncdefasync_context_example():"""异步上下文管理器示例"""# 使用异步上下文管理器asyncwith AsyncDatabaseConnection("postgresql://localhost/mydb"as db:        print(f"数据库连接状态: {db}")# 执行查询        results = await db.query("SELECT * FROM users")        print(f"查询结果: {results}")# 上下文退出后自动关闭连接    print("上下文已退出,连接自动关闭")# 异步文件操作(使用aiofiles库)asyncdefasync_file_operations():"""异步文件读写示例"""# 异步写入文件asyncwith aiofiles.open('test_async.txt', mode='w'as f:await f.write('异步写入的内容\n')await f.write('第二行内容\n')# 异步读取文件asyncwith aiofiles.open('test_async.txt', mode='r'as f:        content = await f.read()        print(f"文件内容:\n{content}")# 异步逐行读取asyncwith aiofiles.open('test_async.txt', mode='r'as f:asyncfor line in f:            print(f"行: {line.strip()}")asyncdefmain_advanced():"""高级用法演示主函数"""    print("=== 异步上下文管理器 ===")await async_context_example()    print("\n=== 异步文件操作 ===")await async_file_operations()if __name__ == "__main__":    asyncio.run(main_advanced())
异步上下文管理器优势
  1. 资源自动管理:确保资源(连接、文件等)正确释放
  2. 异常安全:即使在协程中发生异常,__aexit__也会被调用
  3. 代码简洁:避免繁琐的try-finally语句
3.2 异步迭代器和生成器
异步迭代器使用async for语句,可以在迭代过程中执行异步操作。
import asynciofrom typing import AsyncIterator, ListclassAsyncDataFetcher:"""异步数据获取迭代器"""def__init__(self, urls: List[str], batch_size: int = 3):        self.urls = urls        self.batch_size = batch_size        self.current_index = 0def__aiter__(self) -> "AsyncDataFetcher":"""返回异步迭代器自身"""return selfasyncdef__anext__(self) -> str:"""获取下一个数据"""if self.current_index >= len(self.urls):raise StopAsyncIteration# 模拟异步数据获取        url = self.urls[self.current_index]        print(f"开始获取: {url}")await asyncio.sleep(0.5)  # 模拟网络请求        self.current_index += 1returnf"数据 from {url}"# 异步生成器(Python 3.6+)asyncdefasync_generator(start: int, end: int):"""异步生成器示例"""for i in range(start, end):# 在yield前执行异步操作await asyncio.sleep(0.1)yield iasyncdefprocess_large_dataset():"""处理大型数据集的异步生成器"""# 模拟从数据库分页读取数据    page_size = 100    total_records = 1000for page in range(0, total_records, page_size):# 模拟异步数据库查询await asyncio.sleep(0.2)# 生成当前页的数据        start_id = page + 1        end_id = min(page + page_size, total_records)for record_id in range(start_id, end_id + 1):yield {"id": record_id, "data"f"记录{record_id}"}asyncdefmain_async_iteration():"""异步迭代演示"""    print("=== 异步迭代器示例 ===")    urls = ["https://api.example.com/data1","https://api.example.com/data2""https://api.example.com/data3","https://api.example.com/data4"    ]    fetcher = AsyncDataFetcher(urls)asyncfor data in fetcher:        print(f"获取到数据: {data}")    print("\n=== 异步生成器示例 ===")asyncfor number in async_generator(16):        print(f"生成器产生的数字: {number}")    print("\n=== 处理大型数据集 ===")    count = 0asyncfor record in process_large_dataset():        count += 1if count <= 5:  # 只显示前5条记录            print(f"记录: {record}")if count >= 50:  # 只处理前50条记录作为演示break    print(f"总共处理了 {count} 条记录")if __name__ == "__main__":    asyncio.run(main_async_iteration())
异步迭代器适用场景
  1. 分页查询:从数据库或API分页获取数据
  2. 流式处理:实时处理数据流(如日志文件、消息队列)
  3. 批量操作:分批处理大量数据,避免内存溢出
3.3 性能优化技巧
掌握以下技巧可以显著提升asyncio应用的性能。
import asyncioimport timefrom functools import wraps# 技巧1:连接复用(HTTP客户端)asyncdefhttp_connection_pooling():"""HTTP连接池复用示例"""import aiohttp    urls = [f"https://httpbin.org/delay/{i%3}"for i in range(10)]# 创建连接池,限制最大连接数    connector = aiohttp.TCPConnector(limit=5, limit_per_host=3)asyncwith aiohttp.ClientSession(connector=connector) as session:        tasks = []for url in urls:            task = asyncio.create_task(                session.get(url, timeout=10)            )            tasks.append(task)        responses = await asyncio.gather(*tasks)# 处理响应for resp in responses:if resp.status == 200:                data = await resp.text()                print(f"成功获取 {len(data)} 字节数据")            resp.close()# 技巧2:批量处理减少上下文切换asyncdefbatch_processing():"""批量处理减少await次数"""# ❌ 不推荐:频繁await# async def process_single(item):#     await asyncio.sleep(0.01)#     return item * 2# ✅ 推荐:批量处理asyncdefprocess_batch(items):"""批量处理一组数据"""await asyncio.sleep(0.05)  # 假设批量处理需要固定时间return [item * 2for item in items]# 准备测试数据    all_items = list(range(100))    batch_size = 10# 分批处理    results = []for i in range(0, len(all_items), batch_size):        batch = all_items[i:i + batch_size]        batch_result = await process_batch(batch)        results.extend(batch_result)    print(f"批量处理完成,共 {len(results)} 个结果")return results# 技巧3:使用asyncio.timeout()控制超时(Python 3.11+)asyncdeftimeout_control():"""使用超时控制防止任务无限等待"""asyncdefslow_task(duration: float):"""模拟耗时任务"""await asyncio.sleep(duration)returnf"任务完成,耗时{duration}秒"try:# 使用timeout上下文管理器asyncwith asyncio.timeout(2.0):            result = await slow_task(3.0)  # 这个任务需要3秒,会超时return resultexcept asyncio.TimeoutError:        print("任务执行超时,已取消")returnNone# 技巧4:协程缓存(memoization)defasync_cache(maxsize=128):"""异步函数缓存装饰器"""    cache = {}defdecorator(func):        @wraps(func)asyncdefwrapper(*args, **kwargs):# 生成缓存键            key = (args, frozenset(kwargs.items()))if key in cache:                print(f"缓存命中: {func.__name__}{args}")return cache[key]# 执行函数            result = await func(*args, **kwargs)# 更新缓存if len(cache) >= maxsize:# 简单的LRU:移除第一个键                cache.pop(next(iter(cache)))            cache[key] = resultreturn resultreturn wrapperreturn decorator@async_cache(maxsize=10)asyncdefexpensive_operation(x: int):"""模拟昂贵的计算或IO操作"""await asyncio.sleep(1.0)return x * xasyncdefmain_performance():"""性能优化演示"""    print("=== 连接池复用 ===")await http_connection_pooling()    print("\n=== 批量处理 ===")await batch_processing()    print("\n=== 超时控制 ===")await timeout_control()    print("\n=== 协程缓存 ===")# 第一次调用会执行    result1 = await expensive_operation(5)    print(f"第一次结果: {result1}")# 第二次调用相同参数,命中缓存    result2 = await expensive_operation(5)    print(f"第二次结果: {result2}")if __name__ == "__main__":    asyncio.run(main_performance())
性能优化要点
  1. 连接复用:HTTP连接、数据库连接要复用,避免频繁创建销毁
  2. 批量操作:减少await次数,尽量批量处理数据
  3. 超时控制:为所有网络请求设置合理的超时时间
  4. 缓存策略:对计算结果进行缓存,避免重复计算
四、实战应用场景
4.1 场景一:构建高性能Web API服务器
使用FastAPI + asyncio构建能处理高并发的API服务器。
"""场景描述:电商平台需要构建商品查询API,每秒需要处理上千个并发请求,同时查询数据库、调用推荐系统、记录访问日志。"""import asyncioimport timefrom typing import List, Dictfrom datetime import datetime# 模拟异步数据库客户端classAsyncDatabase:"""模拟异步数据库操作"""    @staticmethodasyncdefquery_products(product_ids: List[int]) -> List[Dict]:"""查询商品信息"""        print(f"数据库查询: {len(product_ids)} 个商品")await asyncio.sleep(0.1)  # 模拟数据库查询时间return [            {"id": pid, "name"f"商品{pid}""price": pid * 10"stock"100}for pid in product_ids        ]    @staticmethodasyncdefquery_reviews(product_id: int) -> List[Dict]:"""查询商品评价"""await asyncio.sleep(0.05)return [            {"id"1"user""用户A""rating"5"comment""很好"},            {"id"2"user""用户B""rating"4"comment""不错"}        ]# 模拟推荐系统客户端classRecommendationSystem:"""模拟异步推荐系统"""    @staticmethodasyncdefget_recommendations(user_id: int, product_id: int) -> List[int]:"""获取相关推荐"""await asyncio.sleep(0.08)return [product_id + i for i in range(14)]# 日志系统classAsyncLogger:"""异步日志记录"""    @staticmethodasyncdeflog_access(user_id: int, endpoint: str, duration: float):"""记录访问日志"""await asyncio.sleep(0.01)  # 异步写入日志        print(f"[{datetime.now()}] 用户{user_id} 访问{endpoint}, 耗时{duration:.3f}秒")asyncdefget_product_details(product_id: int, user_id: int) -> Dict:"""获取商品详情(整合多个数据源)"""    start_time = time.time()# 并发执行多个异步任务    db = AsyncDatabase()    recsys = RecommendationSystem()# 使用TaskGroup(Python 3.11+)确保所有任务完成asyncwith asyncio.TaskGroup() as tg:# 并行查询商品信息和评价        product_task = tg.create_task(db.query_products([product_id]))        reviews_task = tg.create_task(db.query_reviews(product_id))        recommendations_task = tg.create_task(            recsys.get_recommendations(user_id, product_id)        )# 获取所有结果    products = product_task.result()    reviews = reviews_task.result()    recommendations = recommendations_task.result()# 组装响应数据ifnot products:return {"error""商品不存在"}    product = products[0]# 计算响应时间    duration = time.time() - start_time# 异步记录访问日志(不阻塞主流程)    log_task = asyncio.create_task(        AsyncLogger.log_access(user_id, f"/products/{product_id}", duration)    )return {"product": product,"reviews": reviews,"recommendations": recommendations,"response_time"f"{duration:.3f}秒"    }asyncdefbatch_get_products(product_ids: List[int], user_id: int) -> Dict:"""批量获取商品信息(优化版本)"""    start_time = time.time()    db = AsyncDatabase()# 批量查询商品信息    products = await db.query_products(product_ids)# 并发查询每个商品的评价    review_tasks = [        asyncio.create_task(db.query_reviews(pid))for pid in product_ids    ]    reviews_list = await asyncio.gather(*review_tasks)# 组织返回数据    result = {}for i, product in enumerate(products):        result[product['id']] = {"product": product,"reviews": reviews_list[i] if i < len(reviews_list) else []        }    duration = time.time() - start_time# 记录批量操作日志await AsyncLogger.log_access(        user_id, f"/products/batch/{len(product_ids)}"        duration    )return {"data": result,"total_products": len(products),"response_time"f"{duration:.3f}秒"    }asyncdefdemo_web_api():"""演示Web API场景"""    print("=== 单个商品查询 ===")# 模拟多个并发请求    user_requests = []for user_id in range(16):        product_id = user_id * 10        task = asyncio.create_task(            get_product_details(product_id, user_id)        )        user_requests.append(task)# 并发处理所有用户请求    results = await asyncio.gather(*user_requests)for i, result in enumerate(results):        print(f"用户{i+1} 查询结果: 商品{result.get('product', {}).get('id', 'N/A')}")    print("\n=== 批量商品查询 ===")# 批量查询演示    batch_result = await batch_get_products([101102103104105], 1001)    print(f"批量查询结果: {batch_result['total_products']} 个商品")    print(f"响应时间: {batch_result['response_time']}")if __name__ == "__main__":    asyncio.run(demo_web_api())
场景优势
  • 高并发处理:单台服务器可轻松处理上千并发请求
  • 响应速度快:通过并发查询多个数据源,大幅减少响应时间
  • 资源利用率高:IO等待期间CPU可以处理其他请求
4.2 场景二:高效网络爬虫系统
构建能同时抓取数百个网页的高性能爬虫。
"""场景描述:需要从多个新闻网站抓取最新文章,要求高效、稳定、支持重试机制,同时遵守robots.txt规则,避免对目标网站造成压力。"""import asyncioimport aiohttpimport aiofilesfrom typing import List, Dict, Optionalfrom urllib.parse import urlparseimport timeclassAsyncWebCrawler:"""异步网络爬虫"""def__init__(self,                  max_concurrent: int = 10,                 timeout: float = 10.0,                 retry_count: int = 3):"""        初始化爬虫        Args:            max_concurrent: 最大并发数            timeout: 请求超时时间(秒)            retry_count: 重试次数        """        self.max_concurrent = max_concurrent        self.timeout = timeout        self.retry_count = retry_count        self.semaphore = asyncio.Semaphore(max_concurrent)# 统计信息        self.stats = {"total_requests"0,"successful"0,"failed"0,"total_bytes"0,"start_time"None        }asyncdeffetch_url(self,                        session: aiohttp.ClientSession,                       url: str,                       headers: Optional[Dict] = None) -> Optional[str]:"""抓取单个URL"""        self.stats["total_requests"] += 1for attempt in range(self.retry_count):try:asyncwith self.semaphore:  # 控制并发数                    print(f"抓取: {url} (尝试 {attempt + 1}/{self.retry_count})")asyncwith session.get(                        url,                         headers=headers,                        timeout=aiohttp.ClientTimeout(total=self.timeout)                    ) as response:if response.status == 200:                            content = await response.text()                            self.stats["successful"] += 1                            self.stats["total_bytes"] += len(content.encode('utf-8'))                            print(f"成功: {url} ({len(content)} 字节)")return contentelse:                            print(f"HTTP错误 {response.status}{url}")except asyncio.TimeoutError:                print(f"超时: {url} (尝试 {attempt + 1})")except aiohttp.ClientError as e:                print(f"客户端错误: {url} - {e}")except Exception as e:                print(f"未知错误: {url} - {e}")# 如果不是最后一次尝试,等待后重试if attempt < self.retry_count - 1:await asyncio.sleep(2 ** attempt)  # 指数退避        self.stats["failed"] += 1        print(f"最终失败: {url}")returnNoneasyncdefcrawl_urls(self, urls: List[str],                         output_dir: str = "crawled_data") -> Dict:"""批量抓取URL列表"""        self.stats["start_time"] = time.time()# 确保输出目录存在import os        os.makedirs(output_dir, exist_ok=True)# 设置合理的HTTP头部        headers = {"User-Agent""Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36","Accept""text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8","Accept-Language""zh-CN,zh;q=0.9,en;q=0.8","Accept-Encoding""gzip, deflate",        }asyncwith aiohttp.ClientSession(headers=headers) as session:            tasks = []for url in urls:                task = asyncio.create_task(                    self.fetch_url(session, url)                )                tasks.append(task)# 等待所有任务完成            results = await asyncio.gather(*tasks)# 保存结果到文件            save_tasks = []for i, (url, content) in enumerate(zip(urls, results)):if content:# 生成安全文件名                    parsed = urlparse(url)                    filename = f"{parsed.netloc.replace('.''_')}_{i}.html"                    filepath = os.path.join(output_dir, filename)                    save_task = asyncio.create_task(                        self.save_content(filepath, content, url)                    )                    save_tasks.append(save_task)# 等待所有保存任务完成await asyncio.gather(*save_tasks)# 计算总耗时        duration = time.time() - self.stats["start_time"]        self.stats["duration"] = durationreturn self.statsasyncdefsave_content(self, filepath: str, content: str, url: str):"""异步保存内容到文件"""try:asyncwith aiofiles.open(filepath, 'w', encoding='utf-8'as f:await f.write(f"<!-- Source URL: {url} -->\n")await f.write(content)            print(f"已保存: {filepath}")except Exception as e:            print(f"保存文件失败 {filepath}{e}")asyncdefextract_links(self, html: str, base_url: str) -> List[str]:"""从HTML中提取链接(简化版)"""# 这里可以使用beautifulsoup4,为简化使用字符串查找import re# 简单的链接提取正则        pattern = r'href=["\'](https?://[^"\']+)["\']'        links = re.findall(pattern, html)# 过滤和去重        unique_links = []        seen = set()for link in links:if link notin seen and link.startswith('http'):                seen.add(link)                unique_links.append(link)        print(f"从 {base_url} 提取到 {len(unique_links)} 个链接")return unique_linksasyncdefdemo_crawler():"""爬虫演示"""# 测试URL列表(实际使用时应替换为真实URL)    test_urls = ["https://httpbin.org/delay/1","https://httpbin.org/delay/2""https://httpbin.org/delay/1","https://httpbin.org/status/200","https://httpbin.org/status/404","https://httpbin.org/bytes/1024","https://httpbin.org/bytes/2048","https://httpbin.org/bytes/4096",    ]# 模拟新闻网站URL(这里使用httpbin模拟)    news_urls = [f"https://httpbin.org/delay/{i%3}"for i in range(12)]    print("=== 异步爬虫演示 ===")    crawler = AsyncWebCrawler(        max_concurrent=5,        timeout=5.0,        retry_count=2    )# 开始抓取    stats = await crawler.crawl_urls(news_urls, "crawled_news")# 输出统计信息    print("\n=== 爬虫统计 ===")    print(f"总请求数: {stats['total_requests']}")    print(f"成功: {stats['successful']}")    print(f"失败: {stats['failed']}")    print(f"总数据量: {stats['total_bytes'] / 1024:.2f} KB")    print(f"总耗时: {stats['duration']:.2f}秒")# 计算平均速度if stats['duration'] > 0:        req_per_sec = stats['total_requests'] / stats['duration']        data_per_sec = stats['total_bytes'] / stats['duration'] / 1024        print(f"平均请求速度: {req_per_sec:.2f} 请求/秒")        print(f"平均数据速度: {data_per_sec:.2f} KB/秒")# 演示链接提取(模拟)    print("\n=== 链接提取演示 ===")# 读取一个保存的文件并提取链接import osif os.path.exists("crawled_news"):        files = os.listdir("crawled_news")if files:            sample_file = os.path.join("crawled_news", files[0])asyncwith aiofiles.open(sample_file, 'r', encoding='utf-8'as f:                content = await f.read()# 提取链接            links = await crawler.extract_links(content, "httpbin.org")if links:                print(f"示例链接(前3个):")for link in links[:3]:                    print(f"  - {link}")if __name__ == "__main__":    asyncio.run(demo_crawler())
爬虫设计要点
  1. 并发控制:使用信号量限制最大并发数,避免被封IP
  2. 重试机制:实现指数退避重试,提高稳定性
  3. 超时设置:为每个请求设置合理的超时时间
  4. 连接复用:使用ClientSession复用TCP连接
  5. 异步文件IO:使用aiofiles避免文件写入阻塞事件循环
4.3 场景三:实时数据监控与告警系统
构建能实时处理大量数据流的监控系统。
"""场景描述:需要监控服务器集群的健康状态,实时收集指标(CPU、内存、网络),在异常时触发告警,同时支持历史数据查询和分析。"""import asyncioimport randomimport timefrom typing import Dict, List, Optionalfrom datetime import datetime, timedeltafrom dataclasses import dataclassimport json@dataclassclassServerMetric:"""服务器监控指标"""    server_id: str    timestamp: datetime    cpu_percent: float    memory_percent: float    network_in_mbps: float    network_out_mbps: float    disk_usage_percent: float@dataclass  classAlert:"""告警信息"""    alert_id: str    server_id: str    alert_type: str  # "high_cpu", "high_memory", "network_anomaly"    severity: str    # "warning", "critical"    metric_value: float    threshold: float    timestamp: datetime    message: strclassRealTimeMonitor:"""实时监控系统"""def__init__(self,                  alert_thresholds: Optional[Dict] = None,                 check_interval: float = 5.0):"""        初始化监控器        Args:            alert_thresholds: 告警阈值配置            check_interval: 检查间隔(秒)        """        self.check_interval = check_interval# 默认告警阈值        self.alert_thresholds = alert_thresholds or {"high_cpu"80.0,      # CPU使用率超过80%"high_memory"85.0,   # 内存使用率超过85%"high_disk"90.0,     # 磁盘使用率超过90%"network_spike"100.0# 网络流量突增超过100Mbps        }# 模拟的服务器列表        self.servers = [f"web-server-{i:03d}"for i in range(111)        ] + [f"db-server-{i:03d}"for i in range(16)        ] + [f"cache-server-{i:03d}"for i in range(14)        ]# 状态存储        self.metrics_history: Dict[str, List[ServerMetric]] = {            server: [] for server in self.servers        }        self.active_alerts: Dict[str, Alert] = {}        self.alert_counter = 0# 监控任务        self.monitoring_task: Optional[asyncio.Task] = None        self.is_running = Falseasyncdefcollect_metrics(self, server_id: str) -> ServerMetric:"""收集单个服务器的指标(模拟)"""# 模拟数据收集的延迟await asyncio.sleep(random.uniform(0.10.3))# 模拟正常和异常情况        is_healthy = random.random() > 0.1# 90%时间健康if is_healthy:            cpu = random.uniform(10.060.0)            memory = random.uniform(20.070.0)            network_in = random.uniform(5.050.0)            network_out = random.uniform(5.040.0)            disk = random.uniform(30.080.0)else:# 10%时间可能有问题            cpu = random.uniform(70.099.0)            memory = random.uniform(75.098.0)            network_in = random.uniform(60.0200.0)            network_out = random.uniform(50.0180.0)            disk = random.uniform(85.099.0)        metric = ServerMetric(            server_id=server_id,            timestamp=datetime.now(),            cpu_percent=cpu,            memory_percent=memory,            network_in_mbps=network_in,            network_out_mbps=network_out,            disk_usage_percent=disk        )return metricasyncdefanalyze_metrics(self, metric: ServerMetric) -> List[Alert]:"""分析指标并生成告警"""        alerts = []# 检查CPUif metric.cpu_percent > self.alert_thresholds["high_cpu"]:            severity = "critical"if metric.cpu_percent > 90.0else"warning"            alert = Alert(                alert_id=f"alert-{self.alert_counter:06d}",                server_id=metric.server_id,                alert_type="high_cpu",                severity=severity,                metric_value=metric.cpu_percent,                threshold=self.alert_thresholds["high_cpu"],                timestamp=metric.timestamp,                message=f"服务器 {metric.server_id} CPU使用率过高: {metric.cpu_percent:.1f}%"            )            alerts.append(alert)            self.alert_counter += 1# 检查内存if metric.memory_percent > self.alert_thresholds["high_memory"]:            severity = "critical"if metric.memory_percent > 95.0else"warning"            alert = Alert(                alert_id=f"alert-{self.alert_counter:06d}",                server_id=metric.server_id,                alert_type="high_memory",                severity=severity,                metric_value=metric.memory_percent,                threshold=self.alert_thresholds["high_memory"],                timestamp=metric.timestamp,                message=f"服务器 {metric.server_id} 内存使用率过高: {metric.memory_percent:.1f}%"            )            alerts.append(alert)            self.alert_counter += 1# 检查网络突增        total_network = metric.network_in_mbps + metric.network_out_mbpsif total_network > self.alert_thresholds["network_spike"]:            alert = Alert(                alert_id=f"alert-{self.alert_counter:06d}",                server_id=metric.server_id,                alert_type="network_anomaly",                severity="warning",                metric_value=total_network,                threshold=self.alert_thresholds["network_spike"],                timestamp=metric.timestamp,                message=f"服务器 {metric.server_id} 网络流量突增: {total_network:.1f} Mbps"            )            alerts.append(alert)            self.alert_counter += 1return alertsasyncdefprocess_alerts(self, alerts: List[Alert]):"""处理告警"""for alert in alerts:# 记录活跃告警            self.active_alerts[alert.alert_id] = alert# 模拟告警处理if alert.severity == "critical":                print(f"🚨 CRITICAL告警: {alert.message}")# 这里可以集成邮件、短信、钉钉等通知else:                print(f"⚠️  WARNING告警: {alert.message}")asyncdefcleanup_old_alerts(self, max_age_hours: int = 24):"""清理旧告警"""        cutoff_time = datetime.now() - timedelta(hours=max_age_hours)        to_remove = []for alert_id, alert in self.active_alerts.items():if alert.timestamp < cutoff_time:                to_remove.append(alert_id)for alert_id in to_remove:del self.active_alerts[alert_id]if to_remove:            print(f"清理了 {len(to_remove)} 个旧告警")asyncdefmonitor_loop(self):"""监控主循环"""        print(f"开始监控 {len(self.servers)} 台服务器...")        self.is_running = Truetry:while self.is_running:                cycle_start = time.time()# 并发收集所有服务器的指标                tasks = [                    asyncio.create_task(self.collect_metrics(server))for server in self.servers                ]                metrics = await asyncio.gather(*tasks)# 处理每个指标for metric in metrics:# 保存历史数据                    self.metrics_history[metric.server_id].append(metric)# 保留最近100条记录if len(self.metrics_history[metric.server_id]) > 100:                        self.metrics_history[metric.server_id] = \                            self.metrics_history[metric.server_id][-100:]# 分析指标并生成告警                    alerts = await self.analyze_metrics(metric)if alerts:await self.process_alerts(alerts)# 定期清理旧告警if random.random() < 0.2:  # 20%概率执行清理await self.cleanup_old_alerts()# 输出周期统计                cycle_duration = time.time() - cycle_start                print(f"监控周期完成: {cycle_duration:.2f}秒, "f"活跃告警: {len(self.active_alerts)}个")# 等待下一个周期await asyncio.sleep(self.check_interval)except asyncio.CancelledError:            print("监控任务被取消")finally:            self.is_running = Falseasyncdefstart_monitoring(self):"""启动监控"""if self.monitoring_task isNoneor self.monitoring_task.done():            self.monitoring_task = asyncio.create_task(self.monitor_loop())asyncdefstop_monitoring(self):"""停止监控"""        self.is_running = Falseif self.monitoring_task andnot self.monitoring_task.done():            self.monitoring_task.cancel()try:await self.monitoring_taskexcept asyncio.CancelledError:passasyncdefget_server_stats(self, server_id: str) -> Dict:"""获取服务器统计信息"""if server_id notin self.metrics_history:return {"error""服务器不存在"}        metrics = self.metrics_history[server_id]ifnot metrics:return {"server_id": server_id, "message""暂无数据"}# 计算统计信息        latest = metrics[-1]        cpu_values = [m.cpu_percent for m in metrics]        memory_values = [m.memory_percent for m in metrics]        stats = {"server_id": server_id,"latest_metric": {"timestamp": latest.timestamp.isoformat(),"cpu_percent": latest.cpu_percent,"memory_percent": latest.memory_percent,"network_in_mbps": latest.network_in_mbps,"network_out_mbps": latest.network_out_mbps,"disk_usage_percent": latest.disk_usage_percent            },"statistics": {"cpu_avg": sum(cpu_values) / len(cpu_values),"cpu_max": max(cpu_values),"memory_avg": sum(memory_values) / len(memory_values),"memory_max": max(memory_values),"total_metrics": len(metrics),"monitoring_period_minutes": len(metrics) * self.check_interval / 60            }        }return statsasyncdefdemo_monitoring():"""监控系统演示"""    print("=== 实时监控系统演示 ===")# 创建监控器    monitor = RealTimeMonitor(        alert_thresholds={"high_cpu"75.0,      # 降低阈值以便更容易触发"high_memory"80.0,"high_disk"85.0,"network_spike"80.0        },        check_interval=3.0# 更短的检查间隔    )# 启动监控await monitor.start_monitoring()# 让监控运行一段时间    print("监控运行中,持续15秒...")await asyncio.sleep(15)# 停止监控await monitor.stop_monitoring()# 展示统计信息    print("\n=== 监控统计 ===")# 随机选择几台服务器展示统计    sample_servers = random.sample(monitor.servers, min(3, len(monitor.servers)))for server_id in sample_servers:        stats = await monitor.get_server_stats(server_id)        print(f"\n服务器: {server_id}")if"latest_metric"in stats:            latest = stats["latest_metric"]            print(f"  最新指标:")            print(f"    时间: {latest['timestamp']}")            print(f"    CPU: {latest['cpu_percent']:.1f}%")            print(f"    内存: {latest['memory_percent']:.1f}%")            print(f"    网络流入: {latest['network_in_mbps']:.1f} Mbps")            print(f"    网络流出: {latest['network_out_mbps']:.1f} Mbps")if"statistics"in stats:            stat = stats["statistics"]            print(f"  统计信息:")            print(f"    平均CPU: {stat['cpu_avg']:.1f}%")            print(f"    最高CPU: {stat['cpu_max']:.1f}%")            print(f"    监控周期: {stat['monitoring_period_minutes']:.1f} 分钟")# 展示活跃告警    print(f"\n=== 活跃告警 ===")    print(f"总活跃告警数: {len(monitor.active_alerts)}")if monitor.active_alerts:for alert_id, alert in list(monitor.active_alerts.items())[:3]:            print(f"\n告警ID: {alert.alert_id}")            print(f"  服务器: {alert.server_id}")            print(f"  类型: {alert.alert_type}")            print(f"  严重程度: {alert.severity}")            print(f"  指标值: {alert.metric_value:.1f}")            print(f"  阈值: {alert.threshold}")            print(f"  时间: {alert.timestamp}")            print(f"  消息: {alert.message}")else:        print("暂无活跃告警")asyncdefstress_test_monitor():"""压力测试:模拟大量服务器"""    print("\n=== 监控系统压力测试 ===")# 创建包含100台服务器的监控器    large_monitor = RealTimeMonitor(        alert_thresholds={"high_cpu"85.0},        check_interval=2.0    )# 替换为大量服务器    large_monitor.servers = [f"stress-server-{i:04d}"for i in range(100)]    large_monitor.metrics_history = {        server: [] for server in large_monitor.servers    }# 运行10秒压力测试await large_monitor.start_monitoring()await asyncio.sleep(10)await large_monitor.stop_monitoring()# 统计    total_metrics = sum(len(metrics) for metrics in large_monitor.metrics_history.values())    print(f"压力测试结果:")    print(f"  服务器数量: {len(large_monitor.servers)}")    print(f"  总指标收集次数: {total_metrics}")    print(f"  活跃告警数: {len(large_monitor.active_alerts)}")    print(f"  平均每秒处理指标数: {total_metrics / 10:.1f}")if __name__ == "__main__":# 运行演示    asyncio.run(demo_monitoring())# 可选:运行压力测试# asyncio.run(stress_test_monitor())
监控系统特点
  1. 实时性:毫秒级响应,及时发现异常
  2. 可扩展性:轻松支持数百台服务器监控
  3. 智能化:基于阈值的智能告警,减少误报
  4. 历史分析:保留历史数据,支持趋势分析
五、常见问题与解决方案
5.1 问题一:异步函数中调用同步阻塞代码
问题描述:在协程中调用了同步阻塞函数(如time.sleep()、同步数据库查询),导致整个事件循环被阻塞。
错误示例
import asyncioimport timeasyncdefbad_example():"""错误:在协程中使用同步阻塞调用"""    print("开始任务")    time.sleep(5)  # ❌ 同步阻塞,会卡住整个事件循环    print("任务完成")asyncdefmain():    tasks = [bad_example() for _ in range(3)]await asyncio.gather(*tasks)  # 实际上还是串行执行# 运行结果:总耗时约15秒,而不是预期的5秒
解决方案
import asyncio# 方案1:使用异步版本的函数asyncdefcorrect_example1():"""正确:使用异步等待"""    print("开始任务")await asyncio.sleep(5)  # ✅ 非阻塞等待    print("任务完成")# 方案2:将同步函数放到线程池执行import concurrent.futuresdefblocking_io_operation():"""模拟同步阻塞IO操作"""    time.sleep(5)return"操作完成"asyncdefcorrect_example2():"""正确:使用run_in_executor"""    print("开始阻塞操作")# 在线程池中执行阻塞函数    loop = asyncio.get_running_loop()    result = await loop.run_in_executor(None,  # 使用默认线程池        blocking_io_operation    )    print(f"操作结果: {result}")# 方案3:寻找异步库替代同步库# 同步requests → 异步aiohttp或httpx# 同步sqlite3 → 异步aiosqlite# 同步psycopg2 → 异步asyncpgasyncdefdemo_solutions():"""演示解决方案"""    print("=== 方案1: 使用异步函数 ===")    start = time.time()await asyncio.gather(*[correct_example1() for _ in range(3)])    print(f"总耗时: {time.time() - start:.2f}秒")  # 约5秒    print("\n=== 方案2: 使用线程池 ===")    start = time.time()await correct_example2()    print(f"总耗时: {time.time() - start:.2f}秒")  # 约5秒if __name__ == "__main__":    asyncio.run(demo_solutions())
最佳实践
  1. 优先寻找异步版本的库
  2. 对于无法避免的同步调用,使用run_in_executor
  3. 避免在协程中直接调用任何可能阻塞的函数
5.2 问题二:忘记await协程调用
问题描述:定义了协程函数但调用时忘记加await,导致协程没有实际执行。
错误示例
import asyncioasyncdeffetch_data():"""获取数据的协程"""await asyncio.sleep(1)return {"data""示例数据"}asyncdefbad_example():"""错误:忘记await"""    result = fetch_data()  # ❌ 没有加await,result是协程对象,不是结果    print(f"结果: {result}")  # 输出: <coroutine object fetch_data at 0x...># 更隐蔽的错误:在表达式中忘记await    data_list = [fetch_data() for _ in range(3)]  # ❌ 列表中是协程对象    print(f"数据列表: {data_list}")asyncdefmain():await bad_example()
解决方案
import asyncioasyncdeffetch_data():"""获取数据的协程"""await asyncio.sleep(1)return {"data"f"数据-{id(asyncio.current_task())}"}asyncdefcorrect_example1():"""正确:显式await"""    print("开始获取数据")    result = await fetch_data()  # ✅ 正确使用await    print(f"获取到数据: {result}")return resultasyncdefcorrect_example2():"""正确:批量await"""    print("开始批量获取数据")# 创建任务列表    tasks = [asyncio.create_task(fetch_data()) for _ in range(3)]# 等待所有任务完成    results = await asyncio.gather(*tasks)    print(f"批量结果: {results}")return resultsasyncdefcorrect_example3():"""正确:在列表推导中使用await"""    print("开始列表推导获取数据")# ❌ 错误:data_list = [fetch_data() for _ in range(3)]# ✅ 正确:先创建任务,再gather    tasks = [fetch_data() for _ in range(3)]    data_list = await asyncio.gather(*tasks)    print(f"列表推导结果: {data_list}")return data_list# 高级技巧:自动检测未await的协程(开发时辅助)import warningsdefwarn_unawaited_coroutine(coro):"""警告未await的协程(开发辅助)"""import inspect# 获取调用栈信息    stack = inspect.stack()    caller_info = stack[1]  # 调用者信息    warnings.warn(f"协程 '{coro.__name__}' 在 {caller_info.filename}:{caller_info.lineno} "f"被调用但未使用await,这通常是个错误!",        category=RuntimeWarning,        stacklevel=2    )return coroasyncdefdemo_await_solutions():"""演示await解决方案"""    print("=== 方案1: 显式await ===")await correct_example1()    print("\n=== 方案2: 批量gather ===")await correct_example2()    print("\n=== 方案3: 列表推导正确用法 ===")await correct_example3()    print("\n=== 开发辅助:未await警告 ===")# 模拟忘记await的情况    coro = fetch_data()    warn_unawaited_coroutine(coro)# 正确的应该这样:# result = await fetch_data()if __name__ == "__main__":    asyncio.run(demo_await_solutions())
预防措施
  1. 使用类型注解:async def func() -> Awaitable[ResultType]
  2. IDE配置:启用未await协程的静态检查
  3. 代码审查:特别注意协程调用点
  4. 测试覆盖:确保协程被正确await
5.3 问题三:异常处理不当导致任务静默失败
问题描述:协程中发生异常但没有被正确捕获,导致任务失败但不报错,难以调试。
错误示例
import asyncioasyncdefrisky_operation():"""可能失败的协程"""await asyncio.sleep(0.5)ifTrue:  # 模拟异常条件raise ValueError("模拟的业务异常")return"成功"asyncdefbad_example1():"""错误:异常被静默吞没"""try:        task = asyncio.create_task(risky_operation())# 忘记await task,异常不会传播        print("任务已创建,但不会等待")except Exception as e:        print(f"捕获到异常: {e}")  # 这行不会执行!asyncdefbad_example2():"""错误:gather时异常处理不当"""    tasks = [risky_operation() for _ in range(3)]# ❌ 错误:如果某个任务失败,整个gather会失败try:        results = await asyncio.gather(*tasks)        print(f"结果: {results}")except ValueError as e:        print(f"有任务失败: {e}")  # 只能看到一个异常asyncdefmain():    print("=== 错误示例1 ===")await bad_example1()await asyncio.sleep(1)  # 给任务时间执行    print("\n=== 错误示例2 ===")await bad_example2()
解决方案
import asyncioasyncdefrisky_operation(id: int):"""可能失败的协程,带ID标识"""await asyncio.sleep(0.5)# 模拟:ID为偶数的任务失败if id % 2 == 0:raise ValueError(f"任务{id}的业务异常")returnf"任务{id}成功"asyncdefcorrect_example1():"""正确:确保所有任务都被await"""    print("方法1: 确保await每个任务")    task = asyncio.create_task(risky_operation(1))try:        result = await task  # ✅ 正确:await任务        print(f"任务结果: {result}")except ValueError as e:        print(f"任务失败: {e}")asyncdefcorrect_example2():"""正确:使用gather的return_exceptions参数"""    print("\n方法2: 使用return_exceptions收集所有结果")    tasks = [risky_operation(i) for i in range(5)]# ✅ 正确:收集所有结果,包括异常    results = await asyncio.gather(*tasks, return_exceptions=True)for i, result in enumerate(results):if isinstance(result, Exception):            print(f"任务{i}失败: {result}")else:            print(f"任务{i}成功: {result}")asyncdefcorrect_example3():"""正确:使用TaskGroup(Python 3.11+)自动异常处理"""    print("\n方法3: 使用TaskGroup结构化并发")try:asyncwith asyncio.TaskGroup() as tg:# 创建多个任务            task1 = tg.create_task(risky_operation(10))            task2 = tg.create_task(risky_operation(11))            task3 = tg.create_task(risky_operation(12))  # 偶数,会失败            print("所有任务已创建,等待完成...")except* ValueError as eg:  # 异常组处理        print(f"捕获到异常组,包含{len(eg.exceptions)}个异常:")for exc in eg.exceptions:            print(f"  - {exc}")else:        print("所有任务成功完成")asyncdefcorrect_example4():"""正确:添加任务完成回调监控"""    print("\n方法4: 使用add_done_callback监控任务")asyncdefmonitor_task(task: asyncio.Task, task_id: int):"""监控任务完成状态"""defdone_callback(future):if future.cancelled():                print(f"任务{task_id}被取消")elif future.exception():                print(f"任务{task_id}异常: {future.exception()}")else:                print(f"任务{task_id}成功: {future.result()}")        task.add_done_callback(done_callback)return task# 创建并监控多个任务    tasks = []for i in range(3):        task = asyncio.create_task(risky_operation(i))        monitored_task = await monitor_task(task, i)        tasks.append(monitored_task)# 等待所有任务await asyncio.gather(*tasks, return_exceptions=True)asyncdefdemo_exception_solutions():"""演示异常处理解决方案"""await correct_example1()await correct_example2()# Python 3.11+ 特有功能import sysif sys.version_info >= (311):await correct_example3()await correct_example4()if __name__ == "__main__":    asyncio.run(demo_exception_solutions())
异常处理最佳实践
  1. 总是await任务:确保异常能够传播
  2. 使用return_exceptions:收集所有任务结果,不因单个失败而中断
  3. 利用TaskGroup:Python 3.11+ 提供更安全的并发控制
  4. 添加监控回调:重要任务需要状态监控
  5. 记录详细日志:异常发生时记录足够上下文信息
5.4 问题四:资源泄漏:未正确关闭连接和资源
问题描述:异步操作中创建了资源(数据库连接、HTTP会话、文件句柄)但没有正确关闭,导致资源泄漏。
错误示例
import asyncioasyncdefbad_example():"""错误:资源未正确关闭"""# 模拟数据库连接    conn = {"status""connected"}try:await asyncio.sleep(1)        print("执行查询...")return"查询结果"finally:# ❌ 错误:忘记关闭连接# 应该调用: await conn.close()        print("忘记关闭连接!")# 连接资源泄漏asyncdefbad_example2():"""错误:未使用异步上下文管理器"""# 模拟文件操作    file = open("temp.txt""w")try:        file.write("测试数据")await asyncio.sleep(0.5)  # 模拟异步操作finally:        file.close()  # ✅ 关闭了,但不是异步方式# 在异步环境中,同步IO可能阻塞事件循环
解决方案
import asynciofrom contextlib import asynccontextmanager# 方案1:使用异步上下文管理器classAsyncDatabaseConnection:"""异步数据库连接"""def__init__(self, connection_string: str):        self.connection_string = connection_string        self._is_closed = Falseasyncdefconnect(self):"""建立连接"""        print(f"连接数据库: {self.connection_string}")await asyncio.sleep(0.3)        self._is_closed = Falsereturn selfasyncdefclose(self):"""关闭连接"""ifnot self._is_closed:            print("关闭数据库连接")await asyncio.sleep(0.2)            self._is_closed = Trueasyncdefquery(self, sql: str):"""执行查询"""if self._is_closed:raise RuntimeError("连接已关闭")        print(f"执行查询: {sql}")await asyncio.sleep(0.5)return [{"id"1"result""数据"}]asyncdef__aenter__(self):"""进入上下文"""returnawait self.connect()asyncdef__aexit__(self, exc_type, exc_val, exc_tb):"""退出上下文"""await self.close()# 方案2:使用contextlib.asynccontextmanager装饰器@asynccontextmanagerasyncdefmanaged_http_session():"""管理HTTP会话的上下文管理器"""import aiohttp    session = aiohttp.ClientSession()try:        print("创建HTTP会话")yield sessionfinally:        print("关闭HTTP会话")await session.close()# 方案3:资源池模式(连接复用)classConnectionPool:"""连接池,复用资源"""def__init__(self, max_size: int = 10):        self.max_size = max_size        self.pool = asyncio.Queue(maxsize=max_size)        self._in_use = set()# 初始化连接池for i in range(max_size):            self.pool.put_nowait(f"connection-{i}")asyncdefacquire(self):"""从池中获取连接"""        conn = await self.pool.get()        self._in_use.add(conn)        print(f"获取连接: {conn}, 使用中: {len(self._in_use)}")return connasyncdefrelease(self, conn):"""释放连接回池中"""if conn in self._in_use:            self._in_use.remove(conn)await self.pool.put(conn)            print(f"释放连接: {conn}, 剩余可用: {self.pool.qsize()}")else:            print(f"警告: 尝试释放未使用的连接: {conn}")asyncdefcorrect_example1():"""正确:使用异步上下文管理器"""    print("=== 异步上下文管理器示例 ===")asyncwith AsyncDatabaseConnection("postgresql://localhost/db"as conn:        results = await conn.query("SELECT * FROM users")        print(f"查询结果: {results}")# 连接自动关闭    print("上下文退出,资源已清理")asyncdefcorrect_example2():"""正确:使用contextlib装饰器"""    print("\n=== contextlib装饰器示例 ===")asyncwith managed_http_session() as session:asyncwith session.get("https://httpbin.org/get"as response:            data = await response.text()            print(f"获取到数据,长度: {len(data)}")    print("HTTP会话已关闭")asyncdefcorrect_example3():"""正确:使用连接池复用资源"""    print("\n=== 连接池示例 ===")    pool = ConnectionPool(max_size=3)asyncdefworker(worker_id: int):"""工作协程,使用连接池"""        conn = await pool.acquire()try:            print(f"工作者 {worker_id} 使用连接 {conn}")await asyncio.sleep(1)  # 模拟工作returnf"工作者 {worker_id} 完成"finally:await pool.release(conn)# 创建多个工作者,超过连接池大小    tasks = [asyncio.create_task(worker(i)) for i in range(5)]    results = await asyncio.gather(*tasks)    print(f"所有工作完成: {results}")    print(f"连接池状态: 可用{pool.pool.qsize()}, 总数{pool.max_size}")asyncdefcorrect_example4():"""正确:使用asyncio.create_task的清理"""    print("\n=== 任务级资源清理示例 ===")asyncdefmonitored_task(task_id: int):"""带资源监控的任务"""        resources = []try:# 模拟获取资源            resource = f"resource-{task_id}"            resources.append(resource)            print(f"任务{task_id} 获取资源: {resource}")await asyncio.sleep(1)returnf"任务{task_id} 结果"finally:# 清理所有资源for res in resources:                print(f"任务{task_id} 清理资源: {res}")# 创建并运行任务    task = asyncio.create_task(monitored_task(100))try:        result = await task        print(f"任务完成: {result}")except Exception as e:        print(f"任务失败: {e}")# 即使失败,finally块也会执行asyncdefdemo_resource_solutions():"""演示资源管理解决方案"""await correct_example1()await correct_example2()await correct_example3()await correct_example4()if __name__ == "__main__":    asyncio.run(demo_resource_solutions())
资源管理黄金法则
  1. 总是使用上下文管理器async with确保资源正确释放
  2. 连接复用优先:使用连接池避免频繁创建销毁
  3. finally块清理:即使异常也要确保清理
  4. 监控资源使用:记录资源创建和释放,便于调试
  5. 限制资源数量:防止无限制创建导致系统资源耗尽
5.5 问题五:调试困难:异步调用栈复杂
问题描述:异步代码的调用栈比同步代码复杂,错误发生时难以追踪问题源头。
错误示例
import asyncioasyncdefdeep_function():"""深层函数,会抛出异常"""await asyncio.sleep(0.1)raise ValueError("深层异常")asyncdefmiddle_function():"""中层函数"""await asyncio.sleep(0.1)returnawait deep_function()asyncdeftop_function():"""顶层函数"""await asyncio.sleep(0.1)returnawait middle_function()asyncdefbad_example():"""错误:没有足够的调试信息"""try:        result = await top_function()return resultexcept ValueError as e:        print(f"捕获到异常: {e}")# ❌ 错误:没有记录调用栈,难以定位问题# 不知道是deep_function抛出的异常asyncdefmain():await bad_example()
解决方案
import asyncioimport tracebackimport loggingfrom functools import wraps# 配置日志logging.basicConfig(    level=logging.DEBUG,    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')logger = logging.getLogger(__name__)# 方案1:增强的异常处理asyncdefdeep_function_enhanced():"""增强版深层函数"""try:await asyncio.sleep(0.1)raise ValueError("深层异常")except Exception as e:# 记录详细上下文        logger.error(f"deep_function失败: {e}")        logger.debug(f"调用栈:\n{traceback.format_exc()}")raise# 重新抛出异常# 方案2:装饰器自动添加调试信息defdebug_async(func):"""异步函数调试装饰器"""    @wraps(func)asyncdefwrapper(*args, **kwargs):        func_name = func.__name__        logger.debug(f"开始执行异步函数: {func_name}")try:            result = await func(*args, **kwargs)            logger.debug(f"异步函数完成: {func_name}")return resultexcept Exception as e:            logger.error(f"异步函数失败: {func_name}, 错误: {e}")            logger.debug(f"详细调用栈:\n{traceback.format_exc()}")raisereturn wrapper@debug_asyncasyncdefmiddle_function_enhanced():"""装饰器增强的中层函数"""await asyncio.sleep(0.1)returnawait deep_function_enhanced()@debug_asyncasyncdeftop_function_enhanced():"""装饰器增强的顶层函数"""await asyncio.sleep(0.1)returnawait middle_function_enhanced()# 方案3:使用asyncio调试模式asyncdefdebug_mode_example():"""使用asyncio调试模式"""# 启用调试模式    asyncio.get_event_loop().set_debug(True)# 这会提供更多调试信息,包括:# - 未await的协程警告# - 慢回调和资源泄漏检测    print("启用asyncio调试模式")asyncdeftask_with_slow_callback():"""有慢回调的任务"""await asyncio.sleep(0.01)# 创建多个任务    tasks = [asyncio.create_task(task_with_slow_callback()) for _ in range(3)]await asyncio.gather(*tasks)# 方案4:结构化日志记录classStructuredLogger:"""结构化日志记录器"""def__init__(self, component: str):        self.component = componentasyncdeflog_operation(self, operation: str, **kwargs):"""记录结构化操作日志"""        log_data = {"timestamp": datetime.now().isoformat(),"component": self.component,"operation": operation,"task_id": id(asyncio.current_task()),            **kwargs        }        logger.info(f"结构化日志: {json.dumps(log_data)}")return log_dataasyncdefstructured_logging_example():"""结构化日志示例"""    db_logger = StructuredLogger("database")asyncdefquery_database():"""查询数据库,带结构化日志"""        start_time = time.time()await db_logger.log_operation("query_start",            sql="SELECT * FROM users",            params={"limit"100}        )try:await asyncio.sleep(0.5)  # 模拟查询            result = [{"id"1"name""测试"}]await db_logger.log_operation("query_success",                duration=time.time() - start_time,                row_count=len(result)            )return resultexcept Exception as e:await db_logger.log_operation("query_failure",                duration=time.time() - start_time,                error=str(e),                error_type=type(e).__name__            )raise# 执行带结构化日志的操作try:        results = await query_database()        print(f"查询结果: {results}")except Exception as e:        print(f"查询失败: {e}")# 方案5:任务追踪和监控asyncdeftask_tracing_example():"""任务追踪示例"""classTaskTracer:"""任务追踪器"""def__init__(self):            self.active_tasks = {}            self.completed_tasks = []defregister_task(self, task: asyncio.Task, name: str):"""注册任务"""            task_id = id(task)            self.active_tasks[task_id] = {"name": name,"start_time": time.time(),"task": task            }# 添加完成回调            task.add_done_callback(lambda t: self._task_completed(t))            logger.info(f"注册任务: {name} (ID: {task_id})")return task_iddef_task_completed(self, task: asyncio.Task):"""任务完成回调"""            task_id = id(task)if task_id in self.active_tasks:                task_info = self.active_tasks.pop(task_id)                duration = time.time() - task_info["start_time"]                status = "cancelled"if task.cancelled() else"failed"if task.exception() else"completed"                task_record = {                    **task_info,"duration": duration,"status": status,"end_time": time.time()                }                self.completed_tasks.append(task_record)                logger.info(f"任务完成: {task_info['name']}, "f"状态: {status}, 耗时: {duration:.3f}秒"                )defget_stats(self):"""获取统计信息"""return {"active": len(self.active_tasks),"completed": len(self.completed_tasks),"total_duration": sum(t["duration"for t in self.completed_tasks),"avg_duration": (                    sum(t["duration"for t in self.completed_tasks) /                     len(self.completed_tasks) if self.completed_tasks else0                )            }    tracer = TaskTracer()asyncdeftraced_task(name: str, duration: float):"""被追踪的任务"""        task = asyncio.current_task()        tracer.register_task(task, name)await asyncio.sleep(duration)if random.random() < 0.3:  # 30%概率模拟失败raise RuntimeError(f"任务 {name} 模拟失败")returnf"任务 {name} 完成"# 创建多个被追踪的任务    tasks = []for i in range(5):        task = asyncio.create_task(            traced_task(f"工作{i}", random.uniform(0.51.5))        )        tasks.append(task)# 等待所有任务try:        results = await asyncio.gather(*tasks, return_exceptions=True)        print(f"所有任务结果: {results}")finally:# 输出追踪统计        stats = tracer.get_stats()        print(f"任务追踪统计: {stats}")asyncdefdemo_debugging_solutions():"""演示调试解决方案"""    print("=== 增强异常处理 ===")try:await top_function_enhanced()except ValueError as e:        print(f"捕获到增强异常: {e}")# 此时日志中已经有详细调用栈    print("\n=== 结构化日志 ===")await structured_logging_example()    print("\n=== 任务追踪 ===")await task_tracing_example()    print("\n=== 调试模式 ===")await debug_mode_example()if __name__ == "__main__":    asyncio.run(demo_debugging_solutions())
调试最佳实践
  1. 启用asyncio调试模式:开发环境始终开启
  2. 结构化日志:记录操作上下文,便于追踪
  3. 异常增强:捕获异常时记录调用栈
  4. 任务监控:重要任务添加完成回调
  5. 性能 profiling:定期检查慢回调和资源使用
六、总结与扩展学习
6.1 核心要点回顾
通过本文的学习,你已经掌握了asyncio模块的核心知识和实战技能:
  1. 异步编程原理:理解了事件循环、协程、任务三要素的协作机制
  2. 最新特性掌握:学会了Python 3.11+的TaskGroup结构化并发和timeout控制
  3. 实战应用能力:能够构建高性能Web API、高效爬虫、实时监控系统
  4. 问题解决技巧:掌握了异步编程中的常见问题和解决方案
  5. 性能优化方法:了解了连接复用、批量处理、缓存等优化技巧
6.2 延伸学习路径
书籍推荐
  1. 《Python高级编程(第3版)》:全面覆盖Python高级特性,包括异步编程
  2. 《流畅的Python(第2版)》:Luciano Ramalho的经典之作,第18章专门讲解asyncio
  3. 《Asyncio in Practice》:专注于asyncio实战的英文书籍
开源项目学习
  1. FastAPI:https://github.com/tiangolo/fastapi
    • 学习现代异步Web框架的最佳实践
    • 关注如何处理依赖注入、中间件、异常处理
  2. aiohttp:https://github.com/aio-libs/aiohttp
    • 研究异步HTTP客户端和服务器的实现
    • 学习连接池、会话管理、超时重试机制
  3. uvicorn:https://github.com/encode/uvicorn
    • 了解ASGI服务器的工作原理
    • 学习如何优化异步服务器的性能
在线课程
  1. Real Python的Asyncio教程:https://realpython.com/async-io-python/
    • 从基础到高级的完整教程
    • 包含大量实战示例
  2. Async/await深入理解:https://docs.python.org/3/library/asyncio.html
    • 官方文档是最权威的学习资料
    • 关注每个版本的更新日志
社区资源
  1. Python Discord服务器:https://discord.gg/python
    • 活跃的Python开发者社区
    • #async-help频道专门讨论异步编程
  2. Stack Overflow:https://stackoverflow.com/questions/tagged/python-asyncio
    • 大量实际问题和解决方案
    • 学习别人的经验和教训
6.3 下一篇预告
你将学到
  • 类型提示(Type Hints)的完整语法体系
  • 静态类型检查工具(mypy、Pyright)的实战配置
  • 泛型编程在Python中的高级应用
  • 使用Protocol实现结构子类型
  • 大型项目中类型系统的实际应用案例
适合人群
  • 希望提升代码质量和可维护性的Python开发者
  • 需要编写API接口、库或框架的开发者
  • 在团队协作中希望减少类型相关错误的工程师
学习价值
  • 减少运行时类型错误,提前发现问题
  • 提升IDE自动补全和代码导航能力
  • 让代码更易于理解和维护
  • 为大型项目提供更好的架构支持
实践项目
  • 为现有项目添加完整类型注解
  • 配置CI/CD中的类型检查流程
  • 设计类型安全的API接口
  • 构建支持泛型的可复用库
敬请期待下一篇《Python typing模块深度解析:从类型提示到静态检查,构建健壮可维护的Python代码体系》!

温馨提示:在实际项目中使用asyncio时,建议:
  1. 从简单场景开始,逐步扩展到复杂应用
  2. 编写充分的测试,特别是并发场景的测试
  3. 监控系统性能,及时发现和优化瓶颈
  4. 遵循最佳实践,避免常见的陷阱和错误
希望本文能帮助你在异步编程的道路上走得更远、更稳!如果有任何问题或建议,欢迎在评论区交流讨论。🙂

最新文章

随机文章

基本 文件 流程 错误 SQL 调试
  1. 请求信息 : 2026-04-14 00:50:05 HTTP/2.0 GET : https://f.mffb.com.cn/a/484066.html
  2. 运行时间 : 0.132679s [ 吞吐率:7.54req/s ] 内存消耗:5,007.59kb 文件加载:140
  3. 缓存信息 : 0 reads,0 writes
  4. 会话信息 : SESSION_ID=e0e9573d591bd18f8de37580e19b8b36
  1. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/public/index.php ( 0.79 KB )
  2. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/autoload.php ( 0.17 KB )
  3. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_real.php ( 2.49 KB )
  4. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/platform_check.php ( 0.90 KB )
  5. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/ClassLoader.php ( 14.03 KB )
  6. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_static.php ( 4.90 KB )
  7. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper.php ( 8.34 KB )
  8. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/helper.php ( 2.19 KB )
  9. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/helper.php ( 1.47 KB )
  10. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/stubs/load_stubs.php ( 0.16 KB )
  11. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Exception.php ( 1.69 KB )
  12. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Facade.php ( 2.71 KB )
  13. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/deprecation-contracts/function.php ( 0.99 KB )
  14. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap.php ( 8.26 KB )
  15. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap80.php ( 9.78 KB )
  16. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/Resources/functions/dump.php ( 1.49 KB )
  17. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-dumper/src/helper.php ( 0.18 KB )
  18. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/VarDumper.php ( 4.30 KB )
  19. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/App.php ( 15.30 KB )
  20. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Container.php ( 15.76 KB )
  21. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/container/src/ContainerInterface.php ( 1.02 KB )
  22. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/provider.php ( 0.19 KB )
  23. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Http.php ( 6.04 KB )
  24. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Str.php ( 7.29 KB )
  25. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Env.php ( 4.68 KB )
  26. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/common.php ( 0.03 KB )
  27. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/helper.php ( 18.78 KB )
  28. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Config.php ( 5.54 KB )
  29. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/app.php ( 0.95 KB )
  30. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cache.php ( 0.78 KB )
  31. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/console.php ( 0.23 KB )
  32. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cookie.php ( 0.56 KB )
  33. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/database.php ( 2.48 KB )
  34. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Env.php ( 1.67 KB )
  35. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/filesystem.php ( 0.61 KB )
  36. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/lang.php ( 0.91 KB )
  37. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/log.php ( 1.35 KB )
  38. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/middleware.php ( 0.19 KB )
  39. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/route.php ( 1.89 KB )
  40. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/session.php ( 0.57 KB )
  41. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/trace.php ( 0.34 KB )
  42. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/view.php ( 0.82 KB )
  43. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/event.php ( 0.25 KB )
  44. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Event.php ( 7.67 KB )
  45. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/service.php ( 0.13 KB )
  46. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/AppService.php ( 0.26 KB )
  47. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Service.php ( 1.64 KB )
  48. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Lang.php ( 7.35 KB )
  49. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/lang/zh-cn.php ( 13.70 KB )
  50. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/Error.php ( 3.31 KB )
  51. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/RegisterService.php ( 1.33 KB )
  52. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/services.php ( 0.14 KB )
  53. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/PaginatorService.php ( 1.52 KB )
  54. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ValidateService.php ( 0.99 KB )
  55. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ModelService.php ( 2.04 KB )
  56. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Service.php ( 0.77 KB )
  57. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Middleware.php ( 6.72 KB )
  58. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/BootService.php ( 0.77 KB )
  59. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Paginator.php ( 11.86 KB )
  60. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/Validate.php ( 63.20 KB )
  61. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Model.php ( 23.55 KB )
  62. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Attribute.php ( 21.05 KB )
  63. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/AutoWriteData.php ( 4.21 KB )
  64. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Conversion.php ( 6.44 KB )
  65. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/DbConnect.php ( 5.16 KB )
  66. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/ModelEvent.php ( 2.33 KB )
  67. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/RelationShip.php ( 28.29 KB )
  68. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Arrayable.php ( 0.09 KB )
  69. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Jsonable.php ( 0.13 KB )
  70. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/contract/Modelable.php ( 0.09 KB )
  71. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Db.php ( 2.88 KB )
  72. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/DbManager.php ( 8.52 KB )
  73. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Log.php ( 6.28 KB )
  74. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Manager.php ( 3.92 KB )
  75. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerTrait.php ( 2.69 KB )
  76. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerInterface.php ( 2.71 KB )
  77. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cache.php ( 4.92 KB )
  78. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/simple-cache/src/CacheInterface.php ( 4.71 KB )
  79. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Arr.php ( 16.63 KB )
  80. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/driver/File.php ( 7.84 KB )
  81. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/Driver.php ( 9.03 KB )
  82. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/CacheHandlerInterface.php ( 1.99 KB )
  83. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/Request.php ( 0.09 KB )
  84. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Request.php ( 55.78 KB )
  85. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/middleware.php ( 0.25 KB )
  86. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Pipeline.php ( 2.61 KB )
  87. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/TraceDebug.php ( 3.40 KB )
  88. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/middleware/SessionInit.php ( 1.94 KB )
  89. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Session.php ( 1.80 KB )
  90. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/driver/File.php ( 6.27 KB )
  91. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/SessionHandlerInterface.php ( 0.87 KB )
  92. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/Store.php ( 7.12 KB )
  93. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Route.php ( 23.73 KB )
  94. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleName.php ( 5.75 KB )
  95. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Domain.php ( 2.53 KB )
  96. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleGroup.php ( 22.43 KB )
  97. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Rule.php ( 26.95 KB )
  98. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleItem.php ( 9.78 KB )
  99. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/route/app.php ( 1.72 KB )
  100. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Route.php ( 4.70 KB )
  101. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/dispatch/Controller.php ( 4.74 KB )
  102. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Dispatch.php ( 10.44 KB )
  103. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/controller/Index.php ( 4.81 KB )
  104. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/BaseController.php ( 2.05 KB )
  105. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/facade/Db.php ( 0.93 KB )
  106. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/connector/Mysql.php ( 5.44 KB )
  107. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/PDOConnection.php ( 52.47 KB )
  108. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Connection.php ( 8.39 KB )
  109. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/ConnectionInterface.php ( 4.57 KB )
  110. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/builder/Mysql.php ( 16.58 KB )
  111. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Builder.php ( 24.06 KB )
  112. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseBuilder.php ( 27.50 KB )
  113. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Query.php ( 15.71 KB )
  114. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseQuery.php ( 45.13 KB )
  115. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TimeFieldQuery.php ( 7.43 KB )
  116. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/AggregateQuery.php ( 3.26 KB )
  117. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ModelRelationQuery.php ( 20.07 KB )
  118. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ParamsBind.php ( 3.66 KB )
  119. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ResultOperation.php ( 7.01 KB )
  120. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/WhereQuery.php ( 19.37 KB )
  121. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/JoinAndViewQuery.php ( 7.11 KB )
  122. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php ( 2.63 KB )
  123. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/Transaction.php ( 2.77 KB )
  124. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/driver/File.php ( 5.96 KB )
  125. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/LogHandlerInterface.php ( 0.86 KB )
  126. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/Channel.php ( 3.89 KB )
  127. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/event/LogRecord.php ( 1.02 KB )
  128. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/Collection.php ( 16.47 KB )
  129. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/View.php ( 1.70 KB )
  130. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/View.php ( 4.39 KB )
  131. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Response.php ( 8.81 KB )
  132. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/response/View.php ( 3.29 KB )
  133. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cookie.php ( 6.06 KB )
  134. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-view/src/Think.php ( 8.38 KB )
  135. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/TemplateHandlerInterface.php ( 1.60 KB )
  136. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/Template.php ( 46.61 KB )
  137. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/driver/File.php ( 2.41 KB )
  138. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/contract/DriverInterface.php ( 0.86 KB )
  139. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/runtime/temp/067d451b9a0c665040f3f1bdd3293d68.php ( 11.98 KB )
  140. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Html.php ( 4.42 KB )
  1. CONNECT:[ UseTime:0.000747s ] mysql:host=127.0.0.1;port=3306;dbname=f_mffb;charset=utf8mb4
  2. SHOW FULL COLUMNS FROM `fenlei` [ RunTime:0.000735s ]
  3. SELECT * FROM `fenlei` WHERE `fid` = 0 [ RunTime:0.000307s ]
  4. SELECT * FROM `fenlei` WHERE `fid` = 63 [ RunTime:0.000273s ]
  5. SHOW FULL COLUMNS FROM `set` [ RunTime:0.000459s ]
  6. SELECT * FROM `set` [ RunTime:0.000196s ]
  7. SHOW FULL COLUMNS FROM `article` [ RunTime:0.000559s ]
  8. SELECT * FROM `article` WHERE `id` = 484066 LIMIT 1 [ RunTime:0.000684s ]
  9. UPDATE `article` SET `lasttime` = 1776099006 WHERE `id` = 484066 [ RunTime:0.006658s ]
  10. SELECT * FROM `fenlei` WHERE `id` = 66 LIMIT 1 [ RunTime:0.000239s ]
  11. SELECT * FROM `article` WHERE `id` < 484066 ORDER BY `id` DESC LIMIT 1 [ RunTime:0.002130s ]
  12. SELECT * FROM `article` WHERE `id` > 484066 ORDER BY `id` ASC LIMIT 1 [ RunTime:0.000502s ]
  13. SELECT * FROM `article` WHERE `id` < 484066 ORDER BY `id` DESC LIMIT 10 [ RunTime:0.001201s ]
  14. SELECT * FROM `article` WHERE `id` < 484066 ORDER BY `id` DESC LIMIT 10,10 [ RunTime:0.001979s ]
  15. SELECT * FROM `article` WHERE `id` < 484066 ORDER BY `id` DESC LIMIT 20,10 [ RunTime:0.005263s ]
0.134332s