Skip to content

Commit 6cef7f2

Browse files
committed
feat(sdk): Implement Client::observe_events and Client::observe_room_events.
Changelog: This patch introduces a mechanism similar to `Client::add_event_handler` and `Client::add_room_event_handler` but with a reactive programming pattern. This patch adds `Client::observe_events` and `Client::observe_room_events`. ```rust // Get an observer. let observer = client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>(); // Subscribe to the observer. let mut subscriber = observer.subscribe(); // Use the subscriber as a `Stream`. let (message_event, (room, push_actions)) = subscriber.next().await.unwrap(); ``` When calling `observe_events`, one has to specify the type of event (in the example, `SyncRoomMessageEvent`) and a context (in the example, `(Room, Vec<Action>)`, respectively for the room and the push actions).
1 parent e798a51 commit 6cef7f2

File tree

4 files changed

+373
-6
lines changed

4 files changed

+373
-6
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/matrix-sdk/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ matrix-sdk-sqlite = { workspace = true, optional = true }
9797
matrix-sdk-test = { workspace = true, optional = true }
9898
mime = "0.3.16"
9999
mime2ext = "0.1.52"
100+
pin-project-lite = { workspace = true }
100101
rand = { workspace = true , optional = true }
101102
ruma = { workspace = true, features = ["rand", "unstable-msc2448", "unstable-msc2965", "unstable-msc3930", "unstable-msc3245-v1-compat", "unstable-msc2867"] }
102103
serde = { workspace = true }

crates/matrix-sdk/src/client/mod.rs

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
use std::{
1818
collections::{btree_map, BTreeMap},
1919
fmt::{self, Debug},
20-
future::Future,
20+
future::{ready, Future},
2121
pin::Pin,
2222
sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
2323
};
@@ -88,7 +88,8 @@ use crate::{
8888
error::{HttpError, HttpResult},
8989
event_cache::EventCache,
9090
event_handler::{
91-
EventHandler, EventHandlerDropGuard, EventHandlerHandle, EventHandlerStore, SyncEvent,
91+
EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
92+
EventHandlerStore, ObservableEventHandler, SyncEvent,
9293
},
9394
http_client::HttpClient,
9495
matrix_auth::MatrixAuth,
@@ -776,7 +777,7 @@ impl Client {
776777
/// ```
777778
pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
778779
where
779-
Ev: SyncEvent + DeserializeOwned + Send + 'static,
780+
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
780781
H: EventHandler<Ev, Ctx>,
781782
{
782783
self.add_event_handler_impl(handler, None)
@@ -798,12 +799,96 @@ impl Client {
798799
handler: H,
799800
) -> EventHandlerHandle
800801
where
801-
Ev: SyncEvent + DeserializeOwned + Send + 'static,
802+
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
802803
H: EventHandler<Ev, Ctx>,
803804
{
804805
self.add_event_handler_impl(handler, Some(room_id.to_owned()))
805806
}
806807

808+
/// Observe a specific event type.
809+
///
810+
/// `Ev` represents the kind of event that will be observed. `Ctx`
811+
/// represents the context that will come with the event. It relies on the
812+
/// same mechanism as [`Self::add_event_handler`]. The main difference is
813+
/// that it returns an [`ObservableEventHandler`] and doesn't require a
814+
/// user-defined closure. It is possible to subscribe to the
815+
/// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
816+
/// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
817+
/// Ctx)`.
818+
///
819+
/// # Example
820+
///
821+
/// ```
822+
/// use futures_util::StreamExt as _;
823+
/// use matrix_sdk::{
824+
/// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
825+
/// Client, Room,
826+
/// };
827+
///
828+
/// # async fn example(client: Client) {
829+
/// let observer =
830+
/// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
831+
///
832+
/// let mut subscriber = observer.subscribe();
833+
///
834+
/// let (message_event, (room, push_actions)) =
835+
/// subscriber.next().await.unwrap();
836+
/// # }
837+
/// ```
838+
///
839+
/// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
840+
pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
841+
where
842+
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
843+
Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
844+
{
845+
self.observe_room_events_impl(None)
846+
}
847+
848+
/// Observe a specific room, and event type.
849+
///
850+
/// This method works the same way as
851+
/// [`observe_events`][Self::observe_events], except that the observability
852+
/// will only be applied for events in the room with the specified ID.
853+
/// See that method for more details.
854+
pub fn observe_room_events<Ev, Ctx>(
855+
&self,
856+
room_id: &RoomId,
857+
) -> ObservableEventHandler<(Ev, Ctx)>
858+
where
859+
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
860+
Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
861+
{
862+
self.observe_room_events_impl(Some(room_id.to_owned()))
863+
}
864+
865+
/// Shared implementation for `Self::observe_events` and
866+
/// `Self::observe_room_events`.
867+
fn observe_room_events_impl<Ev, Ctx>(
868+
&self,
869+
room_id: Option<OwnedRoomId>,
870+
) -> ObservableEventHandler<(Ev, Ctx)>
871+
where
872+
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
873+
Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
874+
{
875+
// The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
876+
// new value.
877+
let shared_observable = SharedObservable::new(None);
878+
879+
ObservableEventHandler::new(
880+
shared_observable.clone(),
881+
self.event_handler_drop_guard(self.add_event_handler_impl(
882+
move |event: Ev, context: Ctx| {
883+
shared_observable.set(Some((event, context)));
884+
885+
ready(())
886+
},
887+
room_id,
888+
)),
889+
)
890+
}
891+
807892
/// Remove the event handler associated with the handle.
808893
///
809894
/// Note that you **must not** call `remove_event_handler` from the

0 commit comments

Comments
 (0)