@@ -168,6 +168,8 @@ type atomic =
168
168
| VM_rename of (Vm .id * Vm .id * rename_when )
169
169
| VM_import_metadata of (Vm .id * Metadata .t )
170
170
| Parallel of Vm .id * string * atomic list
171
+ | Nested_parallel of Vm .id * string * atomic list
172
+ (* * used to make nested parallel atoms explicit, as each atom requires its own worker *)
171
173
| Serial of Vm .id * string * atomic list
172
174
| Best_effort of atomic
173
175
[@@ deriving rpcty ]
@@ -286,6 +288,9 @@ let rec name_of_atomic = function
286
288
| Parallel (_ , _ , atomics ) ->
287
289
Printf. sprintf " Parallel (%s)"
288
290
(String. concat " | " (List. map name_of_atomic atomics))
291
+ | Nested_parallel (_ , _ , atomics ) ->
292
+ Printf. sprintf " Nested_parallel (%s)"
293
+ (String. concat " | " (List. map name_of_atomic atomics))
289
294
| Serial (_ , _ , atomics ) ->
290
295
Printf. sprintf " Serial (%s)"
291
296
(String. concat " & " (List. map name_of_atomic atomics))
@@ -295,7 +300,7 @@ let rec name_of_atomic = function
295
300
let rec atomic_expires_after = function
296
301
| Serial (_ , _ , ops ) ->
297
302
List. map atomic_expires_after ops |> List. fold_left ( +. ) 0.
298
- | Parallel (_ , _ , ops ) ->
303
+ | Parallel (_ , _ , ops ) | Nested_parallel ( _ , _ , ops ) ->
299
304
List. map atomic_expires_after ops |> List. fold_left Float. max 0.
300
305
| _ ->
301
306
(* 20 minutes, in seconds *)
@@ -916,6 +921,27 @@ module Redirector = struct
916
921
Parallel atoms, creating a deadlock. *)
917
922
let parallel_queues = {queues= Queues. create () ; mutex= Mutex. create () }
918
923
924
+ (* We create another queue only for Nested_parallel atoms for the same reason
925
+ as parallel_queues. When a Nested_parallel atom is inside a Parallel atom,
926
+ they are both using a worker whilst not doing any work, so they each need
927
+ additional space to prevent a deadlock. *)
928
+ let nested_parallel_queues =
929
+ {queues= Queues. create () ; mutex= Mutex. create () }
930
+
931
+ (* we do not want to use = when comparing queues: queues can contain
932
+ (uncomparable) functions, and we are only interested in comparing the
933
+ equality of their static references *)
934
+ let is_same_redirector q1 q2 = q1 == q2
935
+
936
+ let to_string r =
937
+ match r with
938
+ | w when is_same_redirector w parallel_queues ->
939
+ " Parallel"
940
+ | w when is_same_redirector w nested_parallel_queues ->
941
+ " Nested_parallel"
942
+ | _ ->
943
+ " Default"
944
+
919
945
(* When a thread is actively processing a queue, items are redirected to a
920
946
thread-private queue *)
921
947
let overrides = ref StringMap. empty
@@ -1035,6 +1061,7 @@ module Redirector = struct
1035
1061
List. concat_map one
1036
1062
(default.queues
1037
1063
:: parallel_queues.queues
1064
+ :: nested_parallel_queues.queues
1038
1065
:: List. map snd (StringMap. bindings ! overrides)
1039
1066
)
1040
1067
)
@@ -1219,29 +1246,30 @@ module WorkerPool = struct
1219
1246
operate *)
1220
1247
let count_active queues =
1221
1248
with_lock m (fun () ->
1222
- (* we do not want to use = when comparing queues: queues can contain
1223
- (uncomparable) functions, and we are only interested in comparing the
1224
- equality of their static references *)
1225
1249
List. map
1226
- (fun w -> w.Worker. redirector == queues && Worker. is_active w)
1250
+ (fun w ->
1251
+ Redirector. is_same_redirector w.Worker. redirector queues
1252
+ && Worker. is_active w
1253
+ )
1227
1254
! pool
1228
1255
|> List. filter (fun x -> x)
1229
1256
|> List. length
1230
1257
)
1231
1258
1232
1259
let find_one queues f =
1233
1260
List. fold_left
1234
- (fun acc x -> acc || (x.Worker. redirector == queues && f x))
1261
+ (fun acc x ->
1262
+ acc || (Redirector. is_same_redirector x.Worker. redirector queues && f x)
1263
+ )
1235
1264
false
1236
1265
1237
1266
(* Clean up any shutdown threads and remove them from the master list *)
1238
1267
let gc queues pool =
1239
1268
List. fold_left
1240
1269
(fun acc w ->
1241
- (* we do not want to use = when comparing queues: queues can contain
1242
- (uncomparable) functions, and we are only interested in comparing the
1243
- equality of their static references *)
1244
- if w.Worker. redirector == queues && Worker. get_state w = Worker. Shutdown
1270
+ if
1271
+ Redirector. is_same_redirector w.Worker. redirector queues
1272
+ && Worker. get_state w = Worker. Shutdown
1245
1273
then (
1246
1274
Worker. join w ; acc
1247
1275
) else
@@ -1268,7 +1296,8 @@ module WorkerPool = struct
1268
1296
let start size =
1269
1297
for _i = 1 to size do
1270
1298
incr Redirector. default ;
1271
- incr Redirector. parallel_queues
1299
+ incr Redirector. parallel_queues ;
1300
+ incr Redirector. nested_parallel_queues
1272
1301
done
1273
1302
1274
1303
let set_size size =
@@ -1283,7 +1312,8 @@ module WorkerPool = struct
1283
1312
done
1284
1313
in
1285
1314
inner Redirector. default ;
1286
- inner Redirector. parallel_queues
1315
+ inner Redirector. parallel_queues ;
1316
+ inner Redirector. nested_parallel_queues
1287
1317
end
1288
1318
1289
1319
(* Keep track of which VMs we're rebooting so we avoid transient glitches where
@@ -1584,6 +1614,11 @@ let collect_into apply = function [] -> [] | [op] -> [op] | lst -> apply lst
1584
1614
let parallel name ~id =
1585
1615
collect_into (fun ls -> [Parallel (id, Printf. sprintf " %s VM=%s" name id, ls)])
1586
1616
1617
+ let nested_parallel name ~id =
1618
+ collect_into (fun ls ->
1619
+ [Nested_parallel (id, Printf. sprintf " %s VM=%s" name id, ls)]
1620
+ )
1621
+
1587
1622
let serial name ~id =
1588
1623
collect_into (fun ls -> [Serial (id, Printf. sprintf " %s VM=%s" name id, ls)])
1589
1624
@@ -1593,6 +1628,9 @@ let serial_concat name ~id lst = serial name ~id (List.concat lst)
1593
1628
1594
1629
let parallel_map name ~id lst f = parallel name ~id (List. concat_map f lst)
1595
1630
1631
+ let nested_parallel_map name ~id lst f =
1632
+ nested_parallel name ~id (List. concat_map f lst)
1633
+
1596
1634
let map_or_empty f x = Option. value ~default: [] (Option. map f x)
1597
1635
1598
1636
(* Creates a Serial of 2 or more Atomics. If the number of Atomics could be
@@ -1630,7 +1668,7 @@ let rec atomics_of_operation = function
1630
1668
let pf = Printf. sprintf in
1631
1669
let name_multi = pf " VBDs.activate_epoch_and_plug %s" typ in
1632
1670
let name_one = pf " VBD.activate_epoch_and_plug %s" typ in
1633
- parallel_map name_multi ~id vbds (fun vbd ->
1671
+ nested_parallel_map name_multi ~id vbds (fun vbd ->
1634
1672
serial_concat name_one ~id
1635
1673
[
1636
1674
[VBD_set_active (vbd.Vbd. id, true )]
@@ -1664,11 +1702,11 @@ let rec atomics_of_operation = function
1664
1702
vifs
1665
1703
; serial_concat " VGPUs.activate & PCI.plug (SRIOV)" ~id
1666
1704
[
1667
- parallel_map " VGPUs.activate" ~id vgpus (fun vgpu ->
1705
+ nested_parallel_map " VGPUs.activate" ~id vgpus (fun vgpu ->
1668
1706
[VGPU_set_active (vgpu.Vgpu. id, true )]
1669
1707
)
1670
- ; parallel_map " PCIs.plug (SRIOV)" ~id pcis_sriov ( fun pci ->
1671
- [PCI_plug (pci.Pci. id, false )]
1708
+ ; nested_parallel_map " PCIs.plug (SRIOV)" ~id pcis_sriov
1709
+ ( fun pci -> [PCI_plug (pci.Pci. id, false )]
1672
1710
)
1673
1711
]
1674
1712
]
@@ -1883,56 +1921,9 @@ let rec perform_atomic ~progress_callback ?result (op : atomic)
1883
1921
(Printexc. to_string e)
1884
1922
)
1885
1923
| Parallel (_id , description , atoms ) ->
1886
- (* parallel_id is a unused unique name prefix for a parallel worker queue *)
1887
- let parallel_id =
1888
- Printf. sprintf " Parallel:task=%s.atoms=%d.(%s)"
1889
- (Xenops_task. id_of_handle t)
1890
- (List. length atoms) description
1891
- in
1892
- let with_tracing = id_with_tracing parallel_id t in
1893
- debug " begin_%s" parallel_id ;
1894
- let task_list =
1895
- queue_atomics_and_wait ~progress_callback ~max_parallel_atoms: 10
1896
- with_tracing parallel_id atoms
1897
- in
1898
- debug " end_%s" parallel_id ;
1899
- (* make sure that we destroy all the parallel tasks that finished *)
1900
- let errors =
1901
- List. map
1902
- (fun (id , task_handle , task_state ) ->
1903
- match task_state with
1904
- | Some (Task. Completed _ ) ->
1905
- TASK. destroy' id ; None
1906
- | Some (Task. Failed e ) ->
1907
- TASK. destroy' id ;
1908
- let e =
1909
- match Rpcmarshal. unmarshal Errors. error.Rpc.Types. ty e with
1910
- | Ok x ->
1911
- Xenopsd_error x
1912
- | Error (`Msg x ) ->
1913
- internal_error " Error unmarshalling failure: %s" x
1914
- in
1915
- Some e
1916
- | None | Some (Task. Pending _ ) ->
1917
- (* Because pending tasks are filtered out in
1918
- queue_atomics_and_wait with task_ended the second case will
1919
- never be encountered. The previous boolean used in
1920
- event_wait was enough to express the possible cases *)
1921
- let err_msg =
1922
- Printf. sprintf " Timed out while waiting on task %s (%s)" id
1923
- (Xenops_task. get_dbg task_handle)
1924
- in
1925
- error " %s" err_msg ;
1926
- Xenops_task. cancel task_handle ;
1927
- Some (Xenopsd_error (Internal_error err_msg))
1928
- )
1929
- task_list
1930
- in
1931
- (* if any error was present, raise first one, so that
1932
- trigger_cleanup_after_failure is called *)
1933
- List. iter
1934
- (fun err -> match err with None -> () | Some e -> raise e)
1935
- errors
1924
+ parallel_atomic ~progress_callback ~description ~nested: false atoms t
1925
+ | Nested_parallel (_id , description , atoms ) ->
1926
+ parallel_atomic ~progress_callback ~description ~nested: true atoms t
1936
1927
| Serial (_ , _ , atoms ) ->
1937
1928
List. iter (Fun. flip (perform_atomic ~progress_callback ) t) atoms
1938
1929
| VIF_plug id ->
@@ -2361,7 +2352,64 @@ let rec perform_atomic ~progress_callback ?result (op : atomic)
2361
2352
debug " VM.soft_reset %s" id ;
2362
2353
B.VM. soft_reset t (VM_DB. read_exn id)
2363
2354
2364
- and queue_atomic_int ~progress_callback dbg id op =
2355
+ and parallel_atomic ~progress_callback ~description ~nested atoms t =
2356
+ (* parallel_id is a unused unique name prefix for a parallel worker queue *)
2357
+ let redirector =
2358
+ if nested then
2359
+ Redirector. nested_parallel_queues
2360
+ else
2361
+ Redirector. parallel_queues
2362
+ in
2363
+ let parallel_id =
2364
+ Printf. sprintf " %s:task=%s.atoms=%d.(%s)"
2365
+ (Redirector. to_string redirector)
2366
+ (Xenops_task. id_of_handle t)
2367
+ (List. length atoms) description
2368
+ in
2369
+ let with_tracing = id_with_tracing parallel_id t in
2370
+ debug " begin_%s" parallel_id ;
2371
+ let task_list =
2372
+ queue_atomics_and_wait ~progress_callback ~max_parallel_atoms: 10
2373
+ with_tracing parallel_id atoms redirector
2374
+ in
2375
+ debug " end_%s" parallel_id ;
2376
+ (* make sure that we destroy all the parallel tasks that finished *)
2377
+ let errors =
2378
+ List. map
2379
+ (fun (id , task_handle , task_state ) ->
2380
+ match task_state with
2381
+ | Some (Task. Completed _ ) ->
2382
+ TASK. destroy' id ; None
2383
+ | Some (Task. Failed e ) ->
2384
+ TASK. destroy' id ;
2385
+ let e =
2386
+ match Rpcmarshal. unmarshal Errors. error.Rpc.Types. ty e with
2387
+ | Ok x ->
2388
+ Xenopsd_error x
2389
+ | Error (`Msg x ) ->
2390
+ internal_error " Error unmarshalling failure: %s" x
2391
+ in
2392
+ Some e
2393
+ | None | Some (Task. Pending _ ) ->
2394
+ (* Because pending tasks are filtered out in
2395
+ queue_atomics_and_wait with task_ended the second case will
2396
+ never be encountered. The previous boolean used in
2397
+ event_wait was enough to express the possible cases *)
2398
+ let err_msg =
2399
+ Printf. sprintf " Timed out while waiting on task %s (%s)" id
2400
+ (Xenops_task. get_dbg task_handle)
2401
+ in
2402
+ error " %s" err_msg ;
2403
+ Xenops_task. cancel task_handle ;
2404
+ Some (Xenopsd_error (Internal_error err_msg))
2405
+ )
2406
+ task_list
2407
+ in
2408
+ (* if any error was present, raise first one, so that
2409
+ trigger_cleanup_after_failure is called *)
2410
+ List. iter (fun err -> match err with None -> () | Some e -> raise e) errors
2411
+
2412
+ and queue_atomic_int ~progress_callback dbg id op redirector =
2365
2413
let task =
2366
2414
Xenops_task. add tasks dbg
2367
2415
(let r = ref None in
@@ -2370,10 +2418,12 @@ and queue_atomic_int ~progress_callback dbg id op =
2370
2418
! r
2371
2419
)
2372
2420
in
2373
- Redirector. push Redirector. parallel_queues id (Atomic op, task) ;
2421
+ debug " Adding to %s queues" (Redirector. to_string redirector) ;
2422
+ Redirector. push redirector id (Atomic op, task) ;
2374
2423
task
2375
2424
2376
- and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops =
2425
+ and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops
2426
+ redirector =
2377
2427
let from = Updates. last_id dbg updates in
2378
2428
Xenops_utils. chunks max_parallel_atoms ops
2379
2429
|> List. mapi (fun chunk_idx ops ->
@@ -2386,7 +2436,9 @@ and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops =
2386
2436
let atom_id =
2387
2437
Printf. sprintf " %s.chunk=%d.atom=%d" id chunk_idx atom_idx
2388
2438
in
2389
- (queue_atomic_int ~progress_callback dbg atom_id op, op)
2439
+ ( queue_atomic_int ~progress_callback dbg atom_id op redirector
2440
+ , op
2441
+ )
2390
2442
)
2391
2443
ops
2392
2444
in
@@ -2562,7 +2614,9 @@ and trigger_cleanup_after_failure_atom op t =
2562
2614
immediate_operation dbg id (VM_check_state id)
2563
2615
| Best_effort op ->
2564
2616
trigger_cleanup_after_failure_atom op t
2565
- | Parallel (_id , _description , ops ) | Serial (_id , _description , ops ) ->
2617
+ | Parallel (_id, _description, ops)
2618
+ | Nested_parallel (_id, _description, ops)
2619
+ | Serial (_id , _description , ops ) ->
2566
2620
List. iter (fun op -> trigger_cleanup_after_failure_atom op t) ops
2567
2621
| VM_rename (id1 , id2 , _ ) ->
2568
2622
immediate_operation dbg id1 (VM_check_state id1) ;
0 commit comments