CA-409510: Make xenopsd nested Parallel atoms explicit #6469

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 2 commits into master
ocaml/xenopsd/lib/xenops_server.ml (228 changes: 156 additions & 72 deletions)
@@ -168,6 +168,8 @@ type atomic =
| VM_rename of (Vm.id * Vm.id * rename_when)
| VM_import_metadata of (Vm.id * Metadata.t)
| Parallel of Vm.id * string * atomic list
| Nested_parallel of Vm.id * string * atomic list
(** Used to make nested Parallel atoms explicit, as each atom requires its own worker *)
| Serial of Vm.id * string * atomic list
| Best_effort of atomic
[@@deriving rpcty]
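
For readers of the diff, a hedged sketch (not from the patch) of the shape the new constructor enables: the outer fan-out stays a `Parallel` atom, and any fan-out within it becomes a `Nested_parallel`, so each level draws on its own worker pool. The ids and description strings below are placeholders; `Vm.id` is a string, as the `Printf` usages further down suggest.

```ocaml
(* Illustration only: one level of nesting, the maximum check_nesting allows. *)
let _example_shape vm_id =
  Parallel
    ( vm_id
    , "VM.start VM=" ^ vm_id
    , [
        Nested_parallel
          (vm_id, "VBDs.activate VM=" ^ vm_id, [(* one atom per VBD *)])
      ]
    )
```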
@@ -286,6 +288,9 @@ let rec name_of_atomic = function
| Parallel (_, _, atomics) ->
Printf.sprintf "Parallel (%s)"
(String.concat " | " (List.map name_of_atomic atomics))
| Nested_parallel (_, _, atomics) ->
Printf.sprintf "Nested_parallel (%s)"
(String.concat " | " (List.map name_of_atomic atomics))
| Serial (_, _, atomics) ->
Printf.sprintf "Serial (%s)"
(String.concat " & " (List.map name_of_atomic atomics))
@@ -295,7 +300,7 @@ let rec name_of_atomic = function
let rec atomic_expires_after = function
| Serial (_, _, ops) ->
List.map atomic_expires_after ops |> List.fold_left ( +. ) 0.
| Parallel (_, _, ops) ->
| Parallel (_, _, ops) | Nested_parallel (_, _, ops) ->
List.map atomic_expires_after ops |> List.fold_left Float.max 0.
| _ ->
(* 20 minutes, in seconds *)
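
A worked example of the two rules above, assuming the 20-minute (1200 s) default for leaf atoms: `Parallel` (and now `Nested_parallel`) expires after the maximum over its children, while `Serial` expires after their sum. The `vif_a`/`vif_b` arguments are hypothetical `Vif.id` values.

```ocaml
(* Each VIF_plug leaf defaults to 1200 s; each Parallel is therefore
   1200 s, and the enclosing Serial sums them: 2400 s in total. *)
let _expiry_example vm_id vif_a vif_b =
  atomic_expires_after
    (Serial
       ( vm_id
       , "example"
       , [
           Parallel (vm_id, "p1", [VIF_plug vif_a; VIF_plug vif_b])
         ; Parallel (vm_id, "p2", [VIF_plug vif_a])
         ]
       )
    )
(* = 2400. *)
```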
@@ -916,6 +921,27 @@ module Redirector = struct
Parallel atoms, creating a deadlock. *)
let parallel_queues = {queues= Queues.create (); mutex= Mutex.create ()}

(* We create another queue just for Nested_parallel atoms, for the same reason
as parallel_queues: when a Nested_parallel atom runs inside a Parallel atom,
both hold a worker while doing no work themselves, so each nesting level
needs its own headroom to prevent a deadlock. *)
let nested_parallel_queues =
{queues= Queues.create (); mutex= Mutex.create ()}
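
To spell out the deadlock argument in the two comments above: a worker running a Parallel atom blocks in `queue_atomics_and_wait` until all of its children complete, so those children must execute on a pool the parent cannot exhaust. `parallel_queues` provides that headroom for children of `Parallel`; but a child that is itself a `Nested_parallel` blocks a `parallel_queues` worker in exactly the same way, so its own children need the third pool, `nested_parallel_queues`, to guarantee progress. This is also why `check_nesting` (added further down) warns on more than one level of nesting: every extra level would require yet another pool.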

(* we do not want to use = when comparing queues: queues can contain
(uncomparable) functions, and we are only interested in comparing the
equality of their static references *)
let is_same_redirector q1 q2 = q1 == q2
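
For context on the comment above: OCaml's structural equality (`=`) descends into values and raises at runtime when it meets a closure, which these queue records contain, whereas physical equality (`==`) only compares references. A standalone illustration, not part of the patch:

```ocaml
let f x = x + 1

let g x = x + 1

let () =
  assert (f == f) ;       (* same reference: physically equal *)
  assert (not (f == g))   (* distinct closures: physically distinct *)
(* (f = g) would raise Invalid_argument "compare: functional value". *)
```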

let to_string r =
match r with
| w when is_same_redirector w parallel_queues ->
"Parallel"
| w when is_same_redirector w nested_parallel_queues ->
"Nested_parallel"
| _ ->
"Default"

(* When a thread is actively processing a queue, items are redirected to a
thread-private queue *)
let overrides = ref StringMap.empty
@@ -1035,6 +1061,7 @@ module Redirector = struct
List.concat_map one
(default.queues
:: parallel_queues.queues
:: nested_parallel_queues.queues
:: List.map snd (StringMap.bindings !overrides)
)
)
@@ -1219,29 +1246,30 @@ module WorkerPool = struct
operate *)
let count_active queues =
with_lock m (fun () ->
(* we do not want to use = when comparing queues: queues can contain
(uncomparable) functions, and we are only interested in comparing the
equality of their static references *)
List.map
(fun w -> w.Worker.redirector == queues && Worker.is_active w)
(fun w ->
Redirector.is_same_redirector w.Worker.redirector queues
&& Worker.is_active w
)
!pool
|> List.filter (fun x -> x)
|> List.length
)

let find_one queues f =
List.fold_left
(fun acc x -> acc || (x.Worker.redirector == queues && f x))
(fun acc x ->
acc || (Redirector.is_same_redirector x.Worker.redirector queues && f x)
)
false

(* Clean up any shutdown threads and remove them from the master list *)
let gc queues pool =
List.fold_left
(fun acc w ->
(* we do not want to use = when comparing queues: queues can contain
(uncomparable) functions, and we are only interested in comparing the
equality of their static references *)
if w.Worker.redirector == queues && Worker.get_state w = Worker.Shutdown
if
Redirector.is_same_redirector w.Worker.redirector queues
&& Worker.get_state w = Worker.Shutdown
then (
Worker.join w ; acc
) else
@@ -1268,7 +1296,8 @@ module WorkerPool = struct
let start size =
for _i = 1 to size do
incr Redirector.default ;
incr Redirector.parallel_queues
incr Redirector.parallel_queues ;
incr Redirector.nested_parallel_queues
done
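
A side effect worth noting, assuming `Redirector.incr` spawns one worker per call: a configured pool size of N now yields N workers per redirector, 3N in total, where the previous code created 2N.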

let set_size size =
@@ -1283,7 +1312,8 @@ module WorkerPool = struct
done
in
inner Redirector.default ;
inner Redirector.parallel_queues
inner Redirector.parallel_queues ;
inner Redirector.nested_parallel_queues
end

(* Keep track of which VMs we're rebooting so we avoid transient glitches where
@@ -1584,6 +1614,11 @@ let collect_into apply = function [] -> [] | [op] -> [op] | lst -> apply lst
let parallel name ~id =
collect_into (fun ls -> [Parallel (id, Printf.sprintf "%s VM=%s" name id, ls)])

let nested_parallel name ~id =
collect_into (fun ls ->
[Nested_parallel (id, Printf.sprintf "%s VM=%s" name id, ls)]
)

let serial name ~id =
collect_into (fun ls -> [Serial (id, Printf.sprintf "%s VM=%s" name id, ls)])

@@ -1593,6 +1628,9 @@ let serial_concat name ~id lst = serial name ~id (List.concat lst)

let parallel_map name ~id lst f = parallel name ~id (List.concat_map f lst)

let nested_parallel_map name ~id lst f =
nested_parallel name ~id (List.concat_map f lst)
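
These mirror the existing `parallel`/`parallel_map` helpers. A hedged usage sketch, following the VBD-activation pattern that appears later in this diff (`vbds` and its fields take the shapes already used in this file):

```ocaml
(* Fan out one activation atom per VBD from code that already runs
   under a Parallel atom, hence Nested_parallel rather than Parallel. *)
let _activate_vbds ~id vbds =
  nested_parallel_map "VBDs.activate" ~id vbds (fun vbd ->
      [VBD_set_active (vbd.Vbd.id, true)]
  )
```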

let map_or_empty f x = Option.value ~default:[] (Option.map f x)

(* Creates a Serial of 2 or more Atomics. If the number of Atomics could be
@@ -1630,7 +1668,7 @@ let rec atomics_of_operation = function
let pf = Printf.sprintf in
let name_multi = pf "VBDs.activate_epoch_and_plug %s" typ in
let name_one = pf "VBD.activate_epoch_and_plug %s" typ in
parallel_map name_multi ~id vbds (fun vbd ->
nested_parallel_map name_multi ~id vbds (fun vbd ->
serial_concat name_one ~id
[
[VBD_set_active (vbd.Vbd.id, true)]
Expand Down Expand Up @@ -1664,11 +1702,11 @@ let rec atomics_of_operation = function
vifs
; serial_concat "VGPUs.activate & PCI.plug (SRIOV)" ~id
[
parallel_map "VGPUs.activate" ~id vgpus (fun vgpu ->
nested_parallel_map "VGPUs.activate" ~id vgpus (fun vgpu ->
[VGPU_set_active (vgpu.Vgpu.id, true)]
)
; parallel_map "PCIs.plug (SRIOV)" ~id pcis_sriov (fun pci ->
[PCI_plug (pci.Pci.id, false)]
; nested_parallel_map "PCIs.plug (SRIOV)" ~id pcis_sriov
(fun pci -> [PCI_plug (pci.Pci.id, false)]
)
]
]
@@ -1882,57 +1920,12 @@ let rec perform_atomic ~progress_callback ?result (op : atomic)
debug "Ignoring error during best-effort operation: %s"
(Printexc.to_string e)
)
| Parallel (_id, description, atoms) ->
(* parallel_id is an unused unique name prefix for a parallel worker queue *)
let parallel_id =
Printf.sprintf "Parallel:task=%s.atoms=%d.(%s)"
(Xenops_task.id_of_handle t)
(List.length atoms) description
in
let with_tracing = id_with_tracing parallel_id t in
debug "begin_%s" parallel_id ;
let task_list =
queue_atomics_and_wait ~progress_callback ~max_parallel_atoms:10
with_tracing parallel_id atoms
in
debug "end_%s" parallel_id ;
(* make sure that we destroy all the parallel tasks that finished *)
let errors =
List.map
(fun (id, task_handle, task_state) ->
match task_state with
| Some (Task.Completed _) ->
TASK.destroy' id ; None
| Some (Task.Failed e) ->
TASK.destroy' id ;
let e =
match Rpcmarshal.unmarshal Errors.error.Rpc.Types.ty e with
| Ok x ->
Xenopsd_error x
| Error (`Msg x) ->
internal_error "Error unmarshalling failure: %s" x
in
Some e
| None | Some (Task.Pending _) ->
(* Because pending tasks are filtered out in
queue_atomics_and_wait with task_ended the second case will
never be encountered. The previous boolean used in
event_wait was enough to express the possible cases *)
let err_msg =
Printf.sprintf "Timed out while waiting on task %s (%s)" id
(Xenops_task.get_dbg task_handle)
in
error "%s" err_msg ;
Xenops_task.cancel task_handle ;
Some (Xenopsd_error (Internal_error err_msg))
)
task_list
in
(* if any error was present, raise the first one, so that
trigger_cleanup_after_failure is called *)
List.iter
(fun err -> match err with None -> () | Some e -> raise e)
errors
| Parallel (_id, description, atoms) as atom ->
check_nesting atom ;
parallel_atomic ~progress_callback ~description ~nested:false atoms t
| Nested_parallel (_id, description, atoms) as atom ->
check_nesting atom ;
parallel_atomic ~progress_callback ~description ~nested:true atoms t
| Serial (_, _, atoms) ->
List.iter (Fun.flip (perform_atomic ~progress_callback) t) atoms
| VIF_plug id ->
@@ -2361,7 +2354,92 @@ let rec perform_atomic ~progress_callback ?result (op : atomic)
debug "VM.soft_reset %s" id ;
B.VM.soft_reset t (VM_DB.read_exn id)

and queue_atomic_int ~progress_callback dbg id op =
and check_nesting atom =
let msg_prefix = "Nested atomics error" in
let rec check_nesting_inner found_parallel found_nested = function
| Parallel (_, _, rem) ->
if found_parallel then (
warn
"%s: Two or more Parallel atoms found, use Nested_parallel for the \
inner atom"
msg_prefix ;
true
) else
List.exists (check_nesting_inner true found_nested) rem
| Nested_parallel (_, _, rem) ->
if found_nested then (
warn
"%s: Two or more Nested_parallel atoms found, there should only be \
one layer of nesting"
msg_prefix ;
true
) else
List.exists (check_nesting_inner found_parallel true) rem
| Serial (_, _, rem) ->
List.exists (check_nesting_inner found_parallel found_nested) rem
| _ ->
false
in
ignore @@ check_nesting_inner false false atom
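
For illustration, the shape the first warning catches and the shape that passes; the ids and descriptions are placeholders, and the empty child lists keep the sketch minimal:

```ocaml
(* Two directly nested Parallel atoms: logs the first warning and
   suggests Nested_parallel for the inner one. *)
let _bad vm_id =
  check_nesting (Parallel (vm_id, "outer", [Parallel (vm_id, "inner", [])]))

(* The accepted shape: at most one Nested_parallel layer inside a Parallel. *)
let _good vm_id =
  check_nesting
    (Parallel (vm_id, "outer", [Nested_parallel (vm_id, "inner", [])]))
```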

and parallel_atomic ~progress_callback ~description ~nested atoms t =
(* parallel_id is an unused unique name prefix for a parallel worker queue *)
let redirector =
if nested then
Redirector.nested_parallel_queues
else
Redirector.parallel_queues
in
let parallel_id =
Printf.sprintf "%s:task=%s.atoms=%d.(%s)"
(Redirector.to_string redirector)
(Xenops_task.id_of_handle t)
(List.length atoms) description
in
let with_tracing = id_with_tracing parallel_id t in
debug "begin_%s" parallel_id ;
let task_list =
queue_atomics_and_wait ~progress_callback ~max_parallel_atoms:10
with_tracing parallel_id atoms redirector
in
debug "end_%s" parallel_id ;
(* make sure that we destroy all the parallel tasks that finished *)
let errors =
List.map
(fun (id, task_handle, task_state) ->
match task_state with
| Some (Task.Completed _) ->
TASK.destroy' id ; None
| Some (Task.Failed e) ->
TASK.destroy' id ;
let e =
match Rpcmarshal.unmarshal Errors.error.Rpc.Types.ty e with
| Ok x ->
Xenopsd_error x
| Error (`Msg x) ->
internal_error "Error unmarshalling failure: %s" x
in
Some e
| None | Some (Task.Pending _) ->
(* Because pending tasks are filtered out in
queue_atomics_and_wait with task_ended the second case will
never be encountered. The previous boolean used in
event_wait was enough to express the possible cases *)
let err_msg =
Printf.sprintf "Timed out while waiting on task %s (%s)" id
(Xenops_task.get_dbg task_handle)
in
error "%s" err_msg ;
Xenops_task.cancel task_handle ;
Some (Xenopsd_error (Internal_error err_msg))
)
task_list
in
(* if any error was present, raise the first one, so that
trigger_cleanup_after_failure is called *)
List.iter (fun err -> match err with None -> () | Some e -> raise e) errors

and queue_atomic_int ~progress_callback dbg id op redirector =
let task =
Xenops_task.add tasks dbg
(let r = ref None in
@@ -2370,10 +2448,12 @@ and queue_atomic_int ~progress_callback dbg id op =
!r
)
in
Redirector.push Redirector.parallel_queues id (Atomic op, task) ;
debug "Adding to %s queues" (Redirector.to_string redirector) ;
Redirector.push redirector id (Atomic op, task) ;
task

and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops =
and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops
redirector =
let from = Updates.last_id dbg updates in
Xenops_utils.chunks max_parallel_atoms ops
|> List.mapi (fun chunk_idx ops ->
@@ -2386,7 +2466,9 @@ and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops =
let atom_id =
Printf.sprintf "%s.chunk=%d.atom=%d" id chunk_idx atom_idx
in
(queue_atomic_int ~progress_callback dbg atom_id op, op)
( queue_atomic_int ~progress_callback dbg atom_id op redirector
, op
)
)
ops
in
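
`queue_atomics_and_wait` bounds fan-out through `Xenops_utils.chunks` with `max_parallel_atoms = 10`. Its assumed behaviour, splitting a list into ordered sublists of at most n elements so that only one chunk of tasks is queued and awaited at a time, can be sketched as:

```ocaml
(* Illustrative reimplementation (assumes n > 0):
   chunks 2 [1; 2; 3; 4; 5] = [[1; 2]; [3; 4]; [5]] *)
let rec chunks n = function
  | [] ->
      []
  | xs ->
      let rec split k = function
        | x :: rest when k > 0 ->
            let taken, dropped = split (k - 1) rest in
            (x :: taken, dropped)
        | rest ->
            ([], rest)
      in
      let chunk, rest = split n xs in
      chunk :: chunks n rest
```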
Expand Down Expand Up @@ -2562,7 +2644,9 @@ and trigger_cleanup_after_failure_atom op t =
immediate_operation dbg id (VM_check_state id)
| Best_effort op ->
trigger_cleanup_after_failure_atom op t
| Parallel (_id, _description, ops) | Serial (_id, _description, ops) ->
| Parallel (_id, _description, ops)
| Nested_parallel (_id, _description, ops)
| Serial (_id, _description, ops) ->
List.iter (fun op -> trigger_cleanup_after_failure_atom op t) ops
| VM_rename (id1, id2, _) ->
immediate_operation dbg id1 (VM_check_state id1) ;
quality-gate.sh (2 changes: 1 addition & 1 deletion)
@@ -44,7 +44,7 @@ mli-files () {
}

structural-equality () {
N=9
N=7
EQ=$(git grep -r --count ' == ' -- '**/*.ml' ':!ocaml/sdk-gen/**/*.ml' | cut -d ':' -f 2 | paste -sd+ - | bc)
if [ "$EQ" -eq "$N" ]; then
echo "OK counted $EQ usages of ' == '"