Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ where
let (current_state_handler, initial_state) = if is_offline {
OfflineState::enter()
} else {
SyncingState::enter(&shared_state, 0)
SyncingState::enter(&shared_state)
};

let public_initial_state = AccountControllerState::from(initial_state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl<C: ConnectivityMonitor> AccountControllerStateHandler<C> for ErrorState {
tracing::debug!("VPN API is firewalled, timed account syncing skipped");
return NextAccountControllerState::NewState(ErrorState::enter(self.reason));
} else {
return NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0));
return NextAccountControllerState::NewState(SyncingState::enter(shared_state));
}
},
Some(command) = command_rx.recv() => {
Expand All @@ -82,7 +82,7 @@ impl<C: ConnectivityMonitor> AccountControllerStateHandler<C> for ErrorState {
let error = res.is_err();
return_sender.send(res);
if error {
return NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0));
return NextAccountControllerState::NewState(SyncingState::enter(shared_state));
} else {
return NextAccountControllerState::NewState(LoggedOutState::enter());
}
Expand All @@ -104,15 +104,15 @@ impl<C: ConnectivityMonitor> AccountControllerStateHandler<C> for ErrorState {
if error {
return NextAccountControllerState::SameState(self);
} else {
return NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0));
return NextAccountControllerState::NewState(SyncingState::enter(shared_state));
}
},
AccountCommand::RefreshAccountState(return_sender) => {
return_sender.send(Ok(()));
if shared_state.firewall_active {
return NextAccountControllerState::SameState(self);
} else {
return NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0));
return NextAccountControllerState::NewState(SyncingState::enter(shared_state));
}
},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ impl<C: ConnectivityMonitor> AccountControllerStateHandler<C> for LoggedOutState
match command {
AccountCommand::CreateAccount(return_sender) => {
return_sender.send(handler::handle_create_account(shared_state).await);
return NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0));
return NextAccountControllerState::NewState(SyncingState::enter(shared_state));
}
AccountCommand::StoreAccount(return_sender, storable_account) => {
return if let Err(e) = handler::handle_store_account(shared_state, storable_account).await{
return_sender.send(Err(e));
NextAccountControllerState::SameState(self)
} else {
return_sender.send(Ok(()));
NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0))
NextAccountControllerState::NewState(SyncingState::enter(shared_state))
}
},
AccountCommand::ForgetAccount(return_sender) => return_sender.send(Ok(())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl<C: ConnectivityMonitor> AccountControllerStateHandler<C> for OfflineState {
if connectivity.is_offline() {
NextAccountControllerState::SameState(self)
} else {
NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0))
NextAccountControllerState::NewState(SyncingState::enter(shared_state))
}
}
_ = shutdown_token.cancelled() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl<C: ConnectivityMonitor> AccountControllerStateHandler<C> for PendingSubscri
tracing::debug!("VPN API is firewalled, timed account syncing skipped");
return NextAccountControllerState::NewState(PendingSubscriptionState::enter());
} else {
return NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0));
return NextAccountControllerState::NewState(SyncingState::enter(shared_state));
}
},
Some(command) = command_rx.recv() => {
Expand All @@ -73,7 +73,7 @@ impl<C: ConnectivityMonitor> AccountControllerStateHandler<C> for PendingSubscri
let error = res.is_err();
return_sender.send(res);
if error {
return NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0));
return NextAccountControllerState::NewState(SyncingState::enter(shared_state));
} else {
return NextAccountControllerState::NewState(LoggedOutState::enter());
}
Expand All @@ -95,15 +95,15 @@ impl<C: ConnectivityMonitor> AccountControllerStateHandler<C> for PendingSubscri
if error {
return NextAccountControllerState::SameState(self);
} else {
return NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0));
return NextAccountControllerState::NewState(SyncingState::enter(shared_state));
}
},
AccountCommand::RefreshAccountState(return_sender) => {
return_sender.send(Ok(()));
if shared_state.firewall_active {
return NextAccountControllerState::SameState(self);
} else {
return NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0));
return NextAccountControllerState::NewState(SyncingState::enter(shared_state));
}
},
AccountCommand::VpnApiFirewallDown(return_sender) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl<C: ConnectivityMonitor> AccountControllerStateHandler<C> for ReadyState {
tracing::debug!("VPN API is firewalled, timed account syncing skipped");
return NextAccountControllerState::NewState(ReadyState::enter());
} else {
return NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0));
return NextAccountControllerState::NewState(SyncingState::enter(shared_state));
}
},
Some(command) = command_rx.recv() => {
Expand Down Expand Up @@ -97,15 +97,15 @@ impl<C: ConnectivityMonitor> AccountControllerStateHandler<C> for ReadyState {
if error {
return NextAccountControllerState::SameState(self);
} else {
return NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0));
return NextAccountControllerState::NewState(SyncingState::enter(shared_state));
}
},
AccountCommand::RefreshAccountState(return_sender) => {
return_sender.send(Ok(()));
if shared_state.firewall_active {
return NextAccountControllerState::SameState(self);
} else {
return NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0));
return NextAccountControllerState::NewState(SyncingState::enter(shared_state));
}
},

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only

use std::{cmp::min, sync::Arc, time::Duration};
use std::sync::Arc;

use crate::{
SharedAccountState,
commands::{AccountCommand, UpgradeModeCommand, common_handler, handler},
state_machine::{
AccountControllerStateHandler, DecentralisedState, ErrorState, LoggedOutState,
NextAccountControllerState, OfflineState, PendingSubscriptionState,
PrivateAccountControllerState,
PrivateAccountControllerState, ReadyState,
},
};
use nym_offline_monitor::ConnectivityMonitor;
Expand All @@ -25,18 +25,12 @@ use nym_vpn_lib_types::{
use requesting_zknym_state::RequestingZkNymsState;
use tokio::sync::mpsc;
use tokio_util::sync::{CancellationToken, DropGuard};
use tracing::warn;
use tracing::{debug, warn};

pub(super) mod requesting_zknym_state;

const MAX_SYNCING_ATTEMPTS: u32 = 10;
const SYNCING_STATE_CONTEXT: &str = "SYNCING_STATE";

// bounded exponential backoff for retries [0.25, 0.5, 1.0, 2.0, 4.0, 8.0, 8.0, 8.0, 8.0, 8.0, 8.0] = 55.75s max wait
const RETRY_BACKOFF: Duration = Duration::from_millis(250);
const MAX_BACKOFF_EXPONENT: u32 = 5;
const BACKOFF_BASE: u32 = 2;

enum SyncEvent {
/// Account summary is received
AccountSummary(Box<VpnAccountSummary>),
Expand Down Expand Up @@ -67,15 +61,13 @@ enum SyncEvent {
/// - OfflineState : the connectivity monitor is telling we're not connected
/// - DecentralisedState : The loaded account is set to "decentralised" mode
pub struct SyncingState {
attempts: u32,
event_rx: mpsc::UnboundedReceiver<SyncEvent>,
sync_cancel_token: Option<DropGuard>,
}

impl SyncingState {
pub fn enter<C: ConnectivityMonitor>(
shared_state: &SharedAccountState<C>,
attempts: u32,
) -> (
Box<dyn AccountControllerStateHandler<C>>,
PrivateAccountControllerState,
Expand All @@ -101,18 +93,11 @@ impl SyncingState {
// This handle does not need to be awaited since event channel and cancellation token are sufficient.
let _syncing_state_handle =
tokio::spawn(sync_cancel_token.child_token().run_until_cancelled_owned(
SyncingState::sync_account(
event_tx,
vpn_api_client,
vpn_api_account,
device,
attempts,
),
SyncingState::sync_account(event_tx, vpn_api_client, vpn_api_account, device),
));

(
Box::new(Self {
attempts,
event_rx,
sync_cancel_token: Some(sync_cancel_token.drop_guard()),
}),
Expand All @@ -125,12 +110,7 @@ impl SyncingState {
vpn_api_client: VpnApiClient,
vpn_api_account: Arc<VpnAccount>,
device: Device,
attempts: u32,
) {
if attempts > 0 {
tokio::time::sleep(Self::get_delay(attempts)).await;
}

let final_event =
Self::sync_account_inner(event_tx.clone(), vpn_api_client, vpn_api_account, device)
.await
Expand All @@ -157,7 +137,7 @@ impl SyncingState {
.await
.map_err(Self::map_vpn_api_error)?;

tracing::debug!("{summary:#?}");
tracing::debug!("{summary:?}");

Self::handle_received_account_summary(
event_tx.clone(),
Expand Down Expand Up @@ -250,11 +230,6 @@ impl SyncingState {
.await?;
Ok(()) // We can register a device, we have fair usage
}

/// The attempt retries should start with attempt 1
fn get_delay(attempts: u32) -> Duration {
RETRY_BACKOFF * BACKOFF_BASE.pow(min(attempts - 1, MAX_BACKOFF_EXPONENT))
}
}

#[async_trait::async_trait]
Expand All @@ -277,34 +252,33 @@ impl<C: ConnectivityMonitor> AccountControllerStateHandler<C> for SyncingState {
NextAccountControllerState::SameState(self)
}
SyncEvent::Failure(err) => {
let is_retryable = err.is_retryable();
let err_str = err.to_string();
// debug print the error so that it contains display information about sub error
let err_str = format!("{err:?}");
match err.into_error_reason() {
None => {
tracing::debug!("Subscription is pending, waiting before retrying");
NextAccountControllerState::NewState(PendingSubscriptionState::enter())
}
Some(reason) if is_retryable => {
if self.attempts > MAX_SYNCING_ATTEMPTS {
tracing::debug!("Error trying to get account summary, exhausted retries : {err_str}");
NextAccountControllerState::NewState(ErrorState::enter(reason))
} else {
tracing::debug!(
"Error trying to get account summary attempt {}, retrying after {:?} : {err_str}",
self.attempts,
Self::get_delay(self.attempts + 1),
);
NextAccountControllerState::NewState(SyncingState::enter(shared_state, self.attempts + 1))
}
}
Some(reason) => {
tracing::debug!("Error trying to get account summary, not retrying : {err_str}");
NextAccountControllerState::NewState(ErrorState::enter(reason))
}
}
}
SyncEvent::Finished => {
NextAccountControllerState::NewState(RequestingZkNymsState::enter(shared_state, self.attempts, false))
if let Ok(true) =
// If we already have sufficient tickets skip to ready state.
// tickets should be handled by top-up after once connected.
shared_state
.credential_storage
.is_all_ticket_types_above_minimal_threshold().await {

debug!("proceeding with existing tickets");
NextAccountControllerState::NewState(ReadyState::enter())
} else {
NextAccountControllerState::NewState(RequestingZkNymsState::enter(shared_state, false))
}

}
}
}
Expand Down Expand Up @@ -341,20 +315,20 @@ impl<C: ConnectivityMonitor> AccountControllerStateHandler<C> for SyncingState {
return if shared_state.firewall_active {
NextAccountControllerState::SameState(self)
} else {
NextAccountControllerState::NewState(SyncingState::enter(shared_state, 0))
NextAccountControllerState::NewState(SyncingState::enter(shared_state))
}
},
AccountCommand::ResetDeviceIdentity(return_sender, seed) => {
return_sender.send(handler::handle_reset_device_identity(shared_state, seed).await);
return NextAccountControllerState::NewState(SyncingState::enter(shared_state,0));
return NextAccountControllerState::NewState(SyncingState::enter(shared_state));
},

AccountCommand::VpnApiFirewallDown(return_sender) => {
return_sender.send(Ok(()));
// No-op if the firewall was already down
if shared_state.firewall_active {
shared_state.firewall_active = false;
return NextAccountControllerState::NewState(SyncingState::enter(shared_state, self.attempts));
return NextAccountControllerState::NewState(SyncingState::enter(shared_state));
}
},

Expand Down Expand Up @@ -408,15 +382,6 @@ enum SyncError {
}

impl SyncError {
fn is_retryable(&self) -> bool {
matches!(
self,
SyncError::ApiRequestError(_)
| SyncError::DeviceTimeDesynced
| SyncError::InactiveSubscription // in the case of IAP, it might take a while for the subscription to become active
)
}

/// Returns the corresponding error reason for the error state, or `None` if the error
/// should not result in an error state (e.g. pending subscription has its own state).
fn into_error_reason(self) -> Option<AccountControllerErrorStateReason> {
Expand Down
Loading
Loading