← 返回模块
3.4.5.2beta 可读 · 未来付费内容校验中内容版本 2026-05-24

行情处理器

3.4.5 · C++ 交易系统 · 编程

某 CFFEX 张江 COLO 机房里,一位延迟工程师在 SSE Level-2 行情上跑 tcpdump,问你:一个 09:30:00.000001234 时刻穿过交换机的报文,为什么 09:30:00.000004718 才到达策略线程?这 3.5 µs 就是 L1 委托簿能消费的预算上限——而其中大部分都付给了线路到委托簿之间这一层:行情处理器。本课就把它建出来。一条固定 CPU、只做 recvmsg 与解析的接收线程;一个把两路冗余多播去重的 A/B 仲裁器;以及三种回调到 OrderBook::install_snapshot() 的生产级缺口恢复方案。

架构模式

生产级行情处理器是一条定型流水线:一到两条接收线程,每条订阅一个多播组、绑定到一颗隔离 CPU,工作只有 recv + 解析 + 推入 SPSC 队列。当启用 A/B 冗余时,仲裁器线程从两条上游队列读、按序号去重、把存活那条推入下游 SPSC 队列。策略线程从下游队列读、对每条事件应用到 L1 委托簿、然后通知策略。热路径之外,快照恢复线程盯着一个 bool 标志,标志翻转时去和交易所的 TCP 快照服务对话。热路径上一切由 3.4.4 L3 的 std::atomic + SPSC 队列承载;不调 malloc,不加锁,不留分支 99% 走不到的死路径。

事件 POD

线程之间的基本单位是 MarketDataEvent——一个 32 字节、半条 cache line 装下的 packed POD。字段顺序对照 ITCH 5.0 最常出现的消息类型,让解析器热路径写入连续。

#include <cstdint>

enum class EventType : std::uint8_t {
    AddOrder = 0,    // ITCH 'A' / 'F'
    Executed = 1,    // ITCH 'E' / 'C'
    Cancel   = 2,    // ITCH 'X'
    Delete   = 3,    // ITCH 'D'
    Replace  = 4,    // ITCH 'U' (delete + add)
};

struct MarketDataEvent {
    EventType     type;
    std::uint8_t  side;          // 0 = buy, 1 = sell
    std::uint16_t flags;         // reserved; future tagged-state
    std::uint32_t shares;        // ITCH quantity (4-byte big-endian on the wire)
    std::uint64_t order_id;      // exchange order reference
    std::int32_t  price_ticks;   // integer price in tick units (never floating-point)
    std::uint64_t seq_no;        // multicast-feed sequence number
    std::uint64_t timestamp_ns;  // ITCH 6-byte nanoseconds-since-midnight (zero-extended to 8)
} __attribute__((packed));
static_assert(sizeof(MarketDataEvent) <= 64, "MarketDataEvent must fit in one cache line");

那个 static_assert 是承重的:如果你改了这个 struct 让它超过 64 字节,每次队列推入要付双 cache line 传输代价,L3 策略线程会在 perf stat 上立刻看见。__attribute__((packed)) 拿掉编译器原本会插入的字段间填充字节;x86 上未对齐访问几乎免费,按每秒百万级事件计,省下的字节累积成可观的带宽。

接收线程

接收线程做三件事:把一个 UDP socket 绑到多播组上、把内核接收缓冲提到 64 MiB 以便消费者短暂卡顿不会让 NIC 丢包、循环 recv 到一段 2 KB 的复用静态 buf——热路径上零分配。

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstring>

int open_multicast_rx(const char* group, std::uint16_t port, const char* iface) {
    int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
    int reuse = 1;
    ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));   // allow multiple subscribers on the host

    sockaddr_in addr{};
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(port);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);                           // accept on any local IP
    ::bind(fd, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr));

    ip_mreq mreq{};
    mreq.imr_multiaddr.s_addr = ::inet_addr(group);
    mreq.imr_interface.s_addr = iface ? ::inet_addr(iface) : htonl(INADDR_ANY);
    ::setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));

    int rcvbuf = 64 * 1024 * 1024;                                       // 64 MiB receive buffer
    ::setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf));

    return fd;
}

void feed_recv_loop(int fd, SpscQueue<MarketDataEvent>& out, std::atomic<bool>& running) {
    char buf[2048];                                  // static buffer reused per packet (no allocation on hot path)
    while (running.load(std::memory_order_relaxed)) {
        ssize_t n = ::recv(fd, buf, sizeof(buf), 0);
        if (n <= 0) continue;                        // EINTR or empty datagram
        MarketDataEvent ev{};
        if (!parse_itch(buf, static_cast<std::size_t>(n), ev)) continue;   // parse failures logged separately
        while (!out.try_push(ev)) {
            // back-pressure: queue full. Production code: escalate to kill switch.
            // For didactic L2 we spin briefly; L3's kill-switch wiring takes over.
        }
    }
}

几处细节落地一下。SO_REUSEADDR 让本机多个进程能同时订阅同一多播组——策略进程、并行日志器、临时挂上来抓包的 tcpdump 都能各得其所。64 MiB 接收缓冲消化开盘集合竞价之后那一波数据洪峰;没有它,内核会在 socket 上丢包,下一节的缺口恢复就要无谓地触发。parse_itch 是 ITCH 5.0 线路格式解码器:把 payload 首字节看作消息类型('A' / 'D' / 'E' / 'X' / 'U' 等),通过 std::memcpy 按固定偏移读出大端字段(避免未对齐访问的 UB),再写到 MarketDataEvent 的对应槽位。

A/B 仲裁

生产交易所对每路多播行情都同时发布两条物理隔离的链路("A 路"与"B 路")到不同交换机上的两个多播组。每条消息在两条流上同序号出现。仲裁器的工作是:让每个序号只向下游转发一次,谁先到就走谁,并在两条都落后时把快照标志翻起来。

#include <bitset>

class AbArbitrator {
public:
    // Returns true if the event should be forwarded; false if it is a duplicate.
    bool admit(const MarketDataEvent& ev) noexcept {
        const std::uint64_t seq = ev.seq_no;

        if (seq == next_expected_) {
            ++next_expected_;
            recent_.set(seq % kRingSize);
            // Drain any contiguous run that arrived earlier from the other feed.
            while (recent_.test(next_expected_ % kRingSize)) {
                recent_.reset(next_expected_ % kRingSize);
                ++next_expected_;
            }
            return true;
        }
        if (seq < next_expected_) {
            // Late arrival or duplicate from the other feed; drop.
            return false;
        }
        // seq > next_expected_: out-of-order arrival from the faster feed.
        // Mark seen; emit only when the gap fills.
        if (seq - next_expected_ >= kRingSize) {
            // Gap larger than the dedup ring: snapshot-recovery territory.
            request_snapshot_ = true;
            next_expected_   = seq;       // skip ahead so the strategy does not stall
            return true;                  // dispatch this event; the gap is escalated separately
        }
        recent_.set(seq % kRingSize);
        return false;                     // do not forward yet; wait for the missing sequences
    }

    bool needs_snapshot() const noexcept { return request_snapshot_; }
    void clear_snapshot_flag() noexcept { request_snapshot_ = false; }
private:
    static constexpr std::size_t kRingSize = 1024;
    std::bitset<kRingSize> recent_{};
    std::uint64_t          next_expected_ = 0;
    bool                   request_snapshot_ = false;
};

三个分支一个排空循环。按序分支是暖路径——绝大多数事件落在这里。排空循环处理这样的情形:先到那条流送过来 N+1、N+2,慢的那条还没送到 N;当 N 终于到达慢路径时,循环往上滚过所有已就位的连续序号,按序一次性吐出。第三个分支(seq > next_expected_)是冷路径:一条流送来的序号但对面那条还没到对应序号;在 ring 上标记后等。一旦缺口超过 kRingSize = 1024,仲裁器放弃去重、翻起快照标志、跳过缺口,让策略不至于被永久阻塞。

Formula Explorer

effective_loss_prob = p_A * p_B

完整流水线

四条钉在四颗隔离核心上的热路径线程,外加一条非热路径的快照恢复线程:

// Threads:
//   CPU 2: A-feed receive thread     -> ab_inbox SPSC queue
//   CPU 4: B-feed receive thread     -> ab_inbox SPSC queue (shared)
//   CPU 3: arbitrator thread         -> dedup SPSC queue
//   CPU 6: strategy thread           -> reads dedup queue, applies to OrderBook
//
// Recovery: when arbitrator.needs_snapshot() flips true, a separate snapshot-recovery
// thread (not on the hot path) requests a full book snapshot from the TCP snapshot
// service, calls book.install_snapshot(...), and clears the flag.

void arbitrator_loop(SpscQueue<MarketDataEvent>& in_a, SpscQueue<MarketDataEvent>& in_b,
                     SpscQueue<MarketDataEvent>& out, AbArbitrator& arb,
                     std::atomic<bool>& running) {
    MarketDataEvent ev{};
    while (running.load(std::memory_order_relaxed)) {
        bool got = in_a.try_pop(ev) || in_b.try_pop(ev);   // round-robin: A first, then B
        if (!got) continue;
        if (arb.admit(ev)) {
            while (!out.try_push(ev)) { /* see L3 kill switch */ }
        }
    }
}

void strategy_loop(SpscQueue<MarketDataEvent>& in, OrderBook& book, Strategy& strat,
                   std::atomic<bool>& running) {
    MarketDataEvent ev{};
    while (running.load(std::memory_order_relaxed)) {
        if (!in.try_pop(ev)) continue;
        // 1) Apply to the L1 order book first so book state is current before the strategy sees the event.
        book.apply(ev);
        // 2) Notify the strategy with the post-apply book state.
        strat.on_tick(ev, book);
    }
}

strategy_loop 的顺序是承重的:策略必须看到的总是这条事件应用之后的委托簿状态。如果在 book.apply(ev) 之前先问策略,OnTick 调用 book.best_bid_ticks() 会读到当前 tick 之前的内部价——在沪深300 ETF 这种快速行情上,一条消息的陈旧视图足够让策略下错方向的单。

三种缺口恢复方案

包丢失时,行情处理器在三种恢复方案中选其一。三者的差别在恢复延迟、可适用的缺口大小、以及对应的消费方场景。

方案恢复延迟适用缺口大小适用消费方场景L2 工作示例落地
TCP 重传毫秒级小(不超过几百序号)非 HFT(中频、风控报告、事后处理)课文点名,未实现(与交易所规约强绑)
A/B 冗余仲裁微秒级至多 dedup ring 大小(~1024)HFT本课实现,即上文 AbArbitrator
快照+回放秒级(与快照服务一次往返)任意大小万能后备热路径之外的快照恢复线程,调用 L1 的 OrderBook::install_snapshot()AbArbitrator::needs_snapshot() 翻转时触发

TCP 重传是交易所侧方案:处理器单开一条到交易所重传服务器的 TCP 单播会话,请求序号 X 到 Y、按序接收并喂回解析器。延迟一个网络往返加交易所响应时间,毫秒级。每分钟跑一次的风控报告完全够用;对 5 µs 端到端预算的 HFT 消费方毫无意义。

A/B 仲裁是本地方案:冗余多播流就是你自带的重传。A 路丢了的包 B 路按时到了,仲裁器就如期吐出,策略全程不知道有过缺口。有效丢包率是两条独立丢包率的乘积——p_A * p_B——通常比单条流的丢包率低四到六个数量级。

快照+回放是万能后备:两条流同时丢同一序号、或者缺口大到 dedup ring 装不下时,没法靠增量恢复。你向交易所要一个新序号下的完整委托簿、安装、从那个序号继续。这就是 L1 的 install_snapshot 契约;L2 给出触发条件。

背压:策略卡住会怎样

如果策略线程在某条事件上耗时过长——一次冷缓存未命中、一次错预测、或者邻居进程一次 GC 暂停——dedup SPSC 队列会被填满。生产中三种有界响应:drop-oldest(丢队首继续)、kill-switch(系统已坏,停止交易)、上游背压(多播流意味着 NIC 那一层最终还是丢)。HFT 私募一律选 kill-switch:用陈旧数据继续下单的策略是最糟糕的失败模式,远比干净停机更危险。L3 框架会把 kill-switch 接到队满路径上;当前 while (!out.try_push(ev)) {} 这段自旋是占位,下一课将其闭合。

衔接下一课

至此你拥有了一台每微秒往策略线程投放一条解析好的 MarketDataEvent、并以三种工业级方式承接丢包的行情处理器。策略线程从 dedup 队列读、对每条事件应用到 L1 委托簿、然后咨询策略——但策略本身仍是占位。L3 把它写满:CRTP 形式的 Strategy<Derived> 基类做编译期多态、std::variant<MarketDataEvent, ExecutionReport, TimerEvent, RiskBreach> 事件并集、每张订单必走的六道风控门、以及拥有线路会话状态的 FIX 协议订单路由器,让策略本身对交易所协议保持无状态。

练习

Exercise

(a)改造 feed_recv_loop:在单路(无 A/B)场景下按序号缺口统计丢包。维护每线程一个 uint64_t next_expected_seq;每解析到一条事件,若 ev.seq_no != next_expected_seq,按差值记录缺口大小后重置;运行结束时报告总丢包数。用一份注入了 0.1% 合成丢包的 ITCH 回放数据跑,核对总数与注入的丢包数。

(b)扩展 AbArbitrator::admit:处理 B 路领先 A 路超过 kRingSize 序号(A 路卡住、B 路继续)的情形。仲裁器应识别缺口、翻起 request_snapshot_ = true、把 next_expected_ 快进到 B 路当前序号以免拖住策略。写一个单元测试,喂给仲裁器一段 A 路停滞 2000 序号、B 路持续推进的合成交错流,验证快照标志翻转、策略线程无延迟收到 B 路事件。

(c)接入 snapshot_recovery_thread(独立于仲裁器与策略):每毫秒轮询 arbitrator.needs_snapshot(),翻转时调用桩函数 request_full_book_snapshot()(返回一对 std::vector<Order> 与快照序号),调用 L1 OrderBook::install_snapshot() 安装、清掉快照标志。轮询用 std::this_thread::sleep_for(std::chrono::milliseconds(1))。一句话说明为何快照恢复要跑在独立线程(off the hot path)。

(d)借 3.4.3 L1 的工具,对稳态回放负载下的接收线程跑 perf stat -e cycles,instructions,cache-misses,branch-misses ./feed_handler;报告 IPC 与 cache-miss 率;验证 IPC > 1.0 且 cache-miss < 5%。一句话说明这两项数字为何指示接收线程的缓存是暖的、以及接收线程一旦颠簸会是什么样。

(e)(无须实现)为下列场景各选一种缺口恢复方案(TCP 重传、A/B 仲裁、快照+回放)并各用一句话给出理由:(i)一条 5 µs 端到端预算的做市策略遇到 5 序号缺口;(ii)同一条做市策略遇到一次 30 秒断流导致的 50000 序号缺口;(iii)一个每日一次的风控报告消费方遇到 200 序号缺口;(iv)一个从离线文件回放的回测研究消费方遇到任意缺口。

提示
关于(a):只需在按序分支处理;不命中时累加 ev.seq_no - next_expected_seqtotal_drops,然后 next_expected_seq = ev.seq_no + 1 继续。
提示
关于(b):现有第三个分支已处理 seq - next_expected_ >= kRingSize 的情形;用 2000 条 B 路事件的合成流喂入、核对 request_snapshot_ 翻转且策略持续消费即可。