← 返回模块
3.6.4.2beta 可读 · 未来付费校验通过内容版本 2026-05-24

量化管道中的 Kafka

3.6.4 · 行情数据的消息与流式处理 · 编程

上海一家 私募 的数据工程主管在晨会上只放了一张幻灯片:沪深300 ETF 的 ticker plant 已经连续运行 19 个月没有漏一条 tick。那张图背后只有一件基础设施:三节点 Kafka 集群、按 symbol 做 partition key、replication factor 3、min.insync.replicas=2,再加一个「处理完再 commit」的 Python feed handler。本课每一项选择都是这张图上的一个旋钮;上一课在抽象层给每根轴起好了名字(耐久性、去重、分区、提交语义)。本课要把每根轴落到具体的 Kafka 配置值上——你接第一个 feed handler 那天就会敲进 Producer({...})Consumer({...}) 字典里的那些字面量;而 3.6 之后所有模块都假设 broker 就是 Kafka。

Kafka 的结构是什么

Kafka 是一个​​复制 + 分区 + 持久化​​的提交日志,​​不是​​一个队列。差别要紧:队列语义是「消费者读一条,消息就没了」;提交日志语义是「消费者在 offset N 读一条,这条仍在日志里;另一个 group 里的消费者可以独立读它;同一个或第三个消费者过一会儿还能重新读」。这就是为什么 Kafka 能 replay(新消费者加入、从 offset 0 开始读)、为什么一个 group 里的慢消费者不会阻塞另一个 group 里的快消费者。四个基本原语:

  • topic — 一个具名的 append-only 日志,通过 kafka-topics.sh --create --topic ticks.sse.510300 --partitions 16 --replication-factor 3 创建。名字是地址;分区数固定并行度;副本因子固定耐久性。
  • partition — 磁盘上的一条完全有序日志;一个 16 分区的 topic 就是 16 条独立日志。顺序保证只在分区​​内部​​成立,跨分区​​不​​成立。
  • replica + ISR — 每个分区在 replication_factor 个 broker 上有副本;其中一个是 leader(处理读写),其它是 follower(从 leader 同步);当前与 leader 同步的副本集合叫 in-sync replica (ISR)。acks=all 的写要等 ISR 全部 ack。规则:replication_factor=3 + min.insync.replicas=2 能扛住一个 broker 宕机不丢数据。
  • offset — 分区内单调递增的 64 位下标。消费者从一个起始 offset 开始读(通过 auto.offset.reset='earliest''latest' 配置),不断推进;下一条要读的 offset 提交回 Kafka 内置的 __consumer_offsets topic。

创建 topic

任何量化 Kafka 管道的第一行都是 topic-create 脚本,按 3.6.2 进 git 并保持可重复执行。行情 topic 的标准调用:

kafka-topics.sh --bootstrap-server broker:9092 --create --topic ticks.sse.510300 --partitions 16 --replication-factor 3 --config min.insync.replicas=2 --config retention.ms=604800000

16 个分区给你 16 路消费者并行(一个分区被一个 group 内消费者拥有)。replication factor 3 + min.insync.replicas=2 是「扛住一个 broker 宕机不丢数据」的耐久性默认。retention.ms=604800000 是 7 天;A-股 量化 行情 topic 经常 30 天或 90 天以便回测重放与合规存档。

Producer:七个要紧的旋钮

confluent-kafka Python 客户端(标准绑定;纯 Python 的 kafka-python 在原型阶段能用,但协议版本落后、延迟开销可测量)拿一个字典做配置,通过 producer.produce(...) 发消息。量化 行情 入库的七个要紧旋钮:

from confluent_kafka import Producer
import json, time

producer = Producer({
    'bootstrap.servers': 'broker:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'linger.ms': 5,
    'compression.type': 'lz4',
    'max.in.flight.requests.per.connection': 5,
    'batch.size': 65536,
})

producer.produce(
    topic='ticks.sse.510300',
    key=symbol.encode(),
    value=json.dumps(tick).encode(),
    callback=delivery_cb,
)
producer.flush()

acks='all' 等​​全部​​同步副本 ack 后再返回——这是耐久性的来源;acks=1 仅等 leader(更快,但 leader 在 replication 之前崩了就丢消息);acks=0 是发了不管(只用于可丢失的 telemetry)。enable.idempotence=True 让生产者为每个 batch 分配 producer-id 与每分区序列号;broker 按 (pid, seq) 去重,被半 ack 的批次重试不会产生重复。开启幂等会要求 acks='all'retries>=1max.in.flight.requests.per.connection<=5;客户端在你开启幂等时自动设这些默认值。linger.ms=5 让生产者最多等 5 毫秒以攒批——把生产者变成高吞吐喷射器的经典招式,代价是发布端几毫秒延迟。compression.type='lz4' 端到端压缩 batch;lz4 是默认(解压速度对压缩比性价比最高);Snappy 是非常接近的第二;gzip 生产端太慢;zstd 适合消费者足够新的场景。batch.size=65536(64 KiB)是常用默认;延迟允许的情况下可以拉大以换吞吐。max.in.flight.requests.per.connection=5 允许每个连接 5 个在飞 batch;幂等下安全(broker 去重);开启真正的生产端并行。

分区 key 规则

每条消息可选一个 key;生产者对它做哈希(Kafka 默认 partitioner 是 murmur2(key) % num_partitions),消息送到那个分区;相同 key -> 相同分区 -> 在该 key 维度上有序。行情 topic 一律按 symbol 做 key。三种选择按正确度递减:

  • key=symbol — 一个 symbol 的所有 tick 落到同一个分区;group 内一个消费者按 symbol 时间序读到这些 tick;symbol 维度仍能 16 路并行。​​行情 topic 的标准选择。​
  • key=(symbol, ts) — 高基数 key 把同一个 symbol 的 tick 散到 16 个分区;per-symbol 顺序丢失。​​错。​
  • key=venue — 低基数 key 把 上证 所有 tick 集中在一个分区;热点。​​错。​

规则:​**​按「消费者需要保序的那个字段」做 key,行情场景里就是 symbol**​。若消费逻辑依赖 (underlying, strike, expiry) 在期权场景下的顺序,按这三者拼出来做 key;不依赖时按更粗的字段做 key。

Consumer:五个旋钮加 poll 循环

消费者侧对称配置,能挡住「悄无声息丢数据」的五个旋钮:

from confluent_kafka import Consumer, KafkaError

consumer = Consumer({
    'bootstrap.servers': 'broker:9092',
    'group.id': 'feedhandler-warehouse',
    'enable.auto.commit': False,
    'auto.offset.reset': 'earliest',
    'isolation.level': 'read_committed',
})

consumer.subscribe(['ticks.sse.510300'])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error() is not None:
        continue
    # 处理消息:解码、校验、写入仓库
    handle(msg)
    # 处理完再提交,永远不提前
    consumer.commit(asynchronous=False)

group.id='feedhandler-warehouse' 给消费者组起名;同组消费者共享 topic 的分区。enable.auto.commit=False 是生产设置——另一种方式按定时器(默认 5 秒)推进 committed offset,可能把还没处理的消息越过;消费者在处理中崩了,接管该分区的下一个消费者从已提交 offset 开始,在飞消息​​丢失​​。auto.offset.reset='earliest' 首次加入时从日志最早处读起(ticker plant 想回补历史就用这个);'latest' 只读新消息(适合实时面板)。isolation.level='read_committed' 让消费者跳过已中止事务的消息;上游若用 Kafka 事务,这是合理默认。​​操作顺序​​是绝对的:poll、解码、写仓库、​​再​ commit。处理完再提交,永远不提前。批量处理(性能关键路径):攒 N 条消息、一次 executemany INSERT、提交最后一条消息的 offset;批量是原子单元。

kcat:日常 CLI

日常 CLI 是 kcat(二进制以前叫 kafkacat)。四种每天会用的调用:

kcat -b broker:9092 -L
kcat -b broker:9092 -t ticks.sse.510300 -C -o end
kcat -b broker:9092 -t ticks.sse.510300 -C -o beginning -e -c 5
kcat -b broker:9092 -t ticks.sse.510300 -P -K:

第一条列出 topic。第二条尾巴模式跟最新消息(从 end 消费并保持连接)。第三条从开头读前 5 条然后退出。第四条把 stdin 每行当作 key:value 推到 topic。日常 workflow:消费者沉默时先用 kcat 确认 topic 里确实有消息再去 debug 消费端代码。这一条比任何其它 Kafka 习惯节省的 debug 时间都多。

消费滞后:最重要的生产指标

kafka-consumer-groups.sh --bootstrap-server broker:9092 --describe --group feedhandler-warehouse 给出每分区的滞后:

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

LAG 列 = LOG-END-OFFSET - CURRENT-OFFSET。LAG 持续增长说明消费者比生产者慢;要么横向扩(加消费者,最多扩到分区数)要么纵向扩(消费者侧向量化——更大的 batch、更少的仓库往返)。规则:​​消费滞后是 Kafka 管道最重要的生产指标​​。3.6.6 会接 Prometheus 抓取 kafka_consumergroup_lag 并在 Grafana 上画板。

Kafka Connect:什么时候不要手写消费者

Kafka Connect 是一个独立的 Java 进程,跑 connector。source connector 把上游系统数据拉进 Kafka topic(Debezium 做 Postgres CDC、MQTT 接 IoT 流、file-tail 做日志托运)。sink connector 把 Kafka topic 推到下游存储。配置是一份 JSON,通过 Connect REST API 提交:

{"name": "ticks-to-timescaledb", "config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "topics": "ticks.sse.510300", "connection.url": "jdbc:postgresql://timescaledb:5432/warehouse", "connection.user": "${env:DB_USER}", "connection.password": "${env:DB_PASSWORD}", "auto.create": "false", "insert.mode": "insert", "pk.mode": "record_key", "pk.fields": "symbol,ts", "table.name.format": "ticks_raw"}}

规则:但凡有现成 connector 的 sink 一律走 Connect(JDBC sink 接 TimescaleDB、InfluxDB sink、S3 sink);只有当 normalisation 逻辑现成 connector 表达不出时才手写消费者——这是 L4 capstone 的情形,合成 feed 需要 symbol 专属的 normalisation,没有 connector 现成支持。

运维旋钮与 EOS

再点名三个:log.retention.hours=168 是 topic 默认保留时长;行情 topic 经常 30 天或 90 天以满足 重放 + 合规。log.segment.bytes=1GiB 是磁盘 segment 大小——没量过就别调。cleanup.policy=compact 让 topic 只保留每个 key 的最新值、丢弃更早的,典型场景是 instrument-master topic(每个 symbol 只关心最新 tick size、lot size、合约说明);ticks topic 上​​不​​对——每条记录都有意义,用 cleanup.policy=delete(默认)。Kafka 事务与 exactly-once-semantics (EOS):producer 可以 init_transactions()、然后 begin_transaction(); produce(...); commit_transaction();同进程内 consumer 配 isolation.level='read_committed',组合起来给你 Kafka 边界内的 exactly-once read-process-write。手写 feed handler 几乎用不上——(symbol, ts) 主键在仓库侧已经做了幂等;默认仍是 at-least-once + 幂等消费者。

Exercise

Exercise

Stand up a single-broker Kafka (via docker compose -f kafka.yml up -d — the compose file is provided; do not author it) and complete the producer / consumer round-trip. (a) Create a topic ticks.sse.510300 with 8 partitions, replication factor 1 (single broker), min.insync.replicas=1, retention 168h via the kafka-topics.sh --create invocation from this lesson. (b) Write producer.py (~40 lines using confluent-kafka) that emits one JSON tick every 20 milliseconds for three symbols, keyed by symbol, using the seven producer-config keys from this lesson (acks=all, enable.idempotence=True, linger.ms=5, compression.type=lz4, max.in.flight.requests.per.connection=5, batch.size=65536). (c) Write consumer.py (~40 lines) that subscribes in group feedhandler-warehouse with the five consumer-config keys from this lesson (enable.auto.commit=False, auto.offset.reset=earliest, isolation.level=read_committed), accumulates 1000 messages into a batch, prints batch=<N>, first_offset=<O>, last_symbol=<S> per batch, and calls consumer.commit(asynchronous=False) after the print succeeds. (d) Verify with kcat -b broker:9092 -t ticks.sse.510300 -C -o end that messages are being produced; verify with kafka-consumer-groups.sh --describe --group feedhandler-warehouse that LAG returns to near zero after the consumer has caught up. (e) Kill the consumer mid-batch; restart it; confirm that the second run sees no message gap and no duplicates by tracking observed offsets across runs. (f) State in two sentences why setting enable.auto.commit=True would have produced data loss in step (e), and which Kafka config + which 3.6.3-warehouse property combined would have produced exactly-once-from-the-consumer-perspective behaviour.

提示
第 (b) 步里 key=symbol.encode() 才是 symbol 维度保序的来源——不传 key,partitioner 会 round-robin,510300 的 tick 会散到多个分区上。delivery callback 应把每条 ack 的 partition 与 offset 打日志,第 (d) 步好确认 keying 正确。
提示
第 (e) 步:一条 batch 打印后、下一次 commit 前用 kill -9 杀掉消费者;重启后看第一条 batch 的 first_offset 应当等于上一轮 last_offset + 1。仓库主键就是消费端的幂等锚——这也是第 (f) 步答案的后半。

下一课

对于绝大多数买方管道,Kafka 都是正确答案——耐久、可重放、acks='all' 下端到端毫秒级。但有些 workload 卡在毫秒底板上:feed handler 需要把交易所原始 multicast 包变成策略决策、延迟预算个位数微秒;cluster 内扇出根本不需要 broker 的耐久性、但要为它付延迟成本;同机 feed-handler 线程到策略线程之间的递手,每微秒都要紧。下一课教这些场景下该上什么——ZeroMQ 做 cluster 内无耐久扇出、UDP multicast 做交易所直连行情、共享内存环形 buffer 做同机亚微秒路径。Kafka 的纪律往后带:按 symbol 做 key;acks='all' + enable.idempotence=True 永远;处理完再 commit、永远不提前;批大小给个上界;监控消费滞后;现成 sink 走 Kafka Connect;只有 normalisation 现成 connector 表达不出时才手写消费者。

阅读清单

  • Kafka 中文 文档(kafka.apachecn.org)——Producer Configs / Consumer Configs / Connect 章节。
  • 《Apache Kafka 实战》中文版(胡夕 著)。
  • 《Kafka 权威指南》中文版(Neha Narkhede 等 著,O'Reilly)。
  • confluent-kafka-python 文档(英文权威,github.com/confluentinc/confluent-kafka-python)。
  • kcat / kafkacat 文档(github.com/edenhill/kcat)。
  • Confluent 关于 EOS 的博客「Exactly-Once Semantics Are Possible: Here's How Kafka Does It」。
  • 《数据密集型应用系统设计》中文版(Martin Kleppmann)第 11 章 流处理。

一条额外注释:A-股 量化 firm 的 Kafka 集群通常部署在内网物理机上(3 节点起步、副本因子 3,运维实践与海外对齐);Confluent Cloud / AWS MSK 因数据安全法规与跨境带宽在国内罕见。行情 vendor(通联 / 万得 / 恒生)通常已经把交易所行情 normalise 成 TCP 推送,firm 把它写进内网 Kafka topic——这是本课 producer 角色在实际工作里的落点。把这套规则收成一句:​​按 symbol 做 key;处理完再 commit;监控滞后;现成 sink 走 Kafka Connect。​

速查卡

本课反复出现的展示形态——抄进笔记:

  1. Inline-code 定义——四个 Kafka 原语(topicpartitionreplica + ISRoffset)与三种分区 key 选项。
  2. Fenced ```bash 块——kafka-topics.sh --create 调用、四种 kcat 命令、kafka-consumer-groups.sh --describe 调用。
  3. Fenced ```python 块——七旋钮 Producer({...}) 配置 + producer.produce(..., key=symbol.encode(), ...)、五旋钮 Consumer({...}) 配置 + 「处理完再 commit」的 poll 循环。
  4. Fenced ```json 块——Kafka Connect JDBC sink connector 配置。
  5. Exercise 与两条渐进 Hint——单 broker producer / consumer round-trip 加上一次 crash-restart 幂等性测试。

国内 私募 部署补充

A-股 量化 私募 部署 Kafka 的实际场景:ShanghaiStockExchange (SSE) 与 ShenzhenStockExchange (SZSE) 通过 vendor 中介(TongLianData、WindData、HsiHengSheng)把 510300 等 沪深300 ETF tick 推送到 firm 内网,写进 ticks.sse.510300 topic;CFFEX 中金所 通过 ChinaTradePlatform 的 CTP API 把 IF / IH / IC / IM 期货 tick 写进 ticks.cffex.if2406 topic。监管侧 ChinaSecuritiesRegulatoryCommission 对 90 天 行情留痕的要求体现在 Kafka 的 retention.ms 配置上;ZhongJinSuo 与 ShangJiaoSuo 文档由 SecuritiesAssociationOfChina 与 ShanghaiStockExchange 公开。