Skip to content

Commit 837a722

Browse files
authored
refactor: send message flow as a state machine (#300)
1 parent c24246c commit 837a722

File tree

6 files changed

+243
-85
lines changed

6 files changed

+243
-85
lines changed

coreclient/src/clients/message.rs

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
// SPDX-FileCopyrightText: 2025 Phoenix R&D GmbH <[email protected]>
2+
//
3+
// SPDX-License-Identifier: AGPL-3.0-or-later
4+
5+
use anyhow::Context;
6+
use openmls::storage::OpenMlsProvider;
7+
use phnxtypes::{
8+
identifiers::QualifiedUserName, messages::client_ds_out::SendMessageParamsOut, time::TimeStamp,
9+
};
10+
use rusqlite::{Connection, Transaction};
11+
12+
use crate::{Conversation, ConversationId, ConversationMessage, MimiContent};
13+
14+
use super::{ApiClients, CoreUser, Group, PhnxOpenMlsProvider, StoreNotifier};
15+
16+
impl CoreUser {
17+
/// Send a message and return it.
18+
///
19+
/// The message unsent messages is stored, then sent to the DS and finally returned. The
20+
/// converstion is marked as read until this message.
21+
pub(crate) async fn send_message(
22+
&self,
23+
conversation_id: ConversationId,
24+
content: MimiContent,
25+
) -> anyhow::Result<ConversationMessage> {
26+
let unsent_group_message = self
27+
.with_transaction(|transaction| {
28+
InitialParams {
29+
conversation_id,
30+
content,
31+
}
32+
.store_unsent_message(transaction, self.store_notifier(), &self.user_name())?
33+
.create_group_message(&PhnxOpenMlsProvider::new(transaction))?
34+
.store_group_update(transaction, self.store_notifier())
35+
})
36+
.await?;
37+
38+
let sent_message = unsent_group_message
39+
.send_message_to_ds(&self.inner.api_clients)
40+
.await?;
41+
42+
self.with_transaction(|transaction| {
43+
sent_message.mark_as_sent_and_read(transaction, self.store_notifier())
44+
})
45+
.await
46+
}
47+
}
48+
49+
struct InitialParams {
50+
conversation_id: ConversationId,
51+
content: MimiContent,
52+
}
53+
54+
impl InitialParams {
55+
fn store_unsent_message(
56+
self,
57+
connection: &Connection,
58+
mut notifier: StoreNotifier,
59+
sender: &QualifiedUserName,
60+
) -> anyhow::Result<UnsentMessage<WithContent>> {
61+
let InitialParams {
62+
conversation_id,
63+
content,
64+
} = self;
65+
66+
let conversation = Conversation::load(connection, &conversation_id)?
67+
.with_context(|| format!("Can't find conversation with id {conversation_id}"))?;
68+
// Store the message as unsent so that we don't lose it in case
69+
// something goes wrong.
70+
let conversation_message = ConversationMessage::new_unsent_message(
71+
sender.to_string(),
72+
conversation_id,
73+
content.clone(),
74+
);
75+
conversation_message.store(connection, &mut notifier)?;
76+
77+
let group_id = conversation.group_id();
78+
let group = Group::load(connection, group_id)?
79+
.with_context(|| format!("Can't find group with id {group_id:?}"))?;
80+
81+
// Notify as early as possible to react to the not yet sent message
82+
notifier.notify();
83+
84+
Ok(UnsentMessage {
85+
conversation,
86+
group,
87+
conversation_message,
88+
state: WithContent(content),
89+
})
90+
}
91+
}
92+
93+
// States of an unsent message
94+
95+
struct WithContent(MimiContent);
96+
struct WithParams(SendMessageParamsOut);
97+
struct StoredWithParams(SendMessageParamsOut);
98+
99+
struct UnsentMessage<State> {
100+
conversation: Conversation,
101+
group: Group,
102+
conversation_message: ConversationMessage,
103+
state: State,
104+
}
105+
106+
impl UnsentMessage<WithContent> {
107+
fn create_group_message(
108+
self,
109+
provider: &impl OpenMlsProvider,
110+
) -> anyhow::Result<UnsentMessage<WithParams>> {
111+
let Self {
112+
conversation,
113+
mut group,
114+
conversation_message,
115+
state: WithContent(content),
116+
} = self;
117+
118+
let params = group.create_message(provider, content)?;
119+
120+
Ok(UnsentMessage {
121+
conversation,
122+
conversation_message,
123+
group,
124+
state: WithParams(params),
125+
})
126+
}
127+
}
128+
129+
impl UnsentMessage<WithParams> {
130+
fn store_group_update(
131+
self,
132+
transaction: &Transaction,
133+
mut notifier: StoreNotifier,
134+
) -> anyhow::Result<UnsentMessage<StoredWithParams>> {
135+
let Self {
136+
conversation,
137+
group,
138+
conversation_message,
139+
state: WithParams(params),
140+
} = self;
141+
142+
// Immediately write the group back. No need to wait for the DS to
143+
// confirm as this is just an application message.
144+
group.store_update(transaction)?;
145+
// Also, mark the message (and all messages preceeding it) as read.
146+
Conversation::mark_as_read_until_message_id(
147+
transaction,
148+
&mut notifier,
149+
conversation.id(),
150+
conversation_message.id(),
151+
)?;
152+
notifier.notify();
153+
154+
Ok(UnsentMessage {
155+
conversation,
156+
group,
157+
conversation_message,
158+
state: StoredWithParams(params),
159+
})
160+
}
161+
}
162+
163+
impl UnsentMessage<StoredWithParams> {
164+
async fn send_message_to_ds(self, api_clients: &ApiClients) -> anyhow::Result<SentMessage> {
165+
let Self {
166+
conversation,
167+
conversation_message,
168+
group,
169+
state: StoredWithParams(params),
170+
} = self;
171+
172+
let ds_timestamp = api_clients
173+
.get(&conversation.owner_domain())?
174+
.ds_send_message(params, group.leaf_signer(), group.group_state_ear_key())
175+
.await?;
176+
177+
Ok(SentMessage {
178+
conversation_message,
179+
ds_timestamp,
180+
})
181+
}
182+
}
183+
184+
struct SentMessage {
185+
conversation_message: ConversationMessage,
186+
ds_timestamp: TimeStamp,
187+
}
188+
189+
impl SentMessage {
190+
fn mark_as_sent_and_read(
191+
self,
192+
transaction: &Transaction,
193+
mut notifier: StoreNotifier,
194+
) -> anyhow::Result<ConversationMessage> {
195+
let Self {
196+
mut conversation_message,
197+
ds_timestamp,
198+
} = self;
199+
200+
conversation_message.mark_as_sent(transaction, &mut notifier, ds_timestamp)?;
201+
Conversation::mark_as_read_until_message_id(
202+
transaction,
203+
&mut notifier,
204+
conversation_message.conversation_id(),
205+
conversation_message.id(),
206+
)?;
207+
208+
notifier.notify();
209+
210+
Ok(conversation_message)
211+
}
212+
}

coreclient/src/clients/mod.rs

Lines changed: 24 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use tokio_stream::Stream;
5353
use tracing::{error, info};
5454
use uuid::Uuid;
5555

56+
use crate::store::StoreNotificationsSender;
5657
use crate::{
5758
clients::connection_establishment::{ConnectionEstablishmentPackageTbs, FriendshipPackage},
5859
contacts::{Contact, ContactAddInfos, PartialContact},
@@ -75,7 +76,6 @@ use crate::{
7576
Asset,
7677
};
7778
use crate::{key_stores::as_credentials::AsCredentials, ConversationId};
78-
use crate::{mimi_content::MimiContent, store::StoreNotificationsSender};
7979
use crate::{
8080
utils::persistence::{SqliteConnection, Storable},
8181
Message,
@@ -87,6 +87,7 @@ pub(crate) mod api_clients;
8787
pub(crate) mod connection_establishment;
8888
pub mod conversations;
8989
mod create_user;
90+
mod message;
9091
pub(crate) mod own_client_info;
9192
mod persistence;
9293
pub mod process;
@@ -260,6 +261,20 @@ impl CoreUser {
260261
Ok(Some(self_user))
261262
}
262263

264+
/// Executes a fallible closure `f` with a transaction.
265+
///
266+
/// Transaction is committed on success and rolled back on failure of the closure `f`.
267+
pub(crate) async fn with_transaction<T>(
268+
&self,
269+
f: impl FnOnce(&Transaction) -> anyhow::Result<T>,
270+
) -> anyhow::Result<T> {
271+
let mut connection = self.inner.connection.lock().await;
272+
let transaction = connection.transaction()?;
273+
let value = f(&transaction)?;
274+
transaction.commit()?;
275+
Ok(value)
276+
}
277+
263278
pub(crate) fn send_store_notification(&self, notification: StoreNotification) {
264279
if !notification.ops.is_empty() {
265280
self.inner.store_notifications_tx.notify(notification);
@@ -504,79 +519,6 @@ impl CoreUser {
504519
Ok(conversation_messages)
505520
}
506521

507-
/// Send a message and return it. Note that the message has already been
508-
/// sent to the DS and has internally been stored in the conversation store.
509-
pub async fn send_message(
510-
&self,
511-
conversation_id: ConversationId,
512-
content: MimiContent,
513-
) -> Result<ConversationMessage> {
514-
// Phase 1: Load the conversation and group
515-
let (group, params, conversation, mut conversation_message) = {
516-
let mut notifier = self.store_notifier();
517-
let mut connection = self.inner.connection.lock().await;
518-
let mut transaction = connection.transaction()?;
519-
let conversation =
520-
Conversation::load(&transaction, &conversation_id)?.ok_or(anyhow!(
521-
"Can't find conversation with id {}",
522-
conversation_id.as_uuid()
523-
))?;
524-
let group_id = conversation.group_id();
525-
// Store the message as unsent so that we don't lose it in case
526-
// something goes wrong.
527-
let conversation_message = ConversationMessage::new_unsent_message(
528-
self.user_name().to_string(),
529-
conversation_id,
530-
content.clone(),
531-
);
532-
conversation_message.store(&transaction, &mut notifier)?;
533-
534-
// Notify as early as possible to react to the not yet sent message
535-
notifier.notify();
536-
537-
let mut group = Group::load(&transaction, group_id)?
538-
.ok_or(anyhow!("Can't find group with id {group_id:?}"))?;
539-
let params = group.create_message(&transaction, content)?;
540-
// Immediately write the group back. No need to wait for the DS to
541-
// confirm as this is just an application message.
542-
group.store_update(&transaction)?;
543-
// Also, mark the message (and all messages preceeding it) as read.
544-
let mut notifier = self.store_notifier();
545-
Conversation::mark_as_read(
546-
&mut transaction,
547-
&mut notifier,
548-
vec![(conversation.id(), conversation_message.timestamp())].into_iter(),
549-
)?;
550-
transaction.commit()?;
551-
notifier.notify();
552-
drop(connection);
553-
(group, params, conversation, conversation_message)
554-
};
555-
556-
// Phase 2: Send message to DS
557-
let ds_timestamp = self
558-
.inner
559-
.api_clients
560-
.get(&conversation.owner_domain())?
561-
.ds_send_message(params, group.leaf_signer(), group.group_state_ear_key())
562-
.await?;
563-
564-
// Phase 3: Mark the message as sent and read (again).
565-
let mut connection = self.inner.connection.lock().await;
566-
let mut notifier = self.store_notifier();
567-
conversation_message.mark_as_sent(&connection, &mut notifier, ds_timestamp)?;
568-
let mut transaction = connection.transaction()?;
569-
Conversation::mark_as_read(
570-
&mut transaction,
571-
&mut notifier,
572-
vec![(conversation.id(), conversation_message.timestamp())].into_iter(),
573-
)?;
574-
transaction.commit()?;
575-
notifier.notify();
576-
577-
Ok(conversation_message)
578-
}
579-
580522
/// Re-try sending a message, where sending previously failed.
581523
pub async fn re_send_message(&self, local_message_id: Uuid) -> Result<()> {
582524
// Phase 1: Load the unsent message
@@ -598,7 +540,10 @@ impl CoreUser {
598540
let group_id = conversation.group_id();
599541
let mut group = Group::load(&connection, group_id)?
600542
.ok_or(anyhow!("Can't find group with id {:?}", group_id))?;
601-
let params = group.create_message(&connection, content)?;
543+
let params = {
544+
let provider = PhnxOpenMlsProvider::new(&connection);
545+
group.create_message(&provider, content)?
546+
};
602547
drop(connection);
603548

604549
// Phase 2: Send message to DS
@@ -614,9 +559,9 @@ impl CoreUser {
614559
let mut notifier = self.store_notifier();
615560
unsent_message.mark_as_sent(&connection, &mut notifier, ds_timestamp)?;
616561
group.store_update(&connection)?;
617-
let mut transaction = connection.transaction()?;
562+
let transaction = connection.transaction()?;
618563
Conversation::mark_as_read(
619-
&mut transaction,
564+
&transaction,
620565
&mut notifier,
621566
vec![(conversation.id(), unsent_message.timestamp())].into_iter(),
622567
)?;
@@ -1161,9 +1106,9 @@ impl CoreUser {
11611106
mark_as_read_data: T,
11621107
) -> Result<(), rusqlite::Error> {
11631108
let mut connection = self.inner.connection.lock().await;
1164-
let mut transaction = connection.transaction()?;
1109+
let transaction = connection.transaction()?;
11651110
let mut notifier = self.store_notifier();
1166-
Conversation::mark_as_read(&mut transaction, &mut notifier, mark_as_read_data)?;
1111+
Conversation::mark_as_read(&transaction, &mut notifier, mark_as_read_data)?;
11671112
transaction.commit()?;
11681113
notifier.notify();
11691114
Ok(())

coreclient/src/conversations/persistence.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ impl Conversation {
143143
/// [`ConversationId`]s to the given timestamps. This is used to mark all
144144
/// messages up to this timestamp as read.
145145
pub(crate) fn mark_as_read(
146-
transaction: &mut Transaction,
146+
transaction: &Transaction,
147147
notifier: &mut StoreNotifier,
148148
mark_as_read_data: impl IntoIterator<Item = (ConversationId, DateTime<Utc>)>,
149149
) -> Result<(), rusqlite::Error> {

coreclient/src/groups/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -848,10 +848,9 @@ impl Group {
848848
/// Send an application message to the group.
849849
pub(super) fn create_message(
850850
&mut self,
851-
connection: &Connection,
851+
provider: &impl OpenMlsProvider,
852852
content: MimiContent,
853853
) -> Result<SendMessageParamsOut, GroupOperationError> {
854-
let provider = &PhnxOpenMlsProvider::new(connection);
855854
let mls_message = self.mls_group.create_message(
856855
provider,
857856
&self.leaf_signer,

0 commit comments

Comments
 (0)