Skip to content

Commit

Permalink
make example fun
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Feb 23, 2024
1 parent c7ac03d commit 408b01c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 13 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ matches = "0.1"
pretty_assertions = "1"
pretty_env_logger = "0.5"
serde = { version = "1", features = ["derive"] }
tokio-util = { version = "0.7", features = ["time"] }

[[example]]
name = "tls"
Expand Down
34 changes: 21 additions & 13 deletions rumqttc/examples/pkid_promise.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use tokio::{
task::{self, JoinSet},
time,
select
};
use tokio_util::time::DelayQueue;
use futures_util::stream::StreamExt;

use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::error::Error;
Expand All @@ -12,7 +14,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let mut mqttoptions = MqttOptions::new("test-1", "broker.emqx.io", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
Expand Down Expand Up @@ -43,19 +45,25 @@ async fn requests(client: AsyncClient) {
.unwrap(),
);

let mut queue = DelayQueue::new();
for i in 1..=10 {
joins.spawn(
client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; i])
.await
.unwrap(),
);

time::sleep(Duration::from_secs(1)).await;
queue.insert(i as usize, Duration::from_secs(i));
}

// TODO: maybe rewrite to showcase in-between resolutions?
while let Some(Ok(Ok(pkid))) = joins.join_next().await {
println!("Pkid: {:?}", pkid);
loop {
select!{
Some(i) = queue.next() => {
joins.spawn(
client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; i.into_inner()])
.await
.unwrap(),
);
}
Some(Ok(Ok(pkid))) = joins.join_next() => {
println!("Pkid: {:?}", pkid);
}
else => break,
}
}
}

0 comments on commit 408b01c

Please sign in to comment.