`
splayx
  • 浏览: 82948 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

无锁队列一(简单实现)

    博客分类:
  • C++
 
阅读更多

 

没多少行代码,先给出最终实现:

template<typename T>                                                            
class CAS {                                                                     
private:                                                                        
    T *pt_;                                                                     
    T old_val_;                                                                 
    T new_val_;                                                                 
public:                                                                         
    CAS(T *pt, T old_val, T new_val) {                                          
        pt_ = pt;                                                               
        old_val_ = old_val;                                                     
        new_val_ = new_val;                                                     
        while (!__sync_bool_compare_and_swap(pt, old_val, new_val));            
    }                                                                           
    ~CAS() {                                                                    
        if (new_val_ != __sync_val_compare_and_swap(pt_, new_val_, old_val_)) { 
            fprintf(stderr, "~CAS Error!\n");                                   
        }                                                                       
    }                                                                           
};

template<typename T>                                                            
class LockFreeQueue {                                                           
private:                                                                        
    T *queue_ptr_;                                                              
    int64_t mask_;                                                              
    int64_t __attribute((aligned (0x40))) head_;                                
    int64_t head_count_;                                                        
    int64_t head_pos_;                                                          
    int64_t __attribute((aligned (0x40))) tail_;                                
    int64_t tail_count_;                                                        
    int64_t tail_pos_;                                                          
                                                                                
    // head_count_ = |[head_pos_, tail_pos_)|                                   
    // tail_count_ = |(tail_pos_, head_pos_)|                                   
                                                                                
public:                                                                         
    LockFreeQueue(int64_t size) {                                               
        if (size <= 1 || size & (size - 1) != 0) {                              
            fprintf(stderr, "LockFreeQueue Error!");                            
            exit(1);                                                            
        }                                                                       
        mask_ = size - 1;                                                       
        queue_ptr_ = new T[size];                                               
                                                                                
        head_ = 0;                                                              
        tail_ = 0;                                                              
                                                                                
        head_pos_ = 0;                                                          
        tail_pos_ = size - 1;                                                   
                                                                                
        head_count_ = (tail_pos_ - head_pos_) & mask_;
        tail_count_ = (head_pos_ - tail_pos_ - 1) & mask_;                      
    }                                                                           
                                                                                
    // A full memory barrier around CAS should be guaranteed!                   
                                                                                
    // If queue is full return false,                                           
    // otherwise return true.                                                   
                                                                                
    //ease::Mutex en, de;                                                       
                                                                                
    bool Enqueue(T el) {                                                        
        //ease::MutexLock lock(&en);                                            
        CAS<int64_t> cas(&head_, 0, 1);                                         
                                                                                
        if (head_count_ == 0) {                                                 
            head_count_ = (tail_pos_ - head_pos_) & mask_;                      
            if (head_count_ == 0) return false;                                 
        }                                                                       
        head_count_--;                                                          
                                                                                
        queue_ptr_[head_pos_] = el;                                             
        __sync_synchronize();                                                   
        head_pos_ = (head_pos_ + 1) & mask_;                                    
                                                                                
        return true;                                                            
    }

    // If queue is emtpy return false,                                          
    // otherwise return true.                                                   
    bool Dequeue(T &el) {                                                       
        //ease::MutexLock lock(&de);                                            
        CAS<int64_t> cas(&tail_, 0x00, 0x01);                                   
                                                                                
        if (tail_count_ == 0) {                                                 
            tail_count_ = (head_pos_ - tail_pos_ - 1) & mask_;                  
            if (tail_count_ == 0) return false;                                 
        }                                                                       
        tail_count_--;                                                          
                                                                                
        el = queue_ptr_[(tail_pos_ + 1) & mask_];                               
        __sync_synchronize();                                                   
        tail_pos_ = (tail_pos_ + 1) & mask_;                                    
                                                                                
        return true;                                                            
    }                                                                           
};

这个实现的特点是两头分别是串行控制,但可以同步进行。

另外加入head_count_,tail_count_优化,可以减少两头信息同步带来的开销。

 

实现中几个注意的点:

1、

不大相关的变量组放在不同的cache line中,

这样可以避免它们所在的cache line作无谓的同步。

例如head_,tail_。

2、

一次调用中相关的,甚至是相互依赖的变量可以放在一个cache line中,

把数据集中起来更方便cpu处理,例如head_,head_pos_,head_count_。

3、

把常量性质的变量跟经常需要线程间同步的变量,放在不同的cache line中。

例如queue_ptr_, mask_,它们被读的时候,所在的cache line是不需要去同步的。

 

以上3点根据测试知是可以提升性能的。

 

4、

el = queue_ptr_[(tail_pos_ + 1) & mask_];                               
__sync_synchronize();                                                   
tail_pos_ = (tail_pos_ + 1) & mask_;

确保Enqueue线程看到数据出队操作是先于位置标志变化的,

不然标志变化先被Enqueue感知了,就可能导致原先的数据还没出队就被覆盖了。

5、

CAS的系统调用中会有一个full memory产生,这也是正确性的保证。

 

实例化一个无锁队列(LockFreeQueue<uint64_t> lfq(1 << 10);)

测试1000W个数据进出队列的所用的时间:

1 en-threads, 1 de-threads:
[test0] time consume: 1393ms
[test1] time consume: 1270ms
[test2] time consume: 1374ms
2 en-threads, 2 de-threads:
[test0] time consume: 1366ms
[test1] time consume: 1557ms
[test2] time consume: 1353ms
3 en-threads, 3 de-threads:
[test0] time consume: 3899ms
[test1] time consume: 4409ms
[test2] time consume: 4614ms
4 en-threads, 4 de-threads:
[test0] time consume: 8159ms
[test1] time consume: 8664ms
[test2] time consume: 11048ms
5 en-threads, 5 de-threads:
[test0] time consume: 11903ms
[test1] time consume: 11153ms
[test2] time consume: 12082ms

不难看出随着线程数的增加耗时也大概成线性增加。

另外把CAS换成MutexLock,测试结果是:

1 en-threads, 1 de-threads:                                                     
[test0] time consume: 2218ms                                                    
[test1] time consume: 2100ms                                                    
[test2] time consume: 2159ms                                                    
2 en-threads, 2 de-threads:                                                     
[test0] time consume: 2305ms                                                    
[test1] time consume: 2301ms                                                    
[test2] time consume: 2338ms                                                    
3 en-threads, 3 de-threads:                                                     
[test0] time consume: 2449ms                                                    
[test1] time consume: 2631ms                                                    
[test2] time consume: 2550ms                                                    
4 en-threads, 4 de-threads:                                                     
[test0] time consume: 2526ms                                                    
[test1] time consume: 2527ms                                                    
[test2] time consume: 2524ms                                                    
5 en-threads, 5 de-threads:                                                     
[test0] time consume: 2523ms                                                    
[test1] time consume: 2550ms                                                    
[test2] time consume: 2528ms

MutexLock随着线程数打增加,耗时增加没有CAS那么陡,相对比较平缓,

所以MutexLock开了很多线程仍然保持3000ms左右,当然开多了,耗时会越来越大。

 

可以得出的结论是当线程总数不大于cpu总核数的时候,无锁有一定的优势,

但是线程开多了,线程之间的cpu竞争将带来越来越多的开销。

 

这个实现比较简单,下来想一种两头都可以并行的做法。

分享到:
评论

相关推荐

    c++11无锁队列的一种简单实现.pptx

    前一段时间用到了多线程间的共享队列,然后就找到了一种简单实现方式用到项目中,后面在项目组中进行了简单的技术分享,形成了这个ppt。主要用到的是c++11的多线程+原子操作+内存模型方面的知识。

    enlfq:Erlang NIF无锁队列

    Enlfq 使用以下库的简单NIF无锁队列: moodycamel :: ConcurrentQueue C ++的工业强度无锁队列。 特点: 击倒你的袜子,快速的表现。 单头实现。 只需将其放入您的项目中即可。 完全线程安全的无锁队列。 从任何数量...

    uniq:无锁(多读取器多写入器)循环缓冲队列

    无锁在有关无锁队列的,最受支持的答案(jan / 20)是: 在过去的几年中,我对无锁数据结构进行了专门的研究。 我已经阅读了该领域的大多数论文(大约只有40篇,尽管只有10篇或15篇才是真正的用途:-) AFAIK,尚未...

    KFIFO_Test.zip

    这段时间在写一个基于PCIE实现FPGA与上位机通信,利用多线程实现读写同步,本次使用到了KFIFO无锁队列,实现了kfifo的简单测试。

    blockingQueues:简单,高性能,goroutine安全队列,可用作资源池或作业队列

    阻塞队列 阻塞队列提供了一些简单,高性能,常规的安全...ConcurrentRingBuffer :由片支持的有界无锁队列 安装 go get - u github . com / theodesp / blockingQueues 用法 非阻塞API queue , _ := NewArrayBlock

    C#环形队列的实现方法详解

    先是简单的给大家介绍了什么是环形队列和环形队列的优点,然后通过实例代码给大家介绍C#如何实现环形队列,有需要的朋友们可以参考借鉴,下面来一起看看吧。

    kikilib:一个面向对象的轻量级高性能C++网络Linux系统库

    2、实现了RingBuffer无锁队列的高性能异步日志工具。3、实现了定时器事件。4、实现了事件按优先级处理。3、文件:kikilib:网络库的源代码。old_version:旧版本源码。example:一些示例,包含:echo: 回显服务器。...

    一款分布式的java游戏服务器框架,具备高性能、可伸缩、分布式、多线程等特点,java 8 +gradle 4.0

    分布式(多进程)架构,几行代码实现一个功能服务器的搭建 多线程设计,注解方式配置,轻松管理所有消息流 强大的RPC功能,调用远程RPC近似于调用本地函数,无需手工定义内部协议 支持插件功能,轻松实现功能插件 框架...

    tools:现代 C++ 工具包

    基于无锁 mpsc 队列的类实现 coroutine impl 从零开始,演示如何实现自己的协程 玩具 基于多继承的类元组类 对大文件中的数字进行排序(位集和除数) 用于存储任何可调用信号对象的函子映射,请参见 多部分/表单...

    Joqer:.NET 应用程序的持久无服务器队列

    一个简单快速的持久无服务器队列。 显着特点: 接受来自多个线程和进程的并发写入,无需专用服务器进程。 持久消息,写入接近磁盘带宽速度。 内存速度下的高吞吐量。 将非常大的消息排入队列(受进程内存空间...

    pstat:pstat-并行统计工具

    pstat-并行统计 pstat是一个并行stat命令,可以有效地收集指定目录中所有文件/子目录的...充分利用英特尔TBB无锁队列(如果有)以实现最佳性能,如果没有,则使用标准容器 CSV格式的输出 支持输出原始或人类可读的统

    TMRexp:使用安全内存回收的无锁数据结构的实验线性化检查器

    该工具是一种实验模型检查器,用于针对无限制数量的客户端线程验证具有安全内存回收(SMR)的无锁数据结构的线性化[1]。 该工具能够使用基于时期的回收[5]和危险指针[6]处理单链接的数据独立数据结构,例如Treiber...

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第一阶段30讲、如何实现一个自己的显式锁Lock精讲下(让锁具备超时功能).mp4 │ 高并发编程第一阶段31讲、如何给你的应用程序注入钩子程序,Linux下演示.mp4 │ 高并发编程第一阶段32讲、如何捕获...

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第一阶段30讲、如何实现一个自己的显式锁Lock精讲下(让锁具备超时功能).mp4 │ 高并发编程第一阶段31讲、如何给你的应用程序注入钩子程序,Linux下演示.mp4 │ 高并发编程第一阶段32讲、如何捕获...

    Android蓝牙通信框架BluetoothKit.zip

    一、实现了一个完整的跨进程异步任务队列,支持任务超时、出错重试及防御队列溢出 二、拦截并Hook系统层蓝牙Binder,实现对所有蓝牙设备通信的监控,当同时连接设备数过多时会自动断掉活跃度最低的设备 三、...

    课程“ Linux Kernel Internals”的互补并发程序-C/C++开发

    课程“ Linux Kernel Internals”项目的补充程序项目清单tpool:一个轻量级的线程池。 tinync:使用co互补程序的小型nc实现,用于课程“ Linux Kernel Internals”项目列表tpool:轻量级线程池。 tinync:使用协程的...

    MPMCQueue.NET:受约束的多个生产者,多个使用者为.NET排队

    从官方意义上讲,它不是无锁的,只是通过无互斥量的原子RMW操作实现的。 入队/出队的成本为每项操作1 CAS。 没有摊销,只有1个CAS。 操作期间无动态内存分配/管理。 生产者和使用者是彼此分开的(如在两锁队列中)...

    TarsBenchmark:tarshttp服务的基准测试工具

    主进程和基准进程通过控制信息交换信号,并且数据通过无锁共享内存队列进行交互,以实现最低的资源消耗。 主要过程会定期收集每个过程。基准过程的网络统计信息将在简单总结之后输出到控制台。用法样本 ./tb -c 600...

Global site tag (gtag.js) - Google Analytics