Skip to content

Commit 62c804a

Browse files
committed
Add a bounded lock-free queue
1 parent 5737c46 commit 62c804a

File tree

6 files changed

+777
-0
lines changed

6 files changed

+777
-0
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ purpose of these programs is to be illustrative and educational.
1616
- [spmc](spmc/): A concurrent single-producer/multiple-consumer queue.
1717
- [mpsc](mpsc/): An unbounded lockless single-consumer/multiple-producer FIFO queue.
1818
- [mpmc](mpmc/): A multiple-producer/multiple-consumer (MPMC) queue using Linux futex.
19+
- [lf-queue](lf-queue/): A bounded lock-free queue.
1920
- [channel](channel/): A Linux futex based Go channel implementation.
2021
* [Lock-Free](https://en.wikipedia.org/wiki/Non-blocking_algorithm) Data Structure
2122
- [ringbuffer](ringbuffer/): A lock-less ring buffer.

lf-queue/.travis.yml

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
language: c
2+
3+
matrix:
4+
include:
5+
- os: linux
6+
dist: xenial
7+
compiler: gcc
8+
addons:
9+
apt:
10+
packages:
11+
- ninja-build
12+
- os: linux
13+
dist: xenial
14+
compiler: clang
15+
addons:
16+
apt:
17+
packages:
18+
- ninja-build
19+
- os: osx
20+
compiler: clang
21+
addons:
22+
homebrew:
23+
packages:
24+
- ninja
25+
26+
script:
27+
- mkdir build
28+
- cd build
29+
- cmake .. -G Ninja -DCMAKE_BUILD_TYPE=RelWithDebInfo
30+
- cmake --build .
31+
- ctest -V --timeout 60

lf-queue/Makefile

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
CFLAGS = -Wall -O2 -g -I.
2+
LDFLAGS = -lpthread
3+
4+
# Enable ThreadSanitizer
5+
# CFLAGS += -fsanitize=thread
6+
# LDFLAGS += -fsanitize=thread
7+
8+
all: test
9+
10+
test: test.c
11+
$(CC) $(CFLAGS) -o $@ $< $(LDFLAGS)
12+
13+
clean:
14+
rm -f test

lf-queue/README.md

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Multithreaded Bounded Lock-free Queue
2+
3+
Features:
4+
* single header style for C11
5+
* strongly typed
6+
* multithreaded
7+
* bounded
8+
* lock-free

lf-queue/queues.h

+290
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
#include <stddef.h>
2+
#include <stdint.h>
3+
#include <string.h>
4+
5+
#if !defined(QUEUE_TYPE) || !defined(QUEUE_MP) || !defined(QUEUE_MC)
6+
#error Please define QUEUE_TYPE, QUEUE_MP and QUEUE_MC
7+
#endif
8+
9+
#if !defined(QUEUE_COMMON_DEFINED)
10+
11+
#define QUEUE_COMMON_DEFINED
12+
13+
#if (__STDC_VERSION__ < 201112L)
14+
#error C11 is required for this file
15+
#endif
16+
17+
#include <stdatomic.h>
18+
19+
#if defined(__STDC_NO_ATOMICS__)
20+
#error Your C compiler does not support C11 atomics
21+
#endif
22+
23+
#define QUEUE_ATOMIC_SIZE_T atomic_size_t
24+
#define QUEUE_ORDER_RELAXED memory_order_relaxed
25+
#define QUEUE_ORDER_RELEASE memory_order_release
26+
#define QUEUE_ORDER_ACQUIRE memory_order_acquire
27+
#define QUEUE_ATOMIC_STORE atomic_store_explicit
28+
#define QUEUE_ATOMIC_LOAD atomic_load_explicit
29+
30+
#define QUEUE_MERGE_BASE(a, b) a##b
31+
#define QUEUE_MERGE(a, b) QUEUE_MERGE_BASE(a, b)
32+
33+
#if !defined(QUEUE_CACHELINE_BYTES)
34+
#define QUEUE_CACHELINE_BYTES 64
35+
#endif
36+
37+
#if !defined(QUEUE_TOO_BIG)
38+
#define QUEUE_TOO_BIG (1024ULL * 1024ULL * 256ULL)
39+
#endif
40+
41+
typedef enum {
42+
QueueResult_Ok,
43+
QueueResult_Full,
44+
QueueResult_Empty,
45+
QueueResult_Contention,
46+
47+
QueueResult_Error = 128,
48+
QueueResult_Error_Too_Small,
49+
QueueResult_Error_Too_Big,
50+
QueueResult_Error_Not_Pow2,
51+
QueueResult_Error_Not_Aligned_16_Bytes,
52+
QueueResult_Error_Null_Bytes,
53+
QueueResult_Error_Bytes_Smaller_Than_Needed
54+
} QueueResult_t;
55+
#endif
56+
57+
#if (QUEUE_MP)
58+
#define QUEUE_P_NAME_FN mp
59+
#define QUEUE_P_NAME_TYPE Mp
60+
#define QUEUE_P_TYPE QUEUE_ATOMIC_SIZE_T
61+
#define QUEUE_P_SETUP(a, b, c) QUEUE_ATOMIC_STORE(&a, b, c)
62+
#define QUEUE_P_LOAD(a, b) QUEUE_ATOMIC_LOAD(&a, b)
63+
64+
#define QUEUE_P_IF_CAS(a, b, c, d, e) \
65+
if (atomic_compare_exchange_weak_explicit(&a, &b, c, d, e))
66+
#else
67+
#define QUEUE_P_NAME_FN sp
68+
#define QUEUE_P_NAME_TYPE Sp
69+
#define QUEUE_P_TYPE size_t
70+
#define QUEUE_P_SETUP(a, b, c)
71+
#define QUEUE_P_LOAD(a, b) a
72+
#define QUEUE_P_IF_CAS(a, b, c, d, e) a = c;
73+
#endif
74+
75+
#if (QUEUE_MC)
76+
#define QUEUE_C_NAME mc
77+
#define QUEUE_C_TYPE QUEUE_ATOMIC_SIZE_T
78+
#define QUEUE_C_SETUP(a, b, c) QUEUE_ATOMIC_STORE(&a, b, c)
79+
#define QUEUE_C_LOAD(a, b) QUEUE_ATOMIC_LOAD(&a, b)
80+
81+
#define QUEUE_C_IF_CAS(a, b, c, d, e) \
82+
if (atomic_compare_exchange_weak_explicit(&a, &b, c, d, e))
83+
#else
84+
#define QUEUE_C_NAME sc
85+
#define QUEUE_C_TYPE size_t
86+
#define QUEUE_C_SETUP(a, b, c)
87+
#define QUEUE_C_LOAD(a, b) a
88+
#define QUEUE_C_IF_CAS(a, b, c, d, e) a = c;
89+
#endif
90+
91+
#define QUEUE_FN_A QUEUE_MERGE(QUEUE_P_NAME_FN, QUEUE_C_NAME)
92+
#define QUEUE_FN_B QUEUE_MERGE(QUEUE_FN_A, _)
93+
#define QUEUE_FN(name) QUEUE_MERGE(QUEUE_MERGE(QUEUE_FN_B, name##_), QUEUE_TYPE)
94+
95+
#define QUEUE_STRUCT_A QUEUE_MERGE(QUEUE_P_NAME_TYPE, QUEUE_C_NAME)
96+
#define QUEUE_STRUCT_B QUEUE_MERGE(QUEUE_STRUCT_A, _)
97+
#define QUEUE_STRUCT_C QUEUE_MERGE(QUEUE_STRUCT_B, QUEUE_TYPE)
98+
#define QUEUE_STRUCT QUEUE_MERGE(Queue_, QUEUE_STRUCT_C)
99+
#define QUEUE_CELL QUEUE_MERGE(Cell_, QUEUE_STRUCT_C)
100+
101+
typedef struct QUEUE_STRUCT QUEUE_STRUCT;
102+
103+
QueueResult_t QUEUE_FN(make_queue)(size_t cell_count,
104+
QUEUE_STRUCT *queue,
105+
size_t *bytes);
106+
107+
QueueResult_t QUEUE_FN(try_enqueue)(QUEUE_STRUCT *queue,
108+
QUEUE_TYPE const *data);
109+
QueueResult_t QUEUE_FN(try_dequeue)(QUEUE_STRUCT *queue, QUEUE_TYPE *data);
110+
QueueResult_t QUEUE_FN(enqueue)(QUEUE_STRUCT *queue, QUEUE_TYPE const *data);
111+
QueueResult_t QUEUE_FN(dequeue)(QUEUE_STRUCT *queue, QUEUE_TYPE *data);
112+
113+
#if defined(QUEUE_IMPLEMENTATION)
114+
#undef QUEUE_IMPLEMENTATION
115+
116+
typedef struct QUEUE_CELL {
117+
QUEUE_ATOMIC_SIZE_T sequence;
118+
QUEUE_TYPE data;
119+
} QUEUE_CELL;
120+
121+
typedef struct QUEUE_STRUCT {
122+
uint8_t pad0[QUEUE_CACHELINE_BYTES];
123+
124+
QUEUE_P_TYPE enqueue_index;
125+
uint8_t pad2[QUEUE_CACHELINE_BYTES - sizeof(QUEUE_P_TYPE)];
126+
127+
QUEUE_C_TYPE dequeue_index;
128+
uint8_t pad3[QUEUE_CACHELINE_BYTES - sizeof(QUEUE_C_TYPE)];
129+
130+
size_t cell_mask;
131+
uint8_t pad4[QUEUE_CACHELINE_BYTES - sizeof(size_t)];
132+
133+
QUEUE_CELL cells[];
134+
} QUEUE_STRUCT;
135+
136+
QueueResult_t QUEUE_FN(make_queue)(size_t cell_count,
137+
QUEUE_STRUCT *queue,
138+
size_t *bytes)
139+
{
140+
if (!bytes)
141+
return QueueResult_Error_Null_Bytes;
142+
143+
if (cell_count < 2)
144+
return QueueResult_Error_Too_Small;
145+
146+
if (cell_count > QUEUE_TOO_BIG)
147+
return QueueResult_Error_Too_Big;
148+
149+
if (cell_count & (cell_count - 1))
150+
return QueueResult_Error_Not_Pow2;
151+
152+
size_t bytes_local =
153+
sizeof(QUEUE_STRUCT) + (sizeof(QUEUE_CELL) * cell_count);
154+
155+
if (!queue) {
156+
*bytes = bytes_local;
157+
return QueueResult_Ok;
158+
}
159+
160+
if (*bytes < bytes_local)
161+
return QueueResult_Error_Bytes_Smaller_Than_Needed;
162+
163+
{
164+
intptr_t queue_value = (intptr_t) queue;
165+
166+
if (queue_value & 0x0F) {
167+
return QueueResult_Error_Not_Aligned_16_Bytes;
168+
}
169+
}
170+
171+
memset(queue, 0, bytes_local);
172+
173+
queue->cell_mask = cell_count - 1;
174+
175+
for (size_t i = 0; i < cell_count; i++) {
176+
QUEUE_ATOMIC_STORE(&queue->cells[i].sequence, i, QUEUE_ORDER_RELAXED);
177+
}
178+
179+
QUEUE_P_SETUP(queue->enqueue_index, 0, QUEUE_ORDER_RELAXED);
180+
QUEUE_C_SETUP(queue->dequeue_index, 0, QUEUE_ORDER_RELAXED);
181+
182+
return QueueResult_Ok;
183+
}
184+
185+
QueueResult_t QUEUE_FN(try_enqueue)(QUEUE_STRUCT *queue, QUEUE_TYPE const *data)
186+
{
187+
size_t pos = QUEUE_P_LOAD(queue->enqueue_index, QUEUE_ORDER_RELAXED);
188+
189+
QUEUE_CELL *cell = &queue->cells[pos & queue->cell_mask];
190+
191+
size_t sequence = QUEUE_ATOMIC_LOAD(&cell->sequence, QUEUE_ORDER_ACQUIRE);
192+
193+
intptr_t difference = (intptr_t) sequence - (intptr_t) pos;
194+
195+
if (!difference) {
196+
QUEUE_P_IF_CAS(queue->enqueue_index, pos, pos + 1, QUEUE_ORDER_RELAXED,
197+
QUEUE_ORDER_RELAXED)
198+
{
199+
cell->data = *data;
200+
201+
QUEUE_ATOMIC_STORE(&cell->sequence, pos + 1, QUEUE_ORDER_RELEASE);
202+
203+
return QueueResult_Ok;
204+
}
205+
}
206+
207+
if (difference < 0)
208+
return QueueResult_Full;
209+
210+
return QueueResult_Contention;
211+
}
212+
213+
QueueResult_t QUEUE_FN(try_dequeue)(QUEUE_STRUCT *queue, QUEUE_TYPE *data)
214+
{
215+
size_t pos = QUEUE_C_LOAD(queue->dequeue_index, QUEUE_ORDER_RELAXED);
216+
217+
QUEUE_CELL *cell = &queue->cells[pos & queue->cell_mask];
218+
219+
size_t sequence = QUEUE_ATOMIC_LOAD(&cell->sequence, QUEUE_ORDER_ACQUIRE);
220+
221+
intptr_t difference = (intptr_t) sequence - (intptr_t)(pos + 1);
222+
223+
if (!difference) {
224+
QUEUE_C_IF_CAS(queue->dequeue_index, pos, pos + 1, QUEUE_ORDER_RELAXED,
225+
QUEUE_ORDER_RELAXED)
226+
{
227+
*data = cell->data;
228+
229+
QUEUE_ATOMIC_STORE(&cell->sequence, pos + queue->cell_mask + 1,
230+
QUEUE_ORDER_RELEASE);
231+
232+
return QueueResult_Ok;
233+
}
234+
}
235+
236+
if (difference < 0) {
237+
return QueueResult_Empty;
238+
}
239+
240+
return QueueResult_Contention;
241+
}
242+
243+
QueueResult_t QUEUE_FN(enqueue)(QUEUE_STRUCT *queue, QUEUE_TYPE const *data)
244+
{
245+
QueueResult_t result;
246+
247+
do {
248+
result = QUEUE_FN(try_enqueue)(queue, data);
249+
} while (result == QueueResult_Contention);
250+
251+
return result;
252+
}
253+
254+
QueueResult_t QUEUE_FN(dequeue)(QUEUE_STRUCT *queue, QUEUE_TYPE *data)
255+
{
256+
QueueResult_t result;
257+
258+
do {
259+
result = QUEUE_FN(try_dequeue)(queue, data);
260+
} while (result == QueueResult_Contention);
261+
262+
return result;
263+
}
264+
#endif
265+
266+
#undef QUEUE_TYPE
267+
#undef QUEUE_MP
268+
#undef QUEUE_MC
269+
270+
#undef QUEUE_P_NAME_FN
271+
#undef QUEUE_P_NAME_TYPE
272+
#undef QUEUE_P_NAME
273+
#undef QUEUE_P_TYPE
274+
#undef QUEUE_P_SETUP
275+
#undef QUEUE_P_LOAD
276+
#undef QUEUE_P_IF_CAS
277+
278+
#undef QUEUE_C_NAME
279+
#undef QUEUE_C_TYPE
280+
#undef QUEUE_C_SETUP
281+
#undef QUEUE_C_LOAD
282+
#undef QUEUE_C_IF_CAS
283+
284+
#undef QUEUE_FN_A
285+
#undef QUEUE_FN
286+
#undef QUEUE_STRUCT_A
287+
#undef QUEUE_STRUCT_B
288+
#undef QUEUE_STRUCT_C
289+
#undef QUEUE_STRUCT
290+
#undef QUEUE_CELL

0 commit comments

Comments
 (0)