← 返回模块
3.3.2.3beta 可读 · 未来付费内容校验中内容版本 2026-05-24

同步原语:threading 与 asyncio

3.3.2 · 设计模式与工具 · 编程

开场

某私募周二上午九点三十一,沪深300 ETF(510300.SH)刚收完集合竞价,团队的 tick 处理器跑了九十秒。两个生产者线程在排空一个 Tushare 风格的行情 websocket——一个分片走 SSE(上海)回报,一个分片走 SZSE(深圳)回报——四个消费者线程各跑一个下游策略。九十三秒时,四个策略共写的盈亏累计器显示 -1400000 元而非 -140000 元,半秒后又跳到 +80000 元。数字之所以错,是因为两个线程在没有锁保护的情况下对 pnl 做了读-改-写;策略逻辑没坏,协调坏了。3.3.1 L2 把 ThreadPoolExecutor 作为高层 fan-out 工具引入,明确把同步原语放到这一课;3.3.1 L3 把 asyncio.Semaphore 作为限流器引入,把其余 asyncio 原语也放到这一课。本课就是被推迟的那一章——执行器之下的底层协调表面,加上它的 asyncio 双胞胎。

第一层:threading 原语

threading.Lock 是基本互斥量:同一时刻最多一个线程持有,作为上下文管理器使用,保证 release 与 acquire 配对:

import threading
lock = threading.Lock()
counter = 0
def increment():
    global counter
    with lock:
        counter += 1
threads = [threading.Thread(target=increment) for _ in range(100)]
for t in threads: t.start()
for t in threads: t.join()
assert counter == 100

经典的 lost-release bug 是 lock.acquire(); do_work(); lock.release() 没有 try/finally——do_work 抛异常就把锁漏掉了,之后所有 acquire 都永远阻塞。规范修法就是上面的 with 形式。规则 1:永远用 with lock: 而不是裸的 acquire/release(lost-release 是经典 bug)。

threading.RLock 是可重入变体:同一个线程可以 N 次 acquire 必须 N 次 release;当一个持锁方法去调另一个也想持同一把锁的方法时需要它。警告:不要用 RLock 去糊一个锁顺序 bug。当递归在结构上必须存在时才用它——一个递归数据结构,或者一个合理地在同一把锁下调用另一个公开方法的公开方法。

threading.Semaphore(K) 是有界资源池:最多 K 个线程并发持有。典型用法是给外发 API 调用限流——比如把对 AMAC 风格基金 NAV API 的并发请求限到 10,写法是 sem = threading.Semaphore(10) 配合 with sem: requests.get(...)threading.Event 是一次性信号:一个或多个线程 event.wait() 直到某个生产者线程 event.set();一旦 set,之后每次 wait() 都立即返回。典型用法:等到数据加载线程初始化完毕。threading.Barrier(N) 是 N 方汇合:N 个线程调 barrier.wait() 全都阻塞到第 N 个到达,然后全部一起放行。典型用法:并行仿真里「所有 worker 必须先到 phase 2 才有人开始 phase 3」。

threading.Condition 是「等谓词」的形式:cond.wait_for(predicate) 阻塞到谓词为真,且通知线程调用了 cond.notify_all()wait_for 是关键的那个形式,因为它关闭了纯 cond.wait() 容易踩中的 lost-wakeup 竞态:

import threading
cond = threading.Condition()
buf: list = []
def consumer():
    with cond:
        cond.wait_for(lambda: len(buf) >= 10)
        batch = buf[:10]
        del buf[:10]
    process(batch)
def producer(item):
    with cond:
        buf.append(item)
        cond.notify_all()

第二层:queue.Queue 作为生产者-消费者

这是真实世界里被用得最多的一类线程模式,也是新手会拿 Lock 去手搓的那个。Queue 在构造上就是线程安全的:q.put(item)q.get() 在底层都是原子的,不需要显式 Lock。规范写法用一个 None 哨兵关闭协议——生产者每个消费者推一次 None,每个消费者收到 None 就 break:

import queue, threading
q: queue.Queue = queue.Queue(maxsize=1000)
def producer(n):
    for i in range(n): q.put(i)
def consumer():
    while True:
        item = q.get()
        if item is None: break
        # process item
        q.task_done()
for _ in range(num_consumers): q.put(None)

maxsize=1000 在消费者落后时提供背压——队列满后生产者会在 q.put(item) 上阻塞,这是语言层防止内存无界增长的机制。工作锚定例是 tick 处理流水线,从一个 Tushare 风格的行情 websocket 排空 510300.SH 的 tick,约 100 条/秒。2 个生产者线程从两个模拟分片接入(SSE 与 SZSE);4 个消费者线程各跑一个下游策略——momentum_consumermean_reversion_consumervol_breakout_consumerlogger_consumer。关闭协议推 4 次 None(一个消费者一次)。如果你想让生产者等所有消费者把队列排空再退出,q.task_done()q.join() 做这本账——join 会等到每个 put 都被一次 task_done 抵销才返回。

3.3.1 L2 提供了在消费者计算密集时换 ProcessPool 的替代路线;这一课保持线程,因为工作锚定例是 I/O 密集(消费者是网络策略)。同样这套 API 在 multiprocessing 模块里有跨进程版本(multiprocessing.Lockmultiprocessing.Queuemultiprocessing.Manager 代理);分布式协调(Redis 锁、etcd lease、Kafka 消费者组)在 3.6.4。

第三层:asyncio 的双胞胎

协作世界里有同样的五个原语:asyncio.Lockasyncio.Semaphore(K)asyncio.Eventasyncio.Conditionasyncio.Queue。API 几乎一致:async with lock: 替代 with lock:await q.get() 替代 q.get()。生产者-消费者的 asyncio 版本:

import asyncio
async def producer(q: asyncio.Queue, n: int) -> None:
    for i in range(n): await q.put(i)
    await q.put(None)
async def consumer(q: asyncio.Queue) -> None:
    while True:
        item = await q.get()
        if item is None: break
        # process item
async def main():
    q: asyncio.Queue = asyncio.Queue(maxsize=1000)
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(q, 100))
        tg.create_task(consumer(q))
asyncio.run(main())

规则 2 关键:asyncio 原语不是线程安全的——它们协调的是一个事件循环上的协程,不是 OS 线程。当一个协程与一个线程需要协调时,从线程一侧用 loop.run_in_executor(3.3.1 L3 见过)把线程工作回挂到协程上,或从线程里用 asyncio.run_coroutine_threadsafe(coro, loop) 把协程调度回事件循环。把 asyncio.Lock 交给两个 OS 线程会静默失败——既不死锁也不报错,只是临界区没被保护住。

threading ↔ asyncio 对照表

primitivethreading APIasyncio APIone-sentence use case
mutexthreading.Lockasyncio.Lockprotect a shared mutable invariant
bounded poolthreading.Semaphore(K)asyncio.Semaphore(K)rate-limit to K concurrent holders
one-shot signalthreading.Eventasyncio.Eventwake one or many waiters when state becomes ready
predicate waitthreading.Conditionasyncio.Conditionwait until a condition over shared state holds
producer-consumerqueue.Queueasyncio.Queuehand items from N producers to M consumers with backpressure

三个经典 bug 与各自的修法

​锁顺序不一致 → 死锁。​​线程 A 持 lock_xlock_y;线程 B 持 lock_ylock_x。两边永远阻塞,单看任一栈帧都说不出原因。修法:定义一个全局锁获取顺序(例如「永远先 lock_xlock_y」),处处遵守。更好的做法是重构成每个临界区最多只用一把锁。

​对普通 Lock 双次 acquire → 自死锁。​​方法 a() 拿到 lock 后调 b()b() 也要拿同一把 lockb 永远阻塞。修法:当递归在结构上必须时用 RLock;否则把加锁的活从 a()b() 都抽出来放到第三个助手函数,在锁下只跑一次。

​Condition 的 lost wakeup。​​消费者调 cond.wait(),谓词在「检查谓词」与「调 wait」之间被满足,消费者就永远睡下去。修法:用 cond.wait_for(predicate)(前面例子)——它在被唤醒后会重新检查谓词,仍为假就重新睡——规范修法已经在 API 里了,记得用。

长视角一句话:CPython 3.13 出了一个实验性的 free-threaded 构建(PEP 703)去掉了 GIL;这些原语会变得更关键而不是更不关键,因为真正的共享状态竞争变成可能。2026 年的日常建议没变。

练习

Exercise

搭一个有界生产者-消费者流水线。(a) 创建 q = queue.Queue(maxsize=10)。(b) 定义 def producer(n): for i in range(n): q.put(i),启动一个生产者线程,n = 50。(c) 定义 def consumer(out): while True: item = q.get(); out.append(item)out 是共享列表。可以在 out.append(item) 周围用 threading.Lock 保护,(等价且推荐)用第二个 queue.Queue 取代列表。启动 5 个消费者线程。(d) 生产者结束后,在 q 上 put 5 次 None 关闭消费者,每个消费者收到 None 即 break。(e) join 所有线程,验证 len(out) == 50sorted(out) == list(range(50))。(f) 用一句话说明:为什么第二种「用第二个队列」的形式完全不需要 Lock

提示
哨兵口诀是「每个消费者一份 None」:生产者循环结束后 for _ in range(5): q.put(None),每个消费者正好收到一份关闭令牌。
提示
queue.Queue 内部已加锁——putget 都是原子的——把 out 换成一个 queue.Queue 等于把原本由 Lock 保护的共享 mutation 全部消除。

通往下一课的桥

你现在能写一个不丢自增的线程安全计数器、用规范的哨兵关闭协议把行情 websocket 排进一个有背压的队列、用谓词等待不丢唤醒,也能把每个 threading 原语翻译成对应的 asyncio 版本(同时记得不要混用两边)。下一课关上 Python 这条 track,从第一课覆盖的类系统表面再往下走一层。第一课里你写的那些 dunder——__eq____iter____enter__——是语言从你这里消费的。第四课要讲的是类机制它自己用来组装实例与查找属性时所用的那些 dunder:__get____set____init_subclass__ 以及元类钩子。

阅读清单:《Fluent Python》第 2 版 中译 第 19 章 (并发执行器) 的 threadingqueue.Queue 段落; CPython 官方 threading / queue / asyncio 中文文档; 《Python Cookbook》第 3 版 中译 第 12 章 (并发) 的同步原语小节; Raymond Hettinger 的 Keynote on Concurrency (PyBay 2017) — 队列优先, 锁次之 — 国内有中文搬运字幕; PEP 654 (ExceptionGroup) 与 PEP 692 (TaskGroup) 的中文社区导读对 asyncio 部分有帮助; 一句话提示: 国内 私募 / 资管 团队 2026 年常用 Python 3.11+, asyncio.TaskGroup 与 asyncio.Queue 的 shutdown 方法 (Python 3.13+) 可视团队基线决定是否使用。