Skip to content

Commit

Permalink
Merge branch 'main' into dima/variable-font-weight
Browse files Browse the repository at this point in the history
  • Loading branch information
boxdot authored Feb 5, 2025
2 parents eb7aad9 + 19cbd97 commit 944699e
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 79 deletions.
117 changes: 95 additions & 22 deletions coreclient/src/clients/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
//
// SPDX-License-Identifier: AGPL-3.0-or-later

use anyhow::Context;
use anyhow::{bail, Context};
use openmls::storage::OpenMlsProvider;
use phnxtypes::{
identifiers::QualifiedUserName, messages::client_ds_out::SendMessageParamsOut, time::TimeStamp,
};
use rusqlite::{Connection, Transaction};
use uuid::Uuid;

use crate::{Conversation, ConversationId, ConversationMessage, MimiContent};
use crate::{Conversation, ConversationId, ConversationMessage, Message, MimiContent};

use super::{ApiClients, CoreUser, Group, PhnxOpenMlsProvider, StoreNotifier};

Expand All @@ -25,7 +26,7 @@ impl CoreUser {
) -> anyhow::Result<ConversationMessage> {
let unsent_group_message = self
.with_transaction(|transaction| {
InitialParams {
UnsentContent {
conversation_id,
content,
}
Expand All @@ -44,21 +45,43 @@ impl CoreUser {
})
.await
}

/// Re-try sending a message, where sending previously failed.
pub async fn re_send_message(&self, local_message_id: Uuid) -> anyhow::Result<()> {
let unsent_group_message = self
.with_transaction(|transaction| {
LocalMessage { local_message_id }
.load(transaction)?
.create_group_message(&PhnxOpenMlsProvider::new(transaction))
})
.await?;

let sent_message = unsent_group_message
.send_message_to_ds(&self.inner.api_clients)
.await?;

self.with_transaction(|transaction| {
sent_message.mark_as_sent_and_read(transaction, self.store_notifier())
})
.await?;

Ok(())
}
}

struct InitialParams {
struct UnsentContent {
conversation_id: ConversationId,
content: MimiContent,
}

impl InitialParams {
impl UnsentContent {
fn store_unsent_message(
self,
connection: &Connection,
mut notifier: StoreNotifier,
sender: &QualifiedUserName,
) -> anyhow::Result<UnsentMessage<WithContent>> {
let InitialParams {
) -> anyhow::Result<UnsentMessage<WithContent, GroupUpdateNeeded>> {
let UnsentContent {
conversation_id,
content,
} = self;
Expand All @@ -85,34 +108,80 @@ impl InitialParams {
conversation,
group,
conversation_message,
state: WithContent(content),
content: WithContent(content),
group_update: GroupUpdateNeeded,
})
}
}

// States of an unsent message
struct LocalMessage {
local_message_id: Uuid,
}

impl LocalMessage {
fn load(
self,
connection: &Connection,
) -> anyhow::Result<UnsentMessage<WithContent, GroupUpdated>> {
let Self { local_message_id } = self;

let conversation_message = ConversationMessage::load(connection, &local_message_id)?
.with_context(|| format!("Can't find unsent message with id {local_message_id}"))?;
let content = match conversation_message.message() {
Message::Content(content_message) if !content_message.was_sent() => {
content_message.content().clone()
}
Message::Content(_) => bail!("Message with id {local_message_id} was already sent"),
_ => bail!("Message with id {local_message_id} is not a content message"),
};
let conversation_id = conversation_message.conversation_id();
let conversation = Conversation::load(connection, &conversation_id)?
.with_context(|| format!("Can't find conversation with id {conversation_id}"))?;
let group_id = conversation.group_id();
let group = Group::load(connection, group_id)?
.with_context(|| format!("Can't find group with id {group_id:?}"))?;

let message = UnsentMessage {
conversation,
group,
conversation_message,
content: WithContent(content),
group_update: GroupUpdated,
};

Ok(message)
}
}

/// Message type state: Message with MIMI content
struct WithContent(MimiContent);
/// Message type state: Message with prepared send parameters
struct WithParams(SendMessageParamsOut);
struct StoredWithParams(SendMessageParamsOut);

struct UnsentMessage<State> {
/// Message type state: Group update needed before sending the message
struct GroupUpdateNeeded;
/// Message type state: Group already updated, message can be sent
struct GroupUpdated;

struct UnsentMessage<State, GroupUpdate> {
conversation: Conversation,
group: Group,
conversation_message: ConversationMessage,
state: State,
content: State,
group_update: GroupUpdate,
}

impl UnsentMessage<WithContent> {
impl<GroupUpdate> UnsentMessage<WithContent, GroupUpdate> {
fn create_group_message(
self,
provider: &impl OpenMlsProvider,
) -> anyhow::Result<UnsentMessage<WithParams>> {
) -> anyhow::Result<UnsentMessage<WithParams, GroupUpdate>> {
let Self {
conversation,
mut group,
conversation_message,
state: WithContent(content),
content: WithContent(content),
group_update,
} = self;

let params = group.create_message(provider, content)?;
Expand All @@ -121,22 +190,24 @@ impl UnsentMessage<WithContent> {
conversation,
conversation_message,
group,
state: WithParams(params),
content: WithParams(params),
group_update,
})
}
}

impl UnsentMessage<WithParams> {
impl UnsentMessage<WithParams, GroupUpdateNeeded> {
fn store_group_update(
self,
transaction: &Transaction,
mut notifier: StoreNotifier,
) -> anyhow::Result<UnsentMessage<StoredWithParams>> {
) -> anyhow::Result<UnsentMessage<WithParams, GroupUpdated>> {
let Self {
conversation,
group,
conversation_message,
state: WithParams(params),
content: WithParams(params),
group_update: GroupUpdateNeeded,
} = self;

// Immediately write the group back. No need to wait for the DS to
Expand All @@ -155,18 +226,20 @@ impl UnsentMessage<WithParams> {
conversation,
group,
conversation_message,
state: StoredWithParams(params),
content: WithParams(params),
group_update: GroupUpdated,
})
}
}

impl UnsentMessage<StoredWithParams> {
impl UnsentMessage<WithParams, GroupUpdated> {
async fn send_message_to_ds(self, api_clients: &ApiClients) -> anyhow::Result<SentMessage> {
let Self {
conversation,
conversation_message,
group,
state: StoredWithParams(params),
content: WithParams(params),
group_update: GroupUpdated,
} = self;

let ds_timestamp = api_clients
Expand Down
58 changes: 1 addition & 57 deletions coreclient/src/clients/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ use store::ClientRecord;
use thiserror::Error;
use tokio_stream::Stream;
use tracing::{error, info};
use uuid::Uuid;

use crate::store::StoreNotificationsSender;
use crate::utils::persistence::{SqliteConnection, Storable};
use crate::{
clients::connection_establishment::{ConnectionEstablishmentPackageTbs, FriendshipPackage},
contacts::{Contact, ContactAddInfos, PartialContact},
Expand All @@ -75,10 +75,6 @@ use crate::{
Asset,
};
use crate::{key_stores::as_credentials::AsCredentials, ConversationId};
use crate::{
utils::persistence::{SqliteConnection, Storable},
Message,
};

use self::{api_clients::ApiClients, create_user::InitialUserState, store::UserCreationState};

Expand Down Expand Up @@ -522,58 +518,6 @@ impl CoreUser {
Ok(conversation_messages)
}

/// Re-try sending a message, where sending previously failed.
pub async fn re_send_message(&self, local_message_id: Uuid) -> Result<()> {
// Phase 1: Load the unsent message
let connection = self.inner.connection.lock().await;
let mut unsent_message = ConversationMessage::load(&connection, &local_message_id)?.ok_or(
anyhow!("Can't find unsent message with id {}", local_message_id),
)?;
let content = match unsent_message.message() {
Message::Content(content_message) if !content_message.was_sent() => {
content_message.content().clone()
}
_ => bail!("Message with id {} was already sent", local_message_id),
};
let conversation_id = unsent_message.conversation_id();
let conversation = Conversation::load(&connection, &conversation_id)?.ok_or(anyhow!(
"Can't find conversation with id {}",
conversation_id.as_uuid()
))?;
let group_id = conversation.group_id();
let mut group = Group::load(&connection, group_id)?
.ok_or(anyhow!("Can't find group with id {:?}", group_id))?;
let params = {
let provider = PhnxOpenMlsProvider::new(&connection);
group.create_message(&provider, content)?
};
drop(connection);

// Phase 2: Send message to DS
let ds_timestamp = self
.inner
.api_clients
.get(&conversation.owner_domain())?
.ds_send_message(params, group.leaf_signer(), group.group_state_ear_key())
.await?;

// Phase 3: Merge the commit into the group & update conversation
let mut connection = self.inner.connection.lock().await;
let mut notifier = self.store_notifier();
unsent_message.mark_as_sent(&connection, &mut notifier, ds_timestamp)?;
group.store_update(&connection)?;
let transaction = connection.transaction()?;
Conversation::mark_as_read(
&transaction,
&mut notifier,
vec![(conversation.id(), unsent_message.timestamp())].into_iter(),
)?;
transaction.commit()?;
notifier.notify();

Ok(())
}

/// Create a connection with a new user.
///
/// Returns the [`ConversationId`] of the newly created connection
Expand Down

0 comments on commit 944699e

Please sign in to comment.