Skip to content

Commit 6dca083

Browse files
committed
add a basic CLI
1 parent c91035a commit 6dca083

File tree

2 files changed

+52
-34
lines changed

2 files changed

+52
-34
lines changed

src/handler.rs

+20-27
Original file line numberDiff line numberDiff line change
@@ -64,38 +64,38 @@ fn create_config() -> Config {
6464
}
6565
}
6666

67-
struct Payload {
67+
struct Target {
6868
prefix: IpNet,
6969
min_ttl: u8,
7070
max_ttl: u8,
7171
n_flows: u64,
7272
}
7373

74-
fn decode_payload(payload: &str) -> Result<Payload> {
74+
fn decode_payload(payload: &str) -> Result<Target> {
7575
let parts: Vec<&str> = payload.split(',').collect();
76-
Ok(Payload {
76+
Ok(Target {
7777
prefix: parts[0].parse()?,
7878
min_ttl: parts[1].parse()?,
7979
max_ttl: parts[2].parse()?,
8080
n_flows: parts[3].parse()?,
8181
})
8282
}
8383

84-
fn generate_probes(payload: &Payload) -> Result<Vec<Probe>> {
84+
fn generate_probes(target: &Target) -> Result<Vec<Probe>> {
8585
// TODO: We should pass an iterator instead of a vector.
8686
let mut probes = vec![];
8787

8888
// First start by dividing the prefix into /24s or /64s, if necessary.
89-
let subnets = match payload.prefix {
89+
let subnets = match target.prefix {
9090
IpNet::V4(_) => {
91-
let prefix_len = payload.prefix.prefix_len();
91+
let prefix_len = target.prefix.prefix_len();
9292
let target_len = if prefix_len > 24 { prefix_len } else { 24 };
93-
payload.prefix.subnets(target_len)
93+
target.prefix.subnets(target_len)
9494
}
9595
IpNet::V6(_) => {
96-
let prefix_len = payload.prefix.prefix_len();
96+
let prefix_len = target.prefix.prefix_len();
9797
let target_len = if prefix_len > 64 { prefix_len } else { 64 };
98-
payload.prefix.subnets(target_len)
98+
target.prefix.subnets(target_len)
9999
}
100100
}?;
101101

@@ -106,11 +106,11 @@ fn generate_probes(payload: &Payload) -> Result<Vec<Probe>> {
106106
// TODO: implement mapper-like generator such as the ones in diamond-miner.
107107
// https://github.com/dioptra-io/diamond-miner/blob/main/diamond_miner/mappers.py
108108
let mut prefix_hosts = subnet.hosts();
109-
if payload.n_flows > prefix_hosts.count().try_into()? {
109+
if target.n_flows > prefix_hosts.count().try_into()? {
110110
return Err(anyhow::anyhow!("Not enough hosts in the prefix"));
111111
}
112112

113-
for _ in 0..payload.n_flows {
113+
for _ in 0..target.n_flows {
114114
let dst_addr = prefix_hosts.next().unwrap();
115115

116116
// Randomize the probes order within a flow.
@@ -119,7 +119,7 @@ fn generate_probes(payload: &Payload) -> Result<Vec<Probe>> {
119119
// The rational is to avoid results errors due to path changes.
120120
// So, for now, probes belonging to the same traceroute flow will be sent close in time.
121121
// TODO: is this shuffle fast?
122-
let mut ttls: Vec<u8> = (payload.min_ttl..payload.max_ttl).collect();
122+
let mut ttls: Vec<u8> = (target.min_ttl..target.max_ttl).collect();
123123
ttls.shuffle(&mut thread_rng());
124124

125125
for i in ttls {
@@ -139,28 +139,21 @@ fn generate_probes(payload: &Payload) -> Result<Vec<Probe>> {
139139

140140
pub async fn handle(
141141
brokers: &str,
142+
_in_topics: &str,
142143
_in_group_id: &str,
143-
_in_topics: &[&str],
144144
out_topic: &str,
145+
target: &str,
145146
) -> Result<()> {
146-
let payload = "2606:4700:4700::1111/128,1,32,1";
147-
148-
// Probing
147+
// Probe Generation
149148
let config = create_config();
150-
let payload = decode_payload(payload)?;
151-
152-
let probes_to_send = generate_probes(&payload);
153-
let probes_to_send = match probes_to_send {
154-
Ok(probes) => probes,
155-
Err(e) => {
156-
eprintln!("Error: {}", e);
157-
return Ok(());
158-
}
159-
};
149+
let target = decode_payload(target)?;
150+
let probes_to_send = generate_probes(&target)?;
160151

152+
// Probing
161153
let result = task::spawn_blocking(move || probe(config, probes_to_send.into_iter())).await?;
162-
163154
let (_, _, results) = result?;
155+
156+
// Produce the results to Kafka topic
164157
produce(brokers, out_topic, results).await;
165158

166159
Ok(())

src/main.rs

+32-7
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,26 @@ use crate::handler::handle;
1616
#[derive(CliParser, Debug, Clone)]
1717
#[command(version, about, long_about = None)]
1818
struct Cli {
19+
/// Kafka brokers
20+
#[clap(long, default_value = "localhost:9092")]
21+
brokers: String,
22+
23+
/// Kafka consumer topics (comma separated)
24+
#[clap(long, default_value = "osiris")]
25+
in_topics: String,
26+
27+
/// Kafka consumer group ID
28+
#[clap(long, default_value = "osiris")]
29+
in_group_id: String,
30+
31+
/// Kafka producer topic
32+
#[clap(long, default_value = "results")]
33+
out_topic: String,
34+
35+
/// Target (eg., 2606:4700:4700::1111/128,1,32,1)
36+
#[arg(index = 1)]
37+
target: String,
38+
1939
/// Verbosity level
2040
#[clap(flatten)]
2141
verbose: Verbosity<InfoLevel>,
@@ -41,13 +61,18 @@ async fn main() -> Result<()> {
4161
let cli = Cli::parse();
4262
set_logging(&cli);
4363

44-
let brokers = "localhost:9092";
45-
let in_group_id = "osiris";
46-
let in_topics = vec!["osiris"];
47-
let out_topic = "results";
48-
49-
// consume(brokers, group_id, &topics).await;
50-
let _ = handle(&brokers, &in_group_id, &in_topics, &out_topic).await;
64+
match handle(
65+
&cli.brokers,
66+
&cli.in_topics,
67+
&cli.in_group_id,
68+
&cli.out_topic,
69+
&cli.target,
70+
)
71+
.await
72+
{
73+
Ok(_) => (),
74+
Err(e) => log::error!("Error: {}", e),
75+
}
5176

5277
Ok(())
5378
}

0 commit comments

Comments
 (0)