From cedf8362e71964ab05451224ff8fa6b731f254e4 Mon Sep 17 00:00:00 2001 From: Steven Woods Date: Tue, 29 Apr 2025 11:33:42 +0100 Subject: [PATCH 1/2] CA-409510: Make xenopsd nested Parallel atoms explicit Each Parallel atom takes up a worker thread whilst its children do the actual work, so we have parallel_queues to prevent a deadlock. However, nested Parallel atoms take up an additional worker, meaning they can still cause a deadlock. This commit adds a new Nested_parallel atomic with matching nested_parallel_queues to remove the possibility of this deadlock. This increases the total number of workers, but these workers are just to hold the Nested_parallel Atomics and will not be doing any actual work Signed-off-by: Steven Woods --- ocaml/xenopsd/lib/xenops_server.ml | 196 ++++++++++++++++++----------- quality-gate.sh | 2 +- 2 files changed, 126 insertions(+), 72 deletions(-) diff --git a/ocaml/xenopsd/lib/xenops_server.ml b/ocaml/xenopsd/lib/xenops_server.ml index 3a4cf23806..da91cc651c 100644 --- a/ocaml/xenopsd/lib/xenops_server.ml +++ b/ocaml/xenopsd/lib/xenops_server.ml @@ -168,6 +168,8 @@ type atomic = | VM_rename of (Vm.id * Vm.id * rename_when) | VM_import_metadata of (Vm.id * Metadata.t) | Parallel of Vm.id * string * atomic list + | Nested_parallel of Vm.id * string * atomic list + (** used to make nested parallel atoms explicit, as each atom requires its own worker *) | Serial of Vm.id * string * atomic list | Best_effort of atomic [@@deriving rpcty] @@ -286,6 +288,9 @@ let rec name_of_atomic = function | Parallel (_, _, atomics) -> Printf.sprintf "Parallel (%s)" (String.concat " | " (List.map name_of_atomic atomics)) + | Nested_parallel (_, _, atomics) -> + Printf.sprintf "Nested_parallel (%s)" + (String.concat " | " (List.map name_of_atomic atomics)) | Serial (_, _, atomics) -> Printf.sprintf "Serial (%s)" (String.concat " & " (List.map name_of_atomic atomics)) @@ -295,7 +300,7 @@ let rec name_of_atomic = function let rec atomic_expires_after = function | Serial (_, _, ops) -> List.map atomic_expires_after ops |> List.fold_left ( +. ) 0. - | Parallel (_, _, ops) -> + | Parallel (_, _, ops) | Nested_parallel (_, _, ops) -> List.map atomic_expires_after ops |> List.fold_left Float.max 0. | _ -> (* 20 minutes, in seconds *) @@ -916,6 +921,27 @@ module Redirector = struct Parallel atoms, creating a deadlock. *) let parallel_queues = {queues= Queues.create (); mutex= Mutex.create ()} + (* We create another queue only for Nested_parallel atoms for the same reason + as parallel_queues. When a Nested_parallel atom is inside a Parallel atom, + they are both using a worker whilst not doing any work, so they each need + additional space to prevent a deadlock. *) + let nested_parallel_queues = + {queues= Queues.create (); mutex= Mutex.create ()} + + (* we do not want to use = when comparing queues: queues can contain + (uncomparable) functions, and we are only interested in comparing the + equality of their static references *) + let is_same_redirector q1 q2 = q1 == q2 + + let to_string r = + match r with + | w when is_same_redirector w parallel_queues -> + "Parallel" + | w when is_same_redirector w nested_parallel_queues -> + "Nested_parallel" + | _ -> + "Default" + (* When a thread is actively processing a queue, items are redirected to a thread-private queue *) let overrides = ref StringMap.empty @@ -1035,6 +1061,7 @@ module Redirector = struct List.concat_map one (default.queues :: parallel_queues.queues + :: nested_parallel_queues.queues :: List.map snd (StringMap.bindings !overrides) ) ) @@ -1219,11 +1246,11 @@ module WorkerPool = struct operate *) let count_active queues = with_lock m (fun () -> - (* we do not want to use = when comparing queues: queues can contain - (uncomparable) functions, and we are only interested in comparing the - equality of their static references *) List.map - (fun w -> w.Worker.redirector == queues && Worker.is_active w) + (fun w -> + Redirector.is_same_redirector w.Worker.redirector queues + && Worker.is_active w + ) !pool |> List.filter (fun x -> x) |> List.length @@ -1231,17 +1258,18 @@ module WorkerPool = struct let find_one queues f = List.fold_left - (fun acc x -> acc || (x.Worker.redirector == queues && f x)) + (fun acc x -> + acc || (Redirector.is_same_redirector x.Worker.redirector queues && f x) + ) false (* Clean up any shutdown threads and remove them from the master list *) let gc queues pool = List.fold_left (fun acc w -> - (* we do not want to use = when comparing queues: queues can contain - (uncomparable) functions, and we are only interested in comparing the - equality of their static references *) - if w.Worker.redirector == queues && Worker.get_state w = Worker.Shutdown + if + Redirector.is_same_redirector w.Worker.redirector queues + && Worker.get_state w = Worker.Shutdown then ( Worker.join w ; acc ) else @@ -1268,7 +1296,8 @@ module WorkerPool = struct let start size = for _i = 1 to size do incr Redirector.default ; - incr Redirector.parallel_queues + incr Redirector.parallel_queues ; + incr Redirector.nested_parallel_queues done let set_size size = @@ -1283,7 +1312,8 @@ module WorkerPool = struct done in inner Redirector.default ; - inner Redirector.parallel_queues + inner Redirector.parallel_queues ; + inner Redirector.nested_parallel_queues end (* Keep track of which VMs we're rebooting so we avoid transient glitches where @@ -1584,6 +1614,11 @@ let collect_into apply = function [] -> [] | [op] -> [op] | lst -> apply lst let parallel name ~id = collect_into (fun ls -> [Parallel (id, Printf.sprintf "%s VM=%s" name id, ls)]) +let nested_parallel name ~id = + collect_into (fun ls -> + [Nested_parallel (id, Printf.sprintf "%s VM=%s" name id, ls)] + ) + let serial name ~id = collect_into (fun ls -> [Serial (id, Printf.sprintf "%s VM=%s" name id, ls)]) @@ -1593,6 +1628,9 @@ let serial_concat name ~id lst = serial name ~id (List.concat lst) let parallel_map name ~id lst f = parallel name ~id (List.concat_map f lst) +let nested_parallel_map name ~id lst f = + nested_parallel name ~id (List.concat_map f lst) + let map_or_empty f x = Option.value ~default:[] (Option.map f x) (* Creates a Serial of 2 or more Atomics. If the number of Atomics could be @@ -1630,7 +1668,7 @@ let rec atomics_of_operation = function let pf = Printf.sprintf in let name_multi = pf "VBDs.activate_epoch_and_plug %s" typ in let name_one = pf "VBD.activate_epoch_and_plug %s" typ in - parallel_map name_multi ~id vbds (fun vbd -> + nested_parallel_map name_multi ~id vbds (fun vbd -> serial_concat name_one ~id [ [VBD_set_active (vbd.Vbd.id, true)] @@ -1664,11 +1702,11 @@ let rec atomics_of_operation = function vifs ; serial_concat "VGPUs.activate & PCI.plug (SRIOV)" ~id [ - parallel_map "VGPUs.activate" ~id vgpus (fun vgpu -> + nested_parallel_map "VGPUs.activate" ~id vgpus (fun vgpu -> [VGPU_set_active (vgpu.Vgpu.id, true)] ) - ; parallel_map "PCIs.plug (SRIOV)" ~id pcis_sriov (fun pci -> - [PCI_plug (pci.Pci.id, false)] + ; nested_parallel_map "PCIs.plug (SRIOV)" ~id pcis_sriov + (fun pci -> [PCI_plug (pci.Pci.id, false)] ) ] ] @@ -1883,56 +1921,9 @@ let rec perform_atomic ~progress_callback ?result (op : atomic) (Printexc.to_string e) ) | Parallel (_id, description, atoms) -> - (* parallel_id is a unused unique name prefix for a parallel worker queue *) - let parallel_id = - Printf.sprintf "Parallel:task=%s.atoms=%d.(%s)" - (Xenops_task.id_of_handle t) - (List.length atoms) description - in - let with_tracing = id_with_tracing parallel_id t in - debug "begin_%s" parallel_id ; - let task_list = - queue_atomics_and_wait ~progress_callback ~max_parallel_atoms:10 - with_tracing parallel_id atoms - in - debug "end_%s" parallel_id ; - (* make sure that we destroy all the parallel tasks that finished *) - let errors = - List.map - (fun (id, task_handle, task_state) -> - match task_state with - | Some (Task.Completed _) -> - TASK.destroy' id ; None - | Some (Task.Failed e) -> - TASK.destroy' id ; - let e = - match Rpcmarshal.unmarshal Errors.error.Rpc.Types.ty e with - | Ok x -> - Xenopsd_error x - | Error (`Msg x) -> - internal_error "Error unmarshalling failure: %s" x - in - Some e - | None | Some (Task.Pending _) -> - (* Because pending tasks are filtered out in - queue_atomics_and_wait with task_ended the second case will - never be encountered. The previous boolean used in - event_wait was enough to express the possible cases *) - let err_msg = - Printf.sprintf "Timed out while waiting on task %s (%s)" id - (Xenops_task.get_dbg task_handle) - in - error "%s" err_msg ; - Xenops_task.cancel task_handle ; - Some (Xenopsd_error (Internal_error err_msg)) - ) - task_list - in - (* if any error was present, raise first one, so that - trigger_cleanup_after_failure is called *) - List.iter - (fun err -> match err with None -> () | Some e -> raise e) - errors + parallel_atomic ~progress_callback ~description ~nested:false atoms t + | Nested_parallel (_id, description, atoms) -> + parallel_atomic ~progress_callback ~description ~nested:true atoms t | Serial (_, _, atoms) -> List.iter (Fun.flip (perform_atomic ~progress_callback) t) atoms | VIF_plug id -> @@ -2361,7 +2352,64 @@ let rec perform_atomic ~progress_callback ?result (op : atomic) debug "VM.soft_reset %s" id ; B.VM.soft_reset t (VM_DB.read_exn id) -and queue_atomic_int ~progress_callback dbg id op = +and parallel_atomic ~progress_callback ~description ~nested atoms t = + (* parallel_id is a unused unique name prefix for a parallel worker queue *) + let redirector = + if nested then + Redirector.nested_parallel_queues + else + Redirector.parallel_queues + in + let parallel_id = + Printf.sprintf "%s:task=%s.atoms=%d.(%s)" + (Redirector.to_string redirector) + (Xenops_task.id_of_handle t) + (List.length atoms) description + in + let with_tracing = id_with_tracing parallel_id t in + debug "begin_%s" parallel_id ; + let task_list = + queue_atomics_and_wait ~progress_callback ~max_parallel_atoms:10 + with_tracing parallel_id atoms redirector + in + debug "end_%s" parallel_id ; + (* make sure that we destroy all the parallel tasks that finished *) + let errors = + List.map + (fun (id, task_handle, task_state) -> + match task_state with + | Some (Task.Completed _) -> + TASK.destroy' id ; None + | Some (Task.Failed e) -> + TASK.destroy' id ; + let e = + match Rpcmarshal.unmarshal Errors.error.Rpc.Types.ty e with + | Ok x -> + Xenopsd_error x + | Error (`Msg x) -> + internal_error "Error unmarshalling failure: %s" x + in + Some e + | None | Some (Task.Pending _) -> + (* Because pending tasks are filtered out in + queue_atomics_and_wait with task_ended the second case will + never be encountered. The previous boolean used in + event_wait was enough to express the possible cases *) + let err_msg = + Printf.sprintf "Timed out while waiting on task %s (%s)" id + (Xenops_task.get_dbg task_handle) + in + error "%s" err_msg ; + Xenops_task.cancel task_handle ; + Some (Xenopsd_error (Internal_error err_msg)) + ) + task_list + in + (* if any error was present, raise first one, so that + trigger_cleanup_after_failure is called *) + List.iter (fun err -> match err with None -> () | Some e -> raise e) errors + +and queue_atomic_int ~progress_callback dbg id op redirector = let task = Xenops_task.add tasks dbg (let r = ref None in @@ -2370,10 +2418,12 @@ and queue_atomic_int ~progress_callback dbg id op = !r ) in - Redirector.push Redirector.parallel_queues id (Atomic op, task) ; + debug "Adding to %s queues" (Redirector.to_string redirector) ; + Redirector.push redirector id (Atomic op, task) ; task -and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops = +and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops + redirector = let from = Updates.last_id dbg updates in Xenops_utils.chunks max_parallel_atoms ops |> List.mapi (fun chunk_idx ops -> @@ -2386,7 +2436,9 @@ and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops = let atom_id = Printf.sprintf "%s.chunk=%d.atom=%d" id chunk_idx atom_idx in - (queue_atomic_int ~progress_callback dbg atom_id op, op) + ( queue_atomic_int ~progress_callback dbg atom_id op redirector + , op + ) ) ops in @@ -2562,7 +2614,9 @@ and trigger_cleanup_after_failure_atom op t = immediate_operation dbg id (VM_check_state id) | Best_effort op -> trigger_cleanup_after_failure_atom op t - | Parallel (_id, _description, ops) | Serial (_id, _description, ops) -> + | Parallel (_id, _description, ops) + | Nested_parallel (_id, _description, ops) + | Serial (_id, _description, ops) -> List.iter (fun op -> trigger_cleanup_after_failure_atom op t) ops | VM_rename (id1, id2, _) -> immediate_operation dbg id1 (VM_check_state id1) ; diff --git a/quality-gate.sh b/quality-gate.sh index a1f57a6752..c812697556 100755 --- a/quality-gate.sh +++ b/quality-gate.sh @@ -44,7 +44,7 @@ mli-files () { } structural-equality () { - N=9 + N=7 EQ=$(git grep -r --count ' == ' -- '**/*.ml' ':!ocaml/sdk-gen/**/*.ml' | cut -d ':' -f 2 | paste -sd+ - | bc) if [ "$EQ" -eq "$N" ]; then echo "OK counted $EQ usages of ' == '" From 62584053b0c54c8ca3bfa1f0b31c6b482b049bd7 Mon Sep 17 00:00:00 2001 From: Steven Woods Date: Mon, 19 May 2025 16:28:23 +0100 Subject: [PATCH 2/2] CA-409510: Give a warning if atoms nested incorrectly This is a stopgap until we add compile-time constraints on the nesting, by for example using a polymorphic variant. Signed-off-by: Steven Woods --- ocaml/xenopsd/lib/xenops_server.ml | 34 ++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/ocaml/xenopsd/lib/xenops_server.ml b/ocaml/xenopsd/lib/xenops_server.ml index da91cc651c..ae93a2476c 100644 --- a/ocaml/xenopsd/lib/xenops_server.ml +++ b/ocaml/xenopsd/lib/xenops_server.ml @@ -1920,9 +1920,11 @@ let rec perform_atomic ~progress_callback ?result (op : atomic) debug "Ignoring error during best-effort operation: %s" (Printexc.to_string e) ) - | Parallel (_id, description, atoms) -> + | Parallel (_id, description, atoms) as atom -> + check_nesting atom ; parallel_atomic ~progress_callback ~description ~nested:false atoms t - | Nested_parallel (_id, description, atoms) -> + | Nested_parallel (_id, description, atoms) as atom -> + check_nesting atom ; parallel_atomic ~progress_callback ~description ~nested:true atoms t | Serial (_, _, atoms) -> List.iter (Fun.flip (perform_atomic ~progress_callback) t) atoms @@ -2352,6 +2354,34 @@ let rec perform_atomic ~progress_callback ?result (op : atomic) debug "VM.soft_reset %s" id ; B.VM.soft_reset t (VM_DB.read_exn id) +and check_nesting atom = + let msg_prefix = "Nested atomics error" in + let rec check_nesting_inner found_parallel found_nested = function + | Parallel (_, _, rem) -> + if found_parallel then ( + warn + "%s: Two or more Parallel atoms found, use Nested_parallel for the \ + inner atom" + msg_prefix ; + true + ) else + List.exists (check_nesting_inner true found_nested) rem + | Nested_parallel (_, _, rem) -> + if found_nested then ( + warn + "%s: Two or more Nested_parallel atoms found, there should only be \ + one layer of nesting" + msg_prefix ; + true + ) else + List.exists (check_nesting_inner found_parallel true) rem + | Serial (_, _, rem) -> + List.exists (check_nesting_inner found_parallel found_nested) rem + | _ -> + false + in + ignore @@ check_nesting_inner false false atom + and parallel_atomic ~progress_callback ~description ~nested atoms t = (* parallel_id is a unused unique name prefix for a parallel worker queue *) let redirector =