Skip to content

Commit 691b171

Browse files
authored
Merge pull request #139237 from cockroachdb/blathers/backport-release-25.1-138944
release-25.1: changefeedccl: refactor kafka auth to be plugin-based
2 parents 1da02ed + 364d58c commit 691b171

32 files changed

+1136
-565
lines changed

pkg/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -877,6 +877,7 @@ GO_TARGETS = [
877877
"//pkg/ccl/changefeedccl/changefeedvalidators:changefeedvalidators",
878878
"//pkg/ccl/changefeedccl/checkpoint:checkpoint",
879879
"//pkg/ccl/changefeedccl/checkpoint:checkpoint_test",
880+
"//pkg/ccl/changefeedccl/kafkaauth:kafkaauth",
880881
"//pkg/ccl/changefeedccl/kvevent:kvevent",
881882
"//pkg/ccl/changefeedccl/kvevent:kvevent_test",
882883
"//pkg/ccl/changefeedccl/kvfeed:kvfeed",

pkg/ccl/changefeedccl/BUILD.bazel

+2-9
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ go_library(
2828
"retry.go",
2929
"scheduled_changefeed.go",
3030
"schema_registry.go",
31-
"scram_client.go",
3231
"sink.go",
3332
"sink_cloudstorage.go",
3433
"sink_external_connection.go",
@@ -58,6 +57,7 @@ go_library(
5857
"//pkg/ccl/changefeedccl/changefeedpb",
5958
"//pkg/ccl/changefeedccl/changefeedvalidators",
6059
"//pkg/ccl/changefeedccl/checkpoint",
60+
"//pkg/ccl/changefeedccl/kafkaauth",
6161
"//pkg/ccl/changefeedccl/kvevent",
6262
"//pkg/ccl/changefeedccl/kvfeed",
6363
"//pkg/ccl/changefeedccl/resolvedspan",
@@ -166,7 +166,6 @@ go_library(
166166
"//pkg/util/tracing",
167167
"//pkg/util/uuid",
168168
"@com_github_apache_pulsar_client_go//pulsar",
169-
"@com_github_aws_aws_msk_iam_sasl_signer_go//signer",
170169
"@com_github_cockroachdb_apd_v3//:apd",
171170
"@com_github_cockroachdb_errors//:errors",
172171
"@com_github_cockroachdb_logtags//:logtags",
@@ -183,12 +182,7 @@ go_library(
183182
"@com_github_twmb_franz_go//pkg/kerr",
184183
"@com_github_twmb_franz_go//pkg/kgo",
185184
"@com_github_twmb_franz_go//pkg/kversion",
186-
"@com_github_twmb_franz_go//pkg/sasl",
187-
"@com_github_twmb_franz_go//pkg/sasl/oauth",
188-
"@com_github_twmb_franz_go//pkg/sasl/plain",
189-
"@com_github_twmb_franz_go//pkg/sasl/scram",
190185
"@com_github_twmb_franz_go_pkg_kadm//:kadm",
191-
"@com_github_xdg_go_scram//:scram",
192186
"@com_google_cloud_go_pubsub//:pubsub",
193187
"@com_google_cloud_go_pubsub//apiv1",
194188
"@com_google_cloud_go_pubsub//apiv1/pubsubpb",
@@ -198,8 +192,6 @@ go_library(
198192
"@org_golang_google_grpc//codes",
199193
"@org_golang_google_grpc//credentials/insecure",
200194
"@org_golang_google_grpc//status",
201-
"@org_golang_x_oauth2//:oauth2",
202-
"@org_golang_x_oauth2//clientcredentials",
203195
"@org_golang_x_oauth2//google",
204196
],
205197
)
@@ -375,6 +367,7 @@ go_test(
375367
"@com_github_twmb_franz_go//pkg/kgo",
376368
"@com_github_twmb_franz_go//pkg/kversion",
377369
"@com_github_twmb_franz_go//pkg/sasl",
370+
"@com_github_twmb_franz_go//pkg/sasl/plain",
378371
"@com_github_twmb_franz_go_pkg_kadm//:kadm",
379372
"@com_google_cloud_go_pubsub//apiv1",
380373
"@com_google_cloud_go_pubsub//apiv1/pubsubpb",

pkg/ccl/changefeedccl/changefeed_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -5593,7 +5593,7 @@ func TestChangefeedErrors(t *testing.T) {
55935593
)
55945594
sqlDB.ExpectErrWithTimeout(
55955595
t, `param sasl_handshake must be a bool`,
5596-
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_enabled=true&sasl_handshake=maybe`,
5596+
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_enabled=true&sasl_user=x&sasl_password=y&sasl_handshake=maybe`,
55975597
)
55985598
sqlDB.ExpectErrWithTimeout(
55995599
t, `sasl_enabled must be enabled to configure SASL handshake behavior`,
@@ -5625,14 +5625,14 @@ func TestChangefeedErrors(t *testing.T) {
56255625
)
56265626
sqlDB.ExpectErrWithTimeout(
56275627
t, `sasl_client_id is only a valid parameter for sasl_mechanism=OAUTHBEARER`,
5628-
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_client_id=a`,
5628+
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_enabled=true&sasl_client_id=a`,
56295629
)
56305630
sqlDB.ExpectErrWithTimeout(
56315631
t, `sasl_enabled must be enabled to configure SASL mechanism`,
56325632
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_mechanism=SCRAM-SHA-256`,
56335633
)
56345634
sqlDB.ExpectErrWithTimeout(
5635-
t, `param sasl_mechanism must be one of SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER, PLAIN or AWS_MSK_IAM`,
5635+
t, `param sasl_mechanism must be one of AWS_MSK_IAM, OAUTHBEARER, PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512`,
56365636
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_enabled=true&sasl_mechanism=unsuppported`,
56375637
)
56385638
sqlDB.ExpectErrWithTimeout(

pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
"errors.go",
88
"options.go",
99
"settings.go",
10+
"sink_url.go",
1011
"target.go",
1112
],
1213
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase",
@@ -20,6 +21,7 @@ go_library(
2021
"//pkg/sql/catalog/descpb",
2122
"//pkg/sql/pgwire/pgcode",
2223
"//pkg/sql/pgwire/pgerror",
24+
"//pkg/util",
2325
"//pkg/util/iterutil",
2426
"//pkg/util/metamorphic",
2527
"@com_github_cockroachdb_errors//:errors",

pkg/ccl/changefeedccl/changefeedbase/options.go

-3
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,6 @@ const (
239239
Topics = `topics`
240240
)
241241

242-
// Support additional mechanism on top of the default SASL mechanism.
243-
const SASLTypeAWSMSKIAM = "AWS_MSK_IAM"
244-
245242
func makeStringSet(opts ...string) map[string]struct{} {
246243
res := make(map[string]struct{}, len(opts))
247244
for _, opt := range opts {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package changefeedbase
7+
8+
import (
9+
"encoding/base64"
10+
"net/url"
11+
"strconv"
12+
13+
"github.com/cockroachdb/cockroach/pkg/util"
14+
"github.com/cockroachdb/errors"
15+
)
16+
17+
// SinkURL is a helper struct which for "consuming" URL query
18+
// parameters from the underlying URL.
19+
type SinkURL struct {
20+
_ util.NoCopy
21+
*url.URL
22+
q url.Values
23+
}
24+
25+
func (u *SinkURL) PeekParam(p string) string {
26+
if u.q == nil {
27+
u.q = u.Query()
28+
}
29+
v := u.q.Get(p)
30+
return v
31+
}
32+
33+
func (u *SinkURL) ConsumeParam(p string) string {
34+
v := u.PeekParam(p)
35+
u.q.Del(p)
36+
return v
37+
}
38+
39+
func (u *SinkURL) ConsumeParams(p string) []string {
40+
if u.q == nil {
41+
u.q = u.Query()
42+
}
43+
v := u.q[p]
44+
u.q.Del(p)
45+
return v
46+
}
47+
48+
func (u *SinkURL) AddParam(p string, value string) {
49+
if u.q == nil {
50+
u.q = u.Query()
51+
}
52+
u.q.Add(p, value)
53+
}
54+
55+
func (u *SinkURL) SetParam(p string, value string) {
56+
if u.q == nil {
57+
u.q = u.Query()
58+
}
59+
u.q.Set(p, value)
60+
}
61+
62+
func (u *SinkURL) ConsumeBool(param string, dest *bool) (wasSet bool, err error) {
63+
if paramVal := u.ConsumeParam(param); paramVal != "" {
64+
wasSet, err := strToBool(paramVal, dest)
65+
if err != nil {
66+
return false, errors.Wrapf(err, "param %s must be a bool", param)
67+
}
68+
return wasSet, err
69+
}
70+
return false, nil
71+
}
72+
73+
func (u *SinkURL) ConsumeBoolParam(param string) (bool, error) {
74+
var b bool
75+
_, err := u.ConsumeBool(param, &b)
76+
return b, err
77+
}
78+
79+
func (u *SinkURL) DecodeBase64(param string, dest *[]byte) error {
80+
// TODO(dan): There's a straightforward and unambiguous transformation
81+
// between the base 64 encoding defined in RFC 4648 and the URL variant
82+
// defined in the same RFC: simply replace all `+` with `-` and `/` with
83+
// `_`. Consider always doing this for the user and accepting either
84+
// variant.
85+
val := u.ConsumeParam(param)
86+
err := DecodeBase64FromString(val, dest)
87+
if err != nil {
88+
return errors.Wrapf(err, `param %s must be base 64 encoded`, param)
89+
}
90+
return nil
91+
}
92+
93+
func (u *SinkURL) RemainingQueryParams() (res []string) {
94+
for p := range u.q {
95+
res = append(res, p)
96+
}
97+
return
98+
}
99+
100+
func (u *SinkURL) String() string {
101+
if u.q != nil {
102+
// If we changed query params, re-encode them.
103+
u.URL.RawQuery = u.q.Encode()
104+
u.q = nil
105+
}
106+
return u.URL.String()
107+
}
108+
109+
func strToBool(src string, dest *bool) (wasSet bool, err error) {
110+
b, err := strconv.ParseBool(src)
111+
if err != nil {
112+
return false, err
113+
}
114+
*dest = b
115+
return true, nil
116+
}
117+
118+
func DecodeBase64FromString(src string, dest *[]byte) error {
119+
if src == `` {
120+
return nil
121+
}
122+
decoded, err := base64.StdEncoding.DecodeString(src)
123+
if err != nil {
124+
return err
125+
}
126+
*dest = decoded
127+
return nil
128+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "kafkaauth",
5+
srcs = [
6+
"doc.go",
7+
"kafkaauth.go",
8+
"sasl_msk.go",
9+
"sasl_oauth.go",
10+
"sasl_plain.go",
11+
"sasl_scram.go",
12+
"scram_client.go",
13+
],
14+
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kafkaauth",
15+
visibility = ["//visibility:public"],
16+
deps = [
17+
"//pkg/ccl/changefeedccl/changefeedbase",
18+
"@com_github_aws_aws_msk_iam_sasl_signer_go//signer",
19+
"@com_github_cockroachdb_errors//:errors",
20+
"@com_github_ibm_sarama//:sarama",
21+
"@com_github_twmb_franz_go//pkg/kgo",
22+
"@com_github_twmb_franz_go//pkg/sasl",
23+
"@com_github_twmb_franz_go//pkg/sasl/oauth",
24+
"@com_github_twmb_franz_go//pkg/sasl/plain",
25+
"@com_github_twmb_franz_go//pkg/sasl/scram",
26+
"@com_github_xdg_go_scram//:scram",
27+
"@org_golang_x_oauth2//:oauth2",
28+
"@org_golang_x_oauth2//clientcredentials",
29+
],
30+
)
+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
// Package kafkaauth provides Kafka SASL authentication mechanisms for use with
7+
// the changefeed kafka sinks (v1 & v2). Most standard SASL mechanisms are
8+
// supported, and some custom ones as well.
9+
package kafkaauth

0 commit comments

Comments
 (0)