Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: communicate store changes from background processing #342

Merged
merged 5 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions apiclient/src/qs_api/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use core::time;
use std::time::Duration;
use std::{pin::pin, time::Duration};

use base64::{engine::general_purpose, Engine as _};
use futures_util::{pin_mut, SinkExt, StreamExt};
use futures_util::{SinkExt, StreamExt};
use http::{HeaderValue, Request};
use phnxtypes::{
codec::PhnxCodec,
Expand Down Expand Up @@ -116,7 +116,7 @@ impl QsWebSocket {
let mut interval = tokio::time::interval(Duration::from_secs(1));

// Pin the stream
pin_mut!(ws_stream);
let mut ws_stream = pin!(ws_stream);

// Initialize the connection status
let mut connection_status = ConnectionStatus::new();
Expand Down
12 changes: 10 additions & 2 deletions applogic/src/api/user_cubit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::time::Duration;
use anyhow::bail;
use flutter_rust_bridge::frb;
use phnxapiclient::qs_api::ws::WsEvent;
use phnxcoreclient::{clients::CoreUser, ConversationId};
use phnxcoreclient::{clients::CoreUser, store::Store, ConversationId};
use phnxcoreclient::{Asset, UserProfile};
use phnxtypes::identifiers::QualifiedUserName;
use phnxtypes::messages::client_ds::QsWsMessage;
Expand Down Expand Up @@ -318,7 +318,15 @@ fn spawn_polling(core_user: CoreUser, cancel: CancellationToken) {

async fn handle_websocket_message(event: WsEvent, core_user: &CoreUser) {
match event {
WsEvent::ConnectedEvent => info!("connected to websocket"),
WsEvent::ConnectedEvent => {
info!("connected to websocket");
// After (re)connecting, dequeue any pending store notifications that might have been
// enqueued by the push notifications background processing task.
match core_user.dequeue_notification().await {
Ok(notification) => core_user.notify(notification),
Err(error) => error!(%error, "Failed to dequeue store notification"),
}
}
WsEvent::DisconnectedEvent => info!("disconnected from websocket"),
WsEvent::MessageEvent(QsWsMessage::Event(event)) => {
warn!("ignoring websocket event: {event:?}")
Expand Down
10 changes: 10 additions & 0 deletions applogic/src/background_execution/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//
// SPDX-License-Identifier: AGPL-3.0-or-later

use phnxcoreclient::store::Store;
use std::panic::{self, AssertUnwindSafe};
use tokio::runtime::Builder;
use tracing::{error, info};
Expand Down Expand Up @@ -76,6 +77,9 @@ pub(crate) async fn retrieve_messages(path: String) -> NotificationBatch {
}
};

// capture store notification in below store calls
let pending_store_notifications = user.user.subscribe_iter();

let notifications = match user.fetch_all_messages().await {
Ok(fetched_messages) => {
info!("All messages fetched");
Expand All @@ -100,6 +104,12 @@ pub(crate) async fn retrieve_messages(path: String) -> NotificationBatch {

let badge_count = user.global_unread_messages_count().await;

for store_notification in pending_store_notifications {
if let Err(error) = Store::enqueue_notification(&user.user, &store_notification).await {
error!(%error, "Failed to enqueue store notification");
}
}

NotificationBatch {
badge_count,
removals: Vec::new(),
Expand Down
9 changes: 9 additions & 0 deletions coreclient/migrations/V3__store_notifications_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// SPDX-FileCopyrightText: 2025 Phoenix R&D GmbH <[email protected]>
//
// SPDX-License-Identifier: AGPL-3.0-or-later

use crate::store::StoreNotification;

pub fn migration() -> String {
StoreNotification::CREATE_TABLE_STATEMENT.to_string()
}
30 changes: 29 additions & 1 deletion coreclient/src/clients/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,21 +274,49 @@ impl CoreUser {
}

pub(crate) fn send_store_notification(&self, notification: StoreNotification) {
if !notification.ops.is_empty() {
if !notification.is_empty() {
self.inner.store_notifications_tx.notify(notification);
}
}

/// Subscribes to store notifications.
///
/// All notifications sent after this function was called are observed as items of the returned
/// stream.
pub(crate) fn subscribe_to_store_notifications(
&self,
) -> impl Stream<Item = Arc<StoreNotification>> + Send + 'static {
self.inner.store_notifications_tx.subscribe()
}

/// Subcribes to pending store notifications.
///
/// Unlike `subscribe_to_store_notifications`, this function does not remove stored
/// notifications from the persisted queue.
pub(crate) fn subscribe_iter_to_store_notifications(
&self,
) -> impl Iterator<Item = Arc<StoreNotification>> + Send + 'static {
self.inner.store_notifications_tx.subscribe_iter()
}

pub(crate) fn store_notifier(&self) -> StoreNotifier {
StoreNotifier::new(self.inner.store_notifications_tx.clone())
}

pub(crate) async fn enqueue_store_notification(
&self,
notification: &StoreNotification,
) -> Result<()> {
notification.enqueue(&mut *self.inner.connection.lock().await)?;
Ok(())
}

pub(crate) async fn dequeue_store_notification(&self) -> Result<StoreNotification> {
Ok(StoreNotification::dequeue(
&mut *self.inner.connection.lock().await,
)?)
}

pub async fn set_own_user_profile(&self, mut user_profile: UserProfile) -> Result<()> {
if user_profile.user_name() != &self.user_name() {
bail!("Can't set user profile for users other than the current user.",);
Expand Down
12 changes: 12 additions & 0 deletions coreclient/src/store/impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,16 @@ impl Store for CoreUser {
fn subscribe(&self) -> impl Stream<Item = Arc<StoreNotification>> + Send + 'static {
self.subscribe_to_store_notifications()
}

fn subscribe_iter(&self) -> impl Iterator<Item = Arc<StoreNotification>> + Send + 'static {
self.subscribe_iter_to_store_notifications()
}

async fn enqueue_notification(&self, notification: &StoreNotification) -> StoreResult<()> {
self.enqueue_store_notification(notification).await
}

async fn dequeue_notification(&self) -> StoreResult<StoreNotification> {
self.dequeue_store_notification().await
}
}
7 changes: 7 additions & 0 deletions coreclient/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub(crate) use notification::{StoreNotificationsSender, StoreNotifier};

mod r#impl;
mod notification;
mod persistence;

/// The result type of a failable [`Store`] method
pub type StoreResult<T> = anyhow::Result<T>;
Expand Down Expand Up @@ -133,4 +134,10 @@ pub trait LocalStore {
fn notify(&self, notification: StoreNotification);

fn subscribe(&self) -> impl Stream<Item = Arc<StoreNotification>> + Send + 'static;

fn subscribe_iter(&self) -> impl Iterator<Item = Arc<StoreNotification>> + Send + 'static;

async fn enqueue_notification(&self, notification: &StoreNotification) -> StoreResult<()>;

async fn dequeue_notification(&self) -> StoreResult<StoreNotification>;
}
111 changes: 110 additions & 1 deletion coreclient/src/store/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,13 @@ impl StoreNotificationsSender {

/// Sends a notification to all current subscribers.
pub(crate) fn notify(&self, notification: impl Into<Arc<StoreNotification>>) {
let _no_receivers = self.tx.send(notification.into());
let notification = notification.into();
debug!(
num_receivers = self.tx.receiver_count(),
?notification,
"StoreNotificationsSender::notify"
);
let _no_receivers = self.tx.send(notification);
}

/// Creates a new subscription to the notifications.
Expand All @@ -126,6 +132,31 @@ impl StoreNotificationsSender {
}
})
}

/// Returns all pending notifications.
///
/// The pending notifications are the notifications captured starting at the call to this function.
/// Getting the next item from the iterator gets the next pending notification is there is any,
/// otherwise it returns `None`. Therefore, the iterator is not fused.
///
/// This is useful for capturing all pending notifications synchronously.
pub(crate) fn subscribe_iter(
&self,
) -> impl Iterator<Item = Arc<StoreNotification>> + Send + 'static {
let mut rx = self.tx.subscribe();
std::iter::from_fn(move || loop {
match rx.try_recv() {
Ok(notification) => return Some(notification),
Err(broadcast::error::TryRecvError::Lagged(n)) => {
error!(n, "store notifications lagged");
continue;
}
Err(
broadcast::error::TryRecvError::Closed | broadcast::error::TryRecvError::Empty,
) => return None,
}
})
}
}

impl Default for StoreNotificationsSender {
Expand All @@ -139,6 +170,7 @@ impl Default for StoreNotificationsSender {
/// Bundles all changes to the store, that is, all entities that have been added, updated or
/// removed.
#[derive(Debug, Default)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct StoreNotification {
pub ops: BTreeMap<StoreEntityId, StoreOperation>,
}
Expand All @@ -147,6 +179,10 @@ impl StoreNotification {
fn empty() -> Self {
Self::default()
}

pub fn is_empty(&self) -> bool {
self.ops.is_empty()
}
}

/// Operation which was performed in a [`super::Store`]
Expand All @@ -168,3 +204,76 @@ pub enum StoreEntityId {
Conversation(ConversationId),
Message(ConversationMessageId),
}

impl StoreEntityId {
pub(crate) fn kind(&self) -> StoreEntityKind {
match self {
StoreEntityId::User(_) => StoreEntityKind::User,
StoreEntityId::Conversation(_) => StoreEntityKind::Conversation,
StoreEntityId::Message(_) => StoreEntityKind::Message,
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) enum StoreEntityKind {
User = 0,
Conversation = 1,
Message = 2,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn subscribe_iter() {
let tx = StoreNotificationsSender::new();

let ops_1: BTreeMap<_, _> = [(
StoreEntityId::User("alice@localhost".parse().unwrap()),
StoreOperation::Add,
)]
.into_iter()
.collect();

let ops_2: BTreeMap<_, _> = [(
StoreEntityId::User("bob@localhost".parse().unwrap()),
StoreOperation::Update,
)]
.into_iter()
.collect();

let ops_3: BTreeMap<_, _> = [(
StoreEntityId::User("eve@localhost".parse().unwrap()),
StoreOperation::Remove,
)]
.into_iter()
.collect();

let ops_4: BTreeMap<_, _> = [(
StoreEntityId::User("mallory@localhost".parse().unwrap()),
StoreOperation::Add,
)]
.into_iter()
.collect();

tx.notify(StoreNotification {
ops: ops_1.into_iter().collect(),
});

let mut iter = tx.subscribe_iter();

tx.notify(StoreNotification { ops: ops_2.clone() });

// first notification is not observed, because it was sent before the subscription
assert_eq!(iter.next().unwrap().ops, ops_2);
assert_eq!(iter.next(), None);

tx.notify(StoreNotification { ops: ops_3.clone() });
assert_eq!(iter.next().unwrap().ops, ops_3);
tx.notify(StoreNotification { ops: ops_4.clone() });
assert_eq!(iter.next().unwrap().ops, ops_4);
assert_eq!(iter.next(), None);
}
}
Loading