Skip to content

Commit 6de60c8

Browse files
auth with client cert in kafka plugin (#607)
* auth with client cert in kafka plugin * because you can set content of cert or key * e2e test for kafka auth by client cert
1 parent a810132 commit 6de60c8

File tree

8 files changed

+189
-35
lines changed

8 files changed

+189
-35
lines changed

e2e/kafka_auth/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/certs/

e2e/kafka_auth/docker-compose.yml

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version: "2"
1+
version: "2.1"
22

33
services:
44
zookeeper:
@@ -9,31 +9,51 @@ services:
99
- "zookeeper_data:/bitnami"
1010
environment:
1111
- ALLOW_ANONYMOUS_LOGIN=yes
12+
init-certs:
13+
image: docker.io/bitnami/kafka:3.6
14+
command: /tmp/generate.sh
15+
working_dir: /tmp/
16+
user: 0:0
17+
volumes:
18+
- ./certs/:/tmp/certs/
19+
- "./generate.sh:/tmp/generate.sh"
1220
kafka:
1321
image: docker.io/bitnami/kafka:3.6
22+
container_name: kafka
1423
depends_on:
15-
- zookeeper
24+
zookeeper:
25+
condition: service_started
26+
init-certs:
27+
condition: service_completed_successfully
1628
ports:
17-
- "9093:9092"
29+
- "9093:9093"
1830
volumes:
1931
- "kafka_data:/bitnami"
32+
- ./certs/kafka.truststore.jks:/bitnami/kafka/config/certs/kafka.truststore.jks
33+
- ./certs/kafka.keystore.jks:/bitnami/kafka/config/certs/kafka.keystore.jks
2034
environment:
2135
# Zookeeper
2236
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
2337
# Listeners
24-
- KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,PLAINTEXT://:9094
25-
- KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://localhost:9093,PLAINTEXT://:9094
26-
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
38+
- KAFKA_CFG_LISTENERS=SASL_SSL://:9093,SASL_PLAINTEXT://:9095,PLAINTEXT://:9094
39+
- KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://localhost:9093,SASL_PLAINTEXT://localhost:9095,PLAINTEXT://:9094
40+
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,SSL:SSL
2741
# Inter broker
2842
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
2943
- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
3044
# Security
3145
- KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
3246
- KAFKA_CLIENT_USERS=user,admin
3347
- KAFKA_CLIENT_PASSWORDS=pass,admin
48+
- KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks
49+
- KAFKA_CFG_SSL_KEYSTORE_PASSWORD=supersecret
50+
- KAFKA_CFG_SSL_KEY_PASSWORD=supersecret
51+
- KAFKA_CFG_SSL_TRUSTSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
52+
- KAFKA_CFG_SSL_TRUSTSTORE_PASSWORD=supersecret
53+
- KAFKA_CFG_SSL_CLIENT_AUTH=required
3454

3555
volumes:
3656
zookeeper_data:
3757
driver: local
3858
kafka_data:
39-
driver: local
59+
driver: local

e2e/kafka_auth/generate.sh

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#!/usr/bin/env bash
2+
3+
set -eu
4+
5+
PASSWORD="${PASSWORD:-supersecret}"
6+
7+
rm -rf certs/*
8+
# Root CA
9+
echo "Creating CA certificate and key"
10+
openssl req -new -x509 -keyout certs/ca.key -out certs/ca.crt -days 365 -subj "/CN=Sample CA/OU=US/O=US/ST=US/C=US" -passout pass:$PASSWORD
11+
12+
echo "Creating Truststore"
13+
keytool -keystore certs/kafka.truststore.jks -alias CARoot -import -file certs/ca.crt -storepass $PASSWORD -keypass $PASSWORD -noprompt
14+
15+
# Node certs
16+
17+
echo "Creating node key"
18+
keytool -keystore certs/kafka.keystore.jks -alias kafka -validity 365 -genkey -keyalg RSA -dname "cn=kafka, ou=US, o=US, c=US" -storepass $PASSWORD -keypass $PASSWORD
19+
echo "Creating certificate sign request"
20+
keytool -keystore certs/kafka.keystore.jks -alias kafka -certreq -file certs/tls.srl -storepass $PASSWORD -keypass $PASSWORD
21+
echo "Signing certificate request using self-signed CA"
22+
openssl x509 -req -CA certs/ca.crt -CAkey certs/ca.key \
23+
-in certs/tls.srl -out certs/tls.crt \
24+
-days 365 -CAcreateserial \
25+
-passin pass:$PASSWORD
26+
echo "Adding Ca certificate to the keystore"
27+
keytool -keystore certs/kafka.keystore.jks -alias CARoot -import -file certs/ca.crt -storepass $PASSWORD -keypass $PASSWORD -noprompt
28+
echo "Adding signed certificate"
29+
keytool -keystore certs/kafka.keystore.jks -alias kafka -import -file certs/tls.crt -storepass $PASSWORD -keypass $PASSWORD -noprompt
30+
31+
32+
33+
# Client cert
34+
35+
echo "Creating client key"
36+
keytool -keystore certs/kafka.keystore.jks -alias client -validity 365 -genkey -keyalg RSA -dname "cn=client, ou=US, o=US, c=US" -storepass $PASSWORD -keypass $PASSWORD
37+
38+
echo "Creating certificate sign request"
39+
keytool -keystore certs/kafka.keystore.jks -alias client -certreq -file certs/client.srl -storepass $PASSWORD -keypass $PASSWORD
40+
41+
42+
echo $PASSWORD | keytool -importkeystore \
43+
-srckeystore certs/kafka.keystore.jks \
44+
-destkeystore certs/kafka.keystore.p12 \
45+
-deststoretype PKCS12 \
46+
-srcalias client \
47+
-deststorepass $PASSWORD \
48+
-destkeypass $PASSWORD
49+
50+
openssl pkcs12 -in certs/kafka.keystore.p12 -nokeys -out certs/client_cert.pem -passin pass:$PASSWORD
51+
openssl pkcs12 -in certs/kafka.keystore.p12 -nodes -nocerts -out certs/client_key.pem -passin pass:$PASSWORD
52+
53+
chmod 644 certs/client*

e2e/kafka_auth/kafka_auth.go

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ type Config struct {
2424

2525
func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
2626
type saslData struct {
27-
Enabled bool
28-
Mechanism string
29-
Username string
30-
Password string
27+
Enabled bool
28+
Mechanism string
29+
Username string
30+
Password string
31+
AuthByClientCert bool
3132
}
3233

3334
type tCase struct {
@@ -38,28 +39,31 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
3839
cases := []tCase{
3940
{
4041
sasl: saslData{
41-
Enabled: true,
42-
Mechanism: "PLAIN",
43-
Username: "user",
44-
Password: "pass",
42+
Enabled: true,
43+
Mechanism: "PLAIN",
44+
Username: "user",
45+
Password: "pass",
46+
AuthByClientCert: true,
4547
},
4648
authorized: true,
4749
},
4850
{
4951
sasl: saslData{
50-
Enabled: true,
51-
Mechanism: "SCRAM-SHA-256",
52-
Username: "user",
53-
Password: "pass",
52+
Enabled: true,
53+
Mechanism: "SCRAM-SHA-256",
54+
Username: "user",
55+
Password: "pass",
56+
AuthByClientCert: true,
5457
},
5558
authorized: true,
5659
},
5760
{
5861
sasl: saslData{
59-
Enabled: true,
60-
Mechanism: "SCRAM-SHA-512",
61-
Username: "user",
62-
Password: "pass",
62+
Enabled: true,
63+
Mechanism: "SCRAM-SHA-512",
64+
Username: "user",
65+
Password: "pass",
66+
AuthByClientCert: true,
6367
},
6468
authorized: true,
6569
},
@@ -71,10 +75,11 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
7175
},
7276
{
7377
sasl: saslData{
74-
Enabled: true,
75-
Mechanism: "PLAIN",
76-
Username: "user",
77-
Password: "pass123",
78+
Enabled: true,
79+
Mechanism: "PLAIN",
80+
Username: "user",
81+
Password: "pass123",
82+
AuthByClientCert: false,
7883
},
7984
authorized: false,
8085
},
@@ -91,6 +96,8 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
9196
ClientID: "test-auth-out",
9297
BatchSize_: 10,
9398
MaxMessageBytes_: 1000000,
99+
SslEnabled: true,
100+
SslSkipVerify: true,
94101
}
95102
if tt.sasl.Enabled {
96103
config.SaslEnabled = true
@@ -99,6 +106,11 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
99106
config.SaslPassword = tt.sasl.Password
100107
}
101108

109+
if tt.sasl.AuthByClientCert {
110+
config.ClientKey = "./kafka_auth/certs/client_key.pem"
111+
config.ClientCert = "./kafka_auth/certs/client_cert.pem"
112+
}
113+
102114
kafka_out.NewProducer(config,
103115
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)).Sugar(),
104116
)
@@ -113,6 +125,8 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
113125
Offset_: kafka_in.OffsetTypeNewest,
114126
ConsumerMaxProcessingTime_: 200 * time.Millisecond,
115127
ConsumerMaxWaitTime_: 250 * time.Millisecond,
128+
SslEnabled: true,
129+
SslSkipVerify: true,
116130
}
117131
if tt.sasl.Enabled {
118132
config.SaslEnabled = true
@@ -121,6 +135,11 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
121135
config.SaslPassword = tt.sasl.Password
122136
}
123137

138+
if tt.sasl.AuthByClientCert {
139+
config.ClientKey = "./kafka_auth/certs/client_key.pem"
140+
config.ClientCert = "./kafka_auth/certs/client_cert.pem"
141+
}
142+
124143
kafka_in.NewConsumerGroup(config,
125144
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)).Sugar(),
126145
)

plugin/input/kafka/README.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,19 @@ If set, the plugin will skip SSL/TLS verification.
111111

112112
<br>
113113

114-
**`pem_file`** *`string`* *`default=/file.d/certs`*
114+
**`client_cert`** *`string`*
115+
116+
Path or content of a PEM-encoded client certificate file.
117+
118+
<br>
119+
120+
**`client_key`** *`string`*
121+
122+
> Path or content of a PEM-encoded client key file.
123+
124+
<br>
125+
126+
**`ca_cert`** *`string`*
115127

116128
Path or content of a PEM-encoded CA file.
117129

plugin/input/kafka/kafka.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,20 @@ type Config struct {
142142
// > If set, the plugin will skip SSL/TLS verification.
143143
SslSkipVerify bool `json:"ssl_skip_verify" default:"false"` // *
144144

145+
// > @3@4@5@6
146+
// >
147+
// > Path or content of a PEM-encoded client certificate file.
148+
ClientCert string `json:"client_cert"` // *
149+
150+
// > @3@4@5@6
151+
// >
152+
// > > Path or content of a PEM-encoded client key file.
153+
ClientKey string `json:"client_key"` // *
154+
145155
// > @3@4@5@6
146156
// >
147157
// > Path or content of a PEM-encoded CA file.
148-
SslPem string `json:"pem_file" default:"/file.d/certs"` // *
158+
CACert string `json:"ca_cert"` // *
149159
}
150160

151161
func init() {
@@ -239,11 +249,20 @@ func NewConsumerGroup(c *Config, l *zap.SugaredLogger) sarama.ConsumerGroup {
239249
config.Net.TLS.Enable = true
240250

241251
tlsCfg := xtls.NewConfigBuilder()
242-
if err := tlsCfg.AppendCARoot(c.SslPem); err != nil {
243-
l.Fatalf("can't load cert: %s", err.Error())
252+
253+
if c.CACert != "" {
254+
if err := tlsCfg.AppendCARoot(c.CACert); err != nil {
255+
l.Fatalf("can't load ca cert: %s", err.Error())
256+
}
244257
}
245258
tlsCfg.SetSkipVerify(c.SslSkipVerify)
246259

260+
if c.ClientCert != "" || c.ClientKey != "" {
261+
if err := tlsCfg.AppendX509KeyPair(c.ClientCert, c.ClientKey); err != nil {
262+
l.Fatalf("can't load client certificate and key: %s", err.Error())
263+
}
264+
}
265+
247266
config.Net.TLS.Config = tlsCfg.Build()
248267
}
249268

plugin/output/kafka/README.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,19 @@ If set, the plugin will skip SSL/TLS verification.
126126

127127
<br>
128128

129-
**`pem_file`** *`string`* *`default=/file.d/certs`*
129+
**`client_cert`** *`string`*
130+
131+
Path or content of a PEM-encoded client certificate file.
132+
133+
<br>
134+
135+
**`client_key`** *`string`*
136+
137+
> Path or content of a PEM-encoded client key file.
138+
139+
<br>
140+
141+
**`ca_cert`** *`string`*
130142

131143
Path or content of a PEM-encoded CA file.
132144

plugin/output/kafka/kafka.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,20 @@ type Config struct {
156156
// > If set, the plugin will skip SSL/TLS verification.
157157
SslSkipVerify bool `json:"ssl_skip_verify" default:"false"` // *
158158

159+
// > @3@4@5@6
160+
// >
161+
// > Path or content of a PEM-encoded client certificate file.
162+
ClientCert string `json:"client_cert"` // *
163+
164+
// > @3@4@5@6
165+
// >
166+
// > > Path or content of a PEM-encoded client key file.
167+
ClientKey string `json:"client_key"` // *
168+
159169
// > @3@4@5@6
160170
// >
161171
// > Path or content of a PEM-encoded CA file.
162-
SslPem string `json:"pem_file" default:"/file.d/certs"` // *
172+
CACert string `json:"ca_cert"` // *
163173
}
164174

165175
func init() {
@@ -320,11 +330,19 @@ func NewProducer(c *Config, l *zap.SugaredLogger) sarama.SyncProducer {
320330
config.Net.TLS.Enable = true
321331

322332
tlsCfg := xtls.NewConfigBuilder()
323-
if err := tlsCfg.AppendCARoot(c.SslPem); err != nil {
324-
l.Fatalf("can't load cert: %s", err.Error())
333+
if c.CACert != "" {
334+
if err := tlsCfg.AppendCARoot(c.CACert); err != nil {
335+
l.Fatalf("can't load ca cert: %s", err.Error())
336+
}
325337
}
326338
tlsCfg.SetSkipVerify(c.SslSkipVerify)
327339

340+
if c.ClientCert != "" || c.ClientKey != "" {
341+
if err := tlsCfg.AppendX509KeyPair(c.ClientCert, c.ClientKey); err != nil {
342+
l.Fatalf("can't load client certificate and key: %s", err.Error())
343+
}
344+
}
345+
328346
config.Net.TLS.Config = tlsCfg.Build()
329347
}
330348

0 commit comments

Comments
 (0)