Commit d8cde00

Add a concurrent hashmap implementation
1 parent 068c67d commit d8cde00

10 files changed (+728, -0 lines)

.gitignore (+1)
@@ -22,6 +22,7 @@ list-move/bench-lock
 list-move/bench-lockfree
 hp_list/list
 lfring/lfring
+hashmap/test-hashmap

 # external source files
 preempt_sched/list.h

README.md (+1)
@@ -21,6 +21,7 @@ purpose of these programs is to be illustrative and educational.
 - [lfring](lfring/): A lock-free multiple-producer/multiple-consumer (MPMC) ring buffer.
 - [ringbuf\_shm](ringbuf-shm/): An optimized lock-free ring buffer with shared memory.
 - [mbus](mbus/): A concurrent message bus.
+- [hashmap](hashmap/): A concurrent hashmap implementation.
 * [Synchronization](https://en.wikipedia.org/wiki/Synchronization_(computer_science))
 - [hp\_list](hp_list): A concurrent linked list utilizing Hazard Pointers.
 - [rcu\_list](rcu_list/): A concurrent linked list utilizing the simplified RCU algorithm.

hashmap/Makefile (+37)
@@ -0,0 +1,37 @@
.PHONY: all clean
TARGET = test-hashmap
all: $(TARGET)

include common.mk

CFLAGS = -I.
CFLAGS += -O2 -g
CFLAGS += -std=gnu11 -Wall

LDFLAGS = -lpthread

# standard build rules
.SUFFIXES: .o .c
.c.o:
	$(VECHO) "  CC\t$@\n"
	$(Q)$(CC) -o $@ $(CFLAGS) -c -MMD -MF $@.d $<

OBJS = \
    free_later.o \
    hashmap.o \
    test-hashmap.o

deps += $(OBJS:%.o=%.o.d)

$(TARGET): $(OBJS)
	$(VECHO) "  LD\t$@\n"
	$(Q)$(CC) -o $@ $^ $(LDFLAGS)

check: $(TARGET)
	$(Q)./$^ && $(call pass)

clean:
	$(VECHO) "  Cleaning...\n"
	$(Q)$(RM) $(TARGET) $(OBJS) $(deps)

-include $(deps)

hashmap/README.md (+11)
@@ -0,0 +1,11 @@
# hashmap: A concurrent hashmap implementation

Internal state is managed by the hashmap to populate buckets with linked lists.
`hashmap_keyval` nodes are allocated as needed; however, freeing those structs
cannot be done immediately during a `hashmap_del` call, because other threads
may still be using the `hashmap_keyval` concurrently.

`hashmap_del` may return a value. It does so only once per delete, and the
calling thread must buffer the pointer for appropriate cleanup later. The
pointer cannot be freed immediately because other threads may still be using
it.
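
The deferred-free pattern this README describes can be sketched briefly (the sketch is not part of this commit). The hashmap API itself is not shown in this diff, so the `hashmap_del` signature below is only an assumption for illustration; `free_later()` comes from free_later.h in this commit.

#include <stdlib.h>
#include "free_later.h"

/* Assumed signature, for illustration only: a concurrent delete that hands
 * back the removed value instead of freeing it. */
extern void *hashmap_del(void *map, const void *key);

static void delete_entry(void *map, const void *key)
{
    void *old = hashmap_del(map, key); /* other threads may still read `old` */
    if (old)
        free_later(old, free); /* buffered; reclaimed later by free_later_run() */
}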

hashmap/common.mk (+20)
@@ -0,0 +1,20 @@
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Darwin)
PRINTF = printf
else
PRINTF = env printf
endif

# Control the build verbosity
ifeq ("$(VERBOSE)","1")
Q :=
VECHO = @true
else
Q := @
VECHO = @$(PRINTF)
endif

PASS_COLOR = \e[32;01m
NO_COLOR = \e[0m

pass = $(PRINTF) "$(PASS_COLOR)$1 Passed$(NO_COLOR)\n"

hashmap/free_later.c (+162)
@@ -0,0 +1,162 @@
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct list_node {
    struct list_node *next;
    void *val;
} list_node_t;

typedef struct {
    list_node_t *head;
    uint32_t length;
} list_t;

/* advisory counters: how often a CAS insert had to be retried */
static volatile uint32_t list_retries_empty = 0, list_retries_populated = 0;
static const list_node_t *empty = NULL;

static list_t *list_new()
{
    list_t *l = calloc(1, sizeof(list_t));
    l->head = (list_node_t *) empty;
    l->length = 0;
    return l;
}

static void list_add(list_t *l, void *val)
{
    /* wrap the value as a node in the linked list */
    list_node_t *v = calloc(1, sizeof(list_node_t));
    v->val = val;

    /* try adding to the front of the list */
    while (true) {
        list_node_t *n = l->head;
        if (n == empty) { /* if this is the first link in the list */
            v->next = NULL;
            /* CAS against a local copy of the expected value: on failure the
             * observed head is written back into it, so the shared sentinel
             * `empty` must not be passed here.
             */
            list_node_t *expected = (list_node_t *) empty;
            if (__atomic_compare_exchange(&l->head, &expected, &v, false,
                                          __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                __atomic_fetch_add(&l->length, 1, __ATOMIC_SEQ_CST);
                return;
            }
            list_retries_empty++;
        } else { /* inserting when an existing link is present */
            v->next = n;
            if (__atomic_compare_exchange(&l->head, &n, &v, false,
                                          __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                __atomic_fetch_add(&l->length, 1, __ATOMIC_SEQ_CST);
                return;
            }
            list_retries_populated++;
        }
    }
}

#include "free_later.h"

#define CAS(a, b, c)                                                     \
    __extension__({                                                      \
        typeof(*a) _old = b, _new = c;                                   \
        __atomic_compare_exchange(a, &_old, &_new, 0, __ATOMIC_SEQ_CST,  \
                                  __ATOMIC_SEQ_CST);                     \
        _old;                                                            \
    })

static inline void acquire_lock(volatile bool *lock)
{
    /* spin until the previous value was false, i.e. this thread set it */
    while (CAS(lock, false, true))
        ;
}

static inline void release_lock(volatile bool *lock)
{
    /* the CAS must target the shared flag itself, not a local copy of it */
    CAS(lock, true, false);
}

typedef struct {
    void *var;
    void (*free)(void *var);
} free_later_t;

/* track expired variables to clean up later */
static list_t *buffer = NULL, *buffer_prev = NULL;

int free_later_init()
{
    buffer = list_new();
    return 0;
}

/* register a var for cleanup */
void free_later(void *var, void release(void *var))
{
    free_later_t *cv = malloc(sizeof(free_later_t));
    cv->var = var;
    cv->free = release;
    list_add(buffer, cv);
}

/* stage the buffered vars so they can be released after the next round */
void free_later_stage(void)
{
    /* lock to ensure that multiple threads do not stage simultaneously */
    static bool lock = false;

    /* CAS-based lock in case multiple threads are calling this */
    acquire_lock(&lock);

    /* skip if nothing is buffered, or if a staged batch is still pending */
    if (buffer_prev || !buffer || buffer->length == 0) {
        release_lock(&lock);
        return;
    }

    /* swap the buffers */
    buffer_prev = buffer;
    buffer = list_new();

    release_lock(&lock);
}

void free_later_run()
{
    /* lock to ensure that multiple threads do not clean up simultaneously */
    static bool lock = false;

    /* CAS-based lock in case multiple threads are calling this */
    acquire_lock(&lock);

    /* skip if there is nothing staged to release */
    if (!buffer_prev) {
        release_lock(&lock);
        return;
    }

    /* At this point, all workers have made progress since the staged buffer
     * was filled. No threads are using the old, deleted data.
     */
    for (list_node_t *n = buffer_prev->head; n;) {
        list_node_t *next = n->next; /* save before freeing the node */
        free_later_t *v = n->val;
        v->free(v->var);
        free(v);
        free(n);
        n = next;
    }

    free(buffer_prev);
    buffer_prev = NULL;

    release_lock(&lock);
}

int free_later_exit()
{
    /* purge anything that is already staged */
    free_later_run();

    /* stage and purge anything that was still buffered */
    free_later_stage();
    free_later_run();

    /* release memory for the buffer */
    free(buffer);
    buffer = NULL;
    return 0;
}

hashmap/free_later.h (+34)
@@ -0,0 +1,34 @@
/* Memory cleanup for lock-free deletes
 *
 * Operations such as `hashmap_del` remove data; however, it may not be
 * possible to free that data until later. For example, if many threads are
 * using the same hashmap, more than one may hold a reference when
 * `hashmap_del` is called. The solution here is to have `hashmap_del` register
 * data that can be deleted later and let the application notify when worker
 * threads are done with old references.
 *
 * `free_later(void *var, void release(void *))` will register a pointer to have
 * the `release` method called on it later, when it is safe to free memory.
 *
 * `free_later_init()` must be called before using `free_later`, and
 * `free_later_exit()` should be called before application termination. It'll
 * ensure all registered vars have their `release()` callback invoked.
 *
 * `free_later_stage()` should be called before a round of work starts. It'll
 * stage all buffered values to a list that can't be updated, and make a new
 * list to register any new `free_later()` invocations. After all worker threads
 * have progressed with work, call `free_later_run()` to have every value in the
 * staged buffer released.
 */

#ifndef _FREE_LATER_H_
#define _FREE_LATER_H_

/* _init() must be called before use and _exit() once at the end */
int free_later_init(void);
int free_later_exit(void);

/* add a var to the cleanup-later list */
void free_later(void *var, void release(void *var));

/* stage buffered vars, then release every value in the staged buffer */
void free_later_stage(void);
void free_later_run(void);

#endif
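
A minimal single-threaded sketch of the lifecycle documented above (not part of this commit), showing how a registered pointer flows through staging and release:

#include <stdlib.h>
#include "free_later.h"

int main(void)
{
    free_later_init();

    int *tmp = malloc(sizeof(int));
    *tmp = 42;
    free_later(tmp, free);  /* retire the pointer instead of freeing it now */

    free_later_stage();     /* stage buffered pointers for the next cleanup round */
    free_later_run();       /* workers are done with old references: free(tmp) runs */

    free_later_exit();      /* flushes anything still buffered, then cleans up */
    return 0;
}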
