-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathtls_producer.rs
88 lines (72 loc) · 2.75 KB
/
tls_producer.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use tracing::info;
use tracing_subscriber::FmtSubscriber;
use rabbitmq_stream_client::{types::Message, Environment, NoDedup, Producer, TlsConfiguration};
const BATCH_SIZE: usize = 100;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let stream_name = String::from("tls_test_stream");
let subscriber = FmtSubscriber::builder().finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
// to make this example work you need to run the following command
// `make rabbitmq-server`
// it will start a rabbitmq server with compatible TLS certificates
let tls_configuration: TlsConfiguration = TlsConfiguration::builder()
.add_root_certificates(String::from(".ci/certs/ca_certificate.pem"))
.build()?;
// Use this configuration if you want to trust the certificates
// without providing the root certificate and the client certificates
// let tls_configuration: TlsConfiguration = TlsConfiguration::builder()
// .add_root_certificates(String::from(".ci/certs/ca_certificate.pem"))
// .add_client_certificates_keys(
// String::from(".ci/certs/client_certificate.pem"),
// String::from(".ci/certs/client_key.pem"),
// )
// .build();
// Use this configuration if you want to trust the certificates
// without providing the root certificate
// let tls_configuration: TlsConfiguration = TlsConfiguration::builder()
// .trust_certificates(true)
// .build();
let environment = Environment::builder()
.host("localhost")
.port(5551)
.tls(tls_configuration)
.build()
.await?;
start_publisher(environment.clone(), &stream_name)
.await
.expect("error in publisher");
Ok(())
}
async fn start_publisher(
env: Environment,
stream: &String,
) -> Result<(), Box<dyn std::error::Error>> {
let _ = env.stream_creator().create(stream).await;
let producer = env.producer().batch_size(BATCH_SIZE).build(stream).await?;
let is_batch_send = true;
tokio::task::spawn(async move {
info!(
"Starting producer with batch size {} and batch send {}",
BATCH_SIZE, is_batch_send
);
info!("Sending {} simple messages", BATCH_SIZE);
batch_send_simple(&producer).await;
})
.await?;
Ok(())
}
async fn batch_send_simple(producer: &Producer<NoDedup>) {
let mut msg = Vec::with_capacity(BATCH_SIZE);
for i in 0..BATCH_SIZE {
msg.push(
Message::builder()
.body(format!("rust message{}", i))
.build(),
);
}
producer
.batch_send(msg, move |_| async move {})
.await
.unwrap();
}