Skip to content

Commit fd73065

Browse files
committed
mpmc: Enable Arm build
1 parent 51787a6 commit fd73065

File tree

2 files changed

+68
-68
lines changed

2 files changed

+68
-68
lines changed

mpmc/Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
all:
2-
$(CC) -Wall -std=c11 -o mpmc mpmc.c -lpthread
2+
$(CC) -Wall -O2 -std=c11 -o mpmc mpmc.c -lpthread
33

44
indent:
55
clang-format -i mpmc.c

mpmc/mpmc.c

+67-67
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
#define _GNU_SOURCE
77
#endif
88

9-
#include <malloc.h>
109
#include <pthread.h>
1110
#include <stdbool.h>
1211
#include <stdio.h>
@@ -15,10 +14,10 @@
1514

1615
#define PAGE_SIZE 4096 /* FIXME: avoid hard-coded */
1716
#define CACHE_LINE_SIZE 64 /* FIXME: make it configurable */
18-
#define CACHE_ALIGNED __attribute__((aligned(CACHE_LINE_SIZE)))
19-
#define DOUBLE_CACHE_ALIGNED __attribute__((aligned(2 * CACHE_LINE_SIZE)))
17+
#define __CACHE_ALIGNED __attribute__((aligned(CACHE_LINE_SIZE)))
18+
#define __DOUBLE___CACHE_ALIGNED __attribute__((aligned(2 * CACHE_LINE_SIZE)))
2019

21-
static inline void *align_malloc(size_t align, size_t size)
20+
static inline void *align_alloc(size_t align, size_t size)
2221
{
2322
void *ptr;
2423
int ret = posix_memalign(&ptr, align, size);
@@ -30,32 +29,31 @@ static inline void *align_malloc(size_t align, size_t size)
3029
}
3130

3231
#define N (1 << 12) /* node size */
33-
#define NBITS (N - 1)
32+
#define N_BITS (N - 1)
3433

3534
typedef struct __node {
36-
struct __node *volatile next DOUBLE_CACHE_ALIGNED;
37-
long id DOUBLE_CACHE_ALIGNED;
38-
void *volatile cells[N] DOUBLE_CACHE_ALIGNED;
35+
struct __node *volatile next __DOUBLE___CACHE_ALIGNED;
36+
long id __DOUBLE___CACHE_ALIGNED;
37+
void *cells[N] __DOUBLE___CACHE_ALIGNED;
3938
} node_t;
4039

41-
#define HANDLES 128 /* support 127 threads */
40+
#define N_HANDLES 128 /* support 127 threads */
4241

4342
typedef struct {
4443
node_t *spare;
4544

46-
node_t *volatile put_node CACHE_ALIGNED;
47-
node_t *volatile pop_node CACHE_ALIGNED;
45+
node_t *volatile push __CACHE_ALIGNED;
46+
node_t *volatile pop __CACHE_ALIGNED;
4847
} handle_t;
4948

5049
typedef struct {
5150
node_t *init_node;
52-
volatile long init_id DOUBLE_CACHE_ALIGNED;
51+
volatile long init_id __DOUBLE___CACHE_ALIGNED;
5352

54-
volatile long put_index DOUBLE_CACHE_ALIGNED;
55-
volatile long pop_index DOUBLE_CACHE_ALIGNED;
53+
volatile long put_index __DOUBLE___CACHE_ALIGNED;
54+
volatile long pop_index __DOUBLE___CACHE_ALIGNED;
5655

57-
handle_t *volatile enq_handles[HANDLES];
58-
handle_t *volatile deq_handles[HANDLES];
56+
handle_t *enqueue_handles[N_HANDLES], *dequeue_handles[N_HANDLES];
5957

6058
int threshold;
6159

@@ -64,24 +62,24 @@ typedef struct {
6462

6563
static inline node_t *mpmc_new_node()
6664
{
67-
node_t *n = align_malloc(PAGE_SIZE, sizeof(node_t));
65+
node_t *n = align_alloc(PAGE_SIZE, sizeof(node_t));
6866
memset(n, 0, sizeof(node_t));
6967
return n;
7068
}
7169

7270
enum queue_ops {
73-
ENQ = 1 << 1,
74-
DEQ = 1 << 0,
71+
DEQUEUE = 1 << 0,
72+
ENQUEUE = 1 << 1,
7573
};
7674

7775
/* register the enqueuers first, dequeuers second. */
7876
void mpmc_queue_register(mpmc_t *q, handle_t *th, int flag)
7977
{
8078
th->spare = mpmc_new_node();
81-
th->put_node = th->pop_node = q->init_node;
79+
th->push = th->pop = q->init_node;
8280

83-
if (flag & ENQ) {
84-
handle_t **tail = q->enq_handles;
81+
if (flag & ENQUEUE) {
82+
handle_t **tail = q->enqueue_handles;
8583
for (int i = 0;; ++i) {
8684
handle_t *init = NULL;
8785
if (!tail[i] &&
@@ -93,8 +91,8 @@ void mpmc_queue_register(mpmc_t *q, handle_t *th, int flag)
9391
pthread_barrier_wait(&q->enq_barrier);
9492
}
9593

96-
if (flag & DEQ) {
97-
handle_t **tail = q->deq_handles;
94+
if (flag & DEQUEUE) {
95+
handle_t **tail = q->dequeue_handles;
9896
for (int i = 0;; ++i) {
9997
handle_t *init = NULL;
10098
if (!tail[i] &&
@@ -157,10 +155,10 @@ static void *mpmc_find_cell(node_t *volatile *ptr, long i, handle_t *th)
157155
*ptr = curr; /* update our node to the present node */
158156

159157
/* Orders processor execution, so other thread can see the '*ptr = curr' */
160-
__asm("sfence" ::: "cc", "memory"); /* FIXME: x86-only */
158+
__atomic_thread_fence(__ATOMIC_SEQ_CST);
161159

162160
/* now we get the needed cell, its' node is curr and index is i % N */
163-
return &curr->cells[i & NBITS];
161+
return &curr->cells[i & N_BITS];
164162
}
165163

166164
#include <linux/futex.h>
@@ -183,8 +181,7 @@ void mpmc_enqueue(mpmc_t *q, handle_t *th, void *v)
183181
{
184182
/* return the needed index */
185183
void *volatile *c = mpmc_find_cell(
186-
&th->put_node, __atomic_fetch_add(&q->put_index, 1, __ATOMIC_SEQ_CST),
187-
th);
184+
&th->push, __atomic_fetch_add(&q->put_index, 1, __ATOMIC_SEQ_CST), th);
188185
/* __atomic_fetch_add(ptr, val) is an atomic fetch-and-add that also
189186
* ensures sequential consistency
190187
*/
@@ -214,15 +211,19 @@ void *mpmc_dequeue(mpmc_t *q, handle_t *th)
214211
long index = __atomic_fetch_add(&q->pop_index, 1, __ATOMIC_SEQ_CST);
215212

216213
/* locate the needed cell */
217-
void *volatile *c = mpmc_find_cell(&th->pop_node, index, th);
214+
void *volatile *c = mpmc_find_cell(&th->pop, index, th);
218215

219216
/* because the queue is a blocking queue, so we just use more spin. */
220217
int times = (1 << 20);
221218
do {
222219
cv = *c;
223220
if (cv)
224221
goto over;
225-
__asm__("pause"); /* FIXME: x86-only */
222+
#if defined(__i386__) || defined(__x86_64__)
223+
__asm__ __volatile__("pause");
224+
#elif defined(__aarch64__) || defined(__arm__)
225+
__asm__ __volatile__("isb\n");
226+
#endif
226227
} while (times-- > 0);
227228

228229
/* XCHG, if return NULL so this cell is NULL, we just wait and observe the
@@ -236,20 +237,20 @@ void *mpmc_dequeue(mpmc_t *q, handle_t *th)
236237
mpmc_futex_wait(&futex_addr, 1);
237238
} while (futex_addr == 1);
238239

239-
/* the counterpart put thread has change futex_addr's value to 0. and the
240-
* data has into cell(c).
240+
/* the counterpart put thread has change futex_addr's value to 0. and
241+
* the data has into cell(c).
241242
*/
242243
cv = *c;
243244
}
244245

245246
over:
246-
/* if the index is the node's last cell: (NBITS == 4095), it Try to reclaim
247+
/* if the index is the node's last cell: (N_BITS == 4095), it Try to reclaim
247248
* the memory. so we just take the smallest ID node that is not
248249
* reclaimed(init_node), and At the same time, by traversing the local data
249250
* of other threads, we get a larger ID node(min_node). So it is safe to
250251
* recycle the memory [init_node, min_node).
251252
*/
252-
if ((index & NBITS) == NBITS) {
253+
if ((index & N_BITS) == N_BITS) {
253254
/* __atomic_load_n(ptr, __ATOMIC_ACQUIRE) is a load with a following
254255
* acquire fence to ensure no following load and stores can start before
255256
* the current load completes.
@@ -260,33 +261,32 @@ void *mpmc_dequeue(mpmc_t *q, handle_t *th)
260261
* __ATOMIC_RELAXED) is an atomic compare-and-swap that ensures acquire
261262
* semantic when succeed or relaxed semantic when failed.
262263
*/
263-
if ((th->pop_node->id - init_index) >= q->threshold &&
264-
init_index >= 0 &&
264+
if ((th->pop->id - init_index) >= q->threshold && init_index >= 0 &&
265265
__atomic_compare_exchange_n(&q->init_id, &init_index, -1, 0,
266266
__ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
267267
node_t *init_node = q->init_node;
268-
th = q->deq_handles[0];
269-
node_t *min_node = th->pop_node;
268+
th = q->dequeue_handles[0];
269+
node_t *min_node = th->pop;
270270

271271
int i;
272-
handle_t *next = q->deq_handles[i = 1];
272+
handle_t *next = q->dequeue_handles[i = 1];
273273
while (next) {
274-
node_t *next_min = next->pop_node;
274+
node_t *next_min = next->pop;
275275
if (next_min->id < min_node->id)
276276
min_node = next_min;
277277
if (min_node->id <= init_index)
278278
break;
279-
next = q->deq_handles[++i];
279+
next = q->dequeue_handles[++i];
280280
}
281281

282-
next = q->enq_handles[i = 0];
282+
next = q->enqueue_handles[i = 0];
283283
while (next) {
284-
node_t *next_min = next->put_node;
284+
node_t *next_min = next->push;
285285
if (next_min->id < min_node->id)
286286
min_node = next_min;
287287
if (min_node->id <= init_index)
288288
break;
289-
next = q->enq_handles[++i];
289+
next = q->enqueue_handles[++i];
290290
}
291291

292292
long new_id = min_node->id;
@@ -322,33 +322,32 @@ static pthread_barrier_t prod_barrier, cons_barrier;
322322
static void *producer(void *index)
323323
{
324324
mpmc_t *q = &mpmc;
325-
handle_t *th = malloc(sizeof(handle_t));
326-
memset(th, 0, sizeof(handle_t));
327-
mpmc_queue_register(q, th, ENQ);
325+
handle_t *th = calloc(1, sizeof(handle_t));
326+
mpmc_queue_register(q, th, ENQUEUE);
328327

329328
for (;;) {
330329
pthread_barrier_wait(&prod_barrier);
331330
for (int i = 0; i < COUNTS_PER_THREAD; ++i)
332-
mpmc_enqueue(q, th, 1 + i + ((int) index) * COUNTS_PER_THREAD);
331+
mpmc_enqueue(
332+
q, th, (void *) 1 + i + ((intptr_t) index) * COUNTS_PER_THREAD);
333333
pthread_barrier_wait(&prod_barrier);
334334
}
335335
return NULL;
336336
}
337337

338-
#define THREAD_NUM 4
338+
#define N_THREADS 4
339339
static bool *array;
340340
static void *consumer(void *index)
341341
{
342342
mpmc_t *q = &mpmc;
343-
handle_t *th = malloc(sizeof(handle_t));
344-
memset(th, 0, sizeof(handle_t));
345-
mpmc_queue_register(q, th, DEQ);
343+
handle_t *th = calloc(1, sizeof(handle_t));
344+
mpmc_queue_register(q, th, DEQUEUE);
346345

347346
for (;;) {
348347
pthread_barrier_wait(&cons_barrier);
349348
for (long i = 0; i < COUNTS_PER_THREAD; ++i) {
350349
int value;
351-
if (!(value = mpmc_dequeue(q, th)))
350+
if (!(value = (intptr_t) mpmc_dequeue(q, th)))
352351
return NULL;
353352
array[value] = true;
354353
}
@@ -361,24 +360,25 @@ static void *consumer(void *index)
361360

362361
int main(int argc, char *argv[])
363362
{
364-
pthread_barrier_init(&prod_barrier, NULL, THREAD_NUM + 1);
365-
pthread_barrier_init(&cons_barrier, NULL, THREAD_NUM + 1);
363+
pthread_barrier_init(&prod_barrier, NULL, N_THREADS + 1);
364+
pthread_barrier_init(&cons_barrier, NULL, N_THREADS + 1);
366365
if (argc >= 3) {
367366
COUNTS_PER_THREAD = atol(argv[1]);
368367
threshold = atoi(argv[2]);
369368
}
370369

371-
printf("Amount: %ld\n", THREAD_NUM * COUNTS_PER_THREAD);
370+
printf("Amount: %ld\n", N_THREADS * COUNTS_PER_THREAD);
372371
fflush(stdout);
373-
array = malloc((1 + THREAD_NUM * COUNTS_PER_THREAD) * sizeof(bool));
374-
memset(array, 0, (1 + THREAD_NUM * COUNTS_PER_THREAD) * sizeof(bool));
375-
mpmc_init_queue(&mpmc, THREAD_NUM, THREAD_NUM, threshold);
372+
array = calloc(1, (1 + N_THREADS * COUNTS_PER_THREAD) * sizeof(bool));
373+
mpmc_init_queue(&mpmc, N_THREADS, N_THREADS, threshold);
376374

377-
pthread_t pids[THREAD_NUM];
375+
pthread_t pids[N_THREADS];
378376

379-
for (int i = 0; i < THREAD_NUM; ++i) {
380-
if (-1 == pthread_create(&pids[i], NULL, producer, i) ||
381-
-1 == pthread_create(&pids[i], NULL, consumer, i)) {
377+
for (int i = 0; i < N_THREADS; ++i) {
378+
if (-1 == pthread_create(&pids[i], NULL, producer,
379+
(void *) (intptr_t) i) ||
380+
-1 == pthread_create(&pids[i], NULL, consumer,
381+
(void *) (intptr_t) i)) {
382382
printf("error create thread\n");
383383
exit(1);
384384
}
@@ -388,7 +388,7 @@ int main(int argc, char *argv[])
388388
printf("\n#%d\n", i);
389389

390390
pthread_barrier_wait(&cons_barrier);
391-
sleep(1);
391+
usleep(1e5);
392392

393393
struct timeval start, prod_end;
394394
gettimeofday(&start, NULL);
@@ -398,7 +398,7 @@ int main(int argc, char *argv[])
398398
gettimeofday(&prod_end, NULL);
399399

400400
bool verify = true;
401-
for (int j = 1; j <= THREAD_NUM * COUNTS_PER_THREAD; ++j) {
401+
for (int j = 1; j <= N_THREADS * COUNTS_PER_THREAD; ++j) {
402402
if (!array[j]) {
403403
printf("Error: ints[%d]\n", j);
404404
verify = false;
@@ -407,13 +407,13 @@ int main(int argc, char *argv[])
407407
}
408408
if (verify)
409409
printf("ints[1-%ld] have been verified through\n",
410-
THREAD_NUM * COUNTS_PER_THREAD);
410+
N_THREADS * COUNTS_PER_THREAD);
411411
float cost_time = (prod_end.tv_sec - start.tv_sec) +
412412
(prod_end.tv_usec - start.tv_usec) / 1000000.0;
413413
printf("elapsed time: %f seconds\n", cost_time);
414414
printf("DONE #%d\n", i);
415415
fflush(stdout);
416-
memset(array, 0, (1 + THREAD_NUM * COUNTS_PER_THREAD) * sizeof(bool));
416+
memset(array, 0, (1 + N_THREADS * COUNTS_PER_THREAD) * sizeof(bool));
417417
}
418418
return 0;
419419
}

0 commit comments

Comments
 (0)