← 返回模块
3.2.4.3beta 可读 · 未来付费校验通过内容版本 2026-05-24

HTTP API 与具备韧性的数据抓取

3.2.4 · 合成数据与 API · 编程

某私募的固定收益研究员要把过去三个月的 10 年期中国国债收益率拉成时间序列,放进久期模型的样本。AKShare 的公开接口 ak.bond_china_yield 不要 token、本地能跑、数据按日更新——但研究 notebook 一旦在用户面前演示时撞上 429,整场会议就要等十分钟手动 retry。本课把 AKShare 调用包成一个 fetch_yield_curve 函数,把五件套配齐:超时、指数退避重试、客户端限流、Pydantic 校验、Parquet 缓存。出来的对象是一只 pd.DataFrame,带 dateyield 两列,可以无缝接到 3.2.2 的时间序列处理与 3.2.3 的回归脚本。本课是纯 Python 工程课,不再触碰 SDE 或微观结构,但所有合成数据的回退路径都假设这一层是稳的。

requests 的最小可用 API

四个参数装得紧的 requests.get 调用:URL、params 字典、headers 字典、timeout 二元组。timeout 给得偏紧——(connect=3.0, read=10.0)——因为没有超时的 HTTP 调用可以挂死整个 notebook 进程,等到内核被你 Ctrl-C 之前研究节奏已经断了。响应的三件套:response.status_code 看返回码、response.json() 解码、response.raise_for_status() 把 4xx / 5xx 翻成异常往上抛。requests.Session() 复用底层 TCP 连接,共用一份 headers / params 基线,迭代抓多日数据时按 session 走比一次性 requests.get 快一倍。httpx 是同口径的现代替代:类型注解齐备,带 httpx.AsyncClient 异步原型,迁移成本几乎为零;异步分支在 3.3.1 并发与性能里展开,这里维持同步路径。

import requests
r = requests.get(url, params=params, headers=headers, timeout=(3.0, 10.0))
r.raise_for_status()
payload = r.json()

指数退避重试 + 抖动

对瞬态失败(5xx、429、ConnectionErrorTimeout)做重试,对其他 4xx 立刻 raise——程序员错(401 没传 token、404 拼错 URL、422 入参不合规)重试只会越打越糟。退避公式正式写法是 waitn=2n+U(0,1)\text{wait}_n = 2^n + U(0, 1) 秒,KaTeX 形式给出标准的递推:

waitattempt=2attempt+jitter,jitterU(0,1)\text{wait}_{\text{attempt}} = 2^{\text{attempt}} + \text{jitter}, \qquad \text{jitter} \sim U(0, 1)

退避公式 2 ** attempt + rng.uniform(0, 1):第一次失败睡 1-2 秒、第二次 2-3 秒、第三次 4-5 秒……加 jitter 是为了把多个并发客户端同时唤醒打散开。生产环境推荐 tenacity 把这个模式做成装饰器,本课用手卷循环把每一步都摆在明面上。

import time
from requests.exceptions import ConnectionError, Timeout, HTTPError
last_exc = None
for attempt in range(5):
    try:
        r = s.get(url, params=params, timeout=(3.0, 10.0))
        r.raise_for_status()
        return r.json()
    except (ConnectionError, Timeout) as exc:
        last_exc = exc
    except HTTPError as exc:
        if r.status_code in (429, 500, 502, 503, 504):
            last_exc = exc
        else:
            raise  # 4xx other than 429 is a programmer error
    time.sleep(2 ** attempt + rng.uniform(0, 1))
raise last_exc

对瞬态失败这一类——5xx 状态码、429 Too Many Requests、ConnectionErrorTimeout——用指数退避加抖动重试;其他 4xx 必须直接 re-raise,因为那是程序员错,重试是有害的。最大 5 次尝试。ratelimitslowapi 是把限流做成装饰器的库,这里用手卷的 time.sleep(1.0 / max_qps) 在循环之外限速,简单可控。

Pydantic 校验

数据从 HTTP 边界进入分析代码之前,必须做一次结构校验。AKShare 的 bond_china_yield 返回一只 DataFrame,但若直接对接公开接口的 JSON 形式,字段是 [{"date": "2024-01-02", "rate": 2.55}, ...]。Pydantic 在边界处把任何不合规字段炸成 ValidationError,远比 50 行下游撞 KeyError 容易追踪:

from pydantic import BaseModel, TypeAdapter
class AkShareYieldRow(BaseModel):
    date: str
    rate: float
parsed = TypeAdapter(list[AkShareYieldRow]).validate_python(records)

r.json() 解码后的 payload 上用 pydantic.BaseModel.model_validate(...) 做边界校验;响应格式异常应该在边界抛 pydantic.ValidationError,而不是渗到下游 50 行外面才以 KeyError 出现。dataclasses.dataclass 加手动类型检查可以作为轻量替代,但 Pydantic v2 在原生速度与错误信息质量上仍然是默认选项。

缓存到 Parquet

研究 notebook 与 CI 测试都不应该每次都去打公开接口,既会浪费配额,也会让结果不可复现。缓存键由请求(URL 加排序后的 params)的 SHA-256 哈希决定;json.dumps(..., sort_keys=True) 是关键——逻辑等价的请求字典必须给出相同的字符串。文件存 Parquet,因为 Pandas 原生读写、压缩比好、单机 in-memory 用例下比 SQLite 又快又简单。Redis 缓存是生产级的延伸,放到 Subject 3.6 讲。

import hashlib, json, time, pathlib
cache_key = hashlib.sha256(f'{url}|{json.dumps(params, sort_keys=True)}'.encode()).hexdigest()[:16]
cache_path = cache_dir / f'{cache_key}.parquet'
if cache_path.exists() and (time.time() - cache_path.stat().st_mtime) < max_age_seconds:
    return pd.read_parquet(cache_path)
# ... 走网络抓取 ...
df.to_parquet(cache_path)

五件套合体:fetch_yield_curve

def fetch_yield_curve(date, cache_dir: pathlib.Path = pathlib.Path('./cache'), max_age_seconds: int = 86400) -> pd.DataFrame:
    cache_dir.mkdir(parents=True, exist_ok=True)
    url = "https://www.chinabond.com.cn/cb-pub/.../yieldCurve"
    params = {"date": date.isoformat()}
    cache_key = hashlib.sha256(f'{url}|{json.dumps(params, sort_keys=True)}'.encode()).hexdigest()[:16]
    cache_path = cache_dir / f'{cache_key}.parquet'
    if cache_path.exists() and (time.time() - cache_path.stat().st_mtime) < max_age_seconds:
        return pd.read_parquet(cache_path)
    s = requests.Session()
    last_exc = None
    for attempt in range(5):
        try:
            time.sleep(0.5)
            r = s.get(url, params=params, timeout=(3.0, 10.0))
            r.raise_for_status()
            records = r.json()
            parsed = TypeAdapter(list[AkShareYieldRow]).validate_python(records)
            df = pd.DataFrame([{"date": row.date, "yield": row.rate} for row in parsed])
            df.to_parquet(cache_path)
            return df
        except (ConnectionError, Timeout) as exc:
            last_exc = exc
        except HTTPError as exc:
            if r.status_code in (429, 500, 502, 503, 504):
                last_exc = exc
            else:
                raise
        time.sleep(2 ** attempt + 0.1)
    raise last_exc

time.sleep(0.5) 是客户端节流——把请求频率压到每秒 2 次以内,远低于公开接口的限流阈值,避免触发 429。即便如此,生产代码里也要带 tenacity 或者 ratelimit 做正式的令牌桶;这里手卷的版本是给读者看清楚每一步的形状。

A 股的 10 年期中债收益率(CCDC 中央国债登记结算)是固定收益研究的基础参考。AKShare 的 ak.bond_china_yield 用爬取的方式拉公开数据,不需要 token;Tushare 的 pro.cn_gov_bond_yield 走 token 接口,免费版每天 500 次配额。国内的 pip 镜像(清华 https://pypi.tuna.tsinghua.edu.cn/simple 或阿里 https://mirrors.aliyun.com/pypi/simple)装 akshare 与 pyarrow 比走默认源稳定一个量级——A 股研究环境里这是必备的实务设置。WebSocket 流式接口、FIX 引擎、撮合网关接入归 3.6 生产交易系统;Bloomberg BLPAPI、Refinitiv Eikon、FactSet、Wind、同花顺 iFinD 与优矿都是 LiveProvider 的生产同行,接口形态各异但重试/缓存/校验的纪律完全可迁移。

Formula Explorer

\\text{wait}_{attempt} = 2^{attempt} + \\text{jitter}

Exercise

实现 `fetch_yield_curve(date, cache_dir=pathlib.Path("./cache"), max_age_seconds=86400)`,按本课规范。函数必须 (1) 构造 `cache_key = hashlib.sha256(f"{url}|{json.dumps(params, sort_keys=True)}".encode()).hexdigest()[:16]`,并在 `(time.time() - cache_path.stat().st_mtime) < max_age_seconds` 时短路返回缓存 Parquet;(2) 在 5 次尝试的指数退避加抖动重试循环里发起 GET,对 (429, 500, 502, 503, 504, ConnectionError, Timeout) 重试,对其他 4xx 直接 re-raise;(3) 每次尝试调用前调一次 `time.sleep(0.5)` 做客户端节流;(4) 用 Pydantic `BaseModel` 校验解析后的 payload,模型字段对齐本课所述的接口;(5) 返回两列 `(date, yield)` 的 `pd.DataFrame`,并在返回前把它写到缓存文件。

提示
Hint 1: 缓存键用 url + sorted-params 构造,逻辑等价的请求必须产出相同键,跟字典迭代顺序无关;json.dumps(..., sort_keys=True) 是关键。
提示
Hint 2: 重试循环里把每一个瞬态异常捕到 last_exc,循环结束后 raise last_exc;不要在 5 次都失败时静默返回 None。

参考阅读:requests 官方文档 (https://requests.readthedocs.io);`httpx` 官方文档 (https://www.python-httpx.org);AKShare 官方文档 (https://akshare.akfamily.xyz);Tushare 官方文档 (https://tushare.pro);`pydantic` v2 官方文档 (https://docs.pydantic.dev/latest/);`tenacity` 官方文档 (https://tenacity.readthedocs.io);国内 pip 镜像配置说明(清华 / 阿里)。

一段实务校验:研究 notebook 与 CI 的运维差异

研究 notebook 与 CI 的运行 profile 完全不同,五件套要按场景轻重微调。研究 notebook 通常跑在研究员的 macbook 或者 A 股私募的 Linux 工作站上,实测网络抖动按周计,因此 max_age_seconds=86400(一天)略激进——可以放宽到 7 * 86400,把每周一次的全网刷新策略落实下来,这样大部分调用都命中缓存,真正过墙的请求每周不超过几十次,完全在 AKShare 与 Tushare 免费版的配额内。CI 则相反——CI 跑得频繁、每次都希望快速失败,缓存设 0 强制走网络也行,但更稳的做法是给 CI 一份"录制版"缓存(把生产环境跑过的 Parquet 直接 commit 进仓库),让 CI 永远命中缓存、永远 deterministic。

第二条实务校验:对账。每个公开接口都有自己的限流文档,真正用之前要把"客户端节流值 × 每天调用次数 < 每日配额"算清楚,否则上线第一天就 429。AKShare 是爬取式接口,理论上没有显式配额但单 IP 高频访问会触发上游网站的风控;Tushare 免费版每天 500 次、每分钟 5 次,token 接口的 pro.cn_gov_bond_yieldpro.daily 共用配额;沪深 300 ETF 与 50ETF 的日度 NAV 走另一支免费接口(ak.fund_etf_hist_em),也按 IP 限流。上线前在 staging 环境跑一遍 24 小时的 traffic mirror,把所有限流告警接到飞书 / 钉钉机器人,是国内私募研究部署的标准动作。

下一课把 L1 / L2 的合成工厂与 L3 的 fetch_yield_curve 同时塞到一个 DataProvider 协议背面,让回测代码在 CI(合成,离线,确定性)与研究 notebook(真接口,带缓存)之间切换时一行代码都不用改,顺带把 responses / respx 的 mock HTTP 测试纪律一次性教完。