Skip to content

Commit 405e834

Browse files
committed
task(ui): Create TimelineSubscriber.
This patch gathers the logic of the `Timeline::subscribe` into a single type: `TimelineSubscriber`. The `TimelineController::subscribe_batched_and_limited` method is renamed `subscribe` to match `Timeline::subscribe`. Things are simpler to apprehend. The `TimelineSubscriber` type configures the subscriber/stream in a single place. It takes an `&ObservableItems` and a `&SkipCount`, and configures everything. It also provides a single place to document the behaviour of the subscriber, with the `Skip` higher-order stream.
1 parent f5d7d26 commit 405e834

File tree

5 files changed

+73
-20
lines changed

5 files changed

+73
-20
lines changed

crates/matrix-sdk-ui/src/timeline/controller/metadata.rs

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ pub(in crate::timeline) struct TimelineMetadata {
3838
/// This value is constant over the lifetime of the metadata.
3939
internal_id_prefix: Option<String>,
4040

41+
/// The `count` value for the `Skip higher-order stream used by the
42+
/// `TimelineSubscriber`. See its documentation to learn more.
4143
pub(super) subscriber_skip_count: SkipCount,
4244

4345
/// The hook to call whenever we run into a unable-to-decrypt event.

crates/matrix-sdk-ui/src/timeline/controller/mod.rs

+7-12
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,10 @@ use tracing::{
5454
debug, error, field, field::debug, info, info_span, instrument, trace, warn, Instrument as _,
5555
};
5656

57-
#[cfg(test)]
58-
pub(super) use self::observable_items::ObservableItems;
5957
pub(super) use self::{
6058
metadata::{RelativePosition, TimelineMetadata},
6159
observable_items::{
62-
AllRemoteEvents, ObservableItemsEntry, ObservableItemsTransaction,
60+
AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
6361
ObservableItemsTransactionEntry,
6462
},
6563
state::{FullEventMeta, PendingEdit, PendingEditKind, TimelineState},
@@ -70,6 +68,7 @@ use super::{
7068
event_handler::TimelineEventKind,
7169
event_item::{ReactionStatus, RemoteEventOrigin},
7270
item::TimelineUniqueId,
71+
subscriber::TimelineSubscriber,
7372
traits::{Decryptor, RoomDataProvider},
7473
DateDividerMode, Error, EventSendState, EventTimelineItem, InReplyToDetails, Message,
7574
PaginationError, Profile, ReactionInfo, RepliedToEvent, TimelineDetails, TimelineEventItemId,
@@ -514,18 +513,14 @@ impl<P: RoomDataProvider> TimelineController<P> {
514513
impl Stream<Item = VectorDiff<Arc<TimelineItem>>> + SendOutsideWasm,
515514
) {
516515
let state = self.state.read().await;
517-
(state.items.clone_items(), state.items.subscribe().into_stream())
516+
517+
state.items.subscribe().into_values_and_stream()
518518
}
519519

520-
pub(super) async fn subscribe_batched_and_limited(
521-
&self,
522-
) -> (Vector<Arc<TimelineItem>>, impl Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>>) {
520+
pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
523521
let state = self.state.read().await;
524-
state
525-
.items
526-
.subscribe()
527-
.into_values_and_batched_stream()
528-
.dynamic_skip_with_initial_count(0, state.meta.subscriber_skip_count.subscribe())
522+
523+
TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
529524
}
530525

531526
pub(super) async fn subscribe_filter_map<U, F>(

crates/matrix-sdk-ui/src/timeline/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ impl Timeline {
276276
pub async fn subscribe(
277277
&self,
278278
) -> (Vector<Arc<TimelineItem>>, impl Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>>) {
279-
let (items, stream) = self.controller.subscribe_batched_and_limited().await;
279+
let (items, stream) = self.controller.subscribe().await;
280280
let stream = WithTimelineDropHandle::new(stream, self.drop_handle.clone());
281281
(items, stream)
282282
}

crates/matrix-sdk-ui/src/timeline/subscriber.rs

+62-6
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@ use std::{
1818
task::{Context, Poll},
1919
};
2020

21+
use eyeball::Subscriber;
22+
use eyeball_im::{VectorDiff, VectorSubscriberBatchedStream};
23+
use eyeball_im_util::vector::{Skip, VectorObserverExt};
2124
use futures_core::Stream;
25+
use imbl::Vector;
2226
use pin_project_lite::pin_project;
2327

24-
use super::TimelineDropHandle;
28+
use super::{controller::ObservableItems, item::TimelineItem, TimelineDropHandle};
2529

2630
pin_project! {
2731
/// A stream that wraps a [`TimelineDropHandle`] so that the `Timeline`
@@ -46,25 +50,77 @@ where
4650
{
4751
type Item = S::Item;
4852

49-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50-
self.project().inner.poll_next(cx)
53+
fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
54+
self.project().inner.poll_next(context)
55+
}
56+
}
57+
58+
pin_project! {
59+
/// A type that creates a proper `Timeline` subscriber.
60+
///
61+
/// This type implements [`Stream`], so that it's entirely transparent for
62+
/// all consumers expecting an `impl Stream`.
63+
///
64+
/// This `Stream` pipes `VectorDiff`s from [`ObservableItems`] into a batched
65+
/// stream ([`VectorSubscriberBatchedStream`]), and then apply a skip
66+
/// higher-order stream ([`Skip`]).
67+
///
68+
/// `Skip` works by skipping the first _n_ values, where _n_ is referred
69+
/// as `count`. Here, this `count` value is defined by a `Stream<Item =
70+
/// usize>` (see [`Skip::dynamic_skip_with_initial_count`]). Everytime
71+
/// the `count` stream produces a value, `Skip` adjusts its output.
72+
/// `count` is managed by [`SkipCount`][skip::SkipCount], and is hold in
73+
/// `TimelineMetadata::subscriber_skip_count`.
74+
pub(super) struct TimelineSubscriber {
75+
#[pin]
76+
inner: Skip<VectorSubscriberBatchedStream<Arc<TimelineItem>>, Subscriber<usize>>,
77+
}
78+
}
79+
80+
impl TimelineSubscriber {
81+
/// Creates a [`TimelineSubscriber`], in addition to the initial values of
82+
/// the subscriber.
83+
pub(super) fn new(
84+
observable_items: &ObservableItems,
85+
observable_skip_count: &skip::SkipCount,
86+
) -> (Vector<Arc<TimelineItem>>, Self) {
87+
let (initial_values, stream) = observable_items
88+
.subscribe()
89+
.into_values_and_batched_stream()
90+
.dynamic_skip_with_initial_count(0, observable_skip_count.subscribe());
91+
92+
(initial_values, Self { inner: stream })
93+
}
94+
}
95+
96+
impl Stream for TimelineSubscriber {
97+
type Item = Vec<VectorDiff<Arc<TimelineItem>>>;
98+
99+
fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
100+
self.project().inner.poll_next(context)
51101
}
52102
}
53103

54104
pub mod skip {
55-
use eyeball::SharedObservable;
56-
use futures_core::Stream;
105+
use eyeball::{SharedObservable, Subscriber};
57106

58107
use super::super::controller::TimelineFocusKind;
59108

60109
const MAXIMUM_NUMBER_OF_INITIAL_ITEMS: usize = 20;
61110

111+
/// `SkipCount` helps to manage the `count` value used by the [`Skip`]
112+
/// higher-order stream used by the [`TimelineSubscriber`]. See its
113+
/// documentation to learn more.
114+
///
115+
/// [`Skip`]: eyeball_im_util::vector::Skip
116+
/// [`TimelineSubscriber`]: super::TimelineSubscriber
62117
#[derive(Clone, Debug)]
63118
pub struct SkipCount {
64119
count: SharedObservable<usize>,
65120
}
66121

67122
impl SkipCount {
123+
/// Create a [`SkipCount`] with a default `count` value set to 0.
68124
pub fn new() -> Self {
69125
Self { count: SharedObservable::new(0) }
70126
}
@@ -176,7 +232,7 @@ pub mod skip {
176232
}
177233

178234
/// Subscribe to update of the count value.
179-
pub fn subscribe(&self) -> impl Stream<Item = usize> {
235+
pub fn subscribe(&self) -> Subscriber<usize> {
180236
self.count.subscribe()
181237
}
182238

crates/matrix-sdk-ui/src/timeline/tests/basic.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ async fn test_replace_with_initial_events_when_batched() {
461461
)
462462
.await;
463463

464-
let (items, mut stream) = timeline.controller.subscribe_batched_and_limited().await;
464+
let (items, mut stream) = timeline.controller.subscribe().await;
465465
assert_eq!(items.len(), 2);
466466
assert!(items[0].is_date_divider());
467467
assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "hey");

0 commit comments

Comments
 (0)