Skip to content

Commit

Permalink
doc: v5 example
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Feb 23, 2024
1 parent 76af76d commit 4999ae3
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 3 deletions.
6 changes: 3 additions & 3 deletions rumqttc/examples/pkid_promise.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use futures_util::stream::StreamExt;
use tokio::{
select,
task::{self, JoinSet},
select
};
use tokio_util::time::DelayQueue;
use futures_util::stream::StreamExt;

use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::error::Error;
Expand Down Expand Up @@ -51,7 +51,7 @@ async fn requests(client: AsyncClient) {
}

loop {
select!{
select! {
Some(i) = queue.next() => {
joins.spawn(
client
Expand Down
69 changes: 69 additions & 0 deletions rumqttc/examples/pkid_promise_v5.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use futures_util::stream::StreamExt;
use tokio::{
select,
task::{self, JoinSet},
};
use tokio_util::time::DelayQueue;

use rumqttc::v5::{mqttbytes::QoS, AsyncClient, MqttOptions};
use std::error::Error;
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "broker.emqx.io", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
task::spawn(async move {
requests(client).await;
});

loop {
let event = eventloop.poll().await;
match &event {
Ok(v) => {
println!("Event = {v:?}");
}
Err(e) => {
println!("Error = {e:?}");
return Ok(());
}
}
}
}

async fn requests(client: AsyncClient) {
let mut joins = JoinSet::new();
joins.spawn(
client
.subscribe("hello/world", QoS::AtMostOnce)
.await
.unwrap(),
);

let mut queue = DelayQueue::new();
for i in 1..=10 {
queue.insert(i as usize, Duration::from_secs(i));
}

loop {
select! {
Some(i) = queue.next() => {
joins.spawn(
client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; i.into_inner()])
.await
.unwrap(),
);
}
Some(Ok(Ok(pkid))) = joins.join_next() => {
println!("Pkid: {:?}", pkid);
}
else => break,
}
}
}

0 comments on commit 4999ae3

Please sign in to comment.