某 CFFEX 张江 COLO 机房里,一位延迟工程师在 SSE Level-2 行情上跑 tcpdump,问你:一个 09:30:00.000001234 时刻穿过交换机的报文,为什么 09:30:00.000004718 才到达策略线程?这 3.5 µs 就是 L1 委托簿能消费的预算上限——而其中大部分都付给了线路到委托簿之间这一层:行情处理器。本课就把它建出来。一条固定 CPU、只做 recvmsg 与解析的接收线程;一个把两路冗余多播去重的 A/B 仲裁器;以及三种回调到 OrderBook::install_snapshot() 的生产级缺口恢复方案。
架构模式
生产级行情处理器是一条定型流水线:一到两条接收线程,每条订阅一个多播组、绑定到一颗隔离 CPU,工作只有 recv + 解析 + 推入 SPSC 队列。当启用 A/B 冗余时,仲裁器线程从两条上游队列读、按序号去重、把存活那条推入下游 SPSC 队列。策略线程从下游队列读、对每条事件应用到 L1 委托簿、然后通知策略。热路径之外,快照恢复线程盯着一个 bool 标志,标志翻转时去和交易所的 TCP 快照服务对话。热路径上一切由 3.4.4 L3 的 std::atomic + SPSC 队列承载;不调 malloc,不加锁,不留分支 99% 走不到的死路径。
事件 POD
线程之间的基本单位是 MarketDataEvent——一个 32 字节、半条 cache line 装下的 packed POD。字段顺序对照 ITCH 5.0 最常出现的消息类型,让解析器热路径写入连续。
#include <cstdint>
enum class EventType : std::uint8_t {
AddOrder = 0, // ITCH 'A' / 'F'
Executed = 1, // ITCH 'E' / 'C'
Cancel = 2, // ITCH 'X'
Delete = 3, // ITCH 'D'
Replace = 4, // ITCH 'U' (delete + add)
};
struct MarketDataEvent {
EventType type;
std::uint8_t side; // 0 = buy, 1 = sell
std::uint16_t flags; // reserved; future tagged-state
std::uint32_t shares; // ITCH quantity (4-byte big-endian on the wire)
std::uint64_t order_id; // exchange order reference
std::int32_t price_ticks; // integer price in tick units (never floating-point)
std::uint64_t seq_no; // multicast-feed sequence number
std::uint64_t timestamp_ns; // ITCH 6-byte nanoseconds-since-midnight (zero-extended to 8)
} __attribute__((packed));
static_assert(sizeof(MarketDataEvent) <= 64, "MarketDataEvent must fit in one cache line");
那个 static_assert 是承重的:如果你改了这个 struct 让它超过 64 字节,每次队列推入要付双 cache line 传输代价,L3 策略线程会在 perf stat 上立刻看见。__attribute__((packed)) 拿掉编译器原本会插入的字段间填充字节;x86 上未对齐访问几乎免费,按每秒百万级事件计,省下的字节累积成可观的带宽。
接收线程
接收线程做三件事:把一个 UDP socket 绑到多播组上、把内核接收缓冲提到 64 MiB 以便消费者短暂卡顿不会让 NIC 丢包、循环 recv 到一段 2 KB 的复用静态 buf——热路径上零分配。
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstring>
int open_multicast_rx(const char* group, std::uint16_t port, const char* iface) {
int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
int reuse = 1;
::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); // allow multiple subscribers on the host
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY); // accept on any local IP
::bind(fd, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr));
ip_mreq mreq{};
mreq.imr_multiaddr.s_addr = ::inet_addr(group);
mreq.imr_interface.s_addr = iface ? ::inet_addr(iface) : htonl(INADDR_ANY);
::setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
int rcvbuf = 64 * 1024 * 1024; // 64 MiB receive buffer
::setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf));
return fd;
}
void feed_recv_loop(int fd, SpscQueue<MarketDataEvent>& out, std::atomic<bool>& running) {
char buf[2048]; // static buffer reused per packet (no allocation on hot path)
while (running.load(std::memory_order_relaxed)) {
ssize_t n = ::recv(fd, buf, sizeof(buf), 0);
if (n <= 0) continue; // EINTR or empty datagram
MarketDataEvent ev{};
if (!parse_itch(buf, static_cast<std::size_t>(n), ev)) continue; // parse failures logged separately
while (!out.try_push(ev)) {
// back-pressure: queue full. Production code: escalate to kill switch.
// For didactic L2 we spin briefly; L3's kill-switch wiring takes over.
}
}
}
几处细节落地一下。SO_REUSEADDR 让本机多个进程能同时订阅同一多播组——策略进程、并行日志器、临时挂上来抓包的 tcpdump 都能各得其所。64 MiB 接收缓冲消化开盘集合竞价之后那一波数据洪峰;没有它,内核会在 socket 上丢包,下一节的缺口恢复就要无谓地触发。parse_itch 是 ITCH 5.0 线路格式解码器:把 payload 首字节看作消息类型('A' / 'D' / 'E' / 'X' / 'U' 等),通过 std::memcpy 按固定偏移读出大端字段(避免未对齐访问的 UB),再写到 MarketDataEvent 的对应槽位。
A/B 仲裁
生产交易所对每路多播行情都同时发布两条物理隔离的链路("A 路"与"B 路")到不同交换机上的两个多播组。每条消息在两条流上同序号出现。仲裁器的工作是:让每个序号只向下游转发一次,谁先到就走谁,并在两条都落后时把快照标志翻起来。
#include <bitset>
class AbArbitrator {
public:
// Returns true if the event should be forwarded; false if it is a duplicate.
bool admit(const MarketDataEvent& ev) noexcept {
const std::uint64_t seq = ev.seq_no;
if (seq == next_expected_) {
++next_expected_;
recent_.set(seq % kRingSize);
// Drain any contiguous run that arrived earlier from the other feed.
while (recent_.test(next_expected_ % kRingSize)) {
recent_.reset(next_expected_ % kRingSize);
++next_expected_;
}
return true;
}
if (seq < next_expected_) {
// Late arrival or duplicate from the other feed; drop.
return false;
}
// seq > next_expected_: out-of-order arrival from the faster feed.
// Mark seen; emit only when the gap fills.
if (seq - next_expected_ >= kRingSize) {
// Gap larger than the dedup ring: snapshot-recovery territory.
request_snapshot_ = true;
next_expected_ = seq; // skip ahead so the strategy does not stall
return true; // dispatch this event; the gap is escalated separately
}
recent_.set(seq % kRingSize);
return false; // do not forward yet; wait for the missing sequences
}
bool needs_snapshot() const noexcept { return request_snapshot_; }
void clear_snapshot_flag() noexcept { request_snapshot_ = false; }
private:
static constexpr std::size_t kRingSize = 1024;
std::bitset<kRingSize> recent_{};
std::uint64_t next_expected_ = 0;
bool request_snapshot_ = false;
};
三个分支一个排空循环。按序分支是暖路径——绝大多数事件落在这里。排空循环处理这样的情形:先到那条流送过来 N+1、N+2,慢的那条还没送到 N;当 N 终于到达慢路径时,循环往上滚过所有已就位的连续序号,按序一次性吐出。第三个分支(seq > next_expected_)是冷路径:一条流送来的序号但对面那条还没到对应序号;在 ring 上标记后等。一旦缺口超过 kRingSize = 1024,仲裁器放弃去重、翻起快照标志、跳过缺口,让策略不至于被永久阻塞。
Formula Explorer
effective_loss_prob = p_A * p_B完整流水线
四条钉在四颗隔离核心上的热路径线程,外加一条非热路径的快照恢复线程:
// Threads:
// CPU 2: A-feed receive thread -> ab_inbox SPSC queue
// CPU 4: B-feed receive thread -> ab_inbox SPSC queue (shared)
// CPU 3: arbitrator thread -> dedup SPSC queue
// CPU 6: strategy thread -> reads dedup queue, applies to OrderBook
//
// Recovery: when arbitrator.needs_snapshot() flips true, a separate snapshot-recovery
// thread (not on the hot path) requests a full book snapshot from the TCP snapshot
// service, calls book.install_snapshot(...), and clears the flag.
void arbitrator_loop(SpscQueue<MarketDataEvent>& in_a, SpscQueue<MarketDataEvent>& in_b,
SpscQueue<MarketDataEvent>& out, AbArbitrator& arb,
std::atomic<bool>& running) {
MarketDataEvent ev{};
while (running.load(std::memory_order_relaxed)) {
bool got = in_a.try_pop(ev) || in_b.try_pop(ev); // round-robin: A first, then B
if (!got) continue;
if (arb.admit(ev)) {
while (!out.try_push(ev)) { /* see L3 kill switch */ }
}
}
}
void strategy_loop(SpscQueue<MarketDataEvent>& in, OrderBook& book, Strategy& strat,
std::atomic<bool>& running) {
MarketDataEvent ev{};
while (running.load(std::memory_order_relaxed)) {
if (!in.try_pop(ev)) continue;
// 1) Apply to the L1 order book first so book state is current before the strategy sees the event.
book.apply(ev);
// 2) Notify the strategy with the post-apply book state.
strat.on_tick(ev, book);
}
}
strategy_loop 的顺序是承重的:策略必须看到的总是这条事件应用之后的委托簿状态。如果在 book.apply(ev) 之前先问策略,OnTick 调用 book.best_bid_ticks() 会读到当前 tick 之前的内部价——在沪深300 ETF 这种快速行情上,一条消息的陈旧视图足够让策略下错方向的单。
三种缺口恢复方案
包丢失时,行情处理器在三种恢复方案中选其一。三者的差别在恢复延迟、可适用的缺口大小、以及对应的消费方场景。
| 方案 | 恢复延迟 | 适用缺口大小 | 适用消费方场景 | L2 工作示例落地 |
|---|---|---|---|---|
| TCP 重传 | 毫秒级 | 小(不超过几百序号) | 非 HFT(中频、风控报告、事后处理) | 课文点名,未实现(与交易所规约强绑) |
| A/B 冗余仲裁 | 微秒级 | 至多 dedup ring 大小(~1024) | HFT | 本课实现,即上文 AbArbitrator |
| 快照+回放 | 秒级(与快照服务一次往返) | 任意大小 | 万能后备 | 热路径之外的快照恢复线程,调用 L1 的 OrderBook::install_snapshot();AbArbitrator::needs_snapshot() 翻转时触发 |
TCP 重传是交易所侧方案:处理器单开一条到交易所重传服务器的 TCP 单播会话,请求序号 X 到 Y、按序接收并喂回解析器。延迟一个网络往返加交易所响应时间,毫秒级。每分钟跑一次的风控报告完全够用;对 5 µs 端到端预算的 HFT 消费方毫无意义。
A/B 仲裁是本地方案:冗余多播流就是你自带的重传。A 路丢了的包 B 路按时到了,仲裁器就如期吐出,策略全程不知道有过缺口。有效丢包率是两条独立丢包率的乘积——p_A * p_B——通常比单条流的丢包率低四到六个数量级。
快照+回放是万能后备:两条流同时丢同一序号、或者缺口大到 dedup ring 装不下时,没法靠增量恢复。你向交易所要一个新序号下的完整委托簿、安装、从那个序号继续。这就是 L1 的 install_snapshot 契约;L2 给出触发条件。
背压:策略卡住会怎样
如果策略线程在某条事件上耗时过长——一次冷缓存未命中、一次错预测、或者邻居进程一次 GC 暂停——dedup SPSC 队列会被填满。生产中三种有界响应:drop-oldest(丢队首继续)、kill-switch(系统已坏,停止交易)、上游背压(多播流意味着 NIC 那一层最终还是丢)。HFT 私募一律选 kill-switch:用陈旧数据继续下单的策略是最糟糕的失败模式,远比干净停机更危险。L3 框架会把 kill-switch 接到队满路径上;当前 while (!out.try_push(ev)) {} 这段自旋是占位,下一课将其闭合。
衔接下一课
至此你拥有了一台每微秒往策略线程投放一条解析好的 MarketDataEvent、并以三种工业级方式承接丢包的行情处理器。策略线程从 dedup 队列读、对每条事件应用到 L1 委托簿、然后咨询策略——但策略本身仍是占位。L3 把它写满:CRTP 形式的 Strategy<Derived> 基类做编译期多态、std::variant<MarketDataEvent, ExecutionReport, TimerEvent, RiskBreach> 事件并集、每张订单必走的六道风控门、以及拥有线路会话状态的 FIX 协议订单路由器,让策略本身对交易所协议保持无状态。
练习
Exercise
(a)改造 feed_recv_loop:在单路(无 A/B)场景下按序号缺口统计丢包。维护每线程一个 uint64_t next_expected_seq;每解析到一条事件,若 ev.seq_no != next_expected_seq,按差值记录缺口大小后重置;运行结束时报告总丢包数。用一份注入了 0.1% 合成丢包的 ITCH 回放数据跑,核对总数与注入的丢包数。
(b)扩展 AbArbitrator::admit:处理 B 路领先 A 路超过 kRingSize 序号(A 路卡住、B 路继续)的情形。仲裁器应识别缺口、翻起 request_snapshot_ = true、把 next_expected_ 快进到 B 路当前序号以免拖住策略。写一个单元测试,喂给仲裁器一段 A 路停滞 2000 序号、B 路持续推进的合成交错流,验证快照标志翻转、策略线程无延迟收到 B 路事件。
(c)接入 snapshot_recovery_thread(独立于仲裁器与策略):每毫秒轮询 arbitrator.needs_snapshot(),翻转时调用桩函数 request_full_book_snapshot()(返回一对 std::vector<Order> 与快照序号),调用 L1 OrderBook::install_snapshot() 安装、清掉快照标志。轮询用 std::this_thread::sleep_for(std::chrono::milliseconds(1))。一句话说明为何快照恢复要跑在独立线程(off the hot path)。
(d)借 3.4.3 L1 的工具,对稳态回放负载下的接收线程跑 perf stat -e cycles,instructions,cache-misses,branch-misses ./feed_handler;报告 IPC 与 cache-miss 率;验证 IPC > 1.0 且 cache-miss < 5%。一句话说明这两项数字为何指示接收线程的缓存是暖的、以及接收线程一旦颠簸会是什么样。
(e)(无须实现)为下列场景各选一种缺口恢复方案(TCP 重传、A/B 仲裁、快照+回放)并各用一句话给出理由:(i)一条 5 µs 端到端预算的做市策略遇到 5 序号缺口;(ii)同一条做市策略遇到一次 30 秒断流导致的 50000 序号缺口;(iii)一个每日一次的风控报告消费方遇到 200 序号缺口;(iv)一个从离线文件回放的回测研究消费方遇到任意缺口。
提示
ev.seq_no - next_expected_seq 到 total_drops,然后 next_expected_seq = ev.seq_no + 1 继续。提示
seq - next_expected_ >= kRingSize 的情形;用 2000 条 B 路事件的合成流喂入、核对 request_snapshot_ 翻转且策略持续消费即可。