Skip to content

Commit 8a427b9

Browse files
authored
CP-52526: rate limit event updates (#6126)
We generate O(N^2) events when we update O(N) fields: each field update generates an event including the entire object, even if later we are going to change other fields of the same object. Instead of returning the individual field update events immediately (and generating a storm of events whenever an API client watcher for VM power events), we batch these event updates by introducing a minimum amount of time that successive Event.from need to have between them. (The client is working as expected here: when it gets an event and processes it, it immediately calls Event.from to get more events) Although this doesn't guarantee to eliminate the O(N^2) problem, in practice it reduces the overhead significantly. There is one case where we do want almost immediately notification of updates: task completions (because then the client likely wants to send us more tasks). This PR makes the already existing rate limiting in Xapi_event consistent and configurable, but doesn't yet introduce a batching delay for Event.from (it does for Event.next, which is deprecated). A separate PR (or config change) can then enable this for testing purposes, but also allows us to roll the change back by changing the tunable in the config file. There is also a new microbenchmark introduced here, I'll need to update that with the latest results.
2 parents 83f4517 + 257af94 commit 8a427b9

File tree

7 files changed

+158
-14
lines changed

7 files changed

+158
-14
lines changed

dune-project

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@
327327
(synopsis "The toolstack daemon which implements the XenAPI")
328328
(description "This daemon exposes the XenAPI and is used by clients such as 'xe' and 'XenCenter' to manage clusters of Xen-enabled hosts.")
329329
(depends
330+
(ocaml (>= 4.09))
330331
(alcotest :with-test)
331332
angstrom
332333
astring

ocaml/xapi-aux/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
(modes best)
44
(libraries
55
astring
6+
clock
67
cstruct
78
forkexec
89
ipaddr

ocaml/xapi-aux/throttle.ml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,48 @@ module Make (Size : SIZE) = struct
3939

4040
let execute f = execute (get_semaphore ()) f
4141
end
42+
43+
module Batching = struct
44+
type t = {
45+
delay_initial: Mtime.span
46+
; delay_before: Mtime.span
47+
; delay_between: Mtime.span
48+
}
49+
50+
let make ~delay_before ~delay_between =
51+
(* we are dividing, cannot overflow *)
52+
let delay_initial =
53+
Mtime.Span.to_float_ns delay_between /. 16.
54+
|> Mtime.Span.of_float_ns
55+
|> Option.get
56+
in
57+
{delay_initial; delay_before; delay_between}
58+
59+
let span_min a b = if Mtime.Span.is_shorter a ~than:b then a else b
60+
61+
(** [perform_delay delay] calls {!val:Thread.delay} when [delay] is non-zero.
62+
63+
Thread.delay 0 provides no fairness guarantees, the current thread may actually be the one that gets the global lock again.
64+
Instead {!val:Thread.yield} could be used, which does provide fairness guarantees, but it may also introduce large latencies
65+
when there are lots of threads waiting for the OCaml runtime lock. Only invoke this once, in the [delay_before] section.
66+
*)
67+
let perform_delay ~yield delay =
68+
if Mtime.Span.is_longer delay ~than:Mtime.Span.zero then
69+
Thread.delay (Clock.Timer.span_to_s delay)
70+
else if yield then
71+
(* this is a low-priority thread, if there are any other threads waiting, then run them now.
72+
If there are no threads waiting then this a noop.
73+
Requires OCaml >= 4.09 (older versions had fairness issues in Thread.yield)
74+
*)
75+
Thread.yield ()
76+
77+
let with_recursive_loop config f =
78+
let rec self arg input =
79+
let arg = span_min config.delay_between Mtime.Span.(2 * arg) in
80+
perform_delay ~yield:false arg ;
81+
(f [@tailcall]) (self arg) input
82+
in
83+
let self0 input = (f [@tailcall]) (self config.delay_initial) input in
84+
perform_delay ~yield:true config.delay_before ;
85+
f self0
86+
end

ocaml/xapi-aux/throttle.mli

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,39 @@ module Make (_ : SIZE) : sig
2222

2323
val execute : (unit -> 'a) -> 'a
2424
end
25+
26+
module Batching : sig
27+
(** batching delay configuration *)
28+
type t
29+
30+
val make : delay_before:Mtime.Span.t -> delay_between:Mtime.Span.t -> t
31+
(** [make ~delay_before ~delay_between] creates a configuration,
32+
where we delay the API call by [delay_before] once,
33+
and then with [delay_between] between each recursive call.
34+
*)
35+
36+
val with_recursive_loop : t -> (('a -> 'b) -> 'a -> 'b) -> 'a -> 'b
37+
(** [with_recursive_loop config f arg] calls [f self arg], where [self] can be used
38+
for recursive calls.
39+
40+
[arg] is an argument that the implementation of [f] can change between recursive calls for its own purposes,
41+
otherwise [()] can be used.
42+
43+
A [delay_before] amount of seconds is inserted once, and [delay_between/8] is inserted between recursive calls,
44+
except the first one, and delays increase exponentially until [delay_between] is reached
45+
{v
46+
delay_before
47+
f ...
48+
(self[@tailcall]) ...
49+
f ...
50+
(self[@tailcall]) ...
51+
delay_between/8
52+
f ...
53+
(self[@tailcall]) ...
54+
delay_between/4
55+
f ...
56+
v}
57+
58+
The delays are determined by [config], and [delay_between] uses an exponential backoff, up to [config.delay_between] delay.
59+
*)
60+
end

ocaml/xapi/xapi_event.ml

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ let is_lowercase str = String.for_all is_lowercase_char str
5656
module Subscription = struct
5757
type t = Class of string | Object of string * string | All
5858

59+
let is_task_only = function
60+
| Class "task" | Object ("task", _) ->
61+
true
62+
| Class _ | Object _ | All ->
63+
false
64+
5965
let of_string x =
6066
if x = "*" then
6167
All
@@ -470,6 +476,7 @@ let unregister ~__context ~classes =
470476

471477
(** Blocking call which returns the next set of events relevant to this session. *)
472478
let rec next ~__context =
479+
let batching = !Xapi_globs.event_next_delay in
473480
let session = Context.get_session_id __context in
474481
let open Next in
475482
assert_subscribed session ;
@@ -489,11 +496,12 @@ let rec next ~__context =
489496
)
490497
in
491498
(* Like grab_range () only guarantees to return a non-empty range by blocking if necessary *)
492-
let rec grab_nonempty_range () =
499+
let grab_nonempty_range =
500+
Throttle.Batching.with_recursive_loop batching @@ fun self arg ->
493501
let last_id, end_id = grab_range () in
494502
if last_id = end_id then
495503
let (_ : int64) = wait subscription end_id in
496-
grab_nonempty_range ()
504+
(self [@tailcall]) arg
497505
else
498506
(last_id, end_id)
499507
in
@@ -511,7 +519,7 @@ let rec next ~__context =
511519
else
512520
rpc_of_events relevant
513521

514-
let from_inner __context session subs from from_t timer =
522+
let from_inner __context session subs from from_t timer batching =
515523
let open Xapi_database in
516524
let open From in
517525
(* The database tables involved in our subscription *)
@@ -599,7 +607,8 @@ let from_inner __context session subs from from_t timer =
599607
(* Each event.from should have an independent subscription record *)
600608
let msg_gen, messages, tableset, (creates, mods, deletes, last) =
601609
with_call session subs (fun sub ->
602-
let rec grab_nonempty_range () =
610+
let grab_nonempty_range =
611+
Throttle.Batching.with_recursive_loop batching @@ fun self arg ->
603612
let ( (msg_gen, messages, _tableset, (creates, mods, deletes, last))
604613
as result
605614
) =
@@ -618,8 +627,7 @@ let from_inner __context session subs from from_t timer =
618627
(* last id the client got is equivalent to the current one *)
619628
last_msg_gen := msg_gen ;
620629
wait2 sub last timer ;
621-
Thread.delay 0.05 ;
622-
grab_nonempty_range ()
630+
(self [@tailcall]) arg
623631
) else
624632
result
625633
in
@@ -698,6 +706,19 @@ let from_inner __context session subs from from_t timer =
698706
{events; valid_ref_counts; token= Token.to_string (last, msg_gen)}
699707

700708
let from ~__context ~classes ~token ~timeout =
709+
let duration =
710+
timeout
711+
|> Clock.Timer.s_to_span
712+
|> Option.value ~default:Mtime.Span.(24 * hour)
713+
in
714+
let timer = Clock.Timer.start ~duration in
715+
let subs = List.map Subscription.of_string classes in
716+
let batching =
717+
if List.for_all Subscription.is_task_only subs then
718+
!Xapi_globs.event_from_task_delay
719+
else
720+
!Xapi_globs.event_from_delay
721+
in
701722
let session = Context.get_session_id __context in
702723
let from, from_t =
703724
try Token.of_string token
@@ -709,19 +730,14 @@ let from ~__context ~classes ~token ~timeout =
709730
(Api_errors.event_from_token_parse_failure, [token])
710731
)
711732
in
712-
let subs = List.map Subscription.of_string classes in
713-
let duration =
714-
timeout
715-
|> Clock.Timer.s_to_span
716-
|> Option.value ~default:Mtime.Span.(24 * hour)
717-
in
718-
let timer = Clock.Timer.start ~duration in
719733
(* We need to iterate because it's possible for an empty event set
720734
to be generated if we peek in-between a Modify and a Delete; we'll
721735
miss the Delete event and fail to generate the Modify because the
722736
snapshot can't be taken. *)
723737
let rec loop () =
724-
let event_from = from_inner __context session subs from from_t timer in
738+
let event_from =
739+
from_inner __context session subs from from_t timer batching
740+
in
725741
if event_from.events = [] && not (Clock.Timer.has_expired timer) then (
726742
debug "suppressing empty event.from" ;
727743
loop ()

ocaml/xapi/xapi_globs.ml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1068,6 +1068,47 @@ let tgroups_enabled = ref false
10681068
let xapi_requests_cgroup =
10691069
"/sys/fs/cgroup/cpu/control.slice/xapi.service/request"
10701070

1071+
(* Event.{from,next} batching delays *)
1072+
let make_batching name ~delay_before ~delay_between =
1073+
let name = Printf.sprintf "%s_delay" name in
1074+
let config = ref (Throttle.Batching.make ~delay_before ~delay_between)
1075+
and config_vals = ref (delay_before, delay_between) in
1076+
let set str =
1077+
Scanf.sscanf str "%f,%f" @@ fun delay_before delay_between ->
1078+
match
1079+
(Clock.Timer.s_to_span delay_before, Clock.Timer.s_to_span delay_between)
1080+
with
1081+
| Some delay_before, Some delay_between ->
1082+
config_vals := (delay_before, delay_between) ;
1083+
config := Throttle.Batching.make ~delay_before ~delay_between
1084+
| _ ->
1085+
D.warn
1086+
"Ignoring argument '%s'. (it only allows durations of less than 104 \
1087+
days)"
1088+
str
1089+
and get () =
1090+
let d1, d2 = !config_vals in
1091+
Printf.sprintf "%f,%f" (Clock.Timer.span_to_s d1) (Clock.Timer.span_to_s d2)
1092+
and desc =
1093+
Printf.sprintf
1094+
"delays in seconds before the API call, and between internal recursive \
1095+
calls, separated with a comma"
1096+
in
1097+
(config, (name, Arg.String set, get, desc))
1098+
1099+
let event_from_delay, event_from_entry =
1100+
make_batching "event_from" ~delay_before:Mtime.Span.zero
1101+
~delay_between:Mtime.Span.(50 * ms)
1102+
1103+
let event_from_task_delay, event_from_task_entry =
1104+
make_batching "event_from_task" ~delay_before:Mtime.Span.zero
1105+
~delay_between:Mtime.Span.(50 * ms)
1106+
1107+
let event_next_delay, event_next_entry =
1108+
make_batching "event_next"
1109+
~delay_before:Mtime.Span.(200 * ms)
1110+
~delay_between:Mtime.Span.(50 * ms)
1111+
10711112
let xapi_globs_spec =
10721113
[
10731114
( "master_connection_reset_timeout"
@@ -1644,6 +1685,9 @@ let other_options =
16441685
, (fun () -> string_of_bool !tgroups_enabled)
16451686
, "Turn on tgroups classification"
16461687
)
1688+
; event_from_entry
1689+
; event_from_task_entry
1690+
; event_next_entry
16471691
]
16481692

16491693
(* The options can be set with the variable xapiflags in /etc/sysconfig/xapi.

xapi.opam

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ homepage: "https://xapi-project.github.io/"
1010
bug-reports: "https://github.com/xapi-project/xen-api/issues"
1111
depends: [
1212
"dune" {>= "3.15"}
13+
"ocaml" {>= "4.09"}
1314
"alcotest" {with-test}
1415
"angstrom"
1516
"astring"

0 commit comments

Comments
 (0)