Skip to content

Commit 8f2c2a8

Browse files
committed
remove the concept of target list
1 parent 6425afa commit 8f2c2a8

11 files changed

+78
-175
lines changed

README.md

+5-17
Original file line numberDiff line numberDiff line change
@@ -9,37 +9,25 @@
99
1010
## Architecture
1111

12-
Right now the probing system is composed of two main components: the **client** and the **agent**. Those components send and receive messages from Kafka topics, respectively. The results are stored in a ClickHouse table.
12+
The probing system is composed of two main components: the **client** and the **agent**. Those components send and receive messages from Kafka topics, respectively. The measurements results can be handled in any way, such as storing them in a ClickHouse database.
1313

1414
Check the [testbed](testbed/README.md) for a quick setup to test the system.
1515

1616
### Agent
1717

18-
The agent performs the measurements. It listens for measurements to be made from a Kafka topic, performs the measurements, and then produces the results to another Kafka topic. The results will eventually be inserted into a ClickHouse table.
18+
The agent performs the measurements. It is always listening for results. It consumes probes to send from Kafka topic, performs the measurements, and then produces the results to another Kafka topic.
1919

2020
```sh
2121
samiris agent --config=saimiris.yml
2222
```
2323

2424
### Client
2525

26-
The client is the agent that sends the measurements to the agent. It sends a message to a Kafka topic, which represents a set of probes to be sent consecutively. A measurement can be composed of multiple messages.
26+
The client is the agent that sends the measurements to the agent. It sends messages to a Kafka topic, which represents a set of probes to be sent consecutively. A measurement can be composed of multiple messages.
2727

2828

2929
```sh
30-
samiris client --config=saimiris.yml <comma-separated-agent-id> <target>
30+
cat probes.txt | samiris client --config=saimiris.yml <comma-separated-agent-ids>
3131
```
3232

33-
A target is a network to probe. It must follow this format:
34-
35-
```
36-
network,protocol,min_ttl,max_ttl,n_flows
37-
```
38-
39-
where:
40-
- the network is a IPv4/IPv6 prefix.
41-
- the prococol can be `icmp` or `udp`.
42-
- the min_ttl and max_ttl are the minimum and maximum TTL values to probe.
43-
- the n_flows is the number of flows to use.
44-
45-
33+
The probes to send are in the [caracal](https://dioptra-io.github.io/caracal/usage/) format.

src/agent/handler.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::agent::receiver::ReceiveLoop;
1313
use crate::agent::sender::send;
1414
use crate::auth::{KafkaAuth, SaslAuth};
1515
use crate::config::AppConfig;
16-
use crate::probe::decode_probes;
16+
use crate::probe::decode_probe;
1717
use crate::utils::test_id;
1818

1919
pub async fn handle(config: &AppConfig) -> Result<()> {
@@ -101,7 +101,10 @@ pub async fn handle(config: &AppConfig) -> Result<()> {
101101
}
102102

103103
// Decode probes
104-
let probes_to_send = decode_probes(probes)?;
104+
let probes_to_send = probes
105+
.split('\n')
106+
.map(|probe| decode_probe(probe))
107+
.collect::<Result<Vec<_>>>()?;
105108

106109
// Probing
107110
let config_clone = config.clone();

src/client/generate.rs

-63
This file was deleted.

src/client/handler.rs

+10-7
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use anyhow::Result;
22
use log::trace;
3+
use std::io::{stdin, BufRead};
34

45
use crate::auth::{KafkaAuth, SaslAuth};
5-
use crate::client::generate::generate_probes;
66
use crate::client::producer::produce;
7-
use crate::client::target::decode_target;
87
use crate::config::AppConfig;
8+
use crate::probe::decode_probe;
99

10-
pub async fn handle(config: &AppConfig, agents: &str, target: &str) -> Result<()> {
10+
pub async fn handle(config: &AppConfig, agents: &str) -> Result<()> {
1111
trace!("Client handler");
1212
trace!("{:?}", config);
1313

@@ -26,13 +26,16 @@ pub async fn handle(config: &AppConfig, agents: &str, target: &str) -> Result<()
2626
}
2727
};
2828

29+
// Get probes from stdin
30+
let mut probes = Vec::new();
31+
for line in stdin().lock().lines() {
32+
let probe = line?;
33+
probes.push(decode_probe(&probe)?);
34+
}
35+
2936
// Split agents
3037
let agents = agents.split(',').collect::<Vec<&str>>();
3138

32-
// Probe Generation
33-
let target = decode_target(target)?;
34-
let probes = generate_probes(&target)?;
35-
3639
produce(config, auth, agents, probes).await;
3740

3841
Ok(())

src/client/mod.rs

-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
mod generate;
21
mod handler;
32
mod producer;
4-
mod target;
53

64
pub use handler::handle;

src/client/producer.rs

+16-10
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ use crate::auth::KafkaAuth;
99
use crate::config::AppConfig;
1010
use crate::probe::encode_probe;
1111

12-
// TODO
13-
// - check target format
14-
1512
pub async fn produce(config: &AppConfig, auth: KafkaAuth, agents: Vec<&str>, probes: Vec<Probe>) {
1613
let producer: &FutureProducer = match auth {
1714
KafkaAuth::PlainText => &ClientConfig::new()
@@ -31,7 +28,6 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, agents: Vec<&str>, pro
3128
};
3229

3330
let topic = config.kafka.in_topics.split(',').collect::<Vec<&str>>()[0];
34-
info!("Producing {} probes to {}", probes.len(), topic);
3531

3632
// Construct headers
3733
let mut headers = OwnedHeaders::new();
@@ -42,22 +38,32 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, agents: Vec<&str>, pro
4238
});
4339
}
4440

45-
// Bucket probes into Kafka messages
41+
// Place probes into Kafka messages
4642
let mut messages = Vec::new();
4743
let mut current_message = String::new();
48-
for probe in probes {
44+
for (i, probe) in probes.iter().enumerate() {
4945
// Format probe
50-
let probe_str = encode_probe(&probe);
46+
let probe_str = encode_probe(probe);
5147
// Max message size is 1048576 bytes
52-
if current_message.len() + probe_str.len() > 1048576 {
48+
if current_message.len() + probe_str.len() + 1 > 1048576 {
5349
messages.push(current_message);
5450
current_message = String::new();
5551
}
5652
current_message.push_str(&probe_str);
53+
if i < probes.len() - 1 {
54+
current_message.push_str("\n");
55+
}
56+
}
57+
if !current_message.is_empty() {
58+
messages.push(current_message);
5759
}
58-
messages.push(current_message);
5960

60-
info!("Sending {} messages", messages.len());
61+
info!(
62+
"topic={},messages={},probes={}",
63+
topic,
64+
messages.len(),
65+
probes.len(),
66+
);
6167

6268
// Send to Kafka
6369
for message in messages {

src/client/target.rs

-43
This file was deleted.

src/main.rs

+9-12
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ mod utils;
77

88
use anyhow::Result;
99
use chrono::Local;
10-
use clap::{Args, Parser, Subcommand};
10+
use clap::{Args, CommandFactory, Parser, Subcommand};
1111
use clap_verbosity_flag::{InfoLevel, Verbosity};
1212
use env_logger::Builder;
1313
use log::error;
14-
use std::io::Write;
14+
use std::io::{stdin, IsTerminal, Write};
1515

1616
use crate::config::app_config;
1717

@@ -41,10 +41,6 @@ enum Command {
4141
/// Agent IDs (comma separated)
4242
#[arg(index = 1)]
4343
agents: String,
44-
45-
/// Target (eg., 2606:4700:4700::1111/128,icmp,1,32,1)
46-
#[arg(index = 2)]
47-
target: String,
4844
},
4945
}
5046

@@ -83,13 +79,14 @@ async fn main() -> Result<()> {
8379
Err(e) => error!("Error: {}", e),
8480
}
8581
}
86-
Command::Client {
87-
config,
88-
agents,
89-
target,
90-
} => {
82+
Command::Client { config, agents } => {
83+
if stdin().is_terminal() {
84+
App::command().print_help().unwrap();
85+
::std::process::exit(2);
86+
}
87+
9188
let app_config = app_config(&config);
92-
match client::handle(&app_config, &agents, &target).await {
89+
match client::handle(&app_config, &agents).await {
9390
Ok(_) => (),
9491
Err(e) => error!("Error: {}", e),
9592
}

src/probe.rs

+12-18
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub fn decode_protocol(protocol: &str) -> Result<caracat::models::L4> {
2020

2121
pub fn encode_probe(probe: &Probe) -> String {
2222
format!(
23-
"{},{},{},{},{}\n",
23+
"{},{},{},{},{}",
2424
probe.dst_addr,
2525
probe.src_port,
2626
probe.dst_port,
@@ -29,23 +29,17 @@ pub fn encode_probe(probe: &Probe) -> String {
2929
)
3030
}
3131

32-
pub fn decode_probes(probes: &str) -> Result<Vec<Probe>> {
33-
let mut decoded_probes = vec![];
34-
35-
for probe in probes.lines() {
36-
let fields: Vec<&str> = probe.split(',').collect();
37-
if fields.len() != 5 {
38-
return Err(anyhow::anyhow!("Invalid probe format: {}", probe));
39-
}
40-
41-
decoded_probes.push(Probe {
42-
dst_addr: fields[0].parse()?,
43-
src_port: fields[1].parse()?,
44-
dst_port: fields[2].parse()?,
45-
ttl: fields[3].parse()?,
46-
protocol: decode_protocol(fields[4])?,
47-
});
32+
pub fn decode_probe(probe: &str) -> Result<Probe> {
33+
let fields: Vec<&str> = probe.split(',').collect();
34+
if fields.len() != 5 {
35+
return Err(anyhow::anyhow!("Invalid probe format: {}", probe));
4836
}
4937

50-
Ok(decoded_probes)
38+
Ok(Probe {
39+
dst_addr: fields[0].parse()?,
40+
src_port: fields[1].parse()?,
41+
dst_port: fields[2].parse()?,
42+
ttl: fields[3].parse()?,
43+
protocol: decode_protocol(fields[4])?,
44+
})
5145
}

testbed/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ cargo run -- agent --config=testbed/config/saimiris/saimiris.yml
2323
* Run Saimiris Client (from the root of the repository)
2424

2525
```sh
26-
cargo run -- client --config=testbed/config/saimiris/saimiris.yml wbmwwp9vna 2606:4700:4700::1111/128,icmp,1,32,1
26+
cat testbed/probes.txt | cargo run -- client --config=testbed/config/saimiris/saimiris.yml wbmwwp9vna
2727
```
2828

2929
* Stop the testbed

testbed/probes.txt

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
2606:4700:4700::1111,24000,33434,1,icmp
2+
2606:4700:4700::1111,24000,33434,2,icmp
3+
2606:4700:4700::1111,24000,33434,3,icmp
4+
2606:4700:4700::1111,24000,33434,4,icmp
5+
2606:4700:4700::1111,24000,33434,5,icmp
6+
2606:4700:4700::1111,24000,33434,6,icmp
7+
2606:4700:4700::1111,24000,33434,7,icmp
8+
2606:4700:4700::1111,24000,33434,8,icmp
9+
2606:4700:4700::1111,24000,33434,9,icmp
10+
2606:4700:4700::1111,24000,33434,10,icmp
11+
2606:4700:4700::1111,24000,33434,11,icmp
12+
2606:4700:4700::1111,24000,33434,12,icmp
13+
2606:4700:4700::1111,24000,33434,13,icmp
14+
2606:4700:4700::1111,24000,33434,14,icmp
15+
2606:4700:4700::1111,24000,33434,15,icmp
16+
2606:4700:4700::1111,24000,33434,16,icmp
17+
2606:4700:4700::1111,24000,33434,17,icmp
18+
2606:4700:4700::1111,24000,33434,18,icmp
19+
2606:4700:4700::1111,24000,33434,19,icmp
20+
2606:4700:4700::1111,24000,33434,20,icmp

0 commit comments

Comments
 (0)