Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into dima/conversation-list
Browse files Browse the repository at this point in the history
  • Loading branch information
boxdot committed Jan 27, 2025
2 parents 159cb45 + 837a722 commit 1bb2b11
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 85 deletions.
212 changes: 212 additions & 0 deletions coreclient/src/clients/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// SPDX-FileCopyrightText: 2025 Phoenix R&D GmbH <[email protected]>
//
// SPDX-License-Identifier: AGPL-3.0-or-later

use anyhow::Context;
use openmls::storage::OpenMlsProvider;
use phnxtypes::{
identifiers::QualifiedUserName, messages::client_ds_out::SendMessageParamsOut, time::TimeStamp,
};
use rusqlite::{Connection, Transaction};

use crate::{Conversation, ConversationId, ConversationMessage, MimiContent};

use super::{ApiClients, CoreUser, Group, PhnxOpenMlsProvider, StoreNotifier};

impl CoreUser {
/// Send a message and return it.
///
/// The message unsent messages is stored, then sent to the DS and finally returned. The
/// converstion is marked as read until this message.
pub(crate) async fn send_message(
&self,
conversation_id: ConversationId,
content: MimiContent,
) -> anyhow::Result<ConversationMessage> {
let unsent_group_message = self
.with_transaction(|transaction| {
InitialParams {
conversation_id,
content,
}
.store_unsent_message(transaction, self.store_notifier(), &self.user_name())?
.create_group_message(&PhnxOpenMlsProvider::new(transaction))?
.store_group_update(transaction, self.store_notifier())
})
.await?;

let sent_message = unsent_group_message
.send_message_to_ds(&self.inner.api_clients)
.await?;

self.with_transaction(|transaction| {
sent_message.mark_as_sent_and_read(transaction, self.store_notifier())
})
.await
}
}

struct InitialParams {
conversation_id: ConversationId,
content: MimiContent,
}

impl InitialParams {
fn store_unsent_message(
self,
connection: &Connection,
mut notifier: StoreNotifier,
sender: &QualifiedUserName,
) -> anyhow::Result<UnsentMessage<WithContent>> {
let InitialParams {
conversation_id,
content,
} = self;

let conversation = Conversation::load(connection, &conversation_id)?
.with_context(|| format!("Can't find conversation with id {conversation_id}"))?;
// Store the message as unsent so that we don't lose it in case
// something goes wrong.
let conversation_message = ConversationMessage::new_unsent_message(
sender.to_string(),
conversation_id,
content.clone(),
);
conversation_message.store(connection, &mut notifier)?;

let group_id = conversation.group_id();
let group = Group::load(connection, group_id)?
.with_context(|| format!("Can't find group with id {group_id:?}"))?;

// Notify as early as possible to react to the not yet sent message
notifier.notify();

Ok(UnsentMessage {
conversation,
group,
conversation_message,
state: WithContent(content),
})
}
}

// States of an unsent message

struct WithContent(MimiContent);
struct WithParams(SendMessageParamsOut);
struct StoredWithParams(SendMessageParamsOut);

struct UnsentMessage<State> {
conversation: Conversation,
group: Group,
conversation_message: ConversationMessage,
state: State,
}

impl UnsentMessage<WithContent> {
fn create_group_message(
self,
provider: &impl OpenMlsProvider,
) -> anyhow::Result<UnsentMessage<WithParams>> {
let Self {
conversation,
mut group,
conversation_message,
state: WithContent(content),
} = self;

let params = group.create_message(provider, content)?;

Ok(UnsentMessage {
conversation,
conversation_message,
group,
state: WithParams(params),
})
}
}

impl UnsentMessage<WithParams> {
fn store_group_update(
self,
transaction: &Transaction,
mut notifier: StoreNotifier,
) -> anyhow::Result<UnsentMessage<StoredWithParams>> {
let Self {
conversation,
group,
conversation_message,
state: WithParams(params),
} = self;

// Immediately write the group back. No need to wait for the DS to
// confirm as this is just an application message.
group.store_update(transaction)?;
// Also, mark the message (and all messages preceeding it) as read.
Conversation::mark_as_read_until_message_id(
transaction,
&mut notifier,
conversation.id(),
conversation_message.id(),
)?;
notifier.notify();

Ok(UnsentMessage {
conversation,
group,
conversation_message,
state: StoredWithParams(params),
})
}
}

impl UnsentMessage<StoredWithParams> {
async fn send_message_to_ds(self, api_clients: &ApiClients) -> anyhow::Result<SentMessage> {
let Self {
conversation,
conversation_message,
group,
state: StoredWithParams(params),
} = self;

let ds_timestamp = api_clients
.get(&conversation.owner_domain())?
.ds_send_message(params, group.leaf_signer(), group.group_state_ear_key())
.await?;

Ok(SentMessage {
conversation_message,
ds_timestamp,
})
}
}

struct SentMessage {
conversation_message: ConversationMessage,
ds_timestamp: TimeStamp,
}

impl SentMessage {
fn mark_as_sent_and_read(
self,
transaction: &Transaction,
mut notifier: StoreNotifier,
) -> anyhow::Result<ConversationMessage> {
let Self {
mut conversation_message,
ds_timestamp,
} = self;

conversation_message.mark_as_sent(transaction, &mut notifier, ds_timestamp)?;
Conversation::mark_as_read_until_message_id(
transaction,
&mut notifier,
conversation_message.conversation_id(),
conversation_message.id(),
)?;

notifier.notify();

Ok(conversation_message)
}
}
103 changes: 24 additions & 79 deletions coreclient/src/clients/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use tokio_stream::Stream;
use tracing::{error, info};
use uuid::Uuid;

use crate::store::StoreNotificationsSender;
use crate::{
clients::connection_establishment::{ConnectionEstablishmentPackageTbs, FriendshipPackage},
contacts::{Contact, ContactAddInfos, PartialContact},
Expand All @@ -75,7 +76,6 @@ use crate::{
Asset,
};
use crate::{key_stores::as_credentials::AsCredentials, ConversationId};
use crate::{mimi_content::MimiContent, store::StoreNotificationsSender};
use crate::{
utils::persistence::{SqliteConnection, Storable},
Message,
Expand All @@ -87,6 +87,7 @@ pub(crate) mod api_clients;
pub(crate) mod connection_establishment;
pub mod conversations;
mod create_user;
mod message;
pub(crate) mod own_client_info;
mod persistence;
pub mod process;
Expand Down Expand Up @@ -260,6 +261,20 @@ impl CoreUser {
Ok(Some(self_user))
}

/// Executes a fallible closure `f` with a transaction.
///
/// Transaction is committed on success and rolled back on failure of the closure `f`.
pub(crate) async fn with_transaction<T>(
&self,
f: impl FnOnce(&Transaction) -> anyhow::Result<T>,
) -> anyhow::Result<T> {
let mut connection = self.inner.connection.lock().await;
let transaction = connection.transaction()?;
let value = f(&transaction)?;
transaction.commit()?;
Ok(value)
}

pub(crate) fn send_store_notification(&self, notification: StoreNotification) {
if !notification.ops.is_empty() {
self.inner.store_notifications_tx.notify(notification);
Expand Down Expand Up @@ -504,79 +519,6 @@ impl CoreUser {
Ok(conversation_messages)
}

/// Send a message and return it. Note that the message has already been
/// sent to the DS and has internally been stored in the conversation store.
pub async fn send_message(
&self,
conversation_id: ConversationId,
content: MimiContent,
) -> Result<ConversationMessage> {
// Phase 1: Load the conversation and group
let (group, params, conversation, mut conversation_message) = {
let mut notifier = self.store_notifier();
let mut connection = self.inner.connection.lock().await;
let mut transaction = connection.transaction()?;
let conversation =
Conversation::load(&transaction, &conversation_id)?.ok_or(anyhow!(
"Can't find conversation with id {}",
conversation_id.as_uuid()
))?;
let group_id = conversation.group_id();
// Store the message as unsent so that we don't lose it in case
// something goes wrong.
let conversation_message = ConversationMessage::new_unsent_message(
self.user_name().to_string(),
conversation_id,
content.clone(),
);
conversation_message.store(&transaction, &mut notifier)?;

// Notify as early as possible to react to the not yet sent message
notifier.notify();

let mut group = Group::load(&transaction, group_id)?
.ok_or(anyhow!("Can't find group with id {group_id:?}"))?;
let params = group.create_message(&transaction, content)?;
// Immediately write the group back. No need to wait for the DS to
// confirm as this is just an application message.
group.store_update(&transaction)?;
// Also, mark the message (and all messages preceeding it) as read.
let mut notifier = self.store_notifier();
Conversation::mark_as_read(
&mut transaction,
&mut notifier,
vec![(conversation.id(), conversation_message.timestamp())].into_iter(),
)?;
transaction.commit()?;
notifier.notify();
drop(connection);
(group, params, conversation, conversation_message)
};

// Phase 2: Send message to DS
let ds_timestamp = self
.inner
.api_clients
.get(&conversation.owner_domain())?
.ds_send_message(params, group.leaf_signer(), group.group_state_ear_key())
.await?;

// Phase 3: Mark the message as sent and read (again).
let mut connection = self.inner.connection.lock().await;
let mut notifier = self.store_notifier();
conversation_message.mark_as_sent(&connection, &mut notifier, ds_timestamp)?;
let mut transaction = connection.transaction()?;
Conversation::mark_as_read(
&mut transaction,
&mut notifier,
vec![(conversation.id(), conversation_message.timestamp())].into_iter(),
)?;
transaction.commit()?;
notifier.notify();

Ok(conversation_message)
}

/// Re-try sending a message, where sending previously failed.
pub async fn re_send_message(&self, local_message_id: Uuid) -> Result<()> {
// Phase 1: Load the unsent message
Expand All @@ -598,7 +540,10 @@ impl CoreUser {
let group_id = conversation.group_id();
let mut group = Group::load(&connection, group_id)?
.ok_or(anyhow!("Can't find group with id {:?}", group_id))?;
let params = group.create_message(&connection, content)?;
let params = {
let provider = PhnxOpenMlsProvider::new(&connection);
group.create_message(&provider, content)?
};
drop(connection);

// Phase 2: Send message to DS
Expand All @@ -614,9 +559,9 @@ impl CoreUser {
let mut notifier = self.store_notifier();
unsent_message.mark_as_sent(&connection, &mut notifier, ds_timestamp)?;
group.store_update(&connection)?;
let mut transaction = connection.transaction()?;
let transaction = connection.transaction()?;
Conversation::mark_as_read(
&mut transaction,
&transaction,
&mut notifier,
vec![(conversation.id(), unsent_message.timestamp())].into_iter(),
)?;
Expand Down Expand Up @@ -1161,9 +1106,9 @@ impl CoreUser {
mark_as_read_data: T,
) -> Result<(), rusqlite::Error> {
let mut connection = self.inner.connection.lock().await;
let mut transaction = connection.transaction()?;
let transaction = connection.transaction()?;
let mut notifier = self.store_notifier();
Conversation::mark_as_read(&mut transaction, &mut notifier, mark_as_read_data)?;
Conversation::mark_as_read(&transaction, &mut notifier, mark_as_read_data)?;
transaction.commit()?;
notifier.notify();
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion coreclient/src/conversations/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl Conversation {
/// [`ConversationId`]s to the given timestamps. This is used to mark all
/// messages up to this timestamp as read.
pub(crate) fn mark_as_read(
transaction: &mut Transaction,
transaction: &Transaction,
notifier: &mut StoreNotifier,
mark_as_read_data: impl IntoIterator<Item = (ConversationId, DateTime<Utc>)>,
) -> Result<(), rusqlite::Error> {
Expand Down
3 changes: 1 addition & 2 deletions coreclient/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -848,10 +848,9 @@ impl Group {
/// Send an application message to the group.
pub(super) fn create_message(
&mut self,
connection: &Connection,
provider: &impl OpenMlsProvider,
content: MimiContent,
) -> Result<SendMessageParamsOut, GroupOperationError> {
let provider = &PhnxOpenMlsProvider::new(connection);
let mls_message = self.mls_group.create_message(
provider,
&self.leaf_signer,
Expand Down
Loading

0 comments on commit 1bb2b11

Please sign in to comment.