Skip to content

CP-52745: Add ThreadLocalStorage in Threadext #6354

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -711,12 +711,14 @@ This package provides an Lwt compatible interface to the library.")
(synopsis "Xapi's standard library extension, Threads")
(authors "Jonathan Ludlam")
(depends
ambient-context
base-threads
base-unix
(alcotest :with-test)
(clock (= :version))
(fmt :with-test)
mtime
tgroup
(xapi-log (= :version))
(xapi-stdext-pervasives (= :version))
(xapi-stdext-unix (= :version))
Expand Down
5 changes: 5 additions & 0 deletions ocaml/libs/tgroup/tgroup.ml
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ module Group = struct
External.name // External.Unauthenticated.name

let to_string g = match g with Group group -> to_cgroup group

let authenticated_root =
of_creator (Creator.make ~identity:Identity.root_identity ())

let unauthenticated = Group External_Unauthenticated
end

module Cgroup = struct
Expand Down
12 changes: 10 additions & 2 deletions ocaml/libs/tgroup/tgroup.mli
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ module Group : sig

val of_string : string -> t
(** [of_string s] creates an originator from a string [s].

e.g create an originator based on a http header. *)

val to_string : t -> string
Expand Down Expand Up @@ -76,6 +76,14 @@ module Group : sig

val to_string : t -> string
(** [to_string g] returns the string representation of the group [g].*)

val authenticated_root : t
(** [authenticated_root] represents the main classification of internal xapi
threads. *)

val unauthenticated : t
(** [unauthenticated] represents the classification of xapi threads for
unauthenticated users. *)
end

(** [Cgroup] module encapsulates different function for managing the cgroups
Expand All @@ -87,7 +95,7 @@ module Cgroup : sig
val dir_of : Group.t -> t option
(** [dir_of group] returns the full path of the cgroup directory corresponding
to the group [group] as [Some dir].

Returns [None] if [init dir] has not been called. *)

val init : string -> unit
Expand Down
28 changes: 23 additions & 5 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune
Original file line number Diff line number Diff line change
@@ -1,29 +1,47 @@
(library
(public_name xapi-stdext-threads)
(name xapi_stdext_threads)
(name xapi_stdext_threads)
(modules :standard \ ipq scheduler threadext_test ipq_test scheduler_test)
(libraries
ambient-context.thread_local
mtime
mtime.clock.os
threads.posix
unix
tgroup
xapi-stdext-unix
xapi-stdext-pervasives)
(foreign_stubs
(language c)
(names delay_stubs))
(names delay_stubs)
)
)

(library
(public_name xapi-stdext-threads.scheduler)
(name xapi_stdext_threads_scheduler)
(modules ipq scheduler)
(libraries mtime mtime.clock.os threads.posix unix xapi-log xapi-stdext-threads clock)
)
(libraries
mtime
mtime.clock.os
threads.posix
unix
xapi-log
xapi-stdext-threads
clock)
)

(tests
(names threadext_test ipq_test scheduler_test)
(package xapi-stdext-threads)
(modules threadext_test ipq_test scheduler_test)
(libraries xapi_stdext_threads alcotest mtime.clock.os mtime fmt threads.posix xapi_stdext_threads_scheduler)
(libraries
xapi_stdext_threads
alcotest
mtime.clock.os
mtime
fmt
tgroup
threads.posix
xapi_stdext_threads_scheduler)
)
38 changes: 38 additions & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,41 @@ let wait_timed_write fd timeout =
true
| _ ->
assert false

module ThreadRuntimeContext = struct
type t = {
ocaml_tid: int
; thread_name: string
; mutable time_running: int
; mutable tepoch: int
; tgroup: Tgroup.Group.t
}

(*The documentation for Ambient_context_thread_local isn't really clear is
this context. thread_local_storage is a global variable shared by all
threads. It is a map with keys, the thread IDs and values the above
defined data structure.*)
let thread_local_storage = Ambient_context_thread_local.Thread_local.create ()

let create ?(thread_name = "") () =
let ocaml_tid = Thread.self () |> Thread.id in
let time_running = 0 in
let tepoch = 0 in
let tgroup = Tgroup.Group.authenticated_root in
let tls = {thread_name; tgroup; ocaml_tid; time_running; tepoch} in
let () =
Ambient_context_thread_local.Thread_local.set thread_local_storage tls
in
tls

let get () =
Ambient_context_thread_local.Thread_local.get_or_create ~create
thread_local_storage

let update f context =
f context
|> Ambient_context_thread_local.Thread_local.set thread_local_storage

let remove () =
Ambient_context_thread_local.Thread_local.remove thread_local_storage
end
24 changes: 24 additions & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,27 @@ end
val wait_timed_read : Unix.file_descr -> float -> bool

val wait_timed_write : Unix.file_descr -> float -> bool

module ThreadRuntimeContext : sig
type t = {
ocaml_tid: int
; thread_name: string
; mutable time_running: int
; mutable tepoch: int
; tgroup: Tgroup.Group.t
}

val create : ?thread_name:string -> unit -> t
(** [create ()] creates and returns an initial thread local strorage for the
current thread. *)

val get : unit -> t
(** [get ()] returns the current thread local storage. *)

val update : (t -> t) -> t -> unit
(** [update fn thread_ctx] updates the thread local storage based on
the supplied arguments. *)

val remove : unit -> unit
(** [remove ()] removes the thread local storage of the current thread. *)
end
95 changes: 94 additions & 1 deletion ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext_test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,97 @@ let tests =
; ("other_thread", `Quick, other_thread)
]

let () = Alcotest.run "Threadext" [("Delay", tests)]
let test_create_ambient_storage () =
let open Xapi_stdext_threads.Threadext in
let _ : Thread.t =
Thread.create
(fun () ->
let storage = ThreadRuntimeContext.create () in
let storage_tid = storage.ocaml_tid in
let ocaml_tid = Thread.self () |> Thread.id in
Alcotest.(check int)
"Ocaml thread id matches the thread id stored" ocaml_tid storage_tid
)
()
in
()

let test_thread_storage_update_and_get () =
let open Xapi_stdext_threads.Threadext in
let _ : Thread.t =
Thread.create
(fun () ->
let context : ThreadRuntimeContext.t = ThreadRuntimeContext.create () in

let expected_name = "thread_1" in
ThreadRuntimeContext.update
(fun t -> {t with thread_name= expected_name})
context ;
let storage = ThreadRuntimeContext.get () in
Alcotest.(check string)
"Check if correct value is set in storage" expected_name
storage.thread_name
)
()
in
()

let test_storage_locality () =
let open Xapi_stdext_threads.Threadext in
let r1 = ref None in
let r2 = ref None in

let thread1_expected_name = "thread_1" in
let thread2_expected_name = "thread_2" in

let thread1 =
Thread.create
(fun () ->
let context = ThreadRuntimeContext.create () in
ThreadRuntimeContext.update
(fun t -> {t with thread_name= thread1_expected_name})
context ;
Thread.delay 1. ;
r1 := Some (ThreadRuntimeContext.get ())
)
()
in
let thread2 =
Thread.create
(fun () ->
let context = ThreadRuntimeContext.create () in
ThreadRuntimeContext.update
(fun t -> {t with thread_name= thread2_expected_name})
context ;

r2 := Some (ThreadRuntimeContext.get ())
)
()
in
Thread.join thread1 ;
Thread.join thread2 ;
Alcotest.(check bool)
"Check thread local storage is set for thread1" true (Option.is_some !r1) ;
Alcotest.(check bool)
"Check thread local storage is set for thread2" true (Option.is_some !r2) ;
let thread1_name =
let r1 = Option.get !r1 in
r1.thread_name
in
let thread2_name =
let r2 = Option.get !r2 in
r2.thread_name
in
Alcotest.(check string) "Thread1 name" thread1_expected_name thread1_name ;
Alcotest.(check string) "Thread2 name" thread2_expected_name thread2_name

let tls_tests =
[
("create storage", `Quick, test_create_ambient_storage)
; ("storage update and get", `Quick, test_thread_storage_update_and_get)
; ("thread local storage", `Quick, test_storage_locality)
]

let () =
Alcotest.run "Threadext"
[("Delay", tests); ("ThreadRuntimeContext", tls_tests)]
2 changes: 2 additions & 0 deletions opam/xapi-stdext-threads.opam
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ homepage: "https://xapi-project.github.io/"
bug-reports: "https://github.com/xapi-project/xen-api/issues"
depends: [
"dune" {>= "3.15"}
"ambient-context"
"base-threads"
"base-unix"
"alcotest" {with-test}
"clock" {= version}
"fmt" {with-test}
"mtime"
"tgroup"
"xapi-log" {= version}
"xapi-stdext-pervasives" {= version}
"xapi-stdext-unix" {= version}
Expand Down
Loading