← 返回模块
3.5.2.4beta 可读 · 未来付费校验通过内容版本 2026-05-27

async / await 与 Tokio 入门

3.5.2 · Rust 并发 · 编程

国内某 SSE 接入团队接到任务: 把老 C++ + Boost.Asio 写的行情接入网关重构成 Rust, 单台机器要同时维持 8000 条 TCP 长连接, 把 510300.SH (沪深300 ETF) 等几百只标的的 tick 流落到内部撮合面板。架构师扫一眼说: 上 tokio——别想着每条连接派一个 OS 线程, 8000 个 OS 线程在调度器上会自相残杀; 写 async, 让一个工作线程池服务上万个并发任务, I/O 等待时把那个工作线程让给别的任务。这一课就是这个决策的最底层版本: 当你的瓶颈是 I/O 等待而非 CPU 计算, async / await 是 Rust 给你的解决方案; Tokio 是国内私募和大厂在这条路上几乎统一选用的运行时。前三课讲了你需要把 CPU 用满时用什么; 本课讲你需要等待时用什么。两套机制互补, 真实量化系统在 I/O 边界用 tokio, 在 CPU 计算边界用 std::thread + Rayon + 无锁队列。

心智模型: async fn 是状态机的语法糖

调用 async fn foo() -> i32 { ... } 时, 它 ​不会运行​​——返回的是一个匿名类型, 实现 Future trait。编译器把 async fn 函数体脱糖成一个状态机, 每个 .await 是一个状态边界, Futurepoll 方法每被调用一次, 状态机前进一步。Future trait 的签名读一次就够, 本课不要求你手写实现:

pub trait Future {
    type Output;
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output>;
}

Poll<T>Pending (还没好; future 已经登记 waker, 执行器之后会再调用 poll) 或者 Ready(T) (好了)。Pin<T> 这个类型让自引用的状态机在类型系统层面安全, 细节留到 3.5.4。本课你只需要把 async / .await 当语法用, 信任编译器把状态机造对。

运行时: 你必须自己选一个

Rust 标准库里 ​没有​ async 运行时。这是从 Go / Java / Python 切过来的开发者最容易踩的第一个坑: 调用一个 async fn 不把返回的 future 交给执行器, 它什么都不会做——没有 warning, 没有 error, 就是一个静默的「不发生」。社区里有 tokioasync-stdsmolglommiomonoio 等运行时; 本模块统一选 tokio, 因为国内金融基础设施栈里 tokio 几乎占绝对多数, 文档完整, 社区支持成熟。cargo add tokio --features full 拉进多线程运行时、I/O 驱动、定时器、同步原语、宏和 spawn_blocking——学习阶段用 full, 生产里挑你真用到的子集开启即可。Fenced Cargo.toml 配置如下:

[dependencies]
tokio = { version = "1", features = ["full"] }

入口宏 #[tokio::main]:

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        42_i32
    });
    let v = handle.await.unwrap();
    println!("{}", v);
}

宏展开为一个同步 main, 内部构造 tokio::runtime::Runtime 并调用 runtime.block_on(your_async_body); 默认是多线程运行时, 工作线程数等于 CPU 核数。#[tokio::main(flavor = "current_thread")] 是单线程变体, 所有任务都钉在同一个 OS 线程上, 调试或单核小型服务可用。#[tokio::test] 是测试的等价物。

tokio::spawn 与结构化并发

tokio::spawn(async move { ... }) 返回 JoinHandle<T>, .await 它得到 Result<T, JoinError>——JoinError 捕获 task panic 或 abort()。要求闭包 Future<Output = T> + Send + 'static, 与 L1 std::thread::spawn 的边界一致, 提升到 future 上。tokio::join! 让你在同一任务里并发跑多个 future, tokio::select! 让你等首个完成的 future 并 cancel 其余:

async fn task_a() -> u32 { 1 }
async fn task_b() -> u32 { 2 }
async fn task_c() -> u32 { 3 }

#[tokio::main]
async fn main() {
    let (a, b) = tokio::join!(task_a(), task_b());
    println!("{} {}", a, b);

    tokio::select! {
        v = task_c() => println!("c first: {:?}", v),
        _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => println!("timeout"),
    }
}

select! 选中一支后会 drop 其余 future——drop 一个 future 即 cancel, 「cancel by drop」是 Rust async 的核心模型。3.5.3 的 tokio_util::sync::CancellationToken 是协作式 cancel 的更结构化版本, 本课点到为止。

std 同步原语的 async 表亲

按 L1 / L2 的镜像学: tokio::sync::mpsc::channel(n) 默认有界, 返回 (Sender, Receiver); tx.send(msg).await 在缓冲区满时挂起, rx.recv().await 返回 Option<T> ——注意与 std::sync::mpscResult<T, RecvError> API 有差异: 通道关闭时 stdlib 返回 Err, tokio 返回 None。需要无界变体时, tokio::sync::mpsc::unbounded_channel() 是 opt-in 入口; 默认有界是 tokio 的明智选择, 因为大多数生产代码都需要背压。tokio::sync::Mutex<T> 是 async mutex, m.lock().await 返回 MutexGuard; ​只有在你需要跨 .await 持锁时才用它​​, 大多数情况 std::sync::Mutex 在 async 代码里也是正确选择, 而且更快——尝试在 std::sync::Mutex 持锁状态下 .await 是常见 bug, 会让运行时调度器把锁带到另一个任务的执行栈上去。tokio::sync::RwLock<T> 是读写锁的 async 表亲; tokio::sync::Notify 用于一次性条件唤醒; tokio::sync::oneshot::channel() 用于「从任务 A 向任务 B 发送恰好一个值」, 是 tokio 里最便宜的同步原语。tokio::time::sleep(Duration::from_millis(10)).await 是 async sleep, 不阻塞任何工作线程; tokio::time::timeout(d, fut).await 把 future 加上超时, 返回 Result<T, Elapsed>; tokio::time::interval(period) 给出周期性 tick, 适合做心跳。

绝对规则: async 任务里不要阻塞

每个学过 async 的人都犯过一次这个错, 写下来做警示。std::thread::sleep、阻塞文件 I/O、长 CPU 循环、std::sync::Mutex::lock() 在高竞争下——它们都跑在工作线程上, 跑的过程中那个工作线程无法服务其他任务, 你会饿死整个运行时。逃生口是 tokio::task::spawn_blocking(|| { ... }), 返回 JoinHandle<T>, 把阻塞工作搬到专门的 blocking 线程池:

#[tokio::main]
async fn main() {
    let cpu_result = tokio::task::spawn_blocking(|| {
        let mut s = 0_u64;
        for i in 0..1_000_000_u64 {
            s = s.wrapping_add(i);
        }
        s
    }).await.unwrap();
    println!("{}", cpu_result);
}

经验法则: 任何可能花 >10 μs 纯 CPU 的调用, 或任何不在 tokio I/O 驱动内的同步系统调用 (文件 I/O、原生加解密、调 C / Python 库), 都走 spawn_blocking

Capstone: 异步行情模拟器

把全部要点放进一个 capstone——510300.SH 行情模拟器: producer 每 10 ms 通过 sleep 唤醒发一条合成 tick, consumer 累加均值, 100 ms 后主任务 drop sender, 三任务通过 tokio::join! 收尾:

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<f64>(16);
    let producer = tokio::spawn(async move {
        for _ in 0..10 {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            if tx.send(0.0).await.is_err() {
                break;
            }
        }
    });
    let consumer = tokio::spawn(async move {
        let mut sum = 0.0_f64;
        let mut n: u64 = 0;
        while let Some(tick) = rx.recv().await {
            sum += tick;
            n += 1;
        }
        println!("mean = {}", if n == 0 { 0.0 } else { sum / n as f64 });
    });
    let _ = tokio::join!(producer, consumer);
}

三个值得注意的细节: producer 在 tx.send(...).is_err() 时干净退出, 没有死循环追打一个已经消失的 receiver; consumer 在通道关闭时 while let 自然结束, 不需要额外的 shutdown 信号; 三个任务在默认 #[tokio::main] 上至多共用两个工作线程——可通过 tokio_console 在 3.5.3 里看到任务调度的实际形态, 而不是你想象中的 1 任务 = 1 OS 线程。这就是 async 模型真正赚到的地方: 「N 个任务跑在 M 个线程上, M ≪ N」。

往后

真实交易系统的 async 模式——tokio::net::TcpStream 接交易所、tokio-util::codec::Framed 做协议分帧、背压设计、tokio_util::sync::CancellationToken 做协作 cancel、tracing + tokio_console 做可观测性——全部在 3.5.3 Rust 低延迟交易里展开; 手写 Future 实现与 Pin<T> 的来龙去脉在 3.5.4 讲; 其他运行时存在但 tokio 是国内 quant Rust 路线上的事实标准, 也是本科目接下来用的运行时。

阅读建议与行业落点

《Rust 程序设计语言》中文版第 17 章 (若你的版本已纳入) 给出 async 章的入门; course.rs 的「Rust 圣经」async 章节、tokio.rs 官方教程的中文社区翻译合集、张汉东《Rust 编程之道》异步章是另外三条主线阅读路径。国内 quant Rust 实务里, tokio 主要承担三类角色: 交易所网关 (TCP/UDP 长连接 + 自定义二进制协议, CFFEX 等多家接入栈在 Rust 重写时几乎清一色选 tokio)、行情接入与分发 (UDP 组播解码 → 内部 tokio::sync::broadcast 扇出)、以及管理后台 (REST / gRPC / WebSocket)。回测扫参、策略 P&L 计算这类 CPU 重负载几乎不走 async (瓶颈在 CPU 不在 I/O), 仍然是 std::thread + Rayon + 无锁队列。这条「I/O 走 async, CPU 走线程」的分工, 是本模块前三课与本课在生产里最常见的组装方式: 你已经具备把一台 16 核机器同时塞满计算与塞满连接的全部基础。本课就此收尾, 后续每一节都建立在这四个原语之上。

练习

Exercise

拿本课的异步行情模拟器 capstone, 完成: (a) 把 producer 里的 tokio::time::sleep(...).await 改成 std::thread::sleep(std::time::Duration::from_millis(10)), 同时保持 consumer 运行, 报告吞吐量与 CPU 占用的变化 (producer 的 std::thread::sleep 会阻塞一个运行时工作线程, 饿死 consumer; 现象是 consumer 任务落后或看似停滞); (b) 还原 (a), 再加一个第三任务每 25 ms 通过 tokio::time::sleep(Duration::from_millis(25)).await 打印 "alive", 确认 100 ms 运行期间它恰好打印 4 次; (c) 把通道创建改为 tokio::sync::mpsc::channel::<f64>(1) (缓冲区 1), 报告 producer 的 tx.send(...).await 是否会挂起 (会; consumer 每条 tick 都要处理一点时间); (d) 把整个 producer 包进 tokio::time::timeout(Duration::from_millis(50), async { ... }).await, 报告外层表达式的类型 (Result<(), tokio::time::error::Elapsed>)。

提示
(a) tokio 默认多线程运行时有 num_cpus 个工作线程, 把其中一个完全阻塞 10 ms 通常仍能让另一线程跑 consumer——但若工作线程数被 cap 为 1 (例如 current_thread flavor) 现象会更显眼。
提示
(c) 「producer 不挂起」的唯一可能是 buffer 永远不满。buffer=1 意味着第一条 tick 必须被 consumer 取走后 producer 才能发第二条; 否则 send 必然 await 直到出位。