Skip to content

Commit 876a396

Browse files
committed
implement basic kafka producer
1 parent 2829d94 commit 876a396

File tree

8 files changed

+259
-113
lines changed

8 files changed

+259
-113
lines changed

src/consumer.rs

Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,19 @@
1-
use log::{info, warn};
1+
use log::warn;
22

3-
use rdkafka::client::ClientContext;
43
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
54
use rdkafka::consumer::stream_consumer::StreamConsumer;
6-
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, ConsumerContext, Rebalance};
7-
use rdkafka::error::KafkaResult;
8-
use rdkafka::topic_partition_list::TopicPartitionList;
9-
10-
use crate::handler;
11-
12-
// A context can be used to change the behavior of producers and consumers by adding callbacks
13-
// that will be executed by librdkafka.
14-
// This particular context sets up custom callbacks to log rebalancing events.
15-
struct CustomContext;
16-
17-
impl ClientContext for CustomContext {}
18-
19-
impl ConsumerContext for CustomContext {
20-
fn pre_rebalance(&self, _: &BaseConsumer<Self>, rebalance: &Rebalance) {
21-
info!("Pre rebalance {:?}", rebalance);
22-
}
23-
24-
fn post_rebalance(&self, _: &BaseConsumer<Self>, rebalance: &Rebalance) {
25-
info!("Post rebalance {:?}", rebalance);
26-
}
27-
28-
fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
29-
info!("Committing offsets: {:?}", result);
30-
}
31-
}
32-
33-
// A type alias with your custom consumer can be created for convenience.
34-
type LoggingConsumer = StreamConsumer<CustomContext>;
5+
use rdkafka::consumer::{CommitMode, Consumer, DefaultConsumerContext};
356

7+
#[allow(dead_code)]
368
pub async fn consume(brokers: &str, group_id: &str, topics: &[&str]) {
37-
let context = CustomContext;
9+
let context = DefaultConsumerContext;
3810

39-
let consumer: LoggingConsumer = ClientConfig::new()
11+
let consumer: StreamConsumer<DefaultConsumerContext> = ClientConfig::new()
4012
.set("group.id", group_id)
4113
.set("bootstrap.servers", brokers)
4214
.set("enable.partition.eof", "false")
4315
.set("session.timeout.ms", "6000")
4416
.set("enable.auto.commit", "true")
45-
//.set("statistics.interval.ms", "30000")
46-
//.set("auto.offset.reset", "smallest")
4717
.set_log_level(RDKafkaLogLevel::Debug)
4818
.create_with_context(context)
4919
.expect("Consumer creation failed");
@@ -56,8 +26,8 @@ pub async fn consume(brokers: &str, group_id: &str, topics: &[&str]) {
5626
match consumer.recv().await {
5727
Err(e) => warn!("Kafka error: {}", e),
5828
Ok(m) => {
59-
let _ = handler::handle(&m).await;
60-
consumer.commit_message(&m, CommitMode::Async).unwrap();
29+
// Do something with the message
30+
let _ = consumer.commit_message(&m, CommitMode::Async).unwrap();
6131
}
6232
};
6333
}

src/handler.rs

Lines changed: 38 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
use anyhow::Result;
22
use caracat::models::Probe;
33
use caracat::rate_limiter::RateLimitingMethod;
4-
use log::{info, warn};
5-
use rdkafka::message::BorrowedMessage;
6-
use rdkafka::message::{Headers, Message};
4+
// use log::{info, warn};
5+
// use rdkafka::message::BorrowedMessage;
6+
// use rdkafka::message::{Headers, Message};
77
use std::net::IpAddr;
88
use std::net::Ipv4Addr;
99
use std::net::Ipv6Addr;
1010
use std::time::Duration;
1111
use tokio::task;
1212

1313
use crate::prober::probe;
14+
use crate::producer::produce;
1415

1516
/// Probing configuration.
1617
#[derive(Debug)]
@@ -79,31 +80,39 @@ fn decode_payload(payload: &str) -> Result<Payload> {
7980
})
8081
}
8182

82-
pub async fn handle(m: &BorrowedMessage<'_>) -> Result<()> {
83-
let payload = match m.payload_view::<str>() {
84-
None => "",
85-
Some(Ok(s)) => s,
86-
Some(Err(e)) => {
87-
warn!("Error while deserializing message payload: {:?}", e);
88-
""
89-
}
90-
};
83+
// pub async fn handle(m: &BorrowedMessage<'_>) -> Result<()> {
84+
pub async fn handle(
85+
brokers: &str,
86+
_in_group_id: &str,
87+
_in_topics: &[&str],
88+
out_topic: &str,
89+
) -> Result<()> {
90+
// let payload = match m.payload_view::<str>() {
91+
// None => "",
92+
// Some(Ok(s)) => s,
93+
// Some(Err(e)) => {
94+
// warn!("Error while deserializing message payload: {:?}", e);
95+
// ""
96+
// }
97+
// };
9198

92-
info!(
93-
"key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
94-
m.key(),
95-
payload,
96-
m.topic(),
97-
m.partition(),
98-
m.offset(),
99-
m.timestamp()
100-
);
99+
// info!(
100+
// "key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
101+
// m.key(),
102+
// payload,
103+
// m.topic(),
104+
// m.partition(),
105+
// m.offset(),
106+
// m.timestamp()
107+
// );
101108

102-
if let Some(headers) = m.headers() {
103-
for header in headers.iter() {
104-
info!(" Header {:#?}: {:?}", header.key, header.value);
105-
}
106-
}
109+
// if let Some(headers) = m.headers() {
110+
// for header in headers.iter() {
111+
// info!(" Header {:#?}: {:?}", header.key, header.value);
112+
// }
113+
// }
114+
115+
let payload = "2606:4700:4700::1111,1,32";
107116

108117
// Probing
109118
let config = create_config();
@@ -119,18 +128,10 @@ pub async fn handle(m: &BorrowedMessage<'_>) -> Result<()> {
119128
protocol: caracat::models::L4::ICMPv6,
120129
});
121130
}
122-
let result = task::spawn_blocking(move || {
123-
probe(
124-
config,
125-
probes_to_send.into_iter(),
126-
Some(String::from("./test.csv")),
127-
)
128-
})
129-
.await?;
131+
let result = task::spawn_blocking(move || probe(config, probes_to_send.into_iter())).await?;
130132

131-
if let Err(e) = result {
132-
warn!("Error while probing: {:?}", e);
133-
}
133+
let (_, _, results) = result?;
134+
produce(brokers, out_topic, results).await;
134135

135136
Ok(())
136137
}

src/main.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod consumer;
22
mod handler;
33
mod prober;
4+
mod producer;
45

56
use anyhow::Result;
67
use chrono::Local;
@@ -9,7 +10,8 @@ use clap_verbosity_flag::{InfoLevel, Verbosity};
910
use env_logger::Builder;
1011
use std::io::Write;
1112

12-
use crate::consumer::consume;
13+
// use crate::consumer::consume;
14+
use crate::handler::handle;
1315

1416
#[derive(CliParser, Debug, Clone)]
1517
#[command(version, about, long_about = None)]
@@ -40,10 +42,12 @@ async fn main() -> Result<()> {
4042
set_logging(&cli);
4143

4244
let brokers = "localhost:9092";
43-
let group_id = "osiris";
44-
let topics = vec!["osiris"];
45+
let in_group_id = "osiris";
46+
let in_topics = vec!["osiris"];
47+
let out_topic = "results";
4548

46-
consume(brokers, group_id, &topics).await;
49+
// consume(brokers, group_id, &topics).await;
50+
let _ = handle(&brokers, &in_group_id, &in_topics, &out_topic).await;
4751

4852
Ok(())
4953
}

src/prober.rs

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use hyperloglog::HyperLogLog;
44

55
use std::fmt::Display;
66
use std::fmt::Formatter;
7-
use std::io::{stdout, Write};
87
use std::sync::{Arc, Mutex};
98
use std::thread;
109
use std::thread::sleep;
@@ -13,7 +12,7 @@ use std::thread::JoinHandle;
1312
use log::{error, info, trace};
1413

1514
use crate::handler::Config;
16-
use caracat::models::Probe;
15+
use caracat::models::{Probe, Reply};
1716
use caracat::rate_limiter::RateLimiter;
1817
use caracat::receiver::Receiver;
1918
use caracat::sender::Sender;
@@ -22,8 +21,11 @@ use caracat::sender::Sender;
2221
pub fn probe<T: Iterator<Item = Probe>>(
2322
config: Config,
2423
probes: T,
25-
csv_output: Option<String>,
26-
) -> Result<(SendStatistics, ReceiveStatistics)> {
24+
) -> Result<(
25+
SendStatistics,
26+
ReceiveStatistics,
27+
Arc<Mutex<Vec<caracat::models::Reply>>>,
28+
)> {
2729
info!("{:?}", config);
2830

2931
let rate_limiter = RateLimiter::new(
@@ -36,7 +38,6 @@ pub fn probe<T: Iterator<Item = Probe>>(
3638
config.interface.clone(),
3739
config.instance_id,
3840
config.integrity_check,
39-
csv_output,
4041
);
4142

4243
let mut prober = SendLoop::new(
@@ -49,8 +50,8 @@ pub fn probe<T: Iterator<Item = Probe>>(
4950
rate_limiter,
5051
Sender::new(
5152
&config.interface,
52-
None,
53-
None,
53+
config.src_ipv4_addr,
54+
config.src_ipv6_addr,
5455
config.instance_id,
5556
config.dry_run,
5657
)?,
@@ -63,27 +64,26 @@ pub fn probe<T: Iterator<Item = Probe>>(
6364
// TODO: Cleaner way?
6465
let final_prober_statistics = *prober.statistics().lock().unwrap();
6566
let final_receiver_statistics = receiver.statistics().lock().unwrap().clone();
67+
let results = Arc::clone(&receiver.results);
6668

6769
receiver.stop();
6870

69-
Ok((final_prober_statistics, final_receiver_statistics))
71+
Ok((
72+
final_prober_statistics,
73+
final_receiver_statistics,
74+
Arc::clone(&results),
75+
))
7076
}
7177

72-
// The pcap crate doesn't support `pcap_loop` and `pcap_breakloop`,
73-
// so we implement our own looping mechanism.
7478
pub struct ReceiveLoop {
7579
handle: JoinHandle<()>,
7680
stopped: Arc<Mutex<bool>>,
7781
statistics: Arc<Mutex<ReceiveStatistics>>,
82+
results: Arc<Mutex<Vec<Reply>>>,
7883
}
7984

8085
impl ReceiveLoop {
81-
pub fn new(
82-
interface: String,
83-
instance_id: u16,
84-
integrity_check: bool,
85-
output_csv: Option<String>,
86-
) -> Self {
86+
pub fn new(interface: String, instance_id: u16, integrity_check: bool) -> Self {
8787
// By default if a thread panic, the other threads are not affected and the error
8888
// is only surfaced when joining the thread. However since this is a long-lived thread,
8989
// we're not calling join until the end of the process. Since this loop is critical to
@@ -94,20 +94,23 @@ impl ReceiveLoop {
9494
let statistics = Arc::new(Mutex::new(ReceiveStatistics::default()));
9595
let statistics_thr = statistics.clone();
9696

97+
let results = Arc::new(Mutex::new(vec![]));
98+
let results_thr = results.clone();
99+
97100
let handle = thread::spawn(move || {
98101
let mut receiver = Receiver::new_batch(&interface).unwrap();
99102

100-
let wtr: Box<dyn Write> = match output_csv {
101-
Some(output_csv) => {
102-
let file = std::fs::File::create(output_csv).unwrap();
103-
Box::new(std::io::BufWriter::new(file))
104-
}
105-
None => Box::new(stdout().lock()),
106-
};
103+
// let wtr: Box<dyn Write> = match output_csv {
104+
// Some(output_csv) => {
105+
// let file = std::fs::File::create(output_csv).unwrap();
106+
// Box::new(std::io::BufWriter::new(file))
107+
// }
108+
// None => Box::new(stdout().lock()),
109+
// };
107110

108-
let mut csv_writer = csv::WriterBuilder::new()
109-
.has_headers(false) // TODO: Set to true, but how to serialize MPLS labels?
110-
.from_writer(wtr);
111+
// let mut csv_writer = csv::WriterBuilder::new()
112+
// .has_headers(false) // TODO: Set to true, but how to serialize MPLS labels?
113+
// .from_writer(wtr);
111114

112115
loop {
113116
// TODO: Cleanup this loop & statistics handling
@@ -117,6 +120,9 @@ impl ReceiveLoop {
117120
statistics.pcap_received = pcap_statistics.received;
118121
statistics.pcap_dropped = pcap_statistics.dropped;
119122
statistics.pcap_if_dropped = pcap_statistics.if_dropped;
123+
124+
let mut results = results_thr.lock().unwrap();
125+
120126
match result {
121127
Ok(reply) => {
122128
trace!("{:?}", reply);
@@ -130,7 +136,8 @@ impl ReceiveLoop {
130136
.icmp_messages_excl_dest
131137
.insert(&reply.reply_src_addr);
132138
}
133-
csv_writer.serialize(reply).unwrap();
139+
140+
results.push(reply);
134141
// TODO: Write round column.
135142
// TODO: Compare output with caracal (capture timestamp resolution?)
136143
} else {
@@ -158,12 +165,13 @@ impl ReceiveLoop {
158165
break;
159166
}
160167
}
161-
csv_writer.flush().unwrap();
162168
});
169+
163170
ReceiveLoop {
164171
handle,
165172
stopped,
166173
statistics,
174+
results,
167175
}
168176
}
169177

@@ -271,7 +279,6 @@ impl SendLoop {
271279
}
272280
}
273281

274-
275282
#[derive(Clone, Debug)]
276283
pub struct ReceiveStatistics {
277284
/// Number of packets received.

0 commit comments

Comments
 (0)