Skip to content

Commit eb25a45

Browse files
committed
Add unbounded lockless single-consumer/multiple-producer FIFO queue
1 parent ef87b79 commit eb25a45

File tree

3 files changed

+359
-0
lines changed

3 files changed

+359
-0
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ purpose of these programs is to be illustrative and educational.
1313
- [tpool](tpool/): A lightweight thread pool.
1414
* [Producer–consumer problem](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem)
1515
- [spmc](spmc/): A concurrent single-producer/multiple-consumer queue.
16+
- [mpsc](mpsc/): An unbounded lockless single-consumer/multiple-producer FIFO queue.
1617
- [mpmc](mpmc/): A multiple-producer/multiple-consumer (MPMC) queue.
1718
- [channel](channel/): A Linux futex based Go channel implementation.
1819
* [Lock-Free](https://en.wikipedia.org/wiki/Non-blocking_algorithm) Data Structure

mpsc/Makefile

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# None of these targets produces a file named after itself; declaring them
# phony keeps a stray file called "all", "indent" or "clean" from disabling
# the rule.
.PHONY: all indent clean

# Build the mpsc demo/test binary.
all:
	$(CC) -Wall -std=c11 -o mpsc mpsc.c -lpthread

# Reformat the source in place.
indent:
	clang-format -i mpsc.c

# Remove the built binary.
clean:
	rm -f mpsc

mpsc/mpsc.c

+350
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
#include <stddef.h>
2+
#include <stdint.h>
3+
4+
/* Classical producer-consumer problem, solved with an unbounded, lockless,
5+
* single-consumer/multiple-producer FIFO queue.
6+
*/
7+
8+
/* Opaque queue handle; struct queue_internal is defined later in this file. */
typedef struct queue_internal *queue_p;
9+
10+
/* Status codes returned by the queue operations. */
typedef enum {
    QUEUE_OUT_OF_MEMORY = -1, /* push could not allocate a node */
    QUEUE_FALSE = 0,          /* hasFront: the queue is empty */
    QUEUE_SUCCESS = 1,        /* the operation completed */
    QUEUE_TRUE = 2,           /* hasFront: at least one item is queued */
} queue_result_t;
16+
17+
/**
18+
* \brief An unbounded lockless single consumer multiple producer FIFO Queue.
19+
*/
20+
struct __QUEUE_API__ {
21+
/** Create a new Queue object.
22+
* @param size the storage size in bytes.
23+
*/
24+
queue_p (*create)(size_t size);
25+
26+
/** Push an element to the back of the queue.
27+
* Pushing supports copying and moving. Pushing is considered a producer
28+
* operation. Any thread can safely execute this operation at any time.
29+
* @param data the region where the value stored will be copied from.
30+
* @return QUEUE_OUT_OF_MEMORY if the heap is exhausted.
31+
*/
32+
queue_result_t (*push)(queue_p, void *data);
33+
34+
/** Check if the queue has any data.
35+
* The method is considered a consumer operation, and only one thread may
36+
* safely execute this at one time.
37+
* @return QUEUE_TRUE if there is a front.
38+
* @return QUEUE_FALSE if there is not.
39+
*/
40+
queue_result_t (*hasFront)(queue_p);
41+
42+
/** Get the value at the front of the queue.
43+
* You should always check that there is data in queue before calling
44+
* front as there is no built in check. If no data is in the queue when
45+
* front is called, memory violation likely happens.
46+
* Getting data is considered a consumer operation, only one thread may
47+
* safely execute this at one time.
48+
* @param data the destination where value stored will be copied to
49+
*/
50+
queue_result_t (*front)(queue_p, void *data);
51+
52+
/** Remove the item at the front of the queue.
53+
* You should always check that there is data in queue before popping as
54+
* there is no built in check. If no data is in the queue when pop is
55+
* called, memory violation likely happens.
56+
* Popping is considered a consumer operation, and only one thread may
57+
* safely execute this at one time.
58+
*/
59+
queue_result_t (*pop)(queue_p);
60+
61+
/** Clear the entire queue.
62+
* You should always clear it before deleting the Queue itself, otherwise
63+
* you will leak memory.
64+
*/
65+
queue_result_t (*clear)(queue_p);
66+
67+
/** Destroy the queue object */
68+
queue_result_t (*destroy)(queue_p);
69+
} Queue;
70+
71+
#include <assert.h>
72+
#include <stdatomic.h>
73+
#include <stdlib.h>
74+
#include <string.h>
75+
76+
/* Bytes reserved in front of every queue object; holds the sentinel word. */
static const size_t alignment = sizeof(size_t);
/* Marker written in front of every queue object and checked on destroy. */
static const size_t sentinel = (size_t) 0xDEADC0DE;
78+
79+
/* Header of one queued item; the item's bytes follow it in the same
 * allocation.
 */
typedef struct node node;
struct node {
    /* Address of the next node stored as an integer; 0 while unlinked. */
    atomic_uintptr_t next;
};
82+
83+
/* Queue state; head and tail hold node addresses as integers, 0 when the
 * queue is empty.
 */
struct queue_internal {
    atomic_uintptr_t head; /* oldest node, read and advanced by the consumer */
    atomic_uintptr_t tail; /* newest node, swapped in by producers */
    size_t item_size;      /* bytes copied per item by push and front */
};
87+
88+
static queue_p queue_create(size_t item_size)
89+
{
90+
size_t *ptr = calloc(sizeof(struct queue_internal) + alignment, 1);
91+
assert(ptr);
92+
ptr[0] = sentinel;
93+
queue_p q = (queue_p)(ptr + 1);
94+
atomic_init(&q->head, 0);
95+
atomic_init(&q->tail, 0);
96+
q->item_size = item_size;
97+
return q;
98+
}
99+
100+
/**
 * Tell whether the queue has a front item.
 *
 * @param q queue handle; must not be NULL.
 * @return QUEUE_TRUE when head is non-zero, QUEUE_FALSE otherwise.
 */
static queue_result_t queue_has_front(queue_p q)
{
    assert(q);
    if (atomic_load(&q->head) != 0)
        return QUEUE_TRUE;
    return QUEUE_FALSE;
}
105+
106+
/**
 * Copy the front item into a caller-supplied buffer without removing it.
 *
 * @param q    queue handle; must not be NULL and must not be empty.
 * @param data destination of at least item_size bytes; must not be NULL.
 * @return QUEUE_SUCCESS.
 */
static queue_result_t queue_front(queue_p q, void *data)
{
    assert(q);
    assert(data);

    node *front_node = (node *) atomic_load(&q->head);
    assert(front_node);

    /* The item's bytes are stored directly behind the node header. */
    const void *payload = front_node + 1;
    memcpy(data, payload, q->item_size);
    return QUEUE_SUCCESS;
}
115+
116+
/**
 * Remove and free the node at the front of the queue.
 *
 * Consumer operation: only one thread may run it at a time. The queue must
 * not be empty.
 *
 * @param q queue handle; must not be NULL.
 * @return QUEUE_SUCCESS.
 */
static queue_result_t queue_pop(queue_p q)
{
    assert(q);
    assert(queue_has_front(q) == QUEUE_TRUE);

    /* get the head */
    node *popped = (node *) atomic_load(&q->head);

    /* head and tail are atomic_uintptr_t, so the expected value handed to
     * the compare-exchange must be a uintptr_t object, not a node pointer.
     */
    uintptr_t expected = (uintptr_t) popped;

    /* set the tail and head to nothing if they are the same */
    if (atomic_compare_exchange_strong(&q->tail, &expected, 0)) {
        expected = (uintptr_t) popped;
        /* Another thread may have pushed after the tail was swapped out.
         * In that case head already differs from what was popped and must
         * be left alone, so the result of this exchange is deliberately
         * ignored.
         */
        atomic_compare_exchange_strong(&q->head, &expected, 0);
    } else {
        /* tail is different from head, set the head to the next value */
        uintptr_t next;
        do {
            /* The pushing thread may not have linked the next node yet,
             * so spin until it stores the value.
             */
            next = atomic_load(&popped->next);
        } while (next == 0);
        atomic_store(&q->head, next);
    }

    free(popped);
    return QUEUE_SUCCESS;
}
149+
150+
/**
 * Append a copy of an item to the back of the queue.
 *
 * @param q    queue handle; must not be NULL.
 * @param data source of item_size bytes to copy into the new node.
 * @return QUEUE_SUCCESS, or QUEUE_OUT_OF_MEMORY if malloc fails.
 */
static queue_result_t queue_push(queue_p q, void *data)
{
    assert(q);

    /* One allocation holds the node header followed by the item bytes. */
    node *fresh = malloc(sizeof(node) + q->item_size);
    if (fresh == NULL)
        return QUEUE_OUT_OF_MEMORY;

    atomic_init(&fresh->next, 0);
    memcpy(fresh + 1, data, q->item_size);

    /* Publish the node as the new tail and learn which node it follows. */
    const uintptr_t fresh_bits = (uintptr_t) fresh;
    node *prev = (node *) atomic_exchange(&q->tail, fresh_bits);

    /* Link it in: behind the previous tail, or as head if there was none. */
    if (prev)
        atomic_store(&prev->next, fresh_bits);
    else
        atomic_store(&q->head, fresh_bits);
    return QUEUE_SUCCESS;
}
170+
171+
/**
 * Pop items until the queue reports no front item.
 *
 * @param q queue handle; must not be NULL.
 * @return QUEUE_SUCCESS.
 */
static queue_result_t queue_clear(queue_p q)
{
    assert(q);
    for (;;) {
        if (queue_has_front(q) != QUEUE_TRUE)
            break;
        queue_result_t rc = queue_pop(q);
        assert(rc == QUEUE_SUCCESS);
    }
    return QUEUE_SUCCESS;
}
180+
181+
/**
 * Free the queue object.
 *
 * Only the queue's own allocation is released; nodes still linked into the
 * queue are not touched, so clear the queue first.
 *
 * @param q queue handle; must not be NULL.
 * @return QUEUE_SUCCESS.
 */
static queue_result_t queue_destroy(queue_p q)
{
    /* Same precondition as every other entry point; without it a NULL
     * handle would reach the pointer arithmetic below.
     */
    assert(q);

    /* Step back to the sentinel word that precedes the queue object. */
    size_t *ptr = (size_t *) q - 1;
    assert(ptr[0] == sentinel);
    free(ptr);
    return QUEUE_SUCCESS;
}
188+
189+
/* API gateway */
190+
struct __QUEUE_API__ Queue = {
191+
.create = queue_create,
192+
.hasFront = queue_has_front,
193+
.front = queue_front,
194+
.pop = queue_pop,
195+
.push = queue_push,
196+
.clear = queue_clear,
197+
.destroy = queue_destroy,
198+
};
199+
200+
#include <pthread.h>
201+
#include <stdio.h>
202+
203+
static void basic_test()
204+
{
205+
queue_p q = Queue.create(sizeof(int));
206+
assert(q);
207+
208+
/* initial queue is empty */
209+
assert(Queue.hasFront(q) == QUEUE_FALSE);
210+
211+
queue_result_t result;
212+
/* push one item to the empty queue */
213+
{
214+
int in = 1, out = 0;
215+
{
216+
result = Queue.push(q, &in);
217+
assert(result == QUEUE_SUCCESS);
218+
}
219+
assert(Queue.hasFront(q) == QUEUE_TRUE);
220+
{
221+
result = Queue.front(q, &out);
222+
assert(result == QUEUE_SUCCESS);
223+
}
224+
assert(out == in);
225+
}
226+
227+
/* pop one item out of a single item queue */
228+
{
229+
result = Queue.pop(q);
230+
assert(result == QUEUE_SUCCESS);
231+
}
232+
assert(Queue.hasFront(q) == QUEUE_FALSE);
233+
234+
/* push many items on the queue */
235+
for (size_t i = 0; i < 64; ++i) {
236+
int in = i, out = -1;
237+
result = Queue.push(q, &in);
238+
assert(result == QUEUE_SUCCESS);
239+
240+
assert(Queue.hasFront(q) == QUEUE_TRUE);
241+
result = Queue.front(q, &out);
242+
assert(result == QUEUE_SUCCESS);
243+
244+
assert(out == 0);
245+
}
246+
247+
/* pop many items from the queue */
248+
for (size_t i = 0; i < 32; ++i) {
249+
int out = -1;
250+
result = Queue.front(q, &out);
251+
assert(result == QUEUE_SUCCESS);
252+
assert(out == i);
253+
254+
result = Queue.pop(q);
255+
assert(result == QUEUE_SUCCESS);
256+
}
257+
258+
/* clear the queue */
259+
assert(Queue.hasFront(q) == QUEUE_TRUE);
260+
result = Queue.clear(q);
261+
assert(result == QUEUE_SUCCESS);
262+
263+
assert(Queue.hasFront(q) == QUEUE_FALSE);
264+
265+
Queue.destroy(q);
266+
}
267+
268+
/* Total number of items pushed by the producers and popped by the consumer
 * in the stress test (despite the name, this is not a number of threads).
 * Kept an integer expression so comparisons with the int counters do not go
 * through double.
 */
#define THREAD_COUNT (75 * 1000 * 1000)
/* Number of producer threads started by the stress test. */
#define PRODUCER_COUNT 64
270+
271+
typedef struct {
272+
atomic_int consume_count, producer_count;
273+
queue_p q;
274+
} queue_test_t;
275+
276+
static void *test_consumer(void *arg)
277+
{
278+
queue_test_t *test = (queue_test_t *) arg;
279+
while (atomic_load(&test->consume_count) < THREAD_COUNT) {
280+
if (Queue.hasFront(test->q)) {
281+
atomic_fetch_add(&test->consume_count, 1);
282+
queue_result_t result = Queue.pop(test->q);
283+
assert(result == QUEUE_SUCCESS);
284+
}
285+
}
286+
return NULL;
287+
}
288+
289+
static void *test_producer(void *arg)
290+
{
291+
queue_test_t *test = (queue_test_t *) arg;
292+
assert(test->q);
293+
while (1) {
294+
int in = atomic_fetch_add(&test->producer_count, 1);
295+
if (in >= THREAD_COUNT)
296+
break;
297+
queue_result_t result = Queue.push(test->q, &in);
298+
assert(result == QUEUE_SUCCESS);
299+
}
300+
return NULL;
301+
}
302+
303+
static void stress_test()
304+
{
305+
queue_test_t test;
306+
atomic_init(&test.consume_count, 0);
307+
atomic_init(&test.producer_count, 0);
308+
309+
test.q = Queue.create(sizeof(int));
310+
assert(test.q);
311+
312+
/* thread creation */
313+
pthread_t consumer, producers[PRODUCER_COUNT];
314+
{
315+
int p_result = pthread_create(&consumer, NULL, test_consumer, &test);
316+
assert(p_result == 0);
317+
}
318+
for (size_t i = 0; i < PRODUCER_COUNT; ++i) {
319+
int p_result =
320+
pthread_create(&producers[i], NULL, test_producer, &test);
321+
assert(p_result == 0);
322+
}
323+
324+
/* wait for completion */
325+
for (size_t i = 0; i < PRODUCER_COUNT; ++i) {
326+
int p_result = pthread_join(producers[i], NULL);
327+
assert(p_result == 0);
328+
}
329+
{
330+
int p_result = pthread_join(consumer, NULL);
331+
assert(p_result == 0);
332+
}
333+
334+
assert(Queue.hasFront(test.q) == QUEUE_FALSE);
335+
336+
Queue.destroy(test.q);
337+
}
338+
339+
/**
 * Run the basic single-threaded test followed by the stress test, printing
 * a banner before each and a confirmation after each.
 *
 * @return 0.
 */
int main(int argc, char *argv[])
{
    /* Command-line arguments are not used. */
    (void) argc;
    (void) argv;

    fputs("** Basic operations **\n", stdout);
    basic_test();
    fputs("Verified OK!\n\n", stdout);

    fputs("** Stress test **\n", stdout);
    stress_test();
    fputs("Verified OK!\n\n", stdout);

    return 0;
}

0 commit comments

Comments
 (0)