Commit 02394c5

Add a work-stealing scheduler
1 parent e2b7d34 commit 02394c5

3 files changed: +308 -0 lines changed

README.md (+1)
@@ -13,6 +13,7 @@ purpose of these programs is to be illustrative and educational.
 * Multi-threading Paradigms
 - [tpool](tpool/): A lightweight thread pool.
 - [refcnt](refcnt/): A generic reference counting.
+- [work-steal](work-steal/): A work-stealing scheduler.
 * [Producer–consumer problem](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem)
 - [spmc](spmc/): A concurrent single-producer/multiple-consumer queue.
 - [mpsc](mpsc/): An unbounded lockless single-consumer/multiple-producer FIFO queue.

work-steal/Makefile (+5)
@@ -0,0 +1,5 @@
all:
	gcc -O2 -Wall -std=c11 -o work-steal work-steal.c -lpthread

clean:
	rm -f work-steal

work-steal/work-steal.c (+302)
@@ -0,0 +1,302 @@
/* A work-stealing scheduler is described in
 * Robert D. Blumofe, Christopher F. Joerg, Bradley C. Kuszmaul, Charles E.
 * Leiserson, Keith H. Randall, and Yuli Zhou. Cilk: An efficient multithreaded
 * runtime system. In Proceedings of the Fifth ACM SIGPLAN Symposium on
 * Principles and Practice of Parallel Programming (PPoPP), pages 207-216,
 * Santa Barbara, California, July 1995.
 * http://supertech.csail.mit.edu/papers/PPoPP95.pdf
 *
 * However, that paper describes an outdated model of Cilk; the essential idea
 * of work stealing is presented in updated form in Leiserson and Plaat,
 * "Programming Parallel Applications in Cilk".
 */

#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

struct work_internal;

/* A 'task_t' represents a function pointer that accepts a pointer to a
 * 'work_t' struct as input and returns a pointer to another 'work_t' struct
 * as output. The input to this function is always a pointer to the
 * encompassing 'work_t' struct.
 *
 * It is worth considering whether to include information about the executing
 * thread's identifier when invoking the task. This information might be
 * beneficial for supporting thread-local accumulators in cases of commutative
 * reductions. Additionally, it could be useful to determine the destination
 * worker's queue for appending further tasks.
 *
 * The 'task_t' trampoline is responsible for delivering the subsequent unit of
 * work to be executed. It returns the next work item if it is prepared for
 * execution, or NULL if the task is not ready to proceed.
 */
typedef struct work_internal *(*task_t)(struct work_internal *);

typedef struct work_internal {
    task_t code;
    atomic_int join_count;
    void *args[];
} work_t;
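
/* For illustration only: a minimal sketch of a trampoline-style task written
 * against the definitions above. The name 'double_task' and its args layout
 * (args[0] holds an int payload, args[1] an optional follow-up item) are
 * hypothetical and not part of the scheduler below.
 */
work_t *double_task(work_t *w)
{
    int *n = (int *) w->args[0];
    printf("doubled: %d\n", 2 * *n);
    work_t *next = (work_t *) w->args[1]; /* next ready item, or NULL */
    free(w);                              /* a task owns and frees its work_t */
    return next; /* handed straight back to the trampoline loop */
}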

/* These are non-NULL pointers that will fault on dereference under normal
 * circumstances, used to catch accidental use of uninitialized entries.
 */
static work_t *EMPTY = (work_t *) 0x100, *ABORT = (work_t *) 0x200;

/* work_t-stealing deque */

typedef struct {
    atomic_size_t size;
    _Atomic(work_t *) buffer[];
} array_t;

typedef struct {
    /* Assume that they never overflow */
    atomic_size_t top, bottom;
    _Atomic(array_t *) array;
} deque_t;
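
/* A rough bound behind the "never overflow" assumption: top and bottom are
 * never reset, and with 64-bit counters (checked by the static_assert in
 * main) even one billion deque operations per second would take more than
 * 500 years to wrap a counter around.
 */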

void init(deque_t *q, int size_hint)
{
    atomic_init(&q->top, 0);
    atomic_init(&q->bottom, 0);
    array_t *a = malloc(sizeof(array_t) + sizeof(work_t *) * size_hint);
    atomic_init(&a->size, size_hint);
    atomic_init(&q->array, a);
}

void resize(deque_t *q)
{
    array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
    size_t old_size = a->size;
    size_t new_size = old_size * 2;
    array_t *new = malloc(sizeof(array_t) + sizeof(work_t *) * new_size);
    atomic_init(&new->size, new_size);
    size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
    size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
    for (size_t i = t; i < b; i++)
        new->buffer[i % new_size] = a->buffer[i % old_size];

    atomic_store_explicit(&q->array, new, memory_order_relaxed);
    /* When would it be safe to release the memory of the previous array *a?
     * In the original Chase and Lev paper, that job was left to the garbage
     * collector, which presumably knew whether any ongoing steal operations
     * by other threads could still be reading from the old array.
     *
     * Here we cannot free *a at this point, because another thread may still
     * be reading from it, so we simply leak it. Since the queues grow by
     * doubling, the leaked memory stays bounded by the memory in use by the
     * live arrays.
     */
}
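
/* A quick bound on the leak described above: arrays only ever double in size,
 * so if the live array holds N slots, all previously leaked generations hold
 * at most N/2 + N/4 + ... + 1 < N slots combined, i.e. less than the storage
 * of the array currently in use.
 */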

work_t *take(deque_t *q)
{
    size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1;
    array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
    atomic_store_explicit(&q->bottom, b, memory_order_relaxed);
    atomic_thread_fence(memory_order_seq_cst);
    size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
    work_t *x;
    if (t <= b) {
        /* Non-empty queue */
        x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed);
        if (t == b) {
            /* Single last element in queue */
            if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
                                                         memory_order_seq_cst,
                                                         memory_order_relaxed))
                /* Failed race */
                x = EMPTY;
            atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
        }
    } else { /* Empty queue */
        x = EMPTY;
        atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
    }
    return x;
}

void push(deque_t *q, work_t *w)
{
    size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
    size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
    array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
    if (b - t > a->size - 1) { /* Full queue */
        resize(q);
        a = atomic_load_explicit(&q->array, memory_order_relaxed);
    }
    atomic_store_explicit(&a->buffer[b % a->size], w, memory_order_relaxed);
    atomic_thread_fence(memory_order_release);
    atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
}

work_t *steal(deque_t *q)
{
    size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
    atomic_thread_fence(memory_order_seq_cst);
    size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire);
    work_t *x = EMPTY;
    if (t < b) {
        /* Non-empty queue */
        array_t *a = atomic_load_explicit(&q->array, memory_order_consume);
        x = atomic_load_explicit(&a->buffer[t % a->size], memory_order_relaxed);
        if (!atomic_compare_exchange_strong_explicit(
                &q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed))
            /* Failed race */
            return ABORT;
    }
    return x;
}
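
/* Usage note for the operations above: take() and push() act on the bottom
 * end and are intended to be called only by the deque's owner (in this demo
 * all pushes happen in main() before the workers start), while any other
 * worker may steal() from the top end. Races are resolved by the
 * compare-and-swap on top: steal() always performs it, take() only when a
 * single element remains.
 */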

#define N_THREADS 24
deque_t *thread_queues;

atomic_bool done;

/* Returns the subsequent item available for processing, or NULL if no items
 * are remaining.
 */
static work_t *do_one_work(int id, work_t *work)
{
    printf("worker %d running item %p\n", id, (void *) work);
    return (*(work->code)) (work);
}

void do_work(int id, work_t *work)
{
    while (work)
        work = do_one_work(id, work);
}

/* Returns the next item to be processed, or NULL if there are no remaining
 * items.
 */
work_t *join_work(work_t *work)
{
    int old_join_count = atomic_fetch_sub(&work->join_count, 1);
    if (old_join_count == 1)
        return work;
    return NULL;
}
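
/* The join_count protocol in brief: a continuation created with
 * join_count == k becomes runnable only after k predecessors have called
 * join_work() on it, and the caller that drops the count to zero receives the
 * continuation as its next work item. main() below relies on this by giving
 * done_work a join_count of N_THREADS * nprints.
 */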

void *thread(void *payload)
{
    int id = *(int *) payload;
    deque_t *my_queue = &thread_queues[id];
    while (true) {
        work_t *work = take(my_queue);
        if (work != EMPTY) {
            do_work(id, work);
        } else {
            /* No work is present in my own queue right now */
            work_t *stolen = EMPTY;
            for (int i = 0; i < N_THREADS; ++i) {
                if (i == id)
                    continue;
                stolen = steal(&thread_queues[i]);
                if (stolen == ABORT) {
                    i--;
                    continue; /* Try again at the same i */
                } else if (stolen == EMPTY)
                    continue;

                /* Found some work to do */
                break;
            }
            if (stolen == EMPTY) {
                /* Although every queue looked empty on the last pass, new work
                 * items may have been added since then. We therefore busy-wait,
                 * re-scanning the queues until the global "done" flag is set,
                 * which signals that no further work will be produced.
                 */
                if (atomic_load(&done))
                    break;
                continue;
            } else {
                do_work(id, stolen);
            }
        }
    }
    printf("worker %d finished\n", id);
    return NULL;
}

work_t *print_task(work_t *w)
{
    int *payload = (int *) w->args[0];
    int item = *payload;
    printf("Did item %p with payload %d\n", (void *) w, item);
    work_t *cont = (work_t *) w->args[1];
    free(payload);
    free(w);
    return join_work(cont);
}

work_t *done_task(work_t *w)
{
    free(w);
    atomic_store(&done, true);
    return NULL;
}

int main(int argc, char **argv)
{
    /* Check that top and bottom are 64-bit so they never overflow */
    static_assert(sizeof(atomic_size_t) == 8,
                  "Assume atomic_size_t is 8 bytes wide");

    pthread_t threads[N_THREADS];
    int tids[N_THREADS];
    thread_queues = malloc(N_THREADS * sizeof(deque_t));
    int nprints = 10;

    atomic_store(&done, false);
    work_t *done_work = malloc(sizeof(work_t));
    done_work->code = &done_task;
    done_work->join_count = N_THREADS * nprints;

    for (int i = 0; i < N_THREADS; ++i) {
        tids[i] = i;
        init(&thread_queues[i], 8);
        for (int j = 0; j < nprints; ++j) {
            work_t *work = malloc(sizeof(work_t) + 2 * sizeof(void *));
            work->code = &print_task;
            work->join_count = 0;
            int *payload = malloc(sizeof(int));
            *payload = 1000 * i + j;
            work->args[0] = payload;
            work->args[1] = done_work;
            push(&thread_queues[i], work);
        }
    }

    for (int i = 0; i < N_THREADS; ++i) {
        if (pthread_create(&threads[i], NULL, thread, &tids[i]) != 0) {
            perror("Failed to start the thread");
            exit(EXIT_FAILURE);
        }
    }

    for (int i = 0; i < N_THREADS; ++i) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("Failed to join the thread");
            exit(EXIT_FAILURE);
        }
    }
    printf("Expect %d lines of output (including this one)\n",
           2 * N_THREADS * nprints + N_THREADS + 2);

    return 0;
}
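
/* Where the expected line count comes from: every executed work item is
 * announced once by do_one_work(), so the N_THREADS * nprints print tasks
 * account for 2 * N_THREADS * nprints lines (one announcement plus one
 * "Did item" line each); the final run of done_work adds one more
 * announcement, each worker prints one "finished" line, and the "Expect ..."
 * line itself is the last one: 2 * N_THREADS * nprints + N_THREADS + 2.
 */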
