周四下午,你在 SZSE 福田 COLO 机房的运维终端前盯着沪深300 ETF 行情接入面板。3.5.2 L3 你亲手写了一个 SPSC 环,目的是让你之后读生产无锁代码时心里有底;但到了生产代码,你 99% 的场合会直接去用 crossbeam_queue。今早的事故复盘把原因摆得明明白白:兄弟基金的策略组自己搓了一个 MPMC 队列,在高竞争下漏掉了一个 ABA 边界,丢了 20 秒的 CFFEX 委托回报。修复不是「再聪明一点」,而是「直接用经过审计与实战检验的生产原语,把手写无锁队列留给那条你能拿 benchmark 证明 unsafe 审计代价值得的特殊路径」。本课就是从 3.5.2 L3 跳到生产级的那一步:何时取 ArrayQueue,何时保留缓存填充的 SPSC,何时 SegQueue 才是正解,以及如何把反压设计成「慢消费者」状态可观测而不是被静默掩盖。
crossbeam 生态
Cargo.toml 一行依赖带进来四个 crate,每个有一条经验法则。Fenced 下面是这套依赖的标准 toml 形态,每个新低延迟项目直接抄过去:
[dependencies]
crossbeam-queue = "0.3"
crossbeam-utils = "0.8"
core_affinity = "0.8"
[dev-dependencies]
criterion = "0.5"
[[bench]]
name = "queue_bench"
harness = false
crossbeam-channel 是 std::sync::mpsc 的生产级替代,原生 MPMC、带 select! 宏;在 3.5.2 L2 已点名,在异步代码里继续复用。crossbeam-queue 是本课的主角,提供 ArrayQueue 有界 MPMC 与 SegQueue 无界 MPMC 两种无锁队列原语。crossbeam-utils 提供 CachePadded<T>、指数退避 Backoff 与 thread::scope 等小工具,你在 L1 已经用过 CachePadded。crossbeam-epoch 是无锁数据结构内部使用的「时代式」垃圾回收器,你几乎不会直接调用,但知道它存在能解释为什么 crossbeam_queue 能安全地把一个槽位归还给生产者,而消费者可能还在读它。
ArrayQueue<T> 细节
ArrayQueue::new(capacity) 返回一个固定容量的连续存储,实现内部把容量向上取整到下一个 2 的幂以便做廉价模运算。共享方式是 Rust 的典型套路:用 Arc<ArrayQueue<T>> 包起来,每个生产者或消费者复制一份 Arc。队列本身对任意 T: Send 都是 Send + Sync。API 是非阻塞的,反压通过返回值显现:try_push(value: T) -> Result<(), T> 在有空槽时返回 Ok(()),队列满时返回 Err(value) 把值还给调用方让它决定怎么办;try_pop() -> Option<T> 在有值时返回 Some(value),空时返回 None。len() / capacity() / is_full() / is_empty() 这些便捷断言适合做遥测,但绝不能在热路径上拿来做控制流 —— is_full() 返回 false 之后到你下一行 try_push 之间,另一个生产者可能就把队列塞满了。永远在 try_push 的 Result 上分支。
四 producer 一 consumer 接 510300.SH feed-handler → strategy 的标准管道:
use crossbeam_queue::ArrayQueue;
use std::sync::Arc;
use std::thread;
#[derive(Debug, Clone, Copy)]
pub struct MdEvent {
pub ts: u64,
pub symbol_id: u32,
pub price_ticks: i32,
pub qty: u32,
}
fn run_pipeline() {
let queue: Arc<ArrayQueue<MdEvent>> = Arc::new(ArrayQueue::new(1024));
// 4 feed-handler producers.
let mut producers = Vec::new();
for feed_id in 0..4_u32 {
let q = Arc::clone(&queue);
producers.push(thread::spawn(move || {
for i in 0..10_000_000_u64 {
let ev = MdEvent { ts: i, symbol_id: feed_id, price_ticks: 4500, qty: 100 };
// drop-oldest backpressure on full queue.
if let Err(ev) = q.try_push(ev) {
let _ = q.try_pop();
let _ = q.try_push(ev);
}
}
}));
}
// 1 strategy consumer.
let qc = Arc::clone(&queue);
let consumer = thread::spawn(move || {
let mut total: u64 = 0;
loop {
if let Some(_ev) = qc.try_pop() {
total += 1;
}
if total >= 40_000_000 { break; }
}
});
for p in producers { p.join().unwrap(); }
consumer.join().unwrap();
}
这段代码里藏着四条生产法则:
- 容量
1024故意小于「四路 producer 合并峰值」,目的就是让反压在基准里真的触发,你才能在测量中看见「慢消费者」状态;生产环境通常从 1024 或 4096 起步,然后用测量去调。 - drop-oldest 是行情 tick 的国内默认,与一线量化 (幻方 / 鸣熙 / 九坤 / 明汯 / 灵均) 的做法一致 —— 一条 50 毫秒前 strategy 还没消费的 tick,比刚到的 tick 价值低。
- 「每个任务一个
Arc::clone(&queue)」是共享模式;永远不要写Arc<Mutex<ArrayQueue<T>>>—— 队列已经是无锁的,外面再包一个互斥锁就把全部好处抹掉了。 MdEvent标了Copy,过队列时拷贝成本可忽略;若放入非Copy负载,try_push取走所有权,Err(value)时再把值还给你。
缓存填充版 SPSC 升级
3.5.2 L3 让你用两个 AtomicUsize 索引手搓了一个 SPSC 环。那个设计正确、可审计、能上生产,但原始草稿里 head 与 tail 在结构体里背靠背,实际多半落在同一条 64 字节缓存行上。稳态下生产者线程写 tail,消费者线程写 head;任一方的写都让另一方 L1 里的这条缓存行失效;吞吐量塌掉 2-3 倍。修复就是把 L1 的 CachePadded<T> 应用到 L2 的数据结构上:
use crossbeam_utils::CachePadded;
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
pub struct SpscRingPadded<T> {
buf: Vec<UnsafeCell<MaybeUninit<T>>>,
mask: usize,
head: CachePadded<AtomicUsize>, // consumer writes head
tail: CachePadded<AtomicUsize>, // producer writes tail
}
impl<T> SpscRingPadded<T> {
pub fn with_capacity_pow2(cap: usize) -> Self {
assert!(cap.is_power_of_two());
let mut buf = Vec::with_capacity(cap);
for _ in 0..cap { buf.push(UnsafeCell::new(MaybeUninit::uninit())); }
Self {
buf,
mask: cap - 1,
head: CachePadded::new(AtomicUsize::new(0)),
tail: CachePadded::new(AtomicUsize::new(0)),
}
}
}
// Safety: producer and consumer never access overlapping slots; index discipline proves it.
unsafe impl<T: Send> Send for SpscRingPadded<T> {}
unsafe impl<T: Send> Sync for SpscRingPadded<T> {}
在 CFFEX 张江 COLO 机房的 Xeon Gold 6342 上跑基准,一 producer 一 consumer 各 push / pop 10_000_000 个 u64,两线程用 core_affinity::set_for_current(...) 钉到不同物理核,缓存填充版本比未填充快 2-3 倍。「何时手搓 SPSC」决策树:
- 你在自己工作负载上测出了真实收益 —— 通常在竞争激烈的 SPSC 场景下有 30-50% 的延迟或吞吐量提升。
- 生产者真的只有一个,消费者真的只有一个,这条不变式在代码评审流程中可被强制约束。
- 你接受
unsafe审计成本:每次改动push/pop主体都要由能证明索引纪律仍然成立的同事审过。
默认拿 ArrayQueue;手写 SPSC 的审计与基准成本,只在整个引擎最热的一两条路径上才划算。
反压:四种生产响应
当 ArrayQueue::try_push 返回 Err(value),队列满了,你得决定下一步做什么。四种规范响应覆盖所有情况:
use std::sync::atomic::{AtomicU64, Ordering};
// (1) Drop-oldest: the market-data tick default.
fn try_push_drop_oldest(q: &ArrayQueue<MdEvent>, ev: MdEvent) {
if let Err(ev) = q.try_push(ev) {
let _ = q.try_pop();
let _ = q.try_push(ev);
}
}
// (2) Drop-newest: rare in trading; appropriate when old data ordering matters more.
fn try_push_drop_newest(q: &ArrayQueue<MdEvent>, ev: MdEvent) {
let _ = q.try_push(ev); // Err(_) silently discards.
}
// (3) Block-producer with backoff: never on the tick hot path; bulk-load only.
fn try_push_blocking(q: &ArrayQueue<MdEvent>, mut ev: MdEvent) {
let backoff = crossbeam_utils::Backoff::new();
loop {
match q.try_push(ev) {
Ok(()) => return,
Err(v) => { ev = v; backoff.snooze(); }
}
}
}
// (4) Escalate-to-supervisor: log + counter; required pattern in regulated trading.
fn try_push_escalating(q: &ArrayQueue<MdEvent>, ev: MdEvent, lost: &AtomicU64) {
if q.try_push(ev).is_err() {
lost.fetch_add(1, Ordering::Relaxed);
// production: emit a structured log entry via `tracing::warn!(target = "feed.backpressure", ...);`
}
}
drop-oldest 是行情 tick 的默认,理由前文已述。drop-newest 罕见;用法是审计日志条目里「宁可丢一条也别打乱已写日志的顺序」。block-producer 在 tick 热路径上禁用,因为阻塞会向上游传播形成 head-of-line 阻塞,严重时把整条行情接入打挂;唯一可接受的场景是离线数据回放工具往启动中的引擎灌数据。escalate-to-supervisor 是合规交易的默认:每一次丢弃都必须有一个计数器递增,可观测性管道把这个数报上来,持续非零就 page 值班工程师。证监会与交易所合规检查时想看到的是这条日志与这个计数器,「队列静默溢出」不是可接受的答案。
SegQueue 与无界风险
同一 crate 的 SegQueue<T> 是无界 MPMC;构造 SegQueue::new() 不带容量参数,API 没有 try_push —— 没有满状态,只有永远成功的 push(value: T) 与 pop() -> Option<T>。内部实现是按需分配的「段」链表。风险是显式的:无界队列把慢消费者问题掩盖起来,消费者比生产者慢的话,SegQueue 会一直增长直到耗尽内存,这在 tick 热路径上等于事故。合法用法是日志、遥测、审计这种允许消费者 (通常是磁盘 writer) 在突发时暂时落后、之后追回的场景。本模块 L4 的交易引擎数据路径全部走 ArrayQueue,只把审计日志队列留给 SegQueue 以吸收偶发磁盘写停顿。
四行参考表归纳「何时用哪种队列原语」;use-case 标签、crate / type 名、有界 vs 无界区分、生产法则在两区版本上是逐字节一致的,因为这套纪律是普适的:
Use case Crate / Type Capacity Production rule
Feed-handler -> strategy hot path crossbeam_queue::ArrayQueue<MdEvent> bounded (e.g. 1024) drop-oldest backpressure; escalate on overflow; never SegQueue
Strategy -> order-router crossbeam_queue::ArrayQueue<OrderEvent> bounded (e.g. 256) escalate-to-supervisor on overflow; never silently drop
Hand-rolled SPSC tick path SpscRing<T> with CachePadded indices bounded power-of-2 (e.g. 4096) only when measured > 30% latency win vs ArrayQueue; unsafe audit required
Audit log / telemetry crossbeam_queue::SegQueue<LogEvent> unbounded acceptable because consumer (disk writer) can lag during bursts; OK to absorb a few MiB
crossbeam_epoch 与「安全内存回收」问题点名一次:在带链节点的无锁结构里 (Treiber 栈、Michael-Scott 队列、无锁跳表),被移除节点的内存何时可以释放,而并发读者可能还持有指向它的指针?直接「立即释放」会造成 use-after-free。生产答案是「时代式」垃圾回收:每个参与该结构的线程把自己 pin 到当前时代,数据结构把被移除节点登记为「未来某时代回收」,GC 在「不存在还 pin 在那个可能看见该节点的时代上的线程」时才释放。crossbeam_epoch::pin() 几乎从不在应用代码里直接出现;如果你想去调用它,先停下,看看 crossbeam_queue::ArrayQueue 或 crossbeam_skiplist::SkipMap 是否已经解决了你的问题。
社区里还流通两个 MPMC channel crate 顺带提一句:flume 同时支持 sync 与 async,带 select! 宏,在 async 圈子里很受欢迎;kanal 是 benchmark 数字漂亮的新选手。两者都是可信的候选,但国内 Rust 量化团队 (幻方 / 鸣熙 / 九坤 / 明汯 / 灵均) 与头部券商自营在 feed → strategy 与 strategy → order-router 这两条主要数据流上仍然标准化在 crossbeam_queue,因为它是经过审计的事实标准。课后阅读:Rust Atomics and Locks (marabos.nl/atomics) 第 5-6 章 (英文免费;中文社区翻译进行中);course.rs「Rust 圣经」并发与无锁数据结构章;《Rust 编程之道》原子操作章;1996 PODC 论文 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms (英文,前向指针);LMAX Disruptor 白皮书 (英文,设计模式参考)。手写 MPMC 无锁队列与 haphazard 风险指针 → 3.5.4 与进阶阅读。
Exercise
Exercise
(a) 用 Arc<ArrayQueue<MdEvent>> 容量 1024 实现一个 4 producer / 1 consumer 管道,每个生产者用 try_push + drop-oldest 反压 (if Err(v) = q.try_push(v) { q.try_pop(); q.try_push(v); }) 推 10_000_000 条合成 MdEvent,消费者用 try_pop 在忙循环里累计到 40_000_000 条 pop;用 criterion 测端到端墙钟时间,报告每事件 ns 级延迟。(b) 加一个 AtomicU64 lost_events_counter,切换反压策略为 escalate-to-supervisor (if q.try_push(v).is_err() { lost.fetch_add(1, Ordering::Relaxed); });跑相同工作负载,报告 lost_events_counter 终值 (在竞争下应当 > 0,因为消费者故意比生产者合起来慢)。(c) 把 3.5.2 L3 的手写 SpscRing<T> 升级:head: AtomicUsize / tail: AtomicUsize 改为 head: CachePadded<AtomicUsize> / tail: CachePadded<AtomicUsize>;一 producer 一 consumer 各 push / pop 10_000_000 个 u64,用 core_affinity::set_for_current(...) 钉到不同物理核;基准并报告 padded vs un-padded 的吞吐比 (预期:padded 快 2-3 倍)。(d) 用 crossbeam_utils::Backoff::new() + .snooze() 在 loop { match q.try_push(v) { Ok(()) => return, Err(v_back) => { v = v_back; backoff.snooze(); } } } 形式里实现 try_push_blocking,写一个单元测试证明消费者 pop 后生产者最终成功,并用两句话解释为什么这个模式在 tick 热路径上被禁用。(e) 只判断不实现:分别给 ArrayQueue<T>、SegQueue<T>、手写 cache-padded SPSC 各举一个生产场景。说明 flume 与 kanal 是否可作为 (a) 的 drop-in 替代,以及为什么 crossbeam_queue 在金融 Rust 圈仍然是事实标准。
提示
head 与 tail 包进 CachePadded<AtomicUsize>,生产者与消费者就写到不同缓存行的原子上,这就把未填充版本被 MESI 流量损失的吞吐量找回来。提示
lost_events_counter 当作可观测信号:持续非零是触发 page 值班的信号,而不是参数。行业背景
国内 Rust 量化与券商自营头部:幻方、鸣熙、九坤、明汯、灵均、宽德、思勰、衍盛、磐松、博普、宁聚,以及中信、中信建投、华泰、海通、招商、国泰君安、中金 Rust 自营线。生产标的:510300.SH、510500.SH、IF / IC / IH 股指期货、SC 原油、SHFE 铜、CZCE PTA 与 DCE 豆粕等热路径,统一 crossbeam_queue::ArrayQueue。证监会、SSE、SZSE、CFFEX、SHFE、CZCE、DCE 是下游监管面;Wind、东方财富 Choice、聚源、同花顺是回测数据源。
通向 L3 的桥
到这里你掌握了生产级的事件传递原语:ArrayQueue<T> 为默认、缓存填充 SPSC 是测出来才用的 1% 特殊路径、drop-oldest 加 escalation 计数器是反压策略、tick 数据上禁用 SegQueue。L3 接着追问:Arc<ArrayQueue<MdEvent>> 里的事件到底从哪里来?答案是异步行情接入。我们把 std::thread 换成 tokio::spawn,把 tokio::net::UdpSocket 接到 multicast 组,用 tokio_util::codec::Framed 配合自定义 Decoder 解析 纳斯达克 TotalView ITCH 5.0 wire (架构上与 SSE Level-2 / SZSE Level-2 / CFFEX 完全一致,协议层因订阅授权用公开 ITCH 5.0 作教学替身),检测序列号缺口,触发三条生产恢复路径 —— TCP 重传、A/B 冗余 feed、snapshot-then-incrementals。L3 的 feed handler 就是把事件推进你刚搭好的 ArrayQueue 的那条 async-network-edge;L4 里它们汇合成一个被测量的 tick-to-trade 引擎。