Skip to content

Commit 6b23e91

Browse files
committed
feat(sdk): Introduce event_cache::Deduplicator.
This patch introduces `Deduplicator`, an efficient type to detect duplicated events in the event cache. It uses a bloom filter, and decorates a collection of events with `Decoration`, which is an enum that marks whether an event is unique, duplicated or invalid.
1 parent ce9dc73 commit 6b23e91

File tree

4 files changed

+273
-0
lines changed

4 files changed

+273
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/matrix-sdk/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ eyeball-im = { workspace = true }
8282
eyre = { version = "0.6.8", optional = true }
8383
futures-core = { workspace = true }
8484
futures-util = { workspace = true }
85+
growable-bloom-filter = { workspace = true }
8586
http = { workspace = true }
8687
imbl = { workspace = true, features = ["serde"] }
8788
indexmap = "2.0.2"
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
// Copyright 2024 The Matrix.org Foundation C.I.C.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! Simple but efficient types to find duplicated events. See [`Deduplicator`]
16+
//! to learn more.
17+
18+
use std::{collections::BTreeSet, sync::Mutex};
19+
20+
use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
21+
22+
use super::store::{Event, RoomEvents};
23+
24+
/// `Deduplicator` is an efficient type to find duplicated events.
25+
///
26+
/// It uses a [bloom filter] to provide a memory efficient probabilistic answer
27+
/// to: “has event E been seen already?”. False positives are possible, while
28+
/// false negatives are impossible. In the case of a positive reply, we fall back
29+
/// to a linear (backward) search on all events to check whether it's a false
30+
/// positive or not.
31+
///
32+
/// [bloom filter]: https://en.wikipedia.org/wiki/Bloom_filter
33+
pub struct Deduplicator {
34+
/// Bloom filter fed with the ID of every scanned event. It is wrapped in a
/// `Mutex` so that `scan_and_learn` can update it while only holding `&self`.
bloom_filter: Mutex<GrowableBloom>,
35+
}
36+
37+
impl Deduplicator {
38+
/// Estimated number of insertions handed to the bloom filter builder (see
/// `estimated_insertions` in [`Self::new`]).
const APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS: usize = 800_000;
39+
/// Error ratio requested from the bloom filter builder (see
/// `desired_error_ratio` in [`Self::new`]).
const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.001;
40+
41+
/// Create a new `Deduplicator`.
///
/// The internal bloom filter is built with
/// `APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS` as its estimated number of
/// insertions and `DESIRED_FALSE_POSITIVE_RATE` as its desired error ratio.
42+
pub fn new() -> Self {
43+
Self {
44+
bloom_filter: Mutex::new(
45+
GrowableBloomBuilder::new()
46+
.estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS)
47+
.desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE)
48+
.build(),
49+
),
50+
}
51+
}
52+
53+
/// Scan a collection of events and detect duplications.
54+
///
55+
/// This method takes a collection of events `new_events_to_scan` and
56+
/// returns a new collection of events, where each event is decorated by
57+
/// a [`Decoration`], so that the caller can decide what to do with
58+
/// these events.
59+
///
60+
/// Each scanned event that has an event ID will update `Self`'s internal state.
61+
///
62+
/// `existing_events` represents all events of a room that already exist.
///
/// Each event is decorated as follows:
///
/// - [`Decoration::Invalid`] if the event has no event ID,
/// - [`Decoration::Duplicated`] if its event ID was already yielded earlier
///   by `new_events_to_scan` during this call, or is found in
///   `existing_events`,
/// - [`Decoration::Unique`] otherwise.
///
/// The returned iterator is lazy: events are scanned, and the bloom filter
/// is updated, only as the iterator is consumed.
///
/// # Panics
///
/// Panics if the `Mutex` guarding the bloom filter is poisoned.
63+
pub fn scan_and_learn<'a, I>(
64+
&'a self,
65+
new_events_to_scan: I,
66+
existing_events: &'a RoomEvents,
67+
) -> impl Iterator<Item = Decoration<I::Item>> + 'a
68+
where
69+
I: Iterator<Item = Event> + 'a,
70+
{
71+
// `new_scanned_events` is not a field of `Self` because it is only used to detect
72+
// duplicates in `new_events_to_scan`.
73+
let mut new_scanned_events = BTreeSet::new();
74+
75+
new_events_to_scan.map(move |event| {
76+
let Some(event_id) = event.event_id() else {
77+
// The event has no `event_id`.
78+
return Decoration::Invalid(event);
79+
};
80+
81+
// Query the bloom filter and record `event_id` in it in a single step. A `true`
// result is treated as “possibly seen before”.
if self.bloom_filter.lock().unwrap().check_and_set(&event_id) {
82+
// Oh oh, it looks like we have found a duplicate!
83+
//
84+
// However, bloom filters have false positives: we are NOT sure the event is
85+
// really present. Even if the false positive rate is low, we need to
86+
// iterate over all events to check whether it is actually present.
87+
88+
// First, let's ensure `event` is not a duplicate from `new_events_to_scan`,
89+
// i.e. if the iterator itself contains duplicated events! We use a `BTreeSet`,
90+
// otherwise using a bloom filter again may generate false positives.
91+
if new_scanned_events.contains(&event_id) {
92+
// The iterator contains a duplicated `event`.
93+
return Decoration::Duplicated(event);
94+
}
95+
96+
// Second, we can iterate over all events to ensure `event` is not present in
97+
// `existing_events`.
98+
let duplicated = existing_events.revents().any(|(_position, other_event)| {
99+
other_event.event_id().as_ref() == Some(&event_id)
100+
});
101+
102+
// Remember this ID so that a later occurrence in the same batch is caught by the
// `BTreeSet` check above.
new_scanned_events.insert(event_id);
103+
104+
if duplicated {
105+
Decoration::Duplicated(event)
106+
} else {
107+
Decoration::Unique(event)
108+
}
109+
} else {
110+
new_scanned_events.insert(event_id);
111+
112+
// Bloom filter has no false negatives. We are sure the event is NOT present: we
113+
// can keep it in the iterator.
114+
Decoration::Unique(event)
115+
}
116+
})
117+
}
118+
}
119+
120+
/// Information about the scanned collection of events.
///
/// Each event yielded by `Deduplicator::scan_and_learn` is wrapped in one of
/// these variants; the wrapped value is the scanned event itself.
121+
#[derive(Debug)]
122+
pub enum Decoration<I> {
123+
/// This event is not duplicated: its event ID was found neither earlier in
/// the scanned collection nor in the existing events.
124+
Unique(I),
125+
126+
/// This event is duplicated: its event ID was already seen earlier in the
/// scanned collection, or is present in the existing events.
127+
Duplicated(I),
128+
129+
/// This event is invalid (i.e. not well formed): it has no event ID.
130+
Invalid(I),
131+
}
132+
133+
#[cfg(test)]
134+
mod tests {
135+
use assert_matches2::assert_let;
136+
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
137+
use matrix_sdk_test::{EventBuilder, ALICE};
138+
use ruma::{events::room::message::RoomMessageEventContent, owned_event_id, EventId};
139+
140+
use super::*;
141+
142+
/// Build a `SyncTimelineEvent` wrapping a plain-text `"foo"` message created
/// by `event_builder` with `*ALICE` and the given `event_id`.
fn sync_timeline_event(event_builder: &EventBuilder, event_id: &EventId) -> SyncTimelineEvent {
143+
SyncTimelineEvent::new(event_builder.make_sync_message_event_with_id(
144+
*ALICE,
145+
event_id,
146+
RoomMessageEventContent::text_plain("foo"),
147+
))
148+
}
149+
150+
/// Three events with distinct IDs, scanned against an empty `RoomEvents`,
/// must all be decorated as `Unique`, in their original order.
#[test]
151+
fn test_filter_no_duplicate() {
152+
let event_builder = EventBuilder::new();
153+
154+
let event_id_0 = owned_event_id!("$ev0");
155+
let event_id_1 = owned_event_id!("$ev1");
156+
let event_id_2 = owned_event_id!("$ev2");
157+
158+
let event_0 = sync_timeline_event(&event_builder, &event_id_0);
159+
let event_1 = sync_timeline_event(&event_builder, &event_id_1);
160+
let event_2 = sync_timeline_event(&event_builder, &event_id_2);
161+
162+
let deduplicator = Deduplicator::new();
163+
let existing_events = RoomEvents::new();
164+
165+
let mut events =
166+
deduplicator.scan_and_learn([event_0, event_1, event_2].into_iter(), &existing_events);
167+
168+
assert_let!(Some(Decoration::Unique(event)) = events.next());
169+
assert_eq!(event.event_id(), Some(event_id_0));
170+
171+
assert_let!(Some(Decoration::Unique(event)) = events.next());
172+
assert_eq!(event.event_id(), Some(event_id_1));
173+
174+
assert_let!(Some(Decoration::Unique(event)) = events.next());
175+
assert_eq!(event.event_id(), Some(event_id_2));
176+
177+
assert!(events.next().is_none());
178+
}
179+
180+
/// An event that appears twice in the same scanned batch must be decorated
/// as `Duplicated` on its second occurrence, even with no existing events.
#[test]
181+
fn test_filter_duplicates_in_new_events() {
182+
let event_builder = EventBuilder::new();
183+
184+
let event_id_0 = owned_event_id!("$ev0");
185+
let event_id_1 = owned_event_id!("$ev1");
186+
187+
let event_0 = sync_timeline_event(&event_builder, &event_id_0);
188+
let event_1 = sync_timeline_event(&event_builder, &event_id_1);
189+
190+
let deduplicator = Deduplicator::new();
191+
let existing_events = RoomEvents::new();
192+
193+
let mut events = deduplicator.scan_and_learn(
194+
[
195+
event_0.clone(), // OK
196+
event_0, // Not OK
197+
event_1, // OK
198+
]
199+
.into_iter(),
200+
&existing_events,
201+
);
202+
203+
assert_let!(Some(Decoration::Unique(event)) = events.next());
204+
assert_eq!(event.event_id(), Some(event_id_0.clone()));
205+
206+
assert_let!(Some(Decoration::Duplicated(event)) = events.next());
207+
assert_eq!(event.event_id(), Some(event_id_0));
208+
209+
assert_let!(Some(Decoration::Unique(event)) = events.next());
210+
assert_eq!(event.event_id(), Some(event_id_1));
211+
212+
assert!(events.next().is_none());
213+
}
214+
215+
/// An event that was scanned once and then pushed into `existing_events`
/// must be decorated as `Duplicated` when it is scanned again in a later
/// call, while the other events of that call stay `Unique`.
#[test]
216+
fn test_filter_duplicates_with_existing_events() {
217+
let event_builder = EventBuilder::new();
218+
219+
let event_id_0 = owned_event_id!("$ev0");
220+
let event_id_1 = owned_event_id!("$ev1");
221+
let event_id_2 = owned_event_id!("$ev2");
222+
223+
let event_0 = sync_timeline_event(&event_builder, &event_id_0);
224+
let event_1 = sync_timeline_event(&event_builder, &event_id_1);
225+
let event_2 = sync_timeline_event(&event_builder, &event_id_2);
226+
227+
let deduplicator = Deduplicator::new();
228+
let mut existing_events = RoomEvents::new();
229+
230+
// Simulate `event_1` is inserted inside `existing_events`.
231+
{
232+
let mut events =
233+
deduplicator.scan_and_learn([event_1.clone()].into_iter(), &existing_events);
234+
235+
assert_let!(Some(Decoration::Unique(event_1)) = events.next());
236+
assert_eq!(event_1.event_id(), Some(event_id_1.clone()));
237+
238+
assert!(events.next().is_none());
239+
240+
drop(events); // make the borrow checker happy.
241+
242+
// Now we can push `event_1` inside `existing_events`.
243+
existing_events.push_events([event_1]);
244+
}
245+
246+
// `event_1` will be duplicated.
247+
{
248+
let mut events = deduplicator.scan_and_learn(
249+
[
250+
event_0, // OK
251+
event_1, // Not OK
252+
event_2, // Ok
253+
]
254+
.into_iter(),
255+
&existing_events,
256+
);
257+
258+
assert_let!(Some(Decoration::Unique(event)) = events.next());
259+
assert_eq!(event.event_id(), Some(event_id_0));
260+
261+
assert_let!(Some(Decoration::Duplicated(event)) = events.next());
262+
assert_eq!(event.event_id(), Some(event_id_1));
263+
264+
assert_let!(Some(Decoration::Unique(event)) = events.next());
265+
assert_eq!(event.event_id(), Some(event_id_2));
266+
267+
assert!(events.next().is_none());
268+
}
269+
}
270+
}

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use tracing::{error, info_span, instrument, trace, warn, Instrument as _, Span};
5353
use self::paginator::PaginatorError;
5454
use crate::{client::WeakClient, Client};
5555

56+
mod deduplicator;
5657
mod linked_chunk;
5758
mod pagination;
5859
mod room;

0 commit comments

Comments
 (0)