Skip to content

Commit a311be8

Browse files
committed
lf-queue: Stick to C11 atomics primitives
1 parent 0bc2c33 commit a311be8

File tree

3 files changed

+31
-53
lines changed

3 files changed

+31
-53
lines changed

lf-queue/atomics.h

-21
This file was deleted.

lf-queue/lfq.c

+16-14
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
#include <assert.h>
22
#include <errno.h>
3+
#include <stdatomic.h>
34
#include <stdbool.h>
5+
#include <stdlib.h>
6+
#include <string.h>
47

5-
#include "atomics.h"
68
#include "lfq.h"
79

810
#define MAX_FREE 150
@@ -20,14 +22,14 @@ static bool in_hp(struct lfq_ctx *ctx, struct lfq_node *node)
2022
static void insert_pool(struct lfq_ctx *ctx, struct lfq_node *node)
2123
{
2224
atomic_store(&node->free_next, NULL);
23-
struct lfq_node *old_tail = XCHG(&ctx->fpt, node); /* seq_cst */
25+
struct lfq_node *old_tail = atomic_exchange(&ctx->fpt, node); /* seq_cst */
2426
atomic_store(&old_tail->free_next, node);
2527
}
2628

2729
static void free_pool(struct lfq_ctx *ctx, bool freeall)
2830
{
2931
bool old = 0;
30-
if (!CAS(&ctx->is_freeing, &old, 1))
32+
if (!atomic_compare_exchange_strong(&ctx->is_freeing, &old, 1))
3133
return;
3234

3335
for (int i = 0; i < MAX_FREE || freeall; i++) {
@@ -39,20 +41,20 @@ static void free_pool(struct lfq_ctx *ctx, bool freeall)
3941
free(p);
4042
}
4143
atomic_store(&ctx->is_freeing, false);
42-
smp_mb();
44+
atomic_thread_fence(memory_order_seq_cst);
4345
}
4446

4547
static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node)
4648
{
4749
if (atomic_load(&node->can_free) && !in_hp(ctx, node)) {
4850
/* free is not thread-safe */
4951
bool old = 0;
50-
if (CAS(&ctx->is_freeing, &old, 1)) {
52+
if (atomic_compare_exchange_strong(&ctx->is_freeing, &old, 1)) {
5153
/* poison the pointer to detect use-after-free */
5254
node->next = (void *) -1;
5355
free(node); /* we got the lock; actually free */
5456
atomic_store(&ctx->is_freeing, false);
55-
smp_mb();
57+
atomic_thread_fence(memory_order_seq_cst);
5658
} else /* we did not get the lock; only add to a freelist */
5759
insert_pool(ctx, node);
5860
} else
@@ -65,7 +67,7 @@ static int alloc_tid(struct lfq_ctx *ctx)
6567
for (int i = 0; i < ctx->MAX_HP_SIZE; i++) {
6668
if (ctx->tid_map[i] == 0) {
6769
int old = 0;
68-
if (CAS(&ctx->tid_map[i], &old, 1))
70+
if (atomic_compare_exchange_strong(&ctx->tid_map[i], &old, 1))
6971
return i;
7072
}
7173
}
@@ -141,7 +143,7 @@ int lfq_enqueue(struct lfq_ctx *ctx, void *data)
141143
return -errno;
142144

143145
insert_node->data = data;
144-
struct lfq_node *old_tail = XCHG(&ctx->tail, insert_node);
146+
struct lfq_node *old_tail = atomic_exchange(&ctx->tail, insert_node);
145147
/* We have claimed our spot in the insertion order by modifying tail.
146148
* we are the only inserting thread with a pointer to the old tail.
147149
*
@@ -162,13 +164,13 @@ void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid)
162164
/* HP[tid] is necessary for deallocation. */
163165
do {
164166
retry:
165-
/* continue jumps to the bottom of the loop, and would attempt a CAS
166-
* with uninitialized new_head.
167+
/* continue jumps to the bottom of the loop, and would attempt an
168+
* atomic_compare_exchange_strong with uninitialized new_head.
167169
*/
168170
old_head = atomic_load(&ctx->head);
169171

170172
atomic_store(&ctx->HP[tid], old_head);
171-
mb();
173+
atomic_thread_fence(memory_order_seq_cst);
172174

173175
/* another thread freed it before seeing our HP[tid] store */
174176
if (old_head != atomic_load(&ctx->head))
@@ -179,7 +181,7 @@ void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid)
179181
atomic_store(&ctx->HP[tid], 0);
180182
return NULL; /* never remove the last node */
181183
}
182-
} while (!CAS(&ctx->head, &old_head, new_head));
184+
} while (!atomic_compare_exchange_strong(&ctx->head, &old_head, new_head));
183185

184186
/* We have atomically advanced head, and we are the thread that won the race
185187
* to claim a node. We return the data from the *new* head. The list starts
@@ -191,7 +193,7 @@ void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid)
191193
atomic_store(&new_head->can_free, true);
192194

193195
/* we need to avoid freeing until other readers are definitely not going to
194-
* load its ->next in the CAS loop
196+
* load its ->next in the atomic_compare_exchange_strong loop
195197
*/
196198
safe_free(ctx, (struct lfq_node *) old_head);
197199

@@ -208,4 +210,4 @@ void *lfq_dequeue(struct lfq_ctx *ctx)
208210
void *ret = lfq_dequeue_tid(ctx, tid);
209211
free_tid(ctx, tid);
210212
return ret;
211-
}
213+
}

lf-queue/test.c

+15-18
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
#define _GNU_SOURCE
22
#include <inttypes.h>
33
#include <pthread.h>
4+
#include <stdatomic.h>
45
#include <stdint.h>
56
#include <stdio.h>
67
#include <stdlib.h>
78
#include <string.h>
8-
#include <time.h>
99
#include <unistd.h>
1010

11-
#include "atomics.h"
1211
#include "lfq.h"
1312

1413
#ifndef MAX_PRODUCER
@@ -33,20 +32,20 @@ struct user_data {
3332
void *add_queue(void *data)
3433
{
3534
struct lfq_ctx *ctx = data;
36-
int ret = 0;
3735
long added;
3836
for (added = 0; added < 500000; added++) {
3937
struct user_data *p = malloc(sizeof(struct user_data));
4038
p->data = SOME_ID;
39+
int ret = 0;
4140
if ((ret = lfq_enqueue(ctx, p)) != 0) {
4241
printf("lfq_enqueue failed, reason:%s\n", strerror(-ret));
43-
ATOMIC_ADD(&cnt_added, added);
44-
ATOMIC_SUB(&cnt_producer, 1);
42+
atomic_fetch_add(&cnt_added, added);
43+
atomic_fetch_sub(&cnt_producer, 1);
4544
return 0;
4645
}
4746
}
48-
ATOMIC_ADD(&cnt_added, added);
49-
ATOMIC_SUB(&cnt_producer, 1);
47+
atomic_fetch_add(&cnt_added, added);
48+
atomic_fetch_sub(&cnt_producer, 1);
5049
printf("Producer thread [%lu] exited! Still %d running...\n",
5150
pthread_self(), atomic_load(&cnt_producer));
5251
return 0;
@@ -55,11 +54,10 @@ void *add_queue(void *data)
5554
void *remove_queue(void *data)
5655
{
5756
struct lfq_ctx *ctx = data;
58-
struct user_data *p;
59-
int tid = ATOMIC_ADD(&cnt_thread, 1);
57+
int tid = atomic_fetch_add(&cnt_thread, 1);
6058
long deleted = 0;
6159
while (1) {
62-
p = lfq_dequeue_tid(ctx, tid);
60+
struct user_data *p = lfq_dequeue_tid(ctx, tid);
6361
if (p) {
6462
if (p->data != SOME_ID) {
6563
printf("data wrong!!\n");
@@ -69,13 +67,12 @@ void *remove_queue(void *data)
6967
free(p);
7068
deleted++;
7169
} else {
72-
if (ctx->count || atomic_load(&cnt_producer))
73-
sched_yield(); /* queue is empty, release CPU slice */
74-
else
75-
break; /* queue is empty and no more producers */
70+
if (!ctx->count && !atomic_load(&cnt_producer))
71+
break; /* queue is empty and no more producers */
72+
sched_yield(); /* queue is empty, release CPU slice */
7673
}
7774
}
78-
ATOMIC_ADD(&cnt_removed, deleted);
75+
atomic_fetch_add(&cnt_removed, deleted);
7976

8077
printf("Consumer thread [%lu] exited %d\n", pthread_self(), cnt_producer);
8178
return 0;
@@ -88,17 +85,17 @@ int main()
8885

8986
pthread_t thread_cons[MAX_CONSUMER], thread_pros[MAX_PRODUCER];
9087

91-
ATOMIC_ADD(&cnt_producer, 1);
88+
atomic_fetch_add(&cnt_producer, 1);
9289
for (int i = 0; i < MAX_CONSUMER; i++) {
9390
pthread_create(&thread_cons[i], NULL, remove_queue, (void *) &ctx);
9491
}
9592

9693
for (int i = 0; i < MAX_PRODUCER; i++) {
97-
ATOMIC_ADD(&cnt_producer, 1);
94+
atomic_fetch_add(&cnt_producer, 1);
9895
pthread_create(&thread_pros[i], NULL, add_queue, (void *) &ctx);
9996
}
10097

101-
ATOMIC_SUB(&cnt_producer, 1);
98+
atomic_fetch_sub(&cnt_producer, 1);
10299

103100
for (int i = 0; i < MAX_PRODUCER; i++)
104101
pthread_join(thread_pros[i], NULL);

0 commit comments

Comments
 (0)