周二早上 8:47。某上海私募的股票策略组组长在数据团队飞书里点了 @:「我们 fundamentals_pit 表里 600519 以后的股票全部缺失。今早策略对沪深300 后 30% 的标的完全没头寸。」值班数据工程师调出昨晚的入库日志。Wind 数据 服务 SFTP 文件 02:14 完成下载——SFTP close() 返回成功、cron 日志写「入库 完成」、仓库表已更新。但文件是 4.2 GB,前五天的同名 drop 全是 7.8 ~ 8.1 GB。网络在传输中途断了;操作系统报告文件已正常关闭;不完整的文件被无告警地推到了生产。两天后复盘锁定根因:没有做 manifest 校验。Wind 每个文件都附带 .sha256 sidecar;入库脚本根本没读它。L1 + L2 给你装好的是基本面 + 另类数据的 what;本课要装入的是 how——四种投递模式、幂等性、schema 注册表、manifest 契约、权限治理 层——把上面这个 bug 在结构上变得不可能发生。
四种投递模式
任意一家数据供应商,都通过下面四种物理模式之一交付数据。模式由数据的形态决定(延迟要求、体量、批 vs 流),不由供应商的偏好决定。把模式匹配到数据集,80% 的 ingestion 工程决策就做完了:
| pattern | initiation | canonical use case | typical vendor anchor |
| REST API pull | consumer-initiated | on-demand reference + low-volume daily updates + historical backfills | Polygon.io / IEX Cloud / Tushare Pro / Nasdaq Data Link |
| WebSocket / streaming push | vendor-initiated | real-time market + news + social-media firehoses | Polygon WebSocket / Refinitiv RDP / Bloomberg MarketData Cloud / Wind 实时 行情 |
| SFTP file drop | vendor-initiated batch | batch fundamentals + credit-card + ESG + web-scraped | S&P Compustat / Sharadar / CRSP / Wind 数据 服务 SFTP |
| S3 / object-storage drop | vendor-initiated event-driven | modern alt-data + bulk historical + PIT fundamentals snapshots | Sharadar S3 / modern alt-data vendors / Aliyun OSS / Tencent COS |
REST API pull 是「消费方主动发起」模式:买方的 cron 作业(或 Airflow / Step Functions / Dagster 编排)打供应商 endpoint、翻页消费 response、尊重限速(HTTP 429 + Retry-After 头)、5xx 上指数退避,把数据落到 staging。典型场景是按需参考数据——标的主数据、公司行动——以及历史回填(首次部署时拉过去六个月)。Polygon.io 日级聚合、IEX Cloud 快照、Tushare Pro 接口、Nasdaq Data Link / Quandl 批量下载,CN 一侧的 Tushare Pro 接口、Wind API 历史拉取、通联 REST,都在这里。
WebSocket / streaming push 把方向反过来:供应商推送,消费方保持长连接、按到达顺序处理事件。契约是:维持 session、用序列号(或供应商提供的幂等键)去重、断线后从最后一个序列号恢复、下游消费跟不上时做反压。典型场景是实时行情(Polygon WebSocket / Refinitiv RDP streaming / Bloomberg MarketData Cloud / Wind 实时 行情 / 通联 行情 推送)以及实时新闻 / 舆情 流(RavenPack live / Twitter X Decahose / 同花顺 舆情 推送)。
SFTP file drop 是经典批量投递模式:供应商按日 / 时 / 周节奏把文件 推到 SFTP;消费方每 N 分钟轮询、列目录、按 名字 / 时间戳 / 大小 模式识别新文件、下载、校验 校验和、落到数据湖的 raw-vendor 分区、向下游清洗管线发信号。典型场景:批量基本面(Compustat / Sharadar / CRSP / Wind 数据 服务 SFTP)、批量信用卡(Earnest)、批量 ESG(Sustainalytics / WIND ESG 评级 批量交付)、批量爬取数据(Burning Glass)。
S3 / object-storage drop 是 2024+ 时代另类数据的默认模式:供应商把 Parquet 文件 drop 到共享 S3 / 阿里云 OSS / 腾讯云 COS / MinIO bucket;消费方的 IAM 凭据有读权限;消费方订阅 s3:ObjectCreated:* 事件(通过 SNS + SQS / EventBridge)做事件响应。完整性纪律与 SFTP 完全一致——校验和、行数、schema 校验——但投递机制从轮询变成了事件驱动。在 CN 一侧,阿里云 OSS / 腾讯云 COS / 华为云 OBS 取代 AWS S3,因 PIPL + 数据 安全 法 要求 CN 居民数据境内存储。
幂等性:一条管线最重要的属性
每个入库作业由一个三元组的自然主键定位,用同一个键再跑一次必须是 no-op 或受控覆写——绝不能出现重复插入:
1. vendor — the vendor identifier (`bloomberg` / `compustat` / `wind` / `tushare` / `polygon`)
2. dataset — the specific dataset name (`fundamentals_quarterly` / `news_firehose` / `daily_aggs`)
3. vintage_id — the vendor-supplied or consumer-derived batch identifier (typically a date-window or a vendor-supplied batch UUID)
生产规则:每个入库作业在 (vendor, dataset, vintage_id) 上保持幂等;用同一个键再跑一次必须是 no-op 或受控覆写,绝不能出现重复插入。
满足规则的三种实现:(i) MERGE——仓库 原生 的 MERGE INTO <table> USING <staging> ON <natural_key> upsert 语句;Snowflake / BigQuery / Postgres / ClickHouse 都支持。(ii) Iceberg / Delta snapshot-write——入库作业写一个新的 snapshot;读端看到最新 snapshot;旧 snapshot 仍可时间旅行查询。(iii) 入库账本(ingestion ledger)——一张 ingestion_log(vendor, dataset, vintage_id, status, ingested_at, file_path, row_count) 表,作业开始前查询;如果 status 是 complete,作业 no-op。
非幂等管线最经典的 bug:凌晨 5 点 cron 因 AWS Step Functions 一次瞬时 500 自动重试;入库跑了两次;每行基本面被复制;下游 P/E 计算 截面 均值翻倍,研究员一天后才发现。生产侧 修复 没有商量余地:任何入库作业都不允许写裸的 INSERT INTO <table> SELECT ... FROM <staging> 而不带去重机制。 MERGE / snapshot-write / ledger 三选一,并通过代码评审强制执行。
Manifest 校验:永远不要相信「已关闭的文件」
供应商与消费方之间的契约是 manifest——按文件 / 按批次发布的一份小文档,记录行数、最小 / 最大日期、校验和(SHA-256 优于 MD5;两者都常见)、schema 版本号。消费方的工作是在 推到生产之前 校验全部四项:
# never promote a file to production without manifest-verify; a passing SFTP close() does not imply a complete file
import hashlib, json
def verify_manifest(file_path: str, manifest_path: str) -> None:
manifest = json.load(open(manifest_path))
h = hashlib.sha256()
rows = 0
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(1 << 20), b""):
h.update(chunk)
rows += chunk.count(b"\n")
assert h.hexdigest() == manifest["sha256"], "checksum mismatch"
assert rows == manifest["row_count"], "row count mismatch"
assert manifest["min_date"] <= manifest["max_date"], "date range invalid"
assert is_compatible(manifest["schema_version"]), "schema version unsupported"
业内三种对账模式:(i) inline manifest——供应商在每个文件旁边发布 <filename>.manifest 或 <filename>.sha256;消费方下载两份就近校验。(ii) batch manifest——供应商按日发布 manifest.json 列出当日所有已交付文件 + 各自 SHA-256;消费方在检测到所有预期文件到位后下载批量 manifest、逐一校验、整批晋升。(iii) manifest-of-manifests——Compustat 全 历史 回填 / Wind 全市场 历史 批量 使用层级化 manifest;消费方走层级。
本课开篇的 bug 恰好就是「没有 manifest」案例:SFTP close() 返回成功,但文件 4.2 GB 而非 7.8 GB;.sha256 sidecar 是有的;只是脚本根本没读。永远不要相信「已关闭的文件」。
Schema 注册表 + Quarantine:永远不要静默强转
每个供应商数据集都在 schemas/<vendor>/<dataset>/v<n>.yaml 注册一份 schema——列名、列类型、可空性、取值范围、主键、版本号。入库作业在处理前加载 schema、按行校验;任一违规(未预期列、类型不匹配、取值越界、应非空 而为空)都把文件路由到 quarantine 前缀 + 触发告警:
1. load_schema(vendor, dataset, version) — read schemas/<vendor>/<dataset>/v<n>.yaml
2. validate_row(row, schema) — check column names + types + nullability + value-range; return list of violations
3. route_to_quarantine(file_path, violations) — move to quarantine/<vendor>/<dataset>/<date>/; emit alert to Slack / PagerDuty
4. bump_schema_version() — the data team's response when the violation is a legitimate vendor schema change
5. reject_file() — the data team's response when the violation is bad data
生产规则:永远不要静默强转;始终 quarantine + 告警;数据团队在 SLA 内分诊。 典型 SLA 是 4 个工作小时;数据团队在三种处置之间选择:(i) 升 schema 版本(供应商加了合法新列)、(ii) 拒绝文件(供应商发了坏数据)、(iii) 联系供应商(供应商未通知变更了 schema)。CN 一侧的告警 endpoint 把 PagerDuty / Slack 换成 飞书 / 钉钉 机器人——这是 CN 团队的标准做法。
非 quarantine 管线最经典的 bug:供应商在 Q3 发布中新加了一列 nonGAAPNetIncome;静默强转管线把这一列丢弃;基本面仓库静默 退化 持续两周,直到研究员注意到一条不相关 的 PnL 异常,回溯发现这一列丢失。
CN 供应商生态与权限治理 现实
供应商许可证在 账户 / IP / API key / 机构 维度上强制执行。把生态与权限治理 层背下来:
| tier | us | cn |
| production canonical (fundamentals) | Bloomberg + S&P Compustat | Wind 万得 |
| production canonical (market data + news) | Bloomberg + Refinitiv / LSEG | Wind + 通联数据 + 同花顺 |
| affordable alternative | Sharadar + Polygon.io + IEX Cloud | Tushare Pro + Choice 数据 + 聚宽 / 米筐 |
| academic canonical | CRSP via WRDS | (no direct equivalent; academic CN research typically uses Wind / RESSET / CSMAR) |
| analyst-estimate canonical | I/B/E/S via Refinitiv + FactSet StarMine | 朝阳永续 一致预期 + Wind 一致预期 |
| alt-data anchor | RavenPack + Planet Labs + Earnest | 同花顺 舆情 + 万得 资讯 + 微博 / 雪球 / 东方财富 股吧 scrapes via 通联 |
三条权限规则锚定 CN 一侧的许可证现实。Wind 万得 Terminal 是 per-seat(账户)——一个账号一个登录设备,Wind 强制设备指纹 + 并发会话;Wind API + WindPy 同样是 per-seat(账户),并对并发进程做限制。Choice 数据 是 per-seat。Tushare Pro 是 per-API-key,积分制额度 + 收费接口。通联数据 datayes 是 per-API-key + 机构 授权。CN 私募 量化 团队 10-50 人规模的典型形态:3-5 个 Wind 万得 账户 + 一个集中的数据团队跑 SFTP + OSS 管线 + 给研究员个体配更便宜的账号(Choice / Tushare / 聚宽)。
数据团队的工作:每家供应商一个共享服务账号,把内部访问通过权限治理 组中转(例如 阿里云 RAM 角色 fundamentals.wind.daily、news.tonghuashun.firehose、market-data.tonglian.l1),维护「谁读了什么」的审计日志以应对供应商审计。L2 的合规桥(另类数据的四闸 + MNPI / PII / GDPR-PIPL)在入库层继续有效;数据团队是运营 owner。CN 一侧的合规底色:PIPL 个人信息保护法 + 数据 安全 法 数据安全法 关闭跨境数据传输;数据团队把 CN 居民数据保留在境内,用 阿里云 OSS / 腾讯云 COS / 华为云 OBS 而非 AWS S3 实现 S3 模式。
工作示例:四个 ~30 行的参考实现
参考实现位于 pull_rest.py、consume_websocket.py、sftp_drop_watcher.py、s3_drop_watcher.py。每个都展示该模式 + 幂等性 + schema 注册表 + manifest 校验。
pull_rest.py 拉 Tushare Pro 的 600519 / 000001 / 510300 日线,翻页消费 response,尊重限速,按 (symbol, date) MERGE 到 staging.daily_aggs_tushare。幂等性 键 = (tushare, daily_aggs, YYYYMMDD);同一日期 二次 调用 通过 MERGE 退化为 no-op。
consume_websocket.py 消费 Wind 实时 行情 / 通联数据 行情 推送,维持 session,断线后从最后一个序列号恢复,按 (symbol, sequence_no) 去重,把流冲到 Kafka topic tickers.wind.live 供下游消费。
sftp_drop_watcher.py 每五分钟轮询 Wind 数据 服务 SFTP endpoint,识别匹配 wind_qf_<YYYYMMDD>.zip 的新文件,下载到 staging,按 .sha256 sidecar 校验 SHA-256,按 schema 注册表加载,不匹配则 quarantine,成功则写 ledger 条目。开篇 bug 在结构上变得不可能——脚本在 校验 和 不匹配 时主动中止。
s3_drop_watcher.py 订阅 阿里云 OSS / 腾讯云 COS 的对象创建事件(消息队列 转发),事件到达时下载 Parquet 文件,按同 bucket 内供应商发布的 manifest 校验,按 schema 注册表加载。
五句话的纪律收尾:仓库里的每一字节都可追溯到一次有 manifest 的供应商投递;入库作业在 (vendor, dataset, vintage_id) 上幂等;schema 注册、强制、不匹配则 quarantine;供应商对账通过 manifest 不容妥协;权限治理 是数据团队的职责,不是研究台的职责。
A 股 的 T+1 结算 周期 与 证监会 信息披露 SLA 共同 决定 了 CN 入库管线的节奏窗口;订阅 / 接入 的交易成本(包括许可证 + 数据工程 + 维护)是数据团队预算中显眼的一项。
练习
Exercise
你为 CN 私募 量化 团队 设计三个供应商数据集的入库管线。对每一个,指定:(i) 投递模式(从 REST API pull / WebSocket / streaming push / SFTP file drop / S3 / object-storage drop 中选一);(ii) 幂等性 键 的三个组件;(iii) schema 注册表路径,以及至少两条示例行级校验规则;(iv) manifest 校验方式。
数据集 A:实时舆情 firehose,来自 同花顺 舆情——峰值 ~1000 events/sec。
数据集 B:每日 Wind 万得 基本面 批量文件,夜间投递。
数据集 C:每周卫星停车场计数数据集,来自 假设的 通联 卫星 数据,以 Parquet 文件投递到 阿里云 OSS。
把答案做成一张三行表。
提示
提示
(tonghuashun, news_firehose, <session_id + sequence_no>) 用 序列号 去重;B:(wind, fundamentals_quarterly, YYYYMMDD) 用 MERGE;C:(tonglian, parking_lot_counts, YYYYMMDD) 用 snapshot-write。schema 路径:schemas/tonghuashun/news_firehose/v1.yaml 依此类推。Formula Explorer
\text{throughput} = \text{rate\_limit} \cdot (1 - p_{\text{5xx}})本课组装清单
本课按顺序组装的可核对件:
- Inline-code table——四种投递模式映射到 主动方 / 用例 / 供应商 锚点。
- Inline-code listing——幂等性 键 三元组
vendor/dataset/vintage_id。 - Fenced ```python 代码块——
verify_manifest()函数(流式 SHA-256 + 行数 + 日期范围 + schema 版本 四项断言)。 - Inline-code listing——schema 注册表 + quarantine 工作流(
load_schema、validate_row、route_to_quarantine、bump_schema_version、reject_file)。 - Inline-code table——跨区供应商生态六个层级(基本面主选、行情 + 新闻 主选、可负担替代、学术主选、一致预期 主选、另类数据 锚点)。
- 练习——三个数据集跨四种模式的管线设计,含两条递进式 Hint。
- FormulaExplorer——vintage-id 组合
batch_uuid ⊕ date_window。
五句话的纪律收尾:仓库里的每一字节都可追溯到一次有 manifest 的供应商投递;入库作业在 (vendor, dataset, vintage_id) 上幂等;schema 注册、强制、不匹配则 quarantine;供应商对账通过 manifest 不容妥协;权限治理 是数据团队的职责,不是研究台的职责。
下一课
掌握四种入库模式与它们生成的完整性保证之后,下一课打开消费这些已校验投递的仓库架构——数据湖、列式仓库、时点视图、血缘图、权限治理。本课内化的 idempotency 与 manifest 纪律是 L4 期望的输入契约。