@@ -168,6 +168,8 @@ type atomic =
168168 | VM_rename of (Vm .id * Vm .id * rename_when )
169169 | VM_import_metadata of (Vm .id * Metadata .t )
170170 | Parallel of Vm .id * string * atomic list
171+ | Nested_parallel of Vm .id * string * atomic list
172+ (** Used to make nested parallel atoms explicit, as each atom requires its own worker *)
171173 | Serial of Vm .id * string * atomic list
172174 | Best_effort of atomic
173175[@@ deriving rpcty ]
@@ -286,6 +288,9 @@ let rec name_of_atomic = function
286288 | Parallel (_ , _ , atomics ) ->
287289 Printf. sprintf " Parallel (%s)"
288290 (String. concat " | " (List. map name_of_atomic atomics))
291+ | Nested_parallel (_ , _ , atomics ) ->
292+ Printf. sprintf " Nested_parallel (%s)"
293+ (String. concat " | " (List. map name_of_atomic atomics))
289294 | Serial (_ , _ , atomics ) ->
290295 Printf. sprintf " Serial (%s)"
291296 (String. concat " & " (List. map name_of_atomic atomics))
@@ -295,7 +300,7 @@ let rec name_of_atomic = function
295300let rec atomic_expires_after = function
296301 | Serial (_ , _ , ops ) ->
297302 List. map atomic_expires_after ops |> List. fold_left ( +. ) 0.
298- | Parallel (_ , _ , ops ) ->
303+ | Parallel (_ , _ , ops ) | Nested_parallel ( _ , _ , ops ) ->
299304 List. map atomic_expires_after ops |> List. fold_left Float. max 0.
300305 | _ ->
301306 (* 20 minutes, in seconds *)
@@ -916,6 +921,27 @@ module Redirector = struct
916921 Parallel atoms, creating a deadlock. *)
917922 let parallel_queues = {queues= Queues. create () ; mutex= Mutex. create () }
918923
(* Nested_parallel atoms get a redirector of their own, for the same reason
   that Parallel atoms have parallel_queues. When a Nested_parallel atom runs
   inside a Parallel atom, both occupy a worker while doing no work themselves,
   so each level needs its own extra capacity to avoid a deadlock. *)
let nested_parallel_queues =
  {mutex= Mutex.create (); queues= Queues.create ()}
930+
(* Redirectors are compared by physical identity (==), never with structural
   equality (=): queues can hold functions, which = cannot compare, and the
   only question of interest is whether two values are the same static
   reference. *)
let is_same_redirector = ( == )
935+
(* Label for redirector [r], chosen by physical identity: one label for
   parallel_queues, one for nested_parallel_queues, and a default label for
   any other redirector. *)
let to_string r =
  if is_same_redirector r parallel_queues then
    " Parallel"
  else if is_same_redirector r nested_parallel_queues then
    " Nested_parallel"
  else
    " Default"
944+
919945 (* When a thread is actively processing a queue, items are redirected to a
920946 thread-private queue *)
921947 let overrides = ref StringMap. empty
@@ -1035,6 +1061,7 @@ module Redirector = struct
10351061 List. concat_map one
10361062 (default.queues
10371063 :: parallel_queues.queues
1064+ :: nested_parallel_queues.queues
10381065 :: List. map snd (StringMap. bindings ! overrides)
10391066 )
10401067 )
@@ -1219,29 +1246,30 @@ module WorkerPool = struct
12191246 operate *)
12201247 let count_active queues =
12211248 with_lock m (fun () ->
1222- (* we do not want to use = when comparing queues: queues can contain
1223- (uncomparable) functions, and we are only interested in comparing the
1224- equality of their static references *)
12251249 List. map
1226- (fun w -> w.Worker. redirector == queues && Worker. is_active w)
1250+ (fun w ->
1251+ Redirector. is_same_redirector w.Worker. redirector queues
1252+ && Worker. is_active w
1253+ )
12271254 ! pool
12281255 |> List. filter (fun x -> x)
12291256 |> List. length
12301257 )
12311258
12321259 let find_one queues f =
12331260 List. fold_left
1234- (fun acc x -> acc || (x.Worker. redirector == queues && f x))
1261+ (fun acc x ->
1262+ acc || (Redirector. is_same_redirector x.Worker. redirector queues && f x)
1263+ )
12351264 false
12361265
12371266 (* Clean up any shutdown threads and remove them from the master list *)
12381267 let gc queues pool =
12391268 List. fold_left
12401269 (fun acc w ->
1241- (* we do not want to use = when comparing queues: queues can contain
1242- (uncomparable) functions, and we are only interested in comparing the
1243- equality of their static references *)
1244- if w.Worker. redirector == queues && Worker. get_state w = Worker. Shutdown
1270+ if
1271+ Redirector. is_same_redirector w.Worker. redirector queues
1272+ && Worker. get_state w = Worker. Shutdown
12451273 then (
12461274 Worker. join w ; acc
12471275 ) else
@@ -1268,7 +1296,8 @@ module WorkerPool = struct
12681296 let start size =
12691297 for _i = 1 to size do
12701298 incr Redirector. default ;
1271- incr Redirector. parallel_queues
1299+ incr Redirector. parallel_queues ;
1300+ incr Redirector. nested_parallel_queues
12721301 done
12731302
12741303 let set_size size =
@@ -1283,7 +1312,8 @@ module WorkerPool = struct
12831312 done
12841313 in
12851314 inner Redirector. default ;
1286- inner Redirector. parallel_queues
1315+ inner Redirector. parallel_queues ;
1316+ inner Redirector. nested_parallel_queues
12871317end
12881318
12891319(* Keep track of which VMs we're rebooting so we avoid transient glitches where
@@ -1584,6 +1614,11 @@ let collect_into apply = function [] -> [] | [op] -> [op] | lst -> apply lst
(* Build the Parallel atom for VM [id] out of a list of atoms. The wrapping is
   delegated to collect_into, which returns empty and single-element lists
   unchanged and only applies the wrapper to longer lists. The atom's
   description is formatted from [name] and [id]. *)
let parallel name ~id =
  let wrap atoms =
    [Parallel (id, Printf.sprintf " %s VM=%s" name id, atoms)]
  in
  collect_into wrap
15861616
(* Same shape as the Parallel builder, but produces a Nested_parallel atom for
   VM [id]. collect_into returns empty and single-element lists unchanged and
   only applies the wrapper to longer lists. The atom's description is
   formatted from [name] and [id]. *)
let nested_parallel name ~id =
  let wrap atoms =
    [Nested_parallel (id, Printf.sprintf " %s VM=%s" name id, atoms)]
  in
  collect_into wrap
1621+
(* Build the Serial atom for VM [id] out of a list of atoms. collect_into
   returns empty and single-element lists unchanged and only applies the
   wrapper to longer lists. The atom's description is formatted from [name]
   and [id]. *)
let serial name ~id =
  let wrap atoms =
    [Serial (id, Printf.sprintf " %s VM=%s" name id, atoms)]
  in
  collect_into wrap
15891624
@@ -1593,6 +1628,9 @@ let serial_concat name ~id lst = serial name ~id (List.concat lst)
15931628
(* Expand every element of [lst] into atoms with [f], concatenate the results,
   and hand the flattened list to [parallel]. *)
let parallel_map name ~id lst f =
  let atoms = List.concat_map f lst in
  parallel name ~id atoms
15951630
(* Expand every element of [lst] into atoms with [f], concatenate the results,
   and hand the flattened list to [nested_parallel]. *)
let nested_parallel_map name ~id lst f =
  let atoms = List.concat_map f lst in
  nested_parallel name ~id atoms
1633+
(* [map_or_empty f x] is [f v] when [x] is [Some v], and the empty list when
   [x] is [None]. *)
let map_or_empty f x = match x with None -> [] | Some v -> f v
15971635
15981636(* Creates a Serial of 2 or more Atomics. If the number of Atomics could be
@@ -1630,7 +1668,7 @@ let rec atomics_of_operation = function
16301668 let pf = Printf. sprintf in
16311669 let name_multi = pf " VBDs.activate_epoch_and_plug %s" typ in
16321670 let name_one = pf " VBD.activate_epoch_and_plug %s" typ in
1633- parallel_map name_multi ~id vbds (fun vbd ->
1671+ nested_parallel_map name_multi ~id vbds (fun vbd ->
16341672 serial_concat name_one ~id
16351673 [
16361674 [VBD_set_active (vbd.Vbd. id, true )]
@@ -1664,11 +1702,11 @@ let rec atomics_of_operation = function
16641702 vifs
16651703 ; serial_concat " VGPUs.activate & PCI.plug (SRIOV)" ~id
16661704 [
1667- parallel_map " VGPUs.activate" ~id vgpus (fun vgpu ->
1705+ nested_parallel_map " VGPUs.activate" ~id vgpus (fun vgpu ->
16681706 [VGPU_set_active (vgpu.Vgpu. id, true )]
16691707 )
1670- ; parallel_map " PCIs.plug (SRIOV)" ~id pcis_sriov ( fun pci ->
1671- [PCI_plug (pci.Pci. id, false )]
1708+ ; nested_parallel_map " PCIs.plug (SRIOV)" ~id pcis_sriov
1709+ ( fun pci -> [PCI_plug (pci.Pci. id, false )]
16721710 )
16731711 ]
16741712 ]
@@ -1883,56 +1921,9 @@ let rec perform_atomic ~progress_callback ?result (op : atomic)
18831921 (Printexc. to_string e)
18841922 )
18851923 | Parallel (_id , description , atoms ) ->
1886- (* parallel_id is a unused unique name prefix for a parallel worker queue *)
1887- let parallel_id =
1888- Printf. sprintf " Parallel:task=%s.atoms=%d.(%s)"
1889- (Xenops_task. id_of_handle t)
1890- (List. length atoms) description
1891- in
1892- let with_tracing = id_with_tracing parallel_id t in
1893- debug " begin_%s" parallel_id ;
1894- let task_list =
1895- queue_atomics_and_wait ~progress_callback ~max_parallel_atoms: 10
1896- with_tracing parallel_id atoms
1897- in
1898- debug " end_%s" parallel_id ;
1899- (* make sure that we destroy all the parallel tasks that finished *)
1900- let errors =
1901- List. map
1902- (fun (id , task_handle , task_state ) ->
1903- match task_state with
1904- | Some (Task. Completed _ ) ->
1905- TASK. destroy' id ; None
1906- | Some (Task. Failed e ) ->
1907- TASK. destroy' id ;
1908- let e =
1909- match Rpcmarshal. unmarshal Errors. error.Rpc.Types. ty e with
1910- | Ok x ->
1911- Xenopsd_error x
1912- | Error (`Msg x ) ->
1913- internal_error " Error unmarshalling failure: %s" x
1914- in
1915- Some e
1916- | None | Some (Task. Pending _ ) ->
1917- (* Because pending tasks are filtered out in
1918- queue_atomics_and_wait with task_ended the second case will
1919- never be encountered. The previous boolean used in
1920- event_wait was enough to express the possible cases *)
1921- let err_msg =
1922- Printf. sprintf " Timed out while waiting on task %s (%s)" id
1923- (Xenops_task. get_dbg task_handle)
1924- in
1925- error " %s" err_msg ;
1926- Xenops_task. cancel task_handle ;
1927- Some (Xenopsd_error (Internal_error err_msg))
1928- )
1929- task_list
1930- in
1931- (* if any error was present, raise first one, so that
1932- trigger_cleanup_after_failure is called *)
1933- List. iter
1934- (fun err -> match err with None -> () | Some e -> raise e)
1935- errors
1924+ parallel_atomic ~progress_callback ~description ~nested: false atoms t
1925+ | Nested_parallel (_id , description , atoms ) ->
1926+ parallel_atomic ~progress_callback ~description ~nested: true atoms t
19361927 | Serial (_ , _ , atoms ) ->
19371928 List. iter (Fun. flip (perform_atomic ~progress_callback ) t) atoms
19381929 | VIF_plug id ->
@@ -2361,7 +2352,64 @@ let rec perform_atomic ~progress_callback ?result (op : atomic)
23612352 debug " VM.soft_reset %s" id ;
23622353 B.VM. soft_reset t (VM_DB. read_exn id)
23632354
2364- and queue_atomic_int ~progress_callback dbg id op =
and parallel_atomic ~progress_callback ~description ~nested atoms t =
  (* Shared implementation for Parallel and Nested_parallel atoms: hands
     [atoms] to queue_atomics_and_wait, then tidies up the resulting tasks and
     raises the first error found, if any. [nested] only selects which
     redirector the atoms are queued on. *)
  let redirector =
    match nested with
    | true ->
        Redirector.nested_parallel_queues
    | false ->
        Redirector.parallel_queues
  in
  (* parallel_id is an unused, unique name prefix for a parallel worker queue;
     it also labels the begin_/end_ debug lines below *)
  let parallel_id =
    Printf.sprintf " %s:task=%s.atoms=%d.(%s)"
      (Redirector.to_string redirector)
      (Xenops_task.id_of_handle t)
      (List.length atoms) description
  in
  let with_tracing = id_with_tracing parallel_id t in
  debug " begin_%s" parallel_id ;
  let task_list =
    queue_atomics_and_wait ~progress_callback ~max_parallel_atoms:10
      with_tracing parallel_id atoms redirector
  in
  debug " end_%s" parallel_id ;
  (* Turn the marshalled failure of a finished task back into an exception *)
  let exn_of_failure e =
    match Rpcmarshal.unmarshal Errors.error.Rpc.Types.ty e with
    | Ok x ->
        Xenopsd_error x
    | Error (`Msg x) ->
        internal_error " Error unmarshalling failure: %s" x
  in
  (* A task with no final state is reported as timed out: log it, cancel the
     task and produce the matching error *)
  let exn_of_timeout id task_handle =
    let err_msg =
      Printf.sprintf " Timed out while waiting on task %s (%s)" id
        (Xenops_task.get_dbg task_handle)
    in
    error " %s" err_msg ;
    Xenops_task.cancel task_handle ;
    Xenopsd_error (Internal_error err_msg)
  in
  (* Walk every task in order so that all finished tasks get destroyed before
     anything is raised, keeping only the errors *)
  let failures =
    List.filter_map
      (fun (id, task_handle, task_state) ->
        match task_state with
        | Some (Task.Completed _) ->
            TASK.destroy' id ; None
        | Some (Task.Failed e) ->
            TASK.destroy' id ;
            Some (exn_of_failure e)
        | None | Some (Task.Pending _) ->
            (* queue_atomics_and_wait filters out pending tasks with
               task_ended, so the Pending case is never expected here; only
               the None case can occur *)
            Some (exn_of_timeout id task_handle)
      )
      task_list
  in
  (* raise the first error, if there is one, so that
     trigger_cleanup_after_failure is called *)
  match failures with [] -> () | first :: _ -> raise first
2411+
2412+ and queue_atomic_int ~progress_callback dbg id op redirector =
23652413 let task =
23662414 Xenops_task. add tasks dbg
23672415 (let r = ref None in
@@ -2370,10 +2418,12 @@ and queue_atomic_int ~progress_callback dbg id op =
23702418 ! r
23712419 )
23722420 in
2373- Redirector. push Redirector. parallel_queues id (Atomic op, task) ;
2421+ debug " Adding to %s queues" (Redirector. to_string redirector) ;
2422+ Redirector. push redirector id (Atomic op, task) ;
23742423 task
23752424
2376- and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops =
2425+ and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops
2426+ redirector =
23772427 let from = Updates. last_id dbg updates in
23782428 Xenops_utils. chunks max_parallel_atoms ops
23792429 |> List. mapi (fun chunk_idx ops ->
@@ -2386,7 +2436,9 @@ and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops =
23862436 let atom_id =
23872437 Printf. sprintf " %s.chunk=%d.atom=%d" id chunk_idx atom_idx
23882438 in
2389- (queue_atomic_int ~progress_callback dbg atom_id op, op)
2439+ ( queue_atomic_int ~progress_callback dbg atom_id op redirector
2440+ , op
2441+ )
23902442 )
23912443 ops
23922444 in
@@ -2562,7 +2614,9 @@ and trigger_cleanup_after_failure_atom op t =
25622614 immediate_operation dbg id (VM_check_state id)
25632615 | Best_effort op ->
25642616 trigger_cleanup_after_failure_atom op t
2565- | Parallel (_id , _description , ops ) | Serial (_id , _description , ops ) ->
2617+ | Parallel (_id, _description, ops)
2618+ | Nested_parallel (_id, _description, ops)
2619+ | Serial (_id , _description , ops ) ->
25662620 List. iter (fun op -> trigger_cleanup_after_failure_atom op t) ops
25672621 | VM_rename (id1 , id2 , _ ) ->
25682622 immediate_operation dbg id1 (VM_check_state id1) ;
0 commit comments