适读人群:C++ 中高级开发者、高性能系统工程师、音视频/量化交易/游戏引擎方向的同学
一、库介绍

在多线程高性能系统中,生产者-消费者队列是最常见的通信原语之一。然而大多数队列实现为了通用性,往往引入了锁、条件变量甚至动态内存分配,这些在极低延迟场景下都是"毒药"。

SPSCQueue(Single-Producer Single-Consumer Queue)是由 Erik Rigtorp 开发的一款专为生产者 + 单消费者场景精心打磨的无等待、无锁、定长环形队列,使用C++11编写,以单头文件形式分发。

它的定位非常精准:

  • 不是通用并发队列,只服务于严格的 1 写 1 读场景
  • 不是无界队列,容量在构造时固定,杜绝运行期内存分配
  • 不是学术玩具,它比 boost::lockfree::spsc 吞吐量高73%,比 folly::ProducerConsumerQueue 高144%(AMD Ryzen 9 3900X 实测)

队列实现

吞吐量(ops/ms)

往返延迟(ns)

SPSCQueue

362,723

133

boost::lockfree::spsc

209,877

222

folly::ProducerConsumerQueue

148,818

147

GitHub 地址:https://github.com/rigtorp/SPSCQueue
Star 数:⭐ 1.2k(截至 2026 年 5 月)
许可协议:MIT

二、核心特性2.1 Wait-Free + Lock-Free 双重保证

SPSCQueue 的入队和出队操作均不使用任何互斥锁或条件变量,也不存在 CAS 重试(Compare-And-Swap spin loop)。在满足单生产者、单消费者约束的前提下,每次操作仅需有限步骤即可完成,这是严格意义上的wait-free

2.2 False Sharing 精心规避

现代 CPU 的 Cache Line 通常为 64 字节。若两个线程频繁读写同一 Cache Line 上的不同变量,即使逻辑上互不干扰,硬件层面也会产生大量无效的 Cache 一致性流量,这就是false sharing

SPSCQueue 通过以下措施彻底消除这一隐患:

  1. head(读索引)和 tail(写索引)分别对齐并填充到 false sharing 范围(即 Cache Line 大小)
  2. 底层 slots 缓冲区的首尾各额外填充一个 false sharing 范围,防止与相邻内存分配产生 false sharing
2.3 本地缓存索引,减少 Cache 一致性流量

这是 SPSCQueue 吞吐量远超竞品的关键设计。

传统环形队列的困境:读操作需要以独占状态(exclusive)加载 head,同时以共享状态(shared)加载 tail 检查队列是否为空。写操作也需要读 head,这导致 Cache Line 频繁在 shared 和 exclusive 之间切换,最坏情况下每次读写都触发 Cache 一致性流量。

SPSCQueue 的解法:读端本地缓存 tail,写端本地缓存 head。仅当本地缓存的索引显示队列为空(或满)时,才去读对方的真实值。这意味着在队列非空时,可以连续完成多次读操作而不触碰写端的 tail Cache Line,大幅降低了 Cache 一致性开销。

2.4 支持非 2 的幂次容量

许多 ring buffer 实现为了用位运算取模而强制要求容量为 2 的幂次。SPSCQueue 通过多分配一个 slot作为"队列满"标识,支持任意容量,让调用方可以按业务需求精确指定缓冲区大小。

2.5 Header-Only,零侵入集成

整个库只有一个头文件 include/rigtorp/SPSCQueue.h,无需编译、无依赖,#include 即用。

2.6 支持 Huge Page(大页内存)

通过标准 Allocator 接口(以及 C++17 的 P0401R3 尺寸反馈扩展),可以无缝对接 Linux mmap 大页分配器,进一步降低 TLB miss 带来的延迟抖动。

三、架构解析3.1 整体内存布局

┌─────────────────────────────────────────────────────────┐│  padding (false_sharing_range bytes)                    │├─────────────────────────────────────────────────────────┤│  head_  (对齐到 cache line, 只有 consumer 写)            ││  padding                                                │├─────────────────────────────────────────────────────────┤│  tail_  (对齐到 cache line, 只有 producer 写)            ││  padding                                                │├─────────────────────────────────────────────────────────┤│  slots_ buffer                                          ││  ┌──────┬──────┬──────┬──────┬──────┬──────┐           ││  │ pad  │ [0]  │ [1]  │ ...  │ [N]  │ pad  │           ││  └──────┴──────┴──────┴──────┴──────┴──────┘           ││   首尾各一个 padding 防止相邻分配产生 false sharing       │└─────────────────────────────────────────────────────────┘
3.2 读写端本地缓存

Producer 端:tail_  (写)  ← 只有 producer 更新head_  (读)  ← 仅队列"看起来满"时才读一次, 用 readHead_ 本地缓存Consumer 端:head_  (写)  ← 只有 consumer 更新tail_  (读)  ← 仅队列"看起来空"时才读一次, 用 writeTail_ 本地缓存

这种设计让两个线程在"快路径"下完全不争抢对方的 Cache Line。

3.3 关键数据结构(简化)

template >class SPSCQueue {// 缓冲区容量 (capacity_ + 1 个 slot 实际分配)size_t capacity_;T* slots_;// 生产者私有:写索引 + 缓存的读索引alignas(kFalseSharingRange) std::atomic head_;alignas(kFalseSharingRange) size_t headCache_; // 仅 producer 访问// 消费者私有:读索引 + 缓存的写索引alignas(kFalseSharingRange) std::atomic tail_;alignas(kFalseSharingRange) size_t tailCache_; // 仅 consumer 访问
四、快速上手4.1 安装

方式一:直接复制头文件

# 将头文件复制到你的项目 include 目录curl -O https://raw.githubusercontent.com/rigtorp/SPSCQueue/master/include/rigtorp/SPSCQueue.h

方式二:CMake FetchContent(推荐)

include(FetchContent)FetchContent_Declare(SPSCQueueGIT_REPOSITORY https://github.com/rigtorp/SPSCQueue.gitGIT_TAG        masterFetchContent_MakeAvailable(SPSCQueue)target_link_libraries(your_target PRIVATE rigtorp::SPSCQueue)

方式三:直接 clone + 手动指定 include 路径

git clone https://github.com/rigtorp/SPSCQueue.git# 编译时加 -I SPSCQueue/includeg++ -std=c++17 -O2 -I SPSCQueue/include your_app.cpp -o your_app
4.2 第一个程序:Hello SPSCQueue

#include#include#include "rigtorp/SPSCQueue.h"int main() {// 创建容量为 1 的队列,存放 int 类型rigtorp::SPSCQueue q(1);// 消费者线程:等待队列非空后读取并打印auto t = std::thread([&] {while (!q.front()); // 自旋等待std::cout << "received: " << *q.front() << std::endl;q.pop();// 主线程作为生产者:推入数据q.push(42);t.join();return 0;

输出:

received: 42
五、详细 API 模块5.1 构造与析构

// 构造:指定容量(至少为 1)rigtorp::SPSCQueue q(capacity);// 支持自定义 Allocator(C++17 开始支持 size feedback)rigtorp::SPSCQueue q(capacity, MyAllocator{});

容量固定后不可动态扩容,实际会多分配 1 个 slot 用于区分"满"和"空"状态。

5.2 入队操作(Producer 端)

阻塞式入队

// 拷贝构造入队,队列满时阻塞自旋void push(const T& v);// 移动构造入队(右值引用,避免拷贝开销)templatevoid push(P&& v);

非阻塞式入队(推荐用于实时系统)

// 返回 true 表示成功,false 表示队列已满bool try_push(const T& v);templatebool try_push(P&& v);

原地构造入队(Emplace,零拷贝)

// 在队列内部直接构造对象,args 转发给 T 的构造函数void emplace(Args&&... args);bool try_emplace(Args&&... args);

代码示例:

struct Msg {int id;std::string payload;rigtorp::SPSCQueue q(128);// 移动入队,避免 string 拷贝q.push(Msg{1, "hello"});// try_push:非阻塞,实时场景推荐if (!q.try_push(Msg{2, "world"})) {// 队列满,按业务处理(丢弃/降级/记录日志)std::cerr << "queue full, drop message\n";// emplace:原地构造,最高效q.emplace(3, "emplace direct");
5.3 出队操作(Consumer 端)

SPSCQueue 采用**"peek + pop"两步出队**设计,这比传统 dequeue() 返回值的方式更灵活,可以在决定消费之前先检查数据。

// 查看队首指针,队列为空时返回 nullptrT* front();// 弹出队首,必须在 front() 返回非 nullptr 后调用void pop();// 辅助查询size_t size();  // 当前队列中的元素数量bool empty();   // 是否为空

代码示例:

rigtorp::SPSCQueue q(64);// ... 生产者已经 push 了一些数据 ...// 消费者批量消费while (true) {int* p = q.front();if (p == nullptr) {// 队列为空,休眠或做其他事情break;// 处理数据(此时数据还在队列里,安全访问)process(*p);q.pop(); // 处理完毕后再弹出
⚠️ 重要约束:pop() 必须且只能在 front() 返回非 nullptr 后调用,否则行为未定义。
5.4 大页内存 Allocator(Linux 高性能场景)

当队列容量很大时(如百万级缓冲),使用 Linux Huge Page(2MB 大页)可以显著降低 TLB miss,进一步提升吞吐量和稳定性。

#include#include "rigtorp/SPSCQueue.h"templatestruct HugePageAllocator {using value_type = T;struct AllocationResult {T* ptr;size_t count;// 向上对齐到 2MB(Huge Page 大小)size_t roundup(size_t n) {return (((n - 1) >> 21) + 1) << 21;// C++17 P0401R3 size feedback 接口AllocationResult allocate_at_least(size_t n) {size_t count = roundup(sizeof(T) * n);auto p = static_cast(mmap(nullptr, count,PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB,-1, 0if (p == MAP_FAILED) throw std::bad_alloc();return {p, count / sizeof(T)};void deallocate(T* p, size_t n) {munmap(p, roundup(sizeof(T) * n));int main() {// 使用大页内存的百万容量队列rigtorp::SPSCQueue> q(1'000'000, HugePageAllocator{}q.push(1);auto* p = q.front();if (p) {std::cout << *p << std::endl;q.pop();
注意:使用 MAP_HUGETLB 前需确保系统配置了足够的大页,可通过 echo 512 > /proc/sys/vm/nr_hugepages 配置(需 root 权限)。
六、应用场景与完整代码示例6.1 场景一:音视频帧传递(采集线程 → 编码线程)

音视频场景中,采集线程以固定帧率产生帧,编码线程消费帧。要求:零锁、低延迟、不丢帧。

#include#include#include#include#include#include "rigtorp/SPSCQueue.h"struct VideoFrame {uint64_t timestamp_us;std::vector data; // 实际项目建议用 buffer poolstd::atomic g_running{true};void capture_thread(rigtorp::SPSCQueue& queue) {uint64_t frame_id = 0;while (g_running.load(std::memory_order_relaxed)) {VideoFrame frame;frame.timestamp_us = frame_id++ * 33333; // ~30fpsframe.data.assign(1920 * 1080 * 3, 0);  // 模拟 RGB 数据// 非阻塞推送:队列满则丢弃(或降帧率)if (!queue.try_push(std::move(frame))) {std::cerr << "[capture] queue full, frame dropped\n";std::this_thread::sleep_for(std::chrono::microseconds(33333));void encode_thread(rigtorp::SPSCQueue& queue) {while (g_running.load(std::memory_order_relaxed)) {VideoFrame* frame = queue.front();if (frame == nullptr) {// 队列为空,短暂 yield 避免空转浪费 CPUstd::this_thread::yield();continue;// 编码处理(此处省略实际编码逻辑)std::cout << "[encode] processing frame ts=" << frame->timestamp_us << "us\n";queue.pop();int main() {rigtorp::SPSCQueue queue(16); // 缓冲 16 帧auto cap = std::thread(capture_thread, std::ref(queue));auto enc = std::thread(encode_thread, std::ref(queue));std::this_thread::sleep_for(std::chrono::seconds(1));g_running = false;cap.join();enc.join();
6.2 场景二:量化交易行情分发(网络线程 → 策略线程)

量化交易对延迟极其敏感,行情数据从网络收到后需要以最快速度传递给策略引擎,任何锁都可能带来不可接受的延迟抖动。

#include#include#include#include#include "rigtorp/SPSCQueue.h"struct Tick {uint64_t recv_ns;       // 接收时间戳(纳秒)uint32_t instrument_id; // 合约 IDdouble   bid;double   ask;double   last;constexpr size_t QUEUE_DEPTH = 4096; // 必须能覆盖策略计算的最大耗时std::atomic g_stop{false};// 模拟网络接收线程void market_data_receiver(rigtorp::SPSCQueue& queue) {uint32_t seq = 0;while (!g_stop) {Tick tick{};tick.recv_ns      = static_cast(std::chrono::steady_clock::now().time_since_epoch().count());tick.instrument_id = 1001;tick.bid  = 100.0 + (seq % 10) * 0.01;tick.ask  = tick.bid + 0.01;tick.last = tick.bid;seq++;// 行情场景中一般不接受阻塞,满了就丢(策略可以从快照恢复)queue.try_push(tick);std::this_thread::sleep_for(std::chrono::microseconds(100)); // 模拟 10000 tick/s// 策略引擎线程void strategy_engine(rigtorp::SPSCQueue& queue) {uint64_t processed = 0;while (!g_stop) {Tick* t = queue.front();if (!t) {// 忙等或 pause 指令(实盘通常用 _mm_pause())asm volatile("pause" ::: "memory");continue;uint64_t now = static_cast(std::chrono::steady_clock::now().time_since_epoch().count());uint64_t latency_ns = now - t->recv_ns;// 策略逻辑(此处仅打印延迟)if (++processed % 1000 == 0) {std::cout << "[strategy] latency=" << latency_ns<< "ns, bid=" << t->bid << "\n";queue.pop();int main() {rigtorp::SPSCQueue queue(QUEUE_DEPTH);auto recv = std::thread(market_data_receiver, std::ref(queue));auto strat = std::thread(strategy_engine, std::ref(queue));std::this_thread::sleep_for(std::chrono::seconds(3));g_stop = true;recv.join();strat.join();
6.3 场景三:游戏引擎主线程 ↔ 渲染线程解耦

游戏引擎通常将逻辑更新(主线程)和渲染(渲染线程)分开运行。SPSCQueue 可用于传递渲染命令,避免锁带来的帧率抖动。

#include#include#include#include#include#include "rigtorp/SPSCQueue.h"// 渲染命令(使用 variant 实现多态,避免虚函数开销)struct DrawMesh      { uint32_t mesh_id; float x, y, z; };struct UpdateTexture { uint32_t tex_id; std::vector data; };struct SwapBuffers   {};using RenderCmd = std::variant;std::atomic g_quit{false};void game_logic_thread(rigtorp::SPSCQueue& cmds) {uint32_t frame = 0;while (!g_quit) {// 游戏逻辑更新...// 提交渲染命令cmds.emplace(DrawMesh{1001, 0.f, 0.f, 0.f});cmds.emplace(DrawMesh{1002, 1.f, 0.f, 0.f});cmds.emplace(SwapBuffers{});frame++;if (frame % 60 == 0) {std::cout << "[game] submitted frame " << frame << "\n";std::this_thread::sleep_for(std::chrono::milliseconds(16)); // ~60fpscmds.emplace(SwapBuffers{}); // 最后一帧信号void render_thread(rigtorp::SPSCQueue& cmds) {while (!g_quit) {RenderCmd* cmd = cmds.front();if (!cmd) {std::this_thread::yield();continue;std::visit([](auto&& c) {using T = std::decay_t;if constexpr (std::is_same_v) {// 调用 GPU 绘制 API} else if constexpr (std::is_same_v) {// 上传纹理} else if constexpr (std::is_same_v) {// 交换缓冲区}, *cmd);cmds.pop();int main() {rigtorp::SPSCQueue render_queue(1024);auto game   = std::thread(game_logic_thread, std::ref(render_queue));auto render = std::thread(render_thread, std::ref(render_queue));std::this_thread::sleep_for(std::chrono::seconds(2));g_quit = true;game.join();render.join();
6.4 场景四:日志系统(业务线程 → 写盘线程,零停顿)

业务线程打日志不能因 I/O 阻塞而暂停,SPSCQueue 可做异步日志缓冲:

#include#include#include#include#include#include "rigtorp/SPSCQueue.h"struct LogEntry {uint64_t    ts_ns;int         level;   // 0=DEBUG 1=INFO 2=WARN 3=ERRORchar        msg[256];rigtorp::SPSCQueue g_log_queue(8192);std::atomic g_log_stop{false};// 业务线程调用此函数(无锁,纳秒级开销)void log(int level, const char* msg) {LogEntry entry{};entry.ts_ns = static_cast(std::chrono::steady_clock::now().time_since_epoch().count());entry.level = level;snprintf(entry.msg, sizeof(entry.msg), "%s", msg);// 满了也不阻塞,丢弃(可加 dropped counter)g_log_queue.try_push(entry);// 独立写盘线程(单消费者)void log_writer_thread() {std::ofstream file("app.log");const char* levels[] = {"DEBUG", "INFO", "WARN", "ERROR"};while (!g_log_stop || !g_log_queue.empty()) {LogEntry* e = g_log_queue.front();if (!e) {std::this_thread::sleep_for(std::chrono::microseconds(100));continue;file << e->ts_ns << " [" << levels[e->level] << "] " << e->msg << "\n";g_log_queue.pop();file.flush();int main() {auto writer = std::thread(log_writer_thread);// 业务代码中直接调用 log()for (int i = 0; i < 10000; i++) {log(1, "processing request");log(1, "done");g_log_stop = true;writer.join();std::cout << "All logs flushed.\n";
七、使用注意事项与常见坑7.1 严格遵守单写单读约束

SPSCQueue 不提供任何运行期检查,如果多个线程同时调用 push 或多个线程同时调用 pop,将产生数据竞争(data race),触发undefined behavior

// ❌ 错误:两个生产者线程同时 pushstd::thread t1([&]{ q.push(1); });std::thread t2([&]{ q.push(2); }); // 严重错误!// ✅ 正确:生产者和消费者各一个线程std::thread producer([&]{ q.push(1); });std::thread consumer([&]{ while(!q.front()); q.pop(); });
7.2 pop() 前必须确认 front() 非空

// ❌ 错误:未检查就 popq.pop(); // 如果队列为空,行为未定义// ✅ 正确:先 front() 再 pop()if (auto* p = q.front()) {// 使用 *pq.pop();
7.3 T 的析构函数必须 noexcept

pop() 要求 std::is_nothrow_destructible::value == true。标准类型和大多数 POD 类型满足此要求,但若 T 的析构函数可能抛异常,编译时会报错。

7.4 阻塞式 push/emplace 的忙等开销

push() 和 emplace() 在队列满时自旋等待,不是 sleep。在实时场景中这通常是期望行为,但在容量规划不当时会导致 CPU 空转浪费。生产环境推荐使用 try_push / try_emplace 配合业务降级逻辑。

7.5 size() 返回值的时效性

size_t n = q.size();// n 只是某一瞬间的快照,下一行已经可能变化// 不要用 size() 做业务逻辑判断,用 front()/try_push() 返回值
八、社区与生态8.1 学术引用

SPSCQueue 已被学术论文引用:

Peizhao Ou and Brian Demsky. 2018. Towards understanding the costs of avoiding out-of-thin-air results. Proc. ACM Program. Lang. 2, OOPSLA, Article 136. DOI: 10.1145/3276506
8.2 作者其他高性能库

Erik Rigtorp 还维护了一系列无锁高性能库,与 SPSCQueue 形成生态:

库名

功能

MPMCQueue

多生产者多消费者无锁队列

HashMap

高性能开放寻址哈希表

Seqlock

序列锁实现

Spinlock

高性能自旋锁

8.3 类似方案横向对比

方案

优点

缺点

SPSCQueue

最快 SPSC、Header-Only、MIT

仅限单写单读

boost::lockfree::spsc

Boost 生态、久经考验

比 SPSCQueue 慢 ~40%,需要 Boost 依赖

folly::ProducerConsumerQueue

Facebook 出品、MPMC 兼顾

吞吐量更低,需要 folly 整体依赖

moodycamel::ReaderWriterQueue

SPSC,功能接近

内存使用模式不同,适合动态场景

8.4 编译器与平台支持

平台

状态

Linux x86-64 (GCC/Clang)

✅ 完整支持

macOS arm64/x86 (Clang)

✅ 完整支持

Windows MSVC

✅ 支持(需 C++17)

ARM / Embedded

⚠️ 需确认 Cache Line 大小配置正确

九、总结

SPSCQueue 是高性能 C++ 开发者工具箱中一颗被低估的明珠。它的设计哲学代表了"极简约束换极致性能"的思路——严格限定 1 写 1 读,换来了 wait-free 保证、false sharing 消除、Cache 友好的本地索引缓存,以及最终 362,723 ops/ms 的惊人吞吐。

适合引入 SPSCQueue 的场景特征:

  • 数据流方向固定(一个源头,一个终点)
  • 对延迟和吞吐量有极高要求
  • 不能接受锁、条件变量带来的延迟抖动
  • 典型场景:音视频流水线、量化行情分发、游戏引擎渲染命令队列、零停顿异步日志

如果你的系统中有多个生产者或多个消费者,可以考虑同作者的 MPMCQueue,设计理念一脉相承。

一个头文件,一千颗星,无锁极速——SPSCQueue 值得放进你的项目工具箱。

参考资料 SPSCQueue GitHub 仓库 Intel: Avoiding and Identifying False Sharing Wikipedia: Ring buffer Wikipedia: False sharing C++ Allocator P0401R3