diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs index 910da504f..ed7005794 100644 --- a/rumqttc/src/v5/client.rs +++ b/rumqttc/src/v5/client.rs @@ -8,7 +8,7 @@ use super::mqttbytes::v5::{ }; use super::mqttbytes::QoS; use super::{ConnectionError, Event, EventLoop, MqttOptions, Request}; -use crate::valid_topic; +use crate::{valid_topic, PkidPromise}; use bytes::Bytes; use flume::{SendError, Sender, TrySendError}; @@ -78,7 +78,7 @@ impl AsyncClient { retain: bool, payload: P, properties: Option, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -86,12 +86,21 @@ impl AsyncClient { let topic = topic.into(); let mut publish = Publish::new(&topic, qos, payload, properties); publish.retain = retain; + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + // Fulfill instantly for QoS 0 + if qos == QoS::AtMostOnce { + _ = pkid_tx.send(0); + } else { + publish.place_pkid_tx(pkid_tx); + } + let publish = Request::Publish(publish); if !valid_topic(&topic) { return Err(ClientError::Request(publish)); } self.request_tx.send_async(publish).await?; - Ok(()) + Ok(pkid_rx) } pub async fn publish_with_properties( @@ -101,7 +110,7 @@ impl AsyncClient { retain: bool, payload: P, properties: PublishProperties, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -116,7 +125,7 @@ impl AsyncClient { qos: QoS, retain: bool, payload: P, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -132,7 +141,7 @@ impl AsyncClient { retain: bool, payload: P, properties: Option, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -140,12 +149,21 @@ impl AsyncClient { let topic = topic.into(); let mut publish = Publish::new(&topic, qos, payload, properties); publish.retain = retain; + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + // Fulfill instantly for QoS 0 + if qos == QoS::AtMostOnce { + _ = pkid_tx.send(0); + } else { + publish.place_pkid_tx(pkid_tx); + } + let publish = Request::Publish(publish); if !valid_topic(&topic) { return Err(ClientError::TryRequest(publish)); } self.request_tx.try_send(publish)?; - Ok(()) + Ok(pkid_rx) } pub fn try_publish_with_properties( @@ -155,7 +173,7 @@ impl AsyncClient { retain: bool, payload: P, properties: PublishProperties, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -169,7 +187,7 @@ impl AsyncClient { qos: QoS, retain: bool, payload: P, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -204,19 +222,28 @@ impl AsyncClient { retain: bool, payload: Bytes, properties: Option, - ) -> Result<(), ClientError> + ) -> Result where S: Into, { let topic = topic.into(); let mut publish = Publish::new(&topic, qos, payload, properties); publish.retain = retain; + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + // Fulfill instantly for QoS 0 + if qos == QoS::AtMostOnce { + _ = pkid_tx.send(0); + } else { + publish.place_pkid_tx(pkid_tx); + } + let publish = Request::Publish(publish); if !valid_topic(&topic) { return Err(ClientError::TryRequest(publish)); } self.request_tx.send_async(publish).await?; - Ok(()) + Ok(pkid_rx) } pub async fn publish_bytes_with_properties( @@ -226,7 +253,7 @@ impl AsyncClient { retain: bool, payload: Bytes, properties: PublishProperties, - ) -> Result<(), ClientError> + ) -> Result where S: Into, { @@ -240,7 +267,7 @@ impl AsyncClient { qos: QoS, retain: bool, payload: Bytes, - ) -> Result<(), ClientError> + ) -> Result where S: Into, { @@ -254,12 +281,17 @@ impl AsyncClient { topic: S, qos: QoS, properties: Option, - ) -> Result<(), ClientError> { + ) -> Result { let filter = Filter::new(topic, qos); - let subscribe = Subscribe::new(filter, properties); + let mut subscribe = Subscribe::new(filter, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + + subscribe.place_pkid_tx(pkid_tx); + let request: Request = Request::Subscribe(subscribe); self.request_tx.send_async(request).await?; - Ok(()) + Ok(pkid_rx) } pub async fn subscribe_with_properties>( @@ -267,11 +299,15 @@ impl AsyncClient { topic: S, qos: QoS, properties: SubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.handle_subscribe(topic, qos, Some(properties)).await } - pub async fn subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { + pub async fn subscribe>( + &self, + topic: S, + qos: QoS, + ) -> Result { self.handle_subscribe(topic, qos, None).await } @@ -281,12 +317,16 @@ impl AsyncClient { topic: S, qos: QoS, properties: Option, - ) -> Result<(), ClientError> { + ) -> Result { let filter = Filter::new(topic, qos); - let subscribe = Subscribe::new(filter, properties); + let mut subscribe = Subscribe::new(filter, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.request_tx.try_send(request)?; - Ok(()) + Ok(pkid_rx) } pub fn try_subscribe_with_properties>( @@ -294,11 +334,15 @@ impl AsyncClient { topic: S, qos: QoS, properties: SubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.handle_try_subscribe(topic, qos, Some(properties)) } - pub fn try_subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { + pub fn try_subscribe>( + &self, + topic: S, + qos: QoS, + ) -> Result { self.handle_try_subscribe(topic, qos, None) } @@ -307,28 +351,32 @@ impl AsyncClient { &self, topics: T, properties: Option, - ) -> Result<(), ClientError> + ) -> Result where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics, properties); + let mut subscribe = Subscribe::new_many(topics, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.request_tx.send_async(request).await?; - Ok(()) + Ok(pkid_rx) } pub async fn subscribe_many_with_properties( &self, topics: T, properties: SubscribeProperties, - ) -> Result<(), ClientError> + ) -> Result where T: IntoIterator, { self.handle_subscribe_many(topics, Some(properties)).await } - pub async fn subscribe_many(&self, topics: T) -> Result<(), ClientError> + pub async fn subscribe_many(&self, topics: T) -> Result where T: IntoIterator, { @@ -340,28 +388,32 @@ impl AsyncClient { &self, topics: T, properties: Option, - ) -> Result<(), ClientError> + ) -> Result where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics, properties); + let mut subscribe = Subscribe::new_many(topics, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.request_tx.try_send(request)?; - Ok(()) + Ok(pkid_rx) } pub fn try_subscribe_many_with_properties( &self, topics: T, properties: SubscribeProperties, - ) -> Result<(), ClientError> + ) -> Result where T: IntoIterator, { self.handle_try_subscribe_many(topics, Some(properties)) } - pub fn try_subscribe_many(&self, topics: T) -> Result<(), ClientError> + pub fn try_subscribe_many(&self, topics: T) -> Result where T: IntoIterator, { @@ -373,22 +425,26 @@ impl AsyncClient { &self, topic: S, properties: Option, - ) -> Result<(), ClientError> { - let unsubscribe = Unsubscribe::new(topic, properties); + ) -> Result { + let mut unsubscribe = Unsubscribe::new(topic, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + unsubscribe.place_pkid_tx(pkid_tx); + let request = Request::Unsubscribe(unsubscribe); self.request_tx.send_async(request).await?; - Ok(()) + Ok(pkid_rx) } pub async fn unsubscribe_with_properties>( &self, topic: S, properties: UnsubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.handle_unsubscribe(topic, Some(properties)).await } - pub async fn unsubscribe>(&self, topic: S) -> Result<(), ClientError> { + pub async fn unsubscribe>(&self, topic: S) -> Result { self.handle_unsubscribe(topic, None).await } @@ -397,22 +453,26 @@ impl AsyncClient { &self, topic: S, properties: Option, - ) -> Result<(), ClientError> { - let unsubscribe = Unsubscribe::new(topic, properties); + ) -> Result { + let mut unsubscribe = Unsubscribe::new(topic, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + unsubscribe.place_pkid_tx(pkid_tx); + let request = Request::Unsubscribe(unsubscribe); self.request_tx.try_send(request)?; - Ok(()) + Ok(pkid_rx) } pub fn try_unsubscribe_with_properties>( &self, topic: S, properties: UnsubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.handle_try_unsubscribe(topic, Some(properties)) } - pub fn try_unsubscribe>(&self, topic: S) -> Result<(), ClientError> { + pub fn try_unsubscribe>(&self, topic: S) -> Result { self.handle_try_unsubscribe(topic, None) } @@ -490,7 +550,7 @@ impl Client { retain: bool, payload: P, properties: Option, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -498,12 +558,21 @@ impl Client { let topic = topic.into(); let mut publish = Publish::new(&topic, qos, payload, properties); publish.retain = retain; + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + // Fulfill instantly for QoS 0 + if qos == QoS::AtMostOnce { + _ = pkid_tx.send(0); + } else { + publish.place_pkid_tx(pkid_tx); + } + let publish = Request::Publish(publish); if !valid_topic(&topic) { return Err(ClientError::Request(publish)); } self.client.request_tx.send(publish)?; - Ok(()) + Ok(pkid_rx) } pub fn publish_with_properties( @@ -513,7 +582,7 @@ impl Client { retain: bool, payload: P, properties: PublishProperties, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -527,7 +596,7 @@ impl Client { qos: QoS, retain: bool, payload: P, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -542,7 +611,7 @@ impl Client { retain: bool, payload: P, properties: PublishProperties, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -557,7 +626,7 @@ impl Client { qos: QoS, retain: bool, payload: P, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -587,12 +656,16 @@ impl Client { topic: S, qos: QoS, properties: Option, - ) -> Result<(), ClientError> { + ) -> Result { let filter = Filter::new(topic, qos); - let subscribe = Subscribe::new(filter, properties); + let mut subscribe = Subscribe::new(filter, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.client.request_tx.send(request)?; - Ok(()) + Ok(pkid_rx) } pub fn subscribe_with_properties>( @@ -600,11 +673,15 @@ impl Client { topic: S, qos: QoS, properties: SubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.handle_subscribe(topic, qos, Some(properties)) } - pub fn subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { + pub fn subscribe>( + &self, + topic: S, + qos: QoS, + ) -> Result { self.handle_subscribe(topic, qos, None) } @@ -614,12 +691,16 @@ impl Client { topic: S, qos: QoS, properties: SubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.client .try_subscribe_with_properties(topic, qos, properties) } - pub fn try_subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { + pub fn try_subscribe>( + &self, + topic: S, + qos: QoS, + ) -> Result { self.client.try_subscribe(topic, qos) } @@ -628,28 +709,32 @@ impl Client { &self, topics: T, properties: Option, - ) -> Result<(), ClientError> + ) -> Result where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics, properties); + let mut subscribe = Subscribe::new_many(topics, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.client.request_tx.send(request)?; - Ok(()) + Ok(pkid_rx) } pub fn subscribe_many_with_properties( &self, topics: T, properties: SubscribeProperties, - ) -> Result<(), ClientError> + ) -> Result where T: IntoIterator, { self.handle_subscribe_many(topics, Some(properties)) } - pub fn subscribe_many(&self, topics: T) -> Result<(), ClientError> + pub fn subscribe_many(&self, topics: T) -> Result where T: IntoIterator, { @@ -660,7 +745,7 @@ impl Client { &self, topics: T, properties: SubscribeProperties, - ) -> Result<(), ClientError> + ) -> Result where T: IntoIterator, { @@ -668,7 +753,7 @@ impl Client { .try_subscribe_many_with_properties(topics, properties) } - pub fn try_subscribe_many(&self, topics: T) -> Result<(), ClientError> + pub fn try_subscribe_many(&self, topics: T) -> Result where T: IntoIterator, { @@ -680,22 +765,26 @@ impl Client { &self, topic: S, properties: Option, - ) -> Result<(), ClientError> { - let unsubscribe = Unsubscribe::new(topic, properties); + ) -> Result { + let mut unsubscribe = Unsubscribe::new(topic, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + unsubscribe.place_pkid_tx(pkid_tx); + let request = Request::Unsubscribe(unsubscribe); self.client.request_tx.send(request)?; - Ok(()) + Ok(pkid_rx) } pub fn unsubscribe_with_properties>( &self, topic: S, properties: UnsubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.handle_unsubscribe(topic, Some(properties)) } - pub fn unsubscribe>(&self, topic: S) -> Result<(), ClientError> { + pub fn unsubscribe>(&self, topic: S) -> Result { self.handle_unsubscribe(topic, None) } @@ -704,12 +793,12 @@ impl Client { &self, topic: S, properties: UnsubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.client .try_unsubscribe_with_properties(topic, properties) } - pub fn try_unsubscribe>(&self, topic: S) -> Result<(), ClientError> { + pub fn try_unsubscribe>(&self, topic: S) -> Result { self.client.try_unsubscribe(topic) } diff --git a/rumqttc/src/v5/mqttbytes/v5/publish.rs b/rumqttc/src/v5/mqttbytes/v5/publish.rs index 74fbee225..c787e6004 100644 --- a/rumqttc/src/v5/mqttbytes/v5/publish.rs +++ b/rumqttc/src/v5/mqttbytes/v5/publish.rs @@ -1,8 +1,11 @@ -use super::*; use bytes::{Buf, Bytes}; +use tokio::sync::oneshot::Sender; + +use super::*; +use crate::Pkid; /// Publish packet -#[derive(Clone, Debug, PartialEq, Eq, Default)] +#[derive(Debug, Default)] pub struct Publish { pub dup: bool, pub qos: QoS, @@ -11,8 +14,39 @@ pub struct Publish { pub pkid: u16, pub payload: Bytes, pub properties: Option, + pub pkid_tx: Option>, +} + +// TODO: figure out if this is even required +impl Clone for Publish { + fn clone(&self) -> Self { + Self { + dup: self.dup, + qos: self.qos, + retain: self.retain, + topic: self.topic.clone(), + payload: self.payload.clone(), + pkid: self.pkid, + properties: self.properties.clone(), + pkid_tx: None, + } + } +} + +impl PartialEq for Publish { + fn eq(&self, other: &Self) -> bool { + self.dup == other.dup + && self.qos == other.qos + && self.retain == other.retain + && self.topic == other.topic + && self.payload == other.payload + && self.pkid == other.pkid + && self.properties == other.properties + } } +impl Eq for Publish {} + impl Publish { pub fn new, P: Into>( topic: T, @@ -85,6 +119,7 @@ impl Publish { topic, payload: bytes, properties, + pkid_tx: None, }; Ok(publish) @@ -120,6 +155,10 @@ impl Publish { Ok(1 + count + len) } + + pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { + self.pkid_tx = Some(pkid_tx) + } } #[derive(Debug, Clone, PartialEq, Eq, Default)] diff --git a/rumqttc/src/v5/mqttbytes/v5/subscribe.rs b/rumqttc/src/v5/mqttbytes/v5/subscribe.rs index 4167cd671..28796fe30 100644 --- a/rumqttc/src/v5/mqttbytes/v5/subscribe.rs +++ b/rumqttc/src/v5/mqttbytes/v5/subscribe.rs @@ -1,14 +1,40 @@ -use super::*; use bytes::{Buf, Bytes}; +use tokio::sync::oneshot::Sender; + +use super::*; +use crate::Pkid; /// Subscription packet -#[derive(Clone, Debug, PartialEq, Eq, Default)] +#[derive(Debug, Default)] pub struct Subscribe { pub pkid: u16, pub filters: Vec, pub properties: Option, + pub pkid_tx: Option>, } +// TODO: figure out if this is even required +impl Clone for Subscribe { + fn clone(&self) -> Self { + Self { + pkid: self.pkid, + filters: self.filters.clone(), + properties: self.properties.clone(), + pkid_tx: None, + } + } +} + +impl PartialEq for Subscribe { + fn eq(&self, other: &Self) -> bool { + self.pkid == other.pkid + && self.filters == other.filters + && self.properties == other.properties + } +} + +impl Eq for Subscribe {} + impl Subscribe { pub fn new(filter: Filter, properties: Option) -> Self { Self { @@ -67,6 +93,7 @@ impl Subscribe { pkid, filters, properties, + pkid_tx: None, }), } } @@ -95,6 +122,10 @@ impl Subscribe { Ok(1 + remaining_len_bytes + remaining_len) } + + pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { + self.pkid_tx = Some(pkid_tx) + } } /// Subscription filter diff --git a/rumqttc/src/v5/mqttbytes/v5/unsubscribe.rs b/rumqttc/src/v5/mqttbytes/v5/unsubscribe.rs index 2b671ce39..92d1d5610 100644 --- a/rumqttc/src/v5/mqttbytes/v5/unsubscribe.rs +++ b/rumqttc/src/v5/mqttbytes/v5/unsubscribe.rs @@ -1,14 +1,38 @@ -use super::*; use bytes::{Buf, Bytes}; +use tokio::sync::oneshot::Sender; + +use super::*; +use crate::Pkid; /// Unsubscribe packet -#[derive(Debug, Clone, PartialEq, Eq, Default)] +#[derive(Debug, Default)] pub struct Unsubscribe { pub pkid: u16, pub filters: Vec, pub properties: Option, + pub pkid_tx: Option>, +} + +// TODO: figure out if this is even required +impl Clone for Unsubscribe { + fn clone(&self) -> Self { + Self { + pkid: self.pkid, + filters: self.filters.clone(), + properties: self.properties.clone(), + pkid_tx: None, + } + } +} + +impl PartialEq for Unsubscribe { + fn eq(&self, other: &Self) -> bool { + self.pkid == other.pkid && self.filters == other.filters + } } +impl Eq for Unsubscribe {} + impl Unsubscribe { pub fn new>(filter: S, properties: Option) -> Self { Self { @@ -59,6 +83,7 @@ impl Unsubscribe { pkid, filters, properties, + pkid_tx: None, }; Ok(unsubscribe) } @@ -86,6 +111,10 @@ impl Unsubscribe { Ok(1 + remaining_len_bytes + remaining_len) } + + pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { + self.pkid_tx = Some(pkid_tx) + } } #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 8473f1f4c..24c75a21b 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -485,12 +485,19 @@ impl MqttState { /// Adds next packet identifier to QoS 1 and 2 publish packets and returns /// it buy wrapping publish in packet fn outgoing_publish(&mut self, mut publish: Publish) -> Result<(), StateError> { + // NOTE: pkid promise need not be fulfilled for QoS 0, + // user should know this but still handled in Client. if publish.qos != QoS::AtMostOnce { if publish.pkid == 0 { publish.pkid = self.next_pkid(); } let pkid = publish.pkid; + // Fulfill the pkid promise + if let Some(pkid_tx) = publish.pkid_tx.take() { + _ = pkid_tx.send(pkid); + } + if self .outgoing_pub .get(publish.pkid as usize) @@ -604,6 +611,10 @@ impl MqttState { let pkid = self.next_pkid(); subscription.pkid = pkid; + // Fulfill the pkid promise + if let Some(pkid_tx) = subscription.pkid_tx.take() { + _ = pkid_tx.send(pkid); + } debug!( "Subscribe. Topics = {:?}, Pkid = {:?}", @@ -620,6 +631,10 @@ impl MqttState { fn outgoing_unsubscribe(&mut self, mut unsub: Unsubscribe) -> Result<(), StateError> { let pkid = self.next_pkid(); unsub.pkid = pkid; + // Fulfill the pkid promise + if let Some(pkid_tx) = unsub.pkid_tx.take() { + _ = pkid_tx.send(pkid); + } debug!( "Unsubscribe. Topics = {:?}, Pkid = {:?}",