
Commit 068c67d

Add a lock-free MPMC ring buffer
1 parent de364e8 commit 068c67d

File tree

8 files changed: +461 -1 lines changed

.gitignore (+1)

@@ -21,6 +21,7 @@ tpool/tpool
 list-move/bench-lock
 list-move/bench-lockfree
 hp_list/list
+lfring/lfring

 # external source files
 preempt_sched/list.h

README.md (+2 -1)

@@ -14,10 +14,11 @@ purpose of these programs is to be illustrative and educational.
 * [Producer–consumer problem](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem)
 - [spmc](spmc/): A concurrent single-producer/multiple-consumer queue.
 - [mpsc](mpsc/): An unbounded lockless single-consumer/multiple-producer FIFO queue.
-- [mpmc](mpmc/): A multiple-producer/multiple-consumer (MPMC) queue.
+- [mpmc](mpmc/): A multiple-producer/multiple-consumer (MPMC) queue using Linux futex.
 - [channel](channel/): A Linux futex based Go channel implementation.
 * [Lock-Free](https://en.wikipedia.org/wiki/Non-blocking_algorithm) Data Structure
 - [ringbuffer](ringbuffer/): A lock-less ring buffer.
+- [lfring](lfring/): A lock-free multiple-producer/multiple-consumer (MPMC) ring buffer.
 - [ringbuf\_shm](ringbuf-shm/): An optimized lock-free ring buffer with shared memory.
 - [mbus](mbus/): A concurrent message bus.
 * [Synchronization](https://en.wikipedia.org/wiki/Synchronization_(computer_science))

lfring/Makefile (+33)

@@ -0,0 +1,33 @@
CC = gcc
CFLAGS = -O2 -g -Wall -I.
CFLAGS += -fsanitize=thread
LDFLAGS = -fsanitize=thread

all: lfring

# Control the build verbosity
ifeq ("$(VERBOSE)","1")
Q :=
VECHO = @true
else
Q := @
VECHO = @printf
endif

OBJS := lfring.o tests.o
deps := $(OBJS:%.o=.%.o.d)

lfring: $(OBJS)
	$(VECHO) " LD\t$@\n"
	$(Q)$(CC) $(LDFLAGS) -o $@ $^

%.o: %.c
	@mkdir -p .$(DUT_DIR)
	$(VECHO) " CC\t$@\n"
	$(Q)$(CC) -o $@ $(CFLAGS) -c -MMD -MF .$@.d $<

clean:
	rm -f $(OBJS) $(deps) lfring
	rm -rf *.dSYM

-include $(deps)
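
Running make in lfring/ links the lfring binary from lfring.o and tests.o with ThreadSanitizer enabled (-fsanitize=thread appears in both CFLAGS and LDFLAGS), so data races are reported at run time. make VERBOSE=1 echoes the full compiler commands instead of the short CC/LD lines, and make clean removes the objects, the generated dependency files, and the binary.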

lfring/arch.h (+48)

@@ -0,0 +1,48 @@
#pragma once

// Parameters for smp_fence()
#define LoadStore 0x12
#define StoreLoad 0x21

#if defined(__x86_64__)
static inline void smp_fence(unsigned int mask)
{
    if ((mask & StoreLoad) == StoreLoad) {
        __asm__ volatile("mfence" ::: "memory");
    } else if (mask != 0) {
        /* Any fence but StoreLoad */
        __asm__ volatile("" ::: "memory");
    }
}
#else
#error "Unsupported architecture"
#endif

#include "common.h"

#if defined(__x86_64__)
union u128 {
    struct {
        uint64_t lo, hi;
    } s;
    __int128 ui;
};

static inline bool lf_compare_exchange(register __int128 *var,
                                       __int128 *exp,
                                       __int128 neu)
{
    union u128 cmp = {.ui = *exp}, with = {.ui = neu};
    bool ret;
    __asm__ __volatile__("lock cmpxchg16b %1\n\tsetz %0"
                         : "=q"(ret), "+m"(*var), "+d"(cmp.s.hi), "+a"(cmp.s.lo)
                         : "c"(with.s.hi), "b"(with.s.lo)
                         : "cc", "memory");
    if (UNLIKELY(!ret))
        *exp = cmp.ui;
    return ret;
}

#else
#error "Unsupported architecture"
#endif
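
The lock cmpxchg16b sequence above provides a 16-byte compare-and-exchange, which is what lets lfring.c publish a slot's {ptr, idx} pair in a single atomic step. The following hypothetical snippet (not part of this commit) sketches the call pattern, assuming a 16-byte-aligned target as cmpxchg16b requires. A portable alternative would be __atomic_compare_exchange_n on a __int128, but that typically needs -mcx16 and/or libatomic; the inline assembly avoids that dependency.

/* demo_cas.c — hypothetical example, not part of this commit */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#include "arch.h" /* lf_compare_exchange() */

struct pair {
    void *ptr;
    uintptr_t idx;
};

int main(void)
{
    /* cmpxchg16b requires a 16-byte-aligned operand */
    _Alignas(16) struct pair slot = {.ptr = NULL, .idx = 0};

    union {
        struct pair p;
        __int128 pp;
    } old = {.p = slot}, neu = {.p = {.ptr = &slot, .idx = 1}};

    /* Succeeds because 'slot' still holds the expected value {NULL, 0};
     * on failure, old.pp would be refreshed with the current contents. */
    bool ok = lf_compare_exchange((__int128 *) &slot, &old.pp, neu.pp);
    printf("CAS %s: idx=%ju\n", ok ? "succeeded" : "failed",
           (uintmax_t) slot.idx);
    return 0;
}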

lfring/common.h (+48)

@@ -0,0 +1,48 @@
#pragma once

#define CACHE_LINE 64 /* FIXME: should be configurable */

#define INIT_FUNCTION __attribute__((constructor))
#define LIKELY(x) __builtin_expect(!!(x), 1)
#define UNLIKELY(x) __builtin_expect(!!(x), 0)

#define ALIGNED(x) __attribute__((__aligned__(x)))
#if __STDC_VERSION__ >= 201112L
#define THREAD_LOCAL _Thread_local /* C11 */
#else
#define THREAD_LOCAL __thread /* GNU extension */
#endif

#define ROUNDUP_POW2(x)                                            \
    ({                                                             \
        unsigned long _x = (x);                                    \
        _x > 1 ? (1UL << (__SIZEOF_LONG__ * __CHAR_BIT__ -         \
                          __builtin_clzl(_x - 1UL)))               \
               : 1;                                                \
    })

#define ROUNDUP(a, b)                            \
    ({                                           \
        __typeof__(a) tmp_a = (a);               \
        __typeof__(b) tmp_b = (b);               \
        ((tmp_a + tmp_b - 1) / tmp_b) * tmp_b;   \
    })

#if __SIZEOF_POINTER__ == 4
typedef unsigned long long ptrpair_t; /* assume 64 bits */
#else /* __SIZEOF_POINTER__ == 8 */
typedef __int128 ptrpair_t;
#endif

#include <stdlib.h>

static inline void *osal_alloc(size_t size, size_t alignment)
{
    return alignment > 1 ? aligned_alloc(alignment, ROUNDUP(size, alignment))
                         : malloc(size);
}

static inline void osal_free(void *ptr)
{
    free(ptr);
}
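
As a quick illustration of what these helpers compute, here is a hypothetical stand-alone check (not part of the commit): ROUNDUP_POW2() rounds up to the next power of two, ROUNDUP() rounds up to the next multiple, and osal_alloc() rounds the requested size so it meets aligned_alloc()'s requirement that the size be a multiple of the alignment.

/* check_common.c — hypothetical example, not part of this commit */
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#include "common.h"

int main(void)
{
    assert(ROUNDUP_POW2(1) == 1);
    assert(ROUNDUP_POW2(5) == 8);    /* next power of two */
    assert(ROUNDUP_POW2(64) == 64);  /* already a power of two */
    assert(ROUNDUP(100, 64) == 128); /* next multiple of 64 */

    /* osal_alloc() passes a rounded-up size to aligned_alloc(), since
     * aligned_alloc() requires the size to be a multiple of the alignment. */
    void *p = osal_alloc(100, CACHE_LINE);
    assert(p && ((uintptr_t) p & (CACHE_LINE - 1)) == 0);
    osal_free(p);

    printf("common.h helpers OK\n");
    return 0;
}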

lfring/lfring.c (+221)

@@ -0,0 +1,221 @@
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdlib.h>

#include "arch.h"
#include "common.h"
#include "lfring.h"

#define SUPPORTED_FLAGS \
    (LFRING_FLAG_SP | LFRING_FLAG_MP | LFRING_FLAG_SC | LFRING_FLAG_MC)

#define MIN(a, b)                      \
    ({                                 \
        __typeof__(a) tmp_a = (a);     \
        __typeof__(b) tmp_b = (b);     \
        tmp_a < tmp_b ? tmp_a : tmp_b; \
    })

typedef uintptr_t ringidx_t;
struct element {
    void *ptr;
    uintptr_t idx;
};

struct lfring {
    ringidx_t head;
    ringidx_t tail ALIGNED(CACHE_LINE);
    uint32_t mask;
    uint32_t flags;
    struct element ring[] ALIGNED(CACHE_LINE);
} ALIGNED(CACHE_LINE);

lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
    unsigned long ringsz = ROUNDUP_POW2(n_elems);
    if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) {
        assert(0 && "invalid number of elements");
        return NULL;
    }
    if ((flags & ~SUPPORTED_FLAGS) != 0) {
        assert(0 && "invalid flags");
        return NULL;
    }

    size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
    lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
    if (!lfr)
        return NULL;

    lfr->head = 0, lfr->tail = 0;
    lfr->mask = ringsz - 1;
    lfr->flags = flags;
    for (ringidx_t i = 0; i < ringsz; i++) {
        lfr->ring[i].ptr = NULL;
        lfr->ring[i].idx = i - ringsz;
    }
    return lfr;
}

void lfring_free(lfring_t *lfr)
{
    if (!lfr)
        return;

    if (lfr->head != lfr->tail) {
        assert(0 && "ring buffer not empty");
        return;
    }
    osal_free(lfr);
}

/* True if 'a' is before 'b' ('a' < 'b') in serial number arithmetic */
static inline bool before(ringidx_t a, ringidx_t b)
{
    return (intptr_t)(a - b) < 0;
}

static inline ringidx_t cond_update(ringidx_t *loc, ringidx_t neu)
{
    ringidx_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
    do {
        if (before(neu, old)) /* neu < old */
            return old;
        /* if neu > old, need to update *loc */
    } while (!__atomic_compare_exchange_n(loc, &old, /* Updated on failure */
                                          neu,
                                          /* weak */ true, __ATOMIC_RELEASE,
                                          __ATOMIC_RELAXED));
    return neu;
}

static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
    ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
    if (before(idx, fresh)) { /* fresh is after idx, use this instead */
        idx = fresh;
    } else { /* Continue with next slot */
        idx++;
    }
    return idx;
}

/* Enqueue elements at tail */
uint32_t lfring_enqueue(lfring_t *lfr,
                        void *const *restrict elems,
                        uint32_t n_elems)
{
    intptr_t actual = 0;
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1;
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED);

    if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
        ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
        actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems);
        if (actual <= 0)
            return 0;

        for (uint32_t i = 0; i < (uint32_t) actual; i++) {
            assert(lfr->ring[tail & mask].idx == tail - size);
            lfr->ring[tail & mask].ptr = *elems++;
            lfr->ring[tail & mask].idx = tail;
            tail++;
        }
        __atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
        return (uint32_t) actual;
    }

    /* else: lock-free multi-producer */
restart:
    while ((uint32_t) actual < n_elems &&
           before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
        union {
            struct element e;
            ptrpair_t pp;
        } old, neu;
        void *elem = elems[actual];
        struct element *slot = &lfr->ring[tail & mask];
        old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
        old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
        do {
            if (UNLIKELY(old.e.idx != tail - size)) {
                if (old.e.idx != tail) {
                    /* We are far behind. Restart with fresh index */
                    tail = cond_reload(tail, &lfr->tail);
                    goto restart;
                }
                /* slot already enqueued */
                tail++; /* Try next slot */
                goto restart;
            }

            /* Found slot that was used one lap back.
             * Try to enqueue next element.
             */
            neu.e.ptr = elem;
            neu.e.idx = tail; /* Set idx on enqueue */
        } while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));

        /* Enqueue succeeded */
        actual++;
        tail++; /* Continue with next slot */
    }
    (void) cond_update(&lfr->tail, tail);
    return (uint32_t) actual;
}

static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
    if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
        return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);

    /* Multi-producer enqueue.
     * Scan ring for new elements that have been written but not released.
     */
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1;
    while (before(tail, head + size) &&
           __atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_RELAXED) ==
               tail)
        tail++;
    tail = cond_update(&lfr->tail, tail);
    return tail;
}

/* Dequeue elements from head */
uint32_t lfring_dequeue(lfring_t *lfr,
                        void **restrict elems,
                        uint32_t n_elems,
                        uint32_t *index)
{
    ringidx_t mask = lfr->mask;
    intptr_t actual;
    ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
    do {
        actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
        if (UNLIKELY(actual <= 0)) {
            /* Ring buffer is empty, scan for new but unreleased elements */
            tail = find_tail(lfr, head, tail);
            actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
            if (actual <= 0)
                return 0;
        }
        for (uint32_t i = 0; i < (uint32_t) actual; i++)
            elems[i] = lfr->ring[(head + i) & mask].ptr;
        smp_fence(LoadStore); /* Order the element loads before the head update */
        if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
            __atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
            break;
        }

        /* else: lock-free multi-consumer */
    } while (!__atomic_compare_exchange_n(
        &lfr->head, &head, /* Updated on failure */
        head + actual,
        /* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
    *index = (uint32_t) head;
    return (uint32_t) actual;
}
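
To illustrate how the API fits together, here is a hypothetical single-threaded smoke test (the commit's real harness is tests.c, which the Makefile links against lfring.o). The signatures follow the definitions in lfring.c above, and the LFRING_FLAG_* names come from SUPPORTED_FLAGS; their declarations are assumed to live in lfring.h.

/* smoke.c — hypothetical example, not part of this commit */
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#include "lfring.h"

int main(void)
{
    /* Room for at least 4 elements in MPMC mode; lfring_alloc() rounds
     * the capacity up to a power of two internally. */
    lfring_t *rb = lfring_alloc(4, LFRING_FLAG_MP | LFRING_FLAG_MC);
    assert(rb);

    int v[3] = {1, 2, 3};
    void *in[3] = {&v[0], &v[1], &v[2]};
    uint32_t n = lfring_enqueue(rb, in, 3);
    assert(n == 3);

    void *out[3];
    uint32_t index;
    n = lfring_dequeue(rb, out, 3, &index);
    assert(n == 3 && index == 0); /* 'index' reports the consumed head position */
    for (uint32_t i = 0; i < n; i++)
        printf("dequeued %d\n", *(int *) out[i]);

    lfring_free(rb); /* the ring must be empty when freed */
    return 0;
}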
