Commit aa3b840

move probe generation to client
1 parent 07f0209 commit aa3b840

5 files changed: +160 -89 lines changed


src/agent/handler.rs

Lines changed: 5 additions & 74 deletions
@@ -1,9 +1,6 @@
 use anyhow::Result;
-use caracat::models::Probe;
-use ipnet::IpNet;
 use log::{info, trace};
-use rand::seq::SliceRandom;
-use rand::thread_rng;
+
 use rdkafka::consumer::{CommitMode, Consumer};
 use rdkafka::message::Headers;
 use rdkafka::Message;
@@ -16,65 +13,9 @@ use crate::agent::receiver::ReceiveLoop;
 use crate::agent::sender::send;
 use crate::auth::{KafkaAuth, SaslAuth};
 use crate::config::AppConfig;
-use crate::target::{decode_target, Target};
+use crate::probe::decode_probes;
 use crate::utils::test_id;
 
-fn generate_probes(target: &Target) -> Result<Vec<Probe>> {
-    // TODO: We should pass an iterator instead of a vector.
-    let mut probes = vec![];
-
-    // First start by dividing the prefix into /24s or /64s, if necessary.
-    let subnets = match target.prefix {
-        IpNet::V4(_) => {
-            let prefix_len = target.prefix.prefix_len();
-            let target_len = if prefix_len > 24 { prefix_len } else { 24 };
-            target.prefix.subnets(target_len)
-        }
-        IpNet::V6(_) => {
-            let prefix_len = target.prefix.prefix_len();
-            let target_len = if prefix_len > 64 { prefix_len } else { 64 };
-            target.prefix.subnets(target_len)
-        }
-    }?;
-
-    // Iterate over the subnets and generate the probes.
-    for subnet in subnets {
-        // Right now the probe generation is simplistic: we just iterate over the hosts.
-        // If we need more flows than hosts, we will explicitly fail.
-        // TODO: implement mapper-like generators such as the ones in diamond-miner.
-        // https://github.com/dioptra-io/diamond-miner/blob/main/diamond_miner/mappers.py
-        let mut prefix_hosts = subnet.hosts();
-        if target.n_flows > prefix_hosts.count().try_into()? {
-            return Err(anyhow::anyhow!("Not enough hosts in the prefix"));
-        }
-
-        for _ in 0..target.n_flows {
-            let dst_addr = prefix_hosts.next().unwrap();
-
-            // Randomize the probe order within a flow.
-            // In YARRP we randomize the probes over the entire probing space.
-            // This is of course a very big simplification, but it's not that silly.
-            // The rationale is to avoid result errors due to path changes.
-            // So, for now, probes belonging to the same traceroute flow will be sent close in time.
-            // TODO: is this shuffle fast?
-            let mut ttls: Vec<u8> = (target.min_ttl..target.max_ttl).collect();
-            ttls.shuffle(&mut thread_rng());
-
-            for i in ttls {
-                probes.push(Probe {
-                    dst_addr,
-                    src_port: 24000,
-                    dst_port: 33434,
-                    ttl: i,
-                    protocol: target.protocol.clone(),
-                });
-            }
-        }
-    }
-
-    Ok(probes)
-}
-
 pub async fn handle(config: &AppConfig) -> Result<()> {
     trace!("Agent handler");
     trace!("{:?}", config);
@@ -123,7 +64,7 @@ pub async fn handle(config: &AppConfig) -> Result<()> {
                 info!("Kafka error: {}", e);
             }
             Ok(m) => {
-                let target = match m.payload_view::<str>() {
+                let probes = match m.payload_view::<str>() {
                     None => "",
                     Some(Ok(s)) => s,
                     Some(Err(e)) => {
@@ -134,15 +75,6 @@ pub async fn handle(config: &AppConfig) -> Result<()> {
                     }
                 };
 
-                info!(
-                    "key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
-                    m.key(),
-                    target,
-                    m.topic(),
-                    m.partition(),
-                    m.offset(),
-                    m.timestamp()
-                );
                 if let Some(headers) = m.headers() {
                     for header in headers.iter() {
                         info!("  Header {:#?}: {:?}", header.key, header.value);
@@ -168,9 +100,8 @@ pub async fn handle(config: &AppConfig) -> Result<()> {
                     continue;
                 }
 
-                // Probe Generation
-                let target = decode_target(target)?;
-                let probes_to_send = generate_probes(&target)?;
+                // Decode probes
+                let probes_to_send = decode_probes(probes)?;
 
                 // Probing
                 let config_clone = config.clone();
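
With this change the agent no longer parses a target prefix; the Kafka payload it receives is already a batch of probes, one per line, in the CSV wire format produced by encode_probe in src/probe.rs below. A hypothetical payload (illustrative addresses) would look like:

192.0.2.1,24000,33434,3,icmp
192.0.2.1,24000,33434,7,icmp
192.0.2.2,24000,33434,5,icmp

Each line is dst_addr,src_port,dst_port,ttl,protocol, and decode_probes rejects any line that does not have exactly five comma-separated fields.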

src/client/handler.rs

Lines changed: 7 additions & 2 deletions
@@ -4,6 +4,7 @@ use log::trace;
 use crate::auth::{KafkaAuth, SaslAuth};
 use crate::client::producer::produce;
 use crate::config::AppConfig;
+use crate::probe::generate_probes;
 use crate::target::decode_target;
 
 pub async fn handle(config: &AppConfig, agents: &str, target: &str) -> Result<()> {
@@ -25,10 +26,14 @@ pub async fn handle(config: &AppConfig, agents: &str, target: &str) -> Result<()
         }
     };
 
+    // Split agents
     let agents = agents.split(',').collect::<Vec<&str>>();
-    decode_target(target)?;
 
-    produce(config, auth, agents, target).await;
+    // Probe Generation
+    let target = decode_target(target)?;
+    let probes = generate_probes(&target)?;
+
+    produce(config, auth, agents, probes).await;
 
     Ok(())
 }
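
For a sense of scale: generate_probes emits n_flows × (max_ttl − min_ttl) probes per /24 (or /64) subnet, since the TTL range is half-open and max_ttl itself is never probed. A hypothetical /22 IPv4 target with n_flows = 1, min_ttl = 1 and max_ttl = 32 would therefore hand 4 × 1 × 31 = 124 probes to produce.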

src/client/producer.rs

Lines changed: 35 additions & 13 deletions
@@ -1,3 +1,4 @@
+use caracat::models::Probe;
 use log::info;
 use rdkafka::config::ClientConfig;
 use rdkafka::message::{Header, OwnedHeaders};
@@ -6,11 +7,12 @@ use std::time::Duration;
 
 use crate::auth::KafkaAuth;
 use crate::config::AppConfig;
+use crate::probe::encode_probe;
 
 // TODO
 // - check target format
 
-pub async fn produce(config: &AppConfig, auth: KafkaAuth, agents: Vec<&str>, target: &str) {
+pub async fn produce(config: &AppConfig, auth: KafkaAuth, agents: Vec<&str>, probes: Vec<Probe>) {
     let producer: &FutureProducer = match auth {
         KafkaAuth::PlainText => &ClientConfig::new()
             .set("bootstrap.servers", config.kafka.brokers.clone())
@@ -29,7 +31,7 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, agents: Vec<&str>, tar
     };
 
     let topic = config.kafka.in_topics.split(',').collect::<Vec<&str>>()[0];
-    info!("Producing to topic: {}", topic);
+    info!("Producing {} probes to {}", probes.len(), topic);
 
     // Construct headers
     let mut headers = OwnedHeaders::new();
@@ -40,15 +42,35 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, agents: Vec<&str>, tar
         });
     }
 
-    let delivery_status = producer
-        .send(
-            FutureRecord::to(topic)
-                .payload(target)
-                .key(&format!("")) // TODO Client ID
-                .headers(headers),
-            Duration::from_secs(0),
-        )
-        .await;
-
-    info!("{:?}", delivery_status);
+    // Bucket probes into Kafka messages
+    let mut messages = Vec::new();
+    let mut current_message = String::new();
+    for probe in probes {
+        // Format probe
+        let probe_str = encode_probe(&probe);
+        // Max message size is 1048576 bytes
+        if current_message.len() + probe_str.len() > 1048576 {
+            messages.push(current_message);
+            current_message = String::new();
+        }
+        current_message.push_str(&probe_str);
+    }
+    messages.push(current_message);
+
+    info!("Sending {} messages", messages.len());
+
+    // Send to Kafka
+    for message in messages {
+        let delivery_status = producer
+            .send(
+                FutureRecord::to(topic)
+                    .payload(&format!("{}", message))
+                    .key(&format!("")) // TODO Client ID
+                    .headers(headers.clone()),
+                Duration::from_secs(0),
+            )
+            .await;
+
+        info!("{:?}", delivery_status);
+    }
 }
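
The bucketing loop above can be read as a small standalone helper. The sketch below is a hypothetical rewrite (not part of the commit) that greedily packs encoded probe lines into payloads of at most max_bytes; unlike the loop above, it never emits an empty trailing message:

fn bucket_lines(lines: impl Iterator<Item = String>, max_bytes: usize) -> Vec<String> {
    let mut messages = Vec::new();
    let mut current = String::new();
    for line in lines {
        // Start a new message once appending this line would exceed the cap.
        if !current.is_empty() && current.len() + line.len() > max_bytes {
            messages.push(std::mem::take(&mut current));
        }
        current.push_str(&line);
    }
    if !current.is_empty() {
        messages.push(current);
    }
    messages
}

// Usage, assuming the same 1048576-byte cap as above:
// let messages = bucket_lines(probes.iter().map(encode_probe), 1048576);

Whether 1048576 bytes actually matches the broker's configured maximum message size depends on the Kafka setup; the cap here is just a byte budget.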

src/main.rs

Lines changed: 1 addition & 0 deletions
@@ -2,6 +2,7 @@ mod agent;
 mod auth;
 mod client;
 mod config;
+mod probe;
 mod target;
 mod utils;
 

src/probe.rs

Lines changed: 112 additions & 0 deletions
@@ -0,0 +1,112 @@
+use anyhow::Result;
+use caracat::models::Probe;
+use ipnet::IpNet;
+use rand::seq::SliceRandom;
+use rand::thread_rng;
+
+use crate::target::Target;
+
+pub fn generate_probes(target: &Target) -> Result<Vec<Probe>> {
+    // TODO: We should pass an iterator instead of a vector.
+    let mut probes = vec![];
+
+    // First start by dividing the prefix into /24s or /64s, if necessary.
+    let subnets = match target.prefix {
+        IpNet::V4(_) => {
+            let prefix_len = target.prefix.prefix_len();
+            let target_len = if prefix_len > 24 { prefix_len } else { 24 };
+            target.prefix.subnets(target_len)
+        }
+        IpNet::V6(_) => {
+            let prefix_len = target.prefix.prefix_len();
+            let target_len = if prefix_len > 64 { prefix_len } else { 64 };
+            target.prefix.subnets(target_len)
+        }
+    }?;
+
+    // Iterate over the subnets and generate the probes.
+    for subnet in subnets {
+        // Right now the probe generation is simplistic: we just iterate over the hosts.
+        // If we need more flows than hosts, we will explicitly fail.
+        // TODO: implement mapper-like generators such as the ones in diamond-miner.
+        // https://github.com/dioptra-io/diamond-miner/blob/main/diamond_miner/mappers.py
+        let mut prefix_hosts = subnet.hosts();
+        if target.n_flows > prefix_hosts.count().try_into()? {
+            return Err(anyhow::anyhow!("Not enough hosts in the prefix"));
+        }
+
+        for _ in 0..target.n_flows {
+            let dst_addr = prefix_hosts.next().unwrap();
+
+            // Randomize the probe order within a flow.
+            // In YARRP we randomize the probes over the entire probing space.
+            // This is of course a very big simplification, but it's not that silly.
+            // The rationale is to avoid result errors due to path changes.
+            // So, for now, probes belonging to the same traceroute flow will be sent close in time.
+            // TODO: is this shuffle fast?
+            let mut ttls: Vec<u8> = (target.min_ttl..target.max_ttl).collect();
+            ttls.shuffle(&mut thread_rng());
+
+            for i in ttls {
+                probes.push(Probe {
+                    dst_addr,
+                    src_port: 24000,
+                    dst_port: 33434,
+                    ttl: i,
+                    protocol: target.protocol.clone(),
+                });
+            }
+        }
+    }
+
+    Ok(probes)
+}
+
+pub fn encode_protocol(protocol: caracat::models::L4) -> String {
+    match protocol {
+        caracat::models::L4::ICMP => "icmp".to_string(),
+        caracat::models::L4::ICMPv6 => "icmpv6".to_string(),
+        caracat::models::L4::UDP => "udp".to_string(),
+    }
+}
+
+pub fn decode_protocol(protocol: &str) -> Result<caracat::models::L4> {
+    match protocol {
+        "icmp" => Ok(caracat::models::L4::ICMP),
+        "icmpv6" => Ok(caracat::models::L4::ICMPv6),
+        "udp" => Ok(caracat::models::L4::UDP),
+        _ => Err(anyhow::anyhow!("Invalid protocol: {}", protocol)),
+    }
+}
+
+pub fn encode_probe(probe: &Probe) -> String {
+    format!(
+        "{},{},{},{},{}\n",
+        probe.dst_addr,
+        probe.src_port,
+        probe.dst_port,
+        probe.ttl,
+        encode_protocol(probe.protocol),
+    )
+}
+
+pub fn decode_probes(probes: &str) -> Result<Vec<Probe>> {
+    let mut decoded_probes = vec![];
+
+    for probe in probes.lines() {
+        let fields: Vec<&str> = probe.split(',').collect();
+        if fields.len() != 5 {
+            return Err(anyhow::anyhow!("Invalid probe format: {}", probe));
+        }
+
+        decoded_probes.push(Probe {
+            dst_addr: fields[0].parse()?,
+            src_port: fields[1].parse()?,
+            dst_port: fields[2].parse()?,
+            ttl: fields[3].parse()?,
+            protocol: decode_protocol(fields[4])?,
+        });
+    }
+
+    Ok(decoded_probes)
+}
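
Since the client now encodes probes and the agent decodes them, encode_probe and decode_probes must stay inverses of each other. A minimal round-trip check (a hypothetical test, not part of the commit) could look like:

#[cfg(test)]
mod tests {
    use super::*;
    use caracat::models::{Probe, L4};
    use std::net::{IpAddr, Ipv4Addr};

    #[test]
    fn probe_round_trip() {
        let probe = Probe {
            dst_addr: IpAddr::V4(Ipv4Addr::new(192, 0, 2, 1)),
            src_port: 24000,
            dst_port: 33434,
            ttl: 8,
            protocol: L4::ICMP,
        };

        // encode_probe yields "192.0.2.1,24000,33434,8,icmp\n"
        let line = encode_probe(&probe);
        let decoded = decode_probes(&line).unwrap();

        assert_eq!(decoded.len(), 1);
        // Compare re-encoded lines to avoid assuming PartialEq on Probe.
        assert_eq!(encode_probe(&decoded[0]), line);
    }
}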
