-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathproducer.rs
98 lines (88 loc) · 3.19 KB
/
producer.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use caracat::models::Probe;
use log::{debug, error, info};
use rdkafka::config::ClientConfig;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
use crate::auth::KafkaAuth;
use crate::config::AppConfig;
use crate::probe::encode_probe;
/// Packs encoded probes into newline-separated message payloads.
///
/// Every probe is encoded with `encode_probe` and appended to the payload
/// currently being built, one probe per line. When appending the next probe
/// would push the payload past `message_max_bytes`, the payload is closed
/// (without a trailing newline) and a new one is started.
///
/// A single encoded probe that is larger than `message_max_bytes` on its own
/// cannot be split and is emitted as its own (oversized) payload; no empty
/// payload is ever produced in front of it.
///
/// Returns the payloads in probe order. An empty `probes` yields an empty `Vec`.
fn create_messages(probes: Vec<Probe>, message_max_bytes: usize) -> Vec<String> {
    let mut messages = Vec::new();
    let mut current_message = String::new();

    for probe in probes {
        let probe_str = encode_probe(&probe);

        // `current_message` always ends with a '\n' while it is being built,
        // and the `+ 1` accounts for the newline that follows this probe.
        // Only flush when there is something buffered: flushing an empty
        // buffer would emit a blank message for an oversized first probe.
        if !current_message.is_empty()
            && current_message.len() + probe_str.len() + 1 > message_max_bytes
        {
            // Drop the trailing newline before closing the payload.
            current_message.pop();
            messages.push(std::mem::take(&mut current_message));
        }

        current_message.push_str(&probe_str);
        current_message.push('\n');
    }

    if !current_message.is_empty() {
        // Drop the trailing newline of the last payload.
        current_message.pop();
        messages.push(current_message);
    }

    messages
}
pub async fn produce(config: &AppConfig, auth: KafkaAuth, agents: Vec<&str>, probes: Vec<Probe>) {
let producer: &FutureProducer = match auth {
KafkaAuth::PlainText => &ClientConfig::new()
.set("bootstrap.servers", config.kafka.brokers.clone())
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error"),
KafkaAuth::SasalPlainText(scram_auth) => &ClientConfig::new()
.set("bootstrap.servers", config.kafka.brokers.clone())
.set("message.timeout.ms", "5000")
.set("sasl.username", scram_auth.username)
.set("sasl.password", scram_auth.password)
.set("sasl.mechanisms", scram_auth.mechanism)
.set("security.protocol", "SASL_PLAINTEXT")
.create()
.expect("Producer creation error"),
};
let topic = config.kafka.in_topics.split(',').collect::<Vec<&str>>()[0];
// Construct headers
let mut headers = OwnedHeaders::new();
for agent in agents {
headers = headers.insert(Header {
key: agent,
value: Some(agent),
});
}
// Place probes into Kafka messages
let probes_len = probes.len();
let messages = create_messages(probes, config.kafka.message_max_bytes);
info!(
"topic={},messages={},probes={}",
topic,
messages.len(),
probes_len,
);
// Send to Kafka
for message in messages {
debug!("{}", message);
let delivery_status = producer
.send(
FutureRecord::to(topic)
.payload(&format!("{}", message))
.key(&format!("")) // TODO Client ID
.headers(headers.clone()),
Duration::from_secs(0),
)
.await;
match delivery_status {
Ok(status) => info!("{:?}", status),
Err((error, _)) => {
error!("{}", error.to_string());
error!("{}", config.kafka.message_max_bytes);
}
}
}
}