Skip to content

Commit 9641aa9

Browse files
zzorbaDaniel SalinasbnjbvrDaniel Salinas
authored
feat(send queue): Add an enqueued time to to-be-sent events (#4385)
Add a new created_at to the send_queue_events and dependent_send_queue_events stored records. This will allow clients to understand how stale a pending message might be in the event that the queue encounters and error and becomes wedged. This change is exposed through the FFI on the `EventTimelineItem` struct as a new optional field named `local_created_at`. It will be `None` for any Remote event, and `Some` for Local events (except for those that were enqueued before the migrations were run). Signed-off-by: Daniel Salinas --------- Signed-off-by: Daniel Salinas <[email protected]> Co-authored-by: Daniel Salinas <[email protected]> Co-authored-by: Benjamin Bouvier <[email protected]> Co-authored-by: Daniel Salinas <[email protected]>
1 parent a8ca77f commit 9641aa9

File tree

11 files changed

+312
-52
lines changed

11 files changed

+312
-52
lines changed

bindings/matrix-sdk-ffi/src/timeline/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,6 +1033,7 @@ pub struct EventTimelineItem {
10331033
timestamp: Timestamp,
10341034
reactions: Vec<Reaction>,
10351035
local_send_state: Option<EventSendState>,
1036+
local_created_at: Option<u64>,
10361037
read_receipts: HashMap<String, Receipt>,
10371038
origin: Option<EventItemOrigin>,
10381039
can_be_replied_to: bool,
@@ -1070,6 +1071,7 @@ impl From<matrix_sdk_ui::timeline::EventTimelineItem> for EventTimelineItem {
10701071
timestamp: item.timestamp().into(),
10711072
reactions,
10721073
local_send_state: item.send_state().map(|s| s.into()),
1074+
local_created_at: item.local_created_at().map(|t| t.0.into()),
10731075
read_receipts,
10741076
origin: item.origin(),
10751077
can_be_replied_to: item.can_be_replied_to(),

crates/matrix-sdk-base/src/store/integration_tests.rs

Lines changed: 96 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ use ruma::{
2929
},
3030
owned_event_id, owned_mxc_uri, room_id,
3131
serde::Raw,
32-
uint, user_id, EventId, OwnedEventId, OwnedUserId, RoomId, TransactionId, UserId,
32+
uint, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomId,
33+
TransactionId, UserId,
3334
};
3435
use serde_json::{json, value::Value as JsonValue};
3536

@@ -980,13 +981,21 @@ impl StateStoreIntegrationTests for DynStateStore {
980981
let ev =
981982
SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())
982983
.unwrap();
983-
self.save_send_queue_request(room_id, txn.clone(), ev.into(), 0).await?;
984+
self.save_send_queue_request(
985+
room_id,
986+
txn.clone(),
987+
MilliSecondsSinceUnixEpoch::now(),
988+
ev.into(),
989+
0,
990+
)
991+
.await?;
984992

985993
// Add a single dependent queue request.
986994
self.save_dependent_queued_request(
987995
room_id,
988996
&txn,
989997
ChildTransactionId::new(),
998+
MilliSecondsSinceUnixEpoch::now(),
990999
DependentQueuedRequestKind::RedactEvent,
9911000
)
9921001
.await?;
@@ -1242,7 +1251,15 @@ impl StateStoreIntegrationTests for DynStateStore {
12421251
let event0 =
12431252
SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())
12441253
.unwrap();
1245-
self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap();
1254+
self.save_send_queue_request(
1255+
room_id,
1256+
txn0.clone(),
1257+
MilliSecondsSinceUnixEpoch::now(),
1258+
event0.into(),
1259+
0,
1260+
)
1261+
.await
1262+
.unwrap();
12461263

12471264
// Reading it will work.
12481265
let pending = self.load_send_queue_requests(room_id).await.unwrap();
@@ -1266,7 +1283,15 @@ impl StateStoreIntegrationTests for DynStateStore {
12661283
)
12671284
.unwrap();
12681285

1269-
self.save_send_queue_request(room_id, txn, event.into(), 0).await.unwrap();
1286+
self.save_send_queue_request(
1287+
room_id,
1288+
txn,
1289+
MilliSecondsSinceUnixEpoch::now(),
1290+
event.into(),
1291+
0,
1292+
)
1293+
.await
1294+
.unwrap();
12701295
}
12711296

12721297
// Reading all the events should work.
@@ -1364,7 +1389,15 @@ impl StateStoreIntegrationTests for DynStateStore {
13641389
let event =
13651390
SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into())
13661391
.unwrap();
1367-
self.save_send_queue_request(room_id2, txn.clone(), event.into(), 0).await.unwrap();
1392+
self.save_send_queue_request(
1393+
room_id2,
1394+
txn.clone(),
1395+
MilliSecondsSinceUnixEpoch::now(),
1396+
event.into(),
1397+
0,
1398+
)
1399+
.await
1400+
.unwrap();
13681401
}
13691402

13701403
// Add and remove one event for room3.
@@ -1374,7 +1407,15 @@ impl StateStoreIntegrationTests for DynStateStore {
13741407
let event =
13751408
SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into())
13761409
.unwrap();
1377-
self.save_send_queue_request(room_id3, txn.clone(), event.into(), 0).await.unwrap();
1410+
self.save_send_queue_request(
1411+
room_id3,
1412+
txn.clone(),
1413+
MilliSecondsSinceUnixEpoch::now(),
1414+
event.into(),
1415+
0,
1416+
)
1417+
.await
1418+
.unwrap();
13781419

13791420
self.remove_send_queue_request(room_id3, &txn).await.unwrap();
13801421
}
@@ -1399,21 +1440,45 @@ impl StateStoreIntegrationTests for DynStateStore {
13991440
let ev0 =
14001441
SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())
14011442
.unwrap();
1402-
self.save_send_queue_request(room_id, low0_txn.clone(), ev0.into(), 2).await.unwrap();
1443+
self.save_send_queue_request(
1444+
room_id,
1445+
low0_txn.clone(),
1446+
MilliSecondsSinceUnixEpoch::now(),
1447+
ev0.into(),
1448+
2,
1449+
)
1450+
.await
1451+
.unwrap();
14031452

14041453
// Saving one request with higher priority should work.
14051454
let high_txn = TransactionId::new();
14061455
let ev1 =
14071456
SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())
14081457
.unwrap();
1409-
self.save_send_queue_request(room_id, high_txn.clone(), ev1.into(), 10).await.unwrap();
1458+
self.save_send_queue_request(
1459+
room_id,
1460+
high_txn.clone(),
1461+
MilliSecondsSinceUnixEpoch::now(),
1462+
ev1.into(),
1463+
10,
1464+
)
1465+
.await
1466+
.unwrap();
14101467

14111468
// Saving another request with the low priority should work.
14121469
let low1_txn = TransactionId::new();
14131470
let ev2 =
14141471
SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())
14151472
.unwrap();
1416-
self.save_send_queue_request(room_id, low1_txn.clone(), ev2.into(), 2).await.unwrap();
1473+
self.save_send_queue_request(
1474+
room_id,
1475+
low1_txn.clone(),
1476+
MilliSecondsSinceUnixEpoch::now(),
1477+
ev2.into(),
1478+
2,
1479+
)
1480+
.await
1481+
.unwrap();
14171482

14181483
// The requests should be ordered from higher priority to lower, and when equal,
14191484
// should use the insertion order instead.
@@ -1453,7 +1518,15 @@ impl StateStoreIntegrationTests for DynStateStore {
14531518
let event0 =
14541519
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())
14551520
.unwrap();
1456-
self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap();
1521+
self.save_send_queue_request(
1522+
room_id,
1523+
txn0.clone(),
1524+
MilliSecondsSinceUnixEpoch::now(),
1525+
event0.into(),
1526+
0,
1527+
)
1528+
.await
1529+
.unwrap();
14571530

14581531
// No dependents, to start with.
14591532
assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());
@@ -1464,6 +1537,7 @@ impl StateStoreIntegrationTests for DynStateStore {
14641537
room_id,
14651538
&txn0,
14661539
child_txn.clone(),
1540+
MilliSecondsSinceUnixEpoch::now(),
14671541
DependentQueuedRequestKind::RedactEvent,
14681542
)
14691543
.await
@@ -1515,12 +1589,21 @@ impl StateStoreIntegrationTests for DynStateStore {
15151589
let event1 =
15161590
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())
15171591
.unwrap();
1518-
self.save_send_queue_request(room_id, txn1.clone(), event1.into(), 0).await.unwrap();
1592+
self.save_send_queue_request(
1593+
room_id,
1594+
txn1.clone(),
1595+
MilliSecondsSinceUnixEpoch::now(),
1596+
event1.into(),
1597+
0,
1598+
)
1599+
.await
1600+
.unwrap();
15191601

15201602
self.save_dependent_queued_request(
15211603
room_id,
15221604
&txn0,
15231605
ChildTransactionId::new(),
1606+
MilliSecondsSinceUnixEpoch::now(),
15241607
DependentQueuedRequestKind::RedactEvent,
15251608
)
15261609
.await
@@ -1531,6 +1614,7 @@ impl StateStoreIntegrationTests for DynStateStore {
15311614
room_id,
15321615
&txn1,
15331616
ChildTransactionId::new(),
1617+
MilliSecondsSinceUnixEpoch::now(),
15341618
DependentQueuedRequestKind::EditEvent {
15351619
new_content: SerializableEventContent::new(
15361620
&RoomMessageEventContent::text_plain("edit").into(),
@@ -1563,6 +1647,7 @@ impl StateStoreIntegrationTests for DynStateStore {
15631647
room_id,
15641648
&txn,
15651649
child_txn.clone(),
1650+
MilliSecondsSinceUnixEpoch::now(),
15661651
DependentQueuedRequestKind::RedactEvent,
15671652
)
15681653
.await

crates/matrix-sdk-base/src/store/memory_store.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ use ruma::{
3030
},
3131
serde::Raw,
3232
time::Instant,
33-
CanonicalJsonObject, EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId,
34-
OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
33+
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
34+
OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
3535
};
3636
use tracing::{debug, instrument, warn};
3737

@@ -750,6 +750,7 @@ impl StateStore for MemoryStore {
750750
&self,
751751
room_id: &RoomId,
752752
transaction_id: OwnedTransactionId,
753+
created_at: MilliSecondsSinceUnixEpoch,
753754
kind: QueuedRequestKind,
754755
priority: usize,
755756
) -> Result<(), Self::Error> {
@@ -759,7 +760,7 @@ impl StateStore for MemoryStore {
759760
.send_queue_events
760761
.entry(room_id.to_owned())
761762
.or_default()
762-
.push(QueuedRequest { kind, transaction_id, error: None, priority });
763+
.push(QueuedRequest { kind, transaction_id, error: None, priority, created_at });
763764
Ok(())
764765
}
765766

@@ -858,6 +859,7 @@ impl StateStore for MemoryStore {
858859
room: &RoomId,
859860
parent_transaction_id: &TransactionId,
860861
own_transaction_id: ChildTransactionId,
862+
created_at: MilliSecondsSinceUnixEpoch,
861863
content: DependentQueuedRequestKind,
862864
) -> Result<(), Self::Error> {
863865
self.inner
@@ -871,6 +873,7 @@ impl StateStore for MemoryStore {
871873
parent_transaction_id: parent_transaction_id.to_owned(),
872874
own_transaction_id,
873875
parent_key: None,
876+
created_at,
874877
});
875878
Ok(())
876879
}

crates/matrix-sdk-base/src/store/send_queue.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ use ruma::{
2323
AnyMessageLikeEventContent, EventContent as _, RawExt as _,
2424
},
2525
serde::Raw,
26-
OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId, TransactionId, UInt,
26+
MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId,
27+
TransactionId, UInt,
2728
};
2829
use serde::{Deserialize, Serialize};
2930

@@ -131,6 +132,9 @@ pub struct QueuedRequest {
131132
/// The bigger the value, the higher the priority at which this request
132133
/// should be handled.
133134
pub priority: usize,
135+
136+
/// The time that the request was originally attempted.
137+
pub created_at: MilliSecondsSinceUnixEpoch,
134138
}
135139

136140
impl QueuedRequest {
@@ -371,6 +375,9 @@ pub struct DependentQueuedRequest {
371375
/// If the parent request has been sent, the parent's request identifier
372376
/// returned by the server once the local echo has been sent out.
373377
pub parent_key: Option<SentRequestKey>,
378+
379+
/// The time that the request was originally attempted.
380+
pub created_at: MilliSecondsSinceUnixEpoch,
374381
}
375382

376383
impl DependentQueuedRequest {

crates/matrix-sdk-base/src/store/traits.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ use ruma::{
3535
},
3636
serde::Raw,
3737
time::SystemTime,
38-
EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId,
39-
TransactionId, UserId,
38+
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
39+
OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
4040
};
4141
use serde::{Deserialize, Serialize};
4242

@@ -359,6 +359,7 @@ pub trait StateStore: AsyncTraitDeps {
359359
&self,
360360
room_id: &RoomId,
361361
transaction_id: OwnedTransactionId,
362+
created_at: MilliSecondsSinceUnixEpoch,
362363
request: QueuedRequestKind,
363364
priority: usize,
364365
) -> Result<(), Self::Error>;
@@ -421,6 +422,7 @@ pub trait StateStore: AsyncTraitDeps {
421422
room_id: &RoomId,
422423
parent_txn_id: &TransactionId,
423424
own_txn_id: ChildTransactionId,
425+
created_at: MilliSecondsSinceUnixEpoch,
424426
content: DependentQueuedRequestKind,
425427
) -> Result<(), Self::Error>;
426428

@@ -657,11 +659,12 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
657659
&self,
658660
room_id: &RoomId,
659661
transaction_id: OwnedTransactionId,
662+
created_at: MilliSecondsSinceUnixEpoch,
660663
content: QueuedRequestKind,
661664
priority: usize,
662665
) -> Result<(), Self::Error> {
663666
self.0
664-
.save_send_queue_request(room_id, transaction_id, content, priority)
667+
.save_send_queue_request(room_id, transaction_id, created_at, content, priority)
665668
.await
666669
.map_err(Into::into)
667670
}
@@ -711,10 +714,11 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
711714
room_id: &RoomId,
712715
parent_txn_id: &TransactionId,
713716
own_txn_id: ChildTransactionId,
717+
created_at: MilliSecondsSinceUnixEpoch,
714718
content: DependentQueuedRequestKind,
715719
) -> Result<(), Self::Error> {
716720
self.0
717-
.save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, content)
721+
.save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
718722
.await
719723
.map_err(Into::into)
720724
}

0 commit comments

Comments
 (0)