Skip to content

Commit 79717a5

Browse files
committed
implement SASL authentication
1 parent ac9094e commit 79717a5

File tree

10 files changed

+165
-26
lines changed

10 files changed

+165
-26
lines changed

Cargo.lock

+58
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ipnet = "2.10.1"
1616
log = "0.4.22"
1717
pcap = "2.2.0"
1818
rand = "0.8.5"
19-
rdkafka = { version = "0.37.0" }
19+
rdkafka = { version = "0.37.0", features = ["sasl", "ssl"] }
2020
tokio = { version = "1.42.0", features = ["full"] }

src/handler.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::time::Duration;
1010
use tokio::task;
1111

1212
use crate::prober::probe;
13-
use crate::producer::produce;
13+
use crate::producer::{produce, KafkaAuth};
1414

1515
/// Probing configuration.
1616
#[derive(Debug)]
@@ -143,6 +143,7 @@ pub async fn handle(
143143
_in_topics: &str,
144144
_in_group_id: &str,
145145
out_topic: &str,
146+
out_auth: KafkaAuth,
146147
target: &str,
147148
) -> Result<()> {
148149
// Probe Generation
@@ -155,7 +156,7 @@ pub async fn handle(
155156
let (_, _, results) = result?;
156157

157158
// Produce the results to Kafka topic
158-
produce(brokers, out_topic, prober_id, results).await;
159+
produce(brokers, out_topic, prober_id, out_auth, results).await;
159160

160161
Ok(())
161162
}

src/main.rs

+33-1
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,25 @@ struct Cli {
3333
in_group_id: String,
3434

3535
/// Kafka producer topic
36-
#[clap(long, default_value = "results")]
36+
#[clap(long, default_value = "osiris-results")]
3737
out_topic: String,
3838

39+
/// Kafka producer Authentication Protocol
40+
#[clap(long, default_value = "PLAINTEXT")]
41+
out_auth_protocol: String,
42+
43+
/// Kafka producer Authentication SASL Username
44+
#[clap(long, default_value = "osiris")]
45+
out_auth_sasl_username: String,
46+
47+
/// Kafka producer Authentication SASL Password
48+
#[clap(long, default_value = "osiris")]
49+
out_auth_sasl_password: String,
50+
51+
/// Kafka producer Authentication SASL Mechanism
52+
#[clap(long, default_value = "SCRAM-SHA-512")]
53+
out_auth_sasl_mechanism: String,
54+
3955
/// Target (eg., 2606:4700:4700::1111/128,1,32,1)
4056
#[arg(index = 1)]
4157
target: String,
@@ -65,12 +81,28 @@ async fn main() -> Result<()> {
6581
let cli = Cli::parse();
6682
set_logging(&cli);
6783

84+
// Configure Kafka producer authentication
85+
let out_auth = match cli.out_auth_protocol.as_str() {
86+
"PLAINTEXT" => producer::KafkaAuth::PLAINTEXT,
87+
"SASL" => producer::KafkaAuth::SASL(producer::SaslAuth {
88+
username: cli.out_auth_sasl_username.clone(),
89+
password: cli.out_auth_sasl_password.clone(),
90+
mechanism: cli.out_auth_sasl_mechanism.clone(),
91+
}),
92+
_ => {
93+
return Err(anyhow::anyhow!(
94+
"Invalid Kafka producer authentication protocol"
95+
))
96+
}
97+
};
98+
6899
match handle(
69100
cli.prober_id,
70101
&cli.brokers,
71102
&cli.in_topics,
72103
&cli.in_group_id,
73104
&cli.out_topic,
105+
out_auth,
74106
&cli.target,
75107
)
76108
.await

src/producer.rs

+31-7
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,22 @@
11
use caracat::models::{MPLSLabel, Reply};
2+
use log::info;
23
use rdkafka::config::ClientConfig;
34
use rdkafka::message::{Header, OwnedHeaders};
45
use rdkafka::producer::{FutureProducer, FutureRecord};
56
use std::sync::{Arc, Mutex};
67
use std::time::Duration;
78

9+
/// Credentials and mechanism for SASL authentication against Kafka.
pub struct SaslAuth {
    /// SASL username (fed to rdkafka's `sasl.username`).
    pub username: String,
    /// SASL password (fed to rdkafka's `sasl.password`).
    pub password: String,
    /// SASL mechanism name, e.g. "SCRAM-SHA-512" (fed to rdkafka's `sasl.mechanisms`).
    pub mechanism: String,
}
14+
15+
/// Kafka producer authentication configuration.
///
/// NOTE(review): variant names are SCREAMING_CASE rather than idiomatic
/// UpperCamelCase (`Sasl`, `Plaintext`); renaming would break callers
/// such as `main.rs`, so they are kept as-is.
pub enum KafkaAuth {
    /// SASL authentication; the producer is configured with the
    /// `SASL_PLAINTEXT` security protocol.
    SASL(SaslAuth),
    /// No authentication (plaintext connection).
    PLAINTEXT,
}
19+
820
fn format_mpls_labels(mpls_labels: &Vec<MPLSLabel>) -> String {
921
String::from("[")
1022
+ &mpls_labels
@@ -51,16 +63,28 @@ pub async fn produce(
5163
brokers: &str,
5264
topic_name: &str,
5365
prober_id: u16,
66+
auth: KafkaAuth,
5467
results: Arc<Mutex<Vec<Reply>>>,
5568
) {
56-
let producer: &FutureProducer = &ClientConfig::new()
57-
.set("bootstrap.servers", brokers)
58-
.set("message.timeout.ms", "5000")
59-
.create()
60-
.expect("Producer creation error");
69+
let producer: &FutureProducer = match auth {
70+
KafkaAuth::PLAINTEXT => &ClientConfig::new()
71+
.set("bootstrap.servers", brokers)
72+
.set("message.timeout.ms", "5000")
73+
.create()
74+
.expect("Producer creation error"),
75+
KafkaAuth::SASL(scram_auth) => &ClientConfig::new()
76+
.set("bootstrap.servers", brokers)
77+
.set("message.timeout.ms", "5000")
78+
.set("sasl.username", scram_auth.username)
79+
.set("sasl.password", scram_auth.password)
80+
.set("sasl.mechanisms", scram_auth.mechanism)
81+
.set("security.protocol", "SASL_PLAINTEXT")
82+
.create()
83+
.expect("Producer creation error"),
84+
};
6185

6286
for result in results.lock().unwrap().iter() {
63-
let _ = producer
87+
let delivery_status = producer
6488
.send(
6589
FutureRecord::to(topic_name)
6690
.payload(&format!("{}", format_reply(prober_id, result)))
@@ -72,6 +96,6 @@ pub async fn produce(
7296
Duration::from_secs(0),
7397
)
7498
.await;
99+
info!("{:?}", delivery_status);
75100
}
76-
// let delivery_status = info!("Delivery status: {:?}", delivery_status);
77101
}

testbed/README.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22

33
Docker Compose setup to facilitate the tests of Osiris.
44

5-
The testbed consists in a Redpanda and ClickHouse instance. Required ClickHouse [tables](config/clickhouse/docker-entrypoint-initdb.d/init.sql) are created on startup. The `osiris.results_broker` is using the ClickHouse [Kafka engine](https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka) to fetch the results from Redpanda. The `osiris.results` table is used to store the results.
5+
The testbed consists of a Redpanda instance and a ClickHouse instance. Required ClickHouse [tables](config/clickhouse/docker-entrypoint-initdb.d/init.sql) are created on startup. The `osiris.from_kafka` table uses the ClickHouse [Kafka engine](https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka) to fetch the results from Redpanda. The `osiris.results` table is used to store the results.
66

7+
As an example, Redpanda is configured with SASL authentication, and uses the default Osiris SASL credentials.
78

89
## Usage
910

@@ -13,10 +14,10 @@ The testbed consists in a Redpanda and ClickHouse instance. Required ClickHouse
1314
docker compose up -d --force-recreate --renew-anon-volumes
1415
```
1516

16-
* Run Osiris (from the root of the repository)
17+
* Run Osiris (from the root of the repository) with the SASL authentication protocol
1718

1819
```sh
19-
cargo run
20+
cargo run -- --out-auth-protocol=SASL 2606:4700:4700::1111/128,1,32,1
2021
```
2122

2223
* Stop the testbed

testbed/compose.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
services:
22
redpanda:
3-
image: vectorized/redpanda:latest
4-
command: redpanda start --check=false --overprovisioned --smp 1 --memory 500M
3+
image: redpandadata/redpanda:latest
54
volumes:
5+
- ./config/redpanda/entrypoint.sh:/entrypoint.sh:ro
66
- ./config/redpanda/redpanda.yml:/etc/redpanda/redpanda.yaml:ro
77
ports:
88
- "9092:9092"

testbed/config/clickhouse/docker-entrypoint-initdb.d/init.sql

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
CREATE DATABASE osiris;
2-
CREATE TABLE osiris.results_broker
1+
CREATE DATABASE IF NOT EXISTS osiris;
2+
CREATE TABLE osiris.from_kafka
33
(
44
timestamp DateTime64,
55
prober_id UInt16,
@@ -26,8 +26,8 @@ CREATE TABLE osiris.results_broker
2626
ENGINE = Kafka()
2727
SETTINGS
2828
kafka_broker_list = '10.0.0.100:9093',
29-
kafka_topic_list = 'results',
30-
kafka_group_name = 'clickhouse-results-group',
29+
kafka_topic_list = 'osiris-results',
30+
kafka_group_name = 'clickhouse-osiris-group',
3131
kafka_format = 'CSV';
3232

3333
CREATE TABLE osiris.results
@@ -64,5 +64,5 @@ ORDER BY (
6464
probe_ttl
6565
);
6666

67-
CREATE MATERIALIZED VIEW osiris.results_broker_mv TO osiris.results
68-
AS SELECT * FROM osiris.results_broker;
67+
-- Continuously copy rows arriving on the Kafka-engine table into the
-- persistent results table.
CREATE MATERIALIZED VIEW osiris.from_kafka_mv TO osiris.results
AS SELECT * FROM osiris.from_kafka;

testbed/config/redpanda/entrypoint.sh

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#!/bin/bash
# Redpanda testbed entrypoint: start the broker in the background,
# bootstrap SASL (an `admin` superuser plus an `osiris` user with ACLs),
# then bring Redpanda back to the foreground so the container tracks it.

# Enable job control so the backgrounded broker can be foregrounded with `fg`.
set -m
/usr/bin/rpk redpanda start --check=false --overprovisioned --smp 1 --memory 500M &

# Wait for Redpanda to be ready.
# NOTE(review): a fixed sleep is a race on slow hosts — consider polling
# the admin API instead; confirm before hardening.
sleep 3

# Create the `admin` superuser.
# The superusers value is quoted: an unquoted ['admin'] is a bash glob
# pattern and could expand against files in the working directory.
/usr/bin/rpk cluster config set superusers "['admin']"
/usr/bin/rpk security user create admin -p 'admin' --mechanism=SCRAM-SHA-512

# Enable SASL authentication on the Kafka API.
/usr/bin/rpk cluster config set enable_sasl true

# Create the `osiris` user and grant it access to the cluster and the
# `osiris-results` topic, authenticating as `admin` for the ACL calls.
/usr/bin/rpk security user create osiris -p 'osiris' --mechanism SCRAM-SHA-512
/usr/bin/rpk security acl create --allow-principal User:osiris --operation all --cluster -X user=admin -X pass='admin' -X sasl.mechanism=SCRAM-SHA-512
/usr/bin/rpk security acl create --allow-principal User:osiris --operation all --topic osiris-results -X user=admin -X pass='admin' -X sasl.mechanism=SCRAM-SHA-512

# Return the broker to the foreground; the container lives as long as it does.
fg %1

testbed/config/redpanda/redpanda.yml

+6-4
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ redpanda:
77
port: 33145
88
kafka_api:
99
- address: 0.0.0.0
10-
name: internal
10+
name: external
1111
port: 9092
12+
authentication_method: sasl
1213
- address: 0.0.0.0
13-
name: external
14+
name: internal
1415
port: 9093
16+
authentication_method: none
1517
admin:
1618
address: 0.0.0.0
1719
port: 9644
@@ -20,10 +22,10 @@ redpanda:
2022
port: 33145
2123
advertised_kafka_api:
2224
- address: localhost
23-
name: internal
25+
name: external
2426
port: 9092
2527
- address: 10.0.0.100
26-
name: external
28+
name: internal
2729
port: 9093
2830
developer_mode: true
2931
auto_create_topics_enabled: true

0 commit comments

Comments
 (0)