国内一家头部私募的 CSI 300 ETF 期权桌的 C++ 工程师,正在把一个新的 510300.SH 期权策略接进生产引擎。单线程回测跑十一秒;上线引擎要求一个线程从 CFFEX 行情链路喂 tick,第二个线程跑 Greeks 重估,第三个线程把订单分发到 SSE 的报盘网关。第一版在第二个 tick 就死锁了。第二版位置表被搞乱了——两条线程同时读写一个 std::vector<double> 而完全没有同步。第三版用 std::mutex 修掉了数据竞争,结果立刻又引入了一个死锁:风控线程和下单线程以相反的顺序申请两把锁。桌组负责人在白板上写下结论:「单线程正确性不够;你得像当年学 std::unique_ptr 那样,把语言提供的并发原语学清楚。」本课就是它。3.4.1 到 3.4.3 一路把现代单线程 C++ 搭起来——智能指针、STL、分配器、缓存感知的数据布局、在沪深 300 ETF 与上证 50ETF 期权热点路径上 profile-and-optimize 的代码。3.4.4 转向多线程半边。L1 是概念过渡:怎么启动一条线程、怎么共享状态而不让它损坏、怎么在线程间协作而不必忙等、怎么把一个值从 worker 线程交还给调用方。L2 再深入底下的 C++ 内存模型。
(本课每一个 Fenced cpp 代码块都是 gate 会按字节核对的精确形式。)
std::thread / std::jthread 与 join 契约
C++11 引入 std::thread,作为平台底层线程原语(Linux 上的 pthread_create、Windows 上的 CreateThread)的可移植封装。最小程序:std::thread t([]{ work(); }); t.join();。lambda 是线程体——函数指针或任何可调用对象也行。join() 阻塞调用线程,直到新线程跑完。语言留了一个故意尖锐的角:如果一个 std::thread 在 t.joinable() == true 的状态下被析构,析构函数会调用 std::terminate。语言拒绝默默泄漏一条正在运行的线程。两种正确替代:t.join()(在这里阻塞到线程结束)或 t.detach()(放弃所有权;线程自己跑完,由操作系统回收)。detach() 极少正确,因为脱离的线程往往比调用它的作用域活得更久,最终读到的捕获局部变量早已销毁。
生产级形式是 C++20 的 std::jthread:析构时自动 join,并自带 std::stop_token 用于协作式取消。国内多数 quant 代码库仍以 C++17 为稳定的最小公分母(上海期货信息技术的 CTP API 头文件、Solarflare OpenOnload 示例、以及多数 SSE / SZSE 报盘库都假设 C++17)。30 行把 std::jthread 移植回来:
// C++17 back-port of C++20 std::jthread: RAII auto-join + cooperative stop.
#include <atomic>
#include <thread>
#include <utility>
class StopToken {
public:
explicit StopToken(std::atomic<bool>* flag) : flag_(flag) {}
bool stop_requested() const noexcept { return flag_->load(std::memory_order_acquire); }
private:
std::atomic<bool>* flag_;
};
class JThread {
public:
template <typename F, typename... Args>
explicit JThread(F&& f, Args&&... args) {
StopToken tok(&stop_);
t_ = std::thread(std::forward<F>(f), tok, std::forward<Args>(args)...);
}
JThread(const JThread&) = delete;
JThread& operator=(const JThread&) = delete;
void request_stop() noexcept { stop_.store(true, std::memory_order_release); }
~JThread() {
request_stop();
if (t_.joinable()) t_.join();
}
private:
std::atomic<bool> stop_{false};
std::thread t_;
};
析构函数置位标志再 join,于是 JThread 出作用域永远不会触发 std::terminate。线程体把 StopToken 作为首参收下,在自己的循环里轮询 tok.stop_requested()。store 上的 std::memory_order_release 和 load 上的 std::memory_order_acquire 建立的同步关系,L2 会正式定义;现在先把它读作「我在请求 stop 之前做的写,对那个观察到 stop 请求之后的 worker 是可见的」。这里出于讲解简洁删了 move(删了 copy 也没补 move);生产形式会加上 noexcept move。
互斥锁:lock_guard / unique_lock / scoped_lock / shared_mutex
std::mutex 是标准的二值互斥原语。不要直接调用 m.lock() 与 m.unlock()——两者之间抛异常就会泄漏锁。请用 RAII 作用域包装。std::lock_guard<std::mutex> lk(m); 在构造时上锁、析构时解锁;它是适合「整个临界区刚好在一个作用域里」的工具。std::unique_lock<std::mutex> 是「需要临时释放锁」(defer-lock 构造、lk.unlock() 后再 lk.lock() 跨段落)或者「需要把锁交给 std::condition_variable::wait」时的升级款——CV 必须原子地「解锁后再上锁」,所以必须吃 unique_lock 接口。std::scoped_lock(m1, m2, ...)(C++17 变参)一次性把所有传入的互斥锁锁上,并使用标准库的避死锁算法——这是「一个临界区需要两把或更多锁」时的正确工具。
对于以读为主的状态,std::shared_mutex(C++17)支持读者-写者模式。读者用 std::shared_lock<std::shared_mutex>(可并发),单一写者用 std::unique_lock<std::shared_mutex>(或 std::lock_guard)。经典用法是策略每次信号计算都要查的「参数配置表」——平时只读,写者线程偶发热更新。shared_mutex 的代价是每次读相比无竞争的 mutex 多一笔可观开销;上线前请 benchmark。每微秒都珍贵的热路径上,更好的做法常常是「std::shared_ptr<const Config> 原子换包」、读侧完全免锁——但那需要 L2 才教的原子加载语义。
一个线程安全的有界阻塞队列
互斥锁 + 条件变量最经典的范例是一个有界阻塞队列。一把锁保护缓冲区。两个条件变量:not_empty_(消费者等它;生产者在 push 之后通知)和 not_full_(生产者等它;消费者在 pop 之后通知)。每一次 wait 都用「谓词循环」形式,因为伪唤醒确实存在——操作系统允许在没有匹配 notify 的情况下把 wait 唤醒,标准明文规定。谓词每次 wait 返回都会重新检查实际条件。
#include <condition_variable>
#include <mutex>
#include <optional>
#include <vector>
template <typename T>
class BoundedBlockingQueue {
public:
explicit BoundedBlockingQueue(std::size_t capacity) : capacity_(capacity) {
buf_.reserve(capacity);
}
void push(T value) {
std::unique_lock<std::mutex> lk(m_);
not_full_.wait(lk, [&]{ return buf_.size() < capacity_; }); // predicate loop for spurious wakeup
buf_.push_back(std::move(value));
lk.unlock();
not_empty_.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lk(m_);
not_empty_.wait(lk, [&]{ return !buf_.empty(); }); // predicate loop for spurious wakeup
T value = std::move(buf_.front());
buf_.erase(buf_.begin());
lk.unlock();
not_full_.notify_one();
return value;
}
private:
const std::size_t capacity_;
std::vector<T> buf_;
std::mutex m_;
std::condition_variable not_empty_;
std::condition_variable not_full_;
};
实现上有两点注解。第一,push 与 pop 都在调用 notify_one 之前显式 unlock 那个 unique_lock。这是优化,不是正确性问题——持锁通知是安全的,但被唤醒的线程会立刻在你还握着的锁上再阻塞一次,浪费一次往返。先释放,让被唤醒的线程一被调度就能跑。第二,这里用 notify_one 是合适的:每次 push 或 pop 最多解锁一名等待者;只有「一个事件可能解锁多名等待者」时(生产者-消费者设计里极少见)才用 notify_all。
把队列用 1000 个 tick 的生产者-消费者对配上 sentinel 干净退出来跑一遍:
struct Tick { std::string symbol; double price; std::int64_t ts_ns; };
static constexpr int kNumTicks = 1000;
static const Tick kSentinel{"DONE", 0.0, 0};
void producer(BoundedBlockingQueue<Tick>& q) {
using namespace std::chrono;
for (int i = 0; i < kNumTicks; ++i) {
Tick t{"510300.SH", 4.20 + 0.001 * (i % 100) - 0.05,
duration_cast<nanoseconds>(steady_clock::now().time_since_epoch()).count()};
q.push(std::move(t));
}
q.push(kSentinel);
}
void consumer(BoundedBlockingQueue<Tick>& q) {
int n = 0;
while (true) {
Tick t = q.pop();
if (t.symbol == kSentinel.symbol) break;
++n;
}
std::printf("consumed %d ticks\n", n);
}
int main() {
BoundedBlockingQueue<Tick> q(64);
std::thread prod(producer, std::ref(q));
std::thread cons(consumer, std::ref(q));
prod.join();
cons.join();
return 0;
}
sentinel 模式是标准的干净退出惯用法:生产者推一个消费者认得的特殊值;消费者收到时跳出循环;两条线程都 join 干净。生产代码通常把 sentinel 换成 pop 返回 std::optional<T> 加队列上的 done() 方法,但概念形状一致。在 Intel Xeon Gold 6342 / 6354(Ice Lake-SP)或同档 AMD EPYC 上整轮跑完远在 100 ms 以内——每次 push / pop 配对都是亚微秒级别的互斥锁往返。
std::async / std::future / std::promise
基于 future 的并发层是「让一条 worker 线程计算单个值并交还调用方」的合适工具。std::async(std::launch::async, []{ return compute(); }) 返回 std::future<T>——fut.get() 阻塞直到结果就绪。两条纪律要谨记。其一,永远不要用 std::launch::deferred:它把工作放到第一次 get 时同步执行,完全违背初衷。请总是显式传 std::launch::async。其二,永远不要让 std::async 返回的那个 std::future 在不 get 的情况下出作用域:它的析构函数会阻塞等待 worker 跑完。这是标准委员会刻意的设计,用来阻止「沉默的 fire-and-forget 线程」;实际后果是:要么把 future 存起来,要么就地 get。
要对生产者-消费者的交接做更精细控制、又不想引入 CV,可以直接用 std::promise<T> 加 std::future<T>。worker 持有 promise,主线程持有匹配的 future。worker 就绪时调用 prom.set_value(result);主线程的 fut.get() 就以那个值解阻。
#include <future>
double black_scholes_call(double S, double K, double r, double sigma, double T); // from 3.4.1
int main() {
// std::async form: launch worker, get result via future.
std::future<double> fut = std::async(std::launch::async, []{
return black_scholes_call(4.20, 4.30, 0.024, 0.18, 0.5);
});
double price = fut.get(); // blocks until worker finishes
std::printf("price = %.6f\n", price);
// std::promise / std::future form: same hand-off without async.
std::promise<double> prom;
std::future<double> fut2 = prom.get_future();
std::thread worker([&]{
prom.set_value(black_scholes_call(4.20, 4.30, 0.024, 0.18, 0.5));
});
double price2 = fut2.get();
worker.join();
std::printf("price (promise) = %.6f\n", price2);
return 0;
}
两种形式产生同一个数:510300.SH 欧式认购、S = 4.20、K = 4.30、r = 0.024、sigma = 0.18、T = 0.5 的 Black-Scholes 闭式价。这里引用的 Black-Scholes 认购公式就是教科书形式,
其中 、——3.4.1 推导过;本课只需要那个能算出它的例程。std::async 形式更精炼,适合自包含的单次计算;std::promise 形式是 worker 在更长例程中需要在特定时点发出完成信号时的基础积木。std::packaged_task<R(Args...)> 是这一族的第三位成员:把一个可调用对象包起来,可以稍后被调用、并通过关联的 future 取结果——线程池的基础积木。线程池的完整处置交给 3.4.5 与策略框架一起讲;当前只需记住「需要它时知道叫它什么」即可。
死锁与加锁顺序规则
两把互斥锁、两条线程以相反顺序申请它们的程序,一旦调度交叠到位,就会确定性地死锁。
// BUG: AB-BA deadlock — thread A locks m1 then m2, thread B locks m2 then m1.
std::mutex m1, m2;
void thread_a_bad() {
std::lock_guard<std::mutex> l1(m1);
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // yield to thread B
std::lock_guard<std::mutex> l2(m2); // blocks: B holds m2
}
void thread_b_bad() {
std::lock_guard<std::mutex> l2(m2);
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // yield to thread A
std::lock_guard<std::mutex> l1(m1); // blocks: A holds m1
}
// FIX 1: global lock ordering — always m1 before m2 everywhere.
// FIX 2: std::scoped_lock atomically locks both via the standard library's
// deadlock-avoidance algorithm.
void thread_either_good() {
std::scoped_lock lk(m1, m2); // C++17: locks both, no AB-BA possible
// critical section uses both m1- and m2-protected state
}
诊断很机械:gdb -p <pid>,然后 thread apply all bt。每条线程都会显示一个 pthread_mutex_lock 调用栈,正等着对方持有的那把锁。两种修法已经写在上面的注释块里了。Fix 1 是全局加锁顺序:整个程序里所有代码路径都按同一个全序申请锁。这是多数生产代码库采用的纪律——它能扩展到 N 把锁,并且容易在 code review 时执行。Fix 2 是 std::scoped_lock(m1, m2),把工作委托给标准库的避死锁算法(try-lock + 回退)。临界区确实需要两把锁时用 Fix 2;用 Fix 1 作为全局纪律,避免问题第一次冒出来。
最后一个脚注:std::recursive_mutex 在这里被点名只是为了警告它。它唯一合法的用途,是疏通一个开发者无法重构的环形调用图——而正确答案几乎永远是「重构那个调用图」。当你伸手去拿 std::recursive_mutex 时,是设计在更高一层发出报警。
下一课
L2 进入 C++ 内存模型:std::atomic<T>、六种内存顺序、release-acquire / 顺序一致性规则、数据竞争定义、以及两条线程一旦写入相邻地址就会暴露的伪共享惩罚。本课所有内容都默认了底层是顺序一致的——每一次互斥锁的获取与释放都建立了标准要求的同步。L2 教你「不用互斥锁也能完成那次同步」,这是 L3 无锁 SPSC 环形缓冲的前置门槛。上面那 30 行 JThread back-port 在 stop 标志上已经用了 release / acquire;L2 会解释为什么那是对的、如果换成 std::memory_order_relaxed 又会坏在哪里。
练习
Exercise
(a) 实现上面的 JThread 类,用它启动一个每 100 ms 打印一次 "tick %d\n" 的 worker;在 main 里 sleep 1 秒,然后让 JThread 出作用域。确认程序干净退出(不触发 std::terminate),且 worker 观察到 stop_requested() == true 并迅速退出。(b) 实现上面的 BoundedBlockingQueue<Tick>,容量 64,跑 kNumTicks = 1000 的生产者 / 消费者测试。确认 consumer 打印 "consumed 1000 ticks\n",两条线程都 join。用 std::chrono::steady_clock 计时端到端 wall clock;整轮应远低于 100 ms,因为每次 push / pop 配对都是亚微秒级别的互斥锁往返。(c) 把 AB-BA 死锁例子编译并运行;观察程序挂起(Ctrl-C 终止)。gdb -p <pid> 后跑 thread apply all bt,识别两条阻塞在 pthread_mutex_lock、各等对方持锁的线程。把两个坏函数替换为 thread_either_good 形式,再跑 1000 次双线程;确认不再死锁。(d) 跑 std::async / std::promise 范例;确认两次打印的价格在 6 位小数内一致、且匹配本区域欧式认购的 Black-Scholes 闭式值。用一句话说明 std::launch::deferred 在这里为什么会违背 std::async 的初衷。
提示
(a):线程体得收一个 StopToken 作为首参,在两次 sleep 之间轮询 tok.stop_requested()。析构函数先置位标志,再 join。
提示
(c):全局加锁顺序规则要求每个代码路径都先锁 m1 再锁 m2。std::scoped_lock 修法直接把两把锁原子地锁上,从根上回避了这个问题。