Skip to content

Commit 364d58c

Browse files
committed
changefeedccl: refactor kafka auth to be plugin-based
Epic: none Release note: None
1 parent bdb281f commit 364d58c

File tree

15 files changed

+903
-383
lines changed

15 files changed

+903
-383
lines changed

pkg/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -877,6 +877,7 @@ GO_TARGETS = [
877877
"//pkg/ccl/changefeedccl/changefeedvalidators:changefeedvalidators",
878878
"//pkg/ccl/changefeedccl/checkpoint:checkpoint",
879879
"//pkg/ccl/changefeedccl/checkpoint:checkpoint_test",
880+
"//pkg/ccl/changefeedccl/kafkaauth:kafkaauth",
880881
"//pkg/ccl/changefeedccl/kvevent:kvevent",
881882
"//pkg/ccl/changefeedccl/kvevent:kvevent_test",
882883
"//pkg/ccl/changefeedccl/kvfeed:kvfeed",

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ go_library(
2828
"retry.go",
2929
"scheduled_changefeed.go",
3030
"schema_registry.go",
31-
"scram_client.go",
3231
"sink.go",
3332
"sink_cloudstorage.go",
3433
"sink_external_connection.go",
@@ -58,6 +57,7 @@ go_library(
5857
"//pkg/ccl/changefeedccl/changefeedpb",
5958
"//pkg/ccl/changefeedccl/changefeedvalidators",
6059
"//pkg/ccl/changefeedccl/checkpoint",
60+
"//pkg/ccl/changefeedccl/kafkaauth",
6161
"//pkg/ccl/changefeedccl/kvevent",
6262
"//pkg/ccl/changefeedccl/kvfeed",
6363
"//pkg/ccl/changefeedccl/resolvedspan",
@@ -166,7 +166,6 @@ go_library(
166166
"//pkg/util/tracing",
167167
"//pkg/util/uuid",
168168
"@com_github_apache_pulsar_client_go//pulsar",
169-
"@com_github_aws_aws_msk_iam_sasl_signer_go//signer",
170169
"@com_github_cockroachdb_apd_v3//:apd",
171170
"@com_github_cockroachdb_errors//:errors",
172171
"@com_github_cockroachdb_logtags//:logtags",
@@ -183,12 +182,7 @@ go_library(
183182
"@com_github_twmb_franz_go//pkg/kerr",
184183
"@com_github_twmb_franz_go//pkg/kgo",
185184
"@com_github_twmb_franz_go//pkg/kversion",
186-
"@com_github_twmb_franz_go//pkg/sasl",
187-
"@com_github_twmb_franz_go//pkg/sasl/oauth",
188-
"@com_github_twmb_franz_go//pkg/sasl/plain",
189-
"@com_github_twmb_franz_go//pkg/sasl/scram",
190185
"@com_github_twmb_franz_go_pkg_kadm//:kadm",
191-
"@com_github_xdg_go_scram//:scram",
192186
"@com_google_cloud_go_pubsub//:pubsub",
193187
"@com_google_cloud_go_pubsub//apiv1",
194188
"@com_google_cloud_go_pubsub//apiv1/pubsubpb",
@@ -198,8 +192,6 @@ go_library(
198192
"@org_golang_google_grpc//codes",
199193
"@org_golang_google_grpc//credentials/insecure",
200194
"@org_golang_google_grpc//status",
201-
"@org_golang_x_oauth2//:oauth2",
202-
"@org_golang_x_oauth2//clientcredentials",
203195
"@org_golang_x_oauth2//google",
204196
],
205197
)
@@ -375,6 +367,7 @@ go_test(
375367
"@com_github_twmb_franz_go//pkg/kgo",
376368
"@com_github_twmb_franz_go//pkg/kversion",
377369
"@com_github_twmb_franz_go//pkg/sasl",
370+
"@com_github_twmb_franz_go//pkg/sasl/plain",
378371
"@com_github_twmb_franz_go_pkg_kadm//:kadm",
379372
"@com_google_cloud_go_pubsub//apiv1",
380373
"@com_google_cloud_go_pubsub//apiv1/pubsubpb",

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5593,7 +5593,7 @@ func TestChangefeedErrors(t *testing.T) {
55935593
)
55945594
sqlDB.ExpectErrWithTimeout(
55955595
t, `param sasl_handshake must be a bool`,
5596-
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_enabled=true&sasl_handshake=maybe`,
5596+
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_enabled=true&sasl_user=x&sasl_password=y&sasl_handshake=maybe`,
55975597
)
55985598
sqlDB.ExpectErrWithTimeout(
55995599
t, `sasl_enabled must be enabled to configure SASL handshake behavior`,
@@ -5625,14 +5625,14 @@ func TestChangefeedErrors(t *testing.T) {
56255625
)
56265626
sqlDB.ExpectErrWithTimeout(
56275627
t, `sasl_client_id is only a valid parameter for sasl_mechanism=OAUTHBEARER`,
5628-
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_client_id=a`,
5628+
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_enabled=true&sasl_client_id=a`,
56295629
)
56305630
sqlDB.ExpectErrWithTimeout(
56315631
t, `sasl_enabled must be enabled to configure SASL mechanism`,
56325632
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_mechanism=SCRAM-SHA-256`,
56335633
)
56345634
sqlDB.ExpectErrWithTimeout(
5635-
t, `param sasl_mechanism must be one of SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER, PLAIN or AWS_MSK_IAM`,
5635+
t, `param sasl_mechanism must be one of AWS_MSK_IAM, OAUTHBEARER, PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512`,
56365636
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_enabled=true&sasl_mechanism=unsuppported`,
56375637
)
56385638
sqlDB.ExpectErrWithTimeout(

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,6 @@ const (
239239
Topics = `topics`
240240
)
241241

242-
// Support additional mechanism on top of the default SASL mechanism.
243-
const SASLTypeAWSMSKIAM = "AWS_MSK_IAM"
244-
245242
func makeStringSet(opts ...string) map[string]struct{} {
246243
res := make(map[string]struct{}, len(opts))
247244
for _, opt := range opts {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "kafkaauth",
5+
srcs = [
6+
"doc.go",
7+
"kafkaauth.go",
8+
"sasl_msk.go",
9+
"sasl_oauth.go",
10+
"sasl_plain.go",
11+
"sasl_scram.go",
12+
"scram_client.go",
13+
],
14+
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kafkaauth",
15+
visibility = ["//visibility:public"],
16+
deps = [
17+
"//pkg/ccl/changefeedccl/changefeedbase",
18+
"@com_github_aws_aws_msk_iam_sasl_signer_go//signer",
19+
"@com_github_cockroachdb_errors//:errors",
20+
"@com_github_ibm_sarama//:sarama",
21+
"@com_github_twmb_franz_go//pkg/kgo",
22+
"@com_github_twmb_franz_go//pkg/sasl",
23+
"@com_github_twmb_franz_go//pkg/sasl/oauth",
24+
"@com_github_twmb_franz_go//pkg/sasl/plain",
25+
"@com_github_twmb_franz_go//pkg/sasl/scram",
26+
"@com_github_xdg_go_scram//:scram",
27+
"@org_golang_x_oauth2//:oauth2",
28+
"@org_golang_x_oauth2//clientcredentials",
29+
],
30+
)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
// Package kafkaauth provides Kafka SASL authentication mechanisms for use with
7+
// the changefeed kafka sinks (v1 & v2). Most standard SASL mechanisms are
8+
// supported, and some custom ones as well.
9+
package kafkaauth
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package kafkaauth
7+
8+
import (
9+
"context"
10+
"sort"
11+
"strings"
12+
13+
"github.com/IBM/sarama"
14+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
15+
"github.com/cockroachdb/errors"
16+
"github.com/twmb/franz-go/pkg/kgo"
17+
)
18+
19+
type saslMechanismBuilder interface {
20+
name() string
21+
validateParams(u *changefeedbase.SinkURL) error
22+
build(u *changefeedbase.SinkURL) (SASLMechanism, error)
23+
}
24+
25+
// SASLMechanism is an interface for SASL mechanism instances, built from URLs,
26+
// to be applied to sarama and kgo configurations.
27+
type SASLMechanism interface {
28+
// ApplySarama applies the SASL mechanism to the given sarama configuration.
29+
ApplySarama(ctx context.Context, cfg *sarama.Config) error
30+
// KgoOpts returns kgo options that implement the SASL mechanism.
31+
KgoOpts(ctx context.Context) ([]kgo.Opt, error)
32+
}
33+
34+
type saslMechanismRegistry map[string]saslMechanismBuilder
35+
36+
// registry is the global registry of SASL Mechanisms.
37+
var registry saslMechanismRegistry = make(map[string]saslMechanismBuilder)
38+
39+
// register registers a SASL Mechanism to the global registry. It must only be
40+
// called during init().
41+
func (r saslMechanismRegistry) register(b saslMechanismBuilder) {
42+
n := b.name()
43+
if _, ok := r[n]; ok {
44+
panic("duplicate sasl mechanism registered: " + n)
45+
}
46+
r[n] = b
47+
}
48+
49+
// Pick wraps registry.pick() which returns a saslMechanism for the given sink
50+
// URL, or ok=false if none is specified. It consumes all relevant query
51+
// parameters from `u`.
52+
func Pick(u *changefeedbase.SinkURL) (_ SASLMechanism, ok bool, _ error) {
53+
return registry.pick(u)
54+
}
55+
56+
// pick returns a saslMechanism for the given sink URL, or ok=false if none is specified.
57+
func (r saslMechanismRegistry) pick(u *changefeedbase.SinkURL) (_ SASLMechanism, ok bool, _ error) {
58+
if u == nil {
59+
return nil, false, errors.AssertionFailedf("sink url is nil")
60+
}
61+
62+
var enabled bool
63+
if _, err := u.ConsumeBool(changefeedbase.SinkParamSASLEnabled, &enabled); err != nil {
64+
return nil, false, err
65+
}
66+
if !enabled {
67+
return nil, false, maybeHelpfulErrorMessage(enabled, u)
68+
}
69+
70+
mechanism := u.ConsumeParam(changefeedbase.SinkParamSASLMechanism)
71+
if mechanism == "" {
72+
mechanism = sarama.SASLTypePlaintext
73+
}
74+
b, ok := r[mechanism]
75+
if !ok {
76+
return nil, false, errors.Newf("param sasl_mechanism must be one of %s", r.allMechanismNames())
77+
}
78+
79+
// Return slightly nicer errors for this common case.
80+
if b.name() != sarama.SASLTypeOAuth {
81+
if err := validateNoOAuthOnlyParams(u); err != nil {
82+
return nil, false, err
83+
}
84+
}
85+
if err := b.validateParams(u); err != nil {
86+
return nil, false, err
87+
}
88+
mech, err := b.build(u)
89+
if err != nil {
90+
return nil, false, err
91+
}
92+
return mech, true, nil
93+
}
94+
95+
func (r saslMechanismRegistry) allMechanismNames() string {
96+
allMechanisms := make([]string, 0, len(r))
97+
for k := range r {
98+
allMechanisms = append(allMechanisms, k)
99+
}
100+
sort.Strings(allMechanisms)
101+
return strings.Join(allMechanisms[:len(allMechanisms)-1], ", ") +
102+
", or " + allMechanisms[len(allMechanisms)-1]
103+
}
104+
105+
func newRequiredParamError(mechName string, param string) error {
106+
return errors.Newf("%s must be provided when SASL is enabled using mechanism %s", param, mechName)
107+
}
108+
109+
func peekAndRequireParams(
110+
mechName string, u *changefeedbase.SinkURL, requiredParams []string,
111+
) error {
112+
var errs []error
113+
for _, param := range requiredParams {
114+
if u.PeekParam(param) == "" {
115+
errs = append(errs, newRequiredParamError(mechName, param))
116+
}
117+
}
118+
return errors.Join(errs...)
119+
}
120+
121+
// consumeHandshake consumes the handshake parameter from the sink URL.
122+
// handshake defaults to true (if sasl is enabled), unlike other options.
123+
func consumeHandshake(u *changefeedbase.SinkURL) (bool, error) {
124+
var handshake bool
125+
set, err := u.ConsumeBool(changefeedbase.SinkParamSASLHandshake, &handshake)
126+
if err != nil {
127+
return false, err
128+
}
129+
if !set {
130+
handshake = true
131+
}
132+
return handshake, nil
133+
}
134+
135+
// maybeHelpfulErrorMessage returns an error if the user has provided SASL parameters without enabling SASL.
136+
func maybeHelpfulErrorMessage(saslEnabled bool, u *changefeedbase.SinkURL) error {
137+
if !saslEnabled {
138+
// Handle special error messages.
139+
if u.PeekParam(changefeedbase.SinkParamSASLHandshake) != "" {
140+
return errors.New("sasl_enabled must be enabled to configure SASL handshake behavior")
141+
}
142+
if u.PeekParam(changefeedbase.SinkParamSASLMechanism) != "" {
143+
return errors.New("sasl_enabled must be enabled to configure SASL mechanism")
144+
}
145+
146+
saslOnlyParams := []string{
147+
changefeedbase.SinkParamSASLUser,
148+
changefeedbase.SinkParamSASLPassword,
149+
changefeedbase.SinkParamSASLEnabled,
150+
changefeedbase.SinkParamSASLClientID,
151+
changefeedbase.SinkParamSASLClientSecret,
152+
changefeedbase.SinkParamSASLTokenURL,
153+
changefeedbase.SinkParamSASLGrantType,
154+
changefeedbase.SinkParamSASLScopes,
155+
changefeedbase.SinkParamSASLAwsIAMRoleArn,
156+
changefeedbase.SinkParamSASLAwsRegion,
157+
changefeedbase.SinkParamSASLAwsIAMSessionName,
158+
}
159+
for _, p := range saslOnlyParams {
160+
if u.PeekParam(p) != "" {
161+
return errors.Newf("sasl_enabled must be enabled if %s is provided", p)
162+
}
163+
}
164+
}
165+
return nil
166+
}
167+
168+
// validateNoOAuthOnlyParams returns an error if the user has provided
169+
// OAUTHBEARER parameters without setting sasl_mechanism=OAUTHBEARER, for the
170+
// sake of slightly nicer errors.
171+
func validateNoOAuthOnlyParams(u *changefeedbase.SinkURL) error {
172+
oauthOnlyParams := []string{
173+
changefeedbase.SinkParamSASLClientID,
174+
changefeedbase.SinkParamSASLClientSecret,
175+
changefeedbase.SinkParamSASLTokenURL,
176+
changefeedbase.SinkParamSASLGrantType,
177+
changefeedbase.SinkParamSASLScopes,
178+
}
179+
180+
for _, p := range oauthOnlyParams {
181+
if u.PeekParam(p) != "" {
182+
return errors.Newf("%s is only a valid parameter for sasl_mechanism=OAUTHBEARER", p)
183+
}
184+
}
185+
return nil
186+
}
187+
188+
func applySaramaCommon(cfg *sarama.Config, mechName sarama.SASLMechanism, handshake bool) {
189+
cfg.Net.SASL.Enable = true
190+
cfg.Net.SASL.Mechanism = mechName
191+
cfg.Net.SASL.Handshake = handshake
192+
}

0 commit comments

Comments
 (0)