-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathenv_callback.rs
78 lines (66 loc) · 2.07 KB
/
env_callback.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
use futures::StreamExt;
use rabbitmq_stream_client::{
types::{ByteCapacity, Message, OffsetSpecification},
Environment,
};
use tokio::sync::mpsc::channel;
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;
/// Publishes a small batch of messages to a RabbitMQ stream using per-message
/// confirmation callbacks, then reads the messages back and deletes the stream.
///
/// Steps, in order:
/// 1. Install a `TRACE`-level tracing subscriber.
/// 2. Connect to the broker at `localhost:5552`.
/// 3. Create the stream (best effort) with a 2 GB maximum length.
/// 4. Send the messages; each confirmation callback logs its result and
///    signals completion through a tokio mpsc channel.
/// 5. Wait until the channel is closed, then close the producer.
/// 6. Consume the same number of messages starting from the first offset,
///    logging each one.
/// 7. Close the consumer and delete the stream.
///
/// # Errors
///
/// Returns an error if connecting, building the producer or consumer, sending,
/// receiving a delivery, closing, or deleting the stream fails, or if the
/// consumer stream ends before all expected messages have been received.
///
/// # Panics
///
/// Panics if a global tracing subscriber has already been installed.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Name of the stream used throughout the example.
    const STREAM_NAME: &str = "test";
    // Number of messages published and then consumed; also used as the
    // capacity of the confirmation channel.
    const MESSAGE_COUNT: usize = 10;

    let subscriber = FmtSubscriber::builder()
        .with_max_level(Level::TRACE)
        .finish();
    tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

    let environment = Environment::builder()
        .host("localhost")
        .port(5552)
        .build()
        .await?;

    // Stream creation stays best-effort (the example keeps going on failure),
    // but the failure is logged instead of being silently discarded.
    if let Err(err) = environment
        .stream_creator()
        .max_length(ByteCapacity::GB(2))
        .create(STREAM_NAME)
        .await
    {
        tracing::warn!("Stream creation failed, continuing anyway: {:?}", err);
    }

    let producer = environment.producer().build(STREAM_NAME).await?;

    // Every confirmation callback owns a sender clone and pushes one unit
    // value per confirmation into this channel.
    let (tx, mut rx) = channel(MESSAGE_COUNT);
    for i in 0..MESSAGE_COUNT {
        let confirm_tx = tx.clone();
        producer
            .send(
                Message::builder().body(format!("message{}", i)).build(),
                move |confirm_result| {
                    info!("Message confirm result {:?}", confirm_result);
                    // Clone again so the async block owns its sender and the
                    // closure itself is not consumed by the call.
                    let confirm_tx = confirm_tx.clone();
                    async move {
                        // The receiver only goes away when `main` has already
                        // returned early; log rather than panic in that case.
                        if confirm_tx.send(()).await.is_err() {
                            tracing::warn!("Confirmation receiver dropped before the confirm arrived");
                        }
                    }
                },
            )
            .await?;
    }

    // Drop the original sender so `recv` yields `None` once every callback's
    // sender clone has been dropped as well.
    drop(tx);
    while rx.recv().await.is_some() {}

    producer.close().await?;

    let mut consumer = environment
        .consumer()
        .offset(OffsetSpecification::First)
        .build(STREAM_NAME)
        .await?;

    for _ in 0..MESSAGE_COUNT {
        // The first `?` handles the stream ending early, the second a failed delivery.
        let delivery = consumer
            .next()
            .await
            .ok_or("consumer stream ended before all messages were received")??;
        info!(
            "Got message : {:?} with offset {}",
            delivery
                .message()
                .data()
                .map(|data| String::from_utf8(data.to_vec())),
            delivery.offset()
        );
    }

    consumer.handle().close().await?;
    environment.delete_stream(STREAM_NAME).await?;
    Ok(())
}