Skip to content

Commit 07f0209

Browse files
committed
allow to disable Kafka producer
1 parent d9047b3 commit 07f0209

File tree

4 files changed

+14
-3
lines changed

4 files changed

+14
-3
lines changed

src/agent/handler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pub async fn handle(config: &AppConfig) -> Result<()> {
108108
info!("Starting receiver");
109109
ReceiveLoop::new(tx, config.caracat.clone());
110110

111-
// Start the Kafka producer task
111+
// Start the Kafka producer task if enabled
112112
let config_task = config.clone();
113113
let kafka_auth_task = kafka_auth.clone();
114114
task::spawn(async move {

src/agent/producer.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use caracat::models::{MPLSLabel, Reply};
2-
use log::info;
2+
use log::{info, warn};
33
use rdkafka::config::ClientConfig;
44
use rdkafka::message::{Header, OwnedHeaders};
55
use rdkafka::producer::{FutureProducer, FutureRecord};
@@ -52,6 +52,13 @@ fn format_reply(agent_id: String, reply: &Reply) -> String {
5252
}
5353

5454
pub async fn produce(config: &AppConfig, auth: KafkaAuth, rx: Receiver<Reply>) {
55+
if config.kafka.out_enable == false {
56+
warn!("Kafka producer is disabled");
57+
loop {
58+
rx.recv().unwrap();
59+
}
60+
}
61+
5562
let producer: &FutureProducer = match auth {
5663
KafkaAuth::PlainText => &ClientConfig::new()
5764
.set("bootstrap.servers", config.kafka.brokers.clone())
@@ -69,7 +76,6 @@ pub async fn produce(config: &AppConfig, auth: KafkaAuth, rx: Receiver<Reply>) {
6976
.expect("Producer creation error"),
7077
};
7178

72-
// TODO: send batch of replies
7379
loop {
7480
let result = rx.recv().unwrap();
7581
let delivery_status = producer

src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ pub struct KafkaConfig {
6565

6666
/// Kafka producer topic
6767
pub out_topic: String,
68+
69+
/// Enable Kafka producer
70+
pub out_enable: bool,
6871
}
6972

7073
#[derive(Debug, Clone)]
@@ -136,6 +139,7 @@ pub fn app_config(config_path: &str) -> AppConfig {
136139
out_topic: config
137140
.get_string("kafka.out_topic")
138141
.unwrap_or("saimiris-results".to_string()),
142+
out_enable: config.get_bool("kafka.out_enable").unwrap_or(true),
139143
},
140144

141145
// Agent configuration

testbed/config/saimiris/saimiris.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ kafka:
55
in_topics: saimiris-targets
66
in_group_id: saimiris-agent-wbmwwp9vna
77

8+
out_enable: true
89
out_topic: saimiris-results
910

1011
agent:

0 commit comments

Comments
 (0)