diff --git a/nym-vpn-core/crates/nym-connection-monitor/src/icmp_beacon.rs b/nym-vpn-core/crates/nym-connection-monitor/src/icmp_beacon.rs index a7c107bcae..7031b26279 100644 --- a/nym-vpn-core/crates/nym-connection-monitor/src/icmp_beacon.rs +++ b/nym-vpn-core/crates/nym-connection-monitor/src/icmp_beacon.rs @@ -140,17 +140,29 @@ impl IcmpConnectionBeacon { break; } _ = ping_interval.tick() => { - if let Err(err) = self.ping_v4_ipr_tun_device_over_the_mixnet().await { - error!("Failed to send ICMP ping: {err}"); - } - if let Err(err) = self.ping_v6_ipr_tun_device_over_the_mixnet().await { - error!("Failed to send ICMPv6 ping: {err}"); - } - if let Err(err) = self.ping_v4_some_external_ip_over_the_mixnet().await { - error!("Failed to send ICMP ping: {err}"); - } - if let Err(err) = self.ping_v6_some_external_ip_over_the_mixnet().await { - error!("Failed to send ICMPv6 ping: {err}"); + let cancellable_fut = async { + if let Err(err) = self.ping_v4_ipr_tun_device_over_the_mixnet().await { + error!("Failed to send ICMP ping: {err}"); + } + if let Err(err) = self.ping_v6_ipr_tun_device_over_the_mixnet().await { + error!("Failed to send ICMPv6 ping: {err}"); + } + if let Err(err) = self.ping_v4_some_external_ip_over_the_mixnet().await { + error!("Failed to send ICMP ping: {err}"); + } + if let Err(err) = self.ping_v6_some_external_ip_over_the_mixnet().await { + error!("Failed to send ICMPv6 ping: {err}"); + } + }; + + tokio::select! { + _ = cancellable_fut => { + continue; + }, + _ = shutdown.recv() => { + trace!("IcmpConnectionBeacon: Received shutdown"); + break; + } } } } diff --git a/nym-vpn-core/crates/nym-connection-monitor/src/mixnet_beacon.rs b/nym-vpn-core/crates/nym-connection-monitor/src/mixnet_beacon.rs index a376b1e284..219bb4e7a4 100644 --- a/nym-vpn-core/crates/nym-connection-monitor/src/mixnet_beacon.rs +++ b/nym-vpn-core/crates/nym-connection-monitor/src/mixnet_beacon.rs @@ -45,14 +45,22 @@ impl MixnetConnectionBeacon { break; } _ = ping_interval.tick() => { - let _ping_id = match self.send_mixnet_self_ping().await { - Ok(id) => id, - Err(err) => { - error!("Failed to send mixnet self ping: {err}"); - continue; + tokio::select! { + _ = shutdown.recv() => { + trace!("MixnetConnectionBeacon: Received shutdown"); + break; + }, + ping_result = self.send_mixnet_self_ping() => { + let _ping_id = match ping_result { + Ok(id) => id, + Err(err) => { + error!("Failed to send mixnet self ping: {err}"); + continue; + } + }; + // TODO: store ping_id to be able to monitor or ping timeouts } }; - // TODO: store ping_id to be able to monitor or ping timeouts } } } diff --git a/nym-vpn-core/crates/nym-vpn-lib/src/mixnet/processor.rs b/nym-vpn-core/crates/nym-vpn-lib/src/mixnet/processor.rs index dc46995d73..899838da9a 100644 --- a/nym-vpn-core/crates/nym-vpn-lib/src/mixnet/processor.rs +++ b/nym-vpn-core/crates/nym-vpn-lib/src/mixnet/processor.rs @@ -127,9 +127,16 @@ impl MixnetProcessor { match message_creator.create_input_message(bundled_packets) { Ok(input_message) => { - let ret = sender.send(input_message).await; - if ret.is_err() && !task_client_mix_processor.is_shutdown_poll() { - error!("Could not forward IP packet to the mixnet. The packet will be dropped."); + tokio::select! { + ret = sender.send(input_message) => { + if ret.is_err() && !task_client_mix_processor.is_shutdown_poll() { + error!("Could not forward IP packet to the mixnet. The packet will be dropped."); + } + } + _ = task_client_mix_processor.recv_with_delay() => { + trace!("MixnetProcessor: Received shutdown while sending."); + break; + } } } Err(err) => { @@ -144,9 +151,16 @@ impl MixnetProcessor { { match message_creator.create_input_message(input_message) { Ok(input_message) => { - let ret = sender.send(input_message).await; - if ret.is_err() && !task_client_mix_processor.is_shutdown_poll() { - error!("Could not forward IP packet to the mixnet. The packet(s) will be dropped."); + tokio::select! { + ret = sender.send(input_message) => { + if ret.is_err() && !task_client_mix_processor.is_shutdown_poll() { + error!("Could not forward IP packet to the mixnet. The packet(s) will be dropped."); + } + } + _ = task_client_mix_processor.recv_with_delay() => { + trace!("MixnetProcessor: Received shutdown while sending."); + break; + } } } Err(err) => { diff --git a/nym-vpn-core/crates/nym-vpn-lib/src/tunnel_state_machine/tunnel/mod.rs b/nym-vpn-core/crates/nym-vpn-lib/src/tunnel_state_machine/tunnel/mod.rs index 0690996789..1158c3fc90 100644 --- a/nym-vpn-core/crates/nym-vpn-lib/src/tunnel_state_machine/tunnel/mod.rs +++ b/nym-vpn-core/crates/nym-vpn-lib/src/tunnel_state_machine/tunnel/mod.rs @@ -58,6 +58,7 @@ impl ConnectedMixnet { pub async fn start_event_listener( &mut self, event_sender: mpsc::UnboundedSender, + cancel_token: CancellationToken, ) -> JoinHandle<()> { let (status_tx, status_rx) = futures::channel::mpsc::channel(10); @@ -65,7 +66,7 @@ impl ConnectedMixnet { .start_status_listener(status_tx, TaskStatus::Ready) .await; - StatusListener::spawn(status_rx, event_sender) + StatusListener::spawn(status_rx, event_sender, cancel_token) } /// Creates a tunnel over Mixnet. diff --git a/nym-vpn-core/crates/nym-vpn-lib/src/tunnel_state_machine/tunnel/status_listener.rs b/nym-vpn-core/crates/nym-vpn-lib/src/tunnel_state_machine/tunnel/status_listener.rs index 7dc976a4fe..fe0aeb246c 100644 --- a/nym-vpn-core/crates/nym-vpn-lib/src/tunnel_state_machine/tunnel/status_listener.rs +++ b/nym-vpn-core/crates/nym-vpn-lib/src/tunnel_state_machine/tunnel/status_listener.rs @@ -6,16 +6,26 @@ use nym_bandwidth_controller::BandwidthStatusMessage; use nym_connection_monitor::ConnectionMonitorStatus; use nym_task::{StatusReceiver, TaskStatus}; use nym_vpn_lib_types::{BandwidthEvent, ConnectionEvent, ConnectionStatisticsEvent, MixnetEvent}; +use tokio_util::sync::CancellationToken; pub struct StatusListener { rx: StatusReceiver, tx: mpsc::UnboundedSender, + cancel_token: CancellationToken, } impl StatusListener { - pub fn spawn(rx: StatusReceiver, tx: mpsc::UnboundedSender) -> JoinHandle<()> { + pub fn spawn( + rx: StatusReceiver, + tx: mpsc::UnboundedSender, + cancel_token: CancellationToken, + ) -> JoinHandle<()> { tokio::spawn(async move { - let status_listener = Self { rx, tx }; + let status_listener = Self { + rx, + tx, + cancel_token, + }; status_listener.run().await; }) } @@ -23,22 +33,34 @@ impl StatusListener { async fn run(mut self) { tracing::debug!("Starting status listener loop"); - while let Some(msg) = self.rx.next().await { - if let Some(msg) = msg.as_any().downcast_ref::() { - tracing::debug!("Received ignored TaskStatus message: {msg}"); - } else if let Some(msg) = msg.as_any().downcast_ref::() { - self.send_event(MixnetEvent::Connection(ConnectionEvent::from(msg))); - } else if let Some(msg) = msg.as_any().downcast_ref::() { - self.send_event(MixnetEvent::Bandwidth(BandwidthEvent::from(msg))); - } else if let Some(msg) = msg - .as_any() - .downcast_ref::() - { - self.send_event(MixnetEvent::ConnectionStatistics( - ConnectionStatisticsEvent::from(msg), - )); - } else { - tracing::debug!("Unknown status message received: {msg}"); + loop { + tokio::select! { + msg = self.rx.next() => { + let Some(msg) = msg else { + break + }; + + if let Some(msg) = msg.as_any().downcast_ref::() { + tracing::debug!("Received ignored TaskStatus message: {msg}"); + } else if let Some(msg) = msg.as_any().downcast_ref::() { + self.send_event(MixnetEvent::Connection(ConnectionEvent::from(msg))); + } else if let Some(msg) = msg.as_any().downcast_ref::() { + self.send_event(MixnetEvent::Bandwidth(BandwidthEvent::from(msg))); + } else if let Some(msg) = msg + .as_any() + .downcast_ref::() + { + self.send_event(MixnetEvent::ConnectionStatistics( + ConnectionStatisticsEvent::from(msg), + )); + } else { + tracing::debug!("Unknown status message received: {msg}"); + } + } + + _ = self.cancel_token.cancelled() => { + break; + } } } diff --git a/nym-vpn-core/crates/nym-vpn-lib/src/tunnel_state_machine/tunnel_monitor.rs b/nym-vpn-core/crates/nym-vpn-lib/src/tunnel_state_machine/tunnel_monitor.rs index 6c9e862735..5ccb860807 100644 --- a/nym-vpn-core/crates/nym-vpn-lib/src/tunnel_state_machine/tunnel_monitor.rs +++ b/nym-vpn-core/crates/nym-vpn-lib/src/tunnel_state_machine/tunnel_monitor.rs @@ -310,7 +310,10 @@ impl TunnelMonitor { } let status_listener_handle = connected_mixnet - .start_event_listener(self.mixnet_event_sender.clone()) + .start_event_listener( + self.mixnet_event_sender.clone(), + self.cancel_token.child_token(), + ) .await; let selected_gateways = connected_mixnet.selected_gateways().clone();