- Cache损坏(Cache trashing)
线程间频繁切换的时候会导致 Cache 中数据的丢失,Cache中的数据会失效,因为它缓存的是将被换出任务的数据,这些数据对于新换进的任务是没⽤的。处理器的运⾏速度⽐主存快N倍,所以⼤量的处理器时间被浪费在处理器与主存的数据传输上。这就是在处理器和主存之间引⼊Cache的原因。Cache是⼀种速度更快但容量更⼩的内存(也更加昂贵),当处理器要访问主存中的数据时,这些数据⾸先被拷⻉到Cache中,因为这些数据在不久的将来可能⼜会被处理器访问。Cache misses对性能有⾮常⼤的影响,因为处理器访问Cache中的数据将⽐直接访问主存快得多。在保存和恢复上下⽂的过程中还隐藏了额外的开销。
- 在同步机制上争抢队列
⾮阻塞机制⼤展伸⼿的机会到了。任务之间不争抢任何资源,在队列中预定⼀个位置,然后在这个位置上插⼊或提取数据。这中机制使⽤了⼀种被称之为CAS(⽐较和交换)的特殊操作,这个特殊操作是⼀种特殊的指令,它可以原⼦的完成以下操作:它需要3个操作数m,A,B,其中m是⼀个内存地址,操作将m指向的内存中的内容与A⽐较,如果相等则将B写⼊到m指向的内存中并返回true,如果不相等则什么也不做返回false。简而言之非阻塞的机制使用了 CAS 的特殊操作,使得任务之间可以不争抢任何资源,然后在队列中预定的位置上,插入或者提取数据。CAS底层实现
- 多线程动态内存分配malloc性能下降
template<typename T>
class atomic_ptr_t {
void set(T *ptr_); //⾮原⼦操作
T *xchg(T *val_); //原⼦操作,设置⼀个新的值,然后返回旧的值
T *cas(T *cmp_, T *val_);//原⼦操作
volatile T *ptr;
- set函数,把私有成员ptr指针设置成参数ptr_的值,不是⼀个原⼦操作,需要使⽤者确保执⾏set过程没有其他线程使⽤ptr的值。
- xchg函数,把私有成员ptr指针设置成参数val_的值,并返回ptr设置之前的值。原⼦操作,线程安全。
- cas函数,原⼦操作,线程安全,把私有成员ptr指针与参数cmp_指针⽐较:如果相等返回ptr设置之前的值,并把ptr更新为参数val_的值,如果不相等直接返回ptr值。
首先我们需要考虑元素的分配,元素存在哪里?yqueue 中的数据结构使用的 chunk 块机制,每次批量分配一批元素,这样可以减少内存的分配和释放yqueue_t内部由⼀个⼀个chunk组成,每个chunk保存N个元素:spare_chunk⾥⾯,当再次需要分配chunk_t的时候从spare_chunk中获取。
struct chunk_t {
T values[N]; //每个chunk_t可以容纳N个T类型的元素,以后就以一个chunk_t为单位申请内存
chunk_t *prev;
chunk_t *next;
在yqueue_t类中有一个spare_chunk用于保存最近的空闲块 。也就是说,在将一个chunk中的所有元素都pop掉了,那么我们可以free这个chunk。但是我们可以保存一块最近的空闲块,以后如果chunk不够用时,扩容chunk就不用malloc,直接复用该spare_chunk即可。根据局部性原理,这个spare_chunk的地址或者内存页很有可能还在cache里,那么这样的机制就可以减少一次malloc以及存入cache的操作。
// class yqueue_t
// People are likely to produce and consume at similar rates. In
// this scenario holding onto the most recently freed chunk saves
// us from having to call malloc/free.
atomic_ptr_t<chunk_t> spare_chunk; //空闲块(把所有元素都已经出队的块称为空闲块),读写线程的共享变量
// Removes an element from the front end of the queue.
inline void pop() {
if (++begin_pos == N) // 删除满一个chunk才回收chunk
chunk_t *o = begin_chunk;
begin_chunk = begin_chunk->next;
begin_chunk->prev = NULL;
begin_pos = 0;
// 'o' has been more recently used than spare_chunk,
// so for cache reasons we'll get rid of the spare and
// use 'o' as the spare.
chunk_t *cs = spare_chunk.xchg(o); //由于局部性原理,总是保存最新的空闲块而释放先前的空闲快
// Adds an element to the back end of the queue.
inline void push() {
back_chunk = end_chunk;
back_pos = end_pos; //
if (++end_pos != N) //end_pos!=N表明这个chunk节点还没有满
chunk_t *sc = spare_chunk.xchg(NULL); // 为什么设置为NULL? 因为如果把之前值取出来了则没有spare chunk了,所以设置为NULL
if (sc) // 如果有spare chunk则继续复用它
end_chunk->next = sc;
sc->prev = end_chunk;
else // 没有则重新分配
// static int s_cout = 0;
// printf("s_cout:%d\n", ++s_cout);
end_chunk->next = (chunk_t *) malloc(sizeof(chunk_t)); // 分配一个chunk
end_chunk->next->prev = end_chunk;
end_chunk = end_chunk->next;
end_pos = 0;
// T is the type of the object in the queue.队列中元素的类型
// N is granularity(粒度) of the queue,简单来说就是chunk_t ⼀个结点可以装载N个T类型的元素
template<typename T, int N>
class yqueue_t {
inline yqueue_t();// 创建队列.
inline ~yqueue_t();// 销毁队列.
inline T &front();// Returns reference to the front element of the queue. If the queue is empty, behaviour is undefined.
inline T &back();// Returns reference to the back element of the queue.If the queue is empty, behaviour is undefined.
inline void push();// Adds an element to the back end of the queue.
inline void pop();// Removes an element from the front of the queue.
inline void unpush()// Removes element from the back end of the queue。 回滚时使⽤
// Individual memory chunk to hold N elements.
struct chunk_t {
T values[N];
chunk_t *prev;
chunk_t *next;
chunk_t *begin_chunk;
int begin_pos;
chunk_t *back_chunk;
int back_pos;
chunk_t *end_chunk;
int end_pos;
atomic_ptr_t<chunk_t> spare_chunk; //空闲块(我把所有元素都已经出队的块称为空闲块),读写线程的共享变量
2.4.1 begin/back/end_chunk 与 begin/back/end_pos 成员介绍
chunk_t *begin_chunk;
int begin_pos;
chunk_t *back_chunk;
int back_pos;
chunk_t *end_chunk;
int end_pos;
- begin_chunk/begin_pos:begin_chunk用于指向队列的第一个chunk,begin_pos用于指向第一个chunk的第一个元素的索引位置,因为pop(),所以第一个元素不可能永远是0,会随着pop而改变。同理第一个chunk也会被回收,也需要记录第一个chunk的位置。
- back_chunk/back_pos:begin_chunk用于指向队列的最后一个chunk,back_pos用于指向最后一个chunk的最后一个元素的索引位置。
- end_chunk/end_pos:在最后一个chunk未满的情况下,end_chunk和back_chunk是相同的,back_pos的下一个就是end_pos。在最后一个chunk满的情况下,end_chunk指向新分配的chunk,end_pos=0。也就是说end_chunk和end_pos是辅助back_chunk/back_pos的,可以理解为探测。

2.4.2 函数介绍
// 创建队列.
inline yqueue_t() {
begin_chunk = (chunk_t *) malloc(sizeof(chunk_t));
begin_pos = 0;
back_chunk = NULL; //back_chunk总是指向队列中最后一个元素所在的chunk,现在还没有元素,所以初始为空
back_pos = 0;
end_chunk = begin_chunk; //end_chunk总是指向链表的最后一个chunk
end_pos = 0;

// 销毁队列.
inline ~yqueue_t() {
while (true) {
if (begin_chunk == end_chunk) {
chunk_t *o = begin_chunk;
begin_chunk = begin_chunk->next;
chunk_t *sc = spare_chunk.xchg(NULL);
- begin_chunk->values[begin_pos]代表队列头可读元素, 读取队列头元素即是读取begin_pos位置的元素;
- back_chunk->values[back_pos]代表队列尾可写元素,写⼊元素时则是更新back_pos位置的元素,要确保元素真正⽣效,还需要调⽤push函数更新back_pos的位置,避免下次更新的时候⼜是更新当前back_pos位置对应的元素。
// Returns reference to the front element of the queue.
// If the queue is empty, behaviour is undefined.
// 返回队列头部元素的引用,调用者可以通过该引用更新元素,结合pop实现出队列操作。
inline T &front() // 返回的是引用,是个左值,调用者可以通过其修改容器的值
return begin_chunk->values[begin_pos];
// Returns reference to the back element of the queue.
// If the queue is empty, behaviour is undefined.
// 返回队列尾部元素的引用,调用者可以通过该引用更新元素,结合push实现插入操作。
// 如果队列为空,该函数是不允许被调用。
inline T &back() // 返回的是引用,是个左值,调用者可以通过其修改容器的值
return back_chunk->values[back_pos];
- 当++end_pos != N 时,说明当前的chunk还有空余位置可以插入,则不需要扩容
- 当++end_pos == N时,说明当前的chunk已经插入满了,下一次插入就要插入到新的chunk了,所以需要发生扩容
// Adds an element to the back end of the queue.
inline void push() {
back_chunk = end_chunk;
back_pos = end_pos; //
if (++end_pos != N) //end_pos!=N表明这个chunk节点还没有满
chunk_t *sc = spare_chunk.xchg(NULL); // 为什么设置为NULL? 因为如果把之前值取出来了则没有spare chunk了,所以设置为NULL
if (sc) // 如果有spare chunk则继续复用它
end_chunk->next = sc;
sc->prev = end_chunk;
else // 没有则重新分配
// static int s_cout = 0;
// printf("s_cout:%d\n", ++s_cout);
end_chunk->next = (chunk_t *) malloc(sizeof(chunk_t)); // 分配一个chunk
end_chunk->next->prev = end_chunk;
end_chunk = end_chunk->next;
end_pos = 0;
// Removes element from the back end of the queue. In other words
// it rollbacks last push to the queue. Take care: Caller is
// responsible for destroying the object being unpushed.
// The caller must also guarantee that the queue isn't empty when
// unpush is called. It cannot be done automatically as the read
// side of the queue can be managed by different, completely
// unsynchronised thread.
// 必须要保证队列不为空,参考ypipe_t的uwrite
inline void unpush() {
// First, move 'back' one position backwards.
if (back_pos) // 从尾部删除元素
else {
back_pos = N - 1; // 回退到前一个chunk
back_chunk = back_chunk->prev;
// Now, move 'end' position backwards. Note that obsolete end chunk
// is not used as a spare chunk. The analysis shows that doing so
// would require free and atomic operation per chunk deallocated
// instead of a simple free.
if (end_pos) // 意味着当前的chunk还有其他元素占有
else {
end_pos = N - 1; // 当前chunk没有元素占用,则需要将整个chunk释放
end_chunk = end_chunk->prev;
end_chunk->next = NULL;
- ++begin_pos != N,说明当前chunk还有元素没被取出,该chunk还要继续被使⽤;
- ++end_pos == N,说明该chunk的所有元素已经被取出,所以该chunk要被回收。把最后回收的chunk保存到spare_chunk,然后释放之前spare_chunk保存的chunk。
- pop掉的元素,其销毁⼯作交给调⽤者完成,即是pop前调⽤者需要通过front()接⼝读取并进⾏销毁
- 空闲块的保存,要求是原⼦操作。因为闲块是读写线程的共享变量,因为在push中也使⽤了spare_chunk。
// Removes an element from the front end of the queue.
inline void pop() {
if (++begin_pos == N) // 删除满一个chunk才回收chunk
chunk_t *o = begin_chunk;
begin_chunk = begin_chunk->next;
begin_chunk->prev = NULL;
begin_pos = 0;
// 'o' has been more recently used than spare_chunk,
// so for cache reasons we'll get rid of the spare and
// use 'o' as the spare.
chunk_t *cs = spare_chunk.xchg(o); //由于局部性原理,总是保存最新的空闲块而释放先前的空闲快
yqueue 负责元素内存的分配与释放,入队以及出队列;ypipe 负责 yqueue 读写指针的变化。ypipe_t在yqueue_t的基础上构建⼀个单写单读的⽆锁队列
template<typename T, int N>
class ypipe_t {
// Initialises the pipe.
inline ypipe_t();
// The destructor doesn't have to be virtual. It is mad virtual
// just to keep ICC and code checking tools from complaining.
inline virtual ~ypipe_t();
// Write an item to the pipe. Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are never flushed down the stream.
// 写⼊数据,incomplete参数表示写⼊是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。
inline void write(const T &value_, bool incomplete_);
// Pop an incomplete item from the pipe. Returns true is such
// item exists, false otherwise.
inline bool unwrite(T *value_);
// Flush all the completed items into the pipe. Returns false if
// the reader thread is sleeping. In that case, caller is obliged to
// wake the reader up before using the pipe again.
// 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调⽤者需要唤醒读线程。
inline bool flush();
// Check whether item is available for reading.
// 这⾥⾯有两个点,⼀个是检查是否有数据可读,⼀个是预取
inline bool check_read();
// Reads an item from the pipe. Returns false if there is no value.
// available.
inline bool read(T *value_);
// Applies the function fn to the first elemenent in the pipe
// and returns the value returned by the fn.
// The pipe mustn't be empty or the function crashes.
inline bool probe(bool (*fn)(T &));
// Allocation-efficient queue to store pipe items.
// Front of the queue points to the first prefetched item, back of
// the pipe points to last un-flushed item. Front is used only by
// reader thread, while back is used only by writer thread.
yqueue_t<T, N> queue;
// Points to the first un-flushed item. This variable is used
// exclusively by writer thread.
T *w;//指向第⼀个未刷新的元素,只被写线程使⽤
// Points to the first un-prefetched item. This variable is used
// exclusively by reader thread.
T *r;//指向第⼀个还没预提取的元素,只被读线程使⽤
// Points to the first item to be flushed in the future.
T *f;//指向下⼀轮要被刷新的⼀批元素中的第⼀个
// The single point of contention between writer and reader thread.
// Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using
// atomic operations.
atomic_ptr_t<T> c;//读写线程共享的指针,指向每⼀轮刷新的起点(看代码的时候会详细说)。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)
// Disable copying of ypipe object.
ypipe_t(const ypipe_t &);
const ypipe_t &operator=(const ypipe_t &);
// Write an item to the pipe. Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are never flushed down the stream.
// 写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。
inline void write(const T &value_, bool incomplete_) {
// Place the value to the queue, add new terminator element.
queue.back() = value_;
// Move the "flush up to here" poiter.
if (!incomplete_) {
f = &queue.back(); // 记录要刷新的位置
//1. 单次写
//2. 批量写
// Flush all the completed items into the pipe. Returns false if
// the reader thread is sleeping. In that case, caller is obliged to
// wake the reader up before using the pipe again.
// 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。
// 批量刷新的机制, 写入批量后唤醒读线程;
// 反悔机制 unwrite
inline bool flush() {
// If there are no un-flushed items, do nothing.
if (w == f) // 不需要刷新,即是还没有新元素加入
return true;
// Try to set 'c' to 'f'.
// read时如果没有数据可以读取则c的值会被置为NULL
if (c.cas(w, f) != w) // 尝试将c设置为f,即是准备更新w的位置
// Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know
// that reader is sleeping.
c.set(f); // 更新为新的f位置
w = f;
return false; //线程看到flush返回false之后会发送一个消息给读线程,这需要写业务去做处理
else // 读端还有数据可读取
// Reader is alive. Nothing special to do now. Just move
// the 'first un-flushed item' pointer to 'f'.
w = f; // 更新f的位置
return true;
// Check whether item is available for reading.
// 这里面有两个点,一个是检查是否有数据可读,一个是预取
inline bool check_read() {
// Was the value prefetched already? If so, return.
if (&queue.front() != r && r) //判断是否在前几次调用read函数时已经预取数据了return true;
return true;
// There's no prefetched value, so let us prefetch more values.
// Prefetching is to simply retrieve the
// pointer from c in atomic fashion. If there are no
// items to prefetch, set c to NULL (using compare-and-swap).
// 两种情况
// 1. 如果c值和queue.front(), 返回c值并将c值置为NULL,此时没有数据可读
// 2. 如果c值和queue.front(), 返回c值,此时可能有数据度的去
r = c.cas(&queue.front(), NULL); //尝试预取数据
// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
// it can happen during pipe shutdown when items are being deallocated.
if (&queue.front() == r || !r) //判断是否成功预取数据
return false;
// There was at least one value prefetched.
return true;
// Reads an item from the pipe. Returns false if there is no value.
// available.
inline bool read(T *value_) {
// Try to prefetch a value.
if (!check_read())
return false;
// There was at least one value prefetched.
// Return it to the caller.
*value_ = queue.front();
return true;
if (yqueue.read(&value)) {
else {
// usleep(100);
std::unique_lock<std::mutex> lock(ypipe_mutex_);
// sched_yield();
yqueue.write(count, false);
if (!yqueue.flush()) {
// printf("notify_one\n");
std::unique_lock<std::mutex> lock(ypipe_mutex_);
- T *f:指向下一轮要被刷新的一批元素的第一个。
- T *w:指向第一个未刷新的元素,只被写线程使用;
- T *r:指向第一个没有被预提取的元素,只被读线程使用;
- atomic_ptr_t c:读写线程共享的指针,指向每⼀轮刷新的起点。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)
- write():写⼊数据,incomplete参数表示写⼊是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。完成后会将f = &queue.back();
- unwrite():在数据没有flush之前可以运⾏反悔 Pop an incomplete item from the pipe. Returns true is such item exists, false otherwise.
- bool flush():将write的元素真正刷新到队列,使读端可以访问对应的数据。返回false意味着读线程在休眠,在这种情况下调⽤者需要唤醒读线程。如果读端阻塞,则c=f;w=f;否则w=f;
- bool check_read():检测是否有数据可读,如果c==queue.front则c=NULL,否则r=c
- bool read (T *value_):读数据,将读出的数据写⼊value指针中,返回false意味着没有数据可读
在构造函数里面,下一轮要被刷新的元素的第一个(f),必然是第一个位置;第一个未刷新的元素(w),也是第一个位置;第一个没有被预读取的元素( r ),也是第一个位置;每一轮刷新的起点,也是第一个位置( c );
inline ypipe_t() {
// Insert terminator element into the queue.
queue.push(); //yqueue_t的尾指针加1,开始back_chunk为空,现在back_chunk指向第一个chunk_t块的第一个位置
// Let all the pointers to point to the terminator.
// (unless pipe is dead, in which case c is set to NULL).
r = w = f = &queue.back(); //就是让r、w、f、c四个指针都指向这个end迭代器
写入函数write(const T &value_, bool incomplete_)
第二个参数决定是否要刷新一批元素,false时,刷新一批元素,那么下一轮要被刷新的元素的第一个( f ) 就要改变了。
// Write an item to the pipe. Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are never flushed down the stream.
// 写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。
inline void write(const T &value_, bool incomplete_) {
// Place the value to the queue, add new terminator element.
queue.back() = value_;
// Move the "flush up to here" poiter.
if (!incomplete_) {
f = &queue.back(); // 记录要刷新的位置
刷新元素使元素对读线程可见 bool flush()
还记得c吗?指向每一轮刷新的起点。如果c和w一样,则尝试将c置为f。刷新元素,指向第一个未刷新的元素( w ),那么必然w=f了。此时前面的元素都可以被读线程可见。
我们来看看什么情况下c != w。
// Try to set 'c' to 'f'.
// read时如果没有数据可以读取则c的值会被置为NULL
if (c.cas(w, f) != w) // 尝试将c设置为f,即是准备更新w的位置
// Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know
// that reader is sleeping.
c.set(f); // 更新为新的f位置
w = f;
return false; //线程看到flush返回false之后会发送一个消息给读线程,这需要写业务去做处理
else // 读端还有数据可读取
// Reader is alive. Nothing special to do now. Just move
// the 'first un-flushed item' pointer to 'f'.
w = f; // 更新f的位置
return true;
// Flush all the completed items into the pipe. Returns false if
// the reader thread is sleeping. In that case, caller is obliged to
// wake the reader up before using the pipe again.
// 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。
// 批量刷新的机制, 写入批量后唤醒读线程;
// 反悔机制 unwrite
inline bool flush() {
// If there are no un-flushed items, do nothing.
if (w == f) // 不需要刷新,即是还没有新元素加入
return true;
// Try to set 'c' to 'f'.
// read时如果没有数据可以读取则c的值会被置为NULL
if (c.cas(w, f) != w) // 尝试将c设置为f,即是准备更新w的位置
// Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know
// that reader is sleeping.
c.set(f); // 更新为新的f位置
w = f;
return false; //线程看到flush返回false之后会发送一个消息给读线程,这需要写业务去做处理
else // 读端还有数据可读取
// Reader is alive. Nothing special to do now. Just move
// the 'first un-flushed item' pointer to 'f'.
w = f; // 更新f的位置
return true;
如果指针r指向的是队头元素(r==&queue.front())或者r没有指向任何元素(NULL)则说明队列中并没有可读的数据,这个时候check_read尝试去预取数据。所谓的预取就是令 r=c (cas函数就是返回c本身的值,看上⾯关于cas的实现), ⽽c在write中被指向f(⻅上图),这时从queue.front()到f这个位置的数据都被预取出来了,然后每次调⽤read都能取出⼀段。
值得注意的是,当c==&queue.front()时,代表数据被取完了,这时把c指向NULL,接着读线程会睡眠,这也是给写线程 检查 读线程是否睡眠的标志(c指向NULL)。
- r = c.cas(&queue.front(), NULL);执行之前,如果写端没有flush,那么c置为NULL,说明没有数据可读,返回false。
- r = c.cas(&queue.front(), NULL);执行之前,如果写端调用flush,那么c就不等于front(),则r返回了新的f值,最终返回true。
// Check whether item is available for reading.
// 这里面有两个点,一个是检查是否有数据可读,一个是预取
inline bool check_read() {
// Was the value prefetched already? If so, return.
if (&queue.front() != r && r) //判断是否在前几次调用read函数时已经预取数据了return true;
return true;
// There's no prefetched value, so let us prefetch more values.
// Prefetching is to simply retrieve the
// pointer from c in atomic fashion. If there are no
// items to prefetch, set c to NULL (using compare-and-swap).
// 两种情况
// 1. 如果c值和queue.front(), 返回c值并将c值置为NULL,此时没有数据可读
// 2. 如果c值和queue.front(), 返回c值,此时可能有数据度的去
r = c.cas(&queue.front(), NULL); //尝试预取数据
// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
// it can happen during pipe shutdown when items are being deallocated.
if (&queue.front() == r || !r) //判断是否成功预取数据
return false;
// There was at least one value prefetched.
return true;
// Reads an item from the pipe. Returns false if there is no value.
// available.
inline bool read(T *value_) {
// Try to prefetch a value.
if (!check_read())
return false;
// There was at least one value prefetched.
// Return it to the caller.
*value_ = queue.front();
return true;
- NULL:读线程设置,此时意味着已经没有数据可读,读线程在休眠。
- ⾮零:写线程设置,这⾥⼜区分两种情况:
- 旧值为_w的情况下,cas(_w,_f)操作修改为_f,意味着如果原先的值为_w,则原⼦性的修改为_f,表示有更多已被刷新的数据可读。
- 在旧值为NULL的情况下,此时读线程休眠,因此可以安全的设置为当前_f指针的位置。
- 写端yquque.write(count,false);将f = &queue.back();
- 写端yquque.flush();如果c==w,则c=f;w=f;否则w=f;
- 读端check_read();如果c==queue.front则c=NULL否则r更新为f。
- 一次写就提交,read失败就usleep
- 10次写才提交,read失败就yield
- flush失败就notify,read失败就wait
下面来看一看互斥锁队列 vs 互斥锁+条件变量队列 vs 内存屏障链表 vs RingBuffer CAS 实现。可以看到在一个写线程一个读线程的情况下,我们的ZMQ无锁队列是最快的。