Skip to content

Commit

Permalink
review feedback: use Option<Notification> instead
Browse files Browse the repository at this point in the history
  • Loading branch information
jmpesp committed Feb 5, 2025
1 parent 007d95a commit 912e376
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions upstairs/src/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub(crate) fn spawn_notify_task(addr: Ipv6Addr, log: &Logger) -> NotifyQueue {
}

struct Notification {
maybe_message: Option<(DateTime<Utc>, NotifyRequest)>,
message: (DateTime<Utc>, NotifyRequest),
qos: NotifyQos,
retries: usize,
}
Expand All @@ -139,7 +139,7 @@ async fn notify_task_nexus(
info!(log, "notify_task started");

// Store high QoS messages if they can't be sent
let mut stored_notification = None;
let mut stored_notification: Option<Notification> = None;

let reqwest_client = reqwest::ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(15))
Expand All @@ -148,33 +148,34 @@ async fn notify_task_nexus(
.unwrap();

loop {
let Notification {
maybe_message,
qos,
retries,
} = {
if let Some(notification) = stored_notification.take() {
notification
let r = {
if stored_notification.is_some() {
stored_notification.take()
} else {
tokio::select! {
biased;

i = rx_high.recv() => Notification {
maybe_message: i,
i = rx_high.recv() => i.map(|message| Notification {
message,
qos: NotifyQos::High,
retries: 0,
},
}),

i = rx_low.recv() => Notification {
maybe_message: i,
i = rx_low.recv() => i.map(|message| Notification {
message,
qos: NotifyQos::Low,
retries: 0,
},
}),
}
}
};

let Some((time, m)) = maybe_message else {
let Some(Notification {
message: (time, m),
qos,
retries,
}) = r
else {
error!(log, "one of the notify channels was closed!");
break;
};
Expand Down Expand Up @@ -395,7 +396,7 @@ async fn notify_task_nexus(
warn!(log, "retries > 3, dropping {m:?}");
} else {
stored_notification = Some(Notification {
maybe_message: Some((time, m)),
message: (time, m),
qos,
retries: retries + 1,
});
Expand Down

0 comments on commit 912e376

Please sign in to comment.