//! raw_client.rs
//!
//! Example that uses `rabbitmq_stream_client::Client` directly to publish
//! messages to a stream and consume them again.
use std::collections::HashMap;
use std::sync::atomic::AtomicI32;
use std::sync::Arc;
use rabbitmq_stream_client::types::{Message, MessageResult, ResponseKind};
use rabbitmq_stream_client::{types::OffsetSpecification, Client, ClientOptions};
use tokio::sync::Notify;
use tracing::info;
use tracing::Level;
use tracing_subscriber::FmtSubscriber;
/// Entry point of the example.
///
/// Installs a TRACE-level `tracing` subscriber, connects with
/// `ClientOptions::default()`, recreates the `test` stream, registers the
/// subscriber via [`start_subscriber`], publishes `messages` messages through
/// publisher id `1`, waits until the subscriber signals the shared `Notify`,
/// and finally deletes the publisher and the stream.
///
/// # Errors
///
/// Returns the first error reported by any client call, including `publish`.
///
/// # Panics
///
/// Panics if a global `tracing` subscriber cannot be installed.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Install logging first so every later step is traced.
    let subscriber = FmtSubscriber::builder()
        .with_max_level(Level::TRACE)
        .finish();
    tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

    let notifier = Arc::new(Notify::new());
    // Number of messages to publish; `start_subscriber` waits for the same amount.
    let messages = 10;

    let client = Client::connect(ClientOptions::default()).await?;
    let stream = "test";

    // Delete then create, so the run starts from a fresh stream.
    // NOTE(review): the delete response is discarded; confirm how the client
    // reports "stream does not exist" on a first run.
    let _ = client.delete_stream(stream).await?;
    let _ = client.create_stream(stream, HashMap::new()).await?;

    // The subscription is registered before anything is published.
    start_subscriber(stream, &client, Arc::clone(&notifier)).await?;

    client
        .declare_publisher(1, Some("my_publisher".to_owned()), stream)
        .await?;

    for i in 0..messages {
        // `into_bytes` reuses the formatted string's buffer instead of copying it.
        let body = format!("message {}", i).into_bytes();
        // Propagate publish failures like every other client call, rather than panicking.
        // NOTE(review): the trailing `1` is passed through unchanged from the
        // original call; confirm its meaning against the client API.
        let _ = client
            .publish(1, Message::builder().body(body).build(), 1)
            .await?;
    }

    // Block until the subscriber's handler calls `notify_one`.
    notifier.notified().await;

    client.delete_publisher(1).await?;
    let _ = client.delete_stream(stream).await?;
    Ok(())
}
async fn start_subscriber(
stream: &str,
client: &Client,
notifier: Arc<Notify>,
) -> Result<(), Box<dyn std::error::Error>> {
let client_inner = client.clone();
let notifier_inner = notifier.clone();
let counter = Arc::new(AtomicI32::new(0));
let handler = move |msg: MessageResult| async move {
if let Some(Ok(response)) = msg {
match response.kind_ref() {
ResponseKind::Deliver(delivery) => {
for message in &delivery.messages {
info!(
"Got message {:?}",
message.data().map(|data| String::from_utf8(data.to_vec()))
);
}
let len = delivery.messages.len();
let current = counter
.fetch_add(len as i32, std::sync::atomic::Ordering::Relaxed)
+ len as i32;
if current == 10 {
client_inner.unsubscribe(1).await.unwrap();
notifier_inner.notify_one();
} else {
client_inner.credit(1, 1).await.unwrap();
}
}
_ => {
info!("Got response {:?}", &response);
}
}
}
Ok(())
};
client.set_handler(handler).await;
let _ = client
.subscribe(1, stream, OffsetSpecification::Next, 1, HashMap::new())
.await?;
Ok(())
}