没多少行代码,先给出最终实现:
template<typename T> class CAS { private: T *pt_; T old_val_; T new_val_; public: CAS(T *pt, T old_val, T new_val) { pt_ = pt; old_val_ = old_val; new_val_ = new_val; while (!__sync_bool_compare_and_swap(pt, old_val, new_val)); } ~CAS() { if (new_val_ != __sync_val_compare_and_swap(pt_, new_val_, old_val_)) { fprintf(stderr, "~CAS Error!\n"); } } }; template<typename T> class LockFreeQueue { private: T *queue_ptr_; int64_t mask_; int64_t __attribute((aligned (0x40))) head_; int64_t head_count_; int64_t head_pos_; int64_t __attribute((aligned (0x40))) tail_; int64_t tail_count_; int64_t tail_pos_; // head_count_ = |[head_pos_, tail_pos_)| // tail_count_ = |(tail_pos_, head_pos_)| public: LockFreeQueue(int64_t size) { if (size <= 1 || size & (size - 1) != 0) { fprintf(stderr, "LockFreeQueue Error!"); exit(1); } mask_ = size - 1; queue_ptr_ = new T[size]; head_ = 0; tail_ = 0; head_pos_ = 0; tail_pos_ = size - 1; head_count_ = (tail_pos_ - head_pos_) & mask_; tail_count_ = (head_pos_ - tail_pos_ - 1) & mask_; } // A full memory barrier around CAS should be guaranteed! // If queue is full return false, // otherwise return true. //ease::Mutex en, de; bool Enqueue(T el) { //ease::MutexLock lock(&en); CAS<int64_t> cas(&head_, 0, 1); if (head_count_ == 0) { head_count_ = (tail_pos_ - head_pos_) & mask_; if (head_count_ == 0) return false; } head_count_--; queue_ptr_[head_pos_] = el; __sync_synchronize(); head_pos_ = (head_pos_ + 1) & mask_; return true; } // If queue is emtpy return false, // otherwise return true. bool Dequeue(T &el) { //ease::MutexLock lock(&de); CAS<int64_t> cas(&tail_, 0x00, 0x01); if (tail_count_ == 0) { tail_count_ = (head_pos_ - tail_pos_ - 1) & mask_; if (tail_count_ == 0) return false; } tail_count_--; el = queue_ptr_[(tail_pos_ + 1) & mask_]; __sync_synchronize(); tail_pos_ = (tail_pos_ + 1) & mask_; return true; } };
这个实现的特点是两头分别是串行控制,但可以同步进行。
另外加入head_count_,tail_count_优化,可以减少两头信息同步带来的开销。
实现中几个注意的点:
1、
不大相关的变量组放在不同的cache line中,
这样可以避免它们所在的cache line作无谓的同步。
例如head_,tail_。
2、
一次调用中相关的,甚至是相互依赖的变量可以放在一个cache line中,
把数据集中起来更方便cpu处理,例如head_,head_pos_,head_count_。
3、
把常量性质的变量跟经常需要线程间同步的变量,放在不同的cache line中。
例如queue_ptr_, mask_,它们被读的时候,所在的cache line是不需要去同步的。
以上3点根据测试知是可以提升性能的。
4、
el = queue_ptr_[(tail_pos_ + 1) & mask_]; __sync_synchronize(); tail_pos_ = (tail_pos_ + 1) & mask_;
确保Enqueue线程看到数据出队操作是先于位置标志变化的,
不然标志变化先被Enqueue感知了,就可能导致原先的数据还没出队就被覆盖了。
5、
CAS的系统调用中会有一个full memory产生,这也是正确性的保证。
实例化一个无锁队列(LockFreeQueue<uint64_t> lfq(1 << 10);)
测试1000W个数据进出队列的所用的时间:
1 en-threads, 1 de-threads: [test0] time consume: 1393ms [test1] time consume: 1270ms [test2] time consume: 1374ms 2 en-threads, 2 de-threads: [test0] time consume: 1366ms [test1] time consume: 1557ms [test2] time consume: 1353ms 3 en-threads, 3 de-threads: [test0] time consume: 3899ms [test1] time consume: 4409ms [test2] time consume: 4614ms 4 en-threads, 4 de-threads: [test0] time consume: 8159ms [test1] time consume: 8664ms [test2] time consume: 11048ms 5 en-threads, 5 de-threads: [test0] time consume: 11903ms [test1] time consume: 11153ms [test2] time consume: 12082ms
不难看出随着线程数的增加耗时也大概成线性增加。
另外把CAS换成MutexLock,测试结果是:
1 en-threads, 1 de-threads: [test0] time consume: 2218ms [test1] time consume: 2100ms [test2] time consume: 2159ms 2 en-threads, 2 de-threads: [test0] time consume: 2305ms [test1] time consume: 2301ms [test2] time consume: 2338ms 3 en-threads, 3 de-threads: [test0] time consume: 2449ms [test1] time consume: 2631ms [test2] time consume: 2550ms 4 en-threads, 4 de-threads: [test0] time consume: 2526ms [test1] time consume: 2527ms [test2] time consume: 2524ms 5 en-threads, 5 de-threads: [test0] time consume: 2523ms [test1] time consume: 2550ms [test2] time consume: 2528ms
MutexLock随着线程数打增加,耗时增加没有CAS那么陡,相对比较平缓,
所以MutexLock开了很多线程仍然保持3000ms左右,当然开多了,耗时会越来越大。
可以得出的结论是当线程总数不大于cpu总核数的时候,无锁有一定的优势,
但是线程开多了,线程之间的cpu竞争将带来越来越多的开销。
这个实现比较简单,下来想一种两头都可以并行的做法。
相关推荐
前一段时间用到了多线程间的共享队列,然后就找到了一种简单实现方式用到项目中,后面在项目组中进行了简单的技术分享,形成了这个ppt。主要用到的是c++11的多线程+原子操作+内存模型方面的知识。
Enlfq 使用以下库的简单NIF无锁队列: moodycamel :: ConcurrentQueue C ++的工业强度无锁队列。 特点: 击倒你的袜子,快速的表现。 单头实现。 只需将其放入您的项目中即可。 完全线程安全的无锁队列。 从任何数量...
无锁在有关无锁队列的,最受支持的答案(jan / 20)是: 在过去的几年中,我对无锁数据结构进行了专门的研究。 我已经阅读了该领域的大多数论文(大约只有40篇,尽管只有10篇或15篇才是真正的用途:-) AFAIK,尚未...
这段时间在写一个基于PCIE实现FPGA与上位机通信,利用多线程实现读写同步,本次使用到了KFIFO无锁队列,实现了kfifo的简单测试。
阻塞队列 阻塞队列提供了一些简单,高性能,常规的安全...ConcurrentRingBuffer :由片支持的有界无锁队列 安装 go get - u github . com / theodesp / blockingQueues 用法 非阻塞API queue , _ := NewArrayBlock
先是简单的给大家介绍了什么是环形队列和环形队列的优点,然后通过实例代码给大家介绍C#如何实现环形队列,有需要的朋友们可以参考借鉴,下面来一起看看吧。
2、实现了RingBuffer无锁队列的高性能异步日志工具。3、实现了定时器事件。4、实现了事件按优先级处理。3、文件:kikilib:网络库的源代码。old_version:旧版本源码。example:一些示例,包含:echo: 回显服务器。...
分布式(多进程)架构,几行代码实现一个功能服务器的搭建 多线程设计,注解方式配置,轻松管理所有消息流 强大的RPC功能,调用远程RPC近似于调用本地函数,无需手工定义内部协议 支持插件功能,轻松实现功能插件 框架...
基于无锁 mpsc 队列的类实现 coroutine impl 从零开始,演示如何实现自己的协程 玩具 基于多继承的类元组类 对大文件中的数字进行排序(位集和除数) 用于存储任何可调用信号对象的函子映射,请参见 多部分/表单...
一个简单快速的持久无服务器队列。 显着特点: 接受来自多个线程和进程的并发写入,无需专用服务器进程。 持久消息,写入接近磁盘带宽速度。 内存速度下的高吞吐量。 将非常大的消息排入队列(受进程内存空间...
pstat-并行统计 pstat是一个并行stat命令,可以有效地收集指定目录中所有文件/子目录的...充分利用英特尔TBB无锁队列(如果有)以实现最佳性能,如果没有,则使用标准容器 CSV格式的输出 支持输出原始或人类可读的统
该工具是一种实验模型检查器,用于针对无限制数量的客户端线程验证具有安全内存回收(SMR)的无锁数据结构的线性化[1]。 该工具能够使用基于时期的回收[5]和危险指针[6]处理单链接的数据独立数据结构,例如Treiber...
│ 高并发编程第一阶段30讲、如何实现一个自己的显式锁Lock精讲下(让锁具备超时功能).mp4 │ 高并发编程第一阶段31讲、如何给你的应用程序注入钩子程序,Linux下演示.mp4 │ 高并发编程第一阶段32讲、如何捕获...
│ 高并发编程第一阶段30讲、如何实现一个自己的显式锁Lock精讲下(让锁具备超时功能).mp4 │ 高并发编程第一阶段31讲、如何给你的应用程序注入钩子程序,Linux下演示.mp4 │ 高并发编程第一阶段32讲、如何捕获...
一、实现了一个完整的跨进程异步任务队列,支持任务超时、出错重试及防御队列溢出 二、拦截并Hook系统层蓝牙Binder,实现对所有蓝牙设备通信的监控,当同时连接设备数过多时会自动断掉活跃度最低的设备 三、...
课程“ Linux Kernel Internals”项目的补充程序项目清单tpool:一个轻量级的线程池。 tinync:使用co互补程序的小型nc实现,用于课程“ Linux Kernel Internals”项目列表tpool:轻量级线程池。 tinync:使用协程的...
从官方意义上讲,它不是无锁的,只是通过无互斥量的原子RMW操作实现的。 入队/出队的成本为每项操作1 CAS。 没有摊销,只有1个CAS。 操作期间无动态内存分配/管理。 生产者和使用者是彼此分开的(如在两锁队列中)...
主进程和基准进程通过控制信息交换信号,并且数据通过无锁共享内存队列进行交互,以实现最低的资源消耗。 主要过程会定期收集每个过程。基准过程的网络统计信息将在简单总结之后输出到控制台。用法样本 ./tb -c 600...