@@ -525,6 +525,57 @@ let rec next ~__context =
525
525
else
526
526
rpc_of_events relevant
527
527
528
+ type entry = string * string * Xapi_database.Db_cache_types.Time .t
529
+
530
+ type acc = {
531
+ creates : entry list
532
+ ; mods : entry list
533
+ ; deletes : entry list
534
+ ; last : Xapi_database.Db_cache_types.Time .t
535
+ }
536
+
537
+ let collect_events subs tables last_generation acc table =
538
+ let open Xapi_database in
539
+ let open Db_cache_types in
540
+ let table_value = TableSet. find table tables in
541
+ let prepend_recent obj stat _ ({creates; mods; last; _} as entries ) =
542
+ let Stat. {created; modified; deleted} = stat in
543
+ if Subscription. object_matches subs table obj then
544
+ let last = max last (max modified deleted) in
545
+ let creates =
546
+ if created > ! last_generation then
547
+ (table, obj, created) :: creates
548
+ else
549
+ creates
550
+ in
551
+ let mods =
552
+ if modified > ! last_generation && not (created > ! last_generation) then
553
+ (table, obj, modified) :: mods
554
+ else
555
+ mods
556
+ in
557
+ {entries with creates; mods; last}
558
+ else
559
+ entries
560
+ in
561
+ let prepend_deleted obj stat ({deletes; last; _} as entries ) =
562
+ let Stat. {created; modified; deleted} = stat in
563
+ if Subscription. object_matches subs table obj then
564
+ let last = max last (max modified deleted) in
565
+ let deletes =
566
+ if created < = ! last_generation then
567
+ (table, obj, deleted) :: deletes
568
+ else
569
+ deletes
570
+ in
571
+ {entries with deletes; last}
572
+ else
573
+ entries
574
+ in
575
+ acc
576
+ |> Table. fold_over_recent ! last_generation prepend_recent table_value
577
+ |> Table. fold_over_deleted ! last_generation prepend_deleted table_value
578
+
528
579
let from_inner __context session subs from from_t timer batching =
529
580
let open Xapi_database in
530
581
let open From in
@@ -551,75 +602,25 @@ let from_inner __context session subs from from_t timer batching =
551
602
else
552
603
(0L , [] )
553
604
in
554
- ( msg_gen
555
- , messages
556
- , tableset
557
- , List. fold_left
558
- (fun acc table ->
559
- (* Fold over the live objects *)
560
- let acc =
561
- Db_cache_types.Table. fold_over_recent ! last_generation
562
- (fun objref {Db_cache_types.Stat. created; modified; deleted} _
563
- (creates , mods , deletes , last ) ->
564
- if Subscription. object_matches subs table objref then
565
- let last = max last (max modified deleted) in
566
- (* mtime guaranteed to always be larger than ctime *)
567
- ( ( if created > ! last_generation then
568
- (table, objref, created) :: creates
569
- else
570
- creates
571
- )
572
- , ( if
573
- modified > ! last_generation
574
- && not (created > ! last_generation)
575
- then
576
- (table, objref, modified) :: mods
577
- else
578
- mods
579
- )
580
- , (* Only have a mod event if we don't have a created event *)
581
- deletes
582
- , last
583
- )
584
- else
585
- (creates, mods, deletes, last)
586
- )
587
- (Db_cache_types.TableSet. find table tableset)
588
- acc
589
- in
590
- (* Fold over the deleted objects *)
591
- Db_cache_types.Table. fold_over_deleted ! last_generation
592
- (fun objref {Db_cache_types.Stat. created; modified; deleted}
593
- (creates , mods , deletes , last ) ->
594
- if Subscription. object_matches subs table objref then
595
- let last = max last (max modified deleted) in
596
- (* mtime guaranteed to always be larger than ctime *)
597
- if created > ! last_generation then
598
- (creates, mods, deletes, last)
599
- (* It was created and destroyed since the last update *)
600
- else
601
- (creates, mods, (table, objref, deleted) :: deletes, last)
602
- (* It might have been modified, but we can't tell now *)
603
- else
604
- (creates, mods, deletes, last)
605
- )
606
- (Db_cache_types.TableSet. find table tableset)
607
- acc
608
- )
609
- ([] , [] , [] , ! last_generation)
610
- tables
611
- )
605
+ let events =
606
+ let initial =
607
+ {creates= [] ; mods= [] ; deletes= [] ; last= ! last_generation}
608
+ in
609
+ let folder = collect_events subs tableset last_generation in
610
+ List. fold_left folder initial tables
611
+ in
612
+ (msg_gen, messages, tableset, events)
612
613
in
613
614
(* Each event.from should have an independent subscription record *)
614
- let msg_gen, messages, tableset, (creates, mods, deletes, last) =
615
+ let msg_gen, messages, tableset, events =
615
616
with_call session subs (fun sub ->
616
617
let grab_nonempty_range =
617
618
Throttle.Batching. with_recursive_loop batching @@ fun self arg ->
618
- let ( (msg_gen, messages, _tableset, (creates, mods, deletes, last))
619
- as result
620
- ) =
619
+ let result =
621
620
Db_lock. with_lock (fun () -> grab_range (Db_backend. make () ))
622
621
in
622
+ let msg_gen, messages, _tables, events = result in
623
+ let {creates; mods; deletes; last} = events in
623
624
if
624
625
creates = []
625
626
&& mods = []
@@ -640,6 +641,7 @@ let from_inner __context session subs from from_t timer batching =
640
641
grab_nonempty_range ()
641
642
)
642
643
in
644
+ let {creates; mods; deletes; last} = events in
643
645
last_generation := last ;
644
646
let event_of op ?snapshot (table , objref , time ) =
645
647
{
0 commit comments