Skip to content

Commit c287e8e

Browse files
authored
feat: communicate store changes from background processing (#342)
A background processing task fetches new messages and stores them in the database. This task is triggered by push notifications. This commit introduces a store notifications queue. The background task stores in the queue any changes made to the store in the background. When the foreground application reconnects to the websocket, it dequeues the pending store notifications and communicates them to the UI so it can update its state. Fixes #337
1 parent 988180f commit c287e8e

File tree

10 files changed

+350
-7
lines changed

10 files changed

+350
-7
lines changed

apiclient/src/qs_api/ws.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
// SPDX-License-Identifier: AGPL-3.0-or-later
44

55
use core::time;
6-
use std::time::Duration;
6+
use std::{pin::pin, time::Duration};
77

88
use base64::{engine::general_purpose, Engine as _};
9-
use futures_util::{pin_mut, SinkExt, StreamExt};
9+
use futures_util::{SinkExt, StreamExt};
1010
use http::{HeaderValue, Request};
1111
use phnxtypes::{
1212
codec::PhnxCodec,
@@ -116,7 +116,7 @@ impl QsWebSocket {
116116
let mut interval = tokio::time::interval(Duration::from_secs(1));
117117

118118
// Pin the stream
119-
pin_mut!(ws_stream);
119+
let mut ws_stream = pin!(ws_stream);
120120

121121
// Initialize the connection status
122122
let mut connection_status = ConnectionStatus::new();

applogic/src/api/user_cubit.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::time::Duration;
1010
use anyhow::bail;
1111
use flutter_rust_bridge::frb;
1212
use phnxapiclient::qs_api::ws::WsEvent;
13-
use phnxcoreclient::{clients::CoreUser, ConversationId};
13+
use phnxcoreclient::{clients::CoreUser, store::Store, ConversationId};
1414
use phnxcoreclient::{Asset, UserProfile};
1515
use phnxtypes::identifiers::QualifiedUserName;
1616
use phnxtypes::messages::client_ds::QsWsMessage;
@@ -318,7 +318,15 @@ fn spawn_polling(core_user: CoreUser, cancel: CancellationToken) {
318318

319319
async fn handle_websocket_message(event: WsEvent, core_user: &CoreUser) {
320320
match event {
321-
WsEvent::ConnectedEvent => info!("connected to websocket"),
321+
WsEvent::ConnectedEvent => {
322+
info!("connected to websocket");
323+
// After (re)connecting, dequeue any pending store notifications that might have been
324+
// enqueued by the push notifications background processing task.
325+
match core_user.dequeue_notification().await {
326+
Ok(notification) => core_user.notify(notification),
327+
Err(error) => error!(%error, "Failed to dequeue store notification"),
328+
}
329+
}
322330
WsEvent::DisconnectedEvent => info!("disconnected from websocket"),
323331
WsEvent::MessageEvent(QsWsMessage::Event(event)) => {
324332
warn!("ignoring websocket event: {event:?}")

applogic/src/background_execution/processing.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
//
33
// SPDX-License-Identifier: AGPL-3.0-or-later
44

5+
use phnxcoreclient::store::Store;
56
use std::panic::{self, AssertUnwindSafe};
67
use tokio::runtime::Builder;
78
use tracing::{error, info};
@@ -76,6 +77,9 @@ pub(crate) async fn retrieve_messages(path: String) -> NotificationBatch {
7677
}
7778
};
7879

80+
// capture store notification in below store calls
81+
let pending_store_notifications = user.user.subscribe_iter();
82+
7983
let notifications = match user.fetch_all_messages().await {
8084
Ok(fetched_messages) => {
8185
info!("All messages fetched");
@@ -100,6 +104,12 @@ pub(crate) async fn retrieve_messages(path: String) -> NotificationBatch {
100104

101105
let badge_count = user.global_unread_messages_count().await;
102106

107+
for store_notification in pending_store_notifications {
108+
if let Err(error) = Store::enqueue_notification(&user.user, &store_notification).await {
109+
error!(%error, "Failed to enqueue store notification");
110+
}
111+
}
112+
103113
NotificationBatch {
104114
badge_count,
105115
removals: Vec::new(),
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// SPDX-FileCopyrightText: 2025 Phoenix R&D GmbH <[email protected]>
2+
//
3+
// SPDX-License-Identifier: AGPL-3.0-or-later
4+
5+
use crate::store::StoreNotification;
6+
7+
pub fn migration() -> String {
8+
StoreNotification::CREATE_TABLE_STATEMENT.to_string()
9+
}

coreclient/src/clients/mod.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,21 +274,49 @@ impl CoreUser {
274274
}
275275

276276
pub(crate) fn send_store_notification(&self, notification: StoreNotification) {
277-
if !notification.ops.is_empty() {
277+
if !notification.is_empty() {
278278
self.inner.store_notifications_tx.notify(notification);
279279
}
280280
}
281281

282+
/// Subscribes to store notifications.
283+
///
284+
/// All notifications sent after this function was called are observed as items of the returned
285+
/// stream.
282286
pub(crate) fn subscribe_to_store_notifications(
283287
&self,
284288
) -> impl Stream<Item = Arc<StoreNotification>> + Send + 'static {
285289
self.inner.store_notifications_tx.subscribe()
286290
}
287291

292+
/// Subcribes to pending store notifications.
293+
///
294+
/// Unlike `subscribe_to_store_notifications`, this function does not remove stored
295+
/// notifications from the persisted queue.
296+
pub(crate) fn subscribe_iter_to_store_notifications(
297+
&self,
298+
) -> impl Iterator<Item = Arc<StoreNotification>> + Send + 'static {
299+
self.inner.store_notifications_tx.subscribe_iter()
300+
}
301+
288302
pub(crate) fn store_notifier(&self) -> StoreNotifier {
289303
StoreNotifier::new(self.inner.store_notifications_tx.clone())
290304
}
291305

306+
pub(crate) async fn enqueue_store_notification(
307+
&self,
308+
notification: &StoreNotification,
309+
) -> Result<()> {
310+
notification.enqueue(&mut *self.inner.connection.lock().await)?;
311+
Ok(())
312+
}
313+
314+
pub(crate) async fn dequeue_store_notification(&self) -> Result<StoreNotification> {
315+
Ok(StoreNotification::dequeue(
316+
&mut *self.inner.connection.lock().await,
317+
)?)
318+
}
319+
292320
pub async fn set_own_user_profile(&self, mut user_profile: UserProfile) -> Result<()> {
293321
if user_profile.user_name() != &self.user_name() {
294322
bail!("Can't set user profile for users other than the current user.",);

coreclient/src/store/impl.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,4 +167,16 @@ impl Store for CoreUser {
167167
fn subscribe(&self) -> impl Stream<Item = Arc<StoreNotification>> + Send + 'static {
168168
self.subscribe_to_store_notifications()
169169
}
170+
171+
fn subscribe_iter(&self) -> impl Iterator<Item = Arc<StoreNotification>> + Send + 'static {
172+
self.subscribe_iter_to_store_notifications()
173+
}
174+
175+
async fn enqueue_notification(&self, notification: &StoreNotification) -> StoreResult<()> {
176+
self.enqueue_store_notification(notification).await
177+
}
178+
179+
async fn dequeue_notification(&self) -> StoreResult<StoreNotification> {
180+
self.dequeue_store_notification().await
181+
}
170182
}

coreclient/src/store/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub(crate) use notification::{StoreNotificationsSender, StoreNotifier};
1919

2020
mod r#impl;
2121
mod notification;
22+
mod persistence;
2223

2324
/// The result type of a failable [`Store`] method
2425
pub type StoreResult<T> = anyhow::Result<T>;
@@ -133,4 +134,10 @@ pub trait LocalStore {
133134
fn notify(&self, notification: StoreNotification);
134135

135136
fn subscribe(&self) -> impl Stream<Item = Arc<StoreNotification>> + Send + 'static;
137+
138+
fn subscribe_iter(&self) -> impl Iterator<Item = Arc<StoreNotification>> + Send + 'static;
139+
140+
async fn enqueue_notification(&self, notification: &StoreNotification) -> StoreResult<()>;
141+
142+
async fn dequeue_notification(&self) -> StoreResult<StoreNotification>;
136143
}

coreclient/src/store/notification.rs

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,13 @@ impl StoreNotificationsSender {
108108

109109
/// Sends a notification to all current subscribers.
110110
pub(crate) fn notify(&self, notification: impl Into<Arc<StoreNotification>>) {
111-
let _no_receivers = self.tx.send(notification.into());
111+
let notification = notification.into();
112+
debug!(
113+
num_receivers = self.tx.receiver_count(),
114+
?notification,
115+
"StoreNotificationsSender::notify"
116+
);
117+
let _no_receivers = self.tx.send(notification);
112118
}
113119

114120
/// Creates a new subscription to the notifications.
@@ -126,6 +132,31 @@ impl StoreNotificationsSender {
126132
}
127133
})
128134
}
135+
136+
/// Returns all pending notifications.
137+
///
138+
/// The pending notifications are the notifications captured starting at the call to this function.
139+
/// Getting the next item from the iterator gets the next pending notification is there is any,
140+
/// otherwise it returns `None`. Therefore, the iterator is not fused.
141+
///
142+
/// This is useful for capturing all pending notifications synchronously.
143+
pub(crate) fn subscribe_iter(
144+
&self,
145+
) -> impl Iterator<Item = Arc<StoreNotification>> + Send + 'static {
146+
let mut rx = self.tx.subscribe();
147+
std::iter::from_fn(move || loop {
148+
match rx.try_recv() {
149+
Ok(notification) => return Some(notification),
150+
Err(broadcast::error::TryRecvError::Lagged(n)) => {
151+
error!(n, "store notifications lagged");
152+
continue;
153+
}
154+
Err(
155+
broadcast::error::TryRecvError::Closed | broadcast::error::TryRecvError::Empty,
156+
) => return None,
157+
}
158+
})
159+
}
129160
}
130161

131162
impl Default for StoreNotificationsSender {
@@ -139,6 +170,7 @@ impl Default for StoreNotificationsSender {
139170
/// Bundles all changes to the store, that is, all entities that have been added, updated or
140171
/// removed.
141172
#[derive(Debug, Default)]
173+
#[cfg_attr(test, derive(PartialEq, Eq))]
142174
pub struct StoreNotification {
143175
pub ops: BTreeMap<StoreEntityId, StoreOperation>,
144176
}
@@ -147,6 +179,10 @@ impl StoreNotification {
147179
fn empty() -> Self {
148180
Self::default()
149181
}
182+
183+
pub fn is_empty(&self) -> bool {
184+
self.ops.is_empty()
185+
}
150186
}
151187

152188
/// Operation which was performed in a [`super::Store`]
@@ -168,3 +204,76 @@ pub enum StoreEntityId {
168204
Conversation(ConversationId),
169205
Message(ConversationMessageId),
170206
}
207+
208+
impl StoreEntityId {
209+
pub(crate) fn kind(&self) -> StoreEntityKind {
210+
match self {
211+
StoreEntityId::User(_) => StoreEntityKind::User,
212+
StoreEntityId::Conversation(_) => StoreEntityKind::Conversation,
213+
StoreEntityId::Message(_) => StoreEntityKind::Message,
214+
}
215+
}
216+
}
217+
218+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
219+
pub(crate) enum StoreEntityKind {
220+
User = 0,
221+
Conversation = 1,
222+
Message = 2,
223+
}
224+
225+
#[cfg(test)]
226+
mod tests {
227+
use super::*;
228+
229+
#[test]
230+
fn subscribe_iter() {
231+
let tx = StoreNotificationsSender::new();
232+
233+
let ops_1: BTreeMap<_, _> = [(
234+
StoreEntityId::User("alice@localhost".parse().unwrap()),
235+
StoreOperation::Add,
236+
)]
237+
.into_iter()
238+
.collect();
239+
240+
let ops_2: BTreeMap<_, _> = [(
241+
StoreEntityId::User("bob@localhost".parse().unwrap()),
242+
StoreOperation::Update,
243+
)]
244+
.into_iter()
245+
.collect();
246+
247+
let ops_3: BTreeMap<_, _> = [(
248+
StoreEntityId::User("eve@localhost".parse().unwrap()),
249+
StoreOperation::Remove,
250+
)]
251+
.into_iter()
252+
.collect();
253+
254+
let ops_4: BTreeMap<_, _> = [(
255+
StoreEntityId::User("mallory@localhost".parse().unwrap()),
256+
StoreOperation::Add,
257+
)]
258+
.into_iter()
259+
.collect();
260+
261+
tx.notify(StoreNotification {
262+
ops: ops_1.into_iter().collect(),
263+
});
264+
265+
let mut iter = tx.subscribe_iter();
266+
267+
tx.notify(StoreNotification { ops: ops_2.clone() });
268+
269+
// first notification is not observed, because it was sent before the subscription
270+
assert_eq!(iter.next().unwrap().ops, ops_2);
271+
assert_eq!(iter.next(), None);
272+
273+
tx.notify(StoreNotification { ops: ops_3.clone() });
274+
assert_eq!(iter.next().unwrap().ops, ops_3);
275+
tx.notify(StoreNotification { ops: ops_4.clone() });
276+
assert_eq!(iter.next().unwrap().ops, ops_4);
277+
assert_eq!(iter.next(), None);
278+
}
279+
}

0 commit comments

Comments
 (0)