← 返回模块
3.5.2.2beta 可读 · 未来付费校验通过内容版本 2026-05-27

通道与消息传递

3.5.2 · Rust 并发 · 编程

L1 你给 510300.SH (沪深300 ETF) 的 Monte Carlo 定价器装了 Arc<Mutex<f64>> 共享累加器, CPU 是被打满了, 但 perf 一打就能看到 4 个核里有大半时间在 lock_mutex 自旋——4 个 worker 抢同一把锁, 串行化在了那里。下一步, 你的负责人把另一个量化老兵叫过来评审, 他扫一眼说: 「这里就不该用锁。每个 worker 算一个局部平均值, 通过通道送回主线程, 主线程再合并——这是 Go 用了十多年的那套 share memory by communicating 模型, Rust 端把它做得更安全。」本课讲的就是 Rust 里这套消息传递并发模式, 它和 L1 的共享状态并发互为补集: 共享状态需要锁, 抢锁会串行化, 失败模式是死锁; 消息传递不要锁, 失败模式是无界队列暴涨——而 Rust 的 sync_channel(n) 给你内建的背压解决方案。

std::sync::mpsc: 标准库的多生产者单消费者

名字解码: multi-producer single-consumer (多生产者单消费者, MPSC)。Fenced 标准调用如下:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();
    thread::spawn(move || {
        tx.send(42).unwrap();
    });
    let v = rx.recv().unwrap();
    println!("{}", v);
}

mpsc::channel::<T>() 返回 (Sender<T>, Receiver<T>)——无界 (unbounded) 通道。tx.send(msg) 的返回类型是 Result<(), SendError<T>>, 出错仅在「接收端被 drop 了」一种情况 (队列没人收, 直接报错); 工业代码通常 .unwrap()——接收端意外消失通常是 shutdown 逻辑写错。rx.recv() 返回 Result<T, RecvError>——阻塞到有消息到达或所有 sender 都被 drop 才返回; 后者意味着通道关闭。rx.recv_timeout(d) 给你 Result<T, RecvTimeoutError>, 其中 TimeoutDisconnected 两种变体可区分; rx.try_recv() 非阻塞返回 Result<T, TryRecvError>。最 idiomatic 的消费者形式是 for msg in rx { ... }——Receiver 实现了 Iterator, 通道关闭时循环自然退出, 你几乎不应该在循环里直接写 recv().unwrap()

Sender 实现 Clone + Send (每个生产者克隆一份送进各自的线程); ReceiverSend!Clone——单消费者是有意的设计选择, 因为它保证了「严格的入队顺序在出队端被一致地看到」, 这是大多数管道想要的属性。

背压与有界通道: sync_channel(n)

mpsc::sync_channel(n) 是有界变体, 同样返回 (SyncSender, Receiver) 对; 区别只在 send 会在缓冲区已有 n 条未读消息时阻塞。生产场景下你几乎总是要用它——任何「生产者可能跑得比消费者快」的情况下, 无界通道都会让内存涨穿。一个典型的反例: tick 行情接入线程每毫秒发一条消息, 策略线程每条消息要算 5 ms, 用 channel() 半小时就把进程 OOM 杀掉了; 用 sync_channel(8) 行情线程会被自动节流到策略的吞吐量。背压是 channel 模型里你最不需要自己写的特性——它是免费的, 只要别用错变体。下面是有界 + 多生产者的标准模式:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::sync_channel::<u32>(4);
    for i in 0..3 {
        let tx_i = tx.clone();
        thread::spawn(move || {
            tx_i.send(i).unwrap();
        });
    }
    drop(tx);
    for msg in rx {
        println!("收到 {}", msg);
    }
}

那一行 drop(tx); 至关重要: 不 drop 它, 主线程持有的原始 tx 副本会让通道始终「有 sender 活着」, for msg in rx 永远不会退出。这是「我的 channel 怎么不关」类 bug 最常见的根因。

多消费者怎么办: crossbeam-channel

stdlib 的 MPSC 设计上只支持一个消费者。当你想让多个 worker 从同一个队列里抢任务时, 业界标准答案是 crossbeam-channel crate: crossbeam::channel::unbounded()bounded(n) 给你 MPMC (多生产者多消费者) 版的 Sender / Receiver, API 与 stdlib 几乎一致。还有 select! 宏让一个线程同时等待多个 channel 上的事件——这是 Rust 里最接近 Go select 的原语:

select! {
    recv(rx1) -> msg => ...,
    recv(rx2) -> msg => ...,
    default(Duration::from_millis(10)) => ...,
}

异步版的 tokio::sync::mpsc::channeltokio::select! 形态一致, 留到 L4 讲。

fan-out / fan-in: Monte Carlo 重写

L1 那版 Arc<Mutex<f64>> 累加器, 现在用 channel 重写。骨架如下 (simulate_path 是 3.5.1 那版的单路径 payoff 函数):

use std::sync::mpsc;
use std::thread;

fn simulate_path(seed: u64) -> f64 { /* 一条路径的折现 payoff */ 0.0 }

fn main() {
    let n_workers = 4;
    let n_paths = 100_000;
    let (tx, rx) = mpsc::sync_channel::<f64>(n_workers);
    thread::scope(|s| {
        for w in 0..n_workers {
            let tx_w = tx.clone();
            s.spawn(move || {
                let mut sum = 0.0_f64;
                for i in 0..(n_paths / n_workers) {
                    sum += simulate_path((w * n_paths / n_workers + i) as u64);
                }
                tx_w.send(sum / (n_paths / n_workers) as f64).unwrap();
            });
        }
        drop(tx);
    });
    let total: f64 = rx.iter().sum::<f64>();
    let mean = total / n_workers as f64;
    println!("price proxy = {}", mean);
}

三个细节要看到: thread::scope 替代了 L1 的 Arc::clone 样板——scoped 线程可以借用主栈上的 n_paths / n_workers 等只读常量, 不需要 Arc; drop(tx);scope 闭包末尾把原始 sender 释放——加上所有 worker 退出时会自动 drop 各自的克隆, 通道在最后一个 sender 死亡的瞬间关闭, 消费者 for partial in rx 自然终止; 消费循环完全不在乎有几个生产者——这就是 channel 模型相对 mutex 模型的核心优势, 拓扑结构对消费者透明。

数据并行的另一条路: Rayon

并不是每个并行任务都需要 channel。如果工作单元同构、独立、顺序无关——典型的回测扫参、参数搜索、Monte Carlo 求和——rayon crate 用一行替你解决:

use rayon::prelude::*;

fn main() {
    let n_paths: usize = 100_000;
    let mean: f64 = (0..n_paths)
        .into_par_iter()
        .map(|i| simulate_path(i as u64))
        .sum::<f64>() / (n_paths as f64);
    println!("{}", mean);
}

fn simulate_path(seed: u64) -> f64 { 0.0 }

Cargo.toml 加一行:

[dependencies]
rayon = "1"

Rayon 的模型一段话讲完: 它内置一个 work-stealing 线程池, 默认尺寸为 CPU 核数; into_par_iter 把迭代器分块派给池里的 worker; API 与标准 Iterator 完全一致 (map / filter / sum / collect), 把串行版本逐行改成并行版本几乎不需要重构。

经验法则: ​工作单元异构、顺序敏感、需要显式背压时用 channel​ (交易管道——行情 → 策略 → 下单器); ​工作单元同构、独立、顺序无关时用 par_iter (回测扫参、参数网格、Monte Carlo 聚合)。两者互补; channel 不是「老」, par_iter 也不是「新」, 它们解决的是不同形状的工作负载。蚂蚁链、字节火山引擎和头部私募的 Rust 实务也是这个分界: 实时管道走 channel, 离线扫参走 Rayon。

往后

L3 会告诉你, 当 sync_channel 内部那把 mutex 都成为瓶颈时, 怎么用原子操作直接搭一条 SPSC (单生产者单消费者) 环形缓冲; 那是真正的低延迟管道里行情接入到策略引擎那一跳要做的事。L4 会把 tokio::sync::mpsc 引出来——当 I/O 等待是主要瓶颈、线程数远大于 CPU 核数时, async channel 是更合适的原语, 国内交易所网关重写几乎清一色选这条路。crossbeam-channel 的 MPMC + select! 在 3.5.3 的低延迟交易场景里会被反复用到, 当一个 worker 需要同时监听多条上游通道时它是唯一像样的方案。本课的两个模型——channel 与 par_iter——已经覆盖了你在量化生产管道里 90% 的并行需求。

练习

Exercise

拿本课的 fan-out / fan-in Monte Carlo 骨架, 完成: (a) 分别用 n_workers = 1n_workers = 4 各跑一次, 比较两次最终均值——它们到 1e-12 应当 相符 (每个 worker 用不同的确定性 seed 区间, 浮点求和顺序变了), 但到 1e-3 应当相符; (b) 把 thread::scope 末尾的显式 drop(tx); 删掉, 运行程序并报告现象——消费者 for msg in rx { ... } 会卡住, 因为主线程仍持有原始 tx, 通道不会关闭; 一句话解释为什么; (c) 用 rayon::prelude::*(0..n_paths).into_par_iter().map(simulate_path).sum::<f64>() / (n_paths as f64) 重写同一 Monte Carlo, 确认结果与 4-worker channel 版本到 1e-3 相符。

提示
(a) 浮点求和不满足结合律。同一组路径用不同分组方式求和, 中间舍入误差不同, 末位会差几 ULP。但 1e-3 量级上 Monte Carlo 的统计误差远大于浮点舍入。
提示
(b) Receiver::iter() 的终止条件是「所有 Sender clone 都已 drop」。主线程那份原始 tx 没被显式 drop, scope 闭包返回后它还活着, 通道一直敞着。