← 返回模块
3.3.1.3beta 可读 · 未来付费内容校验中内容版本 2026-05-27

asyncio 与 I/O 密集型并发

3.3.1 · 并发与性能 · 编程

Hook

周二下午三点,一家上海私募的量化研究员要在 T+1 风控窗口前把沪深300 成分股当日的分钟线快照拉下来,作为隔夜组合 VaR 的输入。上节课用 ThreadPoolExecutor 把 100 个同步 requests.get 压到了 1.8 秒;现在策略组想把每天的拉取面扩到 1500 只 A 股、外加 200 只港股通(Stock Connect)标的,16 条线程已经吃满了内存——每条线程光是栈和 socket 缓冲就要近 8 MB。再加线程会触发交易日 18:00 这一节点的内存告警,但接口本身明明只是在等响应。换工具的时机到了:把「一条 socket 一条线程」换成「一根线程跑几千个协程(coroutine)」。

事件循环:一根线程上的轮转

asyncio 的并发不来自更多线程,而来自一根线程上的合作式调度(cooperative scheduling)。事件循环(event loop)是这根线程的调度器:它维护一份「就绪协程」队列,每次取一个跑到下一个 await,把控制权交回;再去看哪些等待中的 I/O 完成了,把对应协程重新塞回队列。整条主线没有抢占,也没有 GIL 之争,因为本来就只有一条线程。1000 个并发 HTTPS 请求等同于 1000 个挂起的协程对象,内存占用以 KB 计而非 MB——这就是 asyncio 在 I/O 密集(I/O-bound)场景压倒线程池的根本原因。

async defawait

async def f(): ... 不是函数声明的语法糖,它是一个协程工厂:调用 f() 不会执行函数体,只产生一个协程对象(coroutine object)。要让它真正跑起来,要么在另一个协程内 await f(),要么在顶层用 asyncio.run 启动事件循环:

import asyncio

async def main():
    await asyncio.sleep(0.1)
    return 'done'

print(asyncio.run(main()))

第一天最常见的 bug 是漏写 await:直接 f() 不会立刻报错,只产出一个未被等待的协程,Python 在退出时抛 RuntimeWarning: coroutine 'f' was never awaited。看到这条警告,回头给调用补 await

gatherTaskGroup:并发等待

把 N 个协程并发跑起来再统一收结果,传统写法是 asyncio.gather(*coros),按提交顺序返回结果列表。但 gather 默认不会在第一个异常发生时取消其他兄弟任务(sibling tasks),异常被压到 future 里,第二个 future 抛错可能被吞掉——结构化并发(structured concurrency)的缺位。Python 3.11 引入的 asyncio.TaskGroup 解决了这件事:进入 async with 块创建任务,任一兄弟抛错时其余全部取消,所有异常以 ExceptionGroup 形式向外抛出。任务组(TaskGroup)是新代码的默认起点。

import asyncio

async def fetch_one(url):
    await asyncio.sleep(0.05)
    return url

async def main(urls):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_one(u)) for u in urls]
    return [t.result() for t in tasks]

urls = ['600519.SH', '000001.SZ', '600036.SH', '601318.SH', '510300.SH']
print(asyncio.run(main(urls)))

tg.create_task 返回的 Taskasync with 退出时已全部完成,按提交顺序读 t.result() 即可。

aiohttp.ClientSessionasyncio.Semaphore

要让协程真正异步地发 HTTP,需要一个异步原生(async-native)的客户端。aiohttp.ClientSessionrequests.Session 的对位:作为异步上下文管理器(async context manager),它内部维护一个连接池,所有请求复用同一组 TCP 连接。再叠一层 asyncio.Semaphore(10) 做限流(rate limiting via Semaphore),保证任一时刻在飞请求不超过 10 个——这是把握行情接口配额、避开 429 的标准姿势。

import aiohttp, asyncio

async def fetch(session, sem, url):
    async with sem:
        async with session.get(url) as resp:
            return await resp.json()

async def main(urls):
    sem = asyncio.Semaphore(10)
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch(session, sem, u) for u in urls))

urls 换成 100 条形如 https://stub.example.com/cn/bars/600519.SH?start=20240101&end=20240131 的 Tushare 风格地址(返回 {"ts_code": "600519.SH", "trade_date": 20240131, "close": 1820.5, ...}),跑完用 time.perf_counter 与上节课两个基线对齐:

implementationwall-clock secondspeak concurrent connections
sequential requests24.61
ThreadPoolExecutor(max_workers=16)1.916
aiohttp + Semaphore(10)1.810

墙钟(wall clock)上 asyncio 与线程池打平,但峰值连接数少 6 个、常驻内存少一个数量级;把目标连接数从 100 推到 1000 时,只有协程方案不会炸内存。

异步迭代:流式分页

async defyield 即定义一个异步生成器(async generator),它是异步迭代器(async iterator)最常见的形式,消费方用 async for chunk in agen(): 驱动,每次 yield 都允许事件循环切走干别的事。典型用法是分页 REST 接口(paginated REST endpoint):拉取沪深300 完整成分权重时,AMAC 风格接口每页 50 个名字,每翻一页都要一次 await session.get(...) 才能拿到下一页。

把翻页封装成 async def weights(session),内部 while True 循环 await 下一页、yield 每行,遇到空页 return,调用方一句 async for row in weights(session): persist(row) 即可按需消费——下游无需等全表就绪,可以与上游拉取重叠运行。

两条承重的规则

​规则 1:协程里不要调用阻塞函数​​。time.sleep(1)、同步的 requests.get、长跑的纯 Python 循环、阻塞型数据库驱动——任何一条都会卡住整根线程,让其他几百个协程跟着停摆。修复三选一:(a) 换异步原生库(aiohttpasyncpgaiofiles);(b) 把 time.sleep 换成 await asyncio.sleep;(c) 实在没异步版本,用 await loop.run_in_executor(None, blocking_fn, *args) 把它推进线程池——上节课的 ThreadPoolExecutor 在这里以「逃生通道」身份回归。

​规则 2:用 TaskGroup,不要裸用 create_task​​。asyncio.create_task(coro) 只把 task 弱引用进事件循环,调用方若不持有引用也不 await,task 可能被 GC 提前回收,留下 Task was destroyed but it is pending! 警告并悄无声息地丢失异常。TaskGroup 在块退出前强引用其下所有任务,是首选写法。

调试

排查「明明 async 了为什么还是慢」时,开 asyncio.run(main(), debug=True) 或环境变量 PYTHONASYNCIODEBUG=1:事件循环会标出任何把回调阻塞超过 100ms 的协程并打印 slow-callback 警告,多半是隐藏的同步调用。需要更高吞吐时把策略换成 libuv 后端 uvloopasyncio.set_event_loop_policy(uvloop.EventLoopPolicy()),API 兼容,调度可再快 2–4x。

决策框架

并发原语的边界很清晰:瓶颈在 I/O 延迟且对应库有异步实现,asyncio 是正解;库只有同步且任务规模小,上节课的 ThreadPoolExecutor 够用;瓶颈在 CPU 而非 I/O,两者都救不了,回到 ProcessPoolExecutor 或下一课的 JIT 路径。

衔接下一课

到这里你能把 1000 个行情请求压到 2 秒内、内存只占线程方案的一个零头,I/O 密集这一面的工具栈已经完整。下一课换一种瓶颈:单核上一段跑不动的数值热循环(hot loop)——@numba.njit 这类即时编译(just-in-time compilation, JIT)工具能把 GARCH(1,1) 方差递推从 Python 速度推到接近 C 速度,并配合 nogil=Trueprange 把多核也用上。

练习

Exercise

给定同步函数 def slow_fetch(url): import time; time.sleep(0.5); return url.upper()

(a) 写一个异步包装 async def fetch(url),用 asyncio.get_running_loop() 取当前事件循环,再用 loop.run_in_executor(None, slow_fetch, url) 调用同步函数。

(b) 用 asyncio.TaskGroupfetch 在列表 urls = ["a", "b", "c", "d"] 上扇出并收集结果。

(c) 用 time.perf_counter() 给整次调用计时,验证墙钟约为 0.5 秒(approximately 0.5 seconds),而非 2.0 秒。

(d) 用一句话解释为什么用 run_in_executor 包住一个阻塞函数后,它能与事件循环配合。

提示
fetch 内部:loop = asyncio.get_running_loop(),然后 return await loop.run_in_executor(None, slow_fetch, url),第一个参数 None 表示走默认线程池。外层用 TaskGroup 扇出。
提示
run_in_executor 返回的对象可 await:阻塞函数被丢进后台线程,事件循环立刻拿回控制权调度其他协程。4 个 fetch 的 0.5 秒等待因此重叠在同一段时间里,而非串行累加成 2.0 秒。

延伸阅读:《Fluent Python》第 2 版中译第 21 章(异步编程);CPython 官方 asyncio 中文文档与 aiohttp 中文文档;PEP 654(ExceptionGroup)与 PEP 692(TaskGroup)的中文导读;国内 aiohttp 客户端常见踩坑(DNS 解析失败时 ClientSession 不会自动重试,connector 默认 limit=100 在国内运营商代理下需要下调),安装可走 npmmirrorpypi.tuna 镜像。