1
+ #include <assert.h>
2
+ #include <errno.h>
3
+ #include <stdbool.h>
4
+
5
+ #include "atomics.h"
6
+ #include "lfq.h"
7
+
8
+ #define MAX_FREE 150
9
+
10
+ static bool in_hp (struct lfq_ctx * ctx , struct lfq_node * node )
11
+ {
12
+ for (int i = 0 ; i < ctx -> MAX_HP_SIZE ; i ++ ) {
13
+ if (atomic_load (& ctx -> HP [i ]) == node )
14
+ return true;
15
+ }
16
+ return false;
17
+ }
18
+
19
+ /* add to tail of the free list */
20
+ static void insert_pool (struct lfq_ctx * ctx , struct lfq_node * node )
21
+ {
22
+ atomic_store (& node -> free_next , NULL );
23
+ struct lfq_node * old_tail = XCHG (& ctx -> fpt , node ); /* seq_cst */
24
+ atomic_store (& old_tail -> free_next , node );
25
+ }
26
+
27
+ static void free_pool (struct lfq_ctx * ctx , bool freeall )
28
+ {
29
+ bool old = 0 ;
30
+ if (!CAS (& ctx -> is_freeing , & old , 1 ))
31
+ return ;
32
+
33
+ for (int i = 0 ; i < MAX_FREE || freeall ; i ++ ) {
34
+ struct lfq_node * p = ctx -> fph ;
35
+ if ((!atomic_load (& p -> can_free )) || (!atomic_load (& p -> free_next )) ||
36
+ in_hp (ctx , (struct lfq_node * ) p ))
37
+ break ;
38
+ ctx -> fph = p -> free_next ;
39
+ free (p );
40
+ }
41
+ atomic_store (& ctx -> is_freeing , false);
42
+ smp_mb ();
43
+ }
44
+
45
+ static void safe_free (struct lfq_ctx * ctx , struct lfq_node * node )
46
+ {
47
+ if (atomic_load (& node -> can_free ) && !in_hp (ctx , node )) {
48
+ /* free is not thread-safe */
49
+ bool old = 0 ;
50
+ if (CAS (& ctx -> is_freeing , & old , 1 )) {
51
+ /* poison the pointer to detect use-after-free */
52
+ node -> next = (void * ) -1 ;
53
+ free (node ); /* we got the lock; actually free */
54
+ atomic_store (& ctx -> is_freeing , false);
55
+ smp_mb ();
56
+ } else /* we did not get the lock; only add to a freelist */
57
+ insert_pool (ctx , node );
58
+ } else
59
+ insert_pool (ctx , node );
60
+ free_pool (ctx , false);
61
+ }
62
+
63
+ static int alloc_tid (struct lfq_ctx * ctx )
64
+ {
65
+ for (int i = 0 ; i < ctx -> MAX_HP_SIZE ; i ++ ) {
66
+ if (ctx -> tid_map [i ] == 0 ) {
67
+ int old = 0 ;
68
+ if (CAS (& ctx -> tid_map [i ], & old , 1 ))
69
+ return i ;
70
+ }
71
+ }
72
+
73
+ return -1 ;
74
+ }
75
+
76
+ static void free_tid (struct lfq_ctx * ctx , int tid )
77
+ {
78
+ ctx -> tid_map [tid ] = 0 ;
79
+ }
80
+
81
+ int lfq_init (struct lfq_ctx * ctx , int max_consume_thread )
82
+ {
83
+ struct lfq_node * tmp = calloc (1 , sizeof (struct lfq_node ));
84
+ if (!tmp )
85
+ return - errno ;
86
+
87
+ struct lfq_node * node = calloc (1 , sizeof (struct lfq_node ));
88
+ if (!node )
89
+ return - errno ;
90
+
91
+ tmp -> can_free = node -> can_free = true;
92
+ memset (ctx , 0 , sizeof (struct lfq_ctx ));
93
+ ctx -> MAX_HP_SIZE = max_consume_thread ;
94
+ ctx -> HP = calloc (max_consume_thread , sizeof (struct lfq_node ));
95
+ ctx -> tid_map = calloc (max_consume_thread , sizeof (struct lfq_node ));
96
+ ctx -> head = ctx -> tail = tmp ;
97
+ ctx -> fph = ctx -> fpt = node ;
98
+
99
+ return 0 ;
100
+ }
101
+
102
+ long lfg_count_freelist (const struct lfq_ctx * ctx )
103
+ {
104
+ long count = 0 ;
105
+ for (struct lfq_node * p = (struct lfq_node * ) ctx -> fph ; p ; p = p -> free_next )
106
+ count ++ ;
107
+ return count ;
108
+ }
109
+
110
+ int lfq_release (struct lfq_ctx * ctx )
111
+ {
112
+ if (ctx -> tail && ctx -> head ) { /* if we have data in queue */
113
+ while ((struct lfq_node * ) ctx -> head ) { /* while still have node */
114
+ struct lfq_node * tmp = (struct lfq_node * ) ctx -> head -> next ;
115
+ safe_free (ctx , (struct lfq_node * ) ctx -> head );
116
+ ctx -> head = tmp ;
117
+ }
118
+ ctx -> tail = 0 ;
119
+ }
120
+ if (ctx -> fph && ctx -> fpt ) {
121
+ free_pool (ctx , true);
122
+ if (ctx -> fph != ctx -> fpt )
123
+ return -1 ;
124
+ free (ctx -> fpt ); /* free the empty node */
125
+ ctx -> fph = ctx -> fpt = 0 ;
126
+ }
127
+ if (ctx -> fph || ctx -> fpt )
128
+ return -1 ;
129
+
130
+ free (ctx -> HP );
131
+ free (ctx -> tid_map );
132
+ memset (ctx , 0 , sizeof (struct lfq_ctx ));
133
+
134
+ return 0 ;
135
+ }
136
+
137
+ int lfq_enqueue (struct lfq_ctx * ctx , void * data )
138
+ {
139
+ struct lfq_node * insert_node = calloc (1 , sizeof (struct lfq_node ));
140
+ if (!insert_node )
141
+ return - errno ;
142
+
143
+ insert_node -> data = data ;
144
+ struct lfq_node * old_tail = XCHG (& ctx -> tail , insert_node );
145
+ /* We have claimed our spot in the insertion order by modifying tail.
146
+ * we are the only inserting thread with a pointer to the old tail.
147
+ *
148
+ * Now we can make it part of the list by overwriting the NULL pointer in
149
+ * the old tail. This is safe whether or not other threads have updated
150
+ * ->next in our insert_node.
151
+ */
152
+ assert (!old_tail -> next && "old tail was not NULL" );
153
+ atomic_store (& old_tail -> next , insert_node );
154
+
155
+ return 0 ;
156
+ }
157
+
158
+ void * lfq_dequeue_tid (struct lfq_ctx * ctx , int tid )
159
+ {
160
+ struct lfq_node * old_head , * new_head ;
161
+
162
+ /* HP[tid] is necessary for deallocation. */
163
+ do {
164
+ retry :
165
+ /* continue jumps to the bottom of the loop, and would attempt a CAS
166
+ * with uninitialized new_head.
167
+ */
168
+ old_head = atomic_load (& ctx -> head );
169
+
170
+ atomic_store (& ctx -> HP [tid ], old_head );
171
+ mb ();
172
+
173
+ /* another thread freed it before seeing our HP[tid] store */
174
+ if (old_head != atomic_load (& ctx -> head ))
175
+ goto retry ;
176
+ new_head = atomic_load (& old_head -> next );
177
+
178
+ if (new_head == 0 ) {
179
+ atomic_store (& ctx -> HP [tid ], 0 );
180
+ return NULL ; /* never remove the last node */
181
+ }
182
+ } while (!CAS (& ctx -> head , & old_head , new_head ));
183
+
184
+ /* We have atomically advanced head, and we are the thread that won the race
185
+ * to claim a node. We return the data from the *new* head. The list starts
186
+ * off with a dummy node, so the current head is always a node that is
187
+ * already been read.
188
+ */
189
+ atomic_store (& ctx -> HP [tid ], 0 );
190
+ void * ret = new_head -> data ;
191
+ atomic_store (& new_head -> can_free , true);
192
+
193
+ /* we need to avoid freeing until other readers are definitely not going to
194
+ * load its ->next in the CAS loop
195
+ */
196
+ safe_free (ctx , (struct lfq_node * ) old_head );
197
+
198
+ return ret ;
199
+ }
200
+
201
+ void * lfq_dequeue (struct lfq_ctx * ctx )
202
+ {
203
+ int tid = alloc_tid (ctx );
204
+ /* To many thread race */
205
+ if (tid == -1 )
206
+ return (void * ) -1 ;
207
+
208
+ void * ret = lfq_dequeue_tid (ctx , tid );
209
+ free_tid (ctx , tid );
210
+ return ret ;
211
+ }
0 commit comments