Skip to content

Commit 0645f28

Browse files
committed
CP-52745: Add ThreadLocalStorage in Threadext
Adds a `ThreadLocalStorage` module under `Threadext`. Currently, it uses `Ambient_local.Thread_local` as the underlying implementation. Following the thread classification PRs, #6154, this will enable accessing the current thread group for each thread. The current data structure contains the following: 1. ocaml_tid 2. thread_name 3. time_running 4. tepoch 5. tgroup Field 1: `ocaml_tid` is equivalent to `Thread.self () |> Thread.id.` Field 2: `thread_name` is to associet threads with human readable string. Fields 3-5: - `time_runinng` - the amount of time the thread has been running in the current OCaml runtime timeslice, - `tepoch` - the current timeslice the thread has been scheduled for, - `tgroup` - current thread classification. Fields 3-5 are what is expected to be used for thread scheduling when xapi is under load. This can be extended in the future to contain information about tracing, such as `traceparent` and `baggage`. Signed-off-by: Gabriel Buica <[email protected]>
1 parent 03c1780 commit 0645f28

File tree

4 files changed

+198
-25
lines changed

4 files changed

+198
-25
lines changed

Diff for: ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune

+38-24
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,43 @@
11
(library
2-
(public_name xapi-stdext-threads)
3-
(name xapi_stdext_threads)
4-
(modules :standard \ ipq scheduler threadext_test ipq_test scheduler_test)
5-
(libraries
6-
mtime
7-
mtime.clock.os
8-
threads.posix
9-
unix
10-
xapi-stdext-unix
11-
xapi-stdext-pervasives)
12-
(foreign_stubs
13-
(language c)
14-
(names delay_stubs))
15-
)
2+
(public_name xapi-stdext-threads)
3+
(name xapi_stdext_threads)
4+
(modules :standard \ ipq scheduler threadext_test ipq_test scheduler_test)
5+
(libraries
6+
ambient-context.thread_local
7+
mtime
8+
mtime.clock.os
9+
threads.posix
10+
unix
11+
tgroup
12+
xapi-stdext-unix
13+
xapi-stdext-pervasives)
14+
(foreign_stubs
15+
(language c)
16+
(names delay_stubs)))
1617

1718
(library
18-
(public_name xapi-stdext-threads.scheduler)
19-
(name xapi_stdext_threads_scheduler)
20-
(modules ipq scheduler)
21-
(libraries mtime mtime.clock.os threads.posix unix xapi-log xapi-stdext-threads clock)
22-
)
19+
(public_name xapi-stdext-threads.scheduler)
20+
(name xapi_stdext_threads_scheduler)
21+
(modules ipq scheduler)
22+
(libraries
23+
mtime
24+
mtime.clock.os
25+
threads.posix
26+
unix
27+
xapi-log
28+
xapi-stdext-threads
29+
clock))
2330

2431
(tests
25-
(names threadext_test ipq_test scheduler_test)
26-
(package xapi-stdext-threads)
27-
(modules threadext_test ipq_test scheduler_test)
28-
(libraries xapi_stdext_threads alcotest mtime.clock.os mtime fmt threads.posix xapi_stdext_threads_scheduler)
29-
)
32+
(names threadext_test ipq_test scheduler_test)
33+
(package xapi-stdext-threads)
34+
(modules threadext_test ipq_test scheduler_test)
35+
(libraries
36+
xapi_stdext_threads
37+
alcotest
38+
mtime.clock.os
39+
mtime
40+
fmt
41+
tgroup
42+
threads.posix
43+
xapi_stdext_threads_scheduler))

Diff for: ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml

+42
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,45 @@ let wait_timed_write fd timeout =
9797
true
9898
| _ ->
9999
assert false
100+
101+
module ThreadRuntimeContext = struct
102+
type t = {
103+
ocaml_tid: int
104+
; thread_name: string
105+
; mutable time_running: Mtime.span
106+
; mutable tepoch: int
107+
; tgroup: Tgroup.Group.t
108+
}
109+
110+
(*The documentation for Ambient_context_thread_local isn't really clear is
111+
this context. thread_local_storage is a global variable shared by all
112+
threads. It is a map with keys, the thread IDs and values the above
113+
defined data structure.*)
114+
let thread_local_storage = Ambient_context_thread_local.Thread_local.create ()
115+
116+
let create ?(thread_name = "") () =
117+
let ocaml_tid = Thread.self () |> Thread.id in
118+
let time_running = Mtime.Span.zero in
119+
let tepoch = 0 in
120+
let tgroup =
121+
Tgroup.Group.(
122+
of_creator (Creator.make ~identity:Identity.root_identity ())
123+
)
124+
in
125+
let tls = {thread_name; tgroup; ocaml_tid; time_running; tepoch} in
126+
let () =
127+
Ambient_context_thread_local.Thread_local.set thread_local_storage tls
128+
in
129+
tls
130+
131+
let get () =
132+
Ambient_context_thread_local.Thread_local.get_or_create ~create
133+
thread_local_storage
134+
135+
let update f context =
136+
f context
137+
|> Ambient_context_thread_local.Thread_local.set thread_local_storage
138+
139+
let remove () =
140+
Ambient_context_thread_local.Thread_local.remove thread_local_storage
141+
end

Diff for: ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli

+24
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,27 @@ end
4343
val wait_timed_read : Unix.file_descr -> float -> bool
4444

4545
val wait_timed_write : Unix.file_descr -> float -> bool
46+
47+
module ThreadRuntimeContext : sig
48+
type t = {
49+
ocaml_tid: int
50+
; thread_name: string
51+
; mutable time_running: Mtime.span
52+
; mutable tepoch: int
53+
; tgroup: Tgroup.Group.t
54+
}
55+
56+
val create : ?thread_name:string -> unit -> t
57+
(** [create ()] creates and returns an initial thread local strorage for the
58+
current thread. *)
59+
60+
val get : unit -> t
61+
(** [get ()] returns the current thread local storage. *)
62+
63+
val update : (t -> t) -> t -> unit
64+
(** [update fn thread_ctx] updates the thread local storage based on
65+
the supplied arguments. *)
66+
67+
val remove : unit -> unit
68+
(** [remove ()] removes the thread local storage of the current thread. *)
69+
end

Diff for: ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext_test.ml

+94-1
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,97 @@ let tests =
7474
; ("other_thread", `Quick, other_thread)
7575
]
7676

77-
let () = Alcotest.run "Threadext" [("Delay", tests)]
77+
let test_create_ambient_storage () =
78+
let open Xapi_stdext_threads.Threadext in
79+
let _ : Thread.t =
80+
Thread.create
81+
(fun () ->
82+
let storage = ThreadRuntimeContext.create () in
83+
let storage_tid = storage.ocaml_tid in
84+
let ocaml_tid = Thread.self () |> Thread.id in
85+
Alcotest.(check int)
86+
"Ocaml thread id matches the thread id stored" ocaml_tid storage_tid
87+
)
88+
()
89+
in
90+
()
91+
92+
let test_thread_storage_update_and_get () =
93+
let open Xapi_stdext_threads.Threadext in
94+
let _ : Thread.t =
95+
Thread.create
96+
(fun () ->
97+
let context : ThreadRuntimeContext.t = ThreadRuntimeContext.create () in
98+
99+
let expected_name = "thread_1" in
100+
ThreadRuntimeContext.update
101+
(fun t -> {t with thread_name= expected_name})
102+
context ;
103+
let storage = ThreadRuntimeContext.get () in
104+
Alcotest.(check string)
105+
"Check if correct value is set in storage" expected_name
106+
storage.thread_name
107+
)
108+
()
109+
in
110+
()
111+
112+
let test_storage_locality () =
113+
let open Xapi_stdext_threads.Threadext in
114+
let r1 = ref None in
115+
let r2 = ref None in
116+
117+
let thread1_expected_name = "thread_1" in
118+
let thread2_expected_name = "thread_2" in
119+
120+
let thread1 =
121+
Thread.create
122+
(fun () ->
123+
let context = ThreadRuntimeContext.create () in
124+
ThreadRuntimeContext.update
125+
(fun t -> {t with thread_name= thread1_expected_name})
126+
context ;
127+
Thread.delay 1. ;
128+
r1 := Some (ThreadRuntimeContext.get ())
129+
)
130+
()
131+
in
132+
let thread2 =
133+
Thread.create
134+
(fun () ->
135+
let context = ThreadRuntimeContext.create () in
136+
ThreadRuntimeContext.update
137+
(fun t -> {t with thread_name= thread2_expected_name})
138+
context ;
139+
140+
r2 := Some (ThreadRuntimeContext.get ())
141+
)
142+
()
143+
in
144+
Thread.join thread1 ;
145+
Thread.join thread2 ;
146+
Alcotest.(check bool)
147+
"Check thread local storage is set for thread1" true (Option.is_some !r1) ;
148+
Alcotest.(check bool)
149+
"Check thread local storage is set for thread2" true (Option.is_some !r2) ;
150+
let thread1_name =
151+
let r1 = Option.get !r1 in
152+
r1.thread_name
153+
in
154+
let thread2_name =
155+
let r2 = Option.get !r2 in
156+
r2.thread_name
157+
in
158+
Alcotest.(check string) "Thread1 name" thread1_expected_name thread1_name ;
159+
Alcotest.(check string) "Thread2 name" thread2_expected_name thread2_name
160+
161+
let tls_tests =
162+
[
163+
("create storage", `Quick, test_create_ambient_storage)
164+
; ("storage update and get", `Quick, test_thread_storage_update_and_get)
165+
; ("thread local storage", `Quick, test_storage_locality)
166+
]
167+
168+
let () =
169+
Alcotest.run "Threadext"
170+
[("Delay", tests); ("ThreadRuntimeContext", tls_tests)]

0 commit comments

Comments
 (0)