From 1007feed253a0bfcd535d9dde2c5d2ca47c351ec Mon Sep 17 00:00:00 2001 From: matthieugouel Date: Mon, 10 Mar 2025 15:02:34 +0100 Subject: [PATCH] fix Kafka message too large error on producers --- src/agent/producer.rs | 11 +++++++++-- src/client/producer.rs | 12 +++++++++--- src/config.rs | 3 +-- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/agent/producer.rs b/src/agent/producer.rs index f3887d5..eb2c271 100644 --- a/src/agent/producer.rs +++ b/src/agent/producer.rs @@ -1,5 +1,5 @@ use caracat::models::{MPLSLabel, Reply}; -use log::{debug, info, warn}; +use log::{debug, error, info, warn}; use rdkafka::config::ClientConfig; use rdkafka::message::OwnedHeaders; use rdkafka::producer::{FutureProducer, FutureRecord}; @@ -108,6 +108,8 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, rx: Receiver) { let message = message.unwrap(); let message_str = encode_reply(config.agent.id.clone(), &message); + + // Max message size is 1048576 bytes (including headers) if final_message.len() + message_str.len() + 1 > config.kafka.message_max_bytes { additional_message = Some(message); break; @@ -138,6 +140,11 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, rx: Receiver) { ) .await; - info!("{:?}", delivery_status); + match delivery_status { + Ok(status) => info!("{:?}", status), + Err((error, _)) => { + error!("{}", error.to_string()); + } + } } } diff --git a/src/client/producer.rs b/src/client/producer.rs index c53ac46..017f332 100644 --- a/src/client/producer.rs +++ b/src/client/producer.rs @@ -1,5 +1,5 @@ use caracat::models::Probe; -use log::{debug, info}; +use log::{debug, error, info}; use rdkafka::config::ClientConfig; use rdkafka::message::{Header, OwnedHeaders}; use rdkafka::producer::{FutureProducer, FutureRecord}; @@ -15,7 +15,7 @@ fn create_messages(probes: Vec, message_max_bytes: usize) -> Vec for probe in probes { // Format probe let probe_str = encode_probe(&probe); - // Max message size is 1048576 bytes + // Max message size is 1048576 bytes (including headers) if current_message.len() + probe_str.len() + 1 > message_max_bytes { // Remove the last newline character current_message.pop(); @@ -87,6 +87,12 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, agents: Vec<&str>, pro ) .await; - info!("{:?}", delivery_status); + match delivery_status { + Ok(status) => info!("{:?}", status), + Err((error, _)) => { + error!("{}", error.to_string()); + error!("{}", config.kafka.message_max_bytes); + } + } } } diff --git a/src/config.rs b/src/config.rs index 01081df..8df5c58 100644 --- a/src/config.rs +++ b/src/config.rs @@ -184,8 +184,7 @@ pub fn app_config(config_path: &str) -> AppConfig { auth_sasl_mechanism: config .get_string("kafka.auth_sasl_mechanism") .unwrap_or("SCRAM-SHA-512".to_string()), - message_max_bytes: config.get_int("kafka.message_max_bytes").unwrap_or(1048576) - as usize, + message_max_bytes: config.get_int("kafka.message_max_bytes").unwrap_or(990000) as usize, in_topics: config .get_string("kafka.in_topics") .unwrap_or("saimiris-targets".to_string()),