Skip to content

Commit c91035a

Browse files
committed
handle prefixes instead of addresses
1 parent 5edadad commit c91035a

File tree

4 files changed

+96
-44
lines changed

4 files changed

+96
-44
lines changed

Cargo.lock

+8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ clap-verbosity-flag = "3.0.0"
1212
csv = "1.3.1"
1313
env_logger = "0.11.5"
1414
hyperloglog = "1.0.2"
15+
ipnet = "2.10.1"
1516
log = "0.4.22"
1617
pcap = "2.2.0"
18+
rand = "0.8.5"
1719
rdkafka = { version = "0.37.0" }
1820
tokio = { version = "1.42.0", features = ["full"] }

src/handler.rs

+73-43
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use anyhow::Result;
22
use caracat::models::Probe;
33
use caracat::rate_limiter::RateLimitingMethod;
4-
// use log::{info, warn};
5-
// use rdkafka::message::BorrowedMessage;
6-
// use rdkafka::message::{Headers, Message};
7-
use std::net::IpAddr;
4+
use ipnet::IpNet;
5+
use rand::seq::SliceRandom;
6+
use rand::thread_rng;
87
use std::net::Ipv4Addr;
98
use std::net::Ipv6Addr;
109
use std::time::Duration;
@@ -66,68 +65,99 @@ fn create_config() -> Config {
6665
}
6766

6867
struct Payload {
69-
dst_addr: IpAddr,
68+
prefix: IpNet,
7069
min_ttl: u8,
7170
max_ttl: u8,
71+
n_flows: u64,
7272
}
7373

7474
fn decode_payload(payload: &str) -> Result<Payload> {
7575
let parts: Vec<&str> = payload.split(',').collect();
7676
Ok(Payload {
77-
dst_addr: parts[0].parse()?,
77+
prefix: parts[0].parse()?,
7878
min_ttl: parts[1].parse()?,
7979
max_ttl: parts[2].parse()?,
80+
n_flows: parts[3].parse()?,
8081
})
8182
}
8283

83-
// pub async fn handle(m: &BorrowedMessage<'_>) -> Result<()> {
84+
fn generate_probes(payload: &Payload) -> Result<Vec<Probe>> {
85+
// TODO: We should pass an iterator instead of a vector.
86+
let mut probes = vec![];
87+
88+
// First start by dividing the prefix into /24s or /64s, if necessary.
89+
let subnets = match payload.prefix {
90+
IpNet::V4(_) => {
91+
let prefix_len = payload.prefix.prefix_len();
92+
let target_len = if prefix_len > 24 { prefix_len } else { 24 };
93+
payload.prefix.subnets(target_len)
94+
}
95+
IpNet::V6(_) => {
96+
let prefix_len = payload.prefix.prefix_len();
97+
let target_len = if prefix_len > 64 { prefix_len } else { 64 };
98+
payload.prefix.subnets(target_len)
99+
}
100+
}?;
101+
102+
// Iterate over the subnets and generate the probes.
103+
for subnet in subnets {
104+
// Right now the probe generation is simplistic: we just iterate over the hosts.
105+
// If we need more flows than hosts, we will explicitly fail.
106+
// TODO: implement mapper-like generator such as the ones in diamond-miner.
107+
// https://github.com/dioptra-io/diamond-miner/blob/main/diamond_miner/mappers.py
108+
let mut prefix_hosts = subnet.hosts();
109+
if payload.n_flows > prefix_hosts.count().try_into()? {
110+
return Err(anyhow::anyhow!("Not enough hosts in the prefix"));
111+
}
112+
113+
for _ in 0..payload.n_flows {
114+
let dst_addr = prefix_hosts.next().unwrap();
115+
116+
// Randomize the probes order within a flow.
117+
// In YARRP we randomize the probes over the entire probing space.
118+
// This is of course a very big simplification, but it's not that silly.
119+
// The rationale is to avoid result errors due to path changes.
120+
// So, for now, probes belonging to the same traceroute flow will be sent close in time.
121+
// TODO: is this shuffle fast?
122+
let mut ttls: Vec<u8> = (payload.min_ttl..payload.max_ttl).collect();
123+
ttls.shuffle(&mut thread_rng());
124+
125+
for i in ttls {
126+
probes.push(Probe {
127+
dst_addr,
128+
src_port: 24000,
129+
dst_port: 33434,
130+
ttl: i,
131+
protocol: caracat::models::L4::ICMPv6,
132+
});
133+
}
134+
}
135+
}
136+
137+
Ok(probes)
138+
}
139+
84140
pub async fn handle(
85141
brokers: &str,
86142
_in_group_id: &str,
87143
_in_topics: &[&str],
88144
out_topic: &str,
89145
) -> Result<()> {
90-
// let payload = match m.payload_view::<str>() {
91-
// None => "",
92-
// Some(Ok(s)) => s,
93-
// Some(Err(e)) => {
94-
// warn!("Error while deserializing message payload: {:?}", e);
95-
// ""
96-
// }
97-
// };
98-
99-
// info!(
100-
// "key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
101-
// m.key(),
102-
// payload,
103-
// m.topic(),
104-
// m.partition(),
105-
// m.offset(),
106-
// m.timestamp()
107-
// );
108-
109-
// if let Some(headers) = m.headers() {
110-
// for header in headers.iter() {
111-
// info!(" Header {:#?}: {:?}", header.key, header.value);
112-
// }
113-
// }
114-
115-
let payload = "2606:4700:4700::1111,1,32";
146+
let payload = "2606:4700:4700::1111/128,1,32,1";
116147

117148
// Probing
118149
let config = create_config();
119150
let payload = decode_payload(payload)?;
120151

121-
let mut probes_to_send = vec![];
122-
for i in payload.min_ttl..=payload.max_ttl {
123-
probes_to_send.push(Probe {
124-
dst_addr: payload.dst_addr.clone(),
125-
src_port: 24000,
126-
dst_port: 33434,
127-
ttl: i,
128-
protocol: caracat::models::L4::ICMPv6,
129-
});
130-
}
152+
let probes_to_send = generate_probes(&payload);
153+
let probes_to_send = match probes_to_send {
154+
Ok(probes) => probes,
155+
Err(e) => {
156+
eprintln!("Error: {}", e);
157+
return Ok(());
158+
}
159+
};
160+
131161
let result = task::spawn_blocking(move || probe(config, probes_to_send.into_iter())).await?;
132162

133163
let (_, _, results) = result?;

testbed/README.md

+13-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
Docker Compose setup to facilitate testing Osiris.
44

5-
The testbed consists in a Redpanda instance.
5+
The testbed consists in a Redpanda and ClickHouse instance. Required ClickHouse [tables](config/clickhouse/docker-entrypoint-initdb.d/init.sql) are created on startup. The `osiris.results_broker` is using the ClickHouse [Kafka engine](https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka) to fetch the results from Redpanda. The `osiris.results` table is used to store the results.
66

77

88
## Usage
@@ -12,3 +12,15 @@ The testbed consists in a Redpanda instance.
1212
```sh
1313
docker compose up -d --force-recreate --renew-anon-volumes
1414
```
15+
16+
* Run Osiris (from the root of the repository)
17+
18+
```sh
19+
cargo run
20+
```
21+
22+
* Stop the testbed
23+
24+
```sh
25+
docker compose down
26+
```

0 commit comments

Comments
 (0)