From 2a2e0691a4fe73b618e93a5534bd2995657a8668 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 31 Dec 2023 19:40:41 +0530 Subject: [PATCH 1/8] fix: use `Vec::extend` to not lost pending --- rumqttc/src/eventloop.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index a1b23f562..b0f385e98 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -13,7 +13,6 @@ use std::io; use std::net::SocketAddr; use std::pin::Pin; use std::time::Duration; -use std::vec::IntoIter; #[cfg(unix)] use {std::path::Path, tokio::net::UnixStream}; @@ -79,7 +78,7 @@ pub struct EventLoop { /// Requests handle to send requests pub(crate) requests_tx: Sender, /// Pending packets from last session - pub pending: IntoIter, + pub pending: Vec, /// Network connection to the broker network: Option, /// Keep alive time @@ -102,7 +101,6 @@ impl EventLoop { pub fn new(mqtt_options: MqttOptions, cap: usize) -> EventLoop { let (requests_tx, requests_rx) = bounded(cap); let pending = Vec::new(); - let pending = pending.into_iter(); let max_inflight = mqtt_options.inflight; let manual_acks = mqtt_options.manual_acks; let max_outgoing_packet_size = mqtt_options.max_outgoing_packet_size; @@ -134,7 +132,7 @@ impl EventLoop { let requests_in_channel = self.requests_rx.drain(); pending.extend(requests_in_channel); - self.pending = pending.into_iter(); + self.pending.extend(pending.into_iter()); } /// Yields Next notification or outgoing request and periodically pings @@ -267,7 +265,7 @@ impl EventLoop { } async fn next_request( - pending: &mut IntoIter, + pending: &mut Vec, rx: &Receiver, pending_throttle: Duration, ) -> Result { From 8406c4ceacb3c6c3ce0310636c65dee6f55dd065 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 31 Dec 2023 19:41:41 +0530 Subject: [PATCH 2/8] refactor: remove unnecessary unwrap --- rumqttc/src/eventloop.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index b0f385e98..e21cadecd 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -269,11 +269,11 @@ impl EventLoop { rx: &Receiver, pending_throttle: Duration, ) -> Result { - if pending.len() > 0 { + if let Some(req) = pending.pop() { time::sleep(pending_throttle).await; // We must call .next() AFTER sleep() otherwise .next() would // advance the iterator but the future might be canceled before return - Ok(pending.next().unwrap()) + Ok(req) } else { match rx.recv_async().await { Ok(r) => Ok(r), From 85384549ad2a10037befefa71c597505ec964521 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 31 Dec 2023 19:58:36 +0530 Subject: [PATCH 3/8] style: clippy suggestions --- rumqttc/src/eventloop.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index e21cadecd..174628764 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -125,14 +125,12 @@ impl EventLoop { pub fn clean(&mut self) { self.network = None; self.keepalive_timeout = None; - let mut pending = self.state.clean(); + self.pending.extend(self.state.clean()); // drain requests from channel which weren't yet received // this helps in preventing data loss let requests_in_channel = self.requests_rx.drain(); - pending.extend(requests_in_channel); - - self.pending.extend(pending.into_iter()); + self.pending.extend(requests_in_channel); } /// Yields Next notification or outgoing request and periodically pings @@ -227,7 +225,7 @@ impl EventLoop { &mut self.pending, &self.requests_rx, self.mqtt_options.pending_throttle - ), if self.pending.len() > 0 || (!inflight_full && !collision) => match o { + ), if !self.pending.is_empty() || (!inflight_full && !collision) => match o { Ok(request) => { self.state.handle_outgoing_packet(request)?; match time::timeout(network_timeout, network.flush(&mut self.state.write)).await { From 44a907c17f9bf3de92203c49536f1df5f2f26fee Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 31 Dec 2023 20:01:42 +0530 Subject: [PATCH 4/8] doc: note unchecked growth of pending --- rumqttc/src/eventloop.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index 174628764..7fdce3572 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -128,7 +128,8 @@ impl EventLoop { self.pending.extend(self.state.clean()); // drain requests from channel which weren't yet received - // this helps in preventing data loss + // NOTE: While this helps in preventing data loss, it could + // lead to a growing pending list if not managed properly. let requests_in_channel = self.requests_rx.drain(); self.pending.extend(requests_in_channel); } From a342e8d5700044391e410b57ef8e6b344dd85a6e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 31 Dec 2023 23:40:27 +0530 Subject: [PATCH 5/8] fix: `VecDeque` preserves ordering Also reverts 8406c4ceacb3c6c3ce0310636c65dee6f55dd065 --- rumqttc/src/eventloop.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index 7fdce3572..5ac2d3f6e 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -9,6 +9,7 @@ use tokio::net::{lookup_host, TcpSocket, TcpStream}; use tokio::select; use tokio::time::{self, Instant, Sleep}; +use std::collections::VecDeque; use std::io; use std::net::SocketAddr; use std::pin::Pin; @@ -78,7 +79,7 @@ pub struct EventLoop { /// Requests handle to send requests pub(crate) requests_tx: Sender, /// Pending packets from last session - pub pending: Vec, + pub pending: VecDeque, /// Network connection to the broker network: Option, /// Keep alive time @@ -100,7 +101,7 @@ impl EventLoop { /// access and update `options`, `state` and `requests`. pub fn new(mqtt_options: MqttOptions, cap: usize) -> EventLoop { let (requests_tx, requests_rx) = bounded(cap); - let pending = Vec::new(); + let pending = VecDeque::new(); let max_inflight = mqtt_options.inflight; let manual_acks = mqtt_options.manual_acks; let max_outgoing_packet_size = mqtt_options.max_outgoing_packet_size; @@ -264,15 +265,15 @@ impl EventLoop { } async fn next_request( - pending: &mut Vec, + pending: &mut VecDeque, rx: &Receiver, pending_throttle: Duration, ) -> Result { - if let Some(req) = pending.pop() { + if !pending.is_empty() { time::sleep(pending_throttle).await; - // We must call .next() AFTER sleep() otherwise .next() would - // advance the iterator but the future might be canceled before return - Ok(req) + // We must call .pop_front() AFTER sleep() otherwise we would have + // advanced the iterator but the future might be canceled before return + Ok(pending.pop_front().unwrap()) } else { match rx.recv_async().await { Ok(r) => Ok(r), From b7282464ca59e4c2cbdcde243ba2b4d9fd384240 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 4 Jan 2024 18:31:39 +0530 Subject: [PATCH 6/8] doc: note for `EventLoop::clean` --- rumqttc/src/eventloop.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index 5ac2d3f6e..a8aee76c7 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -122,15 +122,15 @@ impl EventLoop { /// republished in the next session. Move pending messages from state to eventloop, drops the /// underlying network connection and clears the keepalive timeout if any. /// - /// NOTE: Use only when EventLoop is blocked on network and unable to immediately handle disconnect + /// > NOTE: Use only when EventLoop is blocked on network and unable to immediately handle disconnect. + /// > Also, while this helps prevent data loss, the pending list length should be managed properly. + /// > For this reason we recommend setting [`AsycClient`](crate::AsyncClient)'s channel capacity to `0`. pub fn clean(&mut self) { self.network = None; self.keepalive_timeout = None; self.pending.extend(self.state.clean()); // drain requests from channel which weren't yet received - // NOTE: While this helps in preventing data loss, it could - // lead to a growing pending list if not managed properly. let requests_in_channel = self.requests_rx.drain(); self.pending.extend(requests_in_channel); } From 84e6c04609d8fb6ac80fe7ccb61dab6eff5a4685 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 4 Jan 2024 18:34:55 +0530 Subject: [PATCH 7/8] fix: same bug in `mod v5` --- rumqttc/src/v5/eventloop.rs | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index cab24c2c4..36c10971d 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -8,11 +8,11 @@ use flume::{bounded, Receiver, Sender}; use tokio::select; use tokio::time::{self, error::Elapsed, Instant, Sleep}; +use std::collections::VecDeque; use std::convert::TryInto; use std::io; use std::pin::Pin; use std::time::Duration; -use std::vec::IntoIter; use super::mqttbytes::v5::ConnectReturnCode; @@ -78,7 +78,7 @@ pub struct EventLoop { /// Requests handle to send requests pub(crate) requests_tx: Sender, /// Pending packets from last session - pub pending: IntoIter, + pub pending: VecDeque, /// Network connection to the broker network: Option, /// Keep alive time @@ -99,8 +99,7 @@ impl EventLoop { /// access and update `options`, `state` and `requests`. pub fn new(options: MqttOptions, cap: usize) -> EventLoop { let (requests_tx, requests_rx) = bounded(cap); - let pending = Vec::new(); - let pending = pending.into_iter(); + let pending = VecDeque::new(); let inflight_limit = options.outgoing_inflight_upper_limit.unwrap_or(u16::MAX); let manual_acks = options.manual_acks; @@ -119,18 +118,17 @@ impl EventLoop { /// republished in the next session. Move pending messages from state to eventloop, drops the /// underlying network connection and clears the keepalive timeout if any. /// - /// NOTE: Use only when EventLoop is blocked on network and unable to immediately handle disconnect + /// > NOTE: Use only when EventLoop is blocked on network and unable to immediately handle disconnect. + /// > Also, while this helps prevent data loss, the pending list length should be managed properly. + /// > For this reason we recommend setting [`AsycClient`](super::AsyncClient)'s channel capacity to `0`. pub fn clean(&mut self) { self.network = None; self.keepalive_timeout = None; - let mut pending = self.state.clean(); + self.pending.extend(self.state.clean()); // drain requests from channel which weren't yet received - // this helps in preventing data loss let requests_in_channel = self.requests_rx.drain(); - pending.extend(requests_in_channel); - - self.pending = pending.into_iter(); + self.pending.extend(requests_in_channel); } /// Yields Next notification or outgoing request and periodically pings @@ -210,7 +208,7 @@ impl EventLoop { &mut self.pending, &self.requests_rx, self.options.pending_throttle - ), if self.pending.len() > 0 || (!inflight_full && !collision) => match o { + ), if !self.pending.is_empty() || (!inflight_full && !collision) => match o { Ok(request) => { self.state.handle_outgoing_packet(request)?; network.flush(&mut self.state.write).await?; @@ -239,15 +237,15 @@ impl EventLoop { } async fn next_request( - pending: &mut IntoIter, + pending: &mut VecDeque, rx: &Receiver, pending_throttle: Duration, ) -> Result { - if pending.len() > 0 { + if !pending.is_empty() { time::sleep(pending_throttle).await; // We must call .next() AFTER sleep() otherwise .next() would // advance the iterator but the future might be canceled before return - Ok(pending.next().unwrap()) + Ok(pending.pop_front().unwrap()) } else { match rx.recv_async().await { Ok(r) => Ok(r), From 7e9443f988f7eb1b013738d9b54517e0726ae64c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 4 Jan 2024 18:37:39 +0530 Subject: [PATCH 8/8] doc: add changelog entry --- rumqttc/CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index 4355e6d22..54bb4ac57 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -19,7 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Removed the `Key` enum: users do not need to specify the TLS key variant in the `TlsConfiguration` anymore, this is inferred automatically. To update your code simply remove `Key::ECC()` or `Key::RSA()` from the initialization. - certificate for client authentication is now optional while using native-tls. `der` & `password` fields are replaced by `client_auth`. -- Make v5 `RetainForwardRule` public, in order to allow setting it when constructing `Filter` values. +- Make v5 `RetainForwardRule` public, in order to allow setting it when constructing `Filter` values. +- Use `VecDeque` instead of `IntoIter` to fix unintentional drop of pending requests on `EventLoop::clean` (#780) ### Deprecated