国内某头部私募的中频策略团队把 3.5.1 写的那版单线程 Monte Carlo 定价器搬上生产: 给 510300.SH (沪深300 ETF) 的欧式看涨期权报实时理论价。CPU 占用率长期挂在 8%——一台 16 核机器只有一个核在干活。负责人甩给你的任务很直接: 把 n_paths 平均分到 N 个线程上, 共享一个 f64 累加器, 拿到一样的价格, 把 CPU 用满, T+1 收盘前要看见 Grafana 上 CPU 曲线被打满。这一课就是把这个任务从「明天上线」做到「编译期就保证无数据竞争」所需要的全部 Rust 并发底座: OS 线程、Send / Sync 标记 trait, 以及 Arc<Mutex<T>> 这套你接下来 200 分钟里每一节课都会用到的共享状态模式。蚂蚁链、字节火山引擎金融科技、PingCAP 等团队的公开技术分享, 以及多家 SSE 成员私募的 Rust 重写项目, 都坐在这套四原语之上。
std::thread::spawn 与 JoinHandle
入口函数的签名你只需要读一次:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
三个 trait 边界各对应一句话: FnOnce 因为闭包在新线程里只跑一次, 不像 Fn / FnMut 那样可重复调用, 也不需要把捕获的变量保留到下一次调用; Send 因为闭包捕获的环境要被「搬」到另一个线程, 编译器必须确认这次跨线程所有权转移是安全的; 'static 因为新线程可能比派生它的栈帧活得更久——任何被闭包捕获的引用都必须活到程序结束 (即 &'static), 否则当前函数返回、栈帧被销毁后子线程会读到悬垂引用。一旦理解这三条, cargo check 抛出的所有线程相关错误你都能在不读文档的情况下定位。
Fenced 标准模式是: 派四个线程, 每个打印自己的索引, 然后逐个 join:
use std::thread;
fn main() {
let mut handles = Vec::new();
for i in 0..4 {
handles.push(thread::spawn(move || {
println!("线程 {}", i);
}));
}
for h in handles {
h.join().unwrap();
}
}
thread::spawn 返回 JoinHandle<T>; handle.join() 阻塞当前线程, 等到那个新线程退出后返回 Result<T, Box<dyn Any + Send>>——Ok(value) 是闭包的返回值, Err(payload) 表示那个线程 panic 了, 你拿到的是 panic 的载荷。生产代码里 .unwrap() 把 panic 透传出来; 库代码里你可能想 match 一下并把错误向上传。thread::current().id() 在调试里能给你一个 ThreadId; thread::sleep(Duration::from_millis(10)) 是阻塞当前线程; thread::Builder::new().name("worker-1".into()).stack_size(64 * 1024).spawn(...) 是你需要给线程起名 (在 profiler、gdb info threads 里能看见) 或调整栈大小时的入口。底层上 std::thread::spawn 在 Linux 上其实就是 pthread_create 的薄包装, 这里点到为止——具体 FFI 留到 3.5.4。
借父栈数据: thread::scope
'static 这条边界很硬。如果你想让线程读栈上的 Vec<f64>, 上面那版 thread::spawn 会拒绝编译——栈帧可能在子线程还活着时就被销毁。2022 年稳定的 std::thread::scope 就是为这个场景设计的:
use std::thread;
fn main() {
let data = vec![1, 2, 3, 4];
thread::scope(|s| {
s.spawn(|| {
println!("{:?}", &data);
});
s.spawn(|| {
println!("len = {}", data.len());
});
});
}
没有 Arc, 没有 move。安全性的关键是 thread::scope 这个 API 自己负责: 它在闭包返回前会 join 掉每一个 s.spawn 出去的线程, 所以所有被借用的数据保证活到 scope 结束。你接下来在 L2 写 fan-out / fan-in Monte Carlo 时几乎都用这种 scoped 形式——它省掉一大堆 Arc::clone 的样板。
数据竞争是编译期错误: Send / Sync
Send 和 Sync 是定义在 std::marker 里的两个 unsafe auto trait——「unsafe」是因为错误地手工实现它们能引入数据竞争; 「auto」是因为编译器会按字段递归自动派生。两条规则:
T: Send表示「T类型的所有权可以转移到另一个线程」。T: Sync表示「&T可以送到另一个线程」, 等价于「&T: Send」。
绝大多数类型这两条都自动成立。承重的反例只有三个: Rc<T> (引用计数是非原子的 Cell<usize>, 在两个线程上同时 clone 会数据竞争), RefCell<T> (借用检查是单线程非原子计数), 以及裸指针 *const T / *mut T (编译器不假设任何别名规则)。下面这段代码会拒绝编译, 报错你必须自己看一次:
use std::rc::Rc;
use std::thread;
fn main() {
let r = Rc::new(42);
// 下一行会触发 E0277:
thread::spawn(move || println!("{}", r));
// error[E0277]: `Rc<i32>` cannot be sent between threads safely
// = help: within `..`, the trait `Send` is not implemented for `Rc<i32>`
// note: required by a bound in `std::thread::spawn`
}
cargo check 的相关输出节选大致如下——把 E0277 和「cannot be sent between threads safely」这一行牢牢记住:
error[E0277]: `Rc<i32>` cannot be sent between threads safely
--> src/main.rs:7:5
|
7 | thread::spawn(move || println!("{}", r));
| ^^^^^^^^^^^^^ `Rc<i32>` cannot be sent between threads safely
= help: within `[closure@src/main.rs:7:19]`, the trait `Send` is not implemented for `Rc<i32>`
note: required by a bound in `std::thread::spawn`
修法只有一个: 把 Rc<T> 换成 Arc<T> (atomic reference count, 原子引用计数)。API 完全一致, 只是引用计数走 AtomicUsize, 因此 Arc<T>: Send + Sync 在 T: Send + Sync 时自动成立。「数据竞争是编译期错误」这句话在这个时刻才真正被你内化。
Arc<Mutex<T>>: 共享可变状态
回到开头那台 16 核机器。沪深300 Monte Carlo 计数器版本如下:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0_u64));
let mut handles = Vec::new();
for _ in 0..4 {
let c = Arc::clone(&counter);
handles.push(thread::spawn(move || {
*c.lock().unwrap() += 1;
}));
}
for h in handles {
h.join().unwrap();
}
println!("最终值 = {}", *counter.lock().unwrap());
}
三点要看清: (1) Mutex::lock 返回的不是 T, 而是 LockResult<MutexGuard<T>>。MutexGuard<T> 是一个智能指针, 它的 Drop 实现负责释放锁——你永远不用手写 unlock(), 离开作用域就自动放锁 (RAII)。(2) lock() 返回 Result 的唯一原因是「中毒」(poisoning): 某个线程持锁时 panic 了, 之后所有 lock() 都返回 Err(PoisonError)——生产代码通常 .unwrap() 直接 panic, 库代码可以用错误上的 into_inner() 取回那份被怀疑的数据。(3) *g += 1 看着像一句, 其实是 load / add / store 三步, 但只要 g 这个 guard 还活着, 编译器就保证你独占访问, 整个三步是原子的。
RwLock<T> 一段就够: 多读者或者单写者; read() 返回 RwLockReadGuard (可多份共存), write() 返回 RwLockWriteGuard (独占)。读写比高时用它——经典场景是行情快照缓存, 一个 feed handler 写、N 个策略读。注意这里的 trade-off: 写者饥饿在某些操作系统的默认调度策略下确实会发生, 因此当写频率显著高于读频率时, 退回普通的 Mutex<T> 反而更稳。Condvar (条件变量, 用来按条件唤醒线程) 与 Barrier (N 个线程都到达同一点才放行) 各点一次, 本课不展开; 你绝大多数共享状态的需求用前面这四个原语就足够覆盖。
这一节往后
记住接下来三节会反复用的一条经验法则: **Arc<T> 用于跨线程共享只读所有权; Arc<Mutex<T>> 用于跨线程共享可变状态; Arc<RwLock<T>> 用于读写比高的共享状态; Send / Sync 由编译器从字段自动推导, 你几乎从不亲自写 impl**。L2 会把上面 Arc<Mutex<f64>> 的累加换成「发送端 / 接收端」(sender / receiver) 通道——你会看见为什么对异构有序的工作流, channel 比 mutex 更顺手。L3 会把 Mutex<u64> 换成 AtomicU64——为什么在内层循环里原子操作比互斥锁快一个数量级。L4 会把 std::sync::Mutex 换成 tokio::sync::Mutex——为什么异步任务持锁过 .await 必须用异步互斥锁。但所有这些替代品都坐在你这一节学到的四个原语之上。
练习
Exercise
拿本课展示的 Arc<Mutex<u64>> 计数器程序, 完成三步: (a) 把内层从 *c.lock().unwrap() += 1 改成 let mut g = c.lock().unwrap(); *g += 1; thread::sleep(std::time::Duration::from_millis(1));, 跑一次, 确认最终打印值依然恰好为 N (应当如此); (b) 把 Arc<Mutex<u64>> 替换为 Arc<RwLock<u64>>, 把 c.lock().unwrap() 替换为 c.write().unwrap(), 确认程序仍能编译且给出同一答案; (c) 把外层的 Arc::new(Mutex::new(0_u64)) 改成 Rc::new(Mutex::new(0_u64)), 把 cargo check 报错完整粘出来, 指出哪一条 trait 边界没满足以及为什么——明确点出 Rc<T> 不是 Send。
提示
MutexGuard 在作用域结束时才 drop; 即使 sleep 一毫秒, 同一时刻只有一个线程持锁, 所以最终值还是 N。先肉眼跟踪 guard 的生命周期。提示
thread::spawn 的 where F: FnOnce() -> T + Send + 'static。Rc<Mutex<u64>> 不是 Send, 因为 Rc<T> 的引用计数是非原子 Cell<usize>, 跨线程克隆会数据竞争。