From eec218f95e7052accbf60dec36b3051de40253ba Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Mon, 13 May 2024 09:21:37 +0800 Subject: [PATCH 1/9] Check if session present to restore pending publishes. --- rumqttc/src/v5/eventloop.rs | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index a59094807..d3b643f30 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -138,7 +138,7 @@ impl EventLoop { if self.network.is_none() { let (network, connack) = time::timeout( Duration::from_secs(self.options.connection_timeout()), - connect(&mut self.options), + connect(&mut self.pending, &mut self.options), ) .await??; self.network = Some(network); @@ -263,15 +263,15 @@ impl EventLoop { /// the stream. /// This function (for convenience) includes internal delays for users to perform internal sleeps /// between re-connections so that cancel semantics can be used during this sleep -async fn connect(options: &mut MqttOptions) -> Result<(Network, Incoming), ConnectionError> { +async fn connect(pending: &mut VecDeque, options: &mut MqttOptions) -> Result<(Network, Incoming), ConnectionError> { // connect to the broker let mut network = network_connect(options).await?; // make MQTT connection request (which internally awaits for ack) - let packet = mqtt_connect(options, &mut network).await?; + let packet = mqtt_connect(pending, options, &mut network).await?; // Last session might contain packets which aren't acked. MQTT says these packets should be - // republished in the next session + // republished in the next session if session is resumed // move pending messages from state to eventloop // let pending = self.state.clean(); // self.pending = pending.into_iter(); @@ -385,6 +385,7 @@ async fn network_connect(options: &MqttOptions) -> Result, options: &mut MqttOptions, network: &mut Network, ) -> Result { @@ -406,12 +407,22 @@ async fn mqtt_connect( // validate connack match network.read().await? { Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => { - // Override local keep_alive value if set by server. + // Check if it's a new session + if !connack.session_present { + // If it's a new session, clear the pendings + pending.clear(); + } + // Override local settings if set by server. if let Some(props) = &connack.properties { + // Override local keep_alive value if set by server. if let Some(keep_alive) = props.server_keep_alive { options.keep_alive = Duration::from_secs(keep_alive as u64); } network.set_max_outgoing_size(props.max_packet_size); + // Override local session_expiry_interval value if set by server. + if (props.session_expiry_interval).is_some() { + options.set_session_expiry_interval(props.session_expiry_interval); + } } Ok(Packet::ConnAck(connack)) } From deac0f66ab753d5ee0cdcbb68b99207cf137eb8d Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Tue, 14 May 2024 14:34:57 +0800 Subject: [PATCH 2/9] Modify changelog. --- rumqttc/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index d85c89032..5d3c9f08a 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Validate filters while creating subscription requests. * Make v4::Connect::write return correct value * Ordering of `State.events` related to `QoS > 0` publishes +* Resume v5 session only if broker CONNACK with Session Present 1. ### Security From d68a3ba5447c41f5a984c1ab3571af206d33dd9b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 14 May 2024 10:28:21 +0000 Subject: [PATCH 3/9] remove changes that don't seem to be related --- rumqttc/src/v5/eventloop.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index d3b643f30..6fa5cb458 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -412,19 +412,13 @@ async fn mqtt_connect( // If it's a new session, clear the pendings pending.clear(); } - // Override local settings if set by server. if let Some(props) = &connack.properties { - // Override local keep_alive value if set by server. if let Some(keep_alive) = props.server_keep_alive { options.keep_alive = Duration::from_secs(keep_alive as u64); } network.set_max_outgoing_size(props.max_packet_size); - // Override local session_expiry_interval value if set by server. - if (props.session_expiry_interval).is_some() { - options.set_session_expiry_interval(props.session_expiry_interval); - } } - Ok(Packet::ConnAck(connack)) + Ok(connack) } Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)), packet => Err(ConnectionError::NotConnAck(Box::new(packet))), From 1387d5e07d717a9d3d1407ec3e5275de679ab743 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 14 May 2024 10:32:43 +0000 Subject: [PATCH 4/9] refactor: improve readability --- rumqttc/src/v5/eventloop.rs | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index 6fa5cb458..9c3e59222 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -138,21 +138,28 @@ impl EventLoop { if self.network.is_none() { let (network, connack) = time::timeout( Duration::from_secs(self.options.connection_timeout()), - connect(&mut self.pending, &mut self.options), + connect(&mut self.options), ) .await??; + // Last session might contain packets which aren't acked. If it's a new session, clear the pending packets. + if !connack.session_present { + self.pending.clear(); + } self.network = Some(network); if self.keepalive_timeout.is_none() { self.keepalive_timeout = Some(Box::pin(time::sleep(self.options.keep_alive))); } - self.state.handle_incoming_packet(connack)?; + self.state + .handle_incoming_packet(Incoming::ConnAck(connack))?; } match self.select().await { Ok(v) => Ok(v), Err(e) => { + // MQTT requires that packets pending acknowledgement should be republished on session resume. + // Move pending messages from state to eventloop. self.clean(); Err(e) } @@ -263,19 +270,14 @@ impl EventLoop { /// the stream. /// This function (for convenience) includes internal delays for users to perform internal sleeps /// between re-connections so that cancel semantics can be used during this sleep -async fn connect(pending: &mut VecDeque, options: &mut MqttOptions) -> Result<(Network, Incoming), ConnectionError> { +async fn connect(options: &mut MqttOptions) -> Result<(Network, ConnAck), ConnectionError> { // connect to the broker let mut network = network_connect(options).await?; // make MQTT connection request (which internally awaits for ack) - let packet = mqtt_connect(pending, options, &mut network).await?; - - // Last session might contain packets which aren't acked. MQTT says these packets should be - // republished in the next session if session is resumed - // move pending messages from state to eventloop - // let pending = self.state.clean(); - // self.pending = pending.into_iter(); - Ok((network, packet)) + let connack = mqtt_connect(options, &mut network).await?; + + Ok((network, connack)) } async fn network_connect(options: &MqttOptions) -> Result { @@ -385,10 +387,9 @@ async fn network_connect(options: &MqttOptions) -> Result, options: &mut MqttOptions, network: &mut Network, -) -> Result { +) -> Result { let keep_alive = options.keep_alive().as_secs() as u16; let clean_start = options.clean_start(); let client_id = options.client_id(); @@ -407,11 +408,6 @@ async fn mqtt_connect( // validate connack match network.read().await? { Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => { - // Check if it's a new session - if !connack.session_present { - // If it's a new session, clear the pendings - pending.clear(); - } if let Some(props) = &connack.properties { if let Some(keep_alive) = props.server_keep_alive { options.keep_alive = Duration::from_secs(keep_alive as u64); From 2396c849c53ec07015d6cd1690ab51293904cb47 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 14 May 2024 10:41:53 +0000 Subject: [PATCH 5/9] feat: apply changes to v4 --- rumqttc/CHANGELOG.md | 2 +- rumqttc/src/eventloop.rs | 20 ++++++++++++-------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index 5d3c9f08a..d2a59f178 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -28,7 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Validate filters while creating subscription requests. * Make v4::Connect::write return correct value * Ordering of `State.events` related to `QoS > 0` publishes -* Resume v5 session only if broker CONNACK with Session Present 1. +* Resume session only if broker sends `CONNACK` with `session_present == 1`. ### Security diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index a9b1ce8c5..25081a179 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -149,18 +149,24 @@ impl EventLoop { Ok(inner) => inner?, Err(_) => return Err(ConnectionError::NetworkTimeout), }; + // Last session might contain packets which aren't acked. If it's a new session, clear the pending packets. + if !connack.session_present { + self.pending.clear(); + } self.network = Some(network); if self.keepalive_timeout.is_none() && !self.mqtt_options.keep_alive.is_zero() { self.keepalive_timeout = Some(Box::pin(time::sleep(self.mqtt_options.keep_alive))); } - return Ok(Event::Incoming(connack)); + return Ok(Event::Incoming(Packet::ConnAck(connack))); } match self.select().await { Ok(v) => Ok(v), Err(e) => { + // MQTT requires that packets pending acknowledgement should be republished on session resume. + // Move pending messages from state to eventloop. self.clean(); Err(e) } @@ -294,14 +300,14 @@ impl EventLoop { async fn connect( mqtt_options: &MqttOptions, network_options: NetworkOptions, -) -> Result<(Network, Incoming), ConnectionError> { +) -> Result<(Network, ConnAck), ConnectionError> { // connect to the broker let mut network = network_connect(mqtt_options, network_options).await?; // make MQTT connection request (which internally awaits for ack) - let packet = mqtt_connect(mqtt_options, &mut network).await?; + let connack = mqtt_connect(mqtt_options, &mut network).await?; - Ok((network, packet)) + Ok((network, connack)) } pub(crate) async fn socket_connect( @@ -469,7 +475,7 @@ async fn network_connect( async fn mqtt_connect( options: &MqttOptions, network: &mut Network, -) -> Result { +) -> Result { let keep_alive = options.keep_alive().as_secs() as u16; let clean_session = options.clean_session(); let last_will = options.last_will(); @@ -485,9 +491,7 @@ async fn mqtt_connect( // validate connack match network.read().await? { - Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => { - Ok(Packet::ConnAck(connack)) - } + Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => Ok(connack), Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)), packet => Err(ConnectionError::NotConnAck(packet)), } From 35e4cc4916aea6fecddf82b58d12a58f9ed41334 Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Wed, 15 May 2024 13:58:09 +0800 Subject: [PATCH 6/9] Remove session_expiry_interval related code. --- rumqttc/src/v5/eventloop.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index d3b643f30..d289e52f6 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -419,10 +419,6 @@ async fn mqtt_connect( options.keep_alive = Duration::from_secs(keep_alive as u64); } network.set_max_outgoing_size(props.max_packet_size); - // Override local session_expiry_interval value if set by server. - if (props.session_expiry_interval).is_some() { - options.set_session_expiry_interval(props.session_expiry_interval); - } } Ok(Packet::ConnAck(connack)) } From 13e3d0b20f08f1b8dc3776aa265058fcc8d65566 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 20 May 2024 14:09:21 +0530 Subject: [PATCH 7/9] test: set clean session --- rumqttc/tests/reliability.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rumqttc/tests/reliability.rs b/rumqttc/tests/reliability.rs index 0a83d57ce..e72ccf193 100644 --- a/rumqttc/tests/reliability.rs +++ b/rumqttc/tests/reliability.rs @@ -514,7 +514,9 @@ async fn reconnection_resumes_from_the_previous_state() { #[tokio::test] async fn reconnection_resends_unacked_packets_from_the_previous_connection_first() { let mut options = MqttOptions::new("dummy", "127.0.0.1", 3002); - options.set_keep_alive(Duration::from_secs(5)); + options + .set_keep_alive(Duration::from_secs(5)) + .set_clean_session(false); // start sending qos0 publishes. this makes sure that there is // outgoing activity but no incoming activity From 8eca150a411d3330bb891933bf4c29b5e149bfbd Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 20 May 2024 14:28:42 +0530 Subject: [PATCH 8/9] test: broker saved session --- rumqttc/tests/broker.rs | 9 ++++++--- rumqttc/tests/reliability.rs | 30 +++++++++++++++--------------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/rumqttc/tests/broker.rs b/rumqttc/tests/broker.rs index ea66448f5..760a2ab37 100644 --- a/rumqttc/tests/broker.rs +++ b/rumqttc/tests/broker.rs @@ -21,7 +21,7 @@ pub struct Broker { impl Broker { /// Create a new broker which accepts 1 mqtt connection - pub async fn new(port: u16, connack: u8) -> Broker { + pub async fn new(port: u16, connack: u8, session_saved: bool) -> Broker { let addr = format!("127.0.0.1:{port}"); let listener = TcpListener::bind(&addr).await.unwrap(); @@ -32,9 +32,12 @@ impl Broker { framed.readb(&mut incoming).await.unwrap(); match incoming.pop_front().unwrap() { - Packet::Connect(_) => { + Packet::Connect(connect) => { let connack = match connack { - 0 => ConnAck::new(ConnectReturnCode::Success, false), + 0 => ConnAck::new( + ConnectReturnCode::Success, + !connect.clean_session && session_saved, + ), 1 => ConnAck::new(ConnectReturnCode::BadUserNamePassword, false), _ => { return Broker { diff --git a/rumqttc/tests/reliability.rs b/rumqttc/tests/reliability.rs index e72ccf193..7ca5ff000 100644 --- a/rumqttc/tests/reliability.rs +++ b/rumqttc/tests/reliability.rs @@ -72,7 +72,7 @@ async fn _tick( #[tokio::test] async fn connection_should_timeout_on_time() { task::spawn(async move { - let _broker = Broker::new(1880, 3).await; + let _broker = Broker::new(1880, 3, false).await; time::sleep(Duration::from_secs(10)).await; }); @@ -125,7 +125,7 @@ async fn idle_connection_triggers_pings_on_time() { run(&mut eventloop, false).await.unwrap(); }); - let mut broker = Broker::new(1885, 0).await; + let mut broker = Broker::new(1885, 0, false).await; let mut count = 0; let mut start = Instant::now(); @@ -169,7 +169,7 @@ async fn some_outgoing_and_no_incoming_should_trigger_pings_on_time() { run(&mut eventloop, false).await.unwrap(); }); - let mut broker = Broker::new(1886, 0).await; + let mut broker = Broker::new(1886, 0, false).await; let mut count = 0; let mut start = Instant::now(); @@ -204,7 +204,7 @@ async fn some_incoming_and_no_outgoing_should_trigger_pings_on_time() { run(&mut eventloop, false).await.unwrap(); }); - let mut broker = Broker::new(2000, 0).await; + let mut broker = Broker::new(2000, 0, false).await; let mut count = 0; // Start sending qos 0 publishes to the client. This triggers @@ -238,7 +238,7 @@ async fn detects_halfopen_connections_in_the_second_ping_request() { // A broker which consumes packets but doesn't reply task::spawn(async move { - let mut broker = Broker::new(2001, 0).await; + let mut broker = Broker::new(2001, 0, false).await; broker.blackhole().await; }); @@ -279,7 +279,7 @@ async fn requests_are_blocked_after_max_inflight_queue_size() { run(&mut eventloop, false).await.unwrap(); }); - let mut broker = Broker::new(1887, 0).await; + let mut broker = Broker::new(1887, 0, false).await; for i in 1..=10 { let packet = broker.read_publish().await; @@ -306,7 +306,7 @@ async fn requests_are_recovered_after_inflight_queue_size_falls_below_max() { run(&mut eventloop, true).await.unwrap(); }); - let mut broker = Broker::new(1888, 0).await; + let mut broker = Broker::new(1888, 0, false).await; // packet 1, 2, and 3 assert!(broker.read_publish().await.is_some()); @@ -341,7 +341,7 @@ async fn packet_id_collisions_are_detected_and_flow_control_is_applied() { }); task::spawn(async move { - let mut broker = Broker::new(1891, 0).await; + let mut broker = Broker::new(1891, 0, false).await; // read all incoming packets first for i in 1..=4 { @@ -449,8 +449,8 @@ async fn next_poll_after_connect_failure_reconnects() { let options = MqttOptions::new("dummy", "127.0.0.1", 3000); task::spawn(async move { - let _broker = Broker::new(3000, 1).await; - let _broker = Broker::new(3000, 0).await; + let _broker = Broker::new(3000, 1, false).await; + let _broker = Broker::new(3000, 0, false).await; time::sleep(Duration::from_secs(15)).await; }); @@ -489,7 +489,7 @@ async fn reconnection_resumes_from_the_previous_state() { }); // broker connection 1 - let mut broker = Broker::new(3001, 0).await; + let mut broker = Broker::new(3001, 0, false).await; for i in 1..=2 { let packet = broker.read_publish().await.unwrap(); assert_eq!(i, packet.payload[0]); @@ -503,7 +503,7 @@ async fn reconnection_resumes_from_the_previous_state() { // a block around broker with {} is closing the connection as expected // broker connection 2 - let mut broker = Broker::new(3001, 0).await; + let mut broker = Broker::new(3001, 0, false).await; for i in 3..=4 { let packet = broker.read_publish().await.unwrap(); assert_eq!(i, packet.payload[0]); @@ -532,14 +532,14 @@ async fn reconnection_resends_unacked_packets_from_the_previous_connection_first }); // broker connection 1. receive but don't ack - let mut broker = Broker::new(3002, 0).await; + let mut broker = Broker::new(3002, 0, false).await; for i in 1..=2 { let packet = broker.read_publish().await.unwrap(); assert_eq!(i, packet.payload[0]); } // broker connection 2 receives from scratch - let mut broker = Broker::new(3002, 0).await; + let mut broker = Broker::new(3002, 0, true).await; for i in 1..=6 { let packet = broker.read_publish().await.unwrap(); assert_eq!(i, packet.payload[0]); @@ -561,7 +561,7 @@ async fn state_is_being_cleaned_properly_and_pending_request_calculated_properly }); task::spawn(async move { - let mut broker = Broker::new(3004, 0).await; + let mut broker = Broker::new(3004, 0, false).await; while (broker.read_packet().await).is_some() { time::sleep(Duration::from_secs_f64(0.5)).await; } From 3ee273c99ec9e3d06b69815a1cd6c62bb9d0a13b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 20 May 2024 14:30:34 +0530 Subject: [PATCH 9/9] test: fix resume reconnect --- rumqttc/tests/reliability.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rumqttc/tests/reliability.rs b/rumqttc/tests/reliability.rs index 7ca5ff000..633ca4706 100644 --- a/rumqttc/tests/reliability.rs +++ b/rumqttc/tests/reliability.rs @@ -474,7 +474,9 @@ async fn next_poll_after_connect_failure_reconnects() { #[tokio::test] async fn reconnection_resumes_from_the_previous_state() { let mut options = MqttOptions::new("dummy", "127.0.0.1", 3001); - options.set_keep_alive(Duration::from_secs(5)); + options + .set_keep_alive(Duration::from_secs(5)) + .set_clean_session(false); // start sending qos0 publishes. Makes sure that there is out activity but no in activity let (client, mut eventloop) = AsyncClient::new(options, 5); @@ -503,7 +505,7 @@ async fn reconnection_resumes_from_the_previous_state() { // a block around broker with {} is closing the connection as expected // broker connection 2 - let mut broker = Broker::new(3001, 0, false).await; + let mut broker = Broker::new(3001, 0, true).await; for i in 3..=4 { let packet = broker.read_publish().await.unwrap(); assert_eq!(i, packet.payload[0]);