Skip to content

Commit

Permalink
Use a single task manager and dispose the mixnet client on shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
octol committed Feb 17, 2025
1 parent 2ef7fe3 commit c5ced2e
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 190 deletions.
5 changes: 5 additions & 0 deletions nym-vpn-core/crates/nym-mixnet-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,9 @@ impl SharedMixnetClient {
pub fn connection_fd_callback(&self) -> Arc<dyn Fn(RawFd) + Send + Sync> {
self.connection_fd_callback.clone()
}

pub async fn dispose(self) {
// A mixnet client that has an external task manager is dropped to disconnect.
self.lock().await.take();
}
}
158 changes: 0 additions & 158 deletions nym-vpn-core/crates/nym-vpn-lib/src/bandwidth_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,16 @@
// SPDX-License-Identifier: GPL-3.0-only

use std::time::Duration;
#[cfg(unix)]
use std::{os::fd::RawFd, sync::Arc};

use nym_vpn_network_config::Network;
use tokio::{sync::mpsc, time::timeout};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use tokio_util::sync::CancellationToken;

use nym_authenticator_client::AuthClient;
use nym_credentials_interface::TicketType;
use nym_gateway_directory::GatewayClient;
use nym_sdk::{
mixnet::{ConnectionStatsEvent, CredentialStorage as Storage},
TaskClient,
};
use nym_task::TaskManager;
use nym_validator_client::{
nyxd::{Config as NyxdClientConfig, NyxdClient},
QueryHttpRpcNyxdClient,
Expand All @@ -27,14 +21,6 @@ use nym_wg_gateway_client::{
};
use nym_wireguard_types::DEFAULT_PEER_TIMEOUT_CHECK;

use crate::{
tunnel_state_machine::tunnel::{
MixnetConnectOptions, MIXNET_CLIENT_STARTUP_TIMEOUT, TASK_MANAGER_SHUTDOWN_TIMER_SECS,
},
MixnetClientConfig,
};
use nym_vpn_lib_types::TunnelType;

const DEFAULT_BANDWIDTH_CHECK: Duration = Duration::from_secs(5); // 5 seconds
const DEFAULT_BANDWIDTH_DEPLETION_RATE: u64 = 1024 * 1024; // 1 MB/s

Expand Down Expand Up @@ -149,76 +135,14 @@ impl DepletionRate {
}
}

pub struct ReconnectMixnetClientData {
options: MixnetConnectOptions,
network_env: Network,
bw_controller_task_manager: TaskManager,
mixnet_client_config: MixnetClientConfig,
}

impl ReconnectMixnetClientData {
pub fn new(
options: MixnetConnectOptions,
network_env: Network,
bw_controller_task_manager: TaskManager,
mixnet_client_config: MixnetClientConfig,
) -> Self {
Self {
options,
network_env,
bw_controller_task_manager,
mixnet_client_config,
}
}

pub async fn recreate_mixnet_connection(
&self,
#[cfg(unix)] connection_fd_callback: Arc<dyn Fn(RawFd) + Send + Sync>,
) -> Option<AuthClient> {
let entry_gateway = self.options.selected_gateways.entry.identity();
let mixnet_client = match tokio::time::timeout(
MIXNET_CLIENT_STARTUP_TIMEOUT,
crate::mixnet::setup_mixnet_client(
&self.network_env,
entry_gateway,
&self.options.data_path,
self.bw_controller_task_manager
.subscribe_named("mixnet_client_main"),
self.mixnet_client_config.clone(),
self.options.enable_credentials_mode,
self.options.stats_recipient_address,
self.options.tunnel_type == TunnelType::Wireguard,
#[cfg(unix)]
connection_fd_callback.clone(),
),
)
.await
{
Ok(Ok(client)) => client,
Err(_) => {
tracing::warn!("timed out while trying to recreate mixnet client");
return None;
}
Ok(Err(err)) => {
tracing::warn!("could not re-create mixnet client {:?}", err);
return None;
}
};
Some(AuthClient::new(mixnet_client).await)
}
}

pub(crate) struct BandwidthController<St> {
inner: nym_bandwidth_controller::BandwidthController<QueryHttpRpcNyxdClient, St>,
connected_mixnet: bool,
wg_entry_gateway_client: WgGatewayLightClient,
wg_exit_gateway_client: WgGatewayLightClient,
timeout_check_interval: IntervalStream,
entry_depletion_rate: DepletionRate,
exit_depletion_rate: DepletionRate,
shutdown: TaskClient,
reconnect_mixnet_client_data: ReconnectMixnetClientData,
cancel_token: CancellationToken,
}

impl<St: Storage> BandwidthController<St> {
Expand All @@ -228,25 +152,20 @@ impl<St: Storage> BandwidthController<St> {
wg_entry_gateway_client: WgGatewayLightClient,
wg_exit_gateway_client: WgGatewayLightClient,
shutdown: TaskClient,
reconnect_mixnet_client_data: ReconnectMixnetClientData,
) -> Result<Self> {
let client = get_nyxd_client(network)?;
let inner = nym_bandwidth_controller::BandwidthController::new(storage, client);
let timeout_check_interval =
IntervalStream::new(tokio::time::interval(DEFAULT_BANDWIDTH_CHECK));
let cancel_token = CancellationToken::new();

Ok(BandwidthController {
inner,
connected_mixnet: true,
wg_entry_gateway_client,
wg_exit_gateway_client,
timeout_check_interval,
entry_depletion_rate: Default::default(),
exit_depletion_rate: Default::default(),
shutdown,
reconnect_mixnet_client_data,
cancel_token,
})
}

Expand Down Expand Up @@ -336,7 +255,6 @@ impl<St: Storage> BandwidthController<St> {

tokio::select! {
_ = self.shutdown.recv() => {
self.cancel_token.cancel();
tracing::trace!("BandwidthController: Received shutdown");
}
ret = wg_gateway_client.query_bandwidth() => {
Expand Down Expand Up @@ -382,93 +300,18 @@ impl<St: Storage> BandwidthController<St> {
None
}

pub(crate) async fn try_reconnect(&mut self, mixnet_error_tx: mpsc::Sender<()>) -> bool {
#[cfg(unix)]
let connection_fd_callback = self
.wg_entry_gateway_client
.auth_client()
.mixnet_client()
.connection_fd_callback();
let Some(auth_client) = self
.reconnect_mixnet_client_data
.recreate_mixnet_connection(
#[cfg(unix)]
connection_fd_callback.clone(),
)
.await
else {
self.connected_mixnet = false;
return false;
};

self.wg_entry_gateway_client
.set_auth_client(auth_client.clone());
self.wg_exit_gateway_client.set_auth_client(auth_client);
self.connected_mixnet = true;
self.spawn_wait_for_mixnet_error(mixnet_error_tx);
true
}

fn spawn_wait_for_mixnet_error(&mut self, mixnet_error_tx: mpsc::Sender<()>) {
let cancel_token = self.cancel_token.clone();
let mut task_manager = std::mem::replace(
&mut self.reconnect_mixnet_client_data.bw_controller_task_manager,
TaskManager::new(TASK_MANAGER_SHUTDOWN_TIMER_SECS),
);
tokio::task::spawn(async move {
cancel_token
.run_until_cancelled(task_manager.wait_for_error())
.await;
task_manager.signal_shutdown().ok();
mixnet_error_tx.send(()).await.ok();
});
}

async fn cleanup(mut self) {
self.reconnect_mixnet_client_data
.bw_controller_task_manager
.signal_shutdown()
.ok();
if timeout(
Duration::from_secs(10),
self.reconnect_mixnet_client_data
.bw_controller_task_manager
.wait_for_graceful_shutdown(),
)
.await
.is_err()
{
tracing::error!(
"Timeout waiting for task manager controlled by bandwidth controller to finish waiting for its tasks to all exit"
);
}
}

pub(crate) async fn run(mut self)
where
<St as Storage>::StorageError: Send + Sync + 'static,
{
let (mixnet_error_tx, mut mixnet_error_rx) = mpsc::channel(1);
self.spawn_wait_for_mixnet_error(mixnet_error_tx.clone());

// Skip the first, immediate tick
self.timeout_check_interval.next().await;
while !self.shutdown.is_shutdown() {
tokio::select! {
_ = self.shutdown.recv() => {
self.cancel_token.cancel();
tracing::trace!("BandwidthController: Received shutdown");
}
_ = mixnet_error_rx.recv() => {
break;
// Don't attempt to reconnect and just drop the BandwidthController and shut down
// the tunnel
// self.try_reconnect(mixnet_error_tx.clone()).await;
}
_ = self.timeout_check_interval.next() => {
if !self.connected_mixnet && !self.try_reconnect(mixnet_error_tx.clone()).await {
continue;
}
let current_period = self.timeout_check_interval.as_ref().period();
let entry_duration = self.check_bandwidth(true, current_period).await;
let exit_duration = self.check_bandwidth(false, current_period).await;
Expand All @@ -492,7 +335,6 @@ impl<St: Storage> BandwidthController<St> {
}
}

self.cleanup().await;
tracing::debug!("BandwidthController: Exiting");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ impl Connector {
})
}

/// Gracefully shutdown task manager and consume the struct.
/// Gracefully shutdown task manager and mixnet client, and consume the struct.
pub async fn dispose(self) {
tunnel::shutdown_task_manager(self.task_manager).await;
tracing::debug!("Shutting down mixnet client");
tunnel::shutdown_mixnet_client(self.task_manager, self.mixnet_client).await;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ use tokio_util::sync::CancellationToken;
#[cfg(windows)]
use super::route_handler;
use super::{MixnetEvent, TunnelType};
use crate::{
bandwidth_controller::ReconnectMixnetClientData, GatewayDirectoryError, MixnetClientConfig,
MixnetError,
};
use crate::{GatewayDirectoryError, MixnetClientConfig, MixnetError};
pub use any_tunnel_handle::AnyTunnelHandle;
use status_listener::StatusListener;
pub use tombstone::Tombstone;
Expand All @@ -42,7 +39,6 @@ pub struct ConnectedMixnet {
selected_gateways: SelectedGateways,
data_path: Option<PathBuf>,
mixnet_client: SharedMixnetClient,
reconnect_mixnet_client_data: ReconnectMixnetClientData,
}

impl ConnectedMixnet {
Expand Down Expand Up @@ -112,7 +108,6 @@ impl ConnectedMixnet {
enable_credentials_mode,
self.selected_gateways,
self.data_path,
self.reconnect_mixnet_client_data,
cancel_token,
)
.await
Expand All @@ -127,7 +122,8 @@ impl ConnectedMixnet {

/// Gracefully shutdown the mixnet client and consume the struct.
pub async fn dispose(self) {
shutdown_task_manager(self.task_manager).await;
tracing::debug!("Shutting down connected mixnet");
shutdown_mixnet_client(self.task_manager, self.mixnet_client).await;
}
}

Expand Down Expand Up @@ -176,20 +172,9 @@ pub async fn connect_mixnet(
#[cfg(unix)] connection_fd_callback: Arc<dyn Fn(RawFd) + Send + Sync>,
) -> Result<ConnectedMixnet> {
let task_manager = TaskManager::new(TASK_MANAGER_SHUTDOWN_TIMER_SECS);
let bw_controller_task_manager = TaskManager::new(TASK_MANAGER_SHUTDOWN_TIMER_SECS);

let task_client = match options.tunnel_type {
TunnelType::Mixnet => task_manager.subscribe_named("mixnet_client_main"),
TunnelType::Wireguard => bw_controller_task_manager.subscribe_named("mixnet_client_main"),
};
let task_client = task_manager.subscribe_named("mixnet_client_main");

let mut mixnet_client_config = options.mixnet_client_config.clone().unwrap_or_default();
let reconnect_mixnet_client_data = ReconnectMixnetClientData::new(
options.clone(),
network_env.clone(),
bw_controller_task_manager,
mixnet_client_config.clone(),
);
let user_agent = options
.user_agent
.unwrap_or(UserAgent::from(nym_bin_common::bin_info_local_vergen!()));
Expand Down Expand Up @@ -238,23 +223,34 @@ pub async fn connect_mixnet(
data_path: options.data_path,
gateway_directory_client,
mixnet_client,
reconnect_mixnet_client_data,
}),
Err(e) => {
tracing::error!("JON: Failed to connect to mixnet: {:?}", e);
shutdown_task_manager(task_manager).await;
Err(e)
}
}
}

async fn shutdown_task_manager(mut task_manager: TaskManager) {
tracing::debug!("Shutting down task manager");
if task_manager.signal_shutdown().is_err() {
tracing::error!("Failed to signal task manager shutdown");
}

tracing::debug!("Waiting for task manager to shutdown");
task_manager.wait_for_graceful_shutdown().await;
}

async fn shutdown_mixnet_client(mut task_manager: TaskManager, mixnet_client: SharedMixnetClient) {
if let Err(e) = task_manager.signal_shutdown() {
tracing::error!("Failed to signal task manager shutdown: {}", e);
}

tracing::debug!("Disposing mixnet client");
mixnet_client.dispose().await;

tracing::debug!("Waiting for task manager to shutdown");
task_manager.wait_for_graceful_shutdown().await;
tracing::debug!("Task manager finished");
}

#[derive(Debug, thiserror::Error)]
Expand Down
Loading

0 comments on commit c5ced2e

Please sign in to comment.