文章标题:Python 协程与异步编程介绍
所属系列:程序开发
前言:本文主要介绍了Python的协程和异步编程,主要包含如下内容:
- 随后,本文介绍了 Python 中异步编程的核心写法和重要API。
- 最后,本文尝试以 Ptyhon的
yield 为例,解释如何从看似“同步”的顺序执行中塑造出“可中断(异步)”的代码逻辑
前置知识
- 如理解协程/异步编程/JS的async语法,有助于理解本文
正文
异步编程的基本概念
协程的引入
这里我们详细介绍一下协程的概念。假设读者已经掌握进程、线程与CPU执行程序的基础概念。我们直接切入一个具体的执行场景。假设有一个单核计算节点,我们需要执行四个相同的函数。每个函数内部会挂起等待20秒,随后返回数值1。我们的目标是尽可能快地完成这四个函数的执行,并计算它们的返回值总和(Sum等于4)。
串行执行模式
在最基础的串行模式下,程序在一个线程内按严格的顺序执行。CPU在遇到等待指令时会完全闲置。四个任务依次执行,总耗时是80秒。
import timedeftask(): time.sleep(20)return1start = time.time()results = [task() for _ in range(4)]print(f"Sum: {sum(results)}, Time: {time.time() - start}s")
资源情况:【1线程,80s】
进程与线程模型
为了缩短等待时间,我们可以开启四个线程,每个线程分别负责执行一个函数。此时操作系统的调度器会介入。当一个线程进入20秒的等待状态时,操作系统会将其挂起,并把CPU时间片分配给其他线程。四个线程的等待时间在物理时间上重叠,因此总耗时降至20秒。这里必然存在操作系统层面的上下文切换(Context Switch)开销。
import timeimport threadingdeftask(results, index): time.sleep(20) results[index] = 1start = time.time()results = [0] * 4threads = []for i in range(4):# 直接干四个线程出来,每个线程跑完后把结果添加进数组里 t = threading.Thread(target=task, args=(results, i)) threads.append(t) t.start()# 等待所有线程结束for t in threads: t.join()print(f"Sum: {sum(results)}, Time: {time.time() - start}s")
资源情况:【4线程,20s】
注1:进程与线程的核心差异在于内存隔离级别。进程之间内存相互独立,数据交互必须依赖进程间通信(IPC)。同一个进程内的多个线程则共享大部分内存空间。因此操作系统的进程上下文切换非常缓慢,而线程的上下文切换相对较快。
注2:Python解释器存在全局解释器锁(GIL)。在上述任务中,休眠操作属于IO等待,Python会自动释放GIL,因此四个线程得以并发等待,总耗时依然是20秒。但如果任务是密集的CPU计算(重量任务),多线程在GIL的限制下永远只能利用一个CPU核心。因此在Python中处理计算密集型并发必须使用多进程。
线程池与任务模型
单纯的线程模型会将任务与执行线程强绑定。线程池模型则将“任务”与“执行者”分离开来。开发者将任务投递到队列中。线程池内部维护一定数量的常驻线程。一旦有线程空闲,就会从任务队列中取出一个任务并执行。
import timefrom concurrent.futures import ThreadPoolExecutordeftask(): time.sleep(20)return1start = time.time()with ThreadPoolExecutor(max_workers=2) as executor: futures = [executor.submit(task) for _ in range(4)] results = [f.result() for f in futures]print(f"Sum: {sum(results)}, Time: {time.time() - start}s")
资源情况:【2线程,40s】
假设线程池的容量设为2。由于这里的任务是不可中断的,任务会强制绑定并占用分配给它的线程(任务中的sleep会带来线程io挂起)。前两个任务会立即占用仅有的两个线程并开始20秒的等待。此时队列中剩余的两个任务只能处于等待分配状态。20秒后前两个任务完成并释放线程,后两个任务才得以执行。总耗时因此变为40秒。
那么线程是如何实现的呢?下面是一个简单的例子:
- 每次submit实际上只会把task投递到队列中。
- 线程池中的每个线程循环从这个公共的队列中取出一个task,执行并标记这个task完成。
- 主线程可以通过task.result等待结果返回后唤醒主线程
import threadingimport queueimport timeclassFuture:def__init__(self): self._result = None self._is_completed = False# 使用事件对象来处理阻塞等待 self._event = threading.Event()defset_result(self, result): self._result = result self._is_completed = True# 唤醒所有正在等待结果的线程 self._event.set()defis_completed(self):return self._is_completeddefresult(self):# 如果任务未完成则阻塞当前调用线程 self._event.wait()return self._resultclassSimpleThreadPool:def__init__(self, num_threads): self.task_queue = queue.Queue()for _ in range(num_threads): worker = threading.Thread(target=self._worker_loop) worker.daemon = True worker.start()def_worker_loop(self):whileTrue:# 工作线程取出任务以及绑定的凭证 func, args, future = self.task_queue.get()try: res = func(*args)# 任务完成后将结果写入凭证 future.set_result(res)finally: self.task_queue.task_done()defsubmit(self, func, *args):# 创建凭证并立即返回给调用者 future = Future() self.task_queue.put((func, args, future))return future##### 测试用例:2个线程处理4个耗时任务(基本类似上面的代码)###### 核心接口:submit任务,result等待结果deftask(task_id): time.sleep(20)return1pool = SimpleThreadPool(2)start = time.time()# 提交任务并收集返回的凭证futures = []for i in range(4): futures.append(pool.submit(task, i))# 通过凭证阻塞获取最终结果results = [f.result() for f in futures]print(f"Sum: {sum(results)}, Time: {time.time() - start}s")
以上讨论的均属于同步执行模式。在这种模式下存在一个核心问题:线程的IO等待(例如休眠或网络请求)会触发操作系统级别的线程挂起。被挂起的线程虽然不再消耗CPU运算周期,但依然会持续占用内存等系统资源。当并发量极高时,海量的阻塞线程会直接耗尽系统内存(详细会在下文介绍)。
我们希望一种更轻量的机制:任务的挂起并不带来线程的挂起,一个线程可以执行一个任务,并且在任务处于io等待的时候选择另一个任务执行。为此,我们引入了“协程”
协程与事件循环
协程是最适合处理IO挂起等待操作的并发模型。它在单一线程内部维护一个事件循环(Event Loop)与任务集合。它实现了我们的希望:当任务遇到IO操作时会主动交出执行权,而不是阻塞整个线程。
import asyncioimport timeasyncdeftask():await asyncio.sleep(20)return1asyncdefmain(): start = time.time() tasks = [asyncio.create_task(task()) for _ in range(4)] results = await asyncio.gather(*tasks) print(f"Sum: {sum(results)}, Time: {time.time() - start}s")asyncio.run(main())
资源情况:【1线程,20s】
协程的本质上使用了事件循环实现任务切换。我们可以用一个例子来揭示单线程如何实现任务的非阻塞异步执行。
我们首先介绍一下python的语法糖yield/yield from,下面展示了其使用方法
- yield:可以“神奇的”暂停当前执行执行的函数并且返回yield值(实现机制见补充部分)。当再次调用这个函数时,其将会从yield处开始继续执行。
- 在python中,这个会被称为“可随时冻结状态的生成器”
- 调用
next(generator) 的时候,当生成器内部的代码全部执行完毕,并遇到return语句时,Python解释器会抛出一个名为StopIteration的异常。return值会绑定在这个exception的value中,因此返回值可以被外部获取。
- yield from:把内层的yield(生成器)自动穿透出去,并且自动解包拿到result结果
definner_sleep(): print("内层:准备休眠")# yield 会在此处暂停 并把 20 扔给最外层的调用者yield20 print("内层:休眠结束")# return 会触发 StopIteration 异常return"内层的结果"defouter_task(): print("外层:开始执行")# yield from 建立了一条秘密通道# 它把内层的 20 透传出去 并在内层结束时自动拿到 return 的值 result = yieldfrom inner_sleep() print(f"外层:拿到 {result}")return"外层的最终结果"# 实例化外层生成器gen = outer_task()print("主程序:第一次推动执行")sleep_time = next(gen)print(f"主程序:收到休眠请求 {sleep_time} 秒")print("\n主程序:第二次推动执行")try: next(gen)except StopIteration as e: print(f"主程序:捕获异常 拿到最终返回值 {e.value}")# ===== 输出 =====# # 主程序:第一次推动执行 # 外层:开始执行 # 内层:准备休眠 # 主程序:收到休眠请求 20 秒 # # 主程序:第二次推动执行 # 内层:休眠结束 # 外层:拿到 内层的结果 # 主程序:捕获异常 拿到最终返回值 外层的最终结果
随后我们介绍如何使用yield来构建出我们的“可中断任务”,我们使用事件循环模型:
- 事件循环使用两个关键量:ready来存储可执行任务(队列);io_waiting使用字典 < io_wait, task> 来表示当收到 io事件 的时候应该唤醒哪个task ;
- 如果其返回了io等待需求则放回等待字典,并发起io请求事件
- 结束事件循环的时候,所有task都完成了 下面是一个示例代码(经过了轻微简化,直接for去查看是否有任务已经完成waiting了,实际实现中一般都是操作系统event trigger的)
import timefrom collections import dequeclassEventLoop:def__init__(self):# 1. ready队列存储可执行任务 self.ready = deque()# 1. io_waiting字典映射等待事件:任务对象 -> 唤醒时间 self.io_waiting = {} self.results = []defcreate_task(self, coro):# 2. 创建任务并推入就绪队列 self.ready.append(coro)defrun(self):# 3. 事件循环:当还有任务未完成时持续运行while self.ready or self.io_waiting: now = time.time()# 3.1 检查IO事件并更新任务状态 finished_io_tasks = []for coro, wake_time in self.io_waiting.items():if now >= wake_time: finished_io_tasks.append(coro)# 将满足条件的任务移出等待字典并放回就绪队列for coro in finished_io_tasks: self.io_waiting.pop(coro) self.ready.append(coro)# 3.2 尝试取出一个ready的任务并执行if self.ready: coro = self.ready.popleft()try:# 推动任务执行并获取IO等待需求 sleep_time = next(coro)# 3.2.1 记录IO等待需求并放回等待字典 self.io_waiting[coro] = time.time() + sleep_timeexcept StopIteration as e:# 3.2.2 任务完成并更新结果 self.results.append(e.value)else:# 避免CPU空转 time.sleep(0.01)# 4. 结束事件循环返回所有结果return self.resultsdefsleep(seconds):yield secondsdeftask():yieldfrom sleep(20)return1start = time.time()loop = EventLoop()for _ in range(4): loop.create_task(task())results = loop.run()print(f"Sum: {sum(results)}, Time: {time.time() - start}s")
很显然,基于事件循环的任务切换是发生在单个线程内部的,事件切换本质上变成了一个函数跳转,因此我们消除了对多线程与线程切换的需要。
为什么用协程替代线程
既然线程在遇到耗时IO等待时会挂起且不占用CPU,那为什么我们还需要协程,而不是直接开启海量的线程呢?核心原因在于两者在底层的性能成本存在巨大差异。我们可以从以下四个维度来剖析。
1. 上下文切换的“重量” (The Weight of Switching)
- 线程(Thread - Spring Boot):由 操作系统(OS) 调度。
- 切换时,CPU 需要保存寄存器、堆栈指针、刷新 TLB(地址转换缓存)。这涉及到内核态与用户态的切换,每次大约耗时几个微秒。
- 当线程数达到几千个时,CPU 可能花 20%~30% 的时间在“切换”上,而不是在“干活”上。
- 协程(Coroutine - FastAPI/Async):由 程序自身(用户态) 调度。
- 切换只是简单的函数跳转和状态记录,完全不经过内核。耗时极短(纳秒级),开销几乎可以忽略不计。
2. 内存开销 (Memory Footprint) —— 往往是真正的瓶颈
- 线程:每个线程需要预分配一个栈(Stack)内存。在 Java/JVM 中,默认通常是 1MB。
- 如果你有 1000 个并发请求 = 1000 个线程 = 1GB 内存 没了。
- 你可以轻松在普通服务器上开启 10 万个协程,但你绝对开不出 10 万个线程。
3. I/O 等待时的状态 (Blocking vs Suspending)
这是性能感知的最大来源:
- 线程池(同步):当线程发起一个数据库查询,它会阻塞(Blocking)。
- 此时线程虽然不占 CPU,但它占着坑(内存和线程名额)。如果数据库慢,坑很快填满,服务器就“假死”了。
- 协程(异步):当协程遇到
await,它会挂起(Suspending)。 - 它会把“坑”让出来给别人用。主线程(Event Loop)可以继续处理成百上千个其他请求。
- 结论:在 高延迟 I/O(比如调用 OpenAI API 需要 10 秒)场景下,协程的吞吐量远高于线程池。
4. CPU 利用率与缓存友好性 (Cache Locality)
- 线程池:由于线程经常在不同的 CPU 核心间飘移,CPU 的 L1/L2 缓存命中率会下降。
- 协程:通常运行在单线程或极少数线程上,CPU 缓存非常热,执行效率更高。
总结与补充
| | 协程 (FastAPI / Python Async) |
|---|
| 切换开销 | | |
| 内存消耗 | | |
| 并发上限 | | |
| 适合场景 | CPU 密集型 | I/O 密集型 |
| 代码复杂度 | | |
协程有一个致命的弱点:由于只有一个线程,一个计算密集型(计算耗时长)的任务会阻塞其他任务的运算。其他任务无法抢占该任务执行,只能长时间等待至让出。
解释: 如果你在 async def 里写了一个耗时 5 秒的 CPU 计算(比如图片压缩、大循环):
- 后果:对于协程模型,整个 Event Loop 会被卡死 5 秒。这 5 秒内,全服务器所有的请求都无法响应!
- 而在 线程池 中,卡死的只是那 20 个线程中的一个,其他 19 个还能继续干活。由于context switch,这个耗时5s的线程只会占用cpu至多一个时间(比如15ms)后就让出给其他线程,而不会长时间阻塞其他任务的运行。
协程关注的是单线程复用,实践中会结合多线程实现 task和执行的彻底解耦(虚拟线程(Virtual Threads)/ 轻量级线程)
异步编程的概念和实现
在理解协程的概念后,理解异步编程就变得很容易了。
定义: 异步编程是一种非阻塞的控制流模式。它的核心在于允许程序在等待耗时操作完成时,不停止整体的执行步伐,而是转去处理其他任务。常见的耗时操作包括网络请求和磁盘读写。
表现: 从代码表现上看,它通常呈现为一种顺序的控制流,但实际上会在执行到关键语句时主动挂起。
目标: 异步编程的主要目标是大幅提升系统的并发能力与资源利用率。这种模式能够有效避免中央处理器在等待输入输出操作时处于闲置状态。
在技术演进的过程中,开发者们探索出了多种实现异步的方式:主要包含 回调函数、Promise/Future模型、协程模型。
1. 回调函数 (Callbacks)这是最早期的异步实现方案。它的工作原理是将一个函数作为参数传递给异步操作,当操作完成时触发该函数。这种方式的致命缺点是容易陷入嵌套过深的回调地狱。代码逻辑会变得异常破碎且难以维护。
// 回调函数示例fs.readFile('file1.txt', (err, data1) => { fs.readFile('file2.txt', (err, data2) => {console.log("两个文件都读取完毕") })})
2. Promise 与 Future为了解决回调嵌套的问题,Promise 提供了一种对回调的封装机制。它通过链式调用的方式让代码结构变得更加扁平。不过这种方式依然需要编写大量的 .then() 方法,整体逻辑在视觉上仍然不够直观。
// Promise 示例readFileAsync('file1.txt') .then(data1 => readFileAsync('file2.txt')) .then(data2 => {console.log("两个文件都读取完毕") })
3. 协程 (Async/Await)协程技术是目前最优雅的异步解决方案。它允许开发者使用完全同步的代码结构来编写复杂的异步逻辑。当程序运行到带有 await 关键字的语句时,协程会主动挂起,并将控制权交还给底层的事件循环。一旦底层的输入输出操作完成,事件循环会唤醒并恢复该协程的执行。
// Async/Await 示例asyncfunctionreadFiles() {const data1 = await readFileAsync('file1.txt')const data2 = await readFileAsync('file2.txt')console.log("两个文件都读取完毕")}
Python asyncio 异步编程
参考阅读:Python asyncio 模块 | 菜鸟教程Python 通过 asyncio 标准库实现异步编程,其三个核心构件如下:
一、异步编程概念在 Python 中的应用
- 协程(Coroutine):使用
async def 定义协程函数,使用 await 挂起当前协程并将控制权交还事件循环:
import asyncioasyncdefsay_hello(): print("Hello")await asyncio.sleep(1) # 挂起,不阻塞线程 print("World")
- 任务(Task):
Task 是协程的封装,通过 asyncio.create_task() 创建后会立即被事件循环调度,是实现并发的主要手段:
asyncdefmain(): task = asyncio.create_task(say_hello())await task # 等待任务完成
- 事件循环入口: Python 3.7+ 推荐使用
asyncio.run() 作为程序入口,它负责创建事件循环、运行顶层协程并在结束后关闭循环:
asyncio.run(main())
补充:“异步具有传染性”。不能在同步函数里直接 await,以及 asyncio.run 是异步的“收敛点”
二、同步 vs 异步 代码示例
以获取多个 URL 为例,我们理解 Python 中如何进行并发编程编码。
同步版本
需要安装:pip install requests
import requestsimport timedeffetch_url(url): print(f"开始同步获取: {url}")# requests.get 是阻塞的,必须等待服务器响应 response = requests.get(url) print(f"完成获取,状态码: {response.status_code}")return response.textdefmain_sync():# 使用 httpbin 的 delay 接口模拟 2 秒网络延迟 urls = [f"https://httpbin.org/delay/2?id={i}"for i in range(3)] start = time.time() results = []for url in urls: results.append(fetch_url(url)) print(f"\n[同步模式] 总耗时: {time.time() - start:.2f} 秒")if __name__ == "__main__": main_sync()
输出:
开始同步获取: https://httpbin.org/delay/2?id=0完成获取,状态码: 200开始同步获取: https://httpbin.org/delay/2?id=1完成获取,状态码: 200开始同步获取: https://httpbin.org/delay/2?id=2完成获取,状态码: 200[同步模式] 总耗时: 6.42 秒 # 约 **6.1 - 6.5 秒**(2s * 3 + 网络开销)。
异步版本
需要安装 aiohttp:pip install aiohttp
import asyncioimport aiohttpimport timeasyncdeffetch_url_async(session, url): print(f"开始获取: {url}")asyncwith session.get(url) as response:await asyncio.sleep(2) # 非阻塞等待,事件循环可调度其他任务 text = await response.text() print(f"完成获取: {url}")returnf"来自 {url} 的数据 (长度: {len(text)})"asyncdefmain_async(): urls = ["https://httpbin.org/get", "https://httpbin.org/delay/1", "https://httpbin.org/headers"]asyncwith aiohttp.ClientSession() as session: tasks = [asyncio.create_task(fetch_url_async(session, url)) for url in urls] results = await asyncio.gather(*tasks) # 并发执行,收集全部结果return resultsstart = time.time()results = asyncio.run(main_async())print(f"总耗时: {time.time() - start:.2f} 秒") # ≈ 2.10 秒
输出:
开始获取: https://httpbin.org/get开始获取: https://httpbin.org/delay/1开始获取: https://httpbin.org/headers完成获取: https://httpbin.org/headers完成获取: https://httpbin.org/get完成获取: https://httpbin.org/delay/1总耗时: 2.10 秒
关键差异:异步版本中三个请求同时发出,总耗时接近单次最慢请求,而非三次之和。
三、重要接口速查
3.1 常用高级函数
| | |
|---|
asyncio.run(coro) | | debug=True |
asyncio.create_task(coro) | 将协程包装为 Task 并立即调度,实现并发的核心 | name= |
asyncio.gather(*aws) | | return_exceptions=True |
asyncio.sleep(delay) | 异步休眠,不阻塞线程(对比 time.sleep) | result= |
asyncio.wait(aws) | 并发运行并按条件返回,结果为 (done, pending) 集合 | timeout= |
asyncio.wait_for(coro, timeout) | 为单个协程设置超时,超时抛出 TimeoutError | timeout |
asyncio.to_thread(func) | | *args, **kwargs |
asyncio.shield(coro) | | |
3.2 同步原语
| | |
|---|
asyncio.Lock() | | async with lock: |
asyncio.Event() | | await event.wait() |
asyncio.Queue() | | await queue.put(item) |
asyncio.Semaphore(n) | | async with sem: |
3.3 网络与子进程
| |
|---|
asyncio.open_connection(host, port) | 建立 TCP 连接,返回 (reader, writer) |
asyncio.start_server(handler, host, port) | |
asyncio.create_subprocess_exec(cmd) | |
四、注意事项
- ⚠️ 避免阻塞调用:协程中禁止使用
time.sleep()、同步文件 I/O 等阻塞操作,应使用对应的异步版本或 asyncio.to_thread()。 - ⚠️ 版本要求:
asyncio.run() 需 Python 3.7+,asyncio.to_thread() 需 Python 3.9+。 - ⚠️ 异常处理:被取消的任务会抛出
CancelledError,需在协程内妥善捕获处理。 - 💡 调试技巧:设置环境变量
PYTHONASYNCIODEBUG=1 或 asyncio.run(main(), debug=True) 启用调试模式。
补充:Python的 yield 底层
在上面的教程中,我们 “顺序但允许中途“挂起” “ 的需求 实际上就依赖了 yield 关键字。
Python 中的 yield 是一个非常强大且优雅的关键字,它允许你编写看起来像是普通顺序执行的函数,但实际上这个函数可以在执行过程中“暂停”(挂起)并交出控制权,之后还能从暂停的地方“恢复”执行。我们下面将介绍 yield 是如何实现这一点的。
1. 概念上的转化:从顺序代码到状态机
在概念上,当 Python 编译器看到函数体内包含 yield 关键字时,它就不会再把这个函数当作普通的函数来编译了。它会将其转化为一个状态机(State Machine)。
每次你调用 next() 或通过 for 循环迭代这个生成器时,实际上是在驱动这个状态机从当前状态转移到下一个状态。yield 语句就是状态转移的边界。
例子:一个简单的生成器
假设我们有这样一段顺序写法的代码:
defsimple_generator(): print("Step 1: 初始化")yield'A' print("Step 2: 中间处理")yield'B' print("Step 3: 结束")yield'C'
等价的状态机实现
在概念上,Python 编译器大概把上面的代码等价转化为了一个类似下面这样的类(状态机):
classSimpleStateMachine:def__init__(self):# 初始状态设为 0 self.state = 0def__iter__(self):return selfdef__next__(self):if self.state == 0: print("Step 1: 初始化") self.state = 1# 转移到下一个状态return'A'# yield 的值elif self.state == 1: print("Step 2: 中间处理") self.state = 2# 转移到下一个状态return'B'# yield 的值elif self.state == 2: print("Step 3: 结束") self.state = 3# 转移到结束状态return'C'# yield 的值else:# 状态机结束,抛出 StopIteration 异常raise StopIteration()# 使用方式和生成器完全一样# gen = SimpleStateMachine()# print(next(gen))
转化逻辑分析:
- 局部变量变为实例属性:如果
simple_generator 中有局部变量,在状态机中它们会变成 self.xxx 这样的实例属性,这样即使函数(__next__)执行结束(return),变量的值依然被保存在对象内部。 yield 切割代码块:每一个 yield 语句都像是一把刀,把原来连续的代码切成了多个代码块。- 状态指针(Instruction Pointer):
self.state 记录了当前执行到了哪一个被切割的代码块。每次调用 __next__,就根据当前的 state 执行对应的代码块,然后更新 state,最后返回 yield 后面的值。
通过这种转化,你写的“顺序代码”实际上被拆解成了多次独立的函数调用,每次调用只执行一段,从而实现了“挂起”和“恢复”的错觉。
2. 底层实现:栈帧(Stack Frame)的保留
虽然状态机模型很好地解释了概念,但在 CPython 的底层实现中,并没有真的去生成一个庞大的 if-elif 状态机类。Python 采用了更高效的做法:操作栈帧(Stack Frame)。
普通函数的栈帧存在于系统的调用栈(Call Stack)中,遵循“后进先出”原则。
- 调用函数时,在内存(调用栈)中分配一个“栈帧”,里面存放着局部变量、指令指针(当前执行到哪行代码)等信息。
- 函数执行完毕(遇到
return),这个栈帧就会被销毁,局部变量随之消失。
调用函数 foo() ->+---------------------------+| 系统调用栈 (Call Stack) |+---------------------------+| [ 栈帧: foo ] | <-- 正在执行| - 局部变量 x = 10 || - 指令指针 IP = 5 |+---------------------------+| [ 栈帧: main ] |+---------------------------+执行 return ->+---------------------------+| 系统调用栈 (Call Stack) |+---------------------------+| [ 栈帧: main ] | <-- foo 的栈帧被弹出并销毁,x 消失+---------------------------+
而在包含 yield 的生成器函数中:
- 调用
simple_generator() 时,并不会执行函数体内的代码,而是直接返回一个生成器对象。这个生成器对象内部包装了一个堆(Heap)上分配的栈帧。

- 当你调用
next(gen) 时,Python 解释器会将这个保存的栈帧推入当前的执行栈中,开始执行代码。

- 记录下当前的指令指针(Instruction Pointer,即下一条要执行的字节码位置)。
- 将这个栈帧从当前的执行栈中弹出(但不销毁),交出 CPU 控制权。

- 当再次调用
next(gen) 时,解释器找到之前保存的那个栈帧,重新推入执行栈,并从上次记录的指令指针处继续往下执行。

总结
- 从表面上看:
yield 让你用写同步、顺序代码的方式,写出了可以暂停和恢复的逻辑。 - 从概念上看:编译器将你的顺序代码按
yield 切割,转化成了一个状态机。局部变量变成了状态机的内部状态,yield 变成了状态转移的触发器。 - 从底层实现看:CPython 通过将栈帧对象(Frame Object)保存在堆内存中,而不是在函数返回时销毁它,从而实现了执行上下文的保存和恢复。指令指针(
f_lasti)记录了“挂起”的具体位置。
省流总结
文章最后提供一个全文的省流takeaway:
异步编程的基本概念
并发模型的演进
本节从一个具体的执行场景出发,逐步引出了四种并发模型。它们的核心区别在于:任务与执行线程之间的耦合程度,以及IO等待期间系统资源的利用效率。 假设:N个任务,每个任务等待T秒。
关键:
- 同步模型(串行/多线程/线程池)的根本缺陷在于IO等待期间线程依然占用系统资源。并发量越高,被阻塞线程积压越多,最终耗尽内存。
- 协程的核心创新是将任务切换从操作系统层面下沉到用户态。切换开销从微秒级降至纳秒级,并且单线程即可调度海量并发任务。
- 协程并非银弹。它天然适合IO密集型场景,但一旦遇到耗时的CPU计算,必须将其移交给独立的线程池或进程池,避免主事件循环阻塞。
- 补充:协程还可以与多线程结合,将海量任务动态映射到多个物理线程上(M:N调度),并通过工作窃取机制实现跨线程的负载均衡。
快速对比:
| 线程池 (Spring Boot / Flask 传统) | 协程 (FastAPI / Python Async) |
|---|
| 切换开销 | | |
| 内存消耗 | | |
| 并发上限 | | |
| 适合场景 | CPU 密集型 | I/O 密集型 |
| 代码复杂度 | | |
异步编程的概念
异步编程的核心是非阻塞,即在等待 IO 耗时操作时,让 CPU 转去处理其他任务以提升并发效率。其实现演进经历了三个阶段:
- 回调函数:通过传参执行后续逻辑,但易陷入代码破碎的“回调地狱”。
- Promise:通过链式调用封装异步操作,解决了嵌套问题但视觉上仍不够直观。
- 协程 (Async/Await):目前最优雅的方案,利用挂起与恢复机制,让开发者能用同步的代码结构编写高性能的异步逻辑。
Python Asynio异步编程
短示例代码
asyncdefmain(): task = asyncio.create_task(say_hello())await task # 等待任务完成asyncio.run(main())
长示例代码:
import asyncioimport aiohttpimport timeasyncdeffetch_url_async(session, url): print(f"开始获取: {url}")asyncwith session.get(url) as response:await asyncio.sleep(2) # 非阻塞等待,事件循环可调度其他任务 text = await response.text() print(f"完成获取: {url}")returnf"来自 {url} 的数据 (长度: {len(text)})"asyncdefmain_async(): urls = ["https://httpbin.org/get", "https://httpbin.org/delay/1", "https://httpbin.org/headers"]asyncwith aiohttp.ClientSession() as session: tasks = [asyncio.create_task(fetch_url_async(session, url)) for url in urls] results = await asyncio.gather(*tasks) # 并发执行,收集全部结果return resultsstart = time.time()results = asyncio.run(main_async())print(f"总耗时: {time.time() - start:.2f} 秒") # ≈ 2.10 秒
| | |
|---|
asyncio.run(coro) | | debug=True |
asyncio.create_task(coro) | 将协程包装为 Task 并立即调度,实现并发的核心 | name= |
asyncio.gather(*aws) | | return_exceptions=True |
asyncio.sleep(delay) | 异步休眠,不阻塞线程(对比 time.sleep) | result= |
asyncio.wait(aws) | 并发运行并按条件返回,结果为 (done, pending) 集合 | timeout= |
asyncio.wait_for(coro, timeout) | 为单个协程设置超时,超时抛出 TimeoutError | timeout |
asyncio.to_thread(func) | | *args, **kwargs |
asyncio.shield(coro) | | |
封面图源自:电视动画 轻音少女!!(K-ON!! 2010)