一、什么是协程?
协程(Coroutine) 是一种比线程更轻量级的并发编程模型。在 Python 中,协程最早是基于生成器实现的,通过 yield 表达式实现函数的中断和恢复,并且可以通过 send() 方法向协程发送数据。
简单来说:协程是可以暂停执行、可以接收外部传入数据的生成器函数。
defsimple_coroutine():
"""最简单的协程示例"""
print("协程开始")
x = yield
print(f"收到数据: {x}")
y = yield
print(f"再次收到数据: {y}")
coro = simple_coroutine()
next(coro) # 启动协程,执行到第一个 yield
coro.send(10) # 发送数据,协程恢复执行
coro.send(20) # 再次发送数据
二、为什么需要协程?
- • 协作式多任务:协程主动让出控制权,不涉及操作系统调度,开销小。
- • 状态保持:协程自动保存执行状态,无需手动管理。
- • 双向通信:通过
send() 和 yield,协程可以与调用者双向传递数据。 - • 简化异步代码:基于协程可以写出看起来像同步代码的异步程序(
async/await 的基础)。
三、生成器 vs 协程
| | |
| | |
| | |
| next() | |
| | |
| | |
| yield value | value = yield |
四、协程的基本用法
4.1 最简单的协程
defecho():
"""回显协程:接收数据并打印"""
print("协程已启动")
whileTrue:
received = yield
print(f"收到: {received}")
coro = echo()
next(coro) # 必须先激活协程
coro.send("Hello")
coro.send("World")
coro.send("Python")
# 协程不会结束,因为循环是无限的
4.2 协程的四态
协程有四种状态(通过 inspect.getgeneratorstate() 查看):
from inspect import getgeneratorstate
defcoroutine():
x = yield
y = yield
coro = coroutine()
print(getgeneratorstate(coro)) # GEN_CREATED(已创建,未启动)
next(coro)
print(getgeneratorstate(coro)) # GEN_SUSPENDED(暂停在 yield)
coro.send(10)
print(getgeneratorstate(coro)) # GEN_SUSPENDED
coro.send(20) # StopIteration
# print(getgeneratorstate(coro)) # GEN_CLOSED(已关闭)
4.3 带返回值的协程
defaccumulator():
total = 0
whileTrue:
value = yield total
if value isNone:
break
total += value
return total
acc = accumulator()
next(acc) # 启动
print(acc.send(10)) # 10
print(acc.send(20)) # 30
print(acc.send(30)) # 60
try:
acc.send(None) # 结束协程
except StopIteration as e:
print(f"最终结果: {e.value}") # 最终结果: 60
五、协程的装饰器(自动激活)
手动调用 next() 容易忘记,可以使用装饰器自动激活。
defcoroutine(func):
"""装饰器:自动激活协程"""
defwrapper(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen) # 自动激活
return gen
return wrapper
@coroutine
defaccumulator():
total = 0
whileTrue:
value = yield total
if value isNone:
break
total += value
return total
acc = accumulator() # 已经自动激活
print(acc.send(10)) # 10
print(acc.send(20)) # 30
六、协程的异常处理
使用 throw() 方法可以向协程内部抛出异常。
defcoroutine_with_exception():
try:
whileTrue:
value = yield
print(f"正常处理: {value}")
except ValueError:
print("捕获到 ValueError")
except GeneratorExit:
print("协程被关闭")
else:
print("协程正常结束")
finally:
print("清理资源")
coro = coroutine_with_exception()
next(coro)
coro.send(10)
coro.send(20)
coro.throw(ValueError, "发生错误")
# coro.close() # 关闭协程,会触发 GeneratorExit
七、协程与管道
多个协程可以串联成处理管道,数据在协程之间流动。
@coroutine
defsource(target):
"""数据源:产生数据并发送给目标协程"""
for i inrange(10):
target.send(i)
target.close()
@coroutine
deffilter_odd(target):
"""过滤奇数"""
try:
whileTrue:
value = yield
if value % 2 == 0:
target.send(value)
except GeneratorExit:
target.close()
@coroutine
defsink():
"""数据终点:接收并处理数据"""
try:
whileTrue:
value = yield
print(f"收到: {value}")
except GeneratorExit:
print("管道结束")
# 构建管道:source -> filter_odd -> sink
s = sink()
f = filter_odd(s)
source(f)
八、协程与 yield from
yield from 可以将协程的执行委托给另一个协程,简化嵌套。
defcoro_a():
for i inrange(3):
value = yieldf"a: {i}"
print(f"a 收到: {value}")
defcoro_b():
for i inrange(3):
value = yieldf"b: {i}"
print(f"b 收到: {value}")
defcoro_main():
a = coro_a()
b = coro_b()
next(a); next(b)
for i inrange(3):
result = yieldfrom a
print(f"main 从 a 收到: {result}")
result = yieldfrom b
print(f"main 从 b 收到: {result}")
main = coro_main()
next(main)
main.send("A1")
main.send("B1")
九、实战案例
9.1 工作队列(生产者-消费者)
from collections import deque
import time
@coroutine
defworker(task_queue, result_queue):
"""工作协程:从队列取任务执行"""
whileTrue:
task = yield
print(f"处理任务: {task}")
time.sleep(0.5) # 模拟工作
result_queue.append(f"完成: {task}")
@coroutine
deftask_dispatcher(workers, task_queue, result_queue):
"""任务分发器:将任务分配给空闲的工作协程"""
worker_index = 0
whileTrue:
task = yield
workers[worker_index].send(task)
worker_index = (worker_index + 1) % len(workers)
defproducer(dispatcher, task_queue):
"""生产者:产生任务"""
for i inrange(10):
task = f"任务-{i}"
print(f"生产: {task}")
dispatcher.send(task)
time.sleep(0.2)
dispatcher.close()
# 创建队列和协程
task_queue = deque()
result_queue = deque()
workers = [worker(task_queue, result_queue) for _ inrange(3)]
dispatcher = task_dispatcher(workers, task_queue, result_queue)
next(dispatcher)
# 启动生产者
producer(dispatcher, task_queue)
9.2 数据流处理管道
@coroutine
defproducer(target):
"""生产数据"""
for i inrange(1, 101):
target.send(i)
target.close()
@coroutine
defmultiply(target, factor=2):
"""乘以系数"""
try:
whileTrue:
value = yield
target.send(value * factor)
except GeneratorExit:
target.close()
@coroutine
deffilter_condition(target, condition):
"""条件过滤"""
try:
whileTrue:
value = yield
if condition(value):
target.send(value)
except GeneratorExit:
target.close()
@coroutine
defcollector():
"""收集结果"""
results = []
try:
whileTrue:
value = yield
results.append(value)
print(f"收集: {value}")
except GeneratorExit:
print(f"总计收集: {len(results)} 条")
# 构建管道:producer -> multiply(3) -> filter(>50) -> collector
c = collector()
f = filter_condition(c, lambda x: x > 50)
m = multiply(f, 3)
producer(m)
9.3 协程实现状态机
defstate_machine():
"""用协程实现状态机"""
state = "IDLE"
print(f"初始状态: {state}")
whileTrue:
if state == "IDLE":
event = yield
if event == "start":
state = "RUNNING"
print("切换到 RUNNING")
else:
print(f"Ignore event: {event}")
elif state == "RUNNING":
event = yield
if event == "stop":
state = "STOPPED"
print("切换到 STOPPED")
elif event == "pause":
state = "PAUSED"
print("切换到 PAUSED")
else:
print(f"运行中处理: {event}")
elif state == "PAUSED":
event = yield
if event == "resume":
state = "RUNNING"
print("切换回 RUNNING")
elif event == "stop":
state = "STOPPED"
print("切换到 STOPPED")
elif state == "STOPPED":
event = yield
print(f"已停止,忽略事件: {event}")
sm = state_machine()
next(sm)
sm.send("start")
sm.send("工作1")
sm.send("工作2")
sm.send("pause")
sm.send("resume")
sm.send("工作3")
sm.send("stop")
sm.send("任何事件")
十、协程的局限性
- 1. 不支持并行:协程本质是单线程的,无法利用多核 CPU。
- 4. 学习曲线:理解
yield、send()、throw()、close() 需要时间。
现代 Python 的替代方案:
- • Python 3.5+ 引入了
async/await 语法,提供了更高级的协程支持(原生协程)。 - •
asyncio 库提供了事件循环和异步 I/O 支持,更适合实际开发。
十一、总结
- • 协程是基于生成器实现的双向通信机制,可以暂停执行并接收外部数据。
- • 协程有四种状态:
GEN_CREATED、GEN_RUNNING、GEN_SUSPENDED、GEN_CLOSED。 - • 协程的启动需要先调用
next() 或 send(None) 激活。 - •
send() 向协程发送数据,throw() 向协程抛出异常,close() 关闭协程。 - • 协程可以实现轻量级的协作式多任务、数据流管道、状态机等。
- • 现代 Python 开发中,
async/await 和 asyncio 是协程的主流用法,但理解生成器协程是掌握原生协程的基础。
协程是 Python 并发编程的重要组成部分,虽然现代异步编程更多使用 async/await,但理解生成器协程的原理对于深入掌握 Python 的执行模型非常有帮助。