From 4999ae3c83a1abe9173d4cb50f53ccc631942ca0 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 23 Feb 2024 02:24:44 +0000 Subject: [PATCH] doc: v5 example --- rumqttc/examples/pkid_promise.rs | 6 +-- rumqttc/examples/pkid_promise_v5.rs | 69 +++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 3 deletions(-) create mode 100644 rumqttc/examples/pkid_promise_v5.rs diff --git a/rumqttc/examples/pkid_promise.rs b/rumqttc/examples/pkid_promise.rs index bcb3fbd5b..3042c03a1 100644 --- a/rumqttc/examples/pkid_promise.rs +++ b/rumqttc/examples/pkid_promise.rs @@ -1,9 +1,9 @@ +use futures_util::stream::StreamExt; use tokio::{ + select, task::{self, JoinSet}, - select }; use tokio_util::time::DelayQueue; -use futures_util::stream::StreamExt; use rumqttc::{AsyncClient, MqttOptions, QoS}; use std::error::Error; @@ -51,7 +51,7 @@ async fn requests(client: AsyncClient) { } loop { - select!{ + select! { Some(i) = queue.next() => { joins.spawn( client diff --git a/rumqttc/examples/pkid_promise_v5.rs b/rumqttc/examples/pkid_promise_v5.rs new file mode 100644 index 000000000..95e64a7d1 --- /dev/null +++ b/rumqttc/examples/pkid_promise_v5.rs @@ -0,0 +1,69 @@ +use futures_util::stream::StreamExt; +use tokio::{ + select, + task::{self, JoinSet}, +}; +use tokio_util::time::DelayQueue; + +use rumqttc::v5::{mqttbytes::QoS, AsyncClient, MqttOptions}; +use std::error::Error; +use std::time::Duration; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + pretty_env_logger::init(); + // color_backtrace::install(); + + let mut mqttoptions = MqttOptions::new("test-1", "broker.emqx.io", 1883); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + task::spawn(async move { + requests(client).await; + }); + + loop { + let event = eventloop.poll().await; + match &event { + Ok(v) => { + println!("Event = {v:?}"); + } + Err(e) => { + println!("Error = {e:?}"); + return Ok(()); + } + } + } +} + +async fn requests(client: AsyncClient) { + let mut joins = JoinSet::new(); + joins.spawn( + client + .subscribe("hello/world", QoS::AtMostOnce) + .await + .unwrap(), + ); + + let mut queue = DelayQueue::new(); + for i in 1..=10 { + queue.insert(i as usize, Duration::from_secs(i)); + } + + loop { + select! { + Some(i) = queue.next() => { + joins.spawn( + client + .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i.into_inner()]) + .await + .unwrap(), + ); + } + Some(Ok(Ok(pkid))) = joins.join_next() => { + println!("Pkid: {:?}", pkid); + } + else => break, + } + } +}