Skip to content

Commit 75ad78b

Browse files
authored
Merge branch 'main' into dima/fix-record-delete
2 parents df549ff + d6b1d56 commit 75ad78b

File tree

11 files changed

+109
-73
lines changed

11 files changed

+109
-73
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ tracing-subscriber = { version = "0.3", features = [
5353
"parking_lot",
5454
] }
5555

56+
tokio-util = "0.7.13"
57+
58+
5659
[patch.crates-io]
5760
#opaque-ke = { git = "https://github.com/facebook/opaque-ke", branch = "dependabot/cargo/voprf-eq-0.5.0" }
5861

apiclient/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ reqwest = { workspace = true }
1414
thiserror = "1"
1515
phnxtypes = { path = "../types" }
1616
tokio = { version = "1.18.2", features = ["macros"] }
17+
tokio-util = { workspace = true }
1718
tokio-tungstenite = { version = "0.23", features = ["rustls-tls-webpki-roots"] }
1819
futures-util = "0.3.21"
1920
http = "1"
@@ -23,6 +24,7 @@ mls-assist = { workspace = true }
2324
privacypass = { workspace = true }
2425
tls_codec = { workspace = true }
2526
url = "2"
27+
uuid = { version = "1", features = ["v4"] }
2628

2729
[dev-dependencies]
2830
tokio = { version = "1.18.2", features = ["macros"] }

apiclient/src/qs_api/tests.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use phnxtypes::{
2020
messages::{client_ds::QsWsMessage, client_qs::QsOpenWsParams},
2121
};
2222
use tls_codec::Serialize;
23+
use tokio_util::sync::CancellationToken;
2324
use tracing::{error, info};
2425
use uuid::Uuid;
2526

@@ -49,8 +50,9 @@ async fn ws_lifecycle() {
4950
let client = ApiClient::with_default_http_client(address).expect("Failed to initialize client");
5051

5152
// Spawn the websocket connection task
53+
let cancel = CancellationToken::new();
5254
let mut ws = client
53-
.spawn_websocket(queue_id, timeout, retry_interval)
55+
.spawn_websocket(queue_id, timeout, retry_interval, cancel)
5456
.await
5557
.expect("Failed to execute request");
5658

apiclient/src/qs_api/ws.rs

Lines changed: 63 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,17 @@ use thiserror::*;
1818
use tls_codec::DeserializeBytes;
1919
use tokio::{
2020
net::TcpStream,
21-
sync::broadcast::{self, Receiver, Sender},
22-
task::JoinHandle,
21+
sync::mpsc,
2322
time::{sleep, Instant},
2423
};
2524
use tokio_tungstenite::{
2625
connect_async,
2726
tungstenite::{client::IntoClientRequest, protocol::Message},
2827
MaybeTlsStream, WebSocketStream,
2928
};
29+
use tokio_util::sync::{CancellationToken, DropGuard};
3030
use tracing::{error, info};
31+
use uuid::Uuid;
3132

3233
use crate::{ApiClient, Protocol};
3334

@@ -53,9 +54,12 @@ impl ConnectionStatus {
5354
Self { connected: false }
5455
}
5556

56-
fn set_connected(&mut self, tx: &Sender<WsEvent>) -> Result<(), ConnectionStatusError> {
57+
async fn set_connected(
58+
&mut self,
59+
tx: &mpsc::Sender<WsEvent>,
60+
) -> Result<(), ConnectionStatusError> {
5761
if !self.connected {
58-
if let Err(error) = tx.send(WsEvent::ConnectedEvent) {
62+
if let Err(error) = tx.send(WsEvent::ConnectedEvent).await {
5963
error!(%error, "Error sending to channel");
6064
self.connected = false;
6165
return Err(ConnectionStatusError::ChannelClosed);
@@ -65,9 +69,12 @@ impl ConnectionStatus {
6569
Ok(())
6670
}
6771

68-
fn set_disconnected(&mut self, tx: &Sender<WsEvent>) -> Result<(), ConnectionStatusError> {
72+
async fn set_disconnected(
73+
&mut self,
74+
tx: &mpsc::Sender<WsEvent>,
75+
) -> Result<(), ConnectionStatusError> {
6976
if self.connected {
70-
if let Err(error) = tx.send(WsEvent::DisconnectedEvent) {
77+
if let Err(error) = tx.send(WsEvent::DisconnectedEvent).await {
7178
error!(%error, "Error sending to channel");
7279
return Err(ConnectionStatusError::ChannelClosed);
7380
}
@@ -79,48 +86,30 @@ impl ConnectionStatus {
7986

8087
/// A websocket connection to the QS server. See the
8188
/// [`ApiClient::spawn_websocket`] method for more information.
89+
///
90+
/// When dropped, the websocket connection will be closed.
8291
pub struct QsWebSocket {
83-
rx: Receiver<WsEvent>,
84-
tx: Sender<WsEvent>,
85-
handle: JoinHandle<()>,
92+
rx: mpsc::Receiver<WsEvent>,
93+
_cancel: DropGuard,
8694
}
8795

8896
impl QsWebSocket {
8997
/// Returns the next [`WsEvent`] event. This will block until an event is
9098
/// sent or the connection is closed (in which case a final `None` is
9199
/// returned).
92100
pub async fn next(&mut self) -> Option<WsEvent> {
93-
match self.rx.recv().await {
94-
Ok(message) => Some(message),
95-
Err(error) => {
96-
error!(%error, "Error receiving from channel");
97-
None
98-
}
99-
}
100-
}
101-
102-
/// Subscribe to the event stream
103-
pub fn subscribe(&self) -> Receiver<WsEvent> {
104-
self.tx.subscribe()
105-
}
106-
107-
/// Join the websocket connection task. This will block until the task has
108-
/// completed.
109-
pub async fn join(self) -> Result<(), tokio::task::JoinError> {
110-
self.handle.await
111-
}
112-
113-
/// Abort the websocket connection task. This will close the websocket connection.
114-
pub fn abort(&mut self) {
115-
self.handle.abort();
101+
self.rx.recv().await
116102
}
117103

118104
/// Internal helper function to handle an established websocket connection
105+
///
106+
/// Returns `true` if the connection should be re-established, otherwise `false`.
119107
async fn handle_connection(
120108
ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
121-
tx: &Sender<WsEvent>,
109+
tx: &mpsc::Sender<WsEvent>,
122110
timeout: u64,
123-
) {
111+
cancel: &CancellationToken,
112+
) -> bool {
124113
let mut last_ping = Instant::now();
125114

126115
// Watchdog to monitor the connection.
@@ -131,26 +120,31 @@ impl QsWebSocket {
131120

132121
// Initialize the connection status
133122
let mut connection_status = ConnectionStatus::new();
134-
if connection_status.set_connected(tx).is_err() {
123+
if connection_status.set_connected(tx).await.is_err() {
135124
// Close the stream if all subscribers of the watch have been dropped
136125
let _ = ws_stream.close().await;
137-
return;
126+
return false;
138127
}
139128

140129
// Loop while the connection is open
141130
loop {
142131
tokio::select! {
132+
// Check is the handler is cancelled
133+
_ = cancel.cancelled() => {
134+
info!("QS WebSocket connection cancelled");
135+
break false;
136+
},
143137
// Check if the connection is still alive
144138
_ = interval.tick() => {
145139
let now = Instant::now();
146140
// Check if we have reached the timeout
147141
if now.duration_since(last_ping) > Duration::from_secs(timeout) {
148142
// Change the status to Disconnected and send an event
149143
let _ = ws_stream.close().await;
150-
if connection_status.set_disconnected(tx).is_err() {
144+
if connection_status.set_disconnected(tx).await.is_err() {
151145
// Close the stream if all subscribers of the watch have been dropped
152146
info!("Closing the connection because all subscribers are dropped");
153-
return;
147+
return false;
154148
}
155149
}
156150
},
@@ -163,53 +157,53 @@ impl QsWebSocket {
163157
// Reset the last ping time
164158
last_ping = Instant::now();
165159
// Change the status to Connected and send an event
166-
if connection_status.set_connected(tx).is_err() {
160+
if connection_status.set_connected(tx).await.is_err() {
167161
// Close the stream if all subscribers of the watch have been dropped
168162
info!("Closing the connection because all subscribers are dropped");
169163
let _ = ws_stream.close().await;
170-
return;
164+
return false;
171165
}
172166
// Try to deserialize the message
173167
if let Ok(QsWsMessage::QueueUpdate) =
174168
QsWsMessage::tls_deserialize_exact_bytes(&data)
175169
{
176170
// We received a new message notification from the QS
177171
// Send the event to the channel
178-
if tx.send(WsEvent::MessageEvent(QsWsMessage::QueueUpdate)).is_err() {
172+
if tx.send(WsEvent::MessageEvent(QsWsMessage::QueueUpdate)).await.is_err() {
179173
info!("Closing the connection because all subscribers are dropped");
180174
// Close the stream if all subscribers of the watch have been dropped
181175
let _ = ws_stream.close().await;
182-
return;
176+
return false;
183177
}
184178
}
185179
},
186180
// We received a ping
187181
Message::Ping(_) => {
188182
// We update the last ping time
189183
last_ping = Instant::now();
190-
if connection_status.set_connected(tx).is_err() {
184+
if connection_status.set_connected(tx).await.is_err() {
191185
// Close the stream if all subscribers of the watch have been dropped
192186
info!("Closing the connection because all subscribers are dropped");
193187
let _ = ws_stream.close().await;
194-
return;
188+
return false;
195189
}
196190
}
197191
Message::Close(_) => {
198192
// Change the status to Disconnected and send an
199193
// event
200-
let _ = connection_status.set_disconnected(tx);
194+
let _ = connection_status.set_disconnected(tx).await;
201195
// We close the websocket
202196
let _ = ws_stream.close().await;
203-
return;
197+
return true;
204198
}
205199
_ => {
206200
}
207201
}
208202
} else {
209203
// It seems the connection is closed, send disconnect
210204
// event
211-
let _ = connection_status.set_disconnected(tx);
212-
break;
205+
let _ = connection_status.set_disconnected(tx).await;
206+
break true;
213207
}
214208
},
215209
}
@@ -255,13 +249,14 @@ impl ApiClient {
255249
/// [`WsEvent::ConnectedEvent].
256250
///
257251
/// The connection will be closed if all subscribers of the [`QsWebSocket`]
258-
/// have been dropped, or when it is manually closed with using the
259-
/// [`QsWebSocket::abort()`] function.
252+
/// have been dropped, or when it is manually closed by cancelling the token
253+
/// `cancel`.
260254
///
261255
/// # Arguments
262256
/// - `queue_id` - The ID of the queue monitor.
263257
/// - `timeout` - The timeout for the connection in seconds.
264258
/// - `retry_interval` - The interval between connection attempts in seconds.
259+
/// - `cancel` - The cancellation token to stop the socket.
265260
///
266261
/// # Returns
267262
/// A new [`QsWebSocket`] that represents the websocket connection.
@@ -270,6 +265,7 @@ impl ApiClient {
270265
queue_id: QsClientId,
271266
timeout: u64,
272267
retry_interval: u64,
268+
cancel: CancellationToken,
273269
) -> Result<QsWebSocket, SpawnWsError> {
274270
// Set the request parameter
275271
let qs_ws_open_params = QsOpenWsParams { queue_id };
@@ -289,19 +285,19 @@ impl ApiClient {
289285
})?;
290286

291287
// We create a channel to send events to
292-
let (tx, rx) = broadcast::channel(100);
288+
let (tx, rx) = mpsc::channel(100);
293289

294-
// We clone the sender, so that we can subscribe to more receivers
295-
let tx_clone = tx.clone();
290+
let connection_id = Uuid::new_v4();
291+
info!(%connection_id, "Spawning the websocket connection...");
296292

297-
info!("Spawning the websocket connection...");
293+
let cancel_guard = cancel.clone().drop_guard();
298294

299295
// Spawn the connection task
300-
let handle = tokio::spawn(async move {
296+
tokio::spawn(async move {
301297
// Connection loop
302298
#[cfg(test)]
303299
let mut counter = 0;
304-
loop {
300+
while !cancel.is_cancelled() {
305301
// We build the request and set a custom header
306302
let req = match address.clone().into_client_request() {
307303
Ok(mut req) => {
@@ -319,13 +315,15 @@ impl ApiClient {
319315
match connect_async(req).await {
320316
// The connection was established
321317
Ok((ws_stream, _)) => {
322-
info!("Connected to QS WebSocket");
318+
info!(%connection_id, "Connected to QS WebSocket");
323319
// Hand over the connection to the handler
324-
QsWebSocket::handle_connection(ws_stream, &tx, timeout).await;
320+
if !QsWebSocket::handle_connection(ws_stream, &tx, timeout, &cancel).await {
321+
break;
322+
}
325323
}
326324
// The connection was not established, wait and try again
327-
Err(e) => {
328-
error!("Error connecting to QS WebSocket: {}", e);
325+
Err(error) => {
326+
error!(%error, "Error connecting to QS WebSocket");
329327
#[cfg(test)]
330328
{
331329
counter += 1;
@@ -336,17 +334,20 @@ impl ApiClient {
336334
}
337335
}
338336
info!(
337+
%connection_id,
339338
retry_in_sec = retry_interval,
339+
is_cancelled = cancel.is_cancelled(),
340340
"The websocket was closed, will reconnect...",
341341
);
342342
sleep(time::Duration::from_secs(retry_interval)).await;
343343
}
344+
345+
info!(%connection_id, "QS WebSocket closed");
344346
});
345347

346348
Ok(QsWebSocket {
347349
rx,
348-
tx: tx_clone,
349-
handle,
350+
_cancel: cancel_guard,
350351
})
351352
}
352353
}

applogic/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,6 @@ flutter_rust_bridge = { version = "=2.7.0", features = ["chrono", "uuid"] }
3737
notify-rust = "4"
3838
chrono = { workspace = true }
3939
jni = "0.21"
40-
tokio-util = "0.7.13"
40+
tokio-util = { workspace = true }
4141
tokio-stream = "0.1.17"
4242
blake3 = "1.5.5"

0 commit comments

Comments
 (0)