下午 15:30 CST,某 A 股 量化 私募。沪深 收盘 加 15:00 行情 结算 落定 之后 半 小时,行情 vendor 的 tick_510300_20260523.csv.gz 落 在 共享 挂载 /data/market-data/ 上。cron 调起 ingest_ticks.sh。接下来 九十 秒 内,文件 必须 被 加载 到 暂存 表、按 三 条 确定性 校验 校验、汇总 成 1 分钟 bar 进 生产 事实 表、刷新 规划器 统计,并 每 阶段 向 stdout 发 一行 结构化 JSON 日志,让 明天 的 可观测 性 栈 能 显示 『昨日 ingest 87 秒 成功 完成、产出 620 万 bar 行』。明天 的 研究 查询 依赖 这条 管道 干净 收尾。本课 把 L1 的 SQL、L2 的 模式 与 索引、L3 的 TSDB 决策 规则 组合 成 一条 队 友 可以 排程 后 不必 再 操心 的 日终 管道,给 整个 模块 收口。
暂存、校验、提升
每条 可靠 数据 管道 的 形状 都 是 暂存-校验-提升。把 原始 文件 落 进 一张 没人 拿 来 回测 的 暂存 表;跑 廉价 的 校验 查询 抓 明显 损坏;只有 校验 通过 后 才 把 行 提升 到 生产 事实 表。绝不 直接 从 文件 INSERT 进 生产 事实 表——坏 数据 一旦 进 去,把 它们 清出去 的 成本 比 重 跑 的 成本 大。
CREATE TABLE IF NOT EXISTS ticks_raw (
symbol TEXT NOT NULL,
ts TIMESTAMPTZ NOT NULL,
price NUMERIC(18,6) NOT NULL,
size BIGINT NOT NULL,
side CHAR(1) NOT NULL,
source_file TEXT NOT NULL,
ingested_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
source_file 与 ingested_at 是 审计 字段——每一行 记录 是 哪个 文件 在 何时 灌 进 来 的。暂存 表 允许 出现 重复 与 半 灌 入;校验 这一 步 之 所以 存在,正是 让 那些 永远 到 不了 bars_1m。
第 1 步——用 \copy 落 文件
把 CSV.gz 灌 进 暂存 的 两 种 等价 方式。psql 侧:
\copy ticks_raw (symbol, ts, price, size, side, source_file) FROM PROGRAM 'gunzip -c /data/market-data/tick_510300_20260523.csv.gz' WITH (FORMAT csv, HEADER true);
\copy 是 客户 端 形式,把 stdin 流 进 服务 端 的 COPY——它 不 需要 数据库 服务 进程 对 你的 挂载 有 文件 系统 级 访问 权限,并 且 GB 级 文件 也 能 流式 通过 协议线。Python 通过 psycopg 3 的 等价 写法:
import gzip, psycopg
with psycopg.connect(os.environ["DATABASE_URL"]) as conn, conn.cursor() as cur:
with gzip.open(path, "rt") as fh, cur.copy(
"COPY ticks_raw (symbol, ts, price, size, side, source_file) FROM STDIN WITH (FORMAT csv, HEADER true)"
) as copy:
for line in fh:
copy.write(line)
cur.copy(...) 返回 一个 上下文 管理 器,write 直接 把 块 流 进 服务 端 COPY——当 加载 器 用 Python 而 非 SQL 脚本 写 时 的 标准 模式。
第 2 步——用 三 条 确定性 查询 校验
三 条 SELECT,每条 返回 一个 数;编排 shell 读 每个 数、失败 就 中断。
-- (1) row count > 0
SELECT COUNT(*) FROM ticks_raw WHERE source_file = :'source_file';
-- (2) zero NULLs in required columns
SELECT SUM(CASE WHEN symbol IS NULL OR ts IS NULL OR price IS NULL OR size IS NULL OR side IS NULL THEN 1 ELSE 0 END) FROM ticks_raw WHERE source_file = :'source_file';
-- (3) zero duplicate (symbol, ts) keys
SELECT COUNT(*) - COUNT(DISTINCT (symbol, ts)) FROM ticks_raw WHERE source_file = :'source_file';
查询 1 必须 > 0;查询 2、3 必须 恰好 = 0。任何 一 条 失败,脚本 发 一行 结构化 错误 日志 并 退出 非零。psql -tA -v source_file="$SOURCE_FILE" -f validate.sql 以 tuples-only-加-unaligned 模式 跑,让 bash 拿到 不 带 表头 与 对齐 空白 的 干净 数字。
第 3 步——把 行 提升 进 bar,并 保持 幂等
汇总 查询 把 校验 过 的 tick 按 1 分钟 桶 折叠。最 干净 的 写法:CTE 用 ROW_NUMBER 选 每 桶 首 / 末 tick,然后 INSERT ... ON CONFLICT 做 提升。
WITH rolled AS (
SELECT
symbol,
date_trunc('minute', ts) AS bucket,
MIN(price) AS low,
MAX(price) AS high,
SUM(size) AS volume,
(ARRAY_AGG(price ORDER BY ts ASC))[1] AS open,
(ARRAY_AGG(price ORDER BY ts DESC))[1] AS close
FROM ticks_raw
WHERE source_file = :'source_file'
GROUP BY symbol, date_trunc('minute', ts)
)
INSERT INTO bars_1m (symbol, ts, open, high, low, close, volume)
SELECT symbol, bucket, open, high, low, close, volume FROM rolled
ON CONFLICT (symbol, ts) DO NOTHING;
ON CONFLICT (symbol, ts) DO NOTHING 让 管道 幂等:在 同 一日期 重 跑 产生 同 一份 bars_1m,第二 次 insert 是 no-op,而 不是 主键 冲突。明 说 规则:量化 管道 里 每一个 提升 步 都 必须 幂等——对 同 一 输入 的 重 跑 必须 产生 同 一 输出,不 报错,不 重复 计数。 这 也 是 回填 重 跑 两 周 文件 的 安全 保障。
第 4 步——刷新 规划器 统计
ANALYZE bars_1m;
不 跑 这 一步,下 一条 回测 查询 用 的 是 不 知道 你 刚 插入 的 行 的 过期 规划器 统计。从 L2 看,这是 单 一 杠杆 最高 的 习惯;它 出现 在 管道 里,是 因为 大批 加载 模式 正是 统计 过期 的 高 发 场景。
第 5 步——每 阶段 一行 结构化 JSON 日志
每个 阶段 向 stdout 发 一行 JSON。bash 助手 是 一 个 log() 函数,调 printf,格式 串 是 {"ts":"%s","stage":"%s","status":"%s","rows":%s,"source_file":"%s"}\n,依次 喂 五 个 位置 参数:UTC 时间戳 $(date -u +%Y-%m-%dT%H:%M:%SZ)、$1、$2、${3:-0}、$SOURCE_FILE。脚本 按 顺序 发 五 次 调用:log stage_load ok "$rows_loaded"、log stage_validate ok "$rows_validated"、log stage_promote ok "$rows_promoted"、log stage_analyze ok 0、log run_done ok "$rows_promoted"。
五 个 JSON 键(ts、stage、status、rows、source_file),五 个 阶段 标签(stage_load、stage_validate、stage_promote、stage_analyze、run_done)。每 阶段 一行 JSON 的 形状 正 是 通往 3.6.6 可 观测 性 故事 的 前指 信号——本课 只 发 这些 行,不 做 聚合 与 展示。
编排 shell 脚本
七 行 头部 镜像 3.6.1 的 L4 ingest 脚本,每行 单 独 列 出 便于 逐 行 读:
#!/usr/bin/env bash
set -euo pipefail
IFS=$'\n\t'
DATE="${1:-$(date -u -d 'yesterday' +%Y%m%d)}"
SOURCE_FILE="/data/market-data/tick_510300_${DATE}.csv.gz"
WORK_DIR="$(mktemp -d)"
trap 'rm -rf "$WORK_DIR"' EXIT INT TERM
set -euo pipefail 让 未 处理 错误 致命;IFS 重置 把 词分割 限制 到 换行 与 tab;DATE 默认 取 UTC 昨天 的 %Y%m%d;WORK_DIR 是 本 次 运行 的 临时 目录;trap ... EXIT INT TERM 在 任何 退出 路径(含 SIGINT / SIGTERM)都 把 临时 目录 清 干净。四 个 psql 调用 跟在 头部 后面,按 这 个 顺序:
psql -v ON_ERROR_STOP=1 -v source_file="$SOURCE_FILE" -f load.sql——跑\copy进ticks_raw。psql -tA -v ON_ERROR_STOP=1 -v source_file="$SOURCE_FILE" -f validate.sql——捕获 三 条 校验 计数;-tA是 tuples-only + unaligned。psql -v ON_ERROR_STOP=1 -v source_file="$SOURCE_FILE" -f promote.sql——做 幂等 汇总 进bars_1m。psql -v ON_ERROR_STOP=1 -c 'ANALYZE bars_1m;'——刷新 规划器 统计。
psql -v ON_ERROR_STOP=1 -v source_file="$SOURCE_FILE" -f load.sql
psql -tA -v ON_ERROR_STOP=1 -v source_file="$SOURCE_FILE" -f validate.sql
psql -v ON_ERROR_STOP=1 -v source_file="$SOURCE_FILE" -f promote.sql
psql -v ON_ERROR_STOP=1 -c 'ANALYZE bars_1m;'
-v ON_ERROR_STOP=1 不是 可选——少了 它,失败 的 INSERT 会 被 静默 记录、脚本 退出 0,这是 最 糟糕 的 管道 失败:cron / systemd 以为 run 成功 了。
TimescaleDB 变种——恰好 两 处 改动
相对 普通 Postgres 的 diff 恰好 是 两 处 改动:(1) 在 迁移 里 的 CREATE TABLE bars_1m 之后 调 一次 SELECT create_hypertable('bars_1m', 'ts', chunk_time_interval => INTERVAL '7 days');,以及 (2) 汇总 查询 的 桶 由 date_trunc('minute', ts) 改为 time_bucket(INTERVAL '1 minute', ts)。其他 一切——\copy、validate、INSERT ... ON CONFLICT、ANALYZE、结构化 日志——保持 不变。这 正 是 L3 决策 规则 所 依赖 的 低 迁移 摩擦 论 据。
迁移 是 代码、不是 对话
模式 变更——CREATE TABLE bars_1m、L2 的 四 条 CREATE INDEX、Timescale 变种 的 SELECT create_hypertable(...)、instrument / calendar 维度 的 种子 INSERT——通过 git 受 控 的 migrations/ 目录 下 的 迁移 文件 进行,按 3.6.2 的 评审 工作 流。Python 栈 标准 工具 是 alembic:alembic revision -m "add bars_1m and indexes",编辑 生成 的 文件,alembic upgrade head 应用。Java 栈 用 flyway;SQL 优先 的 团队 用 sqitch。明 说 规则:到 生产 的 模式 变更 来自 受 评审 的 受 版本 管理 的 迁移 文件;对 生产 直接 跑 psql -c "ALTER TABLE ..." 是 bug、不是 fix。 凭据 放 ~/.pgpass,权限 0600(3.6.1 已 讲),永远 不 在 代码 里、永远 不 在 受 git 跟踪 的 .env 里。一条 补充:A 股 量化 团队 的 数据 工程 与 量化 研究 通常 共同 维护 此类 一日 一次 EoD 批处理;本课 范围 在 任何 已 部署 PostgreSQL / TimescaleDB / PolarDB-O 的 firm-internal 环境 中 均 可 直接 落地。
这些 模式 在 运维 层 为 何 重要
三 条 规则 撑 起 整 个 形状。暂存-校验-提升 不容 商量,因 为 错误 在 暂存 表 上 便宜、在 事实 表 上 昂贵。ticks_raw 里 一条 坏 行 一个 TRUNCATE 就 消失;bars_1m 里 一条 坏 行 会 污染 之后 每 一条 命中 那 一分钟 的 回测,直到 有人 注意、追查、写 取证 DELETE、重 跑。两 层 清理 成本 的 这 种 不 对称,正 是 即便 文件 看起来 完美 也 要 留 暂存 表 的 理由。
幂等 是 让 你 周日 夜里 能 睡觉 的 运维 纪律。周五 一 次 失败 跑 是 行业 常态——vendor 文件 晚 了 90 秒、行情 网关 传 了 一条 重复 行、磁盘 在 \copy 中 段 满 了。幂等 管道 让 你 周一 早 简单 重 跑 ./ingest_ticks.sh 20260522 就 收 工;不 幂等 的 管道 逼 你 取证 调查 哪 一步 部分 跑 完 了。ON CONFLICT (symbol, ts) DO NOTHING 是 最 小 充分 机制;更 复杂 的 upsert 形式(ON CONFLICT ... DO UPDATE SET close = EXCLUDED.close, ...)会 用 新 值 替换 已有 行,是 当 行情 商 晚到 的 tick 修正 会 改 历史 bar 时 的 正确 选 择——但 DO NOTHING 是 正确 默认,因 为 多数 ingest 一次 性 完成。
结构化 日志 在 每个 阶段 都 发,是 通往 可 观测 性 的 最 廉价 前置 信号。每 行 JSON 携带 ts、stage、status、rows、source_file——足够 明天 的 Grafana 仪表板 在 每个 source_file 维度 画 ingest 延迟、行数、失败 率。形状 镜像 12-factor 应用 约定:写 到 stdout,绝 不 写 到 必须 你 自己 滚转 的 文件,也 绝 不 写 到 一 个 管道 必须 依赖 的 远 程 服务。3.6.6 拿 这些 行 上 一 层 日志 聚合,然后 在 上面 搭 仪表板。
Python 版 加载 器 草图
数据 流水线 在 部分 团队 不 是 bash + SQL 文件 编排,而是 直接 Python。同样 五 步 在 psycopg 3 上 落到 30 行:开 一 条 连接,BEGIN 一 个 事务,cur.copy() 把 文件 灌 进 ticks_raw,跑 三 条 校验 查询、任何 一 条 非 0 就 raise、跑 INSERT ... ON CONFLICT 汇总、跑 ANALYZE bars_1m、conn.commit(),发 同样 五 行 JSON 日志。事务 包裹 让 校验 失败 整体 回滚——绝不 会 在 bars_1m 里 留 下 半 提升 状态。Python 加载 器 在 量化 研究 团队 直接 维护 仓库 的 私募 比较 常见;bash + psql 加载 器 在 数据 工程 团队 独立 维护、且 偏 SQL 文件 走 代码 评审 的 公司 占 主流。一 条 提醒:A 股 量化 团队 多 数 走 后 一种 形态,把 模式 与 加载 SQL 文件 都 当 受 评审 的 代码 资产 在 内部 Git 中 长期 维护。
纪律 总结
每个 数据 集 都 暂存;提升 之前 廉价 校验;通过 ON CONFLICT (symbol, ts) DO NOTHING 让 每个 提升 幂等;大批 加载 之 后 ANALYZE;每个 阶段 一 行 结构化 JSON 日志;失败 退出 非零;模式 变更 走 受 评审 的 迁移 文件,绝不 在 生产 上 临时 ALTER TABLE。这条 capstone 之后,模块 收口——你 会 写 SQL(L1)、能 设计 + 索引 + 诊断 一 张 模式(L2)、能 为 工作 量 挑 正确 的 存储 引擎(L3),并 能 立 起 一条 可 跑 的 日 终 ingest,让 可 观测 性 层(3.6.6)与 消息 层(3.6.4)在 它 之 上 继续 搭建。
本课构件清单。Fenced ```sql 代码 块:ticks_raw 暂存 表 DDL;三 条 按 :'source_file' 参数化 的 校验 查询;幂等 INSERT ... ON CONFLICT (symbol, ts) DO NOTHING 汇总。Bash 七 行 头部(从 #!/usr/bin/env bash 到 trap ... EXIT INT TERM)加 四 条 psql -v ON_ERROR_STOP=1 ... 调用。结构化 JSON log() 助手。Inline-code:TimescaleDB 两 行 diff。一 个 Exercise。Two Hints。市场 数据 锚 在 A 股 沪深 300 的 '510050'、'510500'、'510300' 在 上证 / 深证 / CFFEX 受 监管 的 私募 / 量化 私募 体系,T+1 结算 与 涨跌停 制度,对应 SSE / SZSE 流通 的 50ETF 与 300ETF。
练习
Exercise
撰写 并 跑 通 一条 完整 日 终 tick 摄入 管道。从 空 Postgres 数据库 出发:(a) 写 一个 alembic(或 等价 工具)迁移,建 ticks_raw、bars_1m(带 PRIMARY KEY (symbol, ts))、L2 的 instrument、calendar、corp_action 维度 表,以 及 L2 的 四 条 索引(idx_bars_symbol_ts_inc、idx_bars_ts_brin、idx_instr_active、idx_corp_symbol_date)。(b) 写 load.sql,内 含 \copy ticks_raw (symbol, ts, price, size, side, source_file) FROM PROGRAM 'gunzip -c :source_file' WITH (FORMAT csv, HEADER true); 调用。(c) 写 validate.sql,内 含 本课 三 条 校验 查询,按 :source_file 参数化。(d) 写 promote.sql,内 含 本课 的 幂等 INSERT INTO bars_1m ... ON CONFLICT (symbol, ts) DO NOTHING 汇总。(e) 写 ingest_ticks.sh:本课 的 七 行 头部 加 四 条 psql -v ON_ERROR_STOP=1 ... 调用 加 结构化 JSON 日志 助手。(f) 在 /data/market-data/ 上 种 一份 样本 压缩 tick 文件,至少 100k rows、跨 three symbols、一个 交易 日;连 跑 两 次 ./ingest_ticks.sh <YYYYMMDD>;确认 第 二 次 跑 产生 与 第 一 次 相同 的 bars_1m 内容(幂等),并 每 次 各 发 five structured-JSON log lines。(g) 用 两 行 写 出 仓库 改 成 TimescaleDB 时 到底 改 什么。
提示
promote.sql 里 的 ON CONFLICT (symbol, ts) DO NOTHING,加上 让 load.sql 对 同 一 source_file 可 重 灌(暂存 表 允许 持有 重复;校验 上 的 重复 (symbol, ts) 键 校 验 会 抓 它们)。提示
SELECT create_hypertable('bars_1m', 'ts', chunk_time_interval => INTERVAL '7 days');,加上 汇总 里 date_trunc('minute', ts) 改 time_bucket(INTERVAL '1 minute', ts)。bash 脚本、校验 SQL、\copy 都 不 改。