国内某头部私募的低延迟交易团队把 510300.SH (沪深300 ETF) 的行情接入和 50ETF 策略引擎之间的那一跳从 crossbeam::channel 换成手写 SPSC 环形缓冲后, P99.9 的延迟从 25 μs 掉到 6 μs——单跳省了 ~20 μs, 是策略整体延迟预算 (~50 μs) 的 40%。这就是这一节课的全部出发点: Mutex<T> 一次 lock/unlock 往返在现代 x86 上 ~25-50 ns; 一次 fetch_add ~5-10 ns; 在内层热路径上, 这个 5× 的差距是策略「能不能赶在涨跌停板被吃掉之前发单」的差距。本课不要求你能从头证明无锁数据结构的可线性化, 但要让你能读懂生产代码、写一份 SPSC, 并且明白什么时候应该停手换 crossbeam_queue。
原子类型
std::sync::atomic 提供 AtomicBool, AtomicI8 / AtomicI16 / AtomicI32 / AtomicI64 / AtomicIsize, AtomicU8 / AtomicU16 / AtomicU32 / AtomicU64 / AtomicUsize, 以及 AtomicPtr<T>。前八个用于无锁计数、状态标志、索引; AtomicPtr<T> 用于发布裸指针——典型场景是无锁数据结构和一次性初始化。先看最简单的 Fenced 例子, 暂时把内存序的复杂性留到后面:
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
fn main() {
let counter = Arc::new(AtomicU64::new(0));
let mut handles = Vec::new();
for _ in 0..4 {
let c = Arc::clone(&counter);
handles.push(thread::spawn(move || {
for _ in 0..1000 {
c.fetch_add(1, Ordering::Relaxed);
}
}));
}
for h in handles {
h.join().unwrap();
}
println!("{}", counter.load(Ordering::SeqCst));
}
打印结果是 4000。对比 L1 的 Arc<Mutex<u64>> 版本: 答案一样, 没有锁竞争, 实测大约快一个数量级。但是, 这只是因为这里的「计数值不用来同步任何其他内存」。一旦你想用一个 flag 通知「初始化已完成, 你现在可以读 buffer 了」, Relaxed 就不再够用——这就是本课其余部分要讲的悬崖。
你实际会用到的方法
load(order) -> T——读取当前值。store(value, order)——写入新值。fetch_add(delta, order) -> T、fetch_sub/fetch_or/fetch_and/fetch_xor——做对应的算术或位运算, 都返回 之前 的值。compare_exchange(current, new, success, failure) -> Result<T, T>——比较并交换 (CAS), 当且仅当当前值恰好为current时把它改为new; 成功返回Ok(prev), 失败返回Err(actual)。compare_exchange_weak(...) -> Result<T, T>——契约一致, 但允许「伪失败」(spurious failure)。在 ARM 这类用 LL/SC (load-linked / store-conditional) 实现 CAS 的架构上, 允许伪失败让硬件能少打一层重试簿记, 通常出现在loop { ... }重试模式里。
下面是用 compare_exchange_weak 模拟 fetch_max 的经典重试循环——回测里追踪历史最大回撤的高水位线常用这个模式:
use std::sync::atomic::{AtomicU64, Ordering};
fn fetch_max_emulated(a: &AtomicU64, candidate: u64) -> u64 {
let mut current = a.load(Ordering::Relaxed);
loop {
if candidate <= current {
return current;
}
match a.compare_exchange_weak(
current, candidate, Ordering::Release, Ordering::Relaxed,
) {
Ok(prev) => return prev,
Err(actual) => current = actual,
}
}
}
内存序: 五个名字与一条经验法则
std::sync::atomic::Ordering 的五个变体, 从弱到强:
Relaxed counter only, no sync
Acquire load that consumes a published value
Release store that publishes a value
AcqRel read-modify-write that needs both halves
SeqCst global total order, the simple choice
操作上的经验法则覆盖 ~90% 的真实代码: 存值「发布」一个数据时用 Release (写完数据再把 ready flag 设为 true, flag store 用 Release); 匹配的 load 用 Acquire (读 flag 看到 true 时, 你立刻保证看到 producer 在 release store 之前写入的全部内存); read-modify-write 操作用 AcqRel (比如环形缓冲的索引推进, 既要看到旧值, 又要发布新值); **想要全局总序又懒得思考时用 SeqCst**——它是最直观的心智模型, 也是最贵的; **只用作纯计数, 不同步任何其他内存时用 Relaxed**。Acquire load 与匹配的 Release store 之间建立的「happens-before」边, 正是 Mutex<T> 内部用来保证「拿到锁的线程能看见上一个持锁线程的全部写入」的那种同步关系——你写的每一次锁操作背后都站着这条规则。
强 CAS 与弱 CAS 的取舍写在这条法则之外要单独记一笔: compare_exchange (强 CAS) 永不伪失败, 适用于「一次性把状态从 A 推到 B, 不打算重试」这类操作, 例如把单例的初始化标志从 false 翻成 true; compare_exchange_weak (弱 CAS) 允许硬件在某些 cache 状态下伪失败, 但它跑在 loop { ... } 重试模式里时, ARM / RISC-V 这些 LL/SC 架构能少打一层重试簿记的汇编。规则的简化版: 一次性 CAS 用强的, 循环里 CAS 用弱的。本课其余地方所有重试模式都用弱版。
工作示例: 一条 SPSC 环形缓冲
单生产者单消费者环形缓冲, 量化场景里 feed handler → 策略引擎那一跳的最小可用版本。容量必须是 2 的幂, 这样 index & mask 就等于 index % capacity:
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
pub struct SpscRing<T> {
buf: Vec<UnsafeCell<MaybeUninit<T>>>,
mask: usize,
head: AtomicUsize,
tail: AtomicUsize,
}
impl<T> SpscRing<T> {
pub fn push(&self, value: T) -> Result<(), T> {
let head = self.head.load(Ordering::Acquire);
let tail = self.tail.load(Ordering::Relaxed);
if tail.wrapping_sub(head) >= self.buf.len() {
return Err(value);
}
// SAFETY: the producer owns slot [tail], which the consumer has not yet read.
unsafe {
*(self.buf[tail & self.mask].get()) = MaybeUninit::new(value);
}
self.tail.store(tail.wrapping_add(1), Ordering::Release);
Ok(())
}
pub fn pop(&self) -> Option<T> {
let tail = self.tail.load(Ordering::Acquire);
let head = self.head.load(Ordering::Relaxed);
if head == tail {
return None;
}
// SAFETY: the consumer owns slot [head], whose value the producer has published.
let value = unsafe { (*self.buf[head & self.mask].get()).assume_init_read() };
self.head.store(head.wrapping_add(1), Ordering::Release);
Some(value)
}
}
unsafe 出现在两处, 都被一行 // SAFETY: 注释保护。可靠性论证一句话: 「生产者只写当前的空闲槽位 tail (消费者还没看见它), 消费者只读已发布的占用区间 [head..tail), 因此两边访问的槽位永远不重叠——借用检查器看不到这条不变式, 但 head 与 tail 的索引代数证明了它」。这个论证仅在「恰好一个生产者、恰好一个消费者」前提下成立; 生产级别的安全封装会通过一个 split() 构造函数返回 Producer 与 Consumer 两个独立 handle 类型, 不在 &self 上同时暴露 push / pop。Acquire 用在每方读取「对方拥有的索引」上, Release 用在每方写「自己拥有的索引」上; 自家拥有的索引可以用 Relaxed 读 (没有跨线程同步可言)。
跨线程使用还差最后两行:
unsafe impl<T: Send> Send for SpscRing<T> {}
unsafe impl<T: Send> Sync for SpscRing<T> {}
unsafe impl 是因为编译器无法证明 UnsafeCell 的访问纪律安全——程序员承担这部分证明。T: Send 是必要的, 因为 pop 把值从一个线程搬到另一个; Sync 让多个引用可跨线程共享 (例如生产者线程持有 &self, 消费者线程也持有 &self, 两份引用都是合法的)。公开的 push / pop API 本身是安全的——unsafe 被封装在了内部。
何时停手
请不要在生产代码里自己写环形缓冲。crossbeam_queue::ArrayQueue<T> 提供有界 MPMC, crossbeam_queue::SegQueue<T> 提供无界 MPMC——两个都经过审计、有 benchmark, 且在国内 quant 系统里被广泛部署。本课的 SpscRing<T> 是为了让你「能读懂这些 crate 的源码并认出模式」, 而不是为了让你拿去跑生产。手写无锁结构的成本不是编码, 而是审查——unsafe 一旦走出 review 视线, 出问题往往是几周以后才在异常 corner case 里现形, 而那种 bug 在量化场景里直接表现为「某个交易日凌晨重启后行情对不上账」。
crossbeam_utils::CachePadded<T> 与「伪共享 (false sharing)」是下一层优化: 当两个本应独立的原子变量住在同一条 cache line 上, 两个核心的写操作会互相把 cache line 在 L1 之间 ping-pong, 多线程反而比单线程慢; 症状是「我加了核心反而更慢」, 修法是把每个独立原子包进 CachePadded 让它独占一条 cache line。3.5.3 会讲怎么用 perf c2c 测量并定位伪共享, 本课只点名。最后一句: 独立的 std::sync::atomic::fence(Ordering) 在罕见场合 (例如某个 load 自身只能用 Relaxed, 但你需要在它之后插入一道 acquire 屏障) 用得上, 大多数代码不需要。
阅读建议
《Rust Atomics and Locks》(英文原版可在 marabos.nl/atomics 免费阅读, 中文社区在 GitHub 上整理翻译), 第 2 章「原子操作」、第 3 章「内存序」、第 4 章「自己造一个自旋锁」三章直接对应本课内容; course.rs 的「Rust 圣经」并发章节有对照的中文解读;《Rust 编程之道》原子操作与无锁章给出了从 C++ memory model 视角的横向比较。crossbeam-queue / crossbeam-utils 的 docs.rs 文档是生产替代品的权威参考。PingCAP TiKV、火山引擎金融科技、字节量化团队在公众号与 InfoQ 中文站发布的 Rust 无锁队列在订单簿前置 (order book front-end) 与撮合引擎中的实践文章, 本课的 SpscRing 几乎是这些文章里反复出现的最小骨架。国内 HFT / 中频量化在 CFFEX 接入网关上对 SPSC / MPSC 队列的依赖, 也是这条工艺路线在国内落地最显著的标志。
练习
Exercise
拿本课的 AtomicU64 计数器程序, 完成: (a) 4 个线程各做 1_000_000 次 fetch_add(1, Ordering::Relaxed), 最终值必须恰好为 4_000_000; (b) 把它的 wall-clock 与 L1 的 Arc<Mutex<u64>> 同样负载版本对比, 报告比值——在典型桌面机上原子版本大约快 5-15×; (c) 拿 compare_exchange_weak 版的 fetch_max_emulated, 写一个测试: 4 个线程各调用一次, 实参取 0..1_000_000 范围内的随机值; 测试结束时用 assert_eq! 断言 atomic 的最终值等于所有 candidate 中的最大值; (d) 用两句话解释为什么 Ordering::Relaxed 对自增计数足够, 但对「告知初始化完成」的 flag 不够。
提示
Relaxed 不保证跨原子操作的顺序, 但保证「这一次 fetch_add 是不可分割的」, 因此 4M 次加 1 的累加结果一定恰好 4M。