Skip to content

Commit 7d37d37

Browse files
committed
app configuration as config file
1 parent 59a33d1 commit 7d37d37

File tree

9 files changed

+615
-171
lines changed

9 files changed

+615
-171
lines changed

Cargo.lock

+396-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ caracat = { git = "https://github.com/maxmouchet/caracat" }
99
chrono = "0.4.39"
1010
clap = { version = "4.5.20", features = ["derive"] }
1111
clap-verbosity-flag = "3.0.0"
12+
config = "0.15.6"
1213
csv = "1.3.1"
1314
env_logger = "0.11.5"
1415
hyperloglog = "1.0.2"

src/config.rs

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use config::Config as AppConfig;
2+
3+
#[allow(dead_code)]
4+
pub struct ProberConfig {
5+
/// Prober ID
6+
pub prober_id: u16,
7+
8+
/// Kafka brokers
9+
pub brokers: String,
10+
11+
/// Kafka consumer topics (comma separated)
12+
pub in_topics: String,
13+
14+
/// Kafka consumer group ID
15+
pub in_group_id: String,
16+
17+
/// Kafka producer topic
18+
pub out_topic: String,
19+
20+
/// Kafka producer Authentication Protocol
21+
pub out_auth_protocol: String,
22+
23+
/// Kafka producer Authentication SASL Username
24+
pub out_auth_sasl_username: String,
25+
26+
/// Kafka producer Authentication SASL Password
27+
pub out_auth_sasl_password: String,
28+
29+
/// Kafka producer Authentication SASL Mechanism
30+
pub out_auth_sasl_mechanism: String,
31+
}
32+
33+
pub fn load_config(config_path: &str) -> AppConfig {
34+
AppConfig::builder()
35+
.add_source(config::File::with_name(config_path))
36+
.add_source(config::Environment::with_prefix("OSIRIS"))
37+
.build()
38+
.unwrap()
39+
}
40+
41+
pub fn prober_config(config: AppConfig) -> ProberConfig {
42+
ProberConfig {
43+
prober_id: config.get_int("prober.id").unwrap_or(0) as u16,
44+
brokers: config
45+
.get_string("kafka.brokers")
46+
.unwrap_or("localhost:9092".to_string()),
47+
in_topics: config
48+
.get_string("kafka.in_topics")
49+
.unwrap_or("osiris".to_string()),
50+
in_group_id: config
51+
.get_string("kafka.in_group_id")
52+
.unwrap_or("osiris".to_string()),
53+
out_topic: config
54+
.get_string("kafka.out_topic")
55+
.unwrap_or("osiris-results".to_string()),
56+
out_auth_protocol: config
57+
.get_string("kafka.out_auth_protocol")
58+
.unwrap_or("PLAINTEXT".to_string()),
59+
out_auth_sasl_username: config
60+
.get_string("kafka.out_auth_sasl_username")
61+
.unwrap_or("osiris".to_string()),
62+
out_auth_sasl_password: config
63+
.get_string("kafka.out_auth_sasl_password")
64+
.unwrap_or("osiris".to_string()),
65+
out_auth_sasl_mechanism: config
66+
.get_string("kafka.out_auth_sasl_mechanism")
67+
.unwrap_or("SCRAM-SHA-512".to_string()),
68+
}
69+
}

src/handler.rs

+23-70
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,13 @@
11
use anyhow::Result;
22
use caracat::models::Probe;
3-
use caracat::rate_limiter::RateLimitingMethod;
43
use ipnet::IpNet;
54
use rand::seq::SliceRandom;
65
use rand::thread_rng;
7-
use std::net::Ipv4Addr;
8-
use std::net::Ipv6Addr;
9-
use std::time::Duration;
106
use tokio::task;
117

12-
use crate::prober::probe;
13-
use crate::producer::{produce, KafkaAuth};
14-
15-
/// Probing configuration.
16-
#[derive(Debug)]
17-
pub struct Config {
18-
/// Number of probes to send before calling the rate limiter.
19-
pub batch_size: u64,
20-
/// Identifier encoded in the probes (random by default).
21-
pub instance_id: u16,
22-
/// Whether to actually send the probes on the network or not.
23-
pub dry_run: bool,
24-
/// Do not send probes with ttl < min_ttl.
25-
pub min_ttl: Option<u8>,
26-
/// Do not send probes with ttl > max_ttl.
27-
pub max_ttl: Option<u8>,
28-
/// Check that replies match valid probes.
29-
pub integrity_check: bool,
30-
/// Interface from which to send the packets.
31-
pub interface: String,
32-
/// Source IPv4 address
33-
pub src_ipv4_addr: Option<Ipv4Addr>,
34-
/// Source IPv6 address
35-
pub src_ipv6_addr: Option<Ipv6Addr>,
36-
/// Maximum number of probes to send (unlimited by default).
37-
pub max_probes: Option<u64>,
38-
/// Number of packets to send per probe.
39-
pub packets: u64,
40-
/// Probing rate in packets per second.
41-
pub probing_rate: u64,
42-
/// Method to use to limit the packets rate.
43-
pub rate_limiting_method: RateLimitingMethod,
44-
/// Time in seconds to wait after sending the probes to stop the receiver.
45-
pub receiver_wait_time: Duration,
46-
}
47-
48-
fn create_config() -> Config {
49-
Config {
50-
batch_size: 128,
51-
dry_run: false,
52-
min_ttl: None,
53-
max_ttl: None,
54-
integrity_check: true,
55-
instance_id: 0,
56-
interface: caracat::utilities::get_default_interface(),
57-
src_ipv4_addr: None,
58-
src_ipv6_addr: None,
59-
max_probes: None,
60-
packets: 1,
61-
probing_rate: 100,
62-
rate_limiting_method: caracat::rate_limiter::RateLimitingMethod::Auto,
63-
receiver_wait_time: Duration::new(3, 0),
64-
}
65-
}
8+
use crate::config::ProberConfig;
9+
use crate::prober::{load_caracat_config, probe};
10+
use crate::producer::{produce, KafkaAuth, SaslAuth};
6611

6712
struct Target {
6813
prefix: IpNet,
@@ -137,26 +82,34 @@ fn generate_probes(target: &Target) -> Result<Vec<Probe>> {
13782
Ok(probes)
13883
}
13984

140-
pub async fn handle(
141-
prober_id: u16,
142-
brokers: &str,
143-
_in_topics: &str,
144-
_in_group_id: &str,
145-
out_topic: &str,
146-
out_auth: KafkaAuth,
147-
target: &str,
148-
) -> Result<()> {
85+
pub async fn handle(config: &ProberConfig, target: &str) -> Result<()> {
86+
// Configure Kafka producer authentication
87+
let out_auth = match config.out_auth_protocol.as_str() {
88+
"PLAINTEXT" => KafkaAuth::PlainText,
89+
"SASL_PLAINTEXT" => KafkaAuth::SasalPlainText(SaslAuth {
90+
username: config.out_auth_sasl_username.clone(),
91+
password: config.out_auth_sasl_password.clone(),
92+
mechanism: config.out_auth_sasl_mechanism.clone(),
93+
}),
94+
_ => {
95+
return Err(anyhow::anyhow!(
96+
"Invalid Kafka producer authentication protocol"
97+
))
98+
}
99+
};
100+
149101
// Probe Generation
150-
let config = create_config();
102+
let caracat_config = load_caracat_config();
151103
let target = decode_payload(target)?;
152104
let probes_to_send = generate_probes(&target)?;
153105

154106
// Probing
155-
let result = task::spawn_blocking(move || probe(config, probes_to_send.into_iter())).await?;
107+
let result =
108+
task::spawn_blocking(move || probe(caracat_config, probes_to_send.into_iter())).await?;
156109
let (_, _, results) = result?;
157110

158111
// Produce the results to Kafka topic
159-
produce(brokers, out_topic, prober_id, out_auth, results).await;
112+
produce(config, out_auth, results).await;
160113

161114
Ok(())
162115
}

src/main.rs

+39-73
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,51 @@
1+
mod config;
12
mod consumer;
23
mod handler;
34
mod prober;
45
mod producer;
56

67
use anyhow::Result;
78
use chrono::Local;
8-
use clap::Parser as CliParser;
9+
use clap::{Args, Parser, Subcommand};
910
use clap_verbosity_flag::{InfoLevel, Verbosity};
1011
use env_logger::Builder;
1112
use std::io::Write;
1213

1314
// use crate::consumer::consume;
15+
use crate::config::{load_config, prober_config};
1416
use crate::handler::handle;
1517

16-
#[derive(CliParser, Debug, Clone)]
17-
#[command(version, about, long_about = None)]
18-
struct Cli {
19-
/// Prober ID
20-
#[clap(long, default_value = "0")]
21-
prober_id: u16,
22-
23-
/// Kafka brokers
24-
#[clap(long, default_value = "localhost:9092")]
25-
brokers: String,
26-
27-
/// Kafka consumer topics (comma separated)
28-
#[clap(long, default_value = "osiris")]
29-
in_topics: String,
30-
31-
/// Kafka consumer group ID
32-
#[clap(long, default_value = "osiris")]
33-
in_group_id: String,
34-
35-
/// Kafka producer topic
36-
#[clap(long, default_value = "osiris-results")]
37-
out_topic: String,
38-
39-
/// Kafka producer Authentication Protocol
40-
#[clap(long, default_value = "PLAINTEXT")]
41-
out_auth_protocol: String,
42-
43-
/// Kafka producer Authentication SASL Username
44-
#[clap(long, default_value = "osiris")]
45-
out_auth_sasl_username: String,
46-
47-
/// Kafka producer Authentication SASL Password
48-
#[clap(long, default_value = "osiris")]
49-
out_auth_sasl_password: String,
50-
51-
/// Kafka producer Authentication SASL Mechanism
52-
#[clap(long, default_value = "SCRAM-SHA-512")]
53-
out_auth_sasl_mechanism: String,
18+
#[derive(Debug, Parser)]
19+
#[clap(name = "Osiris", version)]
20+
pub struct App {
21+
#[clap(flatten)]
22+
global_opts: GlobalOpts,
5423

55-
/// Target (eg., 2606:4700:4700::1111/128,1,32,1)
56-
#[arg(index = 1)]
57-
target: String,
24+
#[clap(subcommand)]
25+
command: Command,
26+
}
27+
#[derive(Debug, Subcommand)]
28+
#[command(version, about, long_about = None)]
29+
enum Command {
30+
Prober {
31+
/// Configuration file
32+
#[arg(short, long)]
33+
config: String,
34+
35+
/// Target (eg., 2606:4700:4700::1111/128,1,32,1)
36+
#[arg(index = 1)]
37+
target: String,
38+
},
39+
}
5840

41+
#[derive(Debug, Args)]
42+
struct GlobalOpts {
5943
/// Verbosity level
6044
#[clap(flatten)]
6145
verbose: Verbosity<InfoLevel>,
6246
}
6347

64-
fn set_logging(cli: &Cli) {
48+
fn set_logging(cli: &GlobalOpts) {
6549
Builder::new()
6650
.format(|buf, record| {
6751
writeln!(
@@ -78,37 +62,19 @@ fn set_logging(cli: &Cli) {
7862

7963
#[tokio::main]
8064
async fn main() -> Result<()> {
81-
let cli = Cli::parse();
82-
set_logging(&cli);
83-
84-
// Configure Kafka producer authentication
85-
let out_auth = match cli.out_auth_protocol.as_str() {
86-
"PLAINTEXT" => producer::KafkaAuth::PLAINTEXT,
87-
"SASL" => producer::KafkaAuth::SASL(producer::SaslAuth {
88-
username: cli.out_auth_sasl_username.clone(),
89-
password: cli.out_auth_sasl_password.clone(),
90-
mechanism: cli.out_auth_sasl_mechanism.clone(),
91-
}),
92-
_ => {
93-
return Err(anyhow::anyhow!(
94-
"Invalid Kafka producer authentication protocol"
95-
))
65+
let cli = App::parse();
66+
set_logging(&cli.global_opts);
67+
68+
match cli.command {
69+
Command::Prober { config, target } => {
70+
let app_config = load_config(&config);
71+
let prober_config = prober_config(app_config);
72+
73+
match handle(&prober_config, &target).await {
74+
Ok(_) => (),
75+
Err(e) => log::error!("Error: {}", e),
76+
}
9677
}
97-
};
98-
99-
match handle(
100-
cli.prober_id,
101-
&cli.brokers,
102-
&cli.in_topics,
103-
&cli.in_group_id,
104-
&cli.out_topic,
105-
out_auth,
106-
&cli.target,
107-
)
108-
.await
109-
{
110-
Ok(_) => (),
111-
Err(e) => log::error!("Error: {}", e),
11278
}
11379

11480
Ok(())

0 commit comments

Comments
 (0)