Skip to content

Commit e143e9b

Browse files
committed
implement prober selection
1 parent 5021ad0 commit e143e9b

File tree

13 files changed

+123
-48
lines changed

13 files changed

+123
-48
lines changed

Cargo.lock

+31-21
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ env_logger = "0.11.5"
1515
hyperloglog = "1.0.2"
1616
ipnet = "2.10.1"
1717
log = "0.4.22"
18+
nanoid = "0.4.0"
1819
pcap = "2.2.0"
1920
rand = "0.8.5"
2021
rdkafka = { version = "0.37.0", features = ["sasl", "ssl"] }

README.md

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
<p align="center">
2-
<img src="logo/logo.png" height="256" width="256" alt="saimiris project logo" />
2+
<img src="logo/logo.png" height="256" width="256" alt="Project Logo" />
33
</p>
44

5-
65
# Saimiris
76

87
> [!WARNING]

src/client/handler.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::auth::{KafkaAuth, SaslAuth};
44
use crate::client::producer::produce;
55
use crate::config::AppConfig;
66

7-
pub async fn handle(config: &AppConfig, target: &str) -> Result<()> {
7+
pub async fn handle(config: &AppConfig, probers: &str, target: &str) -> Result<()> {
88
// Configure Kafka authentication
99
let auth = match config.kafka.auth_protocol.as_str() {
1010
"PLAINTEXT" => KafkaAuth::PlainText,
@@ -20,7 +20,9 @@ pub async fn handle(config: &AppConfig, target: &str) -> Result<()> {
2020
}
2121
};
2222

23-
produce(config, auth, target).await;
23+
let probers = probers.split(',').collect::<Vec<&str>>();
24+
25+
produce(config, auth, probers, target).await;
2426

2527
Ok(())
2628
}

src/client/producer.rs

+12-7
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::config::AppConfig;
1111
// - check target format
1212
// - Put as header the IDs of the prober to use
1313

14-
pub async fn produce(config: &AppConfig, auth: KafkaAuth, target: &str) {
14+
pub async fn produce(config: &AppConfig, auth: KafkaAuth, probers: Vec<&str>, target: &str) {
1515
let producer: &FutureProducer = match auth {
1616
KafkaAuth::PlainText => &ClientConfig::new()
1717
.set("bootstrap.servers", config.kafka.brokers.clone())
@@ -32,16 +32,21 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, target: &str) {
3232
let topic = config.kafka.in_topics.split(',').collect::<Vec<&str>>()[0];
3333
info!("Producing to topic: {}", topic);
3434

35+
// Construct headers
36+
let mut headers = OwnedHeaders::new();
37+
for prober in probers {
38+
headers = headers.insert(Header {
39+
key: prober,
40+
value: Some(prober),
41+
});
42+
}
43+
3544
let delivery_status = producer
3645
.send(
3746
FutureRecord::to(topic)
3847
.payload(target)
39-
.key(&format!("Key")) // TODO
40-
.headers(OwnedHeaders::new().insert(Header {
41-
// TODO
42-
key: "header_key",
43-
value: Some("header_value"),
44-
})),
48+
.key(&format!("")) // TODO Client ID
49+
.headers(headers),
4550
Duration::from_secs(0),
4651
)
4752
.await;

src/config.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1+
use caracat::rate_limiter::RateLimitingMethod;
2+
use config::Config;
13
use std::{
24
net::{Ipv4Addr, Ipv6Addr},
35
time::Duration,
46
};
57

6-
use caracat::rate_limiter::RateLimitingMethod;
7-
use config::Config;
8+
use crate::utils::generate_id;
89

910
#[derive(Debug, Clone)]
1011
pub struct CaracatConfig {
@@ -68,7 +69,7 @@ pub struct KafkaConfig {
6869
#[derive(Debug, Clone)]
6970
pub struct ProberConfig {
7071
/// Prober identifier.
71-
pub prober_id: u16,
72+
pub prober_id: String,
7273
}
7374

7475
#[derive(Debug, Clone)]
@@ -146,7 +147,9 @@ pub fn prober_config(config: Config) -> AppConfig {
146147

147148
// Prober configuration
148149
prober: ProberConfig {
149-
prober_id: config.get_int("prober.prober_id").unwrap_or(0) as u16,
150+
prober_id: config
151+
.get_string("prober.id")
152+
.unwrap_or(generate_id(None, None)),
150153
},
151154
}
152155
}

src/main.rs

+15-5
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ mod auth;
22
mod client;
33
mod config;
44
mod prober;
5+
mod utils;
56

67
use anyhow::Result;
78
use chrono::Local;
89
use clap::{Args, Parser, Subcommand};
910
use clap_verbosity_flag::{InfoLevel, Verbosity};
1011
use env_logger::Builder;
12+
use log::error;
1113
use std::io::Write;
1214

1315
use crate::config::{load_config, prober_config};
@@ -35,8 +37,12 @@ enum Command {
3537
#[arg(short, long)]
3638
config: String,
3739

38-
/// Target (eg., 2606:4700:4700::1111/128,1,32,1)
40+
/// Probers IDs (comma separated)
3941
#[arg(index = 1)]
42+
probers: String,
43+
44+
/// Target (eg., 2606:4700:4700::1111/128,1,32,1)
45+
#[arg(index = 2)]
4046
target: String,
4147
},
4248
}
@@ -75,16 +81,20 @@ async fn main() -> Result<()> {
7581

7682
match prober::handle(&prober_config).await {
7783
Ok(_) => (),
78-
Err(e) => log::error!("Error: {}", e),
84+
Err(e) => error!("Error: {}", e),
7985
}
8086
}
81-
Command::Client { config, target } => {
87+
Command::Client {
88+
config,
89+
probers,
90+
target,
91+
} => {
8292
let app_config = load_config(&config);
8393
let prober_config = prober_config(app_config);
8494

85-
match client::handle(&prober_config, &target).await {
95+
match client::handle(&prober_config, &probers, &target).await {
8696
Ok(_) => (),
87-
Err(e) => log::error!("Error: {}", e),
97+
Err(e) => error!("Error: {}", e),
8898
}
8999
}
90100
}

src/prober/handler.rs

+24
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::config::AppConfig;
1515
use crate::prober::consumer::init_consumer;
1616
use crate::prober::prober::probe;
1717
use crate::prober::producer::produce;
18+
use crate::utils::test_id;
1819

1920
struct Target {
2021
prefix: IpNet,
@@ -95,6 +96,13 @@ fn generate_probes(target: &Target) -> Result<Vec<Probe>> {
9596
}
9697

9798
pub async fn handle(config: &AppConfig) -> Result<()> {
99+
// Test input ID
100+
if !test_id(Some(config.prober.prober_id.clone()), None, None) {
101+
return Err(anyhow::anyhow!("Invalid prober ID"));
102+
}
103+
104+
info!("Prober ID: {}", config.prober.prober_id);
105+
98106
// Configure Kafka authentication
99107
let out_auth = match config.kafka.auth_protocol.as_str() {
100108
"PLAINTEXT" => KafkaAuth::PlainText,
@@ -144,6 +152,22 @@ pub async fn handle(config: &AppConfig) -> Result<()> {
144152
}
145153
}
146154

155+
// Filter out the messages that are not intended for the prober
156+
// by looking at the prober ID in the headers
157+
let mut is_intended = false;
158+
if let Some(headers) = m.headers() {
159+
for header in headers.iter() {
160+
if header.key == &config.prober.prober_id {
161+
is_intended = true;
162+
break;
163+
}
164+
}
165+
}
166+
if !is_intended {
167+
info!("Target not intended for this prober");
168+
continue;
169+
}
170+
147171
// Probe Generation
148172
let target = decode_payload(target)?;
149173
let probes_to_send = generate_probes(&target)?;

src/prober/producer.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ fn format_mpls_labels(mpls_labels: &Vec<MPLSLabel>) -> String {
2424
+ "]"
2525
}
2626

27-
fn format_reply(prober_id: u16, reply: &Reply) -> String {
27+
fn format_reply(prober_id: String, reply: &Reply) -> String {
2828
let mut output = vec![];
2929
output.push(format!("{}", reply.capture_timestamp.as_millis()));
3030
output.push(format!("{}", prober_id));
@@ -75,7 +75,7 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, results: Arc<Mutex<Vec
7575
FutureRecord::to(config.kafka.out_topic.as_str())
7676
.payload(&format!(
7777
"{}",
78-
format_reply(config.prober.prober_id, result)
78+
format_reply(config.prober.prober_id.clone(), result)
7979
))
8080
.key(&format!("Key")) // TODO
8181
.headers(OwnedHeaders::new().insert(Header {

src/utils.rs

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use nanoid::nanoid;
2+
3+
const DEFAULT_ID_ALPHABET: &str = "abcdefghijklmnopqrstuvwxyz0123456789";
4+
const DEFAULT_ID_SIZE: usize = 10;
5+
6+
pub fn generate_id(alphabet: Option<&str>, size: Option<usize>) -> String {
7+
let alphabet = alphabet.unwrap_or(DEFAULT_ID_ALPHABET);
8+
let size = size.unwrap_or(DEFAULT_ID_SIZE);
9+
10+
nanoid!(size, &alphabet.chars().collect::<Vec<char>>())
11+
}
12+
13+
pub fn test_id(id: Option<String>, alphabet: Option<&str>, size: Option<usize>) -> bool {
14+
let alphabet = alphabet.unwrap_or(DEFAULT_ID_ALPHABET);
15+
let size = size.unwrap_or(DEFAULT_ID_SIZE);
16+
17+
match id {
18+
Some(id) => id.len() == size && id.chars().all(|c| alphabet.contains(c)),
19+
None => false,
20+
}
21+
}

testbed/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
Docker Compose setup to facilitate the tests of Saimiris.
44

5-
The testbed consists in a Redpanda and ClickHouse instance. Required ClickHouse [tables](config/clickhouse/docker-entrypoint-initdb.d/init.sql) are created on startup. The `saimiris.from_kafka` is using the ClickHouse [Kafka engine](https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka) to fetch the results from Redpanda. The `saimiris.results` table is used to store the results.
5+
The testbed consists in a Redpanda and ClickHouse instance. Required ClickHouse [tables](config/clickhouse/docker-entrypoint-initdb.d/init.sql) are created on startup. The `saimiris.from_kafka` table is using the ClickHouse [Kafka engine](https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka) to fetch the results from Redpanda. The `saimiris.results` table is used to store the results.
66

77
As an example, Redpanda is configured with SASL authentication, and uses the default Saimiris SASL credentials.
88

@@ -23,7 +23,7 @@ cargo run -- prober --config=testbed/config/saimiris/saimiris.yml
2323
* Run Saimiris Client (from the root of the repository)
2424

2525
```sh
26-
cargo run -- prober --config=testbed/config/saimiris/saimiris.yml 2606:4700:4700::1111/128,1,32,1
26+
cargo run -- client --config=testbed/config/saimiris/saimiris.yml wbmwwp9vna 2606:4700:4700::1111/128,1,32,1
2727
```
2828

2929
* Stop the testbed

testbed/config/clickhouse/docker-entrypoint-initdb.d/init.sql

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ CREATE DATABASE IF NOT EXISTS saimiris;
22
CREATE TABLE saimiris.from_kafka
33
(
44
timestamp DateTime64,
5-
prober_id UInt16,
5+
prober_id String,
66
reply_src_addr IPv6,
77
reply_dst_addr IPv6,
88
reply_id UInt16,
@@ -33,7 +33,7 @@ SETTINGS
3333
CREATE TABLE saimiris.results
3434
(
3535
timestamp DateTime64,
36-
prober_id UInt16,
36+
prober_id String,
3737
reply_src_addr IPv6,
3838
reply_dst_addr IPv6,
3939
reply_id UInt16,

testbed/config/saimiris/osiris.yml testbed/config/saimiris/saimiris.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ kafka:
1212
out_topic: saimiris-results
1313

1414
prober:
15-
id: 42
15+
id: wbmwwp9vna

0 commit comments

Comments
 (0)