diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index c8c8716a3..6a94b5112 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -9,12 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -* `size()` method on `Packet` calculates size once serialized. -* `read()` and `write()` methods on `Packet`. -* `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection -* `set_session_expiry_interval` and `session_expiry_interval` methods on `MqttOptions`. -* `Auth` packet as per MQTT5 standards -* Allow configuring the `nodelay` property of underlying TCP client with the `tcp_nodelay` field in `NetworkOptions` +* `size()` method on `Packet` calculates size once serialized; +* `read()` and `write()` methods on `Packet`; +* `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection; +* `set_session_expiry_interval` and `session_expiry_interval` methods on `MqttOptions`; +* `Auth` packet as per MQTT5 standards; +* Allow configuring the `nodelay` property of underlying TCP client with the `tcp_nodelay` field in `NetworkOptions`; +* `disconnect_with_properties` and `try_disconnect_with_properties` methods on `Client` and `AsyncClient`, allowing disconnection from the broker with custom properties and reason. ### Changed @@ -23,7 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * use `Login` to store credentials * Made `DisconnectProperties` struct public. * Replace `Vec>` with `FixedBitSet` for managing packet ids of released QoS 2 publishes and incoming QoS 2 publishes in `MqttState`. -* Accept `native_tls::TlsConnector` as input for `Transport::tls_with_config`. +* Accept `native_tls::TlsConnector` as input for `Transport::tls_with_config`; +* Updated `Request::Disconnect` to include a Disconnect struct. ### Deprecated diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs index 4913d1d0f..05c61a64e 100644 --- a/rumqttc/src/v5/client.rs +++ b/rumqttc/src/v5/client.rs @@ -7,7 +7,10 @@ use super::mqttbytes::v5::{ Unsubscribe, UnsubscribeProperties, }; use super::mqttbytes::QoS; -use super::{ConnectionError, Event, EventLoop, MqttOptions, Request}; +use super::{ + ConnectionError, Disconnect, DisconnectProperties, DisconnectReasonCode, Event, EventLoop, + MqttOptions, Request, +}; use crate::{valid_filter, valid_topic}; use bytes::Bytes; @@ -429,19 +432,68 @@ impl AsyncClient { self.handle_try_unsubscribe(topic, None) } - /// Sends a MQTT disconnect to the `EventLoop` + /// Sends a MQTT disconnect to the `EventLoop` with default DisconnectReasonCode::NormalDisconnection pub async fn disconnect(&self) -> Result<(), ClientError> { - let request = Request::Disconnect; + self.handle_disconnect(DisconnectReasonCode::NormalDisconnection, None) + .await + } + + /// Sends a MQTT disconnect to the `EventLoop` with properties + pub async fn disconnect_with_properties( + &self, + reason: DisconnectReasonCode, + properties: DisconnectProperties, + ) -> Result<(), ClientError> { + self.handle_disconnect(reason, Some(properties)).await + } + + // Handle disconnect interface which can have properties or not + async fn handle_disconnect( + &self, + reason: DisconnectReasonCode, + properties: Option, + ) -> Result<(), ClientError> { + let request = self.build_disconnect_request(reason, properties); self.request_tx.send_async(request).await?; Ok(()) } - /// Attempts to send a MQTT disconnect to the `EventLoop` + /// Attempts to send a MQTT disconnect to the `EventLoop` with default DisconnectReasonCode::NormalDisconnection pub fn try_disconnect(&self) -> Result<(), ClientError> { - let request = Request::Disconnect; + self.handle_try_disconnect(DisconnectReasonCode::NormalDisconnection, None) + } + + /// Sends a MQTT disconnect to the `EventLoop` with properties + pub fn try_disconnect_with_properties( + &self, + reason: DisconnectReasonCode, + properties: DisconnectProperties, + ) -> Result<(), ClientError> { + self.handle_try_disconnect(reason, Some(properties)) + } + + // Handle disconnect interface which can have properties or not + fn handle_try_disconnect( + &self, + reason: DisconnectReasonCode, + properties: Option, + ) -> Result<(), ClientError> { + let request = self.build_disconnect_request(reason, properties); self.request_tx.try_send(request)?; Ok(()) } + + // Helper function to build disconnect request + fn build_disconnect_request( + &self, + reason: DisconnectReasonCode, + properties: Option, + ) -> Request { + match properties { + Some(p) => Request::Disconnect(Disconnect::new_with_properties(reason, p)), + None => Request::Disconnect(Disconnect::new(reason)), + } + } } fn get_ack_req(publish: &Publish) -> Option { @@ -732,17 +784,42 @@ impl Client { self.client.try_unsubscribe(topic) } - /// Sends a MQTT disconnect to the `EventLoop` + /// Sends a MQTT disconnect to the `EventLoop` with default DisconnectReasonCode::NormalDisconnection pub fn disconnect(&self) -> Result<(), ClientError> { - let request = Request::Disconnect; + self.handle_disconnect(DisconnectReasonCode::NormalDisconnection, None) + } + + /// Sends a MQTT disconnect to the `EventLoop` with properties + pub fn disconnect_with_properties( + &self, + reason: DisconnectReasonCode, + properties: DisconnectProperties, + ) -> Result<(), ClientError> { + self.handle_disconnect(reason, Some(properties)) + } + + fn handle_disconnect( + &self, + reason: DisconnectReasonCode, + properties: Option, + ) -> Result<(), ClientError> { + let request = self.client.build_disconnect_request(reason, properties); self.client.request_tx.send(request)?; Ok(()) } - /// Sends a MQTT disconnect to the `EventLoop` + /// Try to send a MQTT disconnect to the `EventLoop` with default DisconnectReasonCode::NormalDisconnection pub fn try_disconnect(&self) -> Result<(), ClientError> { - self.client.try_disconnect()?; - Ok(()) + self.client.try_disconnect() + } + + /// Try to sends a MQTT disconnect to the `EventLoop` with properties + pub fn try_disconnect_with_properties( + &self, + reason: DisconnectReasonCode, + properties: DisconnectProperties, + ) -> Result<(), ClientError> { + self.client.handle_try_disconnect(reason, Some(properties)) } } diff --git a/rumqttc/src/v5/mod.rs b/rumqttc/src/v5/mod.rs index 6e0e43931..104805bc6 100644 --- a/rumqttc/src/v5/mod.rs +++ b/rumqttc/src/v5/mod.rs @@ -46,7 +46,7 @@ pub enum Request { SubAck(SubAck), Unsubscribe(Unsubscribe), UnsubAck(UnsubAck), - Disconnect, + Disconnect(Disconnect), } impl From for Request { diff --git a/rumqttc/src/v5/mqttbytes/v5/disconnect.rs b/rumqttc/src/v5/mqttbytes/v5/disconnect.rs index 098260c2a..9d89d1e81 100644 --- a/rumqttc/src/v5/mqttbytes/v5/disconnect.rs +++ b/rumqttc/src/v5/mqttbytes/v5/disconnect.rs @@ -259,6 +259,16 @@ impl Disconnect { } } + pub fn new_with_properties( + reason: DisconnectReasonCode, + properties: DisconnectProperties, + ) -> Self { + Self { + reason_code: reason, + properties: Some(properties), + } + } + fn len(&self) -> usize { if self.reason_code == DisconnectReasonCode::NormalDisconnection && self.properties.is_none() diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 0f08a33b8..5e779267b 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -183,9 +183,7 @@ impl MqttState { Request::Subscribe(subscribe) => self.outgoing_subscribe(subscribe)?, Request::Unsubscribe(unsubscribe) => self.outgoing_unsubscribe(unsubscribe)?, Request::PingReq => self.outgoing_ping()?, - Request::Disconnect => { - self.outgoing_disconnect(DisconnectReasonCode::NormalDisconnection)? - } + Request::Disconnect(disconnect) => self.outgoing_disconnect(disconnect)?, Request::PubAck(puback) => self.outgoing_puback(puback)?, Request::PubRec(pubrec) => self.outgoing_pubrec(pubrec)?, _ => unimplemented!(), @@ -228,7 +226,8 @@ impl MqttState { pub fn handle_protocol_error(&mut self) -> Result, StateError> { // send DISCONNECT packet with REASON_CODE 0x82 - self.outgoing_disconnect(DisconnectReasonCode::ProtocolError) + let disconnect = Disconnect::new(DisconnectReasonCode::ProtocolError); + self.outgoing_disconnect(disconnect) } fn handle_incoming_suback( @@ -242,7 +241,7 @@ impl MqttState { } _ => { warn!("SubAck Pkid = {:?}, Reason = {:?}", suback.pkid, reason); - }, + } } } Ok(None) @@ -364,7 +363,10 @@ impl MqttState { if puback.reason != PubAckReason::Success && puback.reason != PubAckReason::NoMatchingSubscribers { - warn!("PubAck Pkid = {:?}, reason: {:?}", puback.pkid, puback.reason); + warn!( + "PubAck Pkid = {:?}, reason: {:?}", + puback.pkid, puback.reason + ); return Ok(None); } @@ -397,7 +399,10 @@ impl MqttState { if pubrec.reason != PubRecReason::Success && pubrec.reason != PubRecReason::NoMatchingSubscribers { - warn!("PubRec Pkid = {:?}, reason: {:?}", pubrec.pkid, pubrec.reason); + warn!( + "PubRec Pkid = {:?}, reason: {:?}", + pubrec.pkid, pubrec.reason + ); return Ok(None); } @@ -417,7 +422,10 @@ impl MqttState { self.incoming_pub.set(pubrel.pkid as usize, false); if pubrel.reason != PubRelReason::Success { - warn!("PubRel Pkid = {:?}, reason: {:?}", pubrel.pkid, pubrel.reason); + warn!( + "PubRel Pkid = {:?}, reason: {:?}", + pubrel.pkid, pubrel.reason + ); return Ok(None); } @@ -444,7 +452,10 @@ impl MqttState { self.outgoing_rel.set(pubcomp.pkid as usize, false); if pubcomp.reason != PubCompReason::Success { - warn!("PubComp Pkid = {:?}, reason: {:?}", pubcomp.pkid, pubcomp.reason); + warn!( + "PubComp Pkid = {:?}, reason: {:?}", + pubcomp.pkid, pubcomp.reason + ); return Ok(None); } @@ -614,15 +625,16 @@ impl MqttState { Ok(Some(Packet::Unsubscribe(unsub))) } + /// Send Disconnect packet to broker fn outgoing_disconnect( &mut self, - reason: DisconnectReasonCode, + disconnect: Disconnect, ) -> Result, StateError> { - debug!("Disconnect with {:?}", reason); + debug!("Disconnect with {:?}", disconnect.reason_code); let event = Event::Outgoing(Outgoing::Disconnect); self.events.push_back(event); - Ok(Some(Packet::Disconnect(Disconnect::new(reason)))) + Ok(Some(Packet::Disconnect(disconnect))) } fn check_collision(&mut self, pkid: u16) -> Option {