Skip to content

Commit ac9094e

Browse files
committed
add prober_id cli arg
1 parent 6dca083 commit ac9094e

File tree

4 files changed

+18
-4
lines changed

4 files changed

+18
-4
lines changed

src/handler.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ fn generate_probes(target: &Target) -> Result<Vec<Probe>> {
138138
}
139139

140140
pub async fn handle(
141+
prober_id: u16,
141142
brokers: &str,
142143
_in_topics: &str,
143144
_in_group_id: &str,
@@ -154,7 +155,7 @@ pub async fn handle(
154155
let (_, _, results) = result?;
155156

156157
// Produce the results to Kafka topic
157-
produce(brokers, out_topic, results).await;
158+
produce(brokers, out_topic, prober_id, results).await;
158159

159160
Ok(())
160161
}

src/main.rs

+5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ use crate::handler::handle;
1616
#[derive(CliParser, Debug, Clone)]
1717
#[command(version, about, long_about = None)]
1818
struct Cli {
19+
/// Prober ID
20+
#[clap(long, default_value = "0")]
21+
prober_id: u16,
22+
1923
/// Kafka brokers
2024
#[clap(long, default_value = "localhost:9092")]
2125
brokers: String,
@@ -62,6 +66,7 @@ async fn main() -> Result<()> {
6266
set_logging(&cli);
6367

6468
match handle(
69+
cli.prober_id,
6570
&cli.brokers,
6671
&cli.in_topics,
6772
&cli.in_group_id,

src/producer.rs

+9-3
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ fn format_mpls_labels(mpls_labels: &Vec<MPLSLabel>) -> String {
2020
+ "]"
2121
}
2222

23-
fn format_reply(reply: &Reply) -> String {
23+
fn format_reply(prober_id: u16, reply: &Reply) -> String {
2424
let mut output = vec![];
2525
output.push(format!("{}", reply.capture_timestamp.as_millis()));
26+
output.push(format!("{}", prober_id));
2627
output.push(format!("{}", reply.reply_src_addr));
2728
output.push(format!("{}", reply.reply_dst_addr));
2829
output.push(format!("{}", reply.reply_id));
@@ -46,7 +47,12 @@ fn format_reply(reply: &Reply) -> String {
4647
output.join(",")
4748
}
4849

49-
pub async fn produce(brokers: &str, topic_name: &str, results: Arc<Mutex<Vec<Reply>>>) {
50+
pub async fn produce(
51+
brokers: &str,
52+
topic_name: &str,
53+
prober_id: u16,
54+
results: Arc<Mutex<Vec<Reply>>>,
55+
) {
5056
let producer: &FutureProducer = &ClientConfig::new()
5157
.set("bootstrap.servers", brokers)
5258
.set("message.timeout.ms", "5000")
@@ -57,7 +63,7 @@ pub async fn produce(brokers: &str, topic_name: &str, results: Arc<Mutex<Vec<Rep
5763
let _ = producer
5864
.send(
5965
FutureRecord::to(topic_name)
60-
.payload(&format!("{}", format_reply(result)))
66+
.payload(&format!("{}", format_reply(prober_id, result)))
6167
.key(&format!("Key"))
6268
.headers(OwnedHeaders::new().insert(Header {
6369
key: "header_key",

testbed/config/clickhouse/docker-entrypoint-initdb.d/init.sql

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ CREATE DATABASE osiris;
22
CREATE TABLE osiris.results_broker
33
(
44
timestamp DateTime64,
5+
prober_id UInt16,
56
reply_src_addr IPv6,
67
reply_dst_addr IPv6,
78
reply_id UInt16,
@@ -32,6 +33,7 @@ SETTINGS
3233
CREATE TABLE osiris.results
3334
(
3435
timestamp DateTime64,
36+
prober_id UInt16,
3537
reply_src_addr IPv6,
3638
reply_dst_addr IPv6,
3739
reply_id UInt16,

0 commit comments

Comments
 (0)