Skip to content

Commit 1007fee

Browse files
committed
fix Kafka message too large error on producers
1 parent 4215c04 commit 1007fee

File tree

3 files changed

+19
-7
lines changed

3 files changed

+19
-7
lines changed

src/agent/producer.rs

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use caracat::models::{MPLSLabel, Reply};
2-
use log::{debug, info, warn};
2+
use log::{debug, error, info, warn};
33
use rdkafka::config::ClientConfig;
44
use rdkafka::message::OwnedHeaders;
55
use rdkafka::producer::{FutureProducer, FutureRecord};
@@ -108,6 +108,8 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, rx: Receiver<Reply>) {
108108

109109
let message = message.unwrap();
110110
let message_str = encode_reply(config.agent.id.clone(), &message);
111+
112+
// Max message size is 1048576 bytes (including headers)
111113
if final_message.len() + message_str.len() + 1 > config.kafka.message_max_bytes {
112114
additional_message = Some(message);
113115
break;
@@ -138,6 +140,11 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, rx: Receiver<Reply>) {
138140
)
139141
.await;
140142

141-
info!("{:?}", delivery_status);
143+
match delivery_status {
144+
Ok(status) => info!("{:?}", status),
145+
Err((error, _)) => {
146+
error!("{}", error.to_string());
147+
}
148+
}
142149
}
143150
}

src/client/producer.rs

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use caracat::models::Probe;
2-
use log::{debug, info};
2+
use log::{debug, error, info};
33
use rdkafka::config::ClientConfig;
44
use rdkafka::message::{Header, OwnedHeaders};
55
use rdkafka::producer::{FutureProducer, FutureRecord};
@@ -15,7 +15,7 @@ fn create_messages(probes: Vec<Probe>, message_max_bytes: usize) -> Vec<String>
1515
for probe in probes {
1616
// Format probe
1717
let probe_str = encode_probe(&probe);
18-
// Max message size is 1048576 bytes
18+
// Max message size is 1048576 bytes (including headers)
1919
if current_message.len() + probe_str.len() + 1 > message_max_bytes {
2020
// Remove the last newline character
2121
current_message.pop();
@@ -87,6 +87,12 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, agents: Vec<&str>, pro
8787
)
8888
.await;
8989

90-
info!("{:?}", delivery_status);
90+
match delivery_status {
91+
Ok(status) => info!("{:?}", status),
92+
Err((error, _)) => {
93+
error!("{}", error.to_string());
94+
error!("{}", config.kafka.message_max_bytes);
95+
}
96+
}
9197
}
9298
}

src/config.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,7 @@ pub fn app_config(config_path: &str) -> AppConfig {
184184
auth_sasl_mechanism: config
185185
.get_string("kafka.auth_sasl_mechanism")
186186
.unwrap_or("SCRAM-SHA-512".to_string()),
187-
message_max_bytes: config.get_int("kafka.message_max_bytes").unwrap_or(1048576)
188-
as usize,
187+
message_max_bytes: config.get_int("kafka.message_max_bytes").unwrap_or(990000) as usize,
189188
in_topics: config
190189
.get_string("kafka.in_topics")
191190
.unwrap_or("saimiris-targets".to_string()),

0 commit comments

Comments
 (0)