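Practical Python Async Programming: From async/await to a High-Performance Crawler
If each page takes about a second to respond, a synchronous crawler needs 100 seconds for 100 pages, while an async crawler can finish in about 3. That is the power of coroutines.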
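What problem does this actually solve?
Python's requests library is easy to use, but it has a fatal weakness: every call blocks, so the program can only do one thing at a time. When you fetch a web page, the program spends most of its time waiting for the network response while the CPU sits almost idle. Requesting 100 pages means waiting 100 times, one after another.
Async programming fixes this efficiency problem for IO-bound workloads. While one request is waiting for the network, the program switches to other tasks, so a single thread can handle hundreds or thousands of concurrent requests.
Async is not multithreading: it achieves concurrency inside a single thread by switching between coroutines. There is no thread context-switching overhead, and since only one thread runs your code, the GIL is not a bottleneck. That makes it a natural fit for IO-bound tasks.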
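Both claims can be checked with a minimal sketch. It simulates 1,000 network waits using asyncio.sleep as a stand-in for real IO. The function name fake_request and the 0.2 s delay are illustrative assumptions. Every coroutine should report the same thread ID, and the whole batch should finish in roughly one delay rather than 1,000 of them.

```python
import asyncio
import threading
import time


async def fake_request(i, delay=0.2):
    """Hypothetical stand-in for a network call: wait, then report which thread ran it."""
    await asyncio.sleep(delay)  # yields to the event loop while "waiting"
    return threading.get_ident()


async def run_batch(n=1000, delay=0.2):
    """Run n simulated requests concurrently; return the set of thread IDs and elapsed time."""
    start = time.perf_counter()
    thread_ids = await asyncio.gather(*(fake_request(i, delay) for i in range(n)))
    elapsed = time.perf_counter() - start
    return set(thread_ids), elapsed


if __name__ == "__main__":
    ids, elapsed = asyncio.run(run_batch())
    print(f"distinct threads: {len(ids)}, elapsed: {elapsed:.2f}s")
```

asyncio.run() drives the event loop in the calling thread, which is why a single thread ID comes back.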
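This tutorial starts with the basic syntax and works up to a hands-on project: an async crawler that can scrape thousands of records.
Environment setup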
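```bash
# Create the project directory
mkdir python-async
cd python-async

# Create and activate a virtual environment
python -m venv venv
source venv/bin/activate

# Install dependencies
pip install aiohttp                  # async HTTP client
pip install httpx                    # a more modern async HTTP client (HTTP/2 via the httpx[http2] extra)
pip install aiofiles asyncpg         # async file IO and PostgreSQL driver (used in Step 4)
pip install pytest pytest-asyncio    # async testing
```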
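Step 1: Understanding coroutines and the event loop
Coroutines are the core concept of async programming. A coroutine is like a function that can be paused and resumed.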
```python
import asyncio

# ============================================
# Define a coroutine function with async def
# ============================================
async def greet(name):
    print(f"Starting greeting for {name}")
    await asyncio.sleep(1)  # simulate an IO operation and give up control
    print(f"Hello, {name}!")
    return f"Result: {name}"


# ============================================
# Ways to run coroutines
# ============================================
async def main():
    # Option 1: sequential execution (behaves like synchronous code)
    print("=== Sequential ===")
    result1 = await greet("Zhang San")
    result2 = await greet("Li Si")
    print(result1, result2)

    # Option 2: concurrent execution (this is where async shines)
    print("\n=== Concurrent ===")
    results = await asyncio.gather(
        greet("Wang Wu"),
        greet("Zhao Liu"),
        greet("Qian Qi"),
    )
    print(results)


# Entry point
if __name__ == "__main__":
    asyncio.run(main())
```
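Output:

```
=== Sequential ===
Starting greeting for Zhang San
Hello, Zhang San!        (after 1 second)
Starting greeting for Li Si
Hello, Li Si!            (after another second)
Result: Zhang San Result: Li Si

=== Concurrent ===
Starting greeting for Wang Wu
Starting greeting for Zhao Liu
Starting greeting for Qian Qi
Hello, Wang Wu!          (all three finish together, 1 second in total)
Hello, Zhao Liu!
Hello, Qian Qi!
['Result: Wang Wu', 'Result: Zhao Liu', 'Result: Qian Qi']
```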
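Three core concepts:

| Concept | What it does |
| --- | --- |
| async def | Defines a coroutine function. Calling it returns a coroutine object instead of running the body |
| await | Suspends the current coroutine until the awaited operation completes, handing control back to the event loop |
| asyncio.gather() | Runs several coroutines concurrently and returns their results in the order they were passed |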
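Two rows of the table are easy to verify directly. First, calling a coroutine function does not run its body. Second, asyncio.gather() returns results in argument order, not completion order. This is a small sketch in which the function name work and the delays are illustrative.

```python
import asyncio
import inspect


async def work(name, delay, log):
    """Record when the body starts and finishes, then return the name."""
    log.append(f"start {name}")
    await asyncio.sleep(delay)
    log.append(f"end {name}")
    return name


async def main():
    log = []
    coro = work("lazy", 0, log)  # async def: calling it only builds a coroutine object
    created_without_running = inspect.iscoroutine(coro) and log == []
    await coro                   # await: now the body actually runs

    # gather: "slow" finishes last, but its result still comes first
    results = await asyncio.gather(work("slow", 0.2, log), work("fast", 0.05, log))
    return created_without_running, results, log


if __name__ == "__main__":
    lazy_ok, results, log = asyncio.run(main())
    print(results)  # ['slow', 'fast']
```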
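What await does: it suspends the current coroutine and hands control back to the event loop, which can then run other coroutines. Without await there is no concurrency. Await points, including the implicit ones in async with and async for, are where coroutines switch.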
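This sketch makes "no await, no concurrency" concrete. It runs the same three-way gather() twice: once with time.sleep(), which blocks and never yields, and once with await asyncio.sleep(). The function names and the 0.2 s delay are illustrative, and exact timings vary by machine.

```python
import asyncio
import time


async def blocking_task(delay):
    time.sleep(delay)  # blocks the whole event loop: no other coroutine can run
    return delay


async def cooperative_task(delay):
    await asyncio.sleep(delay)  # suspends here and lets the loop run other coroutines
    return delay


async def timed_gather(task_fn, n=3, delay=0.2):
    """Run n copies of task_fn concurrently and return the elapsed wall time."""
    start = time.perf_counter()
    await asyncio.gather(*(task_fn(delay) for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"time.sleep:    {asyncio.run(timed_gather(blocking_task)):.2f}s")     # about n * delay
    print(f"asyncio.sleep: {asyncio.run(timed_gather(cooperative_task)):.2f}s")  # about delay
```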
Step 2: Async HTTP requests
Create async_http_demo.py:
```python
import asyncio
import time

import aiohttp


async def fetch(session, url):
    """Fetch the content of a single URL asynchronously."""
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            data = await response.text()
            return {
                "url": url,
                "status": response.status,
                "length": len(data),
            }
    except Exception as e:
        return {"url": url, "error": str(e)}


async def fetch_all(urls, max_concurrency=10):
    """Fetch many URLs concurrently, capping the number of in-flight requests."""
    # Semaphore: limits the maximum number of concurrent requests
    semaphore = asyncio.Semaphore(max_concurrency)

    # Share one ClientSession (and its connection pool) across all requests
    # instead of opening a new session per URL
    async with aiohttp.ClientSession() as session:

        async def fetch_with_limit(url):
            async with semaphore:
                return await fetch(session, url)

        tasks = [fetch_with_limit(url) for url in urls]
        return await asyncio.gather(*tasks)


async def main():
    # URL list for testing
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/json",
        "https://httpbin.org/ip",
    ] * 5  # 25 requests

    print(f"Starting {len(urls)} concurrent requests...")
    start = time.time()
    results = await fetch_all(urls, max_concurrency=20)
    elapsed = time.time() - start

    success = [r for r in results if "error" not in r]
    errors = [r for r in results if "error" in r]

    print(f"\nElapsed: {elapsed:.2f}s")
    print(f"Succeeded: {len(success)}")
    print(f"Failed: {len(errors)}")
    print(f"QPS: {len(results) / elapsed:.0f}")


if __name__ == "__main__":
    asyncio.run(main())
```
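Run it with python async_http_demo.py and compare the elapsed time against a sequential loop. Fifteen of the 25 URLs hit /delay/1, so fetching them one at a time would take at least 15 seconds.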
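What the Semaphore does: it limits how many requests are in flight at the same time. Without a limit, all 25 requests go out at once, which can trigger server-side rate limiting or exhaust the local connection pool.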
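To confirm that a Semaphore really caps in-flight work, the sketch below instruments a simulated request. asyncio.sleep stands in for session.get, and the counter names are illustrative. It records the peak number of coroutines inside the async with semaphore block at any one moment.

```python
import asyncio


async def run_with_limit(n_requests=25, max_concurrency=5, delay=0.05):
    """Fire n_requests simulated fetches and report the peak concurrency observed."""
    semaphore = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def fake_fetch(i):
        nonlocal in_flight, peak
        async with semaphore:  # at most max_concurrency coroutines get past here
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(delay)  # simulated network wait
            in_flight -= 1
            return i

    results = await asyncio.gather(*(fake_fetch(i) for i in range(n_requests)))
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(run_with_limit())
    print(f"{len(results)} requests, peak concurrency = {peak}")  # 25 requests, peak concurrency = 5
```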
Step 3: A high-performance async crawler
Create crawler.py:
```python
import asyncio
import json
import time

import aiohttp


class AsyncCrawler:
    """Async crawler framework."""

    def __init__(self, max_concurrency=50):
        self.max_concurrency = max_concurrency
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.results = []

    async def fetch_page(self, session, url):
        """Fetch a single page; return its text, or None on any failure."""
        async with self.semaphore:
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
                    if response.status == 200:
                        return await response.text()
                    return None
            except asyncio.TimeoutError:
                return None  # the request timed out
            except Exception:
                return None  # DNS failure, connection reset, etc.

    async def parse(self, html):
        """Parse a page (subclasses override this)."""
        return html[:100] if html else None

    async def crawl_one(self, session, url):
        """Full pipeline for one URL: fetch -> parse -> build a record."""
        html = await self.fetch_page(session, url)
        if html is None:
            return {"url": url, "status": "failed"}
        data = await self.parse(html)
        if data is None:
            # A page that cannot be parsed counts as a failure, not a success
            return {"url": url, "status": "failed"}
        return {"url": url, "status": "success", "data": data}

    async def crawl_many(self, urls):
        """Crawl many URLs concurrently."""
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrency,  # connection pool size
            limit_per_host=20,           # at most 20 concurrent connections per host
            ttl_dns_cache=300,           # cache DNS lookups for 5 minutes
        )
        async with aiohttp.ClientSession(
            connector=connector,
            headers={"User-Agent": "Mozilla/5.0 (compatible; AsyncCrawler/1.0)"},
        ) as session:
            tasks = [self.crawl_one(session, url) for url in urls]
            self.results = await asyncio.gather(*tasks)
            return self.results

    def get_stats(self):
        """Return crawl statistics."""
        total = len(self.results)
        success = sum(1 for r in self.results if r["status"] == "success")
        failed = total - success
        return {
            "total": total,
            "success": success,
            "failed": failed,
            "success_rate": f"{success / total * 100:.1f}%" if total > 0 else "N/A",
        }


# ============================================
# Custom crawler: fetch JSONPlaceholder posts
# ============================================
class PostCrawler(AsyncCrawler):
    """Crawl blog posts from JSONPlaceholder."""

    async def parse(self, html):
        """Parse the JSON payload."""
        try:
            data = json.loads(html)
            return {
                "id": data.get("id"),
                "title": data.get("title")[:50] if data.get("title") else "",
                "body_length": len(data.get("body", "")),
            }
        except json.JSONDecodeError:
            return None


async def main():
    # Build the URL list (JSONPlaceholder has 100 posts)
    urls = [
        f"https://jsonplaceholder.typicode.com/posts/{i}"
        for i in range(1, 101)
    ]

    print(f"🚀 Crawling {len(urls)} pages...")
    start = time.time()

    crawler = PostCrawler(max_concurrency=20)
    results = await crawler.crawl_many(urls)

    elapsed = time.time() - start
    stats = crawler.get_stats()

    print(f"\n{'=' * 40}")
    print("📊 Crawl statistics")
    print(f"{'=' * 40}")
    print(f"Total time: {elapsed:.2f}s")
    print(f"Total requests: {stats['total']}")
    print(f"Succeeded: {stats['success']}")
    print(f"Failed: {stats['failed']}")
    print(f"Success rate: {stats['success_rate']}")
    print(f"QPS: {stats['total'] / elapsed:.0f}")

    # Print a few sample results
    print("\n📋 Sample results:")
    for r in results[:3]:
        if r["status"] == "success" and r["data"]:
            print(f"  [{r['data']['id']}] {r['data']['title']}")


if __name__ == "__main__":
    asyncio.run(main())
```
Run it:
python crawler.py
Sample output:
```
🚀 Crawling 100 pages...

========================================
📊 Crawl statistics
========================================
Total time: 2.34s
Total requests: 100
Succeeded: 100
Failed: 0
Success rate: 100.0%
QPS: 43

📋 Sample results:
  [1] sunt aut facere repellat provident...
  [2] qui est esse
  [3] ea molestias quasi exercitationem...
```
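fetch_page() maps a timeout to None, so one hanging page cannot stall the whole batch. The same idea can be tried offline with asyncio.wait_for(), used here as a stand-in for aiohttp's ClientTimeout. fake_page and the delays are hypothetical.

```python
import asyncio
import time


async def fake_page(delay):
    """Hypothetical page that takes `delay` seconds to respond."""
    await asyncio.sleep(delay)
    return f"page after {delay}s"


async def fetch_page(delay, timeout):
    """Mirror of the crawler's timeout branch: a slow page becomes None."""
    try:
        return await asyncio.wait_for(fake_page(delay), timeout=timeout)
    except asyncio.TimeoutError:
        return None  # the slow page is dropped instead of holding up gather()


async def main(timeout=0.2):
    start = time.perf_counter()
    results = await asyncio.gather(
        fetch_page(0.05, timeout),
        fetch_page(10, timeout),  # would take 10 s without a timeout
        fetch_page(0.1, timeout),
    )
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results)  # ['page after 0.05s', None, 'page after 0.1s']
```

The batch finishes in roughly the timeout, not in the 10 seconds the slowest page would need.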
Step 4: Async file IO and database operations
Writing files asynchronously
```python
import json

import aiofiles


async def save_results(results, filename):
    """Asynchronously save successful results to a JSON Lines file."""
    async with aiofiles.open(filename, "w", encoding="utf-8") as f:
        for result in results:
            if result["status"] == "success":
                line = json.dumps(result["data"], ensure_ascii=False) + "\n"
                await f.write(line)
    print(f"Results saved to {filename}")
```
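aiofiles works by running ordinary blocking file operations in a thread pool, so the event loop is never blocked. If you would rather not add a dependency, here is a roughly equivalent sketch that uses only the standard library. It offloads the whole write to a worker thread with asyncio.to_thread() (Python 3.9+). The names save_results_stdlib and _write_jsonl are hypothetical, and the record shape follows the crawler's results.

```python
import asyncio
import json
import os
import tempfile


def _write_jsonl(results, filename):
    """Blocking helper: write successful records as JSON Lines."""
    with open(filename, "w", encoding="utf-8") as f:
        for result in results:
            if result["status"] == "success":
                f.write(json.dumps(result["data"], ensure_ascii=False) + "\n")


async def save_results_stdlib(results, filename):
    """Run the blocking write in a worker thread so the event loop stays free."""
    await asyncio.to_thread(_write_jsonl, results, filename)


if __name__ == "__main__":
    sample = [
        {"url": "u1", "status": "success", "data": {"id": 1, "title": "hello"}},
        {"url": "u2", "status": "failed"},
    ]
    path = os.path.join(tempfile.mkdtemp(), "results.jsonl")
    asyncio.run(save_results_stdlib(sample, path))
    print(f"Results saved to {path}")
```

Handing the whole file to one thread costs a single thread hop, instead of one awaited hop per line as in the aiofiles loop.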
Async database operations (PostgreSQL example)
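```python
import asyncpg


async def save_to_db(results):
    """Asynchronously batch-insert successful results into PostgreSQL.

    Assumes a table posts(id, title, body_length) already exists.
    """
    conn = await asyncpg.connect(
        user="test", password="test", database="test", host="localhost"
    )
    try:
        rows = [
            (r["data"]["id"], r["data"]["title"], r["data"]["body_length"])
            for r in results
            if r["status"] == "success"
        ]
        async with conn.transaction():
            # executemany runs the statement once for each row tuple
            await conn.executemany(
                "INSERT INTO posts (id, title, body_length) VALUES ($1, $2, $3)",
                rows,
            )
    finally:
        await conn.close()  # always release the connection, even on errors
```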
Step 5: Async testing
Create test_crawler.py. These tests call the live JSONPlaceholder API, so they need network access:
```python
import pytest

from crawler import PostCrawler


@pytest.mark.asyncio
async def test_fetch_single_page():
    """Crawl a single page."""
    crawler = PostCrawler(max_concurrency=1)
    urls = ["https://jsonplaceholder.typicode.com/posts/1"]
    results = await crawler.crawl_many(urls)

    assert len(results) == 1
    assert results[0]["status"] == "success"
    assert results[0]["data"]["id"] == 1
    assert "title" in results[0]["data"]


@pytest.mark.asyncio
async def test_crawl_multiple_pages():
    """Crawl several pages concurrently."""
    crawler = PostCrawler(max_concurrency=10)
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 11)]
    await crawler.crawl_many(urls)
    stats = crawler.get_stats()

    assert stats["total"] == 10
    assert stats["success"] == 10
    assert stats["failed"] == 0


@pytest.mark.asyncio
async def test_handle_error():
    """An unreachable domain is reported as failed instead of raising."""
    crawler = PostCrawler(max_concurrency=1)
    urls = ["https://invalid-domain-12345.com/api"]
    results = await crawler.crawl_many(urls)

    assert results[0]["status"] == "failed"
```
pytest test_crawler.py -v
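The tests above depend on a live server, so they fail whenever the network or the API is down. A common complement is to stub out the IO. If you prefer not to depend on pytest-asyncio, the standard library's unittest.IsolatedAsyncioTestCase (Python 3.8+) runs async def test methods on a fresh event loop. This sketch uses a hypothetical crawl_one(fetch, url) helper with the same record contract as the crawler, and unittest.mock.AsyncMock as the fake fetcher.

```python
import json
import unittest
from unittest.mock import AsyncMock


async def crawl_one(fetch, url):
    """Hypothetical helper with the crawler's contract: never raises, returns a record."""
    html = await fetch(url)
    if html is None:
        return {"url": url, "status": "failed"}
    return {"url": url, "status": "success", "data": json.loads(html)}


class CrawlOneTest(unittest.IsolatedAsyncioTestCase):
    async def test_success(self):
        fetch = AsyncMock(return_value=json.dumps({"id": 1, "title": "hello"}))
        record = await crawl_one(fetch, "https://example.test/posts/1")
        self.assertEqual(record["status"], "success")
        self.assertEqual(record["data"]["id"], 1)
        fetch.assert_awaited_once_with("https://example.test/posts/1")

    async def test_failed_fetch(self):
        fetch = AsyncMock(return_value=None)  # simulate a timeout or non-200 response
        record = await crawl_one(fetch, "https://example.test/missing")
        self.assertEqual(record, {"url": "https://example.test/missing", "status": "failed"})
```

Saved as, for example, test_offline.py (a hypothetical filename), it runs with python -m unittest test_offline -v and needs no network.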
Practical tips and common pitfalls
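1. Don't call synchronous blocking functions inside coroutines: time.sleep() and requests.get() block the entire event loop. Use await asyncio.sleep(), aiohttp, or another async alternative instead.
2. Limit concurrency: use asyncio.Semaphore to cap the number of concurrent requests. Unbounded concurrency can get your IP banned by the server or exhaust the machine's file descriptors.
3. Handle exceptions deliberately: by default, asyncio.gather() re-raises the first exception to the caller, while the other awaitables keep running and their results are lost. Pass return_exceptions=True to receive exceptions as values in the result list without interrupting the other tasks, or wrap the logic inside each coroutine in try/except. An exception in a task started with asyncio.create_task() that is never awaited is not propagated at all; asyncio only logs "Task exception was never retrieved".
4. Keep CPU-bound work off the event loop: async pays off in IO-bound workloads. Heavy JSON parsing or regex matching can be moved to a worker thread with await asyncio.to_thread(cpu_heavy_func, data) (Python 3.9+) so the loop stays responsive. Because of the GIL, however, pure-Python code in a thread does not run in parallel. For real CPU parallelism, use a process pool: await asyncio.get_running_loop().run_in_executor(ProcessPoolExecutor(), cpu_heavy_func, data).
5. Prefer TaskGroup on Python 3.11+: asyncio.TaskGroup, added in 3.11, handles errors better than asyncio.gather. If any task in the group fails, the remaining tasks are cancelled automatically, and the errors are raised together as an ExceptionGroup. In this snippet, crawl stands for any coroutine function:

```python
async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(crawl("url1"))
        tg.create_task(crawl("url2"))
    # If any task fails, the other tasks are cancelled automatically
```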
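The difference between the default gather() and return_exceptions=True in tip 3 is easy to check with a coroutine that fails on purpose. might_fail is a made-up name.

```python
import asyncio


async def might_fail(i):
    """Hypothetical task: task 2 raises, the others return their index."""
    await asyncio.sleep(0.01 * i)
    if i == 2:
        raise ValueError(f"task {i} failed")
    return i


async def default_gather():
    """Default behavior: the first exception propagates to the caller."""
    try:
        await asyncio.gather(*(might_fail(i) for i in range(4)))
    except ValueError as e:
        return f"raised: {e}"


async def tolerant_gather():
    """return_exceptions=True: exceptions come back as values next to normal results."""
    return await asyncio.gather(*(might_fail(i) for i in range(4)), return_exceptions=True)


if __name__ == "__main__":
    print(asyncio.run(default_gather()))   # raised: task 2 failed
    print(asyncio.run(tolerant_gather()))  # [0, 1, ValueError('task 2 failed'), 3]
```

In the default case gather() does not cancel the remaining task. Here it is asyncio.run() that cancels the leftover task when it shuts the loop down.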
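Summary
This tutorial walked through a complete learning path for async programming in Python:

| Topic | Key tool |
| --- | --- |
| Coroutine basics | async def |
| Concurrent execution | asyncio.gather() |
| Async HTTP requests | aiohttp |
| Async file IO | aiofiles |
| Async testing | pytest.mark.asyncio |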
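With async programming under your belt, you can tackle most IO-bound Python workloads, such as crawlers, API gateways, message processing, and real-time data streams, and get higher throughput out of fewer resources.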