周一早盘前,风控同事把 SSE 全年的 tick CSV 丢到你账户里——压缩前 18GB。需求是按 code 汇总全年的全市场成交额(turnover),9:00 开盘前发邮件。上次月报你写得很直接:rows = open(path).read().splitlines(),然后 Counter 累加,本机内存被它撑得换页换到 swap 区。同一段汇总逻辑,从「整批装进内存」改成「逐条流过」,需要的不是更猛的机器,而是把 for 循环背后的协议看清楚——以及 Python 里那一把让你写「流式管线」而不是「大列表」的零件。这一课讲三件事:迭代器协议、生成器函数与生成器表达式,以及 with 上下文管理器。
迭代器协议(iterator protocol)
for x in seq: 之所以「对什么都管用」,是因为 Python 把它退糖(desugar)成两个调用:iter(seq) 拿到一个迭代器对象,next(it) 反复取下一个值,直到对方抛 StopIteration。可迭代对象(iterable)就是 iter() 能接受的;迭代器(iterator)是 iter() 返回的——它必须实现 __next__(),而它的 __iter__() 通常返回自身。
亲手把这条退糖路径走一遍:
it = iter([10, 20, 30])
next(it) # 10
next(it) # 20
next(it) # 30
next(it) # raises StopIteration
for x in [10, 20, 30]: 的全部含义就是:
it = iter([10, 20, 30])
while True:
try:
x = next(it)
except StopIteration:
break
# ...loop body with x...
后面所有「自定义迭代」「无穷流」「管线」的写法都是这条退糖的变体。手写一个实现 __iter__ / __next__ 的类是一种办法,但 99% 的场景你应该用生成器函数(generator function)。
生成器函数:yield 一次就够
def 体里只要出现 yield,调用该函数不再「跑完返回结果」,而是返回一个 生成器对象(generator object):外层每次 next() 都从上一次 yield 暂停处继续,直到函数走完或 return,自动抛 StopIteration。
把开头的需求落成代码——逐行读 ticks.csv(列:code,price,volume),每行产出一条 Tick:
from dataclasses import dataclass
@dataclass(frozen=True)
class Tick:
code: str
price: float
volume: int
def parse_ticks(path):
with open(path) as f:
next(f) # 跳过表头
for row in f:
code, p, v = row.rstrip().split(",")
yield Tick(code, float(p), int(v))
turnover = sum(t.price * t.volume for t in parse_ticks("ticks.csv"))
样本行 600519,1820.50,200 流过去时,内存里只有当前这条 Tick 实例;整年 18GB 的文件能算完成交额而进程驻留内存维持平稳。两个细节值得拎出来:parse_ticks 体内有 yield,所以它一被调用即返回生成器,函数体并未执行——直到外层 for / sum 取第一个值才会跑到第一个 yield。另一个细节是 sum(... for t in ...) 用了生成器表达式(generator expression),下一节讲它。
三个常用小生成器
下面三个写一遍就够,后面拼管线时反复出现:
def count_up(start):
while True:
yield start
start += 1
def take(n, gen):
for i, v in enumerate(gen):
if i >= n:
return
yield v
def windows(seq, size):
for i in range(len(seq) - size + 1):
yield seq[i:i + size]
list(windows([1, 2, 3, 4, 5], 3)) # [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
count_up(0) 是无穷流——只有外层 take(n, ...) 或 break 才会让它停。take 是「截断到前 n 个」的最小实现,写法等价于 itertools.islice(gen, n),自己手写一遍是为了把「return 在生成器里就是 StopIteration」这一点刻下来。windows 是滚动窗口(rolling window)的最小实现,等会儿练习里你会写一个它的衍生版本。
list 推导 vs 生成器表达式
把方括号换成圆括号,语义从「先建一个 N 元素的列表,再交出去」切换成「一次一个值、按需产出」:
sum([x*x for x in range(1_000_000)]) # list comprehension: 先建百万元素列表
sum(x*x for x in range(1_000_000)) # generator expression: 流式
# 两者都等于 333332833333500000
两个表达式数学上完全等价;区别在内存。前者构造一份长度为 100 万的临时列表,分配并立即释放——峰值占用大约 30+ MB;后者只在 sum 的累加器里保留一个状态变量。在循环里只用一次的中间集合,几乎总是写成生成器表达式更合适;要复用(多次遍历或就地修改),才写 list 推导。
itertools:流式工具箱
itertools 是标准库里专为「流式」准备的工具箱,下面四样最常用——名字 + 一句话即可:
chain(a, b, c):把多个可迭代对象首尾相接成一个流。islice(it, start, stop):对迭代器切片(不能用it[a:b],迭代器没有__getitem__)。takewhile(pred, it):一边吐值,一边遇到pred第一次为假就停。groupby(it, key=...):把相邻key相同的连续段聚成一组——注意它只看「相邻」,多数情况下你要先按key排序。
把它们和上面三个小生成器拼起来,「按 code 求每天最大盘口成交额」这种问题就是一行管线。
with 与上下文管理器
资源管理(resource management)的核心问题是:打开了得关、加了锁得放、改了某种全局状态得复原——而且抛错也要复原。with EXPR as NAME: 这一行解决的就是这件事,它退糖(desugar)成两次调用:
ctx = EXPR
NAME = ctx.__enter__()
try:
# with block 内的代码
finally:
ctx.__exit__(exc_type, exc_value, tb)
__exit__ 的三个参数在正常退出时全是 None,异常退出时承载异常信息——返回真值(truthy)即吞掉该异常,假值则让它继续向上传播。三个内置上下文管理器你几乎天天用:
open(path):进入即打开文件,退出即调用.close(),无论中间抛没抛。threading.Lock():with lock:进入即acquire、退出即release——比手写try / finally短一半,还不会漏。contextlib.suppress(KeyError):把指定异常吃掉(用于「字典里没有就算了」这类清理代码),别滥用。
自己写一个:@contextlib.contextmanager
要写一次性的上下文管理器,没必要去定义一个带 __enter__ / __exit__ 的类。contextlib 提供了一个装饰器(decorator)——具体「装饰器是什么」下一课讲——它把一个只 yield 一次的生成器函数包成上下文管理器:yield 之前是 __enter__,yield 之后是 __exit__。
工程师最常写的一个就是「打印这段代码跑了多久」:
from contextlib import contextmanager
import time
@contextmanager
def timing(label):
start = time.perf_counter()
yield
dt = time.perf_counter() - start
print(f"{label} 耗时: {dt:.3f}s")
with timing("turnover"):
turnover = sum(t.price * t.volume for t in parse_ticks("ticks.csv"))
yield 一旦执行,控制权交给 with 块;块退出后,从 yield 之后继续——所以 start = ... 自然落在「进入」一侧,dt = ...; print(...) 自然落在「退出」一侧。生产代码里要把异常处理写完整:把 yield 包进 try / finally,确保即便块体抛错也会打印耗时。这一步改造留给下一课讲完装饰器之后回来做。
练习
Exercise
写一个生成器函数 moving_average(values, window),按 values 的顺序、一次产出过去 window 个值的算术平均,从第一个能取满整窗的下标开始产出。当 window=3、values=[10, 20, 30, 40, 50] 时,list(moving_average(values, 3)) 必须等于 [20.0, 30.0, 40.0]。要求:内存占用不随 len(values) 增长——也就是说,不要在函数里构造一份全长 list。
提示
collections.deque(maxlen=window) 当滑窗:每收一个值 append,满窗时(len(buf) == window)yield sum(buf) / window。deque 自动丢掉最旧元素,不留全长列表。提示
20.0 等浮点。除法用 / 自然得 float;若担心整数输入退化,可写 sum(buf) / float(window)。结果用 yield 一个个产出,别先攒进 list 再返回。衔接下一课
这一课的三个工具——迭代器协议、生成器函数、上下文管理器——背后藏着一个尚未被拆开的共同零件:「函数接受函数、返回函数」。@contextlib.contextmanager 就是这种零件的现成实例——它接受一个只 yield 一次的生成器函数,返回一个新的上下文管理器对象。下一课把这件事掀开,讲装饰器(decorator)与 functools:@wraps、@lru_cache、partial,以及为什么 @dataclass(上一课用过)和 @contextmanager(这一课用过)其实是同一种东西的两副面孔。