Skip to content

Commit 1136ecf

Browse files
committed
optionally read probes from a file with the client
1 parent c20254b commit 1136ecf

File tree

6 files changed

+59
-17
lines changed

6 files changed

+59
-17
lines changed

src/agent/producer.rs

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use caracat::models::{MPLSLabel, Reply};
2-
use log::{debug, info, warn};
2+
use log::{debug, error, info, warn};
33
use rdkafka::config::ClientConfig;
44
use rdkafka::message::OwnedHeaders;
55
use rdkafka::producer::{FutureProducer, FutureRecord};
@@ -108,6 +108,8 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, rx: Receiver<Reply>) {
108108

109109
let message = message.unwrap();
110110
let message_str = encode_reply(config.agent.id.clone(), &message);
111+
112+
// Max message size is 1048576 bytes (including headers)
111113
if final_message.len() + message_str.len() + 1 > config.kafka.message_max_bytes {
112114
additional_message = Some(message);
113115
break;
@@ -138,6 +140,11 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, rx: Receiver<Reply>) {
138140
)
139141
.await;
140142

141-
info!("{:?}", delivery_status);
143+
match delivery_status {
144+
Ok(status) => info!("{:?}", status),
145+
Err((error, _)) => {
146+
error!("{}", error.to_string());
147+
}
148+
}
142149
}
143150
}

src/client/handler.rs

+27-8
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
use anyhow::Result;
2+
use caracat::models::Probe;
23
use log::trace;
34
use std::io::{stdin, BufRead};
5+
use std::path::PathBuf;
46

57
use crate::auth::{KafkaAuth, SaslAuth};
68
use crate::client::producer::produce;
79
use crate::config::AppConfig;
810
use crate::probe::decode_probe;
911

10-
pub async fn handle(config: &AppConfig, agents: &str) -> Result<()> {
12+
fn read_probes<R: BufRead>(buf_reader: R) -> Result<Vec<Probe>> {
13+
let mut probes = Vec::new();
14+
for line in buf_reader.lines() {
15+
let probe = line?;
16+
probes.push(decode_probe(&probe)?);
17+
}
18+
Ok(probes)
19+
}
20+
21+
pub async fn handle(config: &AppConfig, agents: &str, probes_file: Option<PathBuf>) -> Result<()> {
1122
trace!("Client handler");
1223
trace!("{:?}", config);
1324

@@ -26,16 +37,24 @@ pub async fn handle(config: &AppConfig, agents: &str) -> Result<()> {
2637
}
2738
};
2839

29-
// Get probes from stdin
30-
let mut probes = Vec::new();
31-
for line in stdin().lock().lines() {
32-
let probe = line?;
33-
probes.push(decode_probe(&probe)?);
34-
}
40+
// Read probes from file or stdin
41+
let probes = match probes_file {
42+
Some(probes_file) => {
43+
let file = std::fs::File::open(probes_file)?;
44+
let buf_reader = std::io::BufReader::new(file);
45+
read_probes(buf_reader)?
46+
}
47+
None => {
48+
let stdin = stdin();
49+
let buf_reader = stdin.lock();
50+
read_probes(buf_reader)?
51+
}
52+
};
3553

36-
// Split agents
54+
// Split the agents
3755
let agents = agents.split(',').collect::<Vec<&str>>();
3856

57+
// Produce Kafka messages
3958
produce(config, auth, agents, probes).await;
4059

4160
Ok(())

src/client/producer.rs

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use caracat::models::Probe;
2-
use log::{debug, info};
2+
use log::{debug, error, info};
33
use rdkafka::config::ClientConfig;
44
use rdkafka::message::{Header, OwnedHeaders};
55
use rdkafka::producer::{FutureProducer, FutureRecord};
@@ -15,7 +15,7 @@ fn create_messages(probes: Vec<Probe>, message_max_bytes: usize) -> Vec<String>
1515
for probe in probes {
1616
// Format probe
1717
let probe_str = encode_probe(&probe);
18-
// Max message size is 1048576 bytes
18+
// Max message size is 1048576 bytes (including headers)
1919
if current_message.len() + probe_str.len() + 1 > message_max_bytes {
2020
// Remove the last newline character
2121
current_message.pop();
@@ -87,6 +87,12 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, agents: Vec<&str>, pro
8787
)
8888
.await;
8989

90-
info!("{:?}", delivery_status);
90+
match delivery_status {
91+
Ok(status) => info!("{:?}", status),
92+
Err((error, _)) => {
93+
error!("{}", error.to_string());
94+
error!("{}", config.kafka.message_max_bytes);
95+
}
96+
}
9197
}
9298
}

src/config.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,7 @@ pub fn app_config(config_path: &str) -> AppConfig {
184184
auth_sasl_mechanism: config
185185
.get_string("kafka.auth_sasl_mechanism")
186186
.unwrap_or("SCRAM-SHA-512".to_string()),
187-
message_max_bytes: config.get_int("kafka.message_max_bytes").unwrap_or(1048576)
188-
as usize,
187+
message_max_bytes: config.get_int("kafka.message_max_bytes").unwrap_or(990000) as usize,
189188
in_topics: config
190189
.get_string("kafka.in_topics")
191190
.unwrap_or("saimiris-targets".to_string()),

src/main.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use clap_verbosity_flag::{InfoLevel, Verbosity};
1212
use env_logger::Builder;
1313
use log::error;
1414
use std::io::{stdin, IsTerminal, Write};
15+
use std::path::PathBuf;
1516

1617
use crate::config::app_config;
1718

@@ -41,6 +42,10 @@ enum Command {
4142
/// Agent IDs (comma separated)
4243
#[arg(index = 1)]
4344
agents: String,
45+
46+
/// Probes file (read stdin if not provided)
47+
#[arg(short, long)]
48+
probes_file: Option<PathBuf>,
4449
},
4550
}
4651

@@ -79,14 +84,18 @@ async fn main() -> Result<()> {
7984
Err(e) => error!("Error: {}", e),
8085
}
8186
}
82-
Command::Client { config, agents } => {
87+
Command::Client {
88+
config,
89+
agents,
90+
probes_file,
91+
} => {
8392
if stdin().is_terminal() {
8493
App::command().print_help().unwrap();
8594
::std::process::exit(2);
8695
}
8796

8897
let app_config = app_config(&config);
89-
match client::handle(&app_config, &agents).await {
98+
match client::handle(&app_config, &agents, probes_file).await {
9099
Ok(_) => (),
91100
Err(e) => error!("Error: {}", e),
92101
}

testbed/config/saimiris/saimiris.yml

+2
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,7 @@ kafka:
88
out_enable: true
99
out_topic: saimiris-results
1010

11+
message_max_bytes: 999995
12+
1113
agent:
1214
id: wbmwwp9vna

0 commit comments

Comments
 (0)