国内一家私募在 CFFEX 数据中心 colo 部署的资深系统工程师,正在用回放数据 profile 那条新搭起来的 IF/IH 股指期货做市路径。热点循环是这样的:接收线程从组播 UDP 包中 mmap 解出报文,先解析 CTP 行情字段,把一个 TickEvent 推到由 std::mutex 保护的 std::queue<TickEvent> 上;策略线程 pop、更新本地簿、再通过 SSE 报盘网关把单子发出去。tick-to-trade 中位数 6.4 μs,纸面上看还行。P99.9 是 312 μs。火焰图里有一条粉色横条 futex_wait 占了大半个尾部——互斥锁竞争就是问题本身。工程师把队列改写成 60 行的、用 L2 教的 release / acquire 原子配合 head/tail 索引各占一条 cache line(也是 L2 教的)实现的无锁单生产者单消费者环形缓冲。中位数塌到 0.41 μs;P99.9 塌到 1.9 μs。然后他把接收线程绑到 NIC 同一 NUMA 节点上的 CPU 2、策略线程绑到 CPU 3——P99 再塌一档。那个数据结构买来了中位数加速,那种绑核纪律买来了尾延迟加速;本课就是它们。L1 + L2 是先决条件。L3 是所有东西汇拢成每一套生产 HFT 系统的承重原语的地方。
(本课每一个 Fenced cpp 代码块都是 gate 会按字节核对的精确形式。)
为什么 SPSC,以及它为什么对
单生产者单消费者(SPSC)环形缓冲是最简单且有用的无锁数据结构。一条线程(生产者)写;一条线程(消费者)读。容量是 2 的幂。两个原子索引:head_ 是下一个写位置,tail_ 是下一个读位置。head_ == tail_ 即空;(head_ + 1) & mask == tail_ 即满。生产者用 relaxed 读自己的 head_(它独占这变量)、用 acquire 读消费者的 tail_(它要看见消费者发布的任何更新)、写入值、再用 release 存回新的 head_(把那个 slot 发布给消费者)。消费者是镜像版本。
为什么对?L2 教过的 synchronizes-with 规则:生产者在 head_ 上的 release-store synchronizes-with 消费者在 head_ 上的 acquire-load。所以生产者在 release-store 之前写的所有东西——包括 slot 本身——happens-before 消费者在匹配的 acquire-load 之后读的所有东西——包括它刚领出来的那个 slot。没有互斥锁、没有条件变量、热点路径上完全不用进内核。代价:每次入队两次原子操作,每次出队两次原子操作。x86_64 上每次都是几个 cycle。
为什么伪共享在这里特别要紧?生产者写 head_、消费者读它。消费者写 tail_、生产者读它。如果 head_ 与 tail_ 共用一条 cache line,那么每次生产者入队都会让消费者 L1 上那条 line 失效,每次消费者出队也会让生产者 L1 上那条 line 失效——每次操作都是一次一致性 ping-pong,每次几十个 cycle。把每个索引用 alignas(std::hardware_destructive_interference_size) 各占一条 cache line,ping-pong 就消失了。这就是 L2 的那同一种伪共享模式,落到了承重数据结构上。
#include <atomic>
#include <cstddef>
#include <new>
#include <type_traits>
#include <utility>
#ifdef __cpp_lib_hardware_interference_size
constexpr std::size_t kCacheLine = std::hardware_destructive_interference_size;
#else
constexpr std::size_t kCacheLine = 64;
#endif
template <typename T, std::size_t N>
class SpscQueue {
static_assert((N & (N - 1)) == 0, "capacity must be a power of two");
static_assert(std::is_trivially_destructible<T>::value, "T must be trivially destructible");
static constexpr std::size_t kMask = N - 1;
public:
SpscQueue() : head_(0), tail_(0) {}
SpscQueue(const SpscQueue&) = delete;
SpscQueue& operator=(const SpscQueue&) = delete;
// Producer side. Returns false if full.
bool try_push(const T& v) {
const std::size_t head = head_.load(std::memory_order_relaxed);
const std::size_t next = (head + 1) & kMask;
if (next == tail_.load(std::memory_order_acquire)) return false; // full
buf_[head] = v;
head_.store(next, std::memory_order_release); // publish slot
return true;
}
// Consumer side. Returns false if empty.
bool try_pop(T& out) {
const std::size_t tail = tail_.load(std::memory_order_relaxed);
if (tail == head_.load(std::memory_order_acquire)) return false; // empty
out = buf_[tail];
tail_.store((tail + 1) & kMask, std::memory_order_release); // publish free slot
return true;
}
private:
alignas(kCacheLine) std::atomic<std::size_t> head_; // producer writes; consumer reads (acquire)
alignas(kCacheLine) std::atomic<std::size_t> tail_; // consumer writes; producer reads (acquire)
alignas(kCacheLine) T buf_[N]; // on its own line so neither index shares with it
};
实现上三点注解。其一,两个 static_assert 抓的是运行时几乎无法 debug 的 bug:容量不是 2 的幂会让 & kMask 的模运算技巧产出垃圾;T 不是平凡可析构的话,在槽位里就地覆写就要调用析构函数(而一个完全无每槽状态的无锁队列做不到这一点)。两者都是编译期错误;早抓早好。其二,buf 本身也加 alignas(kCacheLine),让第一个元素不与 tail_ 共用一条 line。其三,用数学记号把索引关系写出来:
用 1 亿条序列号验证
SPSC 队列的正确性是经验性的:推入 0, 1, 2, ..., N-1,pop 出来,验证消费者按顺序收到、无重复、无空洞。1 亿条消息足够让 release / acquire 选错引起的任何竞争在几秒内现形。
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <thread>
static constexpr std::size_t kCapacity = 4096;
static constexpr std::int64_t kN = 100'000'000;
SpscQueue<std::int64_t, kCapacity> g_q;
void producer() {
for (std::int64_t i = 0; i < kN; ++i) {
while (!g_q.try_push(i)) { /* busy-wait if full */ }
}
}
void consumer() {
std::int64_t expected = 0;
std::int64_t got;
while (expected < kN) {
if (g_q.try_pop(got)) {
if (got != expected) {
std::fprintf(stderr, "ORDER ERROR at %lld: got %lld\n", (long long)expected, (long long)got);
std::abort();
}
++expected;
}
}
}
int main() {
const auto t0 = std::chrono::steady_clock::now();
std::thread cons(consumer);
std::thread prod(producer);
prod.join();
cons.join();
const auto t1 = std::chrono::steady_clock::now();
const double sec = std::chrono::duration<double>(t1 - t0).count();
std::printf("%lld messages in %.3f s -> %.2f Mmsg/s, %.1f ns/msg\n",
(long long)kN, sec, kN / sec / 1e6, sec / kN * 1e9);
return 0;
}
两个值得点名的设计选择。容量取 4096——小到整个缓冲都能进 L1d(多数现代 x86 是 32 KiB),让生产者的写和消费者的读都贴在最近的缓存里。消费者线程先启动,于是生产者不会在消费者还没存在时就把 1 亿条消息一次性灌完。在 Intel Xeon Gold 6342 / 6354(Ice Lake-SP)或同档 AMD EPYC 上、同一 socket 内绑核运行可达 50–200 Mmsg/s,每条消息延迟远低于 100 ns;安静机器上能压到每条 5 ns。
把线程绑到 CPU
接收线程与策略线程应当跑在专用核上。Linux 原语是 pthread_setaffinity_np,它把一条线程限定到指定的一组 CPU 上:
#include <pthread.h>
#include <sched.h>
// Pin the calling thread to a single physical CPU. Linux only.
bool pin_this_thread_to_cpu(int cpu) {
cpu_set_t set;
CPU_ZERO(&set);
CPU_SET(cpu, &set);
return pthread_setaffinity_np(pthread_self(), sizeof(set), &set) == 0;
}
// Usage inside the SPSC benchmark above:
// std::thread cons([]{ pin_this_thread_to_cpu(2); consumer(); });
// std::thread prod([]{ pin_this_thread_to_cpu(3); producer(); });
cpu_set_t 是一个位图(每个逻辑 CPU 一位)。CPU_ZERO 全清;CPU_SET(N, &set) 置位第 N 位。pthread_setaffinity_np 成功后,该线程就只能跑在 CPU N 上。shell 等价是 taskset -c N ./binary,它在 exec 之前设亲和性,于是整个进程都被绑。延迟关键工作里偏好程序内 pthread_setaffinity_np,因为它允许你把不同线程绑到不同核,无需 fork 子进程。
三个与亲和性同框的运维概念。**isolcpus=2,3 作为内核启动参数** 把 CPU 2 与 3 从操作系统调度器中彻底拿走,作为只有应用显式亲和请求才能用的专用核。没有后台守护进程、不会被迁移。**numactl --cpunodebind=0 --membind=0** 把线程与内存分配都绑到单一 NUMA 节点——在双 socket 机器上是关键,因为跨 socket 访存额外要付 ~100 ns、且经过窄得多的互联。**chrt --rr 50 ./binary** 把这个二进制以 SCHED_RR 实时调度优先级 50 跑起来(kernel chrt man page 解释 1–99 区间与 SCHED_FIFO / SCHED_RR / SCHED_OTHER 三类),把应用提到默认 SCHED_OTHER 分时调度之上。仅在测出 baseline 之后再组合使用;每一项都会增加运维复杂度,且收益是非线性叠加的。
三配置吞吐 benchmark
同一份 SPSC benchmark,分别在三种绑核配置下跑,能直接看出缓存一致性故事和 NUMA 罚款故事。在安静机器上跑:
# Build the SPSC benchmark.
g++ -std=c++17 -O3 -pthread spsc_bench.cpp -o spsc_bench
# Configuration 1: unpinned. Let the kernel pick cores.
./spsc_bench
# Configuration 2: pinned to two cores on the same socket (CPUs 2 and 3 on socket 0).
taskset -c 2,3 ./spsc_bench
# Configuration 3: pinned to two cores on different sockets (CPU 2 on socket 0, CPU (cores_per_socket + 2) on socket 1).
# On a 24-core-per-socket Xeon Gold / EPYC, that is CPUs 2 and 26. Check with: lscpu --extended
taskset -c 2,26 ./spsc_bench
# Bonus: bind memory to socket 0 explicitly to make the same-socket case faithful.
numactl --cpunodebind=0 --membind=0 taskset -c 2,3 ./spsc_bench
# Expected ordering on a quiet dual-socket box:
# Config 2 (same socket) > Config 1 (unpinned, variance high) > Config 3 (cross-socket, NUMA penalty visible).
预期顺序:Config 2 最快(两核同 socket、共享 L3、无 NUMA 罚款);Config 1 慢一档且方差大(kernel 会迁移线程,把热的 L1 / L2 状态驱出);Config 3 最慢(每次 head_ 与 tail_ 之间的 cache line ping-pong 现在都要跨 socket 互联(UPI / Infinity Fabric)走一遍,每次往返几百纳秒)。
收官:接收-到-策略流水
L3 的收官把所有东西串起来。接收线程产生 1000 万条合成 tick(生产里这里换成 L4 的 ITCH 解码器),通过 SpscQueue<TickEvent, 4096> 推过去,策略线程 pop 并维护一个 running maximum 与 running max-drawdown——策略能做的最简单的非平凡簿记。
struct TickEvent {
std::int64_t ts_ns;
std::uint32_t symbol_id;
double price;
};
static constexpr std::size_t kCap = 4096;
static constexpr std::int64_t kEvents = 10'000'000;
SpscQueue<TickEvent, kCap> g_pipe;
// Producer (receive thread): generate kEvents synthetic ticks.
void receive_thread() {
pin_this_thread_to_cpu(2);
for (std::int64_t i = 0; i < kEvents; ++i) {
TickEvent t{i, 0u, 4.20 + 0.0001 * static_cast<double>(i % 1000) - 0.05};
while (!g_pipe.try_push(t)) { /* spin */ }
}
// Sentinel: ts_ns = -1 means end-of-stream.
TickEvent sentinel{-1, 0u, 0.0};
while (!g_pipe.try_push(sentinel)) { /* spin */ }
}
// Consumer (strategy thread): running max and running max-drawdown.
void strategy_thread() {
pin_this_thread_to_cpu(3);
double running_max = -1.0;
double max_drawdown = 0.0;
std::int64_t n = 0;
TickEvent t;
while (true) {
if (!g_pipe.try_pop(t)) continue;
if (t.ts_ns < 0) break; // sentinel
if (t.price > running_max) running_max = t.price;
const double dd = (running_max - t.price) / running_max;
if (dd > max_drawdown) max_drawdown = dd;
++n;
}
std::printf("strategy: %lld events, max=%.4f, max_drawdown=%.6f\n",
(long long)n, running_max, max_drawdown);
}
接收线程绑到 CPU 2;策略线程绑到 CPU 3。合成价格走的是 4.20 + 0.0001 * (i mod 1000) - 0.05,产生一段在 [4.15, 4.25] 之间平滑振荡的序列——既能锻炼 running-max-drawdown 逻辑,又不依赖任何行情文件。-1 sentinel 标示流末尾——就是 L1 的同款模式、只是这一次没有锁。端到端吞吐应当超过每秒一千万事件(也就是总耗时低于 1 秒),每事件中位数延迟在十几纳秒。验证方式:写一个把同一串价格灌进 std::vector<double>、然后跑同一套 max-drawdown 循环的单线程参考实现 mc_reference(),验证两边 max_drawdown 在 6 位小数内一致。
这就是每一条生产 tick-to-trade 路径的单位模式。接收线程解码 ITCH(或 FIX、或你自家的内部二进制行情);策略线程更新本地簿、计算信号;下单线程(第三段,我们这里不写)把单子发到交易所。每对相邻阶段恰好就是这把无锁队列。国内最激进 quant(幻方 / 鸣熙 / 九坤 / 明汯 / 灵均 等)在 CFFEX / SSE / SZSE COLO 机房的 tick-to-trade 预算是 1–5 μs;上面这个数据结构是把同步开销压到这么低预算允许之内的唯一办法。
下一课
L4 转向「线」——BSD 套接字、FIX 4.4 会话报文、可公开学习的 ITCH 5.0 二进制行情记录、epoll 支撑高连接数服务、以及一段 Asio 简介。L4 拥有 L3 接收线程将要解码的那批数据。L5 再下沉到内核网络栈以下:DPDK / Solarflare OpenOnload / AF_XDP / io_uring 与硬件 NIC 时间戳——tick-to-trade 预算从 5 μs 被压到 1 μs 以下的那一层。本课的 SPSC 队列在 L4 与 L5 每个范例里都原样作为骨干。
练习
Exercise
(a) 把上面的 SpscQueue<T, N> 实现出来。构建 1 亿条序列号的验证器并运行:taskset -c 2,3 ./spsc_bench。确认消费者没有报 ORDER ERROR(即 FIFO 顺序被准确保持)。记录吞吐 Mmsg/s 与每条延迟 ns。预期:在现代 Xeon / EPYC、同 socket 两核绑核条件下,50–200 Mmsg/s 且 < 100 ns/msg。(b) 同一份 benchmark 跑三种配置:unpinned(./spsc_bench)、同 socket 绑核(taskset -c 2,3 ./spsc_bench)、跨 socket 绑核(24 核/socket 的盒子上是 taskset -c 2,26 ./spsc_bench;通过 lscpu --extended 找你自己机器上的实际跨 socket 配对)。汇报三者吞吐。同 socket 应当最快;跨 socket 应当呈现 NUMA 罚款(~2–5x 慢)。用一句话说明 NUMA 边界两侧的缓存一致性流量有何不同。(c) 把 head_ 与 tail_ 的 alignas(kCacheLine) 限定符去掉(让它们落到同一 cache line 上)。重跑同 socket benchmark。吞吐应当下跌 5–10x,因为生产者对 head_ 的写和消费者对 tail_ 的写现在会 ping-pong 同一 cache line。用一句话说明这恰好是 L2 教的伪共享问题。恢复 alignas 行。(d) 收官:搭起 receive_thread / strategy_thread 流水,kEvents = 10'000'000,跑 taskset -c 2,3 ./capstone。确认策略线程报告 10000000 events 且 max_drawdown 与单线程参考实现一致(把参考实现写成一个 mc_reference() 函数,把同一串价格灌入 std::vector<double> 再跑同一段 max-drawdown 循环)。两边 max_drawdown 必须在 6 位小数内一致。吞吐应当超过 10 Mevents/s(即总耗时 < 1 秒)。用一句话说明为什么这条流水是每一条 quant tick-to-trade 路径的单位模式。
提示
(a):容量上的 static_assert 抓的是常见笔误(例如 N = 1000)。延迟把 wall-clock 秒数除以 kN、再乘 1e9。
提示
(c):当 head_ 与 tail_ 共用一条 cache line 时,生产者对 head_ 的每次写都让消费者 L1 上这条 line 失效,迫使消费者下一次写 tail_ 时重取——L2 那种伪共享模式,落在承重数据结构上。