← 返回模块
3.5.3.3beta 可读 · 未来付费校验通过内容版本 2026-05-27

Rust 低延迟之异步网络与 tokio + ITCH 解析

3.5.3 · Rust 低延迟交易 · 编程

凌晨四点零一,你坐在 CFFEX 张江 COLO 机房楼上的值班室。你是国内一家头部私募的 Rust 工程师,负责沪深300 ETF (510300.SH) 的行情接入;早盘脚本 03:58 跑完,集合竞价 9:15 开始;此刻你的 tokio::net::UdpSocket 订阅器跑合成行情回归时报了一个序列号缺口 —— 序号 142,367,189 与 142,367,190 之间的 multicast 包不知道在 COLO 机房网络里哪条线缆上丢了。你的代码打印了 FeedEvent::Gap { start: 142_367_189, end: 142_367_190 },向交易所重传服务器发了一条 TCP 重传请求,四十二毫秒后丢包补回,流恢复;策略侧丝毫没察觉,回归通过,你回头继续喝茶。3.5.2 L4 教过 tokiotokio::sync::mpsctokio::time::sleep,让你感受 runtime;本课在它上面盖上网络层 —— tokio::net::TcpStreamUdpSockettokio_util::codec::Framedbytes crate、一个可工作的 纳斯达克 TotalView ITCH 5.0 解析器,以及生产环境每个行情接入都要实现的「序列号缺口检测加三条恢复路径」纪律。

网络接口

tokio::net::TcpStream::connect("host:port").await? 返回异步 TCP 流,实现 AsyncReadAsyncWrite,即 std::io::Read / Write 的异步版本。tokio::net::TcpListener::bind("0.0.0.0:port").await? 是服务端,.accept().await? 返回 (TcpStream, SocketAddr)tokio::net::UdpSocket::bind("0.0.0.0:port").await? 返回异步 UDP 套接字,API 基于报文:socket.send_to(&buf, &addr).await? 发一个包,socket.recv_from(&mut buf).await? 收一个包并连同对端地址返回。多播订阅是同步配置:socket.join_multicast_v4(group_ip, interface_ip)? 配置套接字,异步工作放在随后的 recv_from 循环里。

生产法则按协议形态分:行情走 multicast UDP,因为交易所发一次,N 个订阅者并行收,不付服务端 fan-out 成本;以可靠性换可扩展性,可靠性靠上层的序列号机制找回。FIX 会话、重传请求、订单路由走 TCP,因为可靠性与顺序比 fan-out 更重要。下面的示例用 UdpSocket 接行情,TcpStream 留一段话在重传 / FIX 侧点名。

Fenced 下面是 ITCH 订阅者的标准 tokio::main 加 bind 加 join 的形态:

use std::net::Ipv4Addr;
use tokio::net::UdpSocket;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let socket = UdpSocket::bind("0.0.0.0:30001").await?;
    let group:     Ipv4Addr = "239.0.0.1".parse()?;
    let interface: Ipv4Addr = Ipv4Addr::UNSPECIFIED;          // any local interface
    socket.join_multicast_v4(group, interface)?;

    let mut buf = vec![0_u8; 65_536];
    loop {
        let (n, src) = socket.recv_from(&mut buf).await?;
        // hand off the n bytes from buf[..n] to the ItchDecoder (next snippet).
        let _ = (n, src);
    }
}

65,536 字节缓冲对应 IPv4 UDP 报文最大值;生产中单次 read 通常不超过 MTU (CFFEX 张江与 SSE 浦东 COLO 机房通常 1500 字节,部分巨帧路径 9000 字节)。CFFEX / SSE / SZSE 的真实 multicast 组由网络工程组按 239.x.x.x 分配;课程示例用 239.0.0.1,因为开发笔记本上也能路由通,方便在本地跑通。本课的硬件锚点继续沿用 L1 / L2 的 Intel Xeon Gold 6342 (Ice Lake-SP, 24 核) 配 Rocky Linux 9 或 Ubuntu 22.04 LTS,部署在 CFFEX 张江 / SSE 浦东 / SZSE 福田 COLO 机房,这套配置在国内一线私募与头部券商自营 Rust 团队里基本是默认基线。

tokio_util::codec 做分帧

Rust 异步分帧原语在 tokio_util::codec::{Framed, Decoder, Encoder, LengthDelimitedCodec, FramedRead, FramedWrite} 里。读侧是 Decoder trait:trait Decoder { type Item; type Error; fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>; }。分帧机制反复调 decode;每次检视 BytesMut 缓冲,判断是否够字节解一条消息,够就解出来并把这些字节从缓冲消费掉,不够就 Ok(None) 提示「再多读点」。Encoder 是镜像:fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error>;LengthDelimitedCodec::new() 是「长度前缀」族的标准原语 —— 读 4 字节大端长度,再读相应字节数的 payload,最后把 payload 作为一个 BytesMut 交出来。ITCH 分帧是同样的形态,只是长度前缀为 2 字节大端。

包装一个 stream 只需一行:let mut framed = Framed::new(tcp_stream, MyDecoder::new()); 在读侧实现 Stream<Item = Result<MyMessage, MyError>>,在写侧实现 Sink<MyMessage>;惯用消费者写法是 while let Some(msg) = framed.next().await { ... },需要在作用域内 use tokio_stream::StreamExt;

bytes crate

bytes::Bytes 是引用计数、不可变、连续的字节缓冲;clone 一次原子自增,代价极低;切片再得到一个 Bytes,指向同一份底层分配,零拷贝。bytes::BytesMut 是单一所有者的可变版本 —— 支持 put_u8 / put_u16 / put_u32 / put_u64 等写入操作,freeze() 转换成 Bytes 交给下游消费者。法则:codec 层从 BytesMut 读 (runtime 拥有的分帧缓冲),解码为你的消息类型,而消息类型可以借用 Bytes 切片承载需要保留的负载 —— 从内核 read 缓冲到你的消息结构,全程零拷贝。

可工作的 ITCH 5.0 解析器

纳斯达克 TotalView ITCH 5.0 是本课选用的公开协议,理由是:它是真正公开的 (nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/) ,SSE Level-2 / SZSE Level-2 / CFFEX 行情接口规范属订阅授权;架构模式 (multicast UDP 加序列号加冗余 feed 加 snapshot-then-incrementals) 与国内三家交易所完全一致;C++ 路径 3.4.4 L4 与 3.4.5 L1 解的就是这个协议。每条 ITCH 消息前面是 2 字节大端长度,然后是 1 字节消息类型码,再是按类型而定的报文体。五种载荷类型:b'A' Add Order (36 字节体 —— 序号 / 时间戳 / order_ref / 方向 / 数量 / 股票代码 / 价格 ticks)、b'D' Delete Order (21 字节体)、b'E' Order Executed (31 字节体)、b'X' Order Cancel (23 字节体)、b'U' Order Replace (34 字节体)。其它类型落入 Other(u8) 通配并跳过:

use bytes::{Buf, BytesMut};
use tokio_util::codec::Decoder;

#[derive(Debug, Clone)]
pub enum ItchMessage {
    AddOrder       { sequence: u64, timestamp: u64, order_ref: u64, side: u8, shares: u32, stock: [u8; 8], price_ticks: u32 },
    DeleteOrder    { sequence: u64, timestamp: u64, order_ref: u64 },
    OrderExecuted  { sequence: u64, timestamp: u64, order_ref: u64, executed_shares: u32, match_number: u64 },
    OrderCancel    { sequence: u64, timestamp: u64, order_ref: u64, cancelled_shares: u32 },
    OrderReplace   { sequence: u64, timestamp: u64, orig_order_ref: u64, new_order_ref: u64, shares: u32, price_ticks: u32 },
    Other(u8),
}

#[derive(Debug, thiserror::Error)]
pub enum ItchDecodeError {
    #[error("invalid length prefix")]            InvalidLength,
    #[error("incomplete message body")]          IncompleteBody,
    #[error("unknown message type byte: {0:#x}")] UnknownMessageType(u8),
    #[error("io: {0}")]                          Io(#[from] std::io::Error),
}

pub struct ItchDecoder;

impl ItchDecoder { pub fn new() -> Self { Self } }

impl Decoder for ItchDecoder {
    type Item  = ItchMessage;
    type Error = ItchDecodeError;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.len() < 2 { return Ok(None); }
        let len = u16::from_be_bytes([src[0], src[1]]) as usize;
        if len == 0 { return Err(ItchDecodeError::InvalidLength); }
        if src.len() < 2 + len { return Ok(None); }
        let _ = src.split_to(2);   // consume length prefix
        let kind = src.get_u8();
        match kind {
            b'A' => { /* parse AddOrder body of len-1 bytes; see exercise */    Ok(Some(ItchMessage::Other(b'A'))) }
            b'D' => { /* parse DeleteOrder body */                                Ok(Some(ItchMessage::Other(b'D'))) }
            b'E' => { /* parse OrderExecuted body */                              Ok(Some(ItchMessage::Other(b'E'))) }
            b'X' => { /* parse OrderCancel body */                                Ok(Some(ItchMessage::Other(b'X'))) }
            b'U' => { /* parse OrderReplace body */                               Ok(Some(ItchMessage::Other(b'U'))) }
            other => {
                let _ = src.advance(len - 1);  // skip body
                Ok(Some(ItchMessage::Other(other)))
            }
        }
    }
}

decode 在缓冲里字节少于 2 + len 时返回 Ok(None);分帧层继续从套接字读,等更多数据到了再次调用 decode。返回 Ok(Some(message)) 时缓冲游标已经被 BytesMut::split_toBytesMut::get_u* 这类「读+前进」操作推进过,无需手动管理。五种载荷的报文体真实解析作为练习。

序列号缺口检测与恢复

每条 ITCH 消息携带序列号;教学示例用 u64,生产 纳斯达克 ITCH 在外层 moldUDP64 帧里用 32-bit 序号 (3.4.4 L4 已介绍, 此处仅前向指针)。feed handler 维护 per-stream 的 next_expected,每条消息归入三类之一:

  1. msg.sequence == next_expected —— 顺序到达,前进并发出 FeedEvent::Message(msg)
  2. msg.sequence > next_expected —— 出现缺口,发出 FeedEvent::Gap { start: next_expected, end: msg.sequence },缺口计数器递增,调用方触发恢复路径。区间是半开:start 包含,end 不包含。
  3. msg.sequence < next_expected —— 重复消息,丢弃并递增重复计数器。
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Debug, Clone)]
pub enum FeedEvent {
    Message(ItchMessage),
    Gap { start: u64, end: u64 },     // half-open interval [start, end)
    Recovered,
    EndOfStream,
}

pub struct GapDetector {
    next_expected: u64,
    duplicate_counter: AtomicU64,
    gap_counter:       AtomicU64,
}

impl GapDetector {
    pub fn new(start_seq: u64) -> Self {
        Self { next_expected: start_seq, duplicate_counter: AtomicU64::new(0), gap_counter: AtomicU64::new(0) }
    }

    pub fn classify(&mut self, msg_seq: u64, msg: ItchMessage) -> Option<FeedEvent> {
        if msg_seq == self.next_expected {
            self.next_expected += 1;
            Some(FeedEvent::Message(msg))
        } else if msg_seq > self.next_expected {
            let gap = FeedEvent::Gap { start: self.next_expected, end: msg_seq };
            self.gap_counter.fetch_add(1, Ordering::Relaxed);
            self.next_expected = msg_seq + 1;
            Some(gap)   // triggers recovery in the caller
        } else {
            self.duplicate_counter.fetch_add(1, Ordering::Relaxed);
            None        // duplicate; discard
        }
    }
}

生产环境每条行情接入都要实现的三条恢复路径;下面表格的路径名、机制描述、延迟区间、用途在两区版本上是逐字节一致的,因为这套纪律是普适的:

Recovery path                Mechanism                                                                                                Latency                                  Use case
TCP retransmission           Request range [start, end) over a TCP socket to the exchange's retransmission server                     ~1-10 ms (one TCP round-trip)            Small gaps within the exchange's retransmission window
A/B redundant feeds          Subscribe to two multicast groups on distinct paths; dedupe by sequence number; use whichever arrives first  ~0 ms (no recovery cost; transparent)     Single-feed packet loss; the everyday path
Snapshot-then-incrementals   Open TCP to the snapshot server; request 'state as of seq N'; install; replay incrementals from N+1 over multicast  ~50-500 ms (snapshot transfer + catchup)  Large gaps that exceed the retransmission window; cold start; the universal fallback

接入 L2 队列

在一个 tokio::spawn 出来的任务里,feed handler 循环消费 framed 流、用 GapDetector 分类、把每条顺序到达的 ItchMessage 转成 MdEvent (L2 / L4 用的词汇类型),然后用 L2 的 drop-oldest 模式推入共享 Arc<crossbeam_queue::ArrayQueue<MdEvent>>。这个任务结构是「async 网络边缘 + 同步 strategy 线程」的标准形态:

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use crossbeam_queue::ArrayQueue;
use tokio::net::TcpStream;
use tokio_stream::StreamExt;
use tokio_util::codec::Framed;

#[derive(Debug, Clone, Copy)]
pub struct MdEvent {
    pub ts:          u64,
    pub symbol_id:   u32,
    pub price_ticks: i32,
    pub qty:         u32,
}

pub async fn run_feed_handler(
    stream: TcpStream,                          // or UdpSocket-backed framed stream
    queue:  Arc<ArrayQueue<MdEvent>>,
    lost:   Arc<AtomicU64>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let mut framed = Framed::new(stream, ItchDecoder::new());
    let mut gap_detector = GapDetector::new(1);

    while let Some(msg) = framed.next().await {
        let msg = msg?;
        let seq = match &msg {
            ItchMessage::AddOrder      { sequence, .. } => *sequence,
            ItchMessage::DeleteOrder   { sequence, .. } => *sequence,
            ItchMessage::OrderExecuted { sequence, .. } => *sequence,
            ItchMessage::OrderCancel   { sequence, .. } => *sequence,
            ItchMessage::OrderReplace  { sequence, .. } => *sequence,
            ItchMessage::Other(_)                        => continue,
        };
        let Some(event) = gap_detector.classify(seq, msg) else { continue };
        let FeedEvent::Message(itch) = event else {
            // FeedEvent::Gap | Recovered | EndOfStream — triggers recovery; out of scope here.
            continue;
        };
        let md_event = MdEvent { ts: 0, symbol_id: 0, price_ticks: 0, qty: 0 };  // map itch -> MdEvent (exercise)
        let _ = itch;
        if let Err(ev) = queue.try_push(md_event) {
            // drop-oldest backpressure (L2 pattern); count the loss for observability.
            let _ = queue.try_pop();
            let _ = queue.try_push(ev);
            lost.fetch_add(1, Ordering::Relaxed);
        }
    }
    Ok(())
}

网络 I/O 在 tokio 里,交易热路径在用 core_affinity 钉到某一核的独立线程上 (L4 内容),两者的边界是一个 ArrayQueue。这条「异步网络边缘 + 同步策略线程」的形态在国内一线量化的 Rust 行情接入栈上几乎是默认架构;tokio 单 runtime 多 worker 配置已是行业惯例,feed-handler 任务通常以 tokio::task::Builder 命名,方便在 tokio-console 里追踪。miotokio 底下的 epoll / kqueue 层;一般不会直接用它,但读深栈跟踪或 strace 时知道层次有帮助。内核旁路下一步:tokio-uring 是基于 io_uring 的 runtime,在追求亚微秒、无系统调用 I/O 的部分场景有生产使用;AF_XDP 是 socket 层的内核旁路;DPDK 是 NIC 驱动层完全内核旁路 —— 全部点名,不展开,前向指针到 3.5.4 与生产部署。ITCH 是单向行情;委托侧用 FIX 4.4 over TCP,带状态会话 (序号重置、resend request、gap-fill、store-and-forward 持久化),归 3.5.4。课后阅读:tokio.rs/tokio/tutorial 官方教程 (英文,部分中文社区翻译, course.rs 子站);docs.rs/tokio-util/、docs.rs/bytes/ (英文);纳斯达克 TotalView ITCH 5.0 规范 nasdaqtrader.com (英文公开);SSE Level-2 行情接口规范公开摘要 sse.com.cn (中文,仅作协议对照);SZSE Level-2 行情接口规范公开摘要 szse.cn (中文);CFFEX 行情接口规范 cffex.com.cn (中文);moldUDP64 规范 (纳斯达克 公开,英文,序列号外层封装的参考);火山引擎 / 字节跳动 / PingCAP TiKV 团队的 Rust async 实践博客 (公众号 / InfoQ 中文站, 中文)。国内 Rust 行情接入团队 (幻方 / 鸣熙 / 九坤 / 明汯 / 灵均) 普遍以 tokio + tokio-util::codec + bytes 构造 feed handler,教学路径与生产现场一致。

Exercise

Exercise

(a) 把 ItchDecoder 补齐:对五种载荷类型按报文体长度 (AddOrder 36-byte body, DeleteOrder 21-byte, OrderExecuted 31-byte, OrderCancel 23-byte, OrderReplace 34-byte) 解析;用 bytes::Bufsrc.get_u8() / src.get_u32() / src.get_u64() 取字段;写一个单元测试,手动构造 5 条已知序号的合成 ITCH 流,断言解出的变体一一对应。(b) 写配套的合成发布器:一个 tokio::spawn 任务,造 1_000_000 条序号 1..=1_000_000 的 AddOrder 消息,通过 UdpSocket::send_to 发往 multicast 组 239.0.0.1,速率可控 (参数化 tokio::time::interval(Duration::from_micros(N)));消息通过镜像 DecoderEncoder 实现写入 BytesMut。(c) 实现 GapDetector::classify 并写单元测试,覆盖顺序流 (无缺口,next_expected 1, 2, 3, ... 前进)、有缺口流 (1, 2, 5, 6 —— 期望 FeedEvent::Gap { start: 3, end: 5 })、有重复流 (1, 2, 2, 3 —— 期望第二个 2 返回 Noneduplicate_counter 为 1)。(d) 把 feed-handler 任务连接到 L2 的 Arc<ArrayQueue<MdEvent>> (容量 1024),drop-oldest 反压加 lost_events_counter: Arc<AtomicU64>,每丢一次递增;让发布器跑队列承不住的速率;运行 10 seconds 后报告 lost_events_counter (预期:> 0)。(e) 写一个桩 async fn request_snapshot_and_resume(start_seq: u64, end_seq: u64) -> Result<Vec<ItchMessage>, FeedError>,当前返回 Ok(Vec::new()) (生产版属 L4 / 3.5.4),在 feed-handler 任务的缺口处理分支里调用它;写一个集成测试,验证注入一次缺口时这个桩恰好被调用一次。(f) 不实现,各用两句话解释:(i) 为什么 TCP 重传适合小缺口但不适合冷启动;(ii) 为什么 A/B 冗余 feed 仲裁能以近零延迟恢复单 feed 丢包;(iii) 为什么 snapshot-then-incrementals 虽然最慢但是普适回退。

提示
(a) 部分按 Buf trait 解:src.get_u64() 取 u64,src.get_u32() 取 u32,src.get_u8() 取一字节;stock: [u8; 8]src.copy_to_slice(&mut stock) 复制。
提示
(c) 半开区间 start 包含、end 不包含;缺口流 (1, 2, 5, 6) 跳了 3 与 4,缺口为 [3, 5);缺口处理后把 next_expected 推到 msg_seq + 1。

行业背景

国内做 Rust 行情接入的私募与自营头部:幻方、鸣熙、九坤、明汯、灵均、宽德、思勰、衍盛、磐松资管、博普科技、宁聚资产、致诚卓远、英仕曼中国;券商自营线包括中信证券、中信建投、华泰证券、海通证券、招商证券、国泰君安、中金公司、广发证券、东方证券、华泰资管。常见接入面:上证所 Level-2 行情 (SSE Level-2 通过 STEP / Binary / FAST 协议)、深证所 Level-2 行情 (SZSE Level-2)、上海期货交易所行情 (SHFE)、郑州商品交易所行情 (CZCE)、大连商品交易所行情 (DCE)、中金所行情 (CFFEX) 与上海国际能源中心 (INE) 的 SC 原油行情。COLO 机房面包括 CFFEX 张江、SSE 浦东、SZSE 福田、SHFE 浦东、DCE 大连、CZCE 郑州。下游数据源:Wind 资讯、东方财富 Choice、聚源数据、巨潮资讯网、同花顺、通联数据、米筐量化、聚宽、迅投、天软科技,这些在策略上线前的回放阶段灌进 ItchDecoder 做正确性校验,真正连 SSE / SZSE / CFFEX 直连之前先在它们的数据上跑一遍。

通向 L4 的桥

到这里你已经建好了喂给交易引擎的异步网络边缘:tokio::net::UdpSocketjoin_multicast_v4 订阅多播、tokio_util::codec::Framed 加自定义 ItchDecoder 解 wire、GapDetector 做序号纪律、drop-oldest 把事件交给 L2 的 Arc<ArrayQueue<MdEvent>>。L4 是组合 capstone:L1 的缓存布局加 CachePaddedcriterion 测量、L2 的 ArrayQueue 加 drop-oldest 加 core_affinity CPU 钉绑、L3 的 feed handler,全部汇成一个可跑的交易引擎 —— safe-Rust 的 OrderBookVec<Order> 背后的池、RiskGate 风控、通过 tokio::net::TcpStream 的订单路由,以及每阶段一份 hdrhistogram::Histogram<u64> 测量循环,报 P50 / P99 / P99.9 / P99.99 tick-to-trade 时延,对照「调优 Rust 栈生产预算 5-50 µs」这条标尺。