Skip to content

Commit 2f031a4

Browse files
committed
feat(sdk): support for observing m.beacon events
1 parent cefd5a2 commit 2f031a4

File tree

4 files changed

+320
-8
lines changed

4 files changed

+320
-8
lines changed

crates/matrix-sdk/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ pub use sliding_sync::{
9494
#[cfg(feature = "uniffi")]
9595
uniffi::setup_scaffolding!();
9696

97+
pub mod live_location_share;
9798
#[cfg(any(test, feature = "testing"))]
9899
pub mod test_utils;
99100

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2024 The Matrix.org Foundation C.I.C.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! Types for live location sharing.
16+
use async_stream::stream;
17+
use futures_util::Stream;
18+
use ruma::{
19+
events::{
20+
beacon::OriginalSyncBeaconEvent, beacon_info::BeaconInfoEventContent,
21+
location::LocationContent,
22+
},
23+
MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId,
24+
};
25+
26+
use crate::{event_handler::ObservableEventHandler, Client, Room};
27+
28+
/// An observable live location.
29+
#[derive(Debug)]
30+
pub struct ObservableLiveLocation {
31+
observable_room_events: ObservableEventHandler<(OriginalSyncBeaconEvent, Room)>,
32+
}
33+
34+
impl ObservableLiveLocation {
35+
/// Create a new `ObservableLiveLocation` for a particular room.
36+
pub fn new(client: &Client, room_id: &RoomId) -> Self {
37+
Self { observable_room_events: client.observe_room_events(room_id) }
38+
}
39+
40+
/// Get a stream of [`LiveLocationShare`].
41+
pub fn subscribe(&self) -> impl Stream<Item = LiveLocationShare> {
42+
let stream = self.observable_room_events.subscribe();
43+
stream! {
44+
for await (event, room) in stream {
45+
yield LiveLocationShare {
46+
last_location: LastLocation {
47+
location: event.content.location,
48+
ts: event.origin_server_ts,
49+
},
50+
beacon_info: room
51+
.get_user_beacon_info(&event.sender)
52+
.await
53+
.ok()
54+
.map(|info| info.content),
55+
user_id: event.sender,
56+
};
57+
}
58+
}
59+
}
60+
}
61+
62+
/// Details of the last known location beacon.
63+
#[derive(Clone, Debug)]
64+
pub struct LastLocation {
65+
/// The most recent location content of the user.
66+
pub location: LocationContent,
67+
/// The timestamp of when the location was updated.
68+
pub ts: MilliSecondsSinceUnixEpoch,
69+
}
70+
71+
/// Details of a users live location share.
72+
#[derive(Clone, Debug)]
73+
pub struct LiveLocationShare {
74+
/// The user's last known location.
75+
pub last_location: LastLocation,
76+
/// Information about the associated beacon event.
77+
pub beacon_info: Option<BeaconInfoEventContent>,
78+
/// The user ID of the person sharing their live location.
79+
pub user_id: OwnedUserId,
80+
}

crates/matrix-sdk/src/room/mod.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ use crate::{
132132
error::{BeaconError, WrongRoomState},
133133
event_cache::{self, EventCacheDropHandles, RoomEventCache},
134134
event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
135+
live_location_share::ObservableLiveLocation,
135136
media::{MediaFormat, MediaRequestParameters},
136137
notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
137138
room::power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
@@ -2990,17 +2991,18 @@ impl Room {
29902991
Ok(())
29912992
}
29922993

2993-
/// Get the beacon information event in the room for the current user.
2994+
/// Get the beacon information event in the room for the `user_id`.
29942995
///
29952996
/// # Errors
29962997
///
29972998
/// Returns an error if the event is redacted, stripped, not found or could
29982999
/// not be deserialized.
2999-
async fn get_user_beacon_info(
3000+
pub(crate) async fn get_user_beacon_info(
30003001
&self,
3002+
user_id: &UserId,
30013003
) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
30023004
let raw_event = self
3003-
.get_state_event_static_for_key::<BeaconInfoEventContent, _>(self.own_user_id())
3005+
.get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
30043006
.await?
30053007
.ok_or(BeaconError::NotFound)?;
30063008

@@ -3053,7 +3055,7 @@ impl Room {
30533055
) -> Result<send_state_event::v3::Response, BeaconError> {
30543056
self.ensure_room_joined()?;
30553057

3056-
let mut beacon_info_event = self.get_user_beacon_info().await?;
3058+
let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
30573059
beacon_info_event.content.stop();
30583060
Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
30593061
}
@@ -3075,7 +3077,7 @@ impl Room {
30753077
) -> Result<send_message_event::v3::Response, BeaconError> {
30763078
self.ensure_room_joined()?;
30773079

3078-
let beacon_info_event = self.get_user_beacon_info().await?;
3080+
let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
30793081

30803082
if beacon_info_event.content.is_live() {
30813083
let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
@@ -3166,6 +3168,14 @@ impl Room {
31663168
},
31673169
}
31683170
}
3171+
3172+
/// Observe live location sharing events for this room.
3173+
///
3174+
/// The returned receiver will receive the newest event for each sync
3175+
/// response that contains a `m.beacon` event.
3176+
pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3177+
ObservableLiveLocation::new(&self.client, self.room_id())
3178+
}
31693179
}
31703180

31713181
#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]

crates/matrix-sdk/tests/integration/room/beacon/mod.rs

Lines changed: 224 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
use std::time::{Duration, UNIX_EPOCH};
22

3-
use matrix_sdk::config::SyncSettings;
4-
use matrix_sdk_test::{async_test, mocks::mock_encryption_state, test_json, DEFAULT_TEST_ROOM_ID};
5-
use ruma::{event_id, time::SystemTime};
3+
use futures_util::{pin_mut, StreamExt as _};
4+
use js_int::uint;
5+
use matrix_sdk::{config::SyncSettings, live_location_share::LiveLocationShare};
6+
use matrix_sdk_test::{
7+
async_test, mocks::mock_encryption_state, sync_timeline_event, test_json, JoinedRoomBuilder,
8+
SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
9+
};
10+
use ruma::{event_id, events::location::AssetType, time::SystemTime, MilliSecondsSinceUnixEpoch};
611
use serde_json::json;
712
use wiremock::{
813
matchers::{body_partial_json, header, method, path_regex},
@@ -153,3 +158,219 @@ async fn test_send_location_beacon_with_expired_live_share() {
153158

154159
assert!(response.is_err());
155160
}
161+
162+
#[async_test]
163+
async fn test_most_recent_event_in_stream() {
164+
let (client, server) = logged_in_client_with_server().await;
165+
166+
let mut sync_builder = SyncResponseBuilder::new();
167+
168+
let current_time = MilliSecondsSinceUnixEpoch::now();
169+
let millis_time = current_time
170+
.to_system_time()
171+
.unwrap()
172+
.duration_since(UNIX_EPOCH)
173+
.expect("Time went backwards")
174+
.as_millis() as u64;
175+
176+
mock_sync(
177+
&server,
178+
json!({
179+
"next_batch": "s526_47314_0_7_1_1_1_1_1",
180+
"rooms": {
181+
"join": {
182+
*DEFAULT_TEST_ROOM_ID: {
183+
"state": {
184+
"events": [
185+
{
186+
"content": {
187+
"description": "Live Share",
188+
"live": true,
189+
"org.matrix.msc3488.ts": millis_time,
190+
"timeout": 3000,
191+
"org.matrix.msc3488.asset": { "type": "m.self" }
192+
},
193+
"event_id": "$15139375514XsgmR:localhost",
194+
"origin_server_ts": millis_time,
195+
"sender": "@example:localhost",
196+
"state_key": "@example:localhost",
197+
"type": "org.matrix.msc3672.beacon_info",
198+
"unsigned": {
199+
"age": 7034220
200+
}
201+
},
202+
]
203+
}
204+
}
205+
}
206+
}
207+
208+
}),
209+
None,
210+
)
211+
.await;
212+
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
213+
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
214+
server.reset().await;
215+
216+
let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap();
217+
218+
let observable_live_location_shares = room.observe_live_location_shares();
219+
let stream = observable_live_location_shares.subscribe();
220+
pin_mut!(stream);
221+
222+
let mut timeline_events = Vec::new();
223+
224+
for nth in 0..25 {
225+
timeline_events.push(sync_timeline_event!({
226+
"content": {
227+
"m.relates_to": {
228+
"event_id": "$15139375514XsgmR:localhost",
229+
"rel_type": "m.reference"
230+
},
231+
"org.matrix.msc3488.location": {
232+
"uri": format!("geo:{nth}.9575274619722,12.494122581370175;u={nth}")
233+
},
234+
"org.matrix.msc3488.ts": 1_636_829_458
235+
},
236+
"event_id": format!("$event_for_stream_{nth}"),
237+
"origin_server_ts": 1_636_829_458,
238+
"sender": "@example:localhost",
239+
"type": "org.matrix.msc3672.beacon",
240+
"unsigned": {
241+
"age": 598971
242+
}
243+
}));
244+
}
245+
246+
sync_builder.add_joined_room(
247+
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_bulk(timeline_events),
248+
);
249+
250+
mock_sync(&server, sync_builder.build_json_sync_response(), None).await;
251+
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
252+
server.reset().await;
253+
254+
// Stream should only process the latest beacon event for the user, ignoring any
255+
// previous events.
256+
let LiveLocationShare { user_id, last_location, beacon_info } =
257+
stream.next().await.expect("Another live location was expected");
258+
259+
assert_eq!(user_id.to_string(), "@example:localhost");
260+
261+
assert_eq!(last_location.location.uri, "geo:24.9575274619722,12.494122581370175;u=24");
262+
263+
assert!(last_location.location.description.is_none());
264+
assert!(last_location.location.zoom_level.is_none());
265+
assert_eq!(last_location.ts, MilliSecondsSinceUnixEpoch(uint!(1_636_829_458)));
266+
267+
let beacon_info = beacon_info.expect("Live location share is missing the beacon_info");
268+
269+
assert!(beacon_info.live);
270+
assert!(beacon_info.is_live());
271+
assert_eq!(beacon_info.description, Some("Live Share".to_owned()));
272+
assert_eq!(beacon_info.timeout, Duration::from_millis(3000));
273+
assert_eq!(beacon_info.ts, current_time);
274+
assert_eq!(beacon_info.asset.type_, AssetType::Self_);
275+
}
276+
277+
#[async_test]
278+
async fn test_observe_single_live_location_share() {
279+
let (client, server) = logged_in_client_with_server().await;
280+
281+
let current_time = MilliSecondsSinceUnixEpoch::now();
282+
let millis_time = current_time
283+
.to_system_time()
284+
.unwrap()
285+
.duration_since(UNIX_EPOCH)
286+
.expect("Time went backwards")
287+
.as_millis() as u64;
288+
289+
mock_sync(
290+
&server,
291+
json!({
292+
"next_batch": "s526_47314_0_7_1_1_1_1_1",
293+
"rooms": {
294+
"join": {
295+
*DEFAULT_TEST_ROOM_ID: {
296+
"state": {
297+
"events": [
298+
{
299+
"content": {
300+
"description": "Test Live Share",
301+
"live": true,
302+
"org.matrix.msc3488.ts": millis_time,
303+
"timeout": 3000,
304+
"org.matrix.msc3488.asset": { "type": "m.self" }
305+
},
306+
"event_id": "$test_beacon_info",
307+
"origin_server_ts": millis_time,
308+
"sender": "@example:localhost",
309+
"state_key": "@example:localhost",
310+
"type": "org.matrix.msc3672.beacon_info",
311+
}
312+
]
313+
}
314+
}
315+
}
316+
}
317+
}),
318+
None,
319+
)
320+
.await;
321+
322+
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
323+
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
324+
server.reset().await;
325+
326+
let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap();
327+
let observable_live_location_shares = room.observe_live_location_shares();
328+
let stream = observable_live_location_shares.subscribe();
329+
pin_mut!(stream);
330+
331+
let timeline_event = sync_timeline_event!({
332+
"content": {
333+
"m.relates_to": {
334+
"event_id": "$test_beacon_info",
335+
"rel_type": "m.reference"
336+
},
337+
"org.matrix.msc3488.location": {
338+
"uri": "geo:10.000000,20.000000;u=5"
339+
},
340+
"org.matrix.msc3488.ts": 1_636_829_458
341+
},
342+
"event_id": "$location_event",
343+
"origin_server_ts": millis_time,
344+
"sender": "@example:localhost",
345+
"type": "org.matrix.msc3672.beacon",
346+
});
347+
348+
mock_sync(
349+
&server,
350+
SyncResponseBuilder::new()
351+
.add_joined_room(
352+
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event(timeline_event),
353+
)
354+
.build_json_sync_response(),
355+
None,
356+
)
357+
.await;
358+
359+
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
360+
server.reset().await;
361+
362+
let LiveLocationShare { user_id, last_location, beacon_info } =
363+
stream.next().await.expect("Another live location was expected");
364+
365+
assert_eq!(user_id.to_string(), "@example:localhost");
366+
assert_eq!(last_location.location.uri, "geo:10.000000,20.000000;u=5");
367+
assert_eq!(last_location.ts, current_time);
368+
369+
let beacon_info = beacon_info.expect("Live location share is missing the beacon_info");
370+
371+
assert!(beacon_info.live);
372+
assert!(beacon_info.is_live());
373+
assert_eq!(beacon_info.description, Some("Test Live Share".to_owned()));
374+
assert_eq!(beacon_info.timeout, Duration::from_millis(3000));
375+
assert_eq!(beacon_info.ts, current_time);
376+
}

0 commit comments

Comments
 (0)