Skip to content

Commit 719846f

Browse files
committed
Wait-free size for lock-free data structures
1 parent 0668f83 commit 719846f

16 files changed

+707
-1
lines changed

LICENSE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
Copyright (c) 2016 KC Sivaramakrishnan
22
Copyright (C) 2022 Thomas Leonard
3+
Copyright (C) 2023 Vesa Karvonen
34

45
Permission to use, copy, modify, and/or distribute this software for any
56
purpose with or without fee is hereby granted, provided that the above

dune-project

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
(ocaml (>= 4.13))
1515
(domain_shims (and (>= 0.1.0) :with-test))
1616
(saturn_lockfree (= :version))
17+
(multicore-magic (and (>= 2.1.0) :with-test))
1718
(alcotest (and (>= 1.7.0) :with-test))
1819
(qcheck (and (>= 0.21.3) :with-test))
1920
(qcheck-stm (and (>= 0.3) :with-test))
@@ -27,6 +28,7 @@
2728
(ocaml (>= 4.13))
2829
(domain_shims (and (>= 0.1.0) :with-test))
2930
(backoff (>= 0.1.0))
31+
(multicore-magic (>= 2.1.0))
3032
(alcotest (and (>= 1.7.0) :with-test))
3133
(qcheck (and (>= 0.21.3) :with-test))
3234
(qcheck-core (and (>= 0.21.3) :with-test))

saturn.opam

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ depends: [
1313
"ocaml" {>= "4.13"}
1414
"domain_shims" {>= "0.1.0" & with-test}
1515
"saturn_lockfree" {= version}
16+
"multicore-magic" {>= "2.1.0" & with-test}
1617
"alcotest" {>= "1.7.0" & with-test}
1718
"qcheck" {>= "0.21.3" & with-test}
1819
"qcheck-stm" {>= "0.3" & with-test}

saturn_lockfree.opam

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ depends: [
1212
"ocaml" {>= "4.13"}
1313
"domain_shims" {>= "0.1.0" & with-test}
1414
"backoff" {>= "0.1.0"}
15+
"multicore-magic" {>= "2.1.0"}
1516
"alcotest" {>= "1.7.0" & with-test}
1617
"qcheck" {>= "0.21.3" & with-test}
1718
"qcheck-core" {>= "0.21.3" & with-test}

src_lockfree/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ let () =
1010
(library
1111
(name saturn_lockfree)
1212
(public_name saturn_lockfree)
13-
(libraries backoff |}
13+
(libraries backoff multicore-magic |}
1414
^ maybe_threads
1515
^ {| ))
1616

src_lockfree/saturn_lockfree.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,4 @@ module Work_stealing_deque = Ws_deque
3232
module Single_prod_single_cons_queue = Spsc_queue
3333
module Single_consumer_queue = Mpsc_queue
3434
module Relaxed_queue = Mpmc_relaxed_queue
35+
module Size = Size

src_lockfree/saturn_lockfree.mli

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,4 @@ module Work_stealing_deque = Ws_deque
3636
module Single_prod_single_cons_queue = Spsc_queue
3737
module Single_consumer_queue = Mpsc_queue
3838
module Relaxed_queue = Mpmc_relaxed_queue
39+
module Size = Size

src_lockfree/size.ml

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
(* Copyright (c) 2023 Vesa Karvonen
2+
3+
Permission to use, copy, modify, and/or distribute this software for any
4+
purpose with or without fee is hereby granted, provided that the above
5+
copyright notice and this permission notice appear in all copies.
6+
7+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
8+
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
9+
AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
10+
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
11+
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
12+
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
13+
PERFORMANCE OF THIS SOFTWARE. *)
14+
15+
(** ⚠️ Beware that this implementation uses a bunch of low level data
16+
representation tricks to minimize overheads. *)
17+
18+
module Atomic = Transparent_atomic
19+
20+
let max_value = Int.max_int
21+
22+
module Snapshot = struct
23+
type t = int Atomic.t array
24+
(** We use an optimized flat representation where the first element of the
25+
array holds the status of the snapshot.
26+
27+
+--------+---------+---------+---------+- - -
28+
| status | counter | counter | counter | ...
29+
+--------+---------+---------+---------+- - -
30+
31+
The status is either {!collecting}, {!computing}, or a non-negative value.
32+
33+
The counter snapshot values are initialized to a negative value and after
34+
collecting they will all be non-negative. *)
35+
36+
let zero = [| Atomic.make 0 |]
37+
let collecting = -1
38+
let computing = -2
39+
40+
let[@inline] is_collecting (s : t) =
41+
Atomic.get (Array.unsafe_get s 0) = collecting
42+
43+
let create n = Array.init n @@ fun _ -> Atomic.make collecting
44+
45+
let[@inline] set s i after =
46+
let snap = Array.unsafe_get s i in
47+
let before = Atomic.get snap in
48+
if
49+
before = collecting
50+
|| (* NOTE: The condition below accounts for overflow. *)
51+
(after - before - 1) land max_value < max_value / 2
52+
then Atomic.compare_and_set snap before after |> ignore
53+
54+
let[@inline] forward s i after =
55+
let snap = Array.unsafe_get s i in
56+
while
57+
let before = Atomic.get snap in
58+
(before = collecting
59+
|| (* NOTE: The condition below accounts for overflow. *)
60+
(after - before - 1) land max_value < max_value / 2)
61+
&& not (Atomic.compare_and_set snap before after)
62+
do
63+
()
64+
done
65+
66+
let rec compute s sum i =
67+
if 0 < i then
68+
(* NOTE: Operations below are in specific order for performance. *)
69+
let decr = Array.unsafe_get s i in
70+
let incr = Array.unsafe_get s (i + 1) in
71+
let decr = Atomic.get decr in
72+
let incr = Atomic.get incr in
73+
compute s (sum - decr + incr) (i - 2)
74+
else sum land max_value
75+
76+
let compute s = compute s 0 (Array.length s - 2)
77+
78+
let compute s =
79+
let status = Array.unsafe_get s 0 in
80+
if Atomic.get status = collecting then
81+
Atomic.compare_and_set status collecting computing |> ignore;
82+
if Atomic.get status = computing then begin
83+
let computed = compute s in
84+
if Atomic.get status = computing then
85+
Atomic.compare_and_set status computing computed |> ignore
86+
end;
87+
Atomic.get status
88+
end
89+
90+
type _ state =
91+
| Open : { mutable index : int } -> [ `Open ] state
92+
| Used : [ `Used ] state
93+
94+
let used_index = 0
95+
96+
type tx = { value : int; once : [ `Open ] state }
97+
type t = tx Atomic.t array Atomic.t
98+
99+
(** We use an optimized flat representation where the first element of the array
100+
holds a reference to the snapshot and the other elements are the counters.
101+
102+
+----------+------+------+------+------+- - -
103+
| snapshot | decr | incr | decr | incr | ...
104+
+----------+------+------+------+------+- - -
105+
106+
Counters at odd numbered indices are for [decr]ements and the counters at
107+
even numbered indices are for [incr]ements.
108+
109+
A counter refers to a unique [tx] record. *)
110+
111+
let[@inline] snapshot_of txs : Snapshot.t Atomic.t =
112+
Obj.magic (Array.unsafe_get txs 0)
113+
114+
(* *)
115+
116+
let zero = { value = 0; once = Open { index = used_index } }
117+
118+
let create () =
119+
Array.init
120+
((1 * 2) + 1)
121+
(fun i ->
122+
Atomic.make (if i = 0 then Obj.magic Snapshot.zero else zero)
123+
|> Multicore_magic.copy_as_padded)
124+
|> Atomic.make |> Multicore_magic.copy_as_padded
125+
126+
(* *)
127+
128+
type once = Once : _ state -> once [@@unboxed]
129+
130+
let get_index (Open r) = r.index
131+
let use_index (Open r) = r.index <- used_index
132+
133+
(* *)
134+
135+
let used_once = Once Used
136+
137+
(* *)
138+
139+
type update = int
140+
141+
let decr = 1
142+
let incr = 2
143+
144+
let rec new_once t update =
145+
let index = (Multicore_magic.instantaneous_domain_index () * 2) + update in
146+
let txs = Atomic.fenceless_get t in
147+
let n = Array.length txs in
148+
if index < n then Once (Open { index })
149+
else
150+
let txs_new =
151+
(* The length of [txs_new] will be a power of two minus 1, which means the
152+
whole heap block will have a power of two number of words, which may
153+
help to keep it cache line aligned. *)
154+
Array.init ((n * 2) + 1) @@ fun i ->
155+
if i = 0 then
156+
Obj.magic (Multicore_magic.copy_as_padded @@ Atomic.make Snapshot.zero)
157+
else if i < n then Array.unsafe_get txs i
158+
else Multicore_magic.copy_as_padded (Atomic.make zero)
159+
in
160+
Atomic.compare_and_set t txs txs_new |> ignore;
161+
new_once t update
162+
163+
let new_once t update =
164+
let index = (Multicore_magic.instantaneous_domain_index () * 2) + update in
165+
let txs = Atomic.fenceless_get t in
166+
if index < Array.length txs then Once (Open { index }) else new_once t update
167+
168+
(* *)
169+
170+
let rec update_once txs once counter =
171+
let before = Atomic.get counter in
172+
let index = get_index once in
173+
let before_once = before.once in
174+
if index != used_index && before_once != once then begin
175+
use_index before_once;
176+
let value = (before.value + 1) land max_value in
177+
let after = { value; once } in
178+
if Atomic.compare_and_set counter before after then begin
179+
let snapshot = Atomic.get (snapshot_of txs) in
180+
if Snapshot.is_collecting snapshot then
181+
Snapshot.forward snapshot index value
182+
end
183+
else update_once txs once (Array.unsafe_get txs index)
184+
end
185+
186+
let update_once t once =
187+
match once with
188+
| Once Used -> ()
189+
| Once (Open _ as once) ->
190+
let index = get_index once in
191+
if index != used_index then
192+
let txs = Atomic.fenceless_get t in
193+
update_once txs once (Array.unsafe_get txs index)
194+
195+
(* *)
196+
197+
let get_collecting_snapshot txs =
198+
let snapshot = snapshot_of txs in
199+
let before = Atomic.get snapshot in
200+
if Snapshot.is_collecting before then before
201+
else
202+
let after = Snapshot.create (Array.length txs) in
203+
if Atomic.compare_and_set snapshot before after then after
204+
else Atomic.get snapshot
205+
206+
let rec collect txs snapshot i =
207+
if 0 < i then begin
208+
let after = Atomic.get (Array.unsafe_get txs i) in
209+
Snapshot.set snapshot i after.value;
210+
collect txs snapshot (i - 1)
211+
end
212+
213+
let rec get t =
214+
let txs = Atomic.fenceless_get t in
215+
let snapshot = get_collecting_snapshot txs in
216+
collect txs snapshot (Array.length txs - 1);
217+
let size = Snapshot.compute snapshot in
218+
if Atomic.fenceless_get t == txs then size else get t

0 commit comments

Comments
 (0)