某私募的固定收益研究员要把过去三个月的 10 年期中国国债收益率拉成时间序列,放进久期模型的样本。AKShare 的公开接口 ak.bond_china_yield 不要 token、本地能跑、数据按日更新——但研究 notebook 一旦在用户面前演示时撞上 429,整场会议就要等十分钟手动 retry。本课把 AKShare 调用包成一个 fetch_yield_curve 函数,把五件套配齐:超时、指数退避重试、客户端限流、Pydantic 校验、Parquet 缓存。出来的对象是一只 pd.DataFrame,带 date 与 yield 两列,可以无缝接到 3.2.2 的时间序列处理与 3.2.3 的回归脚本。本课是纯 Python 工程课,不再触碰 SDE 或微观结构,但所有合成数据的回退路径都假设这一层是稳的。
requests 的最小可用 API
四个参数装得紧的 requests.get 调用:URL、params 字典、headers 字典、timeout 二元组。timeout 给得偏紧——(connect=3.0, read=10.0)——因为没有超时的 HTTP 调用可以挂死整个 notebook 进程,等到内核被你 Ctrl-C 之前研究节奏已经断了。响应的三件套:response.status_code 看返回码、response.json() 解码、response.raise_for_status() 把 4xx / 5xx 翻成异常往上抛。requests.Session() 复用底层 TCP 连接,共用一份 headers / params 基线,迭代抓多日数据时按 session 走比一次性 requests.get 快一倍。httpx 是同口径的现代替代:类型注解齐备,带 httpx.AsyncClient 异步原型,迁移成本几乎为零;异步分支在 3.3.1 并发与性能里展开,这里维持同步路径。
import requests
r = requests.get(url, params=params, headers=headers, timeout=(3.0, 10.0))
r.raise_for_status()
payload = r.json()
指数退避重试 + 抖动
对瞬态失败(5xx、429、ConnectionError、Timeout)做重试,对其他 4xx 立刻 raise——程序员错(401 没传 token、404 拼错 URL、422 入参不合规)重试只会越打越糟。退避公式正式写法是 秒,KaTeX 形式给出标准的递推:
退避公式 2 ** attempt + rng.uniform(0, 1):第一次失败睡 1-2 秒、第二次 2-3 秒、第三次 4-5 秒……加 jitter 是为了把多个并发客户端同时唤醒打散开。生产环境推荐 tenacity 把这个模式做成装饰器,本课用手卷循环把每一步都摆在明面上。
import time
from requests.exceptions import ConnectionError, Timeout, HTTPError
last_exc = None
for attempt in range(5):
try:
r = s.get(url, params=params, timeout=(3.0, 10.0))
r.raise_for_status()
return r.json()
except (ConnectionError, Timeout) as exc:
last_exc = exc
except HTTPError as exc:
if r.status_code in (429, 500, 502, 503, 504):
last_exc = exc
else:
raise # 4xx other than 429 is a programmer error
time.sleep(2 ** attempt + rng.uniform(0, 1))
raise last_exc
对瞬态失败这一类——5xx 状态码、429 Too Many Requests、ConnectionError、Timeout——用指数退避加抖动重试;其他 4xx 必须直接 re-raise,因为那是程序员错,重试是有害的。最大 5 次尝试。ratelimit 与 slowapi 是把限流做成装饰器的库,这里用手卷的 time.sleep(1.0 / max_qps) 在循环之外限速,简单可控。
Pydantic 校验
数据从 HTTP 边界进入分析代码之前,必须做一次结构校验。AKShare 的 bond_china_yield 返回一只 DataFrame,但若直接对接公开接口的 JSON 形式,字段是 [{"date": "2024-01-02", "rate": 2.55}, ...]。Pydantic 在边界处把任何不合规字段炸成 ValidationError,远比 50 行下游撞 KeyError 容易追踪:
from pydantic import BaseModel, TypeAdapter
class AkShareYieldRow(BaseModel):
date: str
rate: float
parsed = TypeAdapter(list[AkShareYieldRow]).validate_python(records)
在 r.json() 解码后的 payload 上用 pydantic.BaseModel.model_validate(...) 做边界校验;响应格式异常应该在边界抛 pydantic.ValidationError,而不是渗到下游 50 行外面才以 KeyError 出现。dataclasses.dataclass 加手动类型检查可以作为轻量替代,但 Pydantic v2 在原生速度与错误信息质量上仍然是默认选项。
缓存到 Parquet
研究 notebook 与 CI 测试都不应该每次都去打公开接口,既会浪费配额,也会让结果不可复现。缓存键由请求(URL 加排序后的 params)的 SHA-256 哈希决定;json.dumps(..., sort_keys=True) 是关键——逻辑等价的请求字典必须给出相同的字符串。文件存 Parquet,因为 Pandas 原生读写、压缩比好、单机 in-memory 用例下比 SQLite 又快又简单。Redis 缓存是生产级的延伸,放到 Subject 3.6 讲。
import hashlib, json, time, pathlib
cache_key = hashlib.sha256(f'{url}|{json.dumps(params, sort_keys=True)}'.encode()).hexdigest()[:16]
cache_path = cache_dir / f'{cache_key}.parquet'
if cache_path.exists() and (time.time() - cache_path.stat().st_mtime) < max_age_seconds:
return pd.read_parquet(cache_path)
# ... 走网络抓取 ...
df.to_parquet(cache_path)
五件套合体:fetch_yield_curve
def fetch_yield_curve(date, cache_dir: pathlib.Path = pathlib.Path('./cache'), max_age_seconds: int = 86400) -> pd.DataFrame:
cache_dir.mkdir(parents=True, exist_ok=True)
url = "https://www.chinabond.com.cn/cb-pub/.../yieldCurve"
params = {"date": date.isoformat()}
cache_key = hashlib.sha256(f'{url}|{json.dumps(params, sort_keys=True)}'.encode()).hexdigest()[:16]
cache_path = cache_dir / f'{cache_key}.parquet'
if cache_path.exists() and (time.time() - cache_path.stat().st_mtime) < max_age_seconds:
return pd.read_parquet(cache_path)
s = requests.Session()
last_exc = None
for attempt in range(5):
try:
time.sleep(0.5)
r = s.get(url, params=params, timeout=(3.0, 10.0))
r.raise_for_status()
records = r.json()
parsed = TypeAdapter(list[AkShareYieldRow]).validate_python(records)
df = pd.DataFrame([{"date": row.date, "yield": row.rate} for row in parsed])
df.to_parquet(cache_path)
return df
except (ConnectionError, Timeout) as exc:
last_exc = exc
except HTTPError as exc:
if r.status_code in (429, 500, 502, 503, 504):
last_exc = exc
else:
raise
time.sleep(2 ** attempt + 0.1)
raise last_exc
time.sleep(0.5) 是客户端节流——把请求频率压到每秒 2 次以内,远低于公开接口的限流阈值,避免触发 429。即便如此,生产代码里也要带 tenacity 或者 ratelimit 做正式的令牌桶;这里手卷的版本是给读者看清楚每一步的形状。
A 股的 10 年期中债收益率(CCDC 中央国债登记结算)是固定收益研究的基础参考。AKShare 的 ak.bond_china_yield 用爬取的方式拉公开数据,不需要 token;Tushare 的 pro.cn_gov_bond_yield 走 token 接口,免费版每天 500 次配额。国内的 pip 镜像(清华 https://pypi.tuna.tsinghua.edu.cn/simple 或阿里 https://mirrors.aliyun.com/pypi/simple)装 akshare 与 pyarrow 比走默认源稳定一个量级——A 股研究环境里这是必备的实务设置。WebSocket 流式接口、FIX 引擎、撮合网关接入归 3.6 生产交易系统;Bloomberg BLPAPI、Refinitiv Eikon、FactSet、Wind、同花顺 iFinD 与优矿都是 LiveProvider 的生产同行,接口形态各异但重试/缓存/校验的纪律完全可迁移。
Formula Explorer
\\text{wait}_{attempt} = 2^{attempt} + \\text{jitter}Exercise
实现 `fetch_yield_curve(date, cache_dir=pathlib.Path("./cache"), max_age_seconds=86400)`,按本课规范。函数必须 (1) 构造 `cache_key = hashlib.sha256(f"{url}|{json.dumps(params, sort_keys=True)}".encode()).hexdigest()[:16]`,并在 `(time.time() - cache_path.stat().st_mtime) < max_age_seconds` 时短路返回缓存 Parquet;(2) 在 5 次尝试的指数退避加抖动重试循环里发起 GET,对 (429, 500, 502, 503, 504, ConnectionError, Timeout) 重试,对其他 4xx 直接 re-raise;(3) 每次尝试调用前调一次 `time.sleep(0.5)` 做客户端节流;(4) 用 Pydantic `BaseModel` 校验解析后的 payload,模型字段对齐本课所述的接口;(5) 返回两列 `(date, yield)` 的 `pd.DataFrame`,并在返回前把它写到缓存文件。
提示
url + sorted-params 构造,逻辑等价的请求必须产出相同键,跟字典迭代顺序无关;json.dumps(..., sort_keys=True) 是关键。提示
last_exc,循环结束后 raise last_exc;不要在 5 次都失败时静默返回 None。参考阅读:requests 官方文档 (https://requests.readthedocs.io);`httpx` 官方文档 (https://www.python-httpx.org);AKShare 官方文档 (https://akshare.akfamily.xyz);Tushare 官方文档 (https://tushare.pro);`pydantic` v2 官方文档 (https://docs.pydantic.dev/latest/);`tenacity` 官方文档 (https://tenacity.readthedocs.io);国内 pip 镜像配置说明(清华 / 阿里)。
一段实务校验:研究 notebook 与 CI 的运维差异
研究 notebook 与 CI 的运行 profile 完全不同,五件套要按场景轻重微调。研究 notebook 通常跑在研究员的 macbook 或者 A 股私募的 Linux 工作站上,实测网络抖动按周计,因此 max_age_seconds=86400(一天)略激进——可以放宽到 7 * 86400,把每周一次的全网刷新策略落实下来,这样大部分调用都命中缓存,真正过墙的请求每周不超过几十次,完全在 AKShare 与 Tushare 免费版的配额内。CI 则相反——CI 跑得频繁、每次都希望快速失败,缓存设 0 强制走网络也行,但更稳的做法是给 CI 一份"录制版"缓存(把生产环境跑过的 Parquet 直接 commit 进仓库),让 CI 永远命中缓存、永远 deterministic。
第二条实务校验:对账。每个公开接口都有自己的限流文档,真正用之前要把"客户端节流值 × 每天调用次数 < 每日配额"算清楚,否则上线第一天就 429。AKShare 是爬取式接口,理论上没有显式配额但单 IP 高频访问会触发上游网站的风控;Tushare 免费版每天 500 次、每分钟 5 次,token 接口的 pro.cn_gov_bond_yield 与 pro.daily 共用配额;沪深 300 ETF 与 50ETF 的日度 NAV 走另一支免费接口(ak.fund_etf_hist_em),也按 IP 限流。上线前在 staging 环境跑一遍 24 小时的 traffic mirror,把所有限流告警接到飞书 / 钉钉机器人,是国内私募研究部署的标准动作。
下一课把 L1 / L2 的合成工厂与 L3 的 fetch_yield_curve 同时塞到一个 DataProvider 协议背面,让回测代码在 CI(合成,离线,确定性)与研究 notebook(真接口,带缓存)之间切换时一行代码都不用改,顺带把 responses / respx 的 mock HTTP 测试纪律一次性教完。