@@ -168,6 +168,8 @@ type atomic =
   | VM_rename of (Vm.id * Vm.id * rename_when)
   | VM_import_metadata of (Vm.id * Metadata.t)
   | Parallel of Vm.id * string * atomic list
+  | Nested_parallel of Vm.id * string * atomic list
+      (** used to make nested parallel atoms explicit, as each atom requires its own worker *)
   | Serial of Vm.id * string * atomic list
   | Best_effort of atomic
 [@@deriving rpcty]
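
For orientation, here is a minimal, self-contained sketch of the shape the new constructor is meant to capture, using simplified stand-in types rather than the real xenopsd ones (the ids and descriptions are made up): an inner fan-out that runs inside an outer Parallel atom is tagged Nested_parallel, which makes it explicit that each level of nesting occupies a worker of its own while it waits for its children.

(* Simplified stand-ins for the real types; illustrative only. *)
type id = string

type atomic =
  | VBD_set_active of id * bool
  | VBD_plug of id
  | Parallel of id * string * atomic list
  | Nested_parallel of id * string * atomic list
  | Serial of id * string * atomic list

(* An outer Parallel atom whose children fan out again: the inner level is
   tagged Nested_parallel so it is scheduled on its own worker queue. *)
let example : atomic =
  Parallel
    ( "vm0"
    , "VM_start VM=vm0"
    , [
        Nested_parallel
          ( "vm0"
          , "VBDs.activate_epoch_and_plug VM=vm0"
          , [
              Serial
                ( "vm0"
                , "VBD.activate_epoch_and_plug"
                , [VBD_set_active ("vbd0", true); VBD_plug "vbd0"]
                )
            ]
          )
      ]
    )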
@@ -286,6 +288,9 @@ let rec name_of_atomic = function
   | Parallel (_, _, atomics) ->
       Printf.sprintf "Parallel (%s)"
         (String.concat " | " (List.map name_of_atomic atomics))
+  | Nested_parallel (_, _, atomics) ->
+      Printf.sprintf "Nested_parallel (%s)"
+        (String.concat " | " (List.map name_of_atomic atomics))
   | Serial (_, _, atomics) ->
       Printf.sprintf "Serial (%s)"
         (String.concat " & " (List.map name_of_atomic atomics))
@@ -295,7 +300,7 @@ let rec name_of_atomic = function
 let rec atomic_expires_after = function
   | Serial (_, _, ops) ->
       List.map atomic_expires_after ops |> List.fold_left ( +. ) 0.
-  | Parallel (_, _, ops) ->
+  | Parallel (_, _, ops) | Nested_parallel (_, _, ops) ->
       List.map atomic_expires_after ops |> List.fold_left Float.max 0.
   | _ ->
       (* 20 minutes, in seconds *)
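
Reusing the simplified type from the sketch above, the expiry arithmetic composes like this: Serial sums its children's budgets, Parallel and Nested_parallel take the maximum because their children run concurrently, and everything else falls back to the default named in the comment (20 minutes, i.e. 1200 seconds). This is a sketch of the logic in the hunk, not the real function.

(* Sketch of the expiry arithmetic, using the simplified atomic type from the
   previous example and the 20-minute (1200 s) default named in the comment. *)
let rec expires_after = function
  | Serial (_, _, ops) ->
      (* children run one after another: budgets add up *)
      List.map expires_after ops |> List.fold_left ( +. ) 0.
  | Parallel (_, _, ops) | Nested_parallel (_, _, ops) ->
      (* children run concurrently: the slowest one dominates *)
      List.map expires_after ops |> List.fold_left Float.max 0.
  | _ ->
      1200.

(* A Nested_parallel of three two-step Serials expires after
   max (1200. +. 1200.) = 2400. seconds, not 3 * 2400. = 7200. *)
let () =
  let serial i =
    Serial
      ( "vm0"
      , "step"
      , [VBD_set_active (string_of_int i, true); VBD_plug (string_of_int i)]
      )
  in
  let atom = Nested_parallel ("vm0", "plug all", List.init 3 serial) in
  assert (expires_after atom = 2400.)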
@@ -916,6 +921,28 @@ module Redirector = struct
      Parallel atoms, creating a deadlock. *)
   let parallel_queues = {queues= Queues.create (); mutex= Mutex.create ()}
 
+  (* We create another queue only for Nested_parallel atoms for the same reason
+     as parallel_queues. When a Nested_parallel atom is inside a Parallel atom,
+     they are both using a worker whilst not doing any work, so they each need
+     additional space to prevent a deadlock. *)
+  let nested_parallel_queues =
+    {queues= Queues.create (); mutex= Mutex.create ()}
+
+  (* we do not want to use = when comparing queues: queues can contain
+     (uncomparable) functions, and we are only interested in comparing the
+     equality of their static references *)
+  let is_same_redirector q1 q2 =
+    q1 == q2
+
+  let to_string r =
+    match r with
+    | w when is_same_redirector w parallel_queues ->
+        "Parallel"
+    | w when is_same_redirector w nested_parallel_queues ->
+        "Nested_parallel"
+    | _ ->
+        "Default"
+
   (* When a thread is actively processing a queue, items are redirected to a
      thread-private queue *)
   let overrides = ref StringMap.empty
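
The new is_same_redirector helper centralises the subtlety that the deleted inline comments further down used to repeat at each call site: a redirector's queues hold closures, and OCaml's structural equality raises on functional values, so only physical equality (==) on the static references is safe. A small self-contained illustration with a hypothetical record type (not the real Queues.t):

(* Hypothetical record standing in for a redirector that carries a closure. *)
type fake_redirector = {push: string -> unit; name: string}

let default_q = {push= (fun _ -> ()); name= "Default"}

let parallel_q = {push= (fun _ -> ()); name= "Parallel"}

let () =
  default_q.push "no-op" ;
  (* Physical equality only compares the static references, which is all the
     callers need. *)
  assert (default_q == default_q) ;
  assert (not (default_q == parallel_q)) ;
  (* Structural equality descends into the record, reaches the closure and
     raises Invalid_argument at run time. *)
  match default_q = parallel_q with
  | _ ->
      assert false
  | exception Invalid_argument _ ->
      assert (default_q.name <> parallel_q.name)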
@@ -1035,6 +1062,7 @@ module Redirector = struct
           List.concat_map one
             (default.queues
             :: parallel_queues.queues
+            :: nested_parallel_queues.queues
             :: List.map snd (StringMap.bindings !overrides)
             )
       )
@@ -1219,29 +1247,23 @@ module WorkerPool = struct
      operate *)
   let count_active queues =
     with_lock m (fun () ->
-        (* we do not want to use = when comparing queues: queues can contain
-           (uncomparable) functions, and we are only interested in comparing the
-           equality of their static references *)
         List.map
-          (fun w -> w.Worker.redirector == queues && Worker.is_active w)
+          (fun w -> (Redirector.is_same_redirector w.Worker.redirector queues) && Worker.is_active w)
           !pool
         |> List.filter (fun x -> x)
         |> List.length
     )
 
   let find_one queues f =
     List.fold_left
-      (fun acc x -> acc || (x.Worker.redirector == queues && f x))
+      (fun acc x -> acc || ((Redirector.is_same_redirector x.Worker.redirector queues) && f x))
       false
 
   (* Clean up any shutdown threads and remove them from the master list *)
   let gc queues pool =
     List.fold_left
       (fun acc w ->
-        (* we do not want to use = when comparing queues: queues can contain
-           (uncomparable) functions, and we are only interested in comparing the
-           equality of their static references *)
-        if w.Worker.redirector == queues && Worker.get_state w = Worker.Shutdown
+        if (Redirector.is_same_redirector w.Worker.redirector queues) && Worker.get_state w = Worker.Shutdown
         then (
           Worker.join w ; acc
         ) else
@@ -1268,7 +1290,8 @@ module WorkerPool = struct
   let start size =
     for _i = 1 to size do
       incr Redirector.default ;
-      incr Redirector.parallel_queues
+      incr Redirector.parallel_queues ;
+      incr Redirector.nested_parallel_queues
     done
 
   let set_size size =
@@ -1283,7 +1306,8 @@ module WorkerPool = struct
       done
     in
     inner Redirector.default ;
-    inner Redirector.parallel_queues
+    inner Redirector.parallel_queues ;
+    inner Redirector.nested_parallel_queues
 end
 
 (* Keep track of which VMs we're rebooting so we avoid transient glitches where
@@ -1584,6 +1608,11 @@ let collect_into apply = function [] -> [] | [op] -> [op] | lst -> apply lst
 let parallel name ~id =
   collect_into (fun ls -> [Parallel (id, Printf.sprintf "%s VM=%s" name id, ls)])
 
+let nested_parallel name ~id =
+  collect_into (fun ls ->
+      [Nested_parallel (id, Printf.sprintf "%s VM=%s" name id, ls)]
+  )
+
 let serial name ~id =
   collect_into (fun ls -> [Serial (id, Printf.sprintf "%s VM=%s" name id, ls)])
 
@@ -1593,6 +1622,9 @@ let serial_concat name ~id lst = serial name ~id (List.concat lst)
 
 let parallel_map name ~id lst f = parallel name ~id (List.concat_map f lst)
 
+let nested_parallel_map name ~id lst f =
+  nested_parallel name ~id (List.concat_map f lst)
+
 let map_or_empty f x = Option.value ~default:[] (Option.map f x)
 
 (* Creates a Serial of 2 or more Atomics. If the number of Atomics could be
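
Like parallel and serial, the new helpers go through collect_into (its definition appears in the earlier hunk header), so a Nested_parallel atom is only created when there are at least two atoms to group; an empty or singleton result passes through unwrapped and never ties up an extra worker. A sketch of that behaviour, copying the helpers onto the simplified type from the first example:

(* Sketch of the collect_into behaviour the helpers above rely on: wrap only
   when there is genuinely something to run in parallel. Names and types are
   simplified, as in the earlier sketches. *)
let collect_into apply = function [] -> [] | [op] -> [op] | lst -> apply lst

let nested_parallel name ~id =
  collect_into (fun ls ->
      [Nested_parallel (id, Printf.sprintf "%s VM=%s" name id, ls)]
  )

let nested_parallel_map name ~id lst f =
  nested_parallel name ~id (List.concat_map f lst)

let () =
  (* one VBD: no wrapper, no extra worker needed *)
  assert (
    nested_parallel_map "VBDs.plug" ~id:"vm0" ["vbd0"] (fun v -> [VBD_plug v])
    = [VBD_plug "vbd0"]
  ) ;
  (* two VBDs: wrapped into a single Nested_parallel atom *)
  match
    nested_parallel_map "VBDs.plug" ~id:"vm0" ["vbd0"; "vbd1"] (fun v ->
        [VBD_plug v]
    )
  with
  | [Nested_parallel ("vm0", _, [VBD_plug "vbd0"; VBD_plug "vbd1"])] ->
      ()
  | _ ->
      assert false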
@@ -1630,7 +1662,7 @@ let rec atomics_of_operation = function
       let pf = Printf.sprintf in
       let name_multi = pf "VBDs.activate_epoch_and_plug %s" typ in
       let name_one = pf "VBD.activate_epoch_and_plug %s" typ in
-      parallel_map name_multi ~id vbds (fun vbd ->
+      nested_parallel_map name_multi ~id vbds (fun vbd ->
           serial_concat name_one ~id
             [
               [VBD_set_active (vbd.Vbd.id, true)]
@@ -1664,11 +1696,11 @@ let rec atomics_of_operation = function
           vifs
       ; serial_concat "VGPUs.activate & PCI.plug (SRIOV)" ~id
           [
-            parallel_map "VGPUs.activate" ~id vgpus (fun vgpu ->
+            nested_parallel_map "VGPUs.activate" ~id vgpus (fun vgpu ->
                 [VGPU_set_active (vgpu.Vgpu.id, true)]
             )
-          ; parallel_map "PCIs.plug (SRIOV)" ~id pcis_sriov (fun pci ->
-                [PCI_plug (pci.Pci.id, false)]
+          ; nested_parallel_map "PCIs.plug (SRIOV)" ~id pcis_sriov
+              (fun pci -> [PCI_plug (pci.Pci.id, false)]
             )
           ]
       ]
@@ -1883,56 +1915,9 @@ let rec perform_atomic ~progress_callback ?result (op : atomic)
           (Printexc.to_string e)
       )
   | Parallel (_id, description, atoms) ->
-      (* parallel_id is a unused unique name prefix for a parallel worker queue *)
-      let parallel_id =
-        Printf.sprintf "Parallel:task=%s.atoms=%d.(%s)"
-          (Xenops_task.id_of_handle t)
-          (List.length atoms) description
-      in
-      let with_tracing = id_with_tracing parallel_id t in
-      debug "begin_%s" parallel_id ;
-      let task_list =
-        queue_atomics_and_wait ~progress_callback ~max_parallel_atoms:10
-          with_tracing parallel_id atoms
-      in
-      debug "end_%s" parallel_id ;
-      (* make sure that we destroy all the parallel tasks that finished *)
-      let errors =
-        List.map
-          (fun (id, task_handle, task_state) ->
-            match task_state with
-            | Some (Task.Completed _) ->
-                TASK.destroy' id ; None
-            | Some (Task.Failed e) ->
-                TASK.destroy' id ;
-                let e =
-                  match Rpcmarshal.unmarshal Errors.error.Rpc.Types.ty e with
-                  | Ok x ->
-                      Xenopsd_error x
-                  | Error (`Msg x) ->
-                      internal_error "Error unmarshalling failure: %s" x
-                in
-                Some e
-            | None | Some (Task.Pending _) ->
-                (* Because pending tasks are filtered out in
-                   queue_atomics_and_wait with task_ended the second case will
-                   never be encountered. The previous boolean used in
-                   event_wait was enough to express the possible cases *)
-                let err_msg =
-                  Printf.sprintf "Timed out while waiting on task %s (%s)" id
-                    (Xenops_task.get_dbg task_handle)
-                in
-                error "%s" err_msg ;
-                Xenops_task.cancel task_handle ;
-                Some (Xenopsd_error (Internal_error err_msg))
-          )
-          task_list
-      in
-      (* if any error was present, raise first one, so that
-         trigger_cleanup_after_failure is called *)
-      List.iter
-        (fun err -> match err with None -> () | Some e -> raise e)
-        errors
+      parallel_atomic ~progress_callback ~description ~nested:false atoms t
+  | Nested_parallel (_id, description, atoms) ->
+      parallel_atomic ~progress_callback ~description ~nested:true atoms t
   | Serial (_, _, atoms) ->
       List.iter (Fun.flip (perform_atomic ~progress_callback) t) atoms
   | VIF_plug id ->
@@ -2361,7 +2346,64 @@ let rec perform_atomic ~progress_callback ?result (op : atomic)
       debug "VM.soft_reset %s" id ;
       B.VM.soft_reset t (VM_DB.read_exn id)
 
-and queue_atomic_int ~progress_callback dbg id op =
+and parallel_atomic ~progress_callback ~description ~nested atoms t =
+  (* parallel_id is an unused unique name prefix for a parallel worker queue *)
+  let redirector =
+    if nested then
+      Redirector.nested_parallel_queues
+    else
+      Redirector.parallel_queues
+  in
+  let parallel_id =
+    Printf.sprintf "%s:task=%s.atoms=%d.(%s)"
+      (Redirector.to_string redirector)
+      (Xenops_task.id_of_handle t)
+      (List.length atoms) description
+  in
+  let with_tracing = id_with_tracing parallel_id t in
+  debug "begin_%s" parallel_id ;
+  let task_list =
+    queue_atomics_and_wait ~progress_callback ~max_parallel_atoms:10
+      with_tracing parallel_id atoms redirector
+  in
+  debug "end_%s" parallel_id ;
+  (* make sure that we destroy all the parallel tasks that finished *)
+  let errors =
+    List.map
+      (fun (id, task_handle, task_state) ->
+        match task_state with
+        | Some (Task.Completed _) ->
+            TASK.destroy' id ; None
+        | Some (Task.Failed e) ->
+            TASK.destroy' id ;
+            let e =
+              match Rpcmarshal.unmarshal Errors.error.Rpc.Types.ty e with
+              | Ok x ->
+                  Xenopsd_error x
+              | Error (`Msg x) ->
+                  internal_error "Error unmarshalling failure: %s" x
+            in
+            Some e
+        | None | Some (Task.Pending _) ->
+            (* Because pending tasks are filtered out in
+               queue_atomics_and_wait with task_ended the second case will
+               never be encountered. The previous boolean used in
+               event_wait was enough to express the possible cases *)
+            let err_msg =
+              Printf.sprintf "Timed out while waiting on task %s (%s)" id
+                (Xenops_task.get_dbg task_handle)
+            in
+            error "%s" err_msg ;
+            Xenops_task.cancel task_handle ;
+            Some (Xenopsd_error (Internal_error err_msg))
+      )
+      task_list
+  in
+  (* if any error was present, raise first one, so that
+     trigger_cleanup_after_failure is called *)
+  List.iter (fun err -> match err with None -> () | Some e -> raise e) errors
+
+and queue_atomic_int ~progress_callback dbg id op redirector =
   let task =
     Xenops_task.add tasks dbg
       (let r = ref None in
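
One visible effect of routing the naming through Redirector.to_string is that the worker-queue prefix in task names now reflects whichever queue was chosen ("Parallel" or "Nested_parallel") instead of the old hard-coded "Parallel" literal. A tiny sketch of the format, with a made-up task id and description:

(* Sketch of the task-name prefix change; the values below are illustrative. *)
let parallel_id ~queue_name ~task ~atoms ~description =
  Printf.sprintf "%s:task=%s.atoms=%d.(%s)" queue_name task atoms description

let () =
  assert (
    parallel_id ~queue_name:"Nested_parallel" ~task:"42" ~atoms:2
      ~description:"VGPUs.activate VM=vm0"
    = "Nested_parallel:task=42.atoms=2.(VGPUs.activate VM=vm0)"
  )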
@@ -2370,10 +2412,12 @@ and queue_atomic_int ~progress_callback dbg id op =
        !r
       )
   in
-  Redirector.push Redirector.parallel_queues id (Atomic op, task) ;
+  debug "Adding to %s queues" (Redirector.to_string redirector) ;
+  Redirector.push redirector id (Atomic op, task) ;
   task
 
-and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops =
+and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops
+    redirector =
   let from = Updates.last_id dbg updates in
   Xenops_utils.chunks max_parallel_atoms ops
   |> List.mapi (fun chunk_idx ops ->
@@ -2386,7 +2430,9 @@ and queue_atomics_and_wait ~progress_callback ~max_parallel_atoms dbg id ops =
              let atom_id =
                Printf.sprintf "%s.chunk=%d.atom=%d" id chunk_idx atom_idx
              in
-             (queue_atomic_int ~progress_callback dbg atom_id op, op)
+             ( queue_atomic_int ~progress_callback dbg atom_id op redirector
+             , op
+             )
         )
         ops
     in
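
queue_atomics_and_wait splits the atoms into chunks of at most max_parallel_atoms (10 at the call site in parallel_atomic) and derives each queued atom's id from its chunk and position, as the hunk above shows. A rough, self-contained sketch of that naming; the chunks function here is an illustrative stand-in, not the real Xenops_utils.chunks, and the example parallel_id is made up:

(* Illustrative stand-in for Xenops_utils.chunks : int -> 'a list -> 'a list list *)
let chunks n xs =
  List.fold_left
    (fun acc x ->
      match acc with
      | cur :: rest when List.length cur < n ->
          (x :: cur) :: rest
      | _ ->
          [x] :: acc
    )
    [] xs
  |> List.rev_map List.rev

(* Ids follow "<parallel_id>.chunk=<i>.atom=<j>", as in the hunk above. *)
let atom_ids parallel_id ops =
  chunks 10 ops
  |> List.mapi (fun chunk_idx ops ->
         List.mapi
           (fun atom_idx _op ->
             Printf.sprintf "%s.chunk=%d.atom=%d" parallel_id chunk_idx atom_idx
           )
           ops
     )
  |> List.concat

let () =
  (* 12 atoms -> chunk 0 holds atoms 0-9, chunk 1 holds atoms 0-1 *)
  let ids =
    atom_ids "Nested_parallel:task=12.atoms=12.(example)" (List.init 12 Fun.id)
  in
  assert (List.length ids = 12) ;
  assert (List.nth ids 11 = "Nested_parallel:task=12.atoms=12.(example).chunk=1.atom=1")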
@@ -2562,7 +2608,9 @@ and trigger_cleanup_after_failure_atom op t =
       immediate_operation dbg id (VM_check_state id)
   | Best_effort op ->
       trigger_cleanup_after_failure_atom op t
-  | Parallel (_id, _description, ops) | Serial (_id, _description, ops) ->
+  | Parallel (_id, _description, ops)
+  | Nested_parallel (_id, _description, ops)
+  | Serial (_id, _description, ops) ->
       List.iter (fun op -> trigger_cleanup_after_failure_atom op t) ops
   | VM_rename (id1, id2, _) ->
       immediate_operation dbg id1 (VM_check_state id1) ;