Skip to content

Commit

Permalink
Handle cancellation when underlying channel(1) is congested (#2034)
Browse files Browse the repository at this point in the history
  • Loading branch information
pronebird authored Jan 30, 2025
1 parent 3c2350a commit bcfb9fa
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 43 deletions.
34 changes: 23 additions & 11 deletions nym-vpn-core/crates/nym-connection-monitor/src/icmp_beacon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,29 @@ impl IcmpConnectionBeacon {
break;
}
_ = ping_interval.tick() => {
if let Err(err) = self.ping_v4_ipr_tun_device_over_the_mixnet().await {
error!("Failed to send ICMP ping: {err}");
}
if let Err(err) = self.ping_v6_ipr_tun_device_over_the_mixnet().await {
error!("Failed to send ICMPv6 ping: {err}");
}
if let Err(err) = self.ping_v4_some_external_ip_over_the_mixnet().await {
error!("Failed to send ICMP ping: {err}");
}
if let Err(err) = self.ping_v6_some_external_ip_over_the_mixnet().await {
error!("Failed to send ICMPv6 ping: {err}");
let cancellable_fut = async {
if let Err(err) = self.ping_v4_ipr_tun_device_over_the_mixnet().await {
error!("Failed to send ICMP ping: {err}");
}
if let Err(err) = self.ping_v6_ipr_tun_device_over_the_mixnet().await {
error!("Failed to send ICMPv6 ping: {err}");
}
if let Err(err) = self.ping_v4_some_external_ip_over_the_mixnet().await {
error!("Failed to send ICMP ping: {err}");
}
if let Err(err) = self.ping_v6_some_external_ip_over_the_mixnet().await {
error!("Failed to send ICMPv6 ping: {err}");
}
};

tokio::select! {
_ = cancellable_fut => {
continue;
},
_ = shutdown.recv() => {
trace!("IcmpConnectionBeacon: Received shutdown");
break;
}
}
}
}
Expand Down
20 changes: 14 additions & 6 deletions nym-vpn-core/crates/nym-connection-monitor/src/mixnet_beacon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,22 @@ impl MixnetConnectionBeacon {
break;
}
_ = ping_interval.tick() => {
let _ping_id = match self.send_mixnet_self_ping().await {
Ok(id) => id,
Err(err) => {
error!("Failed to send mixnet self ping: {err}");
continue;
tokio::select! {
_ = shutdown.recv() => {
trace!("MixnetConnectionBeacon: Received shutdown");
break;
},
ping_result = self.send_mixnet_self_ping() => {
let _ping_id = match ping_result {
Ok(id) => id,
Err(err) => {
error!("Failed to send mixnet self ping: {err}");
continue;
}
};
// TODO: store ping_id to be able to monitor or ping timeouts
}
};
// TODO: store ping_id to be able to monitor or ping timeouts
}
}
}
Expand Down
26 changes: 20 additions & 6 deletions nym-vpn-core/crates/nym-vpn-lib/src/mixnet/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,16 @@ impl MixnetProcessor {

match message_creator.create_input_message(bundled_packets) {
Ok(input_message) => {
let ret = sender.send(input_message).await;
if ret.is_err() && !task_client_mix_processor.is_shutdown_poll() {
error!("Could not forward IP packet to the mixnet. The packet will be dropped.");
tokio::select! {
ret = sender.send(input_message) => {
if ret.is_err() && !task_client_mix_processor.is_shutdown_poll() {
error!("Could not forward IP packet to the mixnet. The packet will be dropped.");
}
}
_ = task_client_mix_processor.recv_with_delay() => {
trace!("MixnetProcessor: Received shutdown while sending.");
break;
}
}
}
Err(err) => {
Expand All @@ -144,9 +151,16 @@ impl MixnetProcessor {
{
match message_creator.create_input_message(input_message) {
Ok(input_message) => {
let ret = sender.send(input_message).await;
if ret.is_err() && !task_client_mix_processor.is_shutdown_poll() {
error!("Could not forward IP packet to the mixnet. The packet(s) will be dropped.");
tokio::select! {
ret = sender.send(input_message) => {
if ret.is_err() && !task_client_mix_processor.is_shutdown_poll() {
error!("Could not forward IP packet to the mixnet. The packet(s) will be dropped.");
}
}
_ = task_client_mix_processor.recv_with_delay() => {
trace!("MixnetProcessor: Received shutdown while sending.");
break;
}
}
}
Err(err) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,15 @@ impl ConnectedMixnet {
pub async fn start_event_listener(
&mut self,
event_sender: mpsc::UnboundedSender<MixnetEvent>,
cancel_token: CancellationToken,
) -> JoinHandle<()> {
let (status_tx, status_rx) = futures::channel::mpsc::channel(10);

self.task_manager
.start_status_listener(status_tx, TaskStatus::Ready)
.await;

StatusListener::spawn(status_rx, event_sender)
StatusListener::spawn(status_rx, event_sender, cancel_token)
}

/// Creates a tunnel over Mixnet.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,61 @@ use nym_bandwidth_controller::BandwidthStatusMessage;
use nym_connection_monitor::ConnectionMonitorStatus;
use nym_task::{StatusReceiver, TaskStatus};
use nym_vpn_lib_types::{BandwidthEvent, ConnectionEvent, ConnectionStatisticsEvent, MixnetEvent};
use tokio_util::sync::CancellationToken;

pub struct StatusListener {
rx: StatusReceiver,
tx: mpsc::UnboundedSender<MixnetEvent>,
cancel_token: CancellationToken,
}

impl StatusListener {
pub fn spawn(rx: StatusReceiver, tx: mpsc::UnboundedSender<MixnetEvent>) -> JoinHandle<()> {
pub fn spawn(
rx: StatusReceiver,
tx: mpsc::UnboundedSender<MixnetEvent>,
cancel_token: CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
let status_listener = Self { rx, tx };
let status_listener = Self {
rx,
tx,
cancel_token,
};
status_listener.run().await;
})
}

async fn run(mut self) {
tracing::debug!("Starting status listener loop");

while let Some(msg) = self.rx.next().await {
if let Some(msg) = msg.as_any().downcast_ref::<TaskStatus>() {
tracing::debug!("Received ignored TaskStatus message: {msg}");
} else if let Some(msg) = msg.as_any().downcast_ref::<ConnectionMonitorStatus>() {
self.send_event(MixnetEvent::Connection(ConnectionEvent::from(msg)));
} else if let Some(msg) = msg.as_any().downcast_ref::<BandwidthStatusMessage>() {
self.send_event(MixnetEvent::Bandwidth(BandwidthEvent::from(msg)));
} else if let Some(msg) = msg
.as_any()
.downcast_ref::<MixnetBandwidthStatisticsEvent>()
{
self.send_event(MixnetEvent::ConnectionStatistics(
ConnectionStatisticsEvent::from(msg),
));
} else {
tracing::debug!("Unknown status message received: {msg}");
loop {
tokio::select! {
msg = self.rx.next() => {
let Some(msg) = msg else {
break
};

if let Some(msg) = msg.as_any().downcast_ref::<TaskStatus>() {
tracing::debug!("Received ignored TaskStatus message: {msg}");
} else if let Some(msg) = msg.as_any().downcast_ref::<ConnectionMonitorStatus>() {
self.send_event(MixnetEvent::Connection(ConnectionEvent::from(msg)));
} else if let Some(msg) = msg.as_any().downcast_ref::<BandwidthStatusMessage>() {
self.send_event(MixnetEvent::Bandwidth(BandwidthEvent::from(msg)));
} else if let Some(msg) = msg
.as_any()
.downcast_ref::<MixnetBandwidthStatisticsEvent>()
{
self.send_event(MixnetEvent::ConnectionStatistics(
ConnectionStatisticsEvent::from(msg),
));
} else {
tracing::debug!("Unknown status message received: {msg}");
}
}

_ = self.cancel_token.cancelled() => {
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,10 @@ impl TunnelMonitor {
}

let status_listener_handle = connected_mixnet
.start_event_listener(self.mixnet_event_sender.clone())
.start_event_listener(
self.mixnet_event_sender.clone(),
self.cancel_token.child_token(),
)
.await;

let selected_gateways = connected_mixnet.selected_gateways().clone();
Expand Down

0 comments on commit bcfb9fa

Please sign in to comment.