🐍 迭代器与生成器 — 惰性求值的魔法
🕐 预计用时:2-3 小时 | 🎯 目标:掌握迭代协议、生成器函数、yield、惰性求值
📖 今日目录
- __iter__ 与 __next__ — 迭代协议
1. 什么是迭代?
迭代(Iteration)就是"一个一个地取元素"——for 循环就是迭代。
# 你每天都在迭代,只是不知道它叫这个名字
for i in [1, 2, 3]:
print(i)
for char in "hello":
print(char)
for line in open("data.txt"):
print(line.strip())
# 这些都是迭代!背后都是同一个协议:__iter__ + __next__
💡 为什么要学迭代器?
1. 理解 for 循环的底层原理
2. 处理大数据集时节省内存(惰性求值)
3. 自定义可迭代对象(让你的类支持 for 循环)
4. 生成器是异步编程的基础(Day38)
2. 可迭代对象 vs 迭代器
这两个概念容易混淆,但它们是不同的:
from collections.abc import Iterable, Iterator
# 可迭代对象(有 __iter__ 方法)
print(isinstance([1, 2, 3], Iterable)) # True
print(isinstance("hello", Iterable)) # True
print(isinstance(123, Iterable)) # False
# 迭代器(有 __iter__ 和 __next__ 方法)
it = iter([1, 2, 3])
print(isinstance(it, Iterator)) # True
print(isinstance([1, 2, 3], Iterator)) # False(列表不是迭代器!)
# 关键区别:
# 可迭代对象 → 可以创建迭代器(调用 iter())
# 迭代器 → 可以逐个取元素(调用 next())
# 手动迭代
nums = [10, 20, 30]
# iter() 从可迭代对象创建迭代器
it = iter(nums)
# next() 逐个取元素
print(next(it)) # 10
print(next(it)) # 20
print(next(it)) # 30
print(next(it)) # ❌ StopIteration(没有更多元素了)
# for 循环本质上就是:
# it = iter(iterable)
# while True:
# try:
# value = next(it)
# # 循环体
# except StopIteration:
# break
⚠️ 核心区别:
• 列表是可迭代对象,但不是迭代器(不能直接 next())
• iter([1,2,3]) 返回的才是迭代器
• 迭代器用完就没了(只能遍历一次)
3. __iter__ 与 __next__ — 迭代协议
迭代协议就是两个魔法方法:__iter__() 返回迭代器,__next__() 返回下一个元素。
class Countdown:
"""倒计时迭代器"""
def __init__(self, start):
self.current = start
def __iter__(self):
"""返回迭代器本身"""
return self
def __next__(self):
"""返回下一个值"""
if self.current <= 0:
raise StopIteration # 迭代结束
value = self.current
self.current -= 1
return value
# 使用
for num in Countdown(5):
print(num, end=" ")
# 输出: 5 4 3 2 1
# 手动使用迭代器
cd = Countdown(3)
print(next(cd)) # 3
print(next(cd)) # 2
print(next(cd)) # 1
# print(next(cd)) # StopIteration!
4. 自定义可迭代类
class NumberRange:
"""模拟 range() 的自定义可迭代对象
可以被多次迭代(每次创建新的迭代器)
"""
def __init__(self, start, end):
self.start = start
self.end = end
def __iter__(self):
"""每次调用返回一个新的迭代器"""
return NumberRangeIterator(self.start, self.end)
def __len__(self):
return self.end - self.start
class NumberRangeIterator:
"""迭代器类"""
def __init__(self, start, end):
self.current = start
self.end = end
def __iter__(self):
return self
def __next__(self):
if self.current >= self.end:
raise StopIteration
value = self.current
self.current += 1
return value
# 使用
nums = NumberRange(1, 6)
print(list(nums)) # [1, 2, 3, 4, 5]
# 可以多次迭代(因为 __iter__ 每次返回新迭代器)
for n in nums:
print(n, end=" ") # 1 2 3 4 5
print()
for n in nums:
print(n, end=" ") # 1 2 3 4 5(可以再来一次!)
💡 分离设计的好处:
• NumberRange(可迭代对象):存储配置,可以多次迭代
• NumberRangeIterator(迭代器):存储状态,只能用一次
• 这就是 Python 标准库 list、dict、str 的设计模式
5. yield 关键字 — 生成器函数
生成器(Generator)是用 yield 创建迭代器的简便方式——不用手写类,一个函数搞定。
# 普通函数:一次返回所有值
def countdown_list(n):
result = []
while n > 0:
result.append(n)
n -= 1
return result
# 生成器函数:每次 yield 一个值
def countdown_gen(n):
while n > 0:
yield n # 暂停,返回值,下次从这里继续
n -= 1
# 使用
for num in countdown_gen(5):
print(num, end=" ")
# 输出: 5 4 3 2 1
# yield 的神奇之处:函数状态被保留!
def simple_gen():
print("第一步")
yield 1
print("第二步")
yield 2
print("第三步")
yield 3
print("结束")
gen = simple_gen()
print(next(gen)) # 执行到第一个 yield
# 输出:
# 第一步
# 1
print(next(gen)) # 从第一个 yield 后继续,执行到第二个 yield
# 输出:
# 第二步
# 2
print(next(gen)) # 从第二个 yield 后继续
# 输出:
# 第三步
# 3
# next(gen) # StopIteration(函数执行完毕)
💡 yield 的本质:
1. 遇到 yield → 暂停函数,返回值
2. 下次 next() → 从暂停处继续执行
3. 函数执行完 → 抛出 StopIteration
4. 函数的局部变量全部保留!
6. 生成器的工作原理
# 生成器 vs 列表:内存对比
import sys
# 列表:一次性创建所有元素
big_list = [i for i in range(1000000)]
print(f"列表大小: {sys.getsizeof(big_list) / 1024 / 1024:.1f} MB") # ~8 MB
# 生成器:每次只创建一个元素
big_gen = (i for i in range(1000000))
print(f"生成器大小: {sys.getsizeof(big_gen)} bytes") # ~200 bytes!
# 生成器比列表节省了几万倍内存!
# 生成器只能遍历一次!
gen = (i * 2 for i in range(5))
# 第一次遍历
for x in gen:
print(x, end=" ") # 0 2 4 6 8
print()
# 第二次遍历(什么都没有!)
for x in gen:
print(x, end=" ") # (空)
print()
# 生成器是"惰性"的——按需计算
def fibonacci():
"""无限斐波那契数列"""
a, b = 0, 1
while True:
yield a
a, b = b, a + b
# 取前 10 个斐波那契数
fib = fibonacci()
for _ in range(10):
print(next(fib), end=" ")
# 0 1 1 2 3 5 8 13 21 34
7. 生成器表达式
生成器表达式是列表推导式的"惰性版"——用圆括号代替方括号。
# 列表推导式:一次性创建所有元素
squares_list = [x**2 for x in range(10)]
print(squares_list) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 生成器表达式:按需生成元素
squares_gen = (x**2 for x in range(10))
print(squares_gen) # <generator object <genexpr> at 0x...>
# 生成器可以直接传给 sum/max/min/sorted 等函数
print(sum(x**2 for x in range(10))) # 285(不需要额外括号)
print(max(x**2 for x in range(10))) # 81
print(sorted(x**2 for x in range(10))) # [0, 1, 4, 9, ...]
# 配合 any/all
print(any(x > 50 for x in range(10))) # False
print(all(x >= 0 for x in range(10))) # True
💡 生成器表达式 vs 列表推导式:
• 遍历一次 → 用生成器(省内存)
• 遍历多次 → 用列表(可重复使用)
• 传给 sum/any/all → 用生成器(不需要额外括号)
8. yield from — 委托生成器
yield from 把迭代委托给另一个可迭代对象——简化嵌套生成器。
# 不用 yield from:需要手动 for 循环
def chain_manual(*iterables):
for it in iterables:
for item in it:
yield item
# 用 yield from:一行搞定
def chain(*iterables):
for it in iterables:
yield from it # 直接委托
# 使用
result = list(chain([1, 2], [3, 4], [5, 6]))
print(result) # [1, 2, 3, 4, 5, 6]
# 递归生成器:展平嵌套列表
def flatten(nested):
"""展平任意深度的嵌套列表"""
for item in nested:
if isinstance(item, (list, tuple)):
yield from flatten(item) # 递归委托
else:
yield item
# 使用
nested = [1, [2, 3], [4, [5, 6, [7, 8]]]]
print(list(flatten(nested)))
# [1, 2, 3, 4, 5, 6, 7, 8]
9. 实战:大文件逐行处理
处理几 GB 的日志文件?不能全部加载到内存,用生成器逐行读取。
def read_large_file(filepath, encoding="utf-8"):
"""逐行读取大文件(生成器)"""
with open(filepath, "r", encoding=encoding) as f:
for line in f:
yield line.rstrip("\n")
def filter_lines(lines, keyword):
"""过滤包含关键词的行(生成器)"""
for line in lines:
if keyword in line:
yield line
def parse_log_lines(lines):
"""解析日志行(生成器)"""
import re
pattern = r"\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] \[(\w+)\] (.+)"
for line in lines:
match = re.match(pattern, line)
if match:
yield {
"time": match.group(1),
"level": match.group(2),
"message": match.group(3)
}
# 链式调用:读取 → 过滤 → 解析
lines = read_large_file("app.log")
errors = filter_lines(lines, "ERROR")
parsed = parse_log_lines(errors)
for entry in parsed:
print(f"🚨 [{entry['time']}] {entry['message']}")
# 统计大文件的行数(不占内存)
def count_lines(filepath):
"""统计文件行数"""
count = 0
for _ in read_large_file(filepath):
count += 1
return count
# 统计各级别日志数量
def count_by_level(filepath):
"""按日志级别统计"""
from collections import Counter
counter = Counter()
for line in read_large_file(filepath):
if "[ERROR]" in line:
counter["ERROR"] += 1
elif "[WARN]" in line:
counter["WARN"] += 1
elif "[INFO]" in line:
counter["INFO"] += 1
return counter
💡 生成器管道模式:
每个函数都是一个"处理器",通过生成器串联起来。
读取 → 过滤 → 转换 → 聚合
数据像水一样流过管道,每个环节只处理一行,内存占用恒定!
10. 实战:无限序列与管道
无限序列
def count_from(start=0, step=1):
"""无限计数器"""
n = start
while True:
yield n
n += step
def take(n, iterable):
"""从可迭代对象中取前 n 个"""
for i, item in enumerate(iterable):
if i >= n:
break
yield item
def skip(n, iterable):
"""跳过前 n 个"""
for i, item in enumerate(iterable):
if i >= n:
yield item
# 组合使用
counter = count_from(1)
first_5 = take(5, counter)
print(list(first_5)) # [1, 2, 3, 4, 5]
# 从 10 开始,跳过 3 个,取 5 个
numbers = count_from(10)
result = list(take(5, skip(3, numbers)))
print(result) # [13, 14, 15, 16, 17]
数据处理管道
def read_data():
"""数据源"""
data = [
{"name": "张三", "age": 25, "salary": 10000},
{"name": "李四", "age": 30, "salary": 15000},
{"name": "王五", "age": 22, "salary": 8000},
{"name": "赵六", "age": 35, "salary": 20000},
{"name": "钱七", "age": 28, "salary": 12000},
]
for item in data:
yield item
def filter_by(data, key, min_val=None, max_val=None):
"""按条件过滤"""
for item in data:
value = item[key]
if min_val is not None and value < min_val:
continue
if max_val is not None and value > max_val:
continue
yield item
def transform(data, func):
"""转换数据"""
for item in data:
yield func(item)
def collect(data):
"""收集结果"""
return list(data)
# 构建管道:读取 → 过滤 → 转换 → 收集
pipeline = read_data()
pipeline = filter_by(pipeline, "age", min_val=25)
pipeline = filter_by(pipeline, "salary", min_val=10000)
pipeline = transform(pipeline, lambda x: f"{x['name']} ({x['age']}岁) 月薪{x['salary']}")
result = collect(pipeline)
for r in result:
print(f" {r}")
# 输出:
# 张三 (25岁) 月薪10000
# 李四 (30岁) 月薪15000
# 赵六 (35岁) 月薪20000
# 钱七 (28岁) 月薪12000
itertools 标准库
import itertools
# count: 无限计数
for i in itertools.count(10, 2):
if i > 20:
break
print(i, end=" ") # 10 12 14 16 18 20
# cycle: 无限循环
colors = itertools.cycle(["红", "绿", "蓝"])
for _, color in zip(range(6), colors):
print(color, end=" ") # 红 绿 蓝 红 绿 蓝
# chain: 连接多个可迭代对象
combined = list(itertools.chain([1, 2], [3, 4], [5, 6]))
print(combined) # [1, 2, 3, 4, 5, 6]
# islice: 切片迭代器
result = list(itertools.islice(itertools.count(), 5, 10))
print(result) # [5, 6, 7, 8, 9]
# groupby: 分组
data = sorted(["apple", "avocado", "banana", "blueberry", "cherry"], key=lambda x: x[0])
for key, group in itertools.groupby(data, key=lambda x: x[0]):
print(f"{key}: {list(group)}")
# a: ['apple', 'avocado']
# b: ['banana', 'blueberry']
# c: ['cherry']
11. 今日小结
| | |
|---|
| | __iter__() |
| | __iter__() |
| | yield |
| | (x for x in ...) |
| | yield from iterable |
核心要点
- ✅
iter(obj) 创建迭代器,next(it) 取下一个元素 - ✅
yield 暂停函数并保留状态,下次从暂停处继续 - ✅ 生成器节省内存:按需计算,不一次性创建所有元素
- ✅ 生成器表达式:
(x for x in ...) 替代 [x for x in ...] - ✅
yield from 委托迭代,简化嵌套生成器 - ✅ 生成器管道模式:读取 → 过滤 → 转换 → 聚合
- ✅
itertools 提供 count/cycle/chain/groupby 等工具
🎯 练习建议:
1. 写一个生成器,生成所有素数(无限序列)
2. 写一个 map_gen(func, iterable) 函数,模拟内置 map() 的惰性版本
3. 用生成器管道处理一个 CSV 文件:读取 → 过滤空行 → 转换类型 → 输出
📚 Day33 完成!明天学习闭包与装饰器 — 函数的"包装艺术"