国内某 SSE 接入团队接到任务: 把老 C++ + Boost.Asio 写的行情接入网关重构成 Rust, 单台机器要同时维持 8000 条 TCP 长连接, 把 510300.SH (沪深300 ETF) 等几百只标的的 tick 流落到内部撮合面板。架构师扫一眼说: 上 tokio——别想着每条连接派一个 OS 线程, 8000 个 OS 线程在调度器上会自相残杀; 写 async, 让一个工作线程池服务上万个并发任务, I/O 等待时把那个工作线程让给别的任务。这一课就是这个决策的最底层版本: 当你的瓶颈是 I/O 等待而非 CPU 计算, async / await 是 Rust 给你的解决方案; Tokio 是国内私募和大厂在这条路上几乎统一选用的运行时。前三课讲了你需要把 CPU 用满时用什么; 本课讲你需要等待时用什么。两套机制互补, 真实量化系统在 I/O 边界用 tokio, 在 CPU 计算边界用 std::thread + Rayon + 无锁队列。
心智模型: async fn 是状态机的语法糖
调用 async fn foo() -> i32 { ... } 时, 它 不会运行——返回的是一个匿名类型, 实现 Future trait。编译器把 async fn 函数体脱糖成一个状态机, 每个 .await 是一个状态边界, Future 的 poll 方法每被调用一次, 状态机前进一步。Future trait 的签名读一次就够, 本课不要求你手写实现:
pub trait Future {
type Output;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output>;
}
Poll<T> 是 Pending (还没好; future 已经登记 waker, 执行器之后会再调用 poll) 或者 Ready(T) (好了)。Pin<T> 这个类型让自引用的状态机在类型系统层面安全, 细节留到 3.5.4。本课你只需要把 async / .await 当语法用, 信任编译器把状态机造对。
运行时: 你必须自己选一个
Rust 标准库里 没有 async 运行时。这是从 Go / Java / Python 切过来的开发者最容易踩的第一个坑: 调用一个 async fn 不把返回的 future 交给执行器, 它什么都不会做——没有 warning, 没有 error, 就是一个静默的「不发生」。社区里有 tokio、async-std、smol、glommio、monoio 等运行时; 本模块统一选 tokio, 因为国内金融基础设施栈里 tokio 几乎占绝对多数, 文档完整, 社区支持成熟。cargo add tokio --features full 拉进多线程运行时、I/O 驱动、定时器、同步原语、宏和 spawn_blocking——学习阶段用 full, 生产里挑你真用到的子集开启即可。Fenced Cargo.toml 配置如下:
[dependencies]
tokio = { version = "1", features = ["full"] }
入口宏 #[tokio::main]:
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
42_i32
});
let v = handle.await.unwrap();
println!("{}", v);
}
宏展开为一个同步 main, 内部构造 tokio::runtime::Runtime 并调用 runtime.block_on(your_async_body); 默认是多线程运行时, 工作线程数等于 CPU 核数。#[tokio::main(flavor = "current_thread")] 是单线程变体, 所有任务都钉在同一个 OS 线程上, 调试或单核小型服务可用。#[tokio::test] 是测试的等价物。
tokio::spawn 与结构化并发
tokio::spawn(async move { ... }) 返回 JoinHandle<T>, .await 它得到 Result<T, JoinError>——JoinError 捕获 task panic 或 abort()。要求闭包 Future<Output = T> + Send + 'static, 与 L1 std::thread::spawn 的边界一致, 提升到 future 上。tokio::join! 让你在同一任务里并发跑多个 future, tokio::select! 让你等首个完成的 future 并 cancel 其余:
async fn task_a() -> u32 { 1 }
async fn task_b() -> u32 { 2 }
async fn task_c() -> u32 { 3 }
#[tokio::main]
async fn main() {
let (a, b) = tokio::join!(task_a(), task_b());
println!("{} {}", a, b);
tokio::select! {
v = task_c() => println!("c first: {:?}", v),
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => println!("timeout"),
}
}
select! 选中一支后会 drop 其余 future——drop 一个 future 即 cancel, 「cancel by drop」是 Rust async 的核心模型。3.5.3 的 tokio_util::sync::CancellationToken 是协作式 cancel 的更结构化版本, 本课点到为止。
std 同步原语的 async 表亲
按 L1 / L2 的镜像学: tokio::sync::mpsc::channel(n) 默认有界, 返回 (Sender, Receiver); tx.send(msg).await 在缓冲区满时挂起, rx.recv().await 返回 Option<T> ——注意与 std::sync::mpsc 的 Result<T, RecvError> API 有差异: 通道关闭时 stdlib 返回 Err, tokio 返回 None。需要无界变体时, tokio::sync::mpsc::unbounded_channel() 是 opt-in 入口; 默认有界是 tokio 的明智选择, 因为大多数生产代码都需要背压。tokio::sync::Mutex<T> 是 async mutex, m.lock().await 返回 MutexGuard; 只有在你需要跨 .await 持锁时才用它, 大多数情况 std::sync::Mutex 在 async 代码里也是正确选择, 而且更快——尝试在 std::sync::Mutex 持锁状态下 .await 是常见 bug, 会让运行时调度器把锁带到另一个任务的执行栈上去。tokio::sync::RwLock<T> 是读写锁的 async 表亲; tokio::sync::Notify 用于一次性条件唤醒; tokio::sync::oneshot::channel() 用于「从任务 A 向任务 B 发送恰好一个值」, 是 tokio 里最便宜的同步原语。tokio::time::sleep(Duration::from_millis(10)).await 是 async sleep, 不阻塞任何工作线程; tokio::time::timeout(d, fut).await 把 future 加上超时, 返回 Result<T, Elapsed>; tokio::time::interval(period) 给出周期性 tick, 适合做心跳。
绝对规则: async 任务里不要阻塞
每个学过 async 的人都犯过一次这个错, 写下来做警示。std::thread::sleep、阻塞文件 I/O、长 CPU 循环、std::sync::Mutex::lock() 在高竞争下——它们都跑在工作线程上, 跑的过程中那个工作线程无法服务其他任务, 你会饿死整个运行时。逃生口是 tokio::task::spawn_blocking(|| { ... }), 返回 JoinHandle<T>, 把阻塞工作搬到专门的 blocking 线程池:
#[tokio::main]
async fn main() {
let cpu_result = tokio::task::spawn_blocking(|| {
let mut s = 0_u64;
for i in 0..1_000_000_u64 {
s = s.wrapping_add(i);
}
s
}).await.unwrap();
println!("{}", cpu_result);
}
经验法则: 任何可能花 >10 μs 纯 CPU 的调用, 或任何不在 tokio I/O 驱动内的同步系统调用 (文件 I/O、原生加解密、调 C / Python 库), 都走 spawn_blocking。
Capstone: 异步行情模拟器
把全部要点放进一个 capstone——510300.SH 行情模拟器: producer 每 10 ms 通过 sleep 唤醒发一条合成 tick, consumer 累加均值, 100 ms 后主任务 drop sender, 三任务通过 tokio::join! 收尾:
#[tokio::main]
async fn main() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<f64>(16);
let producer = tokio::spawn(async move {
for _ in 0..10 {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
if tx.send(0.0).await.is_err() {
break;
}
}
});
let consumer = tokio::spawn(async move {
let mut sum = 0.0_f64;
let mut n: u64 = 0;
while let Some(tick) = rx.recv().await {
sum += tick;
n += 1;
}
println!("mean = {}", if n == 0 { 0.0 } else { sum / n as f64 });
});
let _ = tokio::join!(producer, consumer);
}
三个值得注意的细节: producer 在 tx.send(...).is_err() 时干净退出, 没有死循环追打一个已经消失的 receiver; consumer 在通道关闭时 while let 自然结束, 不需要额外的 shutdown 信号; 三个任务在默认 #[tokio::main] 上至多共用两个工作线程——可通过 tokio_console 在 3.5.3 里看到任务调度的实际形态, 而不是你想象中的 1 任务 = 1 OS 线程。这就是 async 模型真正赚到的地方: 「N 个任务跑在 M 个线程上, M ≪ N」。
往后
真实交易系统的 async 模式——tokio::net::TcpStream 接交易所、tokio-util::codec::Framed 做协议分帧、背压设计、tokio_util::sync::CancellationToken 做协作 cancel、tracing + tokio_console 做可观测性——全部在 3.5.3 Rust 低延迟交易里展开; 手写 Future 实现与 Pin<T> 的来龙去脉在 3.5.4 讲; 其他运行时存在但 tokio 是国内 quant Rust 路线上的事实标准, 也是本科目接下来用的运行时。
阅读建议与行业落点
《Rust 程序设计语言》中文版第 17 章 (若你的版本已纳入) 给出 async 章的入门; course.rs 的「Rust 圣经」async 章节、tokio.rs 官方教程的中文社区翻译合集、张汉东《Rust 编程之道》异步章是另外三条主线阅读路径。国内 quant Rust 实务里, tokio 主要承担三类角色: 交易所网关 (TCP/UDP 长连接 + 自定义二进制协议, CFFEX 等多家接入栈在 Rust 重写时几乎清一色选 tokio)、行情接入与分发 (UDP 组播解码 → 内部 tokio::sync::broadcast 扇出)、以及管理后台 (REST / gRPC / WebSocket)。回测扫参、策略 P&L 计算这类 CPU 重负载几乎不走 async (瓶颈在 CPU 不在 I/O), 仍然是 std::thread + Rayon + 无锁队列。这条「I/O 走 async, CPU 走线程」的分工, 是本模块前三课与本课在生产里最常见的组装方式: 你已经具备把一台 16 核机器同时塞满计算与塞满连接的全部基础。本课就此收尾, 后续每一节都建立在这四个原语之上。
练习
Exercise
拿本课的异步行情模拟器 capstone, 完成: (a) 把 producer 里的 tokio::time::sleep(...).await 改成 std::thread::sleep(std::time::Duration::from_millis(10)), 同时保持 consumer 运行, 报告吞吐量与 CPU 占用的变化 (producer 的 std::thread::sleep 会阻塞一个运行时工作线程, 饿死 consumer; 现象是 consumer 任务落后或看似停滞); (b) 还原 (a), 再加一个第三任务每 25 ms 通过 tokio::time::sleep(Duration::from_millis(25)).await 打印 "alive", 确认 100 ms 运行期间它恰好打印 4 次; (c) 把通道创建改为 tokio::sync::mpsc::channel::<f64>(1) (缓冲区 1), 报告 producer 的 tx.send(...).await 是否会挂起 (会; consumer 每条 tick 都要处理一点时间); (d) 把整个 producer 包进 tokio::time::timeout(Duration::from_millis(50), async { ... }).await, 报告外层表达式的类型 (Result<(), tokio::time::error::Elapsed>)。
提示
num_cpus 个工作线程, 把其中一个完全阻塞 10 ms 通常仍能让另一线程跑 consumer——但若工作线程数被 cap 为 1 (例如 current_thread flavor) 现象会更显眼。