fix: ensure pending packets are not lost on EventLoop::clean #780

Merged · 8 commits · Jan 4, 2024
rumqttc/CHANGELOG.md: 3 changes (2 additions & 1 deletion)
@@ -19,7 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Removed the `Key` enum: users do not need to specify the TLS key variant in the `TlsConfiguration` anymore, this is inferred automatically.
To update your code simply remove `Key::ECC()` or `Key::RSA()` from the initialization.
- certificate for client authentication is now optional while using native-tls. `der` & `password` fields are replaced by `client_auth`.
-- Make v5 `RetainForwardRule` public, in order to allow setting it when constructing `Filter` values.
+- Make v5 `RetainForwardRule` public, in order to allow setting it when constructing `Filter` values.
+- Use `VecDeque` instead of `IntoIter` to fix unintentional drop of pending requests on `EventLoop::clean` (#780)

### Deprecated

rumqttc/src/eventloop.rs: 30 changes (14 additions & 16 deletions)
@@ -9,11 +9,11 @@ use tokio::net::{lookup_host, TcpSocket, TcpStream};
use tokio::select;
use tokio::time::{self, Instant, Sleep};

+use std::collections::VecDeque;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::time::Duration;
-use std::vec::IntoIter;

#[cfg(unix)]
use {std::path::Path, tokio::net::UnixStream};
@@ -79,7 +79,7 @@ pub struct EventLoop {
    /// Requests handle to send requests
    pub(crate) requests_tx: Sender<Request>,
    /// Pending packets from last session
-    pub pending: IntoIter<Request>,
+    pub pending: VecDeque<Request>,
    /// Network connection to the broker
    network: Option<Network>,
    /// Keep alive time
@@ -101,8 +101,7 @@ impl EventLoop {
    /// access and update `options`, `state` and `requests`.
    pub fn new(mqtt_options: MqttOptions, cap: usize) -> EventLoop {
        let (requests_tx, requests_rx) = bounded(cap);
-        let pending = Vec::new();
-        let pending = pending.into_iter();
+        let pending = VecDeque::new();
        let max_inflight = mqtt_options.inflight;
        let manual_acks = mqtt_options.manual_acks;
        let max_outgoing_packet_size = mqtt_options.max_outgoing_packet_size;
@@ -123,18 +122,17 @@
    /// republished in the next session. Move pending messages from state to eventloop, drops the
    /// underlying network connection and clears the keepalive timeout if any.
    ///
-    /// NOTE: Use only when EventLoop is blocked on network and unable to immediately handle disconnect
+    /// > NOTE: Use only when EventLoop is blocked on network and unable to immediately handle disconnect.
+    /// > Also, while this helps prevent data loss, the pending list length should be managed properly.
+    /// > For this reason we recommend setting [`AsyncClient`](crate::AsyncClient)'s channel capacity to `0`.
    pub fn clean(&mut self) {
        self.network = None;
        self.keepalive_timeout = None;
-        let mut pending = self.state.clean();
+        self.pending.extend(self.state.clean());

        // drain requests from channel which weren't yet received
        // this helps in preventing data loss
        let requests_in_channel = self.requests_rx.drain();
-        pending.extend(requests_in_channel);
-
-        self.pending = pending.into_iter();
+        self.pending.extend(requests_in_channel);
    }

    /// Yields Next notification or outgoing request and periodically pings
@@ -229,7 +227,7 @@ impl EventLoop {
                &mut self.pending,
                &self.requests_rx,
                self.mqtt_options.pending_throttle
-            ), if self.pending.len() > 0 || (!inflight_full && !collision) => match o {
+            ), if !self.pending.is_empty() || (!inflight_full && !collision) => match o {
                Ok(request) => {
                    self.state.handle_outgoing_packet(request)?;
                    match time::timeout(network_timeout, network.flush(&mut self.state.write)).await {
@@ -267,15 +265,15 @@
}

async fn next_request(
-    pending: &mut IntoIter<Request>,
+    pending: &mut VecDeque<Request>,
    rx: &Receiver<Request>,
    pending_throttle: Duration,
) -> Result<Request, ConnectionError> {
-    if pending.len() > 0 {
+    if !pending.is_empty() {
        time::sleep(pending_throttle).await;
-        // We must call .next() AFTER sleep() otherwise .next() would
-        // advance the iterator but the future might be canceled before return
-        Ok(pending.next().unwrap())
+        // We must call .pop_front() AFTER sleep() otherwise we would have
+        // advanced the iterator but the future might be canceled before return
+        Ok(pending.pop_front().unwrap())
    } else {
        match rx.recv_async().await {
            Ok(r) => Ok(r),
rumqttc/src/v5/eventloop.rs: 26 changes (12 additions & 14 deletions)
@@ -8,11 +8,11 @@ use flume::{bounded, Receiver, Sender};
use tokio::select;
use tokio::time::{self, error::Elapsed, Instant, Sleep};

+use std::collections::VecDeque;
use std::convert::TryInto;
use std::io;
use std::pin::Pin;
use std::time::Duration;
-use std::vec::IntoIter;

use super::mqttbytes::v5::ConnectReturnCode;

@@ -78,7 +78,7 @@ pub struct EventLoop {
    /// Requests handle to send requests
    pub(crate) requests_tx: Sender<Request>,
    /// Pending packets from last session
-    pub pending: IntoIter<Request>,
+    pub pending: VecDeque<Request>,
    /// Network connection to the broker
    network: Option<Network>,
    /// Keep alive time
@@ -99,8 +99,7 @@ impl EventLoop {
    /// access and update `options`, `state` and `requests`.
    pub fn new(options: MqttOptions, cap: usize) -> EventLoop {
        let (requests_tx, requests_rx) = bounded(cap);
-        let pending = Vec::new();
-        let pending = pending.into_iter();
+        let pending = VecDeque::new();
        let inflight_limit = options.outgoing_inflight_upper_limit.unwrap_or(u16::MAX);
        let manual_acks = options.manual_acks;

@@ -119,18 +118,17 @@
    /// republished in the next session. Move pending messages from state to eventloop, drops the
    /// underlying network connection and clears the keepalive timeout if any.
    ///
-    /// NOTE: Use only when EventLoop is blocked on network and unable to immediately handle disconnect
+    /// > NOTE: Use only when EventLoop is blocked on network and unable to immediately handle disconnect.
+    /// > Also, while this helps prevent data loss, the pending list length should be managed properly.
+    /// > For this reason we recommend setting [`AsyncClient`](super::AsyncClient)'s channel capacity to `0`.
    pub fn clean(&mut self) {
        self.network = None;
        self.keepalive_timeout = None;
-        let mut pending = self.state.clean();
+        self.pending.extend(self.state.clean());

        // drain requests from channel which weren't yet received
        // this helps in preventing data loss
        let requests_in_channel = self.requests_rx.drain();
-        pending.extend(requests_in_channel);
-
-        self.pending = pending.into_iter();
+        self.pending.extend(requests_in_channel);
    }

    /// Yields Next notification or outgoing request and periodically pings
@@ -210,7 +208,7 @@ impl EventLoop {
                &mut self.pending,
                &self.requests_rx,
                self.options.pending_throttle
-            ), if self.pending.len() > 0 || (!inflight_full && !collision) => match o {
+            ), if !self.pending.is_empty() || (!inflight_full && !collision) => match o {
                Ok(request) => {
                    self.state.handle_outgoing_packet(request)?;
                    network.flush(&mut self.state.write).await?;
@@ -239,15 +237,15 @@
}

async fn next_request(
-    pending: &mut IntoIter<Request>,
+    pending: &mut VecDeque<Request>,
    rx: &Receiver<Request>,
    pending_throttle: Duration,
) -> Result<Request, ConnectionError> {
-    if pending.len() > 0 {
+    if !pending.is_empty() {
        time::sleep(pending_throttle).await;
-        // We must call .next() AFTER sleep() otherwise .next() would
-        // advance the iterator but the future might be canceled before return
-        Ok(pending.next().unwrap())
+        Ok(pending.pop_front().unwrap())
    } else {
        match rx.recv_async().await {
            Ok(r) => Ok(r),