今天您将学习什么
- 什么是迭代器和可迭代对象
- 生成器函数和生成器表达式
- yield语句和生成器状态
- 生成器的实际应用
- 真实世界示例:文件处理、数据流、无限序列
什么是迭代器?
迭代器是Python中用于逐个访问可迭代对象中元素的对象。迭代器实现了迭代器协议:__iter__()返回迭代器自身,__next__()返回下一个元素,并在没有更多元素时抛出StopIteration异常。
迭代器的特点:
- 内存效率:不需要一次性加载所有数据
- 惰性求值:只在需要时才计算下一个值
- 单向遍历:只能向前遍历,不能后退
- 状态保持:记住当前遍历位置
1. 基本迭代器
创建自定义迭代器
class CountDown:
    """Iterator that counts down from ``start`` to 1, one value per step.

    The object is its own iterator (``__iter__`` returns ``self``), so a
    given instance can be walked through only once.
    """

    def __init__(self, start):
        # ``start`` records the initial value; ``current`` is the next value
        # that will be handed out.
        self.start = start
        self.current = start

    def __iter__(self):
        return self

    def __next__(self):
        # Guard clause: nothing is left once the counter is at or below zero.
        if self.current <= 0:
            raise StopIteration
        self.current -= 1
        # The counter has already been decremented, so add the step back.
        return self.current + 1
# Drive the custom iterator with a for loop.
print("倒计时:")
for number in CountDown(5):
    print(number, end=" ")
print()

# Drive the iterator by hand with iter() and next().
countdown = CountDown(3)
iterator = iter(countdown)
for label in ("第一个值", "第二个值", "第三个值"):
    print(f"{label}:{next(iterator)}")

# A fourth request finds the iterator exhausted and raises StopIteration.
try:
    fourth_value = next(iterator)
except StopIteration:
    print("迭代结束")
else:
    print(f"第四个值:{fourth_value}")
斐波那契迭代器
class FibonacciIterator:
    """Iterator over the first ``limit`` Fibonacci numbers (0, 1, 1, 2, ...)."""

    def __init__(self, limit):
        self.limit = limit
        self.count = 0          # number of values handed out so far
        self.a, self.b = 0, 1   # the two most recent terms of the sequence

    def __iter__(self):
        return self

    def __next__(self):
        position = self.count
        if position >= self.limit:
            raise StopIteration
        self.count = position + 1

        # The first two values are the seeds themselves; they do not advance
        # the (a, b) pair.
        if position == 0:
            return self.a
        if position == 1:
            return self.b

        following = self.a + self.b
        self.a, self.b = self.b, following
        return following
# Print the first ten Fibonacci numbers on a single line.
print("斐波那契数列:")
fib_iter = FibonacciIterator(limit=10)
for num in fib_iter:
    print(num, end=" ")
print()  # finish the line
2. 生成器函数
基本生成器
def count_up_to(n):
    """Yield the integers 1, 2, 3, ... up to and including ``n``.

    Nothing is yielded when ``n`` is below 1.
    """
    current = 0
    # Look one step ahead so the counter is advanced just before it is yielded.
    while current + 1 <= n:
        current += 1
        yield current
def fibonacci_generator(limit):
    """Yield the first ``limit`` Fibonacci numbers, starting from 0."""
    produced = 0
    current, following = 0, 1
    while produced < limit:
        yield current
        # Slide the pair one position along the sequence.
        current, following = following, current + following
        produced += 1
def even_numbers_generator(start, end):
    """Yield every even integer in the closed range ``[start, end]``."""
    for candidate in range(start, end + 1):
        if candidate % 2:
            # Odd value: skip it.
            continue
        yield candidate
# Run each generator function once and print its values on one line.
# The generators are lazy, so building them up front does no work yet.
for demo_title, demo_values in (
    ("计数到5:", count_up_to(5)),
    ("斐波那契数列(前8项):", fibonacci_generator(8)),
    ("偶数(10到20):", even_numbers_generator(10, 20)),
):
    print(demo_title)
    for num in demo_values:
        print(num, end=" ")
    print()
生成器状态
def stateful_generator():
    """Yield 1, 2 and 3, printing a progress message before each value.

    A final message is printed when the generator is resumed after the third
    value, just before it finishes.
    """
    progress_messages = ("生成器开始", "生成器继续", "生成器继续")
    for value, message in enumerate(progress_messages, start=1):
        print(message)
        yield value
    print("生成器结束")
# Step through the generator by hand to show that it pauses at each yield.
print("=== 生成器状态演示 ===")
gen = stateful_generator()
print("创建生成器")
for prompt in ("获取第一个值:", "获取第二个值:", "获取第三个值:"):
    print(prompt)
    print(next(gen))

# One more request runs the generator to its end, which raises StopIteration.
print("尝试获取第四个值:")
try:
    extra_value = next(gen)
except StopIteration:
    print("生成器已结束")
else:
    print(extra_value)
真实世界示例1:文件处理生成器
def read_large_file(file_path, chunk_size=1024):
    """Lazily read a UTF-8 text file in pieces.

    Args:
        file_path: Path of the file to read.
        chunk_size: Size passed to each ``read`` call (characters, since the
            file is opened in text mode).

    Yields:
        Successive non-empty strings returned by ``read`` until the file is
        exhausted.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        # iter(callable, sentinel) keeps calling read() until it returns ''.
        yield from iter(lambda: file.read(chunk_size), '')
def read_lines_generator(file_path):
    """Yield each line of a UTF-8 text file with surrounding whitespace removed.

    Blank lines are yielded as empty strings.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        # The file object iterates line by line; strip each one on the way out.
        yield from map(str.strip, file)
def filter_lines_generator(file_path, keyword):
    """Yield the stripped lines of ``file_path`` that contain ``keyword``."""
    yield from (
        text for text in read_lines_generator(file_path) if keyword in text
    )
def csv_parser_generator(file_path):
    """Yield each data row of a UTF-8 CSV file as a dict keyed by the header.

    Fields are split by the standard ``csv`` module, so a quoted field may
    contain commas (``"Smith, J",42``) without being cut apart.

    Args:
        file_path: Path of the CSV file to read.

    Yields:
        One ``dict`` per non-blank data line, mapping header names to the
        string values on that line. Pairs are built with ``zip``, so surplus
        values or surplus header names on a line are dropped.

    Notes:
        Every physical line is stripped of surrounding whitespace and blank
        lines are skipped before parsing; the first non-blank line is the
        header. A file with no non-blank line yields nothing.
    """
    import csv  # local import keeps this function self-contained

    with open(file_path, 'r', encoding='utf-8') as file:
        stripped_lines = (line.strip() for line in file)
        reader = csv.reader(line for line in stripped_lines if line)

        header = next(reader, None)
        if header is None:
            # Empty (or all-blank) file: there is no header and no data.
            return

        for values in reader:
            yield dict(zip(header, values))
# 创建示例文件
def create_sample_files():
    """Write the two demo inputs into the current working directory.

    ``large_file.txt`` receives 1000 numbered text lines and ``data.csv``
    receives a header row followed by four records. Existing files with
    those names are overwritten.
    """
    # The large text file: one numbered sentence per line.
    with open('large_file.txt', 'w', encoding='utf-8') as f:
        f.writelines(
            f"这是第{i+1}行数据,包含一些重要信息。\n" for i in range(1000)
        )

    # The CSV file: header first, then the records.
    csv_lines = (
        "name,age,city\n",
        "张三,25,北京\n",
        "李四,30,上海\n",
        "王五,28,广州\n",
        "赵六,35,深圳\n",
    )
    with open('data.csv', 'w', encoding='utf-8') as f:
        f.writelines(csv_lines)
# Demo: run the file-processing generators on freshly written sample files.
print("=== 文件处理生成器示例 ===")
create_sample_files()

print("逐行读取文件:")
line_count = 0  # stays 0 when the file has no lines at all
for line_count, line in enumerate(read_lines_generator('large_file.txt'), start=1):
    if line_count <= 5:  # show only the first five lines
        print(f" {line}")
print(f" 总共读取了 {line_count} 行")

print("\n过滤包含'重要'的行:")
important_lines = [*filter_lines_generator('large_file.txt', '重要')]
print(f"找到 {len(important_lines)} 行包含'重要'的内容")

print("\n解析CSV文件:")
for row in csv_parser_generator('data.csv'):
    print(f" {row}")
真实世界示例2:数据流处理
import random
import time
def data_stream_generator(interval=0.1):
    """Endlessly yield simulated sensor readings.

    Args:
        interval: Seconds to sleep after a reading has been consumed and
            before the next one is produced. Defaults to 0.1. Pass ``0`` or
            ``None`` to produce readings without any pause.

    Yields:
        A dict with the keys ``'timestamp'`` (``time.time()``),
        ``'temperature'`` (uniform in 20-30), ``'humidity'`` (uniform in
        40-80) and ``'pressure'`` (uniform in 1000-1020).
    """
    while True:
        reading = {
            'timestamp': time.time(),
            'temperature': random.uniform(20, 30),
            'humidity': random.uniform(40, 80),
            'pressure': random.uniform(1000, 1020),
        }
        yield reading
        # The pause happens only when the consumer asks for another reading.
        if interval:
            time.sleep(interval)
def filter_data_generator(data_stream, condition):
    """Yield only the items of ``data_stream`` for which ``condition(item)`` is truthy."""
    yield from (item for item in data_stream if condition(item))
def transform_data_generator(data_stream, transform_func):
    """Yield ``transform_func(item)`` for every item of ``data_stream``, lazily."""
    yield from map(transform_func, data_stream)
def batch_data_generator(data_stream, batch_size):
    """Group the items of ``data_stream`` into lists of ``batch_size`` items.

    A final, shorter list is yielded when items are left over at the end.
    Each yielded list is a fresh object that is not touched again here.
    """
    pending = []
    for item in data_stream:
        pending.append(item)
        if len(pending) < batch_size:
            continue
        yield pending
        pending = []  # start a new list; the yielded one belongs to the caller
    # Flush whatever did not fill a complete batch.
    if pending:
        yield pending
# Demo: assemble a lazy processing chain over the simulated sensor stream.
print("=== 数据流处理示例 ===")
# Source of raw readings; nothing is produced until a consumer asks for it.
data_stream = data_stream_generator()


# Keep only the warm readings. A named function is used instead of a lambda
# bound to a name, so that tracebacks and reprs show "high_temp_filter".
def high_temp_filter(data):
    """Return True when the reading's temperature is above 25."""
    return data['temperature'] > 25


high_temp_data = filter_data_generator(data_stream, high_temp_filter)
# 转换数据格式
def format_data(data):
    """Return a display-friendly version of one sensor reading.

    The result holds the timestamp as local ``HH:MM:SS`` text, the
    temperature with one decimal and a ``°C`` suffix, and the humidity with
    one decimal and a ``%`` suffix. The pressure is not included.
    """
    local_time = time.localtime(data['timestamp'])
    clock_text = time.strftime('%H:%M:%S', local_time)
    return {
        'time': clock_text,
        'temp': f"{data['temperature']:.1f}°C",
        'humidity': f"{data['humidity']:.1f}%",
    }
formatted_data = transform_data_generator(high_temp_data, format_data)
# Group the formatted readings into batches of three.
batched_data = batch_data_generator(formatted_data, 3)
print("处理数据流(显示前3批):")
# Print first, then test the limit. The stream is endless and every reading
# costs a sleep, so asking for one more batch only to discard it would wait
# for three further warm readings that are never shown.
for i, batch in enumerate(batched_data):
    print(f"批次 {i+1}: {batch}")
    if i >= 2:  # the third batch has just been shown
        break
真实世界示例3:无限序列生成器
def prime_generator():
    """Yield the prime numbers 2, 3, 5, 7, ... without end."""

    def is_prime(n):
        """Trial division by every integer from 2 up to int(n ** 0.5)."""
        if n < 2:
            return False
        upper_bound = int(n ** 0.5) + 1
        return all(n % divisor != 0 for divisor in range(2, upper_bound))

    candidate = 2
    while True:
        if is_prime(candidate):
            yield candidate
        candidate += 1
def collatz_sequence_generator(start):
    """Yield the Collatz sequence that begins at ``start`` and ends at 1.

    Each step halves an even value and maps an odd value ``n`` to ``3n + 1``.

    Args:
        start: First value of the sequence; must be at least 1.

    Yields:
        ``start`` itself, then every following value up to and including 1.

    Raises:
        ValueError: If ``start`` is below 1. Zero and negative values never
            reach 1, so the loop would otherwise run forever. Because this
            is a generator function, the error surfaces on the first
            ``next()`` call rather than when the generator is created.
    """
    if start < 1:
        raise ValueError(f"start must be a positive integer, got {start!r}")

    n = start
    yield n
    while n != 1:
        n = n // 2 if n % 2 == 0 else 3 * n + 1
        yield n
def sliding_window_generator(sequence, window_size):
    """Yield each run of ``window_size`` consecutive items of ``sequence``.

    Every window is yielded as a new list, so callers may keep or modify it.
    Nothing is yielded while fewer than ``window_size`` items have been seen.
    """
    recent = []
    for element in sequence:
        recent.append(element)
        if len(recent) > window_size:
            del recent[0]  # drop the oldest item to keep the window bounded
        if len(recent) == window_size:
            yield list(recent)
def infinite_counter_generator(start=0, step=1):
    """Yield ``start``, ``start + step``, ``start + 2 * step``, ... without end."""
    value = start
    while True:
        yield value
        value += step
# Demo: take finite slices of the endless generators.
print("=== 无限序列生成器示例 ===")

print("前10个质数:")
prime_gen = prime_generator()
# Print first, then test the limit, so the loop stops right after the tenth
# prime instead of computing an eleventh one that is thrown away.
for i, prime in enumerate(prime_gen):
    print(prime, end=" ")
    if i >= 9:
        break
print()

print("\nCollatz序列(从27开始):")
collatz_gen = collatz_sequence_generator(27)
# This sequence is finite (it stops at 1), so it can be collected in full.
collatz_sequence = list(collatz_gen)
print(f"序列长度:{len(collatz_sequence)}")
print(f"序列:{collatz_sequence}")

print("\n滑动窗口(窗口大小3):")
counter_gen = infinite_counter_generator(1)
window_gen = sliding_window_generator(counter_gen, 3)
# Same pattern as for the primes: stop as soon as the fifth window is shown.
for i, window in enumerate(window_gen):
    print(f"窗口 {i+1}: {window}")
    if i >= 4:
        break
生成器和迭代器的最佳实践
推荐做法:
- 使用生成器处理大数据集
- 利用惰性求值节省内存
- 合理使用生成器表达式
- 注意生成器的状态管理
避免的做法:
- 对同一个生成器对象进行多次迭代(生成器只能遍历一次,耗尽后需要重新创建)
- 在生成器中产生副作用
- 忽略内存效率考虑
- 过度使用无限生成器
高级生成器特性
生成器表达式
# A generator expression: each square is computed only when it is requested.
squares = (x * x for x in range(10))
print("平方数生成器:")
for square in squares:
    print(square, end=" ")
print()

# Filtering and transforming in a single expression.
even_squares = (x * x for x in range(10) if not x % 2)
print("偶数的平方:")
for square in even_squares:
    print(square, end=" ")
print()

# Chaining: each expression lazily consumes the one before it.
numbers = range(1, 11)
filtered = (x for x in numbers if not x % 2)
transformed = (2 * x for x in filtered)
print("偶数乘以2:")
for num in transformed:
    print(num, end=" ")
print()
生成器管道
def pipeline_generator(*generators):
    """Compose the given stage functions into a single callable.

    Each stage takes one iterable and returns another. The returned ``pipe``
    function feeds its argument through the stages from left to right and
    returns the output of the last one. With no stages, the input is
    returned unchanged.
    """
    def pipe(data):
        current = data
        for stage in generators:
            current = stage(current)
        return current

    return pipe
# 示例管道
def filter_even(data):
    """Return a generator over the items of ``data`` that are divisible by 2."""
    return (value for value in data if value % 2 == 0)
def square(data):
    """Return a generator that yields every item of ``data`` raised to the power 2."""
    return (value ** 2 for value in data)
def limit(data, n):
    """Return a generator over at most the first ``n`` items of ``data``.

    Iteration stops as soon as ``n`` items have been produced, so the source
    is not read any further. This makes the function safe on endless
    iterators and leaves the remaining items of a shared iterator untouched.

    Args:
        data: Any iterable.
        n: Maximum number of items to yield. Zero or a negative number
            yields nothing.

    Returns:
        A generator yielding the leading items of ``data``.
    """
    from itertools import islice  # local import keeps the block self-contained

    # islice rejects negative counts, so clamp them to "no items".
    return (item for item in islice(data, max(n, 0)))
# Build a three-stage pipeline: keep even numbers, square them, take five.
pipeline = pipeline_generator(
    filter_even,
    square,
    lambda data: limit(data, 5),
)

# Feed the numbers 1..20 through it; the work happens lazily while printing.
numbers = range(1, 21)
result = pipeline(numbers)
print("管道处理结果:")
for num in result:
    print(num, end=" ")
print()
回顾
今天您学习了:
- 迭代器和可迭代对象的概念
- 生成器函数和生成器表达式的创建和使用
- yield语句和生成器状态管理
- 真实世界示例:文件处理、数据流、无限序列
生成器和迭代器是Python中处理大数据和优化内存使用的重要工具,掌握这些知识将让您能够编写更加高效的代码!