next():迭代控制的"导航器"
1. 基础用法:控制迭代器前进
next()函数用于从迭代器中获取下一个元素,支持默认值以防止迭代耗尽时的异常。
# 基本迭代器使用
numbers = [1, 2, 3, 4, 5]
iter_numbers = iter(numbers)
print(f"第一个元素: {next(iter_numbers)}") # 输出: 1
print(f"第二个元素: {next(iter_numbers)}") # 输出: 2
print(f"第三个元素: {next(iter_numbers)}") # 输出: 3
# 使用默认值处理迭代结束
print(f"继续获取: {next(iter_numbers, '迭代结束')}") # 输出: 4
print(f"继续获取: {next(iter_numbers, '迭代结束')}") # 输出: 5
print(f"迭代结束: {next(iter_numbers, '迭代结束')}") # 输出: 迭代结束
# 无默认值时的异常
try:
next(iter_numbers) # 已耗尽
except StopIteration as e:
print(f"捕获异常: {e}")
2. 实际应用:自定义迭代控制
class SmartIterator:
def __init__(self, data, max_retries=3):
self.data = iter(data)
self.max_retries = max_retries
self.retry_count = 0
def __iter__(self):
return self
def __next__(self):
try:
return next(self.data)
except StopIteration:
self.retry_count += 1
if self.retry_count <= self.max_retries:
print(f"迭代结束,重试 {self.retry_count}/{self.max_retries}")
# 可以重新初始化迭代器或其他恢复逻辑
raise StopIteration("已达到最大重试次数")
else:
raise
def safe_next(self, default=None):
"""安全获取下一个元素,支持默认值"""
try:
return next(self.data)
except StopIteration:
return default
def peek(self, default=None):
"""查看下一个元素但不消耗它"""
try:
# 保存当前状态
saved_data = list(self.data)
self.data = iter(saved_data)
if saved_data:
return saved_data[0]
except:
pass
return default
# 使用示例
data = [1, 2, 3, 4, 5]
smart_iter = SmartIterator(data)
print("基本迭代:")
for _ in range(3):
print(f"下一个: {smart_iter.safe_next()}")
print(f"查看下一个: {smart_iter.peek()}")
print(f"实际获取: {smart_iter.safe_next()}")
# 测试安全获取
while True:
item = smart_iter.safe_next(default="结束")
if item == "结束":
break
print(f"安全获取: {item}")
智能迭代器和基类设计
1 高级迭代器模式
class BufferedIterator:
"""带缓冲的迭代器"""
def __init__(self, iterable, buffer_size=2):
self.iterator = iter(iterable)
self.buffer_size = buffer_size
self.buffer = []
self._fill_buffer()
def _fill_buffer(self):
"""填充缓冲区"""
while len(self.buffer) < self.buffer_size:
try:
item = next(self.iterator)
self.buffer.append(item)
except StopIteration:
break
def __iter__(self):
return self
def __next__(self):
if not self.buffer:
raise StopIteration
item = self.buffer.pop(0)
self._fill_buffer()
return item
def peek(self, n=0):
"""查看前n个元素"""
if 0 <= n < len(self.buffer):
return self.buffer[n]
return None
def has_next(self):
"""检查是否有下一个元素"""
return len(self.buffer) > 0
class DataProcessor:
"""数据处理类,使用缓冲迭代器"""
def __init__(self, data_source):
self.iterator = BufferedIterator(data_source, buffer_size=3)
def process_with_lookahead(self):
"""带预读的数据处理"""
results = []
while self.iterator.has_next():
current = next(self.iterator)
# 查看下一个元素
next_item = self.iterator.peek()
if next_item is not None:
# 根据下一个元素决定处理方式
if isinstance(next_item, (int, float)):
processed = current * 2
else:
processed = str(current).upper()
else:
processed = current
results.append(processed)
return results
# 使用示例
print("=== 缓冲迭代器示例 ===")
data = [1, 2, 3, "hello", "world", 4, 5]
buffered_iter = BufferedIterator(data, buffer_size=2)
print("缓冲迭代:")
for _ in range(3):
print(f"当前: {next(buffered_iter)}")
print(f"预读: {buffered_iter.peek()}")
# 数据处理示例
processor = DataProcessor(data)
result = processor.process_with_lookahead()
print(f"处理结果: {result}")
2. 迭代器控制模式
class IteratorController:
"""迭代器控制器"""
@staticmethod
def batch_process(iterator, batch_size, process_func):
"""批量处理迭代器元素"""
batch = []
for item in iterator:
batch.append(item)
if len(batch) == batch_size:
yield process_func(batch)
batch = []
if batch:
yield process_func(batch)
@staticmethod
def conditional_next(iterator, condition_func, default=None):
"""条件获取下一个元素"""
for item in iterator:
if condition_func(item):
return item
return default
@staticmethod
def peekable_iterator(iterable):
"""创建可预读的迭代器"""
iterator = iter(iterable)
class Peekable:
def __init__(self):
self._buffer = []
def __iter__(self):
return self
def __next__(self):
if self._buffer:
return self._buffer.pop(0)
return next(iterator)
def peek(self, n=0):
"""预读第n个元素(0表示下一个)"""
while len(self._buffer) <= n:
try:
self._buffer.append(next(iterator))
except StopIteration:
return None
return self._buffer[n]
def has_next(self):
"""检查是否有更多元素"""
if self._buffer:
return True
try:
self._buffer.append(next(iterator))
return True
except StopIteration:
return False
return Peekable()
# 使用示例
print("=== 迭代器控制器示例 ===")
controller = IteratorController()
# 批量处理
data = list(range(10))
iterator = iter(data)
print("批量处理结果:")
for batch in controller.batch_process(iterator, 3, sum):
print(f"批次和: {batch}")
# 条件获取
numbers = [1, 3, 5, 2, 4, 6]
result = controller.conditional_next(
iter(numbers),
condition_func=lambda x: x > 3,
default=-1
)
print(f"第一个大于3的数: {result}")
# 可预读迭代器
peekable = controller.peekable_iterator([1, 2, 3, 4, 5])
print(f"预读下一个: {peekable.peek()}")
print(f"实际获取: {next(peekable)}")
print(f"预读两个: {peekable.peek(0)}, {peekable.peek(1)}")
总结
本文详细解析了Python中基础但强大的功能:
next(iterator, default) - 迭代控制的导航器,迭代器控制、流式处理、条件迭代
关键知识点总结:
- •
next()控制迭代器前进,支持默认值处理迭代耗尽