学习搭子,异步编程的学习过程中,咱们总会遇到各种困惑和疑问。有些问题看似简单,却触及异步编程的核心原理;有些问题则在实际开发中频繁出现。今天,咱们就集中解答这些常见问题,帮你彻底扫清学习障碍!
咱们的目标不仅是“知道答案”,更是要“理解为什么”,形成解决异步问题的系统性思维。
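| 对比维度 | 异步编程(asyncio) | 多线程(threading) |
|---|---|---|
| 并发模型 | 单线程 + 事件循环,协作式调度(在 await 处主动让出) | 多个操作系统线程,由操作系统抢占式调度 |
| 内存开销 | 小:每个协程只是一个轻量对象 | 较大:每个线程都需要独立的栈空间 |
| 切换成本 | 低:在用户态切换,无需内核参与 | 较高:需要操作系统进行线程上下文切换 |
| 适用场景 | 高并发 I/O 密集型任务(如海量网络连接) | I/O 密集型任务,或必须调用阻塞式库的场景 |
| 调试难度 | 中等:切换点明确,但调用链被 await 分散 | 较高:竞态条件时序不确定,难以复现 |
| 是否需要锁 | 较少:仅当共享状态的读写跨越 await 时才需要 | 通常需要:访问共享可变状态必须加锁 |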
打个比方:
- 异步编程:一个聪明的服务员,在等待厨房做菜时,去服务其他客人
- 多线程:多个服务员同时工作,但可能争抢同一个资源(需要协调)
# Async version
async def async_fetch(urls):
    """Fetch every URL concurrently on one event loop and return the bodies.

    Each request runs inside ``async with`` so its connection is released
    back to the session's pool as soon as the body has been read (or an
    error occurs).

    Args:
        urls: Iterable of URL strings.

    Returns:
        List of response bodies (str), in the same order as ``urls``.
    """
    async with aiohttp.ClientSession() as session:

        async def fetch_one(url):
            # `async with` guarantees the response is released even on error.
            async with session.get(url) as resp:
                return await resp.text()

        return await asyncio.gather(*(fetch_one(url) for url in urls))
# Multithreaded version
def threaded_fetch(urls, timeout=None):
    """Fetch every URL on a thread pool and return the bodies.

    Args:
        urls: Iterable of URL strings.
        timeout: Per-request timeout in seconds, forwarded to ``requests.get``.
            ``None`` (the default) waits indefinitely, as before; pass a number
            in production so a stalled server cannot pin a worker thread.

    Returns:
        List of response bodies (str), in the same order as ``urls``.
    """
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(requests.get, url, timeout=timeout) for url in urls]
        return [f.result().text for f in futures]
协程本质:
协程是可暂停、可恢复的函数状态机,由事件循环在用户态调度执行。
- 历史渊源:Python 的协程最初基于生成器实现(`yield` / `yield from`)
- 语法演进:Python 3.5 引入 `async/await` 语法,让协程成为一等公民
# Legacy style (generator-based).
# NOTE: @asyncio.coroutine was deprecated in Python 3.8 and removed in 3.11,
# so this snippet only runs on Python <= 3.10; it is shown for history.
@asyncio.coroutine
def old_style_coro():
    yield from asyncio.sleep(1)
# Modern style (async/await)
async def new_style_coro(delay=1):
    """Sleep for ``delay`` seconds (default 1) using native coroutine syntax."""
    await asyncio.sleep(delay)
# Both forms suspend through the same frame-suspension machinery CPython uses
# for generators, but ``async def`` produces a distinct native coroutine type,
# not a generator object.
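Q3: GIL(全局解释器锁)对异步编程有什么影响?

对 I/O 密集型的异步代码几乎没有影响:事件循环本身就运行在单个线程里,协程之间本来就不并行,GIL 不会成为额外瓶颈。但 CPU 密集型计算会一直占用 GIL 并阻塞事件循环,协程和线程都无法让它真正并行,需要交给进程池(ProcessPoolExecutor)执行,如下例所示: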
import asyncio
import concurrent.futures


async def hybrid_approach():
    """Fetch data with async I/O, then crunch it in a separate process.

    The I/O half does not suffer from the GIL: the event loop is single-threaded
    and just waits on sockets. The CPU half would hold the GIL and stall the
    loop, so it runs in a ProcessPoolExecutor (another interpreter, its own GIL).
    """
    # I/O-bound: use async
    async with aiohttp.ClientSession() as session:
        data = await fetch_io_data(session)
    # CPU-bound: use a process pool (callable and arguments must be picklable).
    # For repeated calls, create the pool once and reuse it.
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_intensive_computation, data)
    return result
async def main():
    # Mistake: calling a coroutine function only creates a coroutine object.
    # Its body never runs, and Python later warns "coroutine ... was never awaited".
    coro = async_function()
    # Fix: result = await async_function()
# Mistake: defining main() is not enough; nothing starts the event loop.
async def main():
    print("不会执行")
# Fix: asyncio.run(main())
async def main():
    # Mistake: the coroutine is only created, never awaited or scheduled.
    coro = asyncio.sleep(1)
    # Fix: task = asyncio.create_task(coro); await task
    # (keep a reference to the task and await it; otherwise the loop may
    # finish, or the task may be garbage-collected, before it completes)
import asyncio


async def debug_async_execution():
    """Print progress markers around one simulated async step; return "成功"."""
    print("Step 1: 函数开始")
    # Simulated async operation
    await asyncio.sleep(0.1)
    print("Step 2: 异步操作完成")
    return "成功"


# Smoke run: asyncio.run() creates a loop, runs the coroutine, then closes it.
try:
    result = asyncio.run(debug_async_execution())
    print(f"结果: {result}")
except Exception as e:
    print(f"错误: {e}")
Q5: async with和async for是什么?什么时候使用?
**async with**
- 用途:异步上下文管理器,用于异步资源的获取和释放(进入和退出时分别 await `__aenter__` 与 `__aexit__`)
- 适用场景:数据库连接、HTTP 会话、文件操作等
# Async database connection
async def query_with_pool():
    """Demo: acquire a pooled aiomysql connection and run one query.

    ``async with`` is not allowed at module level in a .py file, so the example
    is wrapped in a coroutine. Replace ``...`` with real connection parameters
    and the SQL placeholder with a real query.

    Returns:
        All rows fetched by the query.
    """
    async with aiomysql.create_pool(...) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute("SELECT ...")
                result = await cursor.fetchall()
    return result
# Async file I/O
async def read_file_async(path='data.txt'):
    """Demo: read a whole text file without blocking the event loop.

    ``async with`` cannot appear at module level, hence the wrapper coroutine.

    Returns:
        The file contents as a string.
    """
    async with aiofiles.open(path, 'r') as f:
        content = await f.read()
    return content
# Async database cursor iteration
async def iterate_rows(cursor):
    """Demo: stream rows from an async cursor with ``async for``.

    ``async for`` calls ``__aiter__`` and then awaits ``__anext__`` for every row.
    NOTE(review): assumes ``cursor`` supports async iteration and that
    ``process`` is defined in the caller's module.
    """
    async for row in cursor:
        process(row)
# Custom async generator
async def async_generator(count=10, delay=0.1):
    """Yield 0..count-1, awaiting ``delay`` seconds before each value."""
    for i in range(count):
        await asyncio.sleep(delay)
        yield i


async def main():
    """Consume the async generator with ``async for`` and print every value."""
    async for value in async_generator():
        print(value)
import asyncio
import time


async def slow_operation(delay=1):
    """Anti-pattern demo: a blocking call inside a coroutine.

    ``time.sleep`` blocks the whole event-loop thread, so no other task can run
    for ``delay`` seconds. Debug mode (``asyncio.run(..., debug=True)``) reports
    this as a slow callback.
    """
    # Dangerous: synchronous blocking call
    time.sleep(delay)  # should be: await asyncio.sleep(delay)
    return "完成"
async def monitor_tasks():
    """Print and return how many unfinished tasks exist besides the caller.

    A count that keeps growing across calls suggests a task leak.
    """
    import asyncio

    current = asyncio.current_task()
    active_tasks = asyncio.all_tasks() - {current}
    print(f"活跃Task数: {len(active_tasks)}")
    # If this number keeps growing, tasks may be leaking
    return len(active_tasks)
# Run with asyncio debug mode on. It logs slow callbacks (longer than
# loop.slow_callback_duration, 100 ms by default), coroutines that were never
# awaited, and creation tracebacks for tasks.
asyncio.run(main(), debug=True)
import cProfile
import pstats


def profile_async():
    """Synchronous entry point so cProfile can measure a whole event-loop run."""
    asyncio.run(async_function())


# cProfile.run() executes its string in __main__'s namespace, so run this file
# as a script. Then print the 10 entries with the highest cumulative time.
cProfile.run('profile_async()', 'profile_stats')
stats = pstats.Stats('profile_stats')
stats.sort_stats('cumulative').print_stats(10)
async def safe_operation():
    """Run ``risky_operation`` with a fallback for one known failure mode.

    ``SpecificError`` yields ``fallback_value``. Any other ``Exception`` is logged
    with its traceback and re-raised. ``CancelledError`` is a ``BaseException``
    (3.8+), so cancellation passes through untouched.
    """
    try:
        return await risky_operation()
    except SpecificError:
        # Handle the expected exception
        return fallback_value
    except Exception as e:
        # logging.exception also records the traceback, not just str(e)
        logging.exception("操作失败: %s", e)
        raise  # or return a default value instead
- 2. 使用gather的return_exceptions:
async def multiple_operations():
    """Run three operations concurrently and report each outcome without aborting.

    Returns:
        The list from ``gather``: each item is the task's result or the
        exception it raised, in submission order.
    """
    tasks = [
        operation1(),
        operation2(),
        operation3(),
    ]
    # A single failure does not abort the whole batch
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, result in enumerate(results):
        # BaseException, not Exception: a cancelled child comes back as
        # asyncio.CancelledError, which is not an Exception subclass (3.8+).
        if isinstance(result, BaseException):
            print(f"任务{i}失败: {result}")
            # Handle the error without interrupting the loop
        else:
            print(f"任务{i}成功: {result}")
    return results
async def monitored_task():
    """Await ``operation()`` as a Task and report the Task if it fails.

    Wrapping the coroutine in a Task lets the error report include the Task's
    repr (name, coroutine, state) next to the exception.
    """
    task = asyncio.create_task(operation())
    try:
        return await task
    except Exception as e:
        # Record extra context about the failing task
        print(f"Task失败: {task}, 异常: {e}")
        raise
import asyncio


async def debug_deadlock():
    """Turn on debug diagnostics for the loop running this coroutine.

    ``get_running_loop()`` is the preferred API inside coroutines: it always
    returns the loop executing this code. ``get_event_loop()`` is the legacy,
    policy-based lookup.
    """
    loop = asyncio.get_running_loop()
    # Enable debug mode
    loop.set_debug(True)
    # Report callbacks slower than 50 ms (the default threshold is 100 ms)
    loop.slow_callback_duration = 0.05
    # Your code...
import asyncio


async def with_timeout():
    """Await ``long_operation()`` for at most 5 seconds.

    Returns:
        The operation's result, or ``None`` on timeout. ``wait_for`` cancels the
        inner operation when the timeout expires.
    """
    try:
        # Bound the wait
        result = await asyncio.wait_for(long_operation(), timeout=5.0)
        return result
    except asyncio.TimeoutError:
        print("操作超时,可能死锁")
        # Perform cleanup here
        return None
import asyncio
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s [%(levelname)s] %(message)s',
)


async def traced_operation(delay=1):
    """Log entry and exit around a simulated ``delay``-second async step."""
    logging.debug("开始操作")
    await asyncio.sleep(delay)
    logging.debug("操作完成")
Q9: 为什么asyncio.sleep(0)能让出控制权?
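1. `asyncio.sleep(0)` 对 delay <= 0 做了特殊处理:不创建定时器,而是执行一次裸 `yield`
2. 驱动该协程的 Task 收到这个 `None`,通过 `loop.call_soon()` 把自己重新排到就绪队列末尾
3. 事件循环先运行队列中其他已就绪的回调和 Task
4. 轮到该 Task 时,从 `await asyncio.sleep(0)` 之后继续执行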
import types


@types.coroutine
def _yield_once():
    """Suspend the current Task exactly once via a bare ``yield``."""
    yield


async def sleep_zero():
    """Simplified model of ``asyncio.sleep(0)``: give other tasks one turn.

    The bare ``yield`` hands ``None`` to the Task driving this coroutine. The
    Task then reschedules itself with ``loop.call_soon``, so every callback that
    is already queued runs before this coroutine resumes. Awaiting an
    already-completed Future would not suspend at all, because
    ``Future.__await__`` returns immediately once the future is done.
    """
    await _yield_once()
async def cooperative_loop(total=1000000, yield_every=1000):
    """Process ``total`` items, yielding to the event loop every ``yield_every`` items.

    Pure computation never awaits, so without the periodic ``sleep(0)`` this
    coroutine would monopolize the loop until it finished.
    """
    for i in range(total):
        # Yield periodically so other tasks are not starved
        if i % yield_every == 0:
            await asyncio.sleep(0)
        # Do some computation
        process_item(i)
import asyncio
# 不安全版本
counter = 0
asyncdefunsafe_increment():
global counter
temp = counter
await asyncio.sleep(0.001) # 切换点
counter = temp + 1
# 安全版本:使用锁
counter = 0
lock = asyncio.Lock()
asyncdefsafe_increment():
global counter
asyncwith lock:
temp = counter
await asyncio.sleep(0.001)
counter = temp + 1
import asyncio

cache = {}
cache_lock = asyncio.Lock()


async def get_cached_data(key):
    """Return ``cache[key]``, fetching and storing it at most once.

    This is double-checked locking for asyncio. Cache hits take the unlocked fast
    path. On a miss, the check is repeated under ``cache_lock``, because another
    task may have filled the entry while this one waited for the lock.

    NOTE: one lock serializes *all* misses, even for different keys. With many
    distinct slow keys, consider one lock per key.
    """
    # Fast path: no lock needed for a hit
    if key in cache:
        return cache[key]
    async with cache_lock:
        # Check again (another task may have filled it meanwhile)
        if key in cache:
            return cache[key]
        # Fetch the data
        data = await fetch_data(key)
        cache[key] = data
        return data
import asyncio

queue = asyncio.Queue(maxsize=100)


async def producer():
    """Put 0..99 on the queue, pausing 10 ms between items.

    ``put`` waits while the queue is full (maxsize=100), which provides back-pressure.
    """
    for i in range(100):
        await queue.put(i)
        await asyncio.sleep(0.01)


async def consumer():
    """Consume items forever. Cancel the task to stop it.

    ``task_done()`` is in ``finally`` so that an item whose processing fails is
    still marked finished. Otherwise ``queue.join()`` would wait forever.
    """
    while True:
        item = await queue.get()
        try:
            process(item)
        finally:
            queue.task_done()
async def cpu_hog(n=10**8):
    """Anti-pattern demo: a long pure-Python loop with no ``await``.

    The event loop cannot switch to any other task until this returns.

    Returns:
        The sum of i*i for i in range(n).
    """
    # Wrong: heavy computation without any await
    result = 0
    for i in range(n):
        result += i * i
    return result


async def cpu_chunked(n=10**8, chunk_size=10000):
    """Improved version: the same sum of squares, computed chunk by chunk.

    The coroutine yields to the event loop after every chunk, so other tasks keep
    running. For truly heavy work, a ProcessPoolExecutor is still the better tool.

    Returns:
        The sum of i*i for i in range(n).
    """
    # Improvement: split the work into chunks
    result = 0
    for start in range(0, n, chunk_size):
        end = min(start + chunk_size, n)
        result += sum(i * i for i in range(start, end))
        # Yield periodically
        await asyncio.sleep(0)
    return result
# Improvement: sleep briefly when idle instead of spinning
async def efficient_wait():
    """Loop while ``has_work`` is true, sleeping 10 ms whenever ``tasks`` is empty.

    The short sleep keeps the loop from spinning at full CPU. An
    ``asyncio.Queue`` or ``asyncio.Event`` would avoid polling entirely, by
    waking the consumer exactly when work arrives.

    NOTE(review): ``has_work``, ``tasks`` and ``process_tasks`` are assumed to be
    defined elsewhere in the module.
    """
    while has_work:
        # Check whether there is anything to do
        if not tasks:
            # Brief sleep to avoid busy-waiting
            await asyncio.sleep(0.01)
        else:
            await process_tasks()
import asyncio
import uvloop
# Replace the default event loop with uvloop (libuv-based; not available on Windows).
# NOTE: the event-loop policy API is deprecated as of Python 3.14. With
# uvloop >= 0.18, ``uvloop.run(main())`` is the recommended entry point.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Often noticeably faster for network-heavy workloads; benchmark to confirm.
import asyncio
import aiohttp


async def optimized_fetch(urls):
    """Fetch many URLs with connection reuse, timeouts, and bounded concurrency.

    Returns:
        Response bodies (str) in the same order as ``urls``.

    Raises:
        The first request error. Remaining requests are cancelled and awaited
        before the session closes, so no orphaned tasks outlive it.
    """
    connector = aiohttp.TCPConnector(
        limit=100,            # max connections in total
        limit_per_host=10,    # max connections per host
        ttl_dns_cache=300,    # DNS cache TTL in seconds
        force_close=False,    # keep connections alive for reuse
    )
    timeout = aiohttp.ClientTimeout(
        total=30,
        connect=5,
        sock_read=10,
    )
    async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
        # A semaphore caps the number of in-flight requests
        semaphore = asyncio.Semaphore(50)

        async def fetch_one(url):
            async with semaphore:
                async with session.get(url) as resp:
                    return await resp.text()

        tasks = [asyncio.create_task(fetch_one(url)) for url in urls]
        try:
            return await asyncio.gather(*tasks)
        except BaseException:
            # One failure (or our own cancellation) ends the batch: cancel the
            # siblings and let them settle before the session is closed.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise
async def batch_requests(urls, batch_size=50, pause=0.1):
    """Fetch ``urls`` in consecutive batches of ``batch_size``.

    Requests within a batch run concurrently. A failed request appears in the
    results as its exception object instead of aborting the batch. Batches are
    separated by ``pause`` seconds, with no pause after the last one.

    Returns:
        Results (or exceptions) in the same order as ``urls``.
    """
    results = []
    for i in range(0, len(urls), batch_size):
        batch = urls[i:i + batch_size]
        # Process the batch concurrently
        batch_results = await asyncio.gather(
            *[fetch(url) for url in batch],
            return_exceptions=True,
        )
        results.extend(batch_results)
        # Short pause between batches, skipped after the final one
        if i + batch_size < len(urls):
            await asyncio.sleep(pause)
    return results
import aiomysql


async def create_optimized_pool():
    """Create a tuned aiomysql connection pool.

    ``max_queries`` is not an aiomysql option (it comes from asyncpg).
    aiomysql forwards unknown keywords to ``connect()``, which rejects them, so
    it is omitted here. ``loop`` is omitted too, so the pool uses the running loop.
    """
    pool = await aiomysql.create_pool(
        host='localhost',
        port=3306,
        user='user',
        password='pass',    # demo only: load real credentials from config/env
        db='database',
        minsize=5,          # minimum pooled connections
        maxsize=20,         # maximum pooled connections
        echo=False,         # disable SQL logging
        autocommit=True,
        pool_recycle=3600,  # recycle connections older than this (seconds)
    )
    return pool
async def safe_transaction(pool, user_id, amount):
    """Debit ``amount`` from ``user_id`` and record it in one transaction.

    Both statements commit together or not at all. The rollback runs on any
    ``BaseException``, including task cancellation (``CancelledError`` is not an
    ``Exception`` since 3.8), so the connection is not handed back to the pool
    in the middle of a transaction.
    """
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            try:
                # Begin the transaction
                await conn.begin()
                await cursor.execute(
                    "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, user_id),
                )
                # NOTE(review): positional VALUES depends on the table's column
                # order; listing the columns explicitly would be sturdier.
                await cursor.execute(
                    "INSERT INTO transactions VALUES (%s, %s, NOW())",
                    (user_id, amount),
                )
                # Commit the transaction
                await conn.commit()
            except BaseException:
                # Roll back, then re-raise the original error
                await conn.rollback()
                raise
import asyncio
import weakref


class TaskTracker:
    """Track tasks without keeping them alive, and count the unfinished ones.

    Tasks are stored in a WeakSet, so tracking never prevents a Task from being
    garbage-collected.
    """

    def __init__(self):
        self._tasks = weakref.WeakSet()

    def track(self, task):
        """Register ``task`` and return it unchanged so the call can be chained."""
        self._tasks.add(task)
        return task

    def active_count(self):
        """Return the number of tracked tasks that have not finished yet.

        A finished task stays in the WeakSet while anything still references it,
        so finished tasks are filtered out explicitly.
        """
        return sum(1 for task in self._tasks if not task.done())


# Usage
tracker = TaskTracker()


async def monitored_task():
    task = tracker.track(asyncio.create_task(operation()))
    return await task
import objgraph
import asyncio


async def analyze_memory(interval=60):
    """Print object-count growth over ``interval`` seconds of normal running.

    The first ``show_growth`` call establishes a baseline. The second prints the
    types whose counts grew since the previous call.
    """
    # Record the baseline
    objgraph.show_growth(limit=10)
    # Let the application run for a while
    await asyncio.sleep(interval)
    # Check again: growth since the baseline
    objgraph.show_growth(limit=10)
    # Show the types with the most instances
    objgraph.show_most_common_types(limit=20)
import gc


async def check_memory_leaks():
    """Run a full collection and report objects that were in reference cycles.

    ``gc.DEBUG_SAVEALL`` makes the collector append every unreachable object to
    ``gc.garbage`` instead of freeing it. The list therefore shows what only the
    cycle collector could reclaim, which hints at reference cycles. Afterwards
    the previous debug flags are restored and the saved objects are released;
    otherwise this check would itself leak everything it inspected.

    Returns:
        Number of unreachable objects captured by this collection.
    """
    previous_flags = gc.get_debug()
    start = len(gc.garbage)
    gc.set_debug(previous_flags | gc.DEBUG_SAVEALL)
    try:
        # Force a full collection
        collected = gc.collect()
        print(f"回收对象: {collected}")
        # Objects captured by DEBUG_SAVEALL during this collection
        unreachable = gc.garbage[start:]
        print(f"未回收对象数: {len(unreachable)}")
        for obj in unreachable[:10]:
            print(f"未回收对象: {type(obj)}, id: {id(obj)}")
        return len(unreachable)
    finally:
        gc.set_debug(previous_flags)
        # Release the saved objects so they can be freed normally
        del gc.garbage[start:]
# 1. Identify the I/O-bound modules
# 2. Build async versions of those services
# 3. Bridge them into the synchronous code base from a background loop thread
import asyncio
import concurrent.futures
from threading import Thread


class AsyncBridge:
    """Let synchronous code call coroutines on a dedicated event-loop thread."""

    def __init__(self):
        self._loop = None
        self._thread = None

    def start(self):
        """Create a new event loop and run it forever in a daemon thread."""
        self._loop = asyncio.new_event_loop()

        def run_loop():
            asyncio.set_event_loop(self._loop)
            self._loop.run_forever()

        self._thread = Thread(target=run_loop, daemon=True)
        self._thread.start()

    def stop(self):
        """Stop the loop, wait for its thread to exit, then close the loop.

        Calling it again, or before ``start()``, does nothing.
        """
        if self._loop:
            self._loop.call_soon_threadsafe(self._loop.stop)
            if self._thread is not None:
                self._thread.join()
            self._loop.close()
            self._loop = None
            self._thread = None

    async def _async_operation(self, data):
        # Placeholder for the real async implementation; echoes its input.
        await asyncio.sleep(0.1)
        return data

    def sync_call(self, data, timeout=10):
        """Run ``_async_operation(data)`` on the bridge loop and block until it finishes.

        Raises:
            RuntimeError: if ``start()`` has not been called.
            concurrent.futures.TimeoutError: if no result arrives within ``timeout`` seconds.
        """
        if self._loop is None:
            raise RuntimeError("AsyncBridge.start() must be called before sync_call()")
        future = asyncio.run_coroutine_threadsafe(
            self._async_operation(data),
            self._loop,
        )
        return future.result(timeout=timeout)
# Sync Flask -> async FastAPI
from fastapi import FastAPI
import asyncio

app = FastAPI()


@app.get("/data")
async def get_data():
    """Native async endpoint: awaits an async database query on the event loop."""
    result = await async_db_query()
    return result


# Existing synchronous code is called through the thread pool
@app.get("/legacy")
async def legacy_endpoint():
    """Run a blocking legacy function in the default executor.

    Declaring the endpoint with plain ``def`` would make FastAPI run it in a
    threadpool automatically; this shows the explicit form.
    """
    loop = asyncio.get_running_loop()
    # Delegate the sync function to the thread pool
    result = await loop.run_in_executor(
        None,  # default thread pool
        sync_legacy_function,
        "参数",
    )
    return result
import asyncio
import signal
from contextlib import asynccontextmanager


class GracefulShutdown:
    """Coordinate a graceful shutdown: stop intake, drain connections, clean up."""

    def __init__(self, drain_timeout=30.0):
        # Set to True once shutdown begins; request handlers can poll it to
        # refuse new work.
        self.should_exit = False
        self.active_connections = set()
        # Seconds to wait for active connections before giving up on them
        self.drain_timeout = drain_timeout

    def add_connection(self, conn):
        self.active_connections.add(conn)

    def remove_connection(self, conn):
        # discard() is a no-op when conn is not tracked
        self.active_connections.discard(conn)

    async def _wait_all_connections(self, poll_interval=0.1):
        """Return once every tracked connection has been removed."""
        while self.active_connections:
            await asyncio.sleep(poll_interval)

    async def cleanup_resources(self):
        """Hook for releasing shared resources (pools, files, ...). No-op here."""

    async def wait_for_shutdown(self):
        """Shut down gracefully. Repeat calls (e.g. a second signal) return at once."""
        if self.should_exit:
            return
        self.should_exit = True
        # Stop accepting new requests
        print("停止接收新请求...")
        # Wait for active connections to finish
        if self.active_connections:
            print(f"等待 {len(self.active_connections)} 个活跃连接完成...")
            # Bound the wait
            try:
                await asyncio.wait_for(
                    self._wait_all_connections(),
                    timeout=self.drain_timeout,
                )
            except asyncio.TimeoutError:
                print("超时,强制关闭剩余连接")
        # Perform cleanup
        await self.cleanup_resources()
        print("关闭完成")
async def main():
    """Install SIGINT/SIGTERM handlers that trigger a graceful shutdown, then serve.

    NOTE(review): ``server`` is assumed to be created elsewhere (e.g. by
    ``asyncio.start_server``). ``loop.add_signal_handler`` is Unix-only.
    """
    shutdown = GracefulShutdown()
    # The event loop keeps only weak references to tasks, so the shutdown task
    # is held here; otherwise it could be garbage-collected mid-shutdown.
    shutdown_tasks = set()

    def _on_signal():
        task = asyncio.create_task(shutdown.wait_for_shutdown())
        shutdown_tasks.add(task)
        task.add_done_callback(shutdown_tasks.discard)

    # Install signal handlers
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, _on_signal)
    # Run the service...
    await server.serve_forever()
import asyncio


async def periodic_task(interval: float):
    """Do the periodic work forever, sleeping ``interval`` seconds after each run.

    Stops only when cancelled. Each cycle lasts ``interval`` plus the work's own
    duration, so the schedule drifts slightly over time.
    """
    while True:
        print("执行定时任务...")
        await asyncio.sleep(interval)


async def scheduled_task(delay: float, func, *args):
    """Wait ``delay`` seconds, then await ``func(*args)`` and return its result."""
    await asyncio.sleep(delay)
    return await func(*args)
# Startup
async def main():
    """Run a periodic task in the background, await a delayed task, then stop the periodic one.

    Returns:
        The delayed task's result.
    """
    # Periodic task runs in the background
    periodic = asyncio.create_task(periodic_task(5.0))
    # Delayed task, awaited directly
    result = await scheduled_task(10.0, some_async_function, "参数")
    # Cancel the periodic task and wait until it has actually stopped
    periodic.cancel()
    try:
        await periodic
    except asyncio.CancelledError:
        print("定期任务已取消")
    return result
import asyncio
import functools
import heapq
import inspect
from typing import Callable, Any, Optional
from dataclasses import dataclass, field, replace
from datetime import datetime, timedelta


@dataclass(order=True)
class ScheduledJob:
    """A heap entry ordered by ``scheduled_time``, with ties broken by ``job_id``.

    Only those two fields take part in comparisons. Jobs due at the same instant
    therefore never fall through to comparing callables or a ``None`` interval
    with a ``timedelta``, which would raise ``TypeError`` inside ``heapq``.
    """
    scheduled_time: datetime
    job_id: int  # tie-breaker: the lower id runs first
    func: Callable = field(compare=False)
    args: tuple = field(compare=False)
    kwargs: dict = field(compare=False)
    repeat: bool = field(default=False, compare=False)
    interval: Optional[timedelta] = field(default=None, compare=False)


class AsyncScheduler:
    """Run sync or async callables after a delay, optionally repeating.

    Must be constructed while an event loop is running: ``__init__`` starts the
    dispatcher with ``asyncio.create_task``. Times use the wall clock
    (``datetime.now()``).
    """

    def __init__(self):
        self._jobs = []          # min-heap of ScheduledJob
        self._next_id = 1
        self._running = True
        self._active = set()     # strong references to in-flight job tasks
        self._task = asyncio.create_task(self._run())

    async def schedule(self, func: Callable, delay: float,
                       repeat: bool = False, interval: Optional[float] = None,
                       *args, **kwargs) -> int:
        """Schedule ``func(*args, **kwargs)`` to run ``delay`` seconds from now.

        If ``repeat`` and ``interval`` are both set, the job runs again every
        ``interval`` seconds after each launch. Positional ``args`` come after
        ``repeat`` and ``interval``, so pass those two explicitly whenever you
        supply positional arguments.

        Returns:
            The job id, usable with :meth:`cancel`.
        """
        job_id = self._next_id
        self._next_id += 1
        scheduled_time = datetime.now() + timedelta(seconds=delay)
        interval_td = timedelta(seconds=interval) if repeat and interval else None
        job = ScheduledJob(
            scheduled_time=scheduled_time,
            job_id=job_id,
            func=func,
            args=args,
            kwargs=kwargs,
            repeat=repeat,
            interval=interval_td,
        )
        heapq.heappush(self._jobs, job)
        return job_id

    async def cancel(self, job_id: int) -> bool:
        """Remove a queued job. Returns False if no job with that id is queued."""
        for i, job in enumerate(self._jobs):
            if job.job_id == job_id:
                self._jobs.pop(i)
                heapq.heapify(self._jobs)
                return True
        return False

    async def _run(self):
        """Dispatcher loop: launch due jobs, re-queue repeating ones, otherwise nap."""
        while self._running:
            if not self._jobs:
                await asyncio.sleep(0.1)
                continue
            now = datetime.now()
            next_job = self._jobs[0]
            if now >= next_job.scheduled_time:
                job = heapq.heappop(self._jobs)
                # Launch without awaiting, but keep a strong reference: the
                # event loop holds tasks only weakly.
                task = asyncio.create_task(self._execute_job(job))
                self._active.add(task)
                task.add_done_callback(self._active.discard)
                # Re-queue repeating jobs one interval from now
                if job.repeat and job.interval:
                    heapq.heappush(
                        self._jobs, replace(job, scheduled_time=now + job.interval)
                    )
            else:
                # Sleep until the next job is due, but at most 0.1 s, so that
                # a newly scheduled earlier job is noticed promptly.
                wait_time = (next_job.scheduled_time - now).total_seconds()
                await asyncio.sleep(min(wait_time, 0.1))

    async def _execute_job(self, job: ScheduledJob):
        """Run one job and report failures instead of propagating them.

        Coroutine functions are awaited; plain callables run in the default
        thread pool.
        """
        try:
            if inspect.iscoroutinefunction(job.func):
                await job.func(*job.args, **job.kwargs)
            else:
                # run_in_executor() forwards positional arguments only, so the
                # keyword arguments are bound with functools.partial.
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(
                    None,
                    functools.partial(job.func, *job.args, **job.kwargs),
                )
        except Exception as e:
            print(f"任务执行失败: {e}")

    async def stop(self):
        """Stop the dispatcher and wait for it to exit.

        Job tasks that have already been launched are left to finish on their own.
        """
        self._running = False
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
import pytest
import asyncio
from unittest.mock import patch

# NOTE: @pytest.mark.asyncio requires the pytest-asyncio plugin.


# Basic test
@pytest.mark.asyncio
async def test_async_function():
    result = await async_function()
    assert result == "预期值"


# Mocking: since Python 3.8, patch() substitutes an AsyncMock when the target is
# an async function, so awaiting the patched call returns return_value.
@pytest.mark.asyncio
async def test_with_mock():
    with patch('module.async_func', return_value="模拟值"):
        result = await target_function()
        assert result == "预期结果"


# Timeout test
@pytest.mark.asyncio
async def test_timeout():
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(
            long_running_operation(),
            timeout=0.1,
        )
import asyncio
import unittest


class AsyncTestCase(unittest.TestCase):
    """TestCase that gives each test its own event loop (pre-3.8 style helper).

    On Python 3.8+, ``unittest.IsolatedAsyncioTestCase`` provides this natively
    and supports ``async def test_*`` methods.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Created lazily. unittest instantiates every TestCase up front, and a
        # loop created here would never be closed for skipped tests, because
        # tearDown does not run for them.
        self._loop = None

    def _get_loop(self):
        """Return this test's loop, creating a fresh one if needed."""
        if self._loop is None or self._loop.is_closed():
            self._loop = asyncio.new_event_loop()
        return self._loop

    def run_async(self, coro):
        """Run ``coro`` to completion on this test's loop and return its result."""
        return self._get_loop().run_until_complete(coro)

    def setUp(self):
        asyncio.set_event_loop(self._get_loop())

    def tearDown(self):
        if self._loop is not None and not self._loop.is_closed():
            self._loop.run_until_complete(self._loop.shutdown_asyncgens())
            self._loop.close()
        self._loop = None
        asyncio.set_event_loop(None)
import structlog
import asyncio

# Configuration
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        # Turns exc_info=True into an "exception" field containing the traceback
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
    wrapper_class=structlog.BoundLogger,
    cache_logger_on_first_use=True,
)


async def async_logging_example():
    """Log the start and the success or failure of a request as JSON lines.

    ``structlog.BoundLogger`` methods are synchronous and return ``None``, so
    awaiting them raises ``TypeError``. They are therefore called directly.
    Loggers built with ``structlog.make_filtering_bound_logger`` additionally
    offer awaitable ``ainfo``/``aerror`` variants.
    """
    log = structlog.get_logger()
    log.info("开始处理请求", request_id="123", user_id="456")
    loop = asyncio.get_running_loop()
    start = loop.time()
    try:
        result = await process_request()
        log.info(
            "请求处理成功",
            request_id="123",
            duration_ms=round((loop.time() - start) * 1000),  # measured, not hard-coded
        )
        return result
    except Exception as e:
        log.error(
            "请求处理失败",
            request_id="123",
            error=str(e),
            exc_info=True,
        )
        raise
import logging
import asyncio
from concurrent.futures import ThreadPoolExecutor


class AsyncLogHandler(logging.Handler):
    """Logging handler that moves the (possibly slow) write off the caller's thread.

    All writes go through a single worker thread, so records keep their order and
    the event loop never blocks on log I/O. Override ``write`` to send records
    somewhere other than stderr.
    """

    def __init__(self, level=logging.NOTSET):
        super().__init__(level)
        self._executor = ThreadPoolExecutor(max_workers=1)

    def write(self, message):
        """Blocking sink for one formatted record (default: stderr)."""
        import sys
        sys.stderr.write(message + "\n")

    async def _write_log(self, record):
        """Format ``record`` and write it on the worker thread."""
        message = self.format(record)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(self._executor, self.write, message)

    async def async_emit(self, record):
        """Handle a record asynchronously (awaitable from coroutines)."""
        await self._write_log(record)

    def emit(self, record):
        """Synchronous logging entry point: queue the write and return immediately.

        This must not wait on a coroutine. When called on the event-loop thread,
        waiting for work scheduled on that same loop can never succeed and would
        stall the caller until a timeout.
        """
        try:
            message = self.format(record)
            self._executor.submit(self.write, message)
        except Exception:
            # A logging failure must not affect the program: report it through
            # logging's standard error hook instead of raising.
            self.handleError(record)

    def close(self):
        """Flush pending writes, stop the worker thread, then close normally."""
        self._executor.shutdown(wait=True)
        super().close()
# Correct: async code calling sync code (through the thread pool)
async def async_calls_sync():
    """Run the blocking ``sync_function`` in the default executor and await its result."""
    loop = asyncio.get_running_loop()
    # Delegate the sync function to the thread pool so the loop stays responsive
    result = await loop.run_in_executor(
        None,
        sync_function,
        "参数",
    )
    return result
# Sync code calling async code
def sync_calls_async():
    """Run ``async_function()`` once, to completion, from plain synchronous code.

    ``asyncio.run()`` starts a fresh event loop, so it only works where no loop
    is already running in this thread. Inside a coroutine, use
    ``await async_function()`` instead.

    Raises:
        RuntimeError: if called while an event loop is running in this thread.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread: safe to start one for this call
        return asyncio.run(async_function())
    raise RuntimeError(
        "sync_calls_async() cannot be called from a running event loop; "
        "await async_function() instead"
    )
import threading
import asyncio


class ThreadLocalAsync:
    """Give each calling thread its own event loop, run in a helper thread.

    The first call from a thread creates that thread's private loop and a daemon
    thread that runs it forever. Later calls from the same thread reuse both,
    while the caller blocks waiting for each result.
    """

    def __init__(self):
        self._local = threading.local()

    def get_loop(self):
        """Return the calling thread's loop, creating it on first use."""
        if not hasattr(self._local, 'loop'):
            self._local.loop = asyncio.new_event_loop()
        return self._local.loop

    def _ensure_runner(self, loop):
        # Start the runner thread once per loop. Tracking the thread is more
        # robust than loop.is_running(), which stays False until the runner
        # has actually entered run_forever().
        runner = getattr(self._local, 'thread', None)
        if runner is None or not runner.is_alive():
            def run_loop():
                asyncio.set_event_loop(loop)
                loop.run_forever()

            runner = threading.Thread(target=run_loop, daemon=True)
            runner.start()
            self._local.thread = runner

    def run_in_thread(self, coro_func, *args, timeout=30):
        """Run ``coro_func(*args)`` on this thread's loop and return its result.

        The coroutine executes on the helper thread. The caller blocks for up to
        ``timeout`` seconds waiting for it.
        """
        loop = self.get_loop()
        self._ensure_runner(loop)
        # Submit the coroutine
        future = asyncio.run_coroutine_threadsafe(
            coro_func(*args),
            loop,
        )
        return future.result(timeout=timeout)

    def close(self):
        """Stop and close the calling thread's loop, if one was created."""
        loop = getattr(self._local, 'loop', None)
        if loop is None:
            return
        runner = getattr(self._local, 'thread', None)
        if runner is not None and runner.is_alive():
            loop.call_soon_threadsafe(loop.stop)
            runner.join()
        loop.close()
        del self._local.loop
        self._local.thread = None
# Keeping up with new features
import sys

if sys.version_info >= (3, 11):
    # Use newer features such as TaskGroup
    import asyncio

    async def modern_async():
        """Run two operations concurrently under a TaskGroup (Python 3.11+).

        If either fails, the group cancels the other and raises an
        ExceptionGroup when the ``async with`` block exits.
        """
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(operation1())
            task2 = tg.create_task(operation2())
        # Results are read after the block: only then are both tasks finished.
        # Calling .result() inside the block could raise InvalidStateError.
        return task1.result(), task2.result()
def should_use_async(scenario):
    """Recommend async or sync for a project scenario.

    Args:
        scenario: Mapping that may override any of the criteria keys below.
            Missing keys, or a non-mapping argument, fall back to the defaults.

    Returns:
        "推荐异步" when the workload is high-concurrency I/O, not CPU-bound, and
        not tied to existing sync infrastructure; otherwise "考虑同步或混合方案".
    """
    from collections.abc import Mapping

    criteria = {
        'high_concurrent_io': True,     # high-concurrency I/O
        'cpu_bound': False,             # CPU-intensive work
        'existing_sync_infra': False,   # existing synchronous architecture
        'team_experience': True,        # team has async experience
        'project_duration': 'long',     # project lifetime
    }
    # Caller-supplied values override the defaults
    if isinstance(scenario, Mapping):
        criteria.update(scenario)
    if (criteria['high_concurrent_io']
            and not criteria['cpu_bound']
            and not criteria['existing_sync_infra']):
        return "推荐异步"
    return "考虑同步或混合方案"
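| 常见问题 | 解决要点 |
|---|---|
| 协程没有执行 | 使用 await 或 create_task,并通过 asyncio.run 启动事件循环 |
| 事件循环被阻塞 | 用异步库或 run_in_executor 替代同步阻塞调用 |
| 共享状态竞态 | 跨 await 修改共享状态时使用 asyncio.Lock |
| 任务异常丢失 | 明确异常处理、使用return_exceptions |
| 任务/内存泄漏 | 保留 Task 引用,及时取消并 await,定期检查活跃任务数 |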
学习搭子,异步编程的世界充满挑战,但掌握其原理和最佳实践后,你将能构建出高性能、高可靠的现代应用。
异步不是终点,而是通往更高效、更可靠软件系统的桥梁。在实际项目中,要根据具体场景做出明智的选择:何时该用异步,何时该用同步,何时需要混合方案。
如果在实践中遇到新的问题,或者想深入探讨某个主题,随时可以继续交流。咱们一起在这个快速发展的技术领域里不断成长!