@@ -525,6 +525,59 @@ let rec next ~__context =
525
525
else
526
526
rpc_of_events relevant
527
527
528
+ type time = Xapi_database.Db_cache_types.Time .t
529
+
530
+ type entry = {table : string ; obj : string ; time : time }
531
+
532
+ type acc = {
533
+ creates : entry list
534
+ ; mods : entry list
535
+ ; deletes : entry list
536
+ ; last : time
537
+ }
538
+
539
+ let collect_events (subs , tables , last_generation ) acc table =
540
+ let open Xapi_database in
541
+ let open Db_cache_types in
542
+ let table_value = TableSet. find table tables in
543
+ let prepend_recent obj stat _ ({creates; mods; last; _} as entries ) =
544
+ let Stat. {created; modified; deleted} = stat in
545
+ if Subscription. object_matches subs table obj then
546
+ let last = max last (max modified deleted) in
547
+ let creates =
548
+ if created > last_generation then
549
+ {table; obj; time= created} :: creates
550
+ else
551
+ creates
552
+ in
553
+ let mods =
554
+ if modified > last_generation && not (created > last_generation) then
555
+ {table; obj; time= modified} :: mods
556
+ else
557
+ mods
558
+ in
559
+ {entries with creates; mods; last}
560
+ else
561
+ entries
562
+ in
563
+ let prepend_deleted obj stat ({deletes; last; _} as entries ) =
564
+ let Stat. {created; modified; deleted} = stat in
565
+ if Subscription. object_matches subs table obj then
566
+ let last = max last (max modified deleted) in
567
+ let deletes =
568
+ if created < = last_generation then
569
+ {table; obj; time= deleted} :: deletes
570
+ else
571
+ deletes
572
+ in
573
+ {entries with deletes; last}
574
+ else
575
+ entries
576
+ in
577
+ acc
578
+ |> Table. fold_over_recent last_generation prepend_recent table_value
579
+ |> Table. fold_over_deleted last_generation prepend_deleted table_value
580
+
528
581
let from_inner __context session subs from from_t timer batching =
529
582
let open Xapi_database in
530
583
let open From in
@@ -541,159 +594,118 @@ let from_inner __context session subs from from_t timer batching =
541
594
in
542
595
List. filter (fun table -> Subscription. table_matches subs table) all
543
596
in
544
- let last_generation = ref from in
545
597
let last_msg_gen = ref from_t in
546
- let grab_range t =
598
+ let grab_range ~ since t =
547
599
let tableset = Db_cache_types.Database. tableset (Db_ref. get_database t) in
548
600
let msg_gen, messages =
549
601
if Subscription. table_matches subs " message" then
550
602
! Message. get_since_for_events ~__context ! last_msg_gen
551
603
else
552
604
(0L , [] )
553
605
in
554
- ( msg_gen
555
- , messages
556
- , tableset
557
- , List. fold_left
558
- (fun acc table ->
559
- (* Fold over the live objects *)
560
- let acc =
561
- Db_cache_types.Table. fold_over_recent ! last_generation
562
- (fun objref {Db_cache_types.Stat. created; modified; deleted} _
563
- (creates , mods , deletes , last ) ->
564
- if Subscription. object_matches subs table objref then
565
- let last = max last (max modified deleted) in
566
- (* mtime guaranteed to always be larger than ctime *)
567
- ( ( if created > ! last_generation then
568
- (table, objref, created) :: creates
569
- else
570
- creates
571
- )
572
- , ( if
573
- modified > ! last_generation
574
- && not (created > ! last_generation)
575
- then
576
- (table, objref, modified) :: mods
577
- else
578
- mods
579
- )
580
- , (* Only have a mod event if we don't have a created event *)
581
- deletes
582
- , last
583
- )
584
- else
585
- (creates, mods, deletes, last)
586
- )
587
- (Db_cache_types.TableSet. find table tableset)
588
- acc
589
- in
590
- (* Fold over the deleted objects *)
591
- Db_cache_types.Table. fold_over_deleted ! last_generation
592
- (fun objref {Db_cache_types.Stat. created; modified; deleted}
593
- (creates , mods , deletes , last ) ->
594
- if Subscription. object_matches subs table objref then
595
- let last = max last (max modified deleted) in
596
- (* mtime guaranteed to always be larger than ctime *)
597
- if created > ! last_generation then
598
- (creates, mods, deletes, last)
599
- (* It was created and destroyed since the last update *)
600
- else
601
- (creates, mods, (table, objref, deleted) :: deletes, last)
602
- (* It might have been modified, but we can't tell now *)
603
- else
604
- (creates, mods, deletes, last)
605
- )
606
- (Db_cache_types.TableSet. find table tableset)
607
- acc
608
- )
609
- ([] , [] , [] , ! last_generation)
610
- tables
611
- )
606
+ let events =
607
+ let initial = {creates= [] ; mods= [] ; deletes= [] ; last= since} in
608
+ let folder = collect_events (subs, tableset, since) in
609
+ List. fold_left folder initial tables
610
+ in
611
+ (msg_gen, messages, tableset, events)
612
612
in
613
613
(* Each event.from should have an independent subscription record *)
614
- let msg_gen, messages, tableset, (creates, mods, deletes, last) =
614
+ let msg_gen, messages, tableset, events =
615
615
with_call session subs (fun sub ->
616
616
let grab_nonempty_range =
617
- Throttle.Batching. with_recursive_loop batching @@ fun self arg ->
618
- let ( (msg_gen, messages, _tableset, (creates, mods, deletes, last))
619
- as result
620
- ) =
621
- Db_lock. with_lock (fun () -> grab_range (Db_backend. make () ))
617
+ Throttle.Batching. with_recursive_loop batching @@ fun self since ->
618
+ let result =
619
+ Db_lock. with_lock (fun () -> grab_range ~since (Db_backend. make () ))
622
620
in
621
+ let msg_gen, messages, _tables, events = result in
622
+ let {creates; mods; deletes; last} = events in
623
623
if
624
624
creates = []
625
625
&& mods = []
626
626
&& deletes = []
627
627
&& messages = []
628
628
&& not (Clock.Timer. has_expired timer)
629
629
then (
630
- last_generation := last ;
631
- (* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *)
630
+ (* cur_id was bumped, but nothing relevent fell out of the database.
631
+ Therefore the last ID the client got is equivalent to the current one. *)
632
632
sub.cur_id < - last ;
633
- (* last id the client got is equivalent to the current one *)
634
633
last_msg_gen := msg_gen ;
635
634
wait2 sub last timer ;
636
- (self [@ tailcall]) arg
635
+ (* The next iteration will fold over events starting after
636
+ the last database event that matched a subscription. *)
637
+ let next = last in
638
+ (self [@ tailcall]) next
637
639
) else
638
640
result
639
641
in
640
- grab_nonempty_range ()
642
+ grab_nonempty_range from
641
643
)
642
644
in
643
- last_generation := last ;
644
- let event_of op ?snapshot ( table , objref , time ) =
645
+ let {creates; mods; deletes; last} = events in
646
+ let event_of op ?snapshot { table; obj; time} =
645
647
{
646
648
id= Int64. to_string time
647
649
; ts= " 0.0"
648
650
; ty= String. lowercase_ascii table
649
651
; op
650
- ; reference= objref
652
+ ; reference= obj
651
653
; snapshot
652
654
}
653
655
in
654
- let events =
655
- List. fold_left
656
- (fun acc x ->
657
- let ev = event_of `del x in
658
- if Subscription. event_matches subs ev then ev :: acc else acc
659
- )
660
- [] deletes
661
- in
662
- let events =
663
- List. fold_left
664
- (fun acc (table , objref , mtime ) ->
665
- let serialiser = Eventgen. find_get_record table in
666
- try
667
- let xml = serialiser ~__context ~self: objref () in
668
- let ev = event_of `_mod ?snapshot:xml (table, objref, mtime) in
669
- if Subscription. event_matches subs ev then ev :: acc else acc
670
- with _ -> acc
671
- )
672
- events mods
656
+ let events_of ~kind ?(with_snapshot = true ) entries acc =
657
+ let rec go events ({table; obj; time = _ } as entry ) =
658
+ try
659
+ let snapshot =
660
+ let serialiser = Eventgen. find_get_record table in
661
+ if with_snapshot then
662
+ serialiser ~__context ~self: obj ()
663
+ else
664
+ None
665
+ in
666
+ let event = event_of kind ?snapshot entry in
667
+ if Subscription. event_matches subs event then
668
+ event :: events
669
+ else
670
+ events
671
+ with _ ->
672
+ (* CA-91931: An exception may be raised here if an object's
673
+ lifetime is too short.
674
+
675
+ The problem is that "collect_events" and "events_of" work
676
+ on different versions of the database, so some `add and
677
+ `mod events can be lost if the corresponding object is
678
+ deleted before a snapshot is taken.
679
+
680
+ In practice, this has only been seen with the "task"
681
+ object - which can be rapidly created and destroyed using
682
+ helper functions.
683
+
684
+ These exceptions have been suppressed since [bc0cc5a9]. *)
685
+ events
686
+ in
687
+ List. fold_left go acc entries
673
688
in
674
689
let events =
675
- List. fold_left
676
- (fun acc (table , objref , ctime ) ->
677
- let serialiser = Eventgen. find_get_record table in
678
- try
679
- let xml = serialiser ~__context ~self: objref () in
680
- let ev = event_of `add ?snapshot:xml (table, objref, ctime) in
681
- if Subscription. event_matches subs ev then ev :: acc else acc
682
- with _ -> acc
683
- )
684
- events creates
690
+ [] (* Accumulate the events for objects stored in the database. *)
691
+ |> events_of ~kind: `del ~with_snapshot: false deletes
692
+ |> events_of ~kind: `_mod mods
693
+ |> events_of ~kind: `add creates
685
694
in
686
695
let events =
696
+ (* Messages require a special casing as their contents are not
697
+ stored in the database. *)
687
698
List. fold_left
688
699
(fun acc mev ->
689
700
let event =
701
+ let table = " message" in
690
702
match mev with
691
703
| Message. Create (_ref , message ) ->
692
704
event_of `add
693
705
?snapshot:(Some (API. rpc_of_message_t message))
694
- ( " message " , Ref. string_of _ref, 0L )
706
+ {table; obj = Ref. string_of _ref; time = 0L }
695
707
| Message. Del _ref ->
696
- event_of `del ( " message " , Ref. string_of _ref, 0L )
708
+ event_of `del {table; obj = Ref. string_of _ref; time = 0L }
697
709
in
698
710
event :: acc
699
711
)
0 commit comments