Skip to content

Commit f2b848d

Browse files
committed
CP-52745: Add ThreadLocalStorage in Threadext
Adds a `ThreadLocalStorage` module under `Threadext`. Currently, it uses `Ambient_local.Thread_local` as the underlying implementation. Following the thread classification PRs, #6154, this will enable accessing the current thread group for each thread. The current data structure contains the following: 1. ocaml_tid 2. thread_name 3. time_running 4. tepoch 5. tgroup Field 1: `ocaml_tid` is equivalent to `Thread.self () |> Thread.id.` Field 2: `thread_name` is to associet threads with human readable string. Fields 3-5: - `time_runinng` - the amount of time the thread has been running in the current OCaml runtime timeslice, - `tepoch` - the current timeslice the thread has been scheduled for, - `tgroup` - current thread classification. Fields 3-5 are what is expected to be used for thread scheduling when xapi is under load. This can be extended in the future to contain information about tracing, such as `traceparent` and `baggage`. Signed-off-by: Gabriel Buica <[email protected]>
1 parent 012bead commit f2b848d

File tree

8 files changed

+198
-8
lines changed

8 files changed

+198
-8
lines changed

Diff for: dune-project

+2
Original file line numberDiff line numberDiff line change
@@ -711,12 +711,14 @@ This package provides an Lwt compatible interface to the library.")
711711
(synopsis "Xapi's standard library extension, Threads")
712712
(authors "Jonathan Ludlam")
713713
(depends
714+
ambient-context
714715
base-threads
715716
base-unix
716717
(alcotest :with-test)
717718
(clock (= :version))
718719
(fmt :with-test)
719720
mtime
721+
tgroup
720722
(xapi-log (= :version))
721723
(xapi-stdext-pervasives (= :version))
722724
(xapi-stdext-unix (= :version))

Diff for: ocaml/libs/tgroup/tgroup.ml

+5
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,11 @@ module Group = struct
253253
External.name // External.Unauthenticated.name
254254

255255
let to_string g = match g with Group group -> to_cgroup group
256+
257+
let authenticated_root =
258+
of_creator (Creator.make ~identity:Identity.root_identity ())
259+
260+
let unauthenticated = Group External_Unauthenticated
256261
end
257262

258263
module Cgroup = struct

Diff for: ocaml/libs/tgroup/tgroup.mli

+10-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ module Group : sig
3737

3838
val of_string : string -> t
3939
(** [of_string s] creates an originator from a string [s].
40-
40+
4141
e.g create an originator based on a http header. *)
4242

4343
val to_string : t -> string
@@ -76,6 +76,14 @@ module Group : sig
7676

7777
val to_string : t -> string
7878
(** [to_string g] returns the string representation of the group [g].*)
79+
80+
val authenticated_root : t
81+
(** [authenticated_root] represents the main classification of internal xapi
82+
threads. *)
83+
84+
val unauthenticated : t
85+
(** [unauthenticated] represents the classification of xapi threads for
86+
unauthenticated users. *)
7987
end
8088

8189
(** [Cgroup] module encapsulates different function for managing the cgroups
@@ -87,7 +95,7 @@ module Cgroup : sig
8795
val dir_of : Group.t -> t option
8896
(** [dir_of group] returns the full path of the cgroup directory corresponding
8997
to the group [group] as [Some dir].
90-
98+
9199
Returns [None] if [init dir] has not been called. *)
92100

93101
val init : string -> unit

Diff for: ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune

+23-5
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,47 @@
11
(library
22
(public_name xapi-stdext-threads)
3-
(name xapi_stdext_threads)
3+
(name xapi_stdext_threads)
44
(modules :standard \ ipq scheduler threadext_test ipq_test scheduler_test)
55
(libraries
6+
ambient-context.thread_local
67
mtime
78
mtime.clock.os
89
threads.posix
910
unix
11+
tgroup
1012
xapi-stdext-unix
1113
xapi-stdext-pervasives)
1214
(foreign_stubs
1315
(language c)
14-
(names delay_stubs))
16+
(names delay_stubs)
17+
)
1518
)
1619

1720
(library
1821
(public_name xapi-stdext-threads.scheduler)
1922
(name xapi_stdext_threads_scheduler)
2023
(modules ipq scheduler)
21-
(libraries mtime mtime.clock.os threads.posix unix xapi-log xapi-stdext-threads clock)
22-
)
24+
(libraries
25+
mtime
26+
mtime.clock.os
27+
threads.posix
28+
unix
29+
xapi-log
30+
xapi-stdext-threads
31+
clock)
32+
)
2333

2434
(tests
2535
(names threadext_test ipq_test scheduler_test)
2636
(package xapi-stdext-threads)
2737
(modules threadext_test ipq_test scheduler_test)
28-
(libraries xapi_stdext_threads alcotest mtime.clock.os mtime fmt threads.posix xapi_stdext_threads_scheduler)
38+
(libraries
39+
xapi_stdext_threads
40+
alcotest
41+
mtime.clock.os
42+
mtime
43+
fmt
44+
tgroup
45+
threads.posix
46+
xapi_stdext_threads_scheduler)
2947
)

Diff for: ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml

+38
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,41 @@ let wait_timed_write fd timeout =
9797
true
9898
| _ ->
9999
assert false
100+
101+
module ThreadRuntimeContext = struct
102+
type t = {
103+
ocaml_tid: int
104+
; thread_name: string
105+
; mutable time_running: int
106+
; mutable tepoch: int
107+
; tgroup: Tgroup.Group.t
108+
}
109+
110+
(*The documentation for Ambient_context_thread_local isn't really clear is
111+
this context. thread_local_storage is a global variable shared by all
112+
threads. It is a map with keys, the thread IDs and values the above
113+
defined data structure.*)
114+
let thread_local_storage = Ambient_context_thread_local.Thread_local.create ()
115+
116+
let create ?(thread_name = "") () =
117+
let ocaml_tid = Thread.self () |> Thread.id in
118+
let time_running = 0 in
119+
let tepoch = 0 in
120+
let tgroup = Tgroup.Group.authenticated_root in
121+
let tls = {thread_name; tgroup; ocaml_tid; time_running; tepoch} in
122+
let () =
123+
Ambient_context_thread_local.Thread_local.set thread_local_storage tls
124+
in
125+
tls
126+
127+
let get () =
128+
Ambient_context_thread_local.Thread_local.get_or_create ~create
129+
thread_local_storage
130+
131+
let update f context =
132+
f context
133+
|> Ambient_context_thread_local.Thread_local.set thread_local_storage
134+
135+
let remove () =
136+
Ambient_context_thread_local.Thread_local.remove thread_local_storage
137+
end

Diff for: ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli

+24
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,27 @@ end
4343
val wait_timed_read : Unix.file_descr -> float -> bool
4444

4545
val wait_timed_write : Unix.file_descr -> float -> bool
46+
47+
module ThreadRuntimeContext : sig
48+
type t = {
49+
ocaml_tid: int
50+
; thread_name: string
51+
; mutable time_running: int
52+
; mutable tepoch: int
53+
; tgroup: Tgroup.Group.t
54+
}
55+
56+
val create : ?thread_name:string -> unit -> t
57+
(** [create ()] creates and returns an initial thread local strorage for the
58+
current thread. *)
59+
60+
val get : unit -> t
61+
(** [get ()] returns the current thread local storage. *)
62+
63+
val update : (t -> t) -> t -> unit
64+
(** [update fn thread_ctx] updates the thread local storage based on
65+
the supplied arguments. *)
66+
67+
val remove : unit -> unit
68+
(** [remove ()] removes the thread local storage of the current thread. *)
69+
end

Diff for: ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext_test.ml

+94-1
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,97 @@ let tests =
7474
; ("other_thread", `Quick, other_thread)
7575
]
7676

77-
let () = Alcotest.run "Threadext" [("Delay", tests)]
77+
let test_create_ambient_storage () =
78+
let open Xapi_stdext_threads.Threadext in
79+
let _ : Thread.t =
80+
Thread.create
81+
(fun () ->
82+
let storage = ThreadRuntimeContext.create () in
83+
let storage_tid = storage.ocaml_tid in
84+
let ocaml_tid = Thread.self () |> Thread.id in
85+
Alcotest.(check int)
86+
"Ocaml thread id matches the thread id stored" ocaml_tid storage_tid
87+
)
88+
()
89+
in
90+
()
91+
92+
let test_thread_storage_update_and_get () =
93+
let open Xapi_stdext_threads.Threadext in
94+
let _ : Thread.t =
95+
Thread.create
96+
(fun () ->
97+
let context : ThreadRuntimeContext.t = ThreadRuntimeContext.create () in
98+
99+
let expected_name = "thread_1" in
100+
ThreadRuntimeContext.update
101+
(fun t -> {t with thread_name= expected_name})
102+
context ;
103+
let storage = ThreadRuntimeContext.get () in
104+
Alcotest.(check string)
105+
"Check if correct value is set in storage" expected_name
106+
storage.thread_name
107+
)
108+
()
109+
in
110+
()
111+
112+
let test_storage_locality () =
113+
let open Xapi_stdext_threads.Threadext in
114+
let r1 = ref None in
115+
let r2 = ref None in
116+
117+
let thread1_expected_name = "thread_1" in
118+
let thread2_expected_name = "thread_2" in
119+
120+
let thread1 =
121+
Thread.create
122+
(fun () ->
123+
let context = ThreadRuntimeContext.create () in
124+
ThreadRuntimeContext.update
125+
(fun t -> {t with thread_name= thread1_expected_name})
126+
context ;
127+
Thread.delay 1. ;
128+
r1 := Some (ThreadRuntimeContext.get ())
129+
)
130+
()
131+
in
132+
let thread2 =
133+
Thread.create
134+
(fun () ->
135+
let context = ThreadRuntimeContext.create () in
136+
ThreadRuntimeContext.update
137+
(fun t -> {t with thread_name= thread2_expected_name})
138+
context ;
139+
140+
r2 := Some (ThreadRuntimeContext.get ())
141+
)
142+
()
143+
in
144+
Thread.join thread1 ;
145+
Thread.join thread2 ;
146+
Alcotest.(check bool)
147+
"Check thread local storage is set for thread1" true (Option.is_some !r1) ;
148+
Alcotest.(check bool)
149+
"Check thread local storage is set for thread2" true (Option.is_some !r2) ;
150+
let thread1_name =
151+
let r1 = Option.get !r1 in
152+
r1.thread_name
153+
in
154+
let thread2_name =
155+
let r2 = Option.get !r2 in
156+
r2.thread_name
157+
in
158+
Alcotest.(check string) "Thread1 name" thread1_expected_name thread1_name ;
159+
Alcotest.(check string) "Thread2 name" thread2_expected_name thread2_name
160+
161+
let tls_tests =
162+
[
163+
("create storage", `Quick, test_create_ambient_storage)
164+
; ("storage update and get", `Quick, test_thread_storage_update_and_get)
165+
; ("thread local storage", `Quick, test_storage_locality)
166+
]
167+
168+
let () =
169+
Alcotest.run "Threadext"
170+
[("Delay", tests); ("ThreadRuntimeContext", tls_tests)]

Diff for: opam/xapi-stdext-threads.opam

+2
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ homepage: "https://xapi-project.github.io/"
88
bug-reports: "https://github.com/xapi-project/xen-api/issues"
99
depends: [
1010
"dune" {>= "3.15"}
11+
"ambient-context"
1112
"base-threads"
1213
"base-unix"
1314
"alcotest" {with-test}
1415
"clock" {= version}
1516
"fmt" {with-test}
1617
"mtime"
18+
"tgroup"
1719
"xapi-log" {= version}
1820
"xapi-stdext-pervasives" {= version}
1921
"xapi-stdext-unix" {= version}

0 commit comments

Comments
 (0)