← 返回模块
4.1.2.3beta 可读 · 未来付费内容校验中内容版本 2026-05-28

数据供应商接口与数据采集

4.1.2 · 基本面、另类数据与数据基础设施 · 量化全流程

周二早上 8:47。某上海私募的股票策略组组长在数据团队飞书里点了 @:「我们 fundamentals_pit 表里 600519 以后的股票全部缺失。今早策略对沪深300 后 30% 的标的完全没头寸。」值班数据工程师调出昨晚的入库日志。Wind 数据 服务 SFTP 文件 02:14 完成下载——SFTP close() 返回成功、cron 日志写「入库 完成」、仓库表已更新。但文件是 4.2 GB,前五天的同名 drop 全是 7.8 ~ 8.1 GB。网络在传输中途断了;操作系统报告文件已正常关闭;不完整的文件被无告警地推到了生产。两天后复盘锁定根因:​​没有做 manifest 校验​​。Wind 每个文件都附带 .sha256 sidecar;入库脚本根本没读它。L1 + L2 给你装好的是基本面 + 另类数据的 what;本课要装入的是 how——四种投递模式、幂等性、schema 注册表、manifest 契约、权限治理 层——把上面这个 bug 在结构上变得不可能发生。

四种投递模式

任意一家数据供应商,都通过下面四种物理模式之一交付数据。模式由数据的形态决定(延迟要求、体量、批 vs 流),不由供应商的偏好决定。把模式匹配到数据集,80% 的 ingestion 工程决策就做完了:

| pattern                  | initiation                       | canonical use case                                              | typical vendor anchor                                                       |
| REST API pull            | consumer-initiated               | on-demand reference + low-volume daily updates + historical backfills | Polygon.io / IEX Cloud / Tushare Pro / Nasdaq Data Link                  |
| WebSocket / streaming push | vendor-initiated                | real-time market + news + social-media firehoses                | Polygon WebSocket / Refinitiv RDP / Bloomberg MarketData Cloud / Wind 实时 行情 |
| SFTP file drop           | vendor-initiated batch           | batch fundamentals + credit-card + ESG + web-scraped            | S&P Compustat / Sharadar / CRSP / Wind 数据 服务 SFTP                       |
| S3 / object-storage drop | vendor-initiated event-driven    | modern alt-data + bulk historical + PIT fundamentals snapshots  | Sharadar S3 / modern alt-data vendors / Aliyun OSS / Tencent COS           |

REST API pull 是「消费方主动发起」模式:买方的 cron 作业(或 Airflow / Step Functions / Dagster 编排)打供应商 endpoint、翻页消费 response、尊重限速(HTTP 429 + Retry-After 头)、5xx 上指数退避,把数据落到 staging。典型场景是按需参考数据——标的主数据、公司行动——以及历史回填(首次部署时拉过去六个月)。Polygon.io 日级聚合、IEX Cloud 快照、Tushare Pro 接口、Nasdaq Data Link / Quandl 批量下载,CN 一侧的 Tushare Pro 接口、Wind API 历史拉取、通联 REST,都在这里。

WebSocket / streaming push 把方向反过来:供应商推送,消费方保持长连接、按到达顺序处理事件。契约是:维持 session、用序列号(或供应商提供的幂等键)去重、断线后从最后一个序列号恢复、下游消费跟不上时做反压。典型场景是实时行情(Polygon WebSocket / Refinitiv RDP streaming / Bloomberg MarketData Cloud / Wind 实时 行情 / 通联 行情 推送)以及实时新闻 / 舆情 流(RavenPack live / Twitter X Decahose / 同花顺 舆情 推送)。

SFTP file drop 是经典批量投递模式:供应商按日 / 时 / 周节奏把文件 推到 SFTP;消费方每 N 分钟轮询、列目录、按 名字 / 时间戳 / 大小 模式识别新文件、下载、校验 校验和、落到数据湖的 raw-vendor 分区、向下游清洗管线发信号。典型场景:批量基本面(Compustat / Sharadar / CRSP / Wind 数据 服务 SFTP)、批量信用卡(Earnest)、批量 ESG(Sustainalytics / WIND ESG 评级 批量交付)、批量爬取数据(Burning Glass)。

S3 / object-storage drop 是 2024+ 时代另类数据的默认模式:供应商把 Parquet 文件 drop 到共享 S3 / 阿里云 OSS / 腾讯云 COS / MinIO bucket;消费方的 IAM 凭据有读权限;消费方订阅 s3:ObjectCreated:* 事件(通过 SNS + SQS / EventBridge)做事件响应。完整性纪律与 SFTP 完全一致——校验和、行数、schema 校验——但投递机制从轮询变成了事件驱动。在 CN 一侧,阿里云 OSS / 腾讯云 COS / 华为云 OBS 取代 AWS S3,因 PIPL + 数据 安全 法 要求 CN 居民数据境内存储。

幂等性:一条管线最重要的属性

每个入库作业由一个三元组的自然主键定位,​​用同一个键再跑一次必须是 no-op 或受控覆写——绝不能出现重复插入​​:

1. vendor      — the vendor identifier (`bloomberg` / `compustat` / `wind` / `tushare` / `polygon`)
2. dataset     — the specific dataset name (`fundamentals_quarterly` / `news_firehose` / `daily_aggs`)
3. vintage_id  — the vendor-supplied or consumer-derived batch identifier (typically a date-window or a vendor-supplied batch UUID)

生产规则:​​每个入库作业在 (vendor, dataset, vintage_id) 上保持幂等;用同一个键再跑一次必须是 no-op 或受控覆写,绝不能出现重复插入。​

满足规则的三种实现:(i) MERGE——仓库 原生 的 MERGE INTO <table> USING <staging> ON <natural_key> upsert 语句;Snowflake / BigQuery / Postgres / ClickHouse 都支持。(ii) Iceberg / Delta snapshot-write——入库作业写一个新的 snapshot;读端看到最新 snapshot;旧 snapshot 仍可时间旅行查询。(iii) 入库账本(ingestion ledger)——一张 ingestion_log(vendor, dataset, vintage_id, status, ingested_at, file_path, row_count) 表,作业开始前查询;如果 status 是 complete,作业 no-op。

非幂等管线最经典的 bug:凌晨 5 点 cron 因 AWS Step Functions 一次瞬时 500 自动重试;入库跑了两次;每行基本面被复制;下游 P/E 计算 截面 均值翻倍,研究员一天后才发现。生产侧 修复 没有商量余地:​​任何入库作业都不允许写裸的 INSERT INTO <table> SELECT ... FROM <staging> 而不带去重机制。​ MERGE / snapshot-write / ledger 三选一,并通过代码评审强制执行。

Manifest 校验:永远不要相信「已关闭的文件」

供应商与消费方之间的契约是 manifest——按文件 / 按批次发布的一份小文档,记录行数、最小 / 最大日期、校验和(SHA-256 优于 MD5;两者都常见)、schema 版本号。消费方的工作是在 推到生产之前 校验全部四项:

# never promote a file to production without manifest-verify; a passing SFTP close() does not imply a complete file
import hashlib, json

def verify_manifest(file_path: str, manifest_path: str) -> None:
    manifest = json.load(open(manifest_path))
    h = hashlib.sha256()
    rows = 0
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
            rows += chunk.count(b"\n")
    assert h.hexdigest() == manifest["sha256"],         "checksum mismatch"
    assert rows           == manifest["row_count"],      "row count mismatch"
    assert manifest["min_date"] <= manifest["max_date"], "date range invalid"
    assert is_compatible(manifest["schema_version"]),    "schema version unsupported"

业内三种对账模式:(i) ​inline manifest​​——供应商在每个文件旁边发布 <filename>.manifest<filename>.sha256;消费方下载两份就近校验。(ii) ​batch manifest​​——供应商按日发布 manifest.json 列出当日所有已交付文件 + 各自 SHA-256;消费方在检测到所有预期文件到位后下载批量 manifest、逐一校验、整批晋升。(iii) ​manifest-of-manifests​​——Compustat 全 历史 回填 / Wind 全市场 历史 批量 使用层级化 manifest;消费方走层级。

本课开篇的 bug 恰好就是「没有 manifest」案例:SFTP close() 返回成功,但文件 4.2 GB 而非 7.8 GB;.sha256 sidecar 是有的;只是脚本根本没读。永远不要相信「已关闭的文件」。

Schema 注册表 + Quarantine:永远不要静默强转

每个供应商数据集都在 schemas/<vendor>/<dataset>/v<n>.yaml 注册一份 schema——列名、列类型、可空性、取值范围、主键、版本号。入库作业在处理前加载 schema、按行校验;任一违规(未预期列、类型不匹配、取值越界、应非空 而为空)都把文件路由到 quarantine 前缀 + 触发告警:

1. load_schema(vendor, dataset, version)  — read schemas/<vendor>/<dataset>/v<n>.yaml
2. validate_row(row, schema)              — check column names + types + nullability + value-range; return list of violations
3. route_to_quarantine(file_path, violations) — move to quarantine/<vendor>/<dataset>/<date>/; emit alert to Slack / PagerDuty
4. bump_schema_version()                  — the data team's response when the violation is a legitimate vendor schema change
5. reject_file()                          — the data team's response when the violation is bad data

生产规则:​​永远不要静默强转;始终 quarantine + 告警;数据团队在 SLA 内分诊。​ 典型 SLA 是 4 个工作小时;数据团队在三种处置之间选择:(i) 升 schema 版本(供应商加了合法新列)、(ii) 拒绝文件(供应商发了坏数据)、(iii) 联系供应商(供应商未通知变更了 schema)。CN 一侧的告警 endpoint 把 PagerDuty / Slack 换成 ​飞书 / 钉钉 机器人​​——这是 CN 团队的标准做法。

非 quarantine 管线最经典的 bug:供应商在 Q3 发布中新加了一列 nonGAAPNetIncome;静默强转管线把这一列丢弃;基本面仓库静默 退化 持续两周,直到研究员注意到一条不相关 的 PnL 异常,回溯发现这一列丢失。

CN 供应商生态与权限治理 现实

供应商许可证在 账户 / IP / API key / 机构 维度上强制执行。把生态与权限治理 层背下来:

| tier                                | us                                                                | cn                                                                                  |
| production canonical (fundamentals) | Bloomberg + S&P Compustat                                          | Wind 万得                                                                            |
| production canonical (market data + news) | Bloomberg + Refinitiv / LSEG                                  | Wind + 通联数据 + 同花顺                                                              |
| affordable alternative              | Sharadar + Polygon.io + IEX Cloud                                  | Tushare Pro + Choice 数据 + 聚宽 / 米筐                                              |
| academic canonical                  | CRSP via WRDS                                                      | (no direct equivalent; academic CN research typically uses Wind / RESSET / CSMAR)   |
| analyst-estimate canonical          | I/B/E/S via Refinitiv + FactSet StarMine                           | 朝阳永续 一致预期 + Wind 一致预期                                                     |
| alt-data anchor                     | RavenPack + Planet Labs + Earnest                                  | 同花顺 舆情 + 万得 资讯 + 微博 / 雪球 / 东方财富 股吧 scrapes via 通联                  |

三条权限规则锚定 CN 一侧的许可证现实。​​Wind 万得 Terminal 是 per-seat(账户)​​——一个账号一个登录设备,Wind 强制设备指纹 + 并发会话;​​Wind API + WindPy 同样是 per-seat(账户)​​,并对并发进程做限制。​​Choice 数据 是 per-seat​​。​​Tushare Pro 是 per-API-key​​,积分制额度 + 收费接口。​​通联数据 datayes 是 per-API-key + 机构 授权​​。CN 私募 量化 团队 10-50 人规模的典型形态:3-5 个 Wind 万得 账户 + 一个集中的数据团队跑 SFTP + OSS 管线 + 给研究员个体配更便宜的账号(Choice / Tushare / 聚宽)。

数据团队的工作:每家供应商一个共享服务账号,把内部访问通过权限治理 组中转(例如 阿里云 RAM 角色 fundamentals.wind.dailynews.tonghuashun.firehosemarket-data.tonglian.l1),维护「谁读了什么」的审计日志以应对供应商审计。L2 的合规桥(另类数据的四闸 + MNPI / PII / GDPR-PIPL)在入库层继续有效;数据团队是运营 owner。CN 一侧的合规底色:​​PIPL 个人信息保护法 + 数据 安全 法 数据安全法 关闭跨境数据传输​​;数据团队把 CN 居民数据保留在境内,用 阿里云 OSS / 腾讯云 COS / 华为云 OBS 而非 AWS S3 实现 S3 模式。

工作示例:四个 ~30 行的参考实现

参考实现位于 pull_rest.pyconsume_websocket.pysftp_drop_watcher.pys3_drop_watcher.py。每个都展示该模式 + 幂等性 + schema 注册表 + manifest 校验。

pull_rest.py 拉 Tushare Pro 的 600519 / 000001 / 510300 日线,翻页消费 response,尊重限速,按 (symbol, date) MERGE 到 staging.daily_aggs_tushare。幂等性 键 = (tushare, daily_aggs, YYYYMMDD);同一日期 二次 调用 通过 MERGE 退化为 no-op。

consume_websocket.py 消费 Wind 实时 行情 / 通联数据 行情 推送,维持 session,断线后从最后一个序列号恢复,按 (symbol, sequence_no) 去重,把流冲到 Kafka topic tickers.wind.live 供下游消费。

sftp_drop_watcher.py 每五分钟轮询 Wind 数据 服务 SFTP endpoint,识别匹配 wind_qf_<YYYYMMDD>.zip 的新文件,下载到 staging,按 .sha256 sidecar 校验 SHA-256,按 schema 注册表加载,不匹配则 quarantine,成功则写 ledger 条目。开篇 bug 在结构上变得不可能——脚本在 校验 和 不匹配 时主动中止。

s3_drop_watcher.py 订阅 阿里云 OSS / 腾讯云 COS 的对象创建事件(消息队列 转发),事件到达时下载 Parquet 文件,按同 bucket 内供应商发布的 manifest 校验,按 schema 注册表加载。

五句话的纪律收尾:仓库里的每一字节都可追溯到一次有 manifest 的供应商投递;入库作业在 (vendor, dataset, vintage_id) 上幂等;schema 注册、强制、不匹配则 quarantine;供应商对账通过 manifest 不容妥协;权限治理 是数据团队的职责,不是研究台的职责。

A 股 的 T+1 结算 周期 与 证监会 信息披露 SLA 共同 决定 了 CN 入库管线的节奏窗口;订阅 / 接入 的交易成本(包括许可证 + 数据工程 + 维护)是数据团队预算中显眼的一项。

练习

Exercise

你为 CN 私募 量化 团队 设计三个供应商数据集的入库管线。对每一个,指定:(i) 投递模式(从 REST API pull / WebSocket / streaming push / SFTP file drop / S3 / object-storage drop 中选一);(ii) 幂等性 键 的三个组件;(iii) schema 注册表路径,以及至少两条示例行级校验规则;(iv) manifest 校验方式。

数据集 A:实时舆情 firehose,来自 同花顺 舆情——峰值 ~1000 events/sec。

数据集 B:每日 Wind 万得 基本面 批量文件,夜间投递。

数据集 C:每周卫星停车场计数数据集,来自 假设的 通联 卫星 数据,以 Parquet 文件投递到 阿里云 OSS。

把答案做成一张三行表。

提示
模式由数据形态决定。A 是实时 + 高频 + 供应商推 — WebSocket。B 是日级 + 批量 + 供应商推 — SFTP。C 是周度 + 文件 + 供应商以 OSS 事件推 — S3 / 对象存储 投递(在 CN 即阿里云 OSS)。
提示
幂等性 键,A:(tonghuashun, news_firehose, <session_id + sequence_no>) 用 序列号 去重;B:(wind, fundamentals_quarterly, YYYYMMDD) 用 MERGE;C:(tonglian, parking_lot_counts, YYYYMMDD) 用 snapshot-write。schema 路径:schemas/tonghuashun/news_firehose/v1.yaml 依此类推。

Formula Explorer

\text{throughput} = \text{rate\_limit} \cdot (1 - p_{\text{5xx}})

本课组装清单

本课按顺序组装的可核对件:

  • Inline-code table——四种投递模式映射到 主动方 / 用例 / 供应商 锚点。
  • Inline-code listing——幂等性 键 三元组 vendor / dataset / vintage_id
  • Fenced ```python 代码块——verify_manifest() 函数(流式 SHA-256 + 行数 + 日期范围 + schema 版本 四项断言)。
  • Inline-code listing——schema 注册表 + quarantine 工作流(load_schemavalidate_rowroute_to_quarantinebump_schema_versionreject_file)。
  • Inline-code table——跨区供应商生态六个层级(基本面主选、行情 + 新闻 主选、可负担替代、学术主选、一致预期 主选、另类数据 锚点)。
  • 练习——三个数据集跨四种模式的管线设计,含两条递进式 Hint。
  • FormulaExplorer——vintage-id 组合 batch_uuid ⊕ date_window

五句话的纪律收尾:仓库里的每一字节都可追溯到一次有 manifest 的供应商投递;入库作业在 (vendor, dataset, vintage_id) 上幂等;schema 注册、强制、不匹配则 quarantine;供应商对账通过 manifest 不容妥协;权限治理 是数据团队的职责,不是研究台的职责。

下一课

掌握四种入库模式与它们生成的完整性保证之后,下一课打开消费这些已校验投递的仓库架构——数据湖、列式仓库、时点视图、血缘图、权限治理。本课内化的 idempotencymanifest 纪律是 L4 期望的输入契约。