Skip to content

Commit 49a5785

Browse files
committed
Add a lock-free MPMC broadcast pub-sub queue
1 parent 550c22b commit 49a5785

File tree

10 files changed

+668
-0
lines changed

10 files changed

+668
-0
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ purpose of these programs is to be illustrative and educational.
1919
- [mpmc](mpmc/): A multiple-producer/multiple-consumer (MPMC) queue using Linux futex.
2020
- [lf-queue](lf-queue/): A bounded lock-free queue.
2121
- [channel](channel/): A Linux futex based Go channel implementation.
22+
- [broadcast](broadcast/): A lock-free MPMC broadcast pub-sub queue.
2223
* [Lock-Free](https://en.wikipedia.org/wiki/Non-blocking_algorithm) Data Structure
2324
- [ringbuffer](ringbuffer/): A lock-less ring buffer.
2425
- [lfring](lfring/): A lock-free multiple-producer/multiple-consumer (MPMC) ring buffer.

broadcast/Makefile

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Build the broadcast stress test. Both targets are commands rather than
# files, so they are declared phony: a stray file named "all" or "clean" in
# this directory must not stop the recipe from running.
.PHONY: all clean

all:
	gcc -Wall -O2 -D_GNU_SOURCE \
		-o stress \
		stress.c \
		broadcast.c \
		pool.c \
		-lpthread

clean:
	rm -f stress

broadcast/broadcast.c

+246
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
#include <stdlib.h>
2+
3+
#include "broadcast.h"
4+
#include "pool.h"
5+
#include "util.h"
6+
7+
#define ESTIMATED_PUBLISHERS 16
8+
9+
struct __attribute__((aligned(CACHELINE_SIZE))) broadcast {
10+
uint64_t depth_mask;
11+
uint64_t max_msg_size;
12+
uint64_t head_idx, tail_idx;
13+
size_t pool_off;
14+
char padding[CACHELINE_SIZE - 5 * sizeof(uint64_t) - sizeof(size_t)];
15+
16+
lf_ref_t slots[];
17+
};
18+
static_assert(sizeof(broadcast_t) == CACHELINE_SIZE, "");
19+
static_assert(alignof(broadcast_t) == CACHELINE_SIZE, "");
20+
21+
/*
 * Layout of one message inside a pool element: the payload length in bytes
 * followed directly by the payload itself. The struct carries the same
 * alignment as uint128_t.
 */
struct __attribute__((aligned(alignof(uint128_t)))) msg {
    uint64_t size;     /* number of valid bytes in payload[] */
    uint8_t payload[]; /* message bytes copied in by the publisher */
};
typedef struct msg msg_t;
25+
26+
/*
 * Locate the message pool that belongs to queue b. The pool lives in the
 * same memory region as the queue, pool_off bytes past its start.
 */
static inline pool_t *get_pool(broadcast_t *b)
{
    char *base = (char *) b;
    return (pool_t *) (base + b->pool_off);
}
30+
31+
/*
 * Forward declaration; the definition appears later in this file.
 * Reports the size and alignment of the memory region needed for a queue of
 * the given depth and maximum message size.
 */
static void broadcast_footprint(size_t depth, size_t max_msg_size,
                                size_t *size, size_t *align);
35+
36+
/*
 * Allocate and initialise a broadcast queue on the heap.
 *
 * The required size and alignment come from broadcast_footprint(); the
 * memory is obtained with posix_memalign() and then handed to
 * broadcast_mem_init().
 *
 * Returns the new queue, or NULL when the allocation fails or
 * broadcast_mem_init() rejects the parameters (in which case the memory is
 * freed again).
 */
broadcast_t *broadcast_new(size_t depth, size_t max_msg_size)
{
    size_t bytes, alignment;
    broadcast_footprint(depth, max_msg_size, &bytes, &alignment);

    void *region = NULL;
    if (posix_memalign(&region, alignment, bytes) != 0)
        return NULL;

    broadcast_t *queue = broadcast_mem_init(region, depth, max_msg_size);
    if (queue == NULL)
        free(region); /* initialisation refused the parameters */

    return queue;
}
54+
55+
/*
 * Release a queue with free(). broadcast_new() obtains the header, slot
 * ring and pool as a single allocation, so one free() returns everything.
 */
void broadcast_delete(broadcast_t *bcast)
{
    void *region = bcast;
    free(region);
}
59+
60+
/*
 * Try to retire the oldest message so its slot can be reused.
 *
 * The slot contents are captured before the CAS on head_idx. Only the
 * caller whose CAS moves head_idx from head_idx to head_idx + 1 returns the
 * message to the pool; every other caller leaves without side effects.
 */
static void try_drop_head(broadcast_t *b, uint64_t head_idx)
{
    lf_ref_t victim = b->slots[head_idx & b->depth_mask];

    if (LF_U64_CAS(&b->head_idx, head_idx, head_idx + 1)) {
        /* TODO: release the shared resources */
        uint64_t victim_off = victim.val;
        msg_t *dropped = (msg_t *) ((char *) b + victim_off);
        pool_release(get_pool(b), dropped);
    }
}
71+
72+
/*
 * Publish one message on queue b.
 *
 * msg_buf  - bytes to publish; copied into a pool element
 * msg_size - number of bytes to copy from msg_buf
 *
 * Returns true once the message has been installed in the tail slot.
 * Returns false when msg_size exceeds the queue's max_msg_size or when the
 * pool has no free element.
 *
 * When the ring is full the oldest message is dropped to make room, so a
 * publisher never waits for subscribers.
 */
bool broadcast_pub(broadcast_t *b, void *msg_buf, size_t msg_size)
{
    /* Pool elements are sized for at most max_msg_size payload bytes; a
     * larger message would overrun the element in the memcpy() below. */
    if (msg_size > b->max_msg_size)
        return false;

    msg_t *msg = (msg_t *) pool_acquire(get_pool(b));
    if (!msg)
        return false; /* out of elements */
    uint64_t msg_off = (char *) msg - (char *) b;

    msg->size = msg_size;
    memcpy(msg->payload, msg_buf, msg_size);

    while (1) {
        uint64_t head_idx = b->head_idx;
        uint64_t tail_idx = b->tail_idx;
        lf_ref_t *tail_ptr = &b->slots[tail_idx & b->depth_mask];
        lf_ref_t tail_cur = *tail_ptr;
        LF_BARRIER_ACQUIRE();

        /* Stale tail pointer? Try to advance it */
        if (tail_cur.tag == tail_idx) {
            LF_U64_CAS(&b->tail_idx, tail_idx, tail_idx + 1);
            LF_PAUSE();
            continue;
        }

        /* Stale tail_idx? Try again */
        if (tail_cur.tag >= tail_idx) {
            LF_PAUSE();
            continue;
        }

        /* Slot currently used. If full, roll off the head */
        if (head_idx <= tail_cur.tag) {
            try_drop_head(b, head_idx);
            LF_PAUSE();
            continue;
        }

        /* Otherwise, try to append the tail */
        lf_ref_t tail_next = LF_REF_MAKE(tail_idx, msg_off);
        if (!LF_REF_CAS(tail_ptr, tail_cur, tail_next)) {
            LF_PAUSE();
            continue;
        }

        /* Success, try to update the tail. If this CAS loses, another
         * thread advances tail_idx through the stale-tail branch above. */
        LF_U64_CAS(&b->tail_idx, tail_idx, tail_idx + 1);
        return true;
    }
}
121+
122+
typedef struct sub_impl sub_impl_t;
123+
struct __attribute__((aligned(16))) sub_impl {
124+
broadcast_t *bcast;
125+
uint64_t idx;
126+
char _extra[16];
127+
};
128+
static_assert(sizeof(sub_impl_t) == sizeof(broadcast_sub_t), "");
129+
static_assert(alignof(sub_impl_t) == alignof(broadcast_sub_t), "");
130+
131+
/*
 * Attach subscriber handle _sub to queue b. Reading starts at the queue's
 * current head index, i.e. at the oldest message still retained.
 */
void broadcast_sub_begin(broadcast_sub_t *_sub, broadcast_t *b)
{
    sub_impl_t *cursor = (sub_impl_t *) _sub;

    cursor->idx = b->head_idx;
    cursor->bcast = b;
}
137+
138+
/*
 * Fetch the next message for a subscriber.
 *
 * _sub          - handle previously set up with broadcast_sub_begin()
 * msg_buf       - destination for the payload; the code copies up to
 *                 max_msg_size bytes into it
 * _out_msg_size - set to the payload length when a message is returned
 * _out_drops    - set to the number of messages skipped during this call
 *                 because they were overwritten before they could be read;
 *                 written on both the true and the false return
 *
 * Returns true when a message was copied into msg_buf, false when the
 * subscriber has caught up with the tail.
 */
bool broadcast_sub_next(broadcast_sub_t *_sub,
                        void *msg_buf,
                        size_t *_out_msg_size,
                        size_t *_out_drops)
{
    sub_impl_t *sub = (sub_impl_t *) _sub;
    broadcast_t *b = sub->bcast;
    size_t drops = 0;

    while (1) {
        if (sub->idx == b->tail_idx) {
            /* Nothing left to read. Still report what was skipped on the
             * way here, otherwise that count would be lost. */
            *_out_drops = drops;
            return false;
        }

        lf_ref_t *ref_ptr = &b->slots[sub->idx & b->depth_mask];
        lf_ref_t ref = *ref_ptr;

        LF_BARRIER_ACQUIRE();

        /* we have fallen behind and the message we wanted was dropped? */
        if (ref.tag != sub->idx) {
            sub->idx++;
            drops++;
            LF_PAUSE();
            continue;
        }
        uint64_t msg_off = ref.val;
        msg_t *msg = (msg_t *) ((char *) b + msg_off);
        size_t msg_size = msg->size;
        if (msg_size > b->max_msg_size) { /* inconsistent */
            LF_PAUSE();
            continue;
        }
        memcpy(msg_buf, msg->payload, msg_size);

        LF_BARRIER_ACQUIRE();

        /* Re-read the slot: if it no longer matches the first snapshot, the
         * copy above may contain bytes from a different message. */
        lf_ref_t ref2 = *ref_ptr;
        /* Data changed while reading? Drop it */
        if (!LF_REF_EQUAL(ref, ref2)) {
            sub->idx++;
            drops++;
            LF_PAUSE();
            continue;
        }

        sub->idx++;
        *_out_msg_size = msg_size;
        *_out_drops = drops;
        return true;
    }
}
189+
190+
static void broadcast_footprint(size_t depth,
191+
size_t max_msg_size,
192+
size_t *_size,
193+
size_t *_align)
194+
{
195+
size_t elt_size =
196+
LF_ALIGN_UP(sizeof(msg_t) + max_msg_size, alignof(uint128_t));
197+
size_t pool_elts = depth + ESTIMATED_PUBLISHERS;
198+
199+
size_t pool_size, pool_align;
200+
pool_footprint(pool_elts, elt_size, &pool_size, &pool_align);
201+
202+
size_t size = sizeof(broadcast_t);
203+
size += depth * sizeof(lf_ref_t);
204+
size = LF_ALIGN_UP(size, pool_align);
205+
size += pool_size;
206+
207+
if (_size)
208+
*_size = size;
209+
if (_align)
210+
*_align = alignof(broadcast_t);
211+
}
212+
213+
broadcast_t *broadcast_mem_init(void *mem, size_t depth, size_t max_msg_size)
214+
{
215+
if (!LF_IS_POW2(depth))
216+
return NULL;
217+
if (max_msg_size == 0)
218+
return NULL;
219+
220+
size_t elt_size =
221+
LF_ALIGN_UP(sizeof(msg_t) + max_msg_size, alignof(uint128_t));
222+
size_t pool_elts = depth + ESTIMATED_PUBLISHERS;
223+
224+
size_t pool_size, pool_align;
225+
pool_footprint(pool_elts, elt_size, &pool_size, &pool_align);
226+
227+
size_t size = sizeof(broadcast_t) + depth * sizeof(lf_ref_t);
228+
size_t pool_off = LF_ALIGN_UP(size, pool_align);
229+
void *pool_mem = (char *) mem + pool_off;
230+
231+
broadcast_t *b = (broadcast_t *) mem;
232+
b->depth_mask = depth - 1;
233+
b->max_msg_size = max_msg_size;
234+
/* Start from 1 because we use 0 to mean "unused" */
235+
b->head_idx = 1, b->tail_idx = 1;
236+
b->pool_off = pool_off;
237+
238+
memset(b->slots, 0, depth * sizeof(lf_ref_t));
239+
240+
pool_t *pool = pool_mem_init(pool_mem, pool_elts, elt_size);
241+
if (!pool)
242+
return NULL;
243+
assert(pool == pool_mem);
244+
245+
return b;
246+
}

broadcast/broadcast.h

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#pragma once
2+
3+
#include <stdbool.h>
4+
#include <stddef.h>
5+
6+
/* Lock-free multi-publisher broadcast to multi-consumer */
7+
8+
/* Opaque queue type; its definition is private to the implementation. */
typedef struct broadcast broadcast_t;

/*
 * Subscriber handle. Callers provide the storage (32 bytes, 16-byte aligned)
 * and treat the contents as opaque.
 */
typedef struct __attribute__((aligned(16))) broadcast_sub {
    char _opaque[32];
} broadcast_sub_t;
13+
14+
broadcast_t *broadcast_new(size_t depth, size_t max_msg_size);
15+
void broadcast_delete(broadcast_t *bcast);
16+
bool broadcast_pub(broadcast_t *b, void *msg, size_t msg_size);
17+
void broadcast_sub_begin(broadcast_sub_t *sub, broadcast_t *b);
18+
bool broadcast_sub_next(broadcast_sub_t *sub,
19+
void *msg_buf,
20+
size_t *_out_msg_size,
21+
size_t *_out_drops);
22+
23+
broadcast_t *broadcast_mem_init(void *mem, size_t depth, size_t max_msg_size);

broadcast/platform.h

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#include <assert.h>
2+
#include <limits.h>
3+
#include <stddef.h>
4+
5+
/*
 * Compile-time checks of the platform assumptions the queue relies on.
 * Each assertion carries a message so that a failing build says which
 * assumption was violated.
 */

/* Bytes are 8 bits wide */
static_assert(CHAR_BIT == 8, "bytes must be 8 bits wide");

/* Sizes / Offsets / Alignments are 64-bit */
static_assert(sizeof(size_t) == 8, "size_t must be 64 bits");

/* Assuming we have 64-bit pointers */
static_assert(sizeof(void *) == 8, "pointers must be 64 bits");

/* Long and Long Long are the same and 64-bits */
static_assert(sizeof(long) == sizeof(long long),
              "long and long long must have the same size");
static_assert(sizeof(long) == 8, "long must be 64 bits");

broadcast/pool.c

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
#include "pool.h"
2+
#include "util.h"
3+
4+
struct __attribute__((aligned(CACHELINE_SIZE))) pool {
5+
size_t num_elts;
6+
size_t elt_size;
7+
uint64_t tag_next;
8+
uint64_t _pad1;
9+
lf_ref_t head;
10+
char _pad2[CACHELINE_SIZE - 2 * sizeof(size_t) - 2 * sizeof(uint64_t) -
11+
sizeof(lf_ref_t)];
12+
13+
char mem[];
14+
};
15+
static_assert(sizeof(pool_t) == CACHELINE_SIZE, "");
16+
static_assert(alignof(pool_t) == CACHELINE_SIZE, "");
17+
18+
/*
 * Take one element off the pool's free list.
 *
 * Each free element stores the reference to the next free element in its
 * first bytes. The head is swung to that successor with a CAS; on failure
 * the head is re-read and the attempt repeated.
 *
 * Returns the element, or NULL when the free list is empty.
 */
void *pool_acquire(pool_t *pool)
{
    for (;;) {
        lf_ref_t top = pool->head;
        if (LF_REF_IS_NULL(top))
            return NULL;

        uint64_t top_off = top.val;
        lf_ref_t *node = (lf_ref_t *) ((char *) pool + top_off);
        lf_ref_t successor = *node;

        if (LF_REF_CAS(&pool->head, top, successor))
            return node;

        LF_PAUSE();
    }
}
36+
37+
/*
 * Put elt back on the pool's free list.
 *
 * The new head reference pairs the element's offset from the pool with a
 * tag drawn from tag_next. Before each CAS attempt the current head is
 * written into the element so that it links to the rest of the list.
 */
void pool_release(pool_t *pool, void *elt)
{
    lf_ref_t *node = (lf_ref_t *) elt;
    uint64_t offset = (uint64_t) ((char *) elt - (char *) pool);
    uint64_t stamp = LF_ATOMIC_INC(&pool->tag_next);
    lf_ref_t pushed = LF_REF_MAKE(stamp, offset);

    for (;;) {
        lf_ref_t seen = pool->head;
        *node = seen;

        if (LF_REF_CAS(&pool->head, seen, pushed))
            return;

        LF_PAUSE();
    }
}
54+
55+
void pool_footprint(size_t num_elts,
56+
size_t elt_size,
57+
size_t *_size,
58+
size_t *_align)
59+
{
60+
elt_size = LF_ALIGN_UP(elt_size, alignof(uint128_t));
61+
62+
if (_size)
63+
*_size = sizeof(pool_t) + elt_size * num_elts;
64+
if (_align)
65+
*_align = alignof(pool_t);
66+
}
67+
68+
pool_t *pool_mem_init(void *mem, size_t num_elts, size_t elt_size)
69+
{
70+
if (elt_size == 0)
71+
return NULL;
72+
elt_size = LF_ALIGN_UP(elt_size, alignof(uint128_t));
73+
74+
pool_t *pool = (pool_t *) mem;
75+
pool->num_elts = num_elts;
76+
pool->elt_size = elt_size;
77+
pool->tag_next = 0;
78+
pool->head = LF_REF_NULL;
79+
80+
char *ptr = pool->mem + num_elts * elt_size;
81+
for (size_t i = num_elts; i > 0; i--) {
82+
ptr -= elt_size;
83+
pool_release(pool, ptr);
84+
}
85+
86+
return pool;
87+
}

0 commit comments

Comments
 (0)