生产者消费者模式下的并发无锁环形缓冲区


0、简单的说明

首先对环形缓冲区做下说明:

  1. 环形缓冲区使用改进的数组版本,缓冲区容量为2的幂
  2. 缓冲区满阻塞生产者,消费者进行消费后,缓冲区又有可用资源,由消费者唤醒生产者
  3. 缓冲区空阻塞消费者,生产者进程生产后,缓冲区又有可用资源,由生产者唤醒消费者

然后对涉及到的几个技术做下说明:

⑴CAS,Compare & Set,X86下对应的是CMPXCHG 汇编指令,原子操作,基本语义如下:

int CAS(address,old_value,new_value)
{
int ret = *addresss;
if(ret == old_value){
*address = new_value;
}
return ret;
}
为了方便调用者知道调用结果,通常被编译器改写成如下形式:

bool CAS(address,old_value,new_value)
{
int ret = *addresss;
if(ret == old_value){
*address = new_value;
return true;
}
return ret;
}

⑵sched_yield(),调用sched_yield可以使当前线程让出cpu,内核会把当前线程插入到线程优先级所对应的就绪队列,然后调度新的线程占有cpu,Stack Overflow上的解释

  1. sched_yield() causes the calling thread to relinquish the CPU. The thread is moved to the end of the queue for its static priority and a new thread gets to run.
  2. If the calling thread is the only thread in the highest priority list at that time, it will continue to run after a call to sched_yield().

1、无锁环形队列的实现

无锁队列的实现依托3个变量,in,out,max_out

in,被所有生产者共享,表示第一个可写入的位置,每次成功读取,值加1

out,被消费者共享,表示第一个课读取的位置,每次成功读取,值加1

max_out,生产者用于发布数据,[out,max_out]这一段表示消费者可消费的数据,max_out只能被生产者有序修改

out=max_out时,表示没有数据可消费

que_size,环形缓冲区大小,总是2的幂

in-out表示缓冲区中有多少数据

in-out = que_size,表示缓冲区满

template<typename ELEM_TYPE>
class queue{
public:
queue(int size);//把size上调至2的幂保存到que_size中
bool enqueue(const ELEM_TYPE& in_data);
bool dequeue(ELEM_TYPE& out_data );
...//其他成员
private:
ELEM_TYPE *arry;
int in;
int out;
int max_out;//用于读写线程同步
int que_size;
... //其他成员
}

template <typename ELEM_T>
bool queue<ELEM_T>::enqueue(const ELEM_T& in_data)
{
int cur_in ;
int cur_out;

do{
cur_in = in;
cur_out = out;

if(cur_in - cur_out == que_size){
return false;
}

}while(!CAS(&in,cur_in,cur_in+1))//如果cur_in==in依然成立,说明没有其他线程修改in,cur_in是可用的,并修改in;如果cur_in!=in,说明in值已被其他线程修改

arry[cur_in&(que_size-1)] = in_data;

while(!CAS(&max_out,cur_in,cur_in+1)){//发布数据
sched_yield();//发布数据失败,cur_in之前还有数据没有发布出去,让出cpu,让其他线程先执行
}

return true;
}

template <typename ELEM_T>
bool queue<ELEM_T,QUE_SIZE>::dequeue(ELEM_T& out_data)
{
int cur_out;
int cur_max_out;

do{
cur_out = out;
cur_max_out = max_out;

if(cur_out == cur_max_out){
return false;
}

out_data = arry[cur_out&(que_size-1)] ;//先尝试获取数据

}while(!CAS(&out,cur_out,cur_out+1))

return true;
}

enqueue操作:

23行到30行:

先预取可插入位置cur_in

然后判断缓冲区是否满,满了就不插入了,直接退出,返回false,表示缓冲区满

然后使用CAS(&in,cur_in,cur_in+1)判断该位置是否被使用过,在单线程中,25行到31行,cur_in和in永远相等,因为这之间根本就没有修过cur_in或in的语句,但是,在多线程中,in是所有生产者共享的,可能被其他线程在31行修改,所以,执行到31行时,要么cur_in<in,要么cur_in=in,CAS操作刚好可以判断相等的这个关系,如果相等,说明没有别的线程修改过in,也就是则预取的插入位置cur_in有效;如果不等,忙等待。

35行到37行:

这是生产者发布自己生产的数据,发布的方式就是把max_out指向当前线程的插入位置,同样,max_out可能会被其他线程修改,所以可能导致CAS失败,这时就不做忙等待了,而是让出cpu,仔细想一下,CAS失败的原因就是现在这个线程执行的太快了,导致这个线程插入位置cur_in之前还有数据没有发布出去,所以这个线程先让出cpu,先让没发布好数据的生产者先发布数据。

其实这样做会存在问题:如果线程A发布数据时,发现在他之前,还有没发布好数据的线程,假设为B,那么线程B在32行到34行之间挂掉之后,B之后的所有线程会一直处于忙等待的状态。我现在还不知道这个问题要怎么解决。

dequeue操作类似 

2、生产者消费者

 说明一下,下面18行、21行、31行、34行是伪码,看注释就知道其含义了。

queue<int> dataque(1024);//环形缓冲区

queue<int> asleep_producers(32);//缓冲区满时,生产者应该阻塞,该队列就是生产者等待队列
queue<int> asleep_consumers(32);//缓冲区空时,消费者应该阻塞,该队列就是消费者等待队列


void produce(){
if(!dataque.enqueue(in_data)){//生产的数据入队列

/* 数据入队列失败,把当前数据丢掉,或者保存到磁盘中等等*/
...
...

if(!asleep_producers.enqueue(this->gettid())){//把当前生产者线程加入等待队列
pthread_exit();//如果加入失败,说明已经达到了等待队列的最大值,那么线程主动退出
}

producer_poller.wait();//睡眠,等待信号的到来
}

wakeup_consumers(asleep_consumers);//生产了一个数据,应该从消费者等待队列中取一个线程消费数据,本质上可以发送一个数据可消费者的poller
}


void consume()
{
if(!dataque.dequeue(out_data)){

asleep_consumers.enqueue(this->gettid());//没能成功取得数据,把自己加入到消费者等待队列中

consumer_poller.wait();//睡眠,等待信号到来
}

wakeup_producers(asleep_producers);//消费了一个数据,应该从生产者等待队列中取一个线程生产数据,本质上可以发送一个数据给生产者的poller
}

关于producer_poller、consumer_poller、wait、wakeup的实现可以用下面的方案:

每个线程内含一个socketpair和select,select用于监听socketpair的读端。

生产者线程起来的时候先去尝试一次生产(调用produce函数),失败之后使用select监听消费者发过来的可生产信号,select成功监听到事件后,再去生产(调用produce)

消费者线程起来的时候先去尝试一次消费(调用consume函数),失败之后使用select监听生产者发过来的可消费信号,select成功监听到事件后,再去消费(调用consume)


原文连接地址:http://www.cnblogs.com/zengzy/p/5145899.html




智能推荐

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
© 2014-2019 ITdaan.com 粤ICP备14056181号  

赞助商广告