diff --git a/dune-project b/dune-project index 1de533b179d..3c6b4af663e 100644 --- a/dune-project +++ b/dune-project @@ -711,12 +711,14 @@ This package provides an Lwt compatible interface to the library.") (synopsis "Xapi's standard library extension, Threads") (authors "Jonathan Ludlam") (depends + ambient-context base-threads base-unix (alcotest :with-test) (clock (= :version)) (fmt :with-test) mtime + tgroup (xapi-log (= :version)) (xapi-stdext-pervasives (= :version)) (xapi-stdext-unix (= :version)) diff --git a/ocaml/libs/tgroup/tgroup.ml b/ocaml/libs/tgroup/tgroup.ml index 171b78ee2b2..071a9dfe0d2 100644 --- a/ocaml/libs/tgroup/tgroup.ml +++ b/ocaml/libs/tgroup/tgroup.ml @@ -253,6 +253,11 @@ module Group = struct External.name // External.Unauthenticated.name let to_string g = match g with Group group -> to_cgroup group + + let authenticated_root = + of_creator (Creator.make ~identity:Identity.root_identity ()) + + let unauthenticated = Group External_Unauthenticated end module Cgroup = struct diff --git a/ocaml/libs/tgroup/tgroup.mli b/ocaml/libs/tgroup/tgroup.mli index b9316967ae3..d89ef542ffd 100644 --- a/ocaml/libs/tgroup/tgroup.mli +++ b/ocaml/libs/tgroup/tgroup.mli @@ -37,7 +37,7 @@ module Group : sig val of_string : string -> t (** [of_string s] creates an originator from a string [s]. - + e.g create an originator based on a http header. *) val to_string : t -> string @@ -76,6 +76,14 @@ module Group : sig val to_string : t -> string (** [to_string g] returns the string representation of the group [g].*) + + val authenticated_root : t + (** [authenticated_root] represents the main classification of internal xapi + threads. *) + + val unauthenticated : t + (** [unauthenticated] represents the classification of xapi threads for + unauthenticated users. *) end (** [Cgroup] module encapsulates different function for managing the cgroups @@ -87,7 +95,7 @@ module Cgroup : sig val dir_of : Group.t -> t option (** [dir_of group] returns the full path of the cgroup directory corresponding to the group [group] as [Some dir]. - + Returns [None] if [init dir] has not been called. *) val init : string -> unit diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune index 5d61f52cfc4..0dc52b78cd8 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune @@ -1,29 +1,47 @@ (library (public_name xapi-stdext-threads) - (name xapi_stdext_threads) + (name xapi_stdext_threads) (modules :standard \ ipq scheduler threadext_test ipq_test scheduler_test) (libraries + ambient-context.thread_local mtime mtime.clock.os threads.posix unix + tgroup xapi-stdext-unix xapi-stdext-pervasives) (foreign_stubs (language c) - (names delay_stubs)) + (names delay_stubs) + ) ) (library (public_name xapi-stdext-threads.scheduler) (name xapi_stdext_threads_scheduler) (modules ipq scheduler) - (libraries mtime mtime.clock.os threads.posix unix xapi-log xapi-stdext-threads clock) -) + (libraries + mtime + mtime.clock.os + threads.posix + unix + xapi-log + xapi-stdext-threads + clock) + ) (tests (names threadext_test ipq_test scheduler_test) (package xapi-stdext-threads) (modules threadext_test ipq_test scheduler_test) - (libraries xapi_stdext_threads alcotest mtime.clock.os mtime fmt threads.posix xapi_stdext_threads_scheduler) + (libraries + xapi_stdext_threads + alcotest + mtime.clock.os + mtime + fmt + tgroup + threads.posix + xapi_stdext_threads_scheduler) ) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml index b954a159ddb..c8d85d8b6c5 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml @@ -97,3 +97,41 @@ let wait_timed_write fd timeout = true | _ -> assert false + +module ThreadRuntimeContext = struct + type t = { + ocaml_tid: int + ; thread_name: string + ; mutable time_running: int + ; mutable tepoch: int + ; tgroup: Tgroup.Group.t + } + + (*The documentation for Ambient_context_thread_local isn't really clear is + this context. thread_local_storage is a global variable shared by all + threads. It is a map with keys, the thread IDs and values the above + defined data structure.*) + let thread_local_storage = Ambient_context_thread_local.Thread_local.create () + + let create ?(thread_name = "") () = + let ocaml_tid = Thread.self () |> Thread.id in + let time_running = 0 in + let tepoch = 0 in + let tgroup = Tgroup.Group.authenticated_root in + let tls = {thread_name; tgroup; ocaml_tid; time_running; tepoch} in + let () = + Ambient_context_thread_local.Thread_local.set thread_local_storage tls + in + tls + + let get () = + Ambient_context_thread_local.Thread_local.get_or_create ~create + thread_local_storage + + let update f context = + f context + |> Ambient_context_thread_local.Thread_local.set thread_local_storage + + let remove () = + Ambient_context_thread_local.Thread_local.remove thread_local_storage +end diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli index a1af35ccbeb..7967e3fa573 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli @@ -43,3 +43,27 @@ end val wait_timed_read : Unix.file_descr -> float -> bool val wait_timed_write : Unix.file_descr -> float -> bool + +module ThreadRuntimeContext : sig + type t = { + ocaml_tid: int + ; thread_name: string + ; mutable time_running: int + ; mutable tepoch: int + ; tgroup: Tgroup.Group.t + } + + val create : ?thread_name:string -> unit -> t + (** [create ()] creates and returns an initial thread local strorage for the + current thread. *) + + val get : unit -> t + (** [get ()] returns the current thread local storage. *) + + val update : (t -> t) -> t -> unit + (** [update fn thread_ctx] updates the thread local storage based on + the supplied arguments. *) + + val remove : unit -> unit + (** [remove ()] removes the thread local storage of the current thread. *) +end diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext_test.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext_test.ml index b93df9f47a8..2182430b182 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext_test.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext_test.ml @@ -74,4 +74,97 @@ let tests = ; ("other_thread", `Quick, other_thread) ] -let () = Alcotest.run "Threadext" [("Delay", tests)] +let test_create_ambient_storage () = + let open Xapi_stdext_threads.Threadext in + let _ : Thread.t = + Thread.create + (fun () -> + let storage = ThreadRuntimeContext.create () in + let storage_tid = storage.ocaml_tid in + let ocaml_tid = Thread.self () |> Thread.id in + Alcotest.(check int) + "Ocaml thread id matches the thread id stored" ocaml_tid storage_tid + ) + () + in + () + +let test_thread_storage_update_and_get () = + let open Xapi_stdext_threads.Threadext in + let _ : Thread.t = + Thread.create + (fun () -> + let context : ThreadRuntimeContext.t = ThreadRuntimeContext.create () in + + let expected_name = "thread_1" in + ThreadRuntimeContext.update + (fun t -> {t with thread_name= expected_name}) + context ; + let storage = ThreadRuntimeContext.get () in + Alcotest.(check string) + "Check if correct value is set in storage" expected_name + storage.thread_name + ) + () + in + () + +let test_storage_locality () = + let open Xapi_stdext_threads.Threadext in + let r1 = ref None in + let r2 = ref None in + + let thread1_expected_name = "thread_1" in + let thread2_expected_name = "thread_2" in + + let thread1 = + Thread.create + (fun () -> + let context = ThreadRuntimeContext.create () in + ThreadRuntimeContext.update + (fun t -> {t with thread_name= thread1_expected_name}) + context ; + Thread.delay 1. ; + r1 := Some (ThreadRuntimeContext.get ()) + ) + () + in + let thread2 = + Thread.create + (fun () -> + let context = ThreadRuntimeContext.create () in + ThreadRuntimeContext.update + (fun t -> {t with thread_name= thread2_expected_name}) + context ; + + r2 := Some (ThreadRuntimeContext.get ()) + ) + () + in + Thread.join thread1 ; + Thread.join thread2 ; + Alcotest.(check bool) + "Check thread local storage is set for thread1" true (Option.is_some !r1) ; + Alcotest.(check bool) + "Check thread local storage is set for thread2" true (Option.is_some !r2) ; + let thread1_name = + let r1 = Option.get !r1 in + r1.thread_name + in + let thread2_name = + let r2 = Option.get !r2 in + r2.thread_name + in + Alcotest.(check string) "Thread1 name" thread1_expected_name thread1_name ; + Alcotest.(check string) "Thread2 name" thread2_expected_name thread2_name + +let tls_tests = + [ + ("create storage", `Quick, test_create_ambient_storage) + ; ("storage update and get", `Quick, test_thread_storage_update_and_get) + ; ("thread local storage", `Quick, test_storage_locality) + ] + +let () = + Alcotest.run "Threadext" + [("Delay", tests); ("ThreadRuntimeContext", tls_tests)] diff --git a/opam/xapi-stdext-threads.opam b/opam/xapi-stdext-threads.opam index a61529e7e09..55653e588c9 100644 --- a/opam/xapi-stdext-threads.opam +++ b/opam/xapi-stdext-threads.opam @@ -8,12 +8,14 @@ homepage: "https://xapi-project.github.io/" bug-reports: "https://github.com/xapi-project/xen-api/issues" depends: [ "dune" {>= "3.15"} + "ambient-context" "base-threads" "base-unix" "alcotest" {with-test} "clock" {= version} "fmt" {with-test} "mtime" + "tgroup" "xapi-log" {= version} "xapi-stdext-pervasives" {= version} "xapi-stdext-unix" {= version}