Skip to content

Commit bdb281f

Browse files
committed
changefeedccl: refactor: move sinkURL to its own file in changefeedbase
Epic: none Release note: None
1 parent 3eec173 commit bdb281f

20 files changed

+253
-202
lines changed

pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
"errors.go",
88
"options.go",
99
"settings.go",
10+
"sink_url.go",
1011
"target.go",
1112
],
1213
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase",
@@ -20,6 +21,7 @@ go_library(
2021
"//pkg/sql/catalog/descpb",
2122
"//pkg/sql/pgwire/pgcode",
2223
"//pkg/sql/pgwire/pgerror",
24+
"//pkg/util",
2325
"//pkg/util/iterutil",
2426
"//pkg/util/metamorphic",
2527
"@com_github_cockroachdb_errors//:errors",
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package changefeedbase
7+
8+
import (
9+
"encoding/base64"
10+
"net/url"
11+
"strconv"
12+
13+
"github.com/cockroachdb/cockroach/pkg/util"
14+
"github.com/cockroachdb/errors"
15+
)
16+
17+
// SinkURL is a helper struct which for "consuming" URL query
18+
// parameters from the underlying URL.
19+
type SinkURL struct {
20+
_ util.NoCopy
21+
*url.URL
22+
q url.Values
23+
}
24+
25+
func (u *SinkURL) PeekParam(p string) string {
26+
if u.q == nil {
27+
u.q = u.Query()
28+
}
29+
v := u.q.Get(p)
30+
return v
31+
}
32+
33+
func (u *SinkURL) ConsumeParam(p string) string {
34+
v := u.PeekParam(p)
35+
u.q.Del(p)
36+
return v
37+
}
38+
39+
func (u *SinkURL) ConsumeParams(p string) []string {
40+
if u.q == nil {
41+
u.q = u.Query()
42+
}
43+
v := u.q[p]
44+
u.q.Del(p)
45+
return v
46+
}
47+
48+
func (u *SinkURL) AddParam(p string, value string) {
49+
if u.q == nil {
50+
u.q = u.Query()
51+
}
52+
u.q.Add(p, value)
53+
}
54+
55+
func (u *SinkURL) SetParam(p string, value string) {
56+
if u.q == nil {
57+
u.q = u.Query()
58+
}
59+
u.q.Set(p, value)
60+
}
61+
62+
func (u *SinkURL) ConsumeBool(param string, dest *bool) (wasSet bool, err error) {
63+
if paramVal := u.ConsumeParam(param); paramVal != "" {
64+
wasSet, err := strToBool(paramVal, dest)
65+
if err != nil {
66+
return false, errors.Wrapf(err, "param %s must be a bool", param)
67+
}
68+
return wasSet, err
69+
}
70+
return false, nil
71+
}
72+
73+
func (u *SinkURL) ConsumeBoolParam(param string) (bool, error) {
74+
var b bool
75+
_, err := u.ConsumeBool(param, &b)
76+
return b, err
77+
}
78+
79+
func (u *SinkURL) DecodeBase64(param string, dest *[]byte) error {
80+
// TODO(dan): There's a straightforward and unambiguous transformation
81+
// between the base 64 encoding defined in RFC 4648 and the URL variant
82+
// defined in the same RFC: simply replace all `+` with `-` and `/` with
83+
// `_`. Consider always doing this for the user and accepting either
84+
// variant.
85+
val := u.ConsumeParam(param)
86+
err := DecodeBase64FromString(val, dest)
87+
if err != nil {
88+
return errors.Wrapf(err, `param %s must be base 64 encoded`, param)
89+
}
90+
return nil
91+
}
92+
93+
func (u *SinkURL) RemainingQueryParams() (res []string) {
94+
for p := range u.q {
95+
res = append(res, p)
96+
}
97+
return
98+
}
99+
100+
func (u *SinkURL) String() string {
101+
if u.q != nil {
102+
// If we changed query params, re-encode them.
103+
u.URL.RawQuery = u.q.Encode()
104+
u.q = nil
105+
}
106+
return u.URL.String()
107+
}
108+
109+
func strToBool(src string, dest *bool) (wasSet bool, err error) {
110+
b, err := strconv.ParseBool(src)
111+
if err != nil {
112+
return false, err
113+
}
114+
*dest = b
115+
return true, nil
116+
}
117+
118+
func DecodeBase64FromString(src string, dest *[]byte) error {
119+
if src == `` {
120+
return nil
121+
}
122+
decoded, err := base64.StdEncoding.DecodeString(src)
123+
if err != nil {
124+
return err
125+
}
126+
*dest = decoded
127+
return nil
128+
}

pkg/ccl/changefeedccl/schema_registry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func getAndDeleteParams(u *url.URL) (*schemaRegistryParams, error) {
8989
changefeedbase.RegistryParamClientKey} {
9090
if stringParam := query.Get(k); stringParam != "" {
9191
var decoded []byte
92-
err := decodeBase64FromString(stringParam, &decoded)
92+
err := changefeedbase.DecodeBase64FromString(stringParam, &decoded)
9393
if err != nil {
9494
return nil, errors.Wrapf(err, "param %s must be base 64 encoded", k)
9595
}

pkg/ccl/changefeedccl/sink.go

Lines changed: 11 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -246,23 +246,23 @@ func getSink(
246246
if knobs, ok := serverCfg.TestingKnobs.Changefeed.(*TestingKnobs); ok {
247247
nullIsAccounted = knobs.NullSinkIsExternalIOAccounted
248248
}
249-
return makeNullSink(sinkURL{URL: u}, metricsBuilder(nullIsAccounted))
249+
return makeNullSink(&changefeedbase.SinkURL{URL: u}, metricsBuilder(nullIsAccounted))
250250
case isKafkaSink(u):
251251
return validateOptionsAndMakeSink(changefeedbase.KafkaValidOptions, func() (Sink, error) {
252252
if KafkaV2Enabled.Get(&serverCfg.Settings.SV) {
253-
return makeKafkaSinkV2(ctx, sinkURL{URL: u}, AllTargets(feedCfg), opts.GetKafkaConfigJSON(),
253+
return makeKafkaSinkV2(ctx, &changefeedbase.SinkURL{URL: u}, AllTargets(feedCfg), opts.GetKafkaConfigJSON(),
254254
numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
255255
serverCfg.Settings, metricsBuilder, kafkaSinkV2Knobs{})
256256
} else {
257-
return makeKafkaSink(ctx, sinkURL{URL: u}, AllTargets(feedCfg), opts.GetKafkaConfigJSON(), serverCfg.Settings, metricsBuilder)
257+
return makeKafkaSink(ctx, &changefeedbase.SinkURL{URL: u}, AllTargets(feedCfg), opts.GetKafkaConfigJSON(), serverCfg.Settings, metricsBuilder)
258258
}
259259
})
260260
case isPulsarSink(u):
261261
var testingKnobs *TestingKnobs
262262
if knobs, ok := serverCfg.TestingKnobs.Changefeed.(*TestingKnobs); ok {
263263
testingKnobs = knobs
264264
}
265-
return makePulsarSink(ctx, sinkURL{URL: u}, encodingOpts, AllTargets(feedCfg), opts.GetKafkaConfigJSON(),
265+
return makePulsarSink(ctx, &changefeedbase.SinkURL{URL: u}, encodingOpts, AllTargets(feedCfg), opts.GetKafkaConfigJSON(),
266266
serverCfg.Settings, metricsBuilder, testingKnobs)
267267
case isWebhookSink(u):
268268
webhookOpts, err := opts.GetWebhookSinkOptions()
@@ -271,13 +271,13 @@ func getSink(
271271
}
272272
if WebhookV2Enabled.Get(&serverCfg.Settings.SV) {
273273
return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) {
274-
return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts,
274+
return makeWebhookSink(ctx, &changefeedbase.SinkURL{URL: u}, encodingOpts, webhookOpts,
275275
numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
276276
metricsBuilder, serverCfg.Settings)
277277
})
278278
} else {
279279
return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) {
280-
return makeDeprecatedWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts,
280+
return makeDeprecatedWebhookSink(ctx, &changefeedbase.SinkURL{URL: u}, encodingOpts, webhookOpts,
281281
defaultWorkerCount(), timeutil.DefaultTimeSource{}, metricsBuilder)
282282
})
283283
}
@@ -307,18 +307,18 @@ func getSink(
307307
nodeID = serverCfg.NodeID.SQLInstanceID()
308308
}
309309
return makeCloudStorageSink(
310-
ctx, sinkURL{URL: u}, nodeID, serverCfg.Settings, encodingOpts,
310+
ctx, &changefeedbase.SinkURL{URL: u}, nodeID, serverCfg.Settings, encodingOpts,
311311
timestampOracle, serverCfg.ExternalStorageFromURI, user, metricsBuilder, testingKnobs,
312312
)
313313
})
314314
case u.Scheme == changefeedbase.SinkSchemeExperimentalSQL:
315315
return validateOptionsAndMakeSink(changefeedbase.SQLValidOptions, func() (Sink, error) {
316-
return makeSQLSink(sinkURL{URL: u}, sqlSinkTableName, AllTargets(feedCfg), metricsBuilder)
316+
return makeSQLSink(&changefeedbase.SinkURL{URL: u}, sqlSinkTableName, AllTargets(feedCfg), metricsBuilder)
317317
})
318318
case u.Scheme == changefeedbase.SinkSchemeExternalConnection:
319319
return validateOptionsAndMakeSink(changefeedbase.ExternalConnectionValidOptions, func() (Sink, error) {
320320
return makeExternalConnectionSink(
321-
ctx, sinkURL{URL: u}, user, makeExternalConnectionProvider(ctx, serverCfg.DB),
321+
ctx, &changefeedbase.SinkURL{URL: u}, user, makeExternalConnectionProvider(ctx, serverCfg.DB),
322322
serverCfg, feedCfg, timestampOracle, jobID, m,
323323
)
324324
})
@@ -362,70 +362,6 @@ func validateSinkOptions(opts map[string]string, sinkSpecificOpts map[string]str
362362
return nil
363363
}
364364

365-
// sinkURL is a helper struct which for "consuming" URL query
366-
// parameters from the underlying URL.
367-
type sinkURL struct {
368-
*url.URL
369-
q url.Values
370-
}
371-
372-
func (u *sinkURL) consumeParam(p string) string {
373-
if u.q == nil {
374-
u.q = u.Query()
375-
}
376-
v := u.q.Get(p)
377-
u.q.Del(p)
378-
return v
379-
}
380-
381-
func (u *sinkURL) addParam(p string, value string) {
382-
if u.q == nil {
383-
u.q = u.Query()
384-
}
385-
u.q.Add(p, value)
386-
}
387-
388-
func (u *sinkURL) consumeBool(param string, dest *bool) (wasSet bool, err error) {
389-
if paramVal := u.consumeParam(param); paramVal != "" {
390-
wasSet, err := strToBool(paramVal, dest)
391-
if err != nil {
392-
return false, errors.Wrapf(err, "param %s must be a bool", param)
393-
}
394-
return wasSet, err
395-
}
396-
return false, nil
397-
}
398-
399-
func (u *sinkURL) decodeBase64(param string, dest *[]byte) error {
400-
// TODO(dan): There's a straightforward and unambiguous transformation
401-
// between the base 64 encoding defined in RFC 4648 and the URL variant
402-
// defined in the same RFC: simply replace all `+` with `-` and `/` with
403-
// `_`. Consider always doing this for the user and accepting either
404-
// variant.
405-
val := u.consumeParam(param)
406-
err := decodeBase64FromString(val, dest)
407-
if err != nil {
408-
return errors.Wrapf(err, `param %s must be base 64 encoded`, param)
409-
}
410-
return nil
411-
}
412-
413-
func (u *sinkURL) remainingQueryParams() (res []string) {
414-
for p := range u.q {
415-
res = append(res, p)
416-
}
417-
return
418-
}
419-
420-
func (u *sinkURL) String() string {
421-
if u.q != nil {
422-
// If we changed query params, re-encode them.
423-
u.URL.RawQuery = u.q.Encode()
424-
u.q = nil
425-
}
426-
return u.URL.String()
427-
}
428-
429365
// errorWrapperSink delegates to another sink and marks all returned errors as
430366
// retryable. During changefeed setup, we use the sink once without this to
431367
// verify configuration, but in the steady state, no sink error should be
@@ -616,9 +552,9 @@ func (n *nullSink) getConcreteType() sinkType {
616552

617553
var _ Sink = (*nullSink)(nil)
618554

619-
func makeNullSink(u sinkURL, m metricsRecorder) (Sink, error) {
555+
func makeNullSink(u *changefeedbase.SinkURL, m metricsRecorder) (Sink, error) {
620556
var pacer *time.Ticker
621-
if delay := u.consumeParam(`delay`); delay != "" {
557+
if delay := u.ConsumeParam(`delay`); delay != "" {
622558
pace, err := time.ParseDuration(delay)
623559
if err != nil {
624560
return nil, err

pkg/ccl/changefeedccl/sink_cloudstorage.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ const flushQueueDepth = 256
377377

378378
func makeCloudStorageSink(
379379
ctx context.Context,
380-
u sinkURL,
380+
u *changefeedbase.SinkURL,
381381
srcID base.SQLInstanceID,
382382
settings *cluster.Settings,
383383
encodingOpts changefeedbase.EncodingOptions,
@@ -388,7 +388,7 @@ func makeCloudStorageSink(
388388
testingKnobs *TestingKnobs,
389389
) (Sink, error) {
390390
var targetMaxFileSize int64 = 16 << 20 // 16MB
391-
if fileSizeParam := u.consumeParam(changefeedbase.SinkParamFileSize); fileSizeParam != `` {
391+
if fileSizeParam := u.ConsumeParam(changefeedbase.SinkParamFileSize); fileSizeParam != `` {
392392
var err error
393393
if targetMaxFileSize, err = humanizeutil.ParseBytes(fileSizeParam); err != nil {
394394
return nil, pgerror.Wrapf(err, pgcode.Syntax, `parsing %s`, fileSizeParam)
@@ -431,7 +431,7 @@ func makeCloudStorageSink(
431431
}
432432
s.flushGroup.GoCtx(s.asyncFlusher)
433433

434-
if partitionFormat := u.consumeParam(changefeedbase.SinkParamPartitionFormat); partitionFormat != "" {
434+
if partitionFormat := u.ConsumeParam(changefeedbase.SinkParamPartitionFormat); partitionFormat != "" {
435435
dateFormat, ok := partitionDateFormats[partitionFormat]
436436
if !ok {
437437
return nil, errors.Errorf("invalid partition_format of %s", partitionFormat)

pkg/ccl/changefeedccl/sink_cloudstorage_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ func TestCloudStorageSink(t *testing.T) {
605605
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
606606

607607
sinkURIWithParam := sinkURI(t, targetMaxFileSize)
608-
sinkURIWithParam.addParam(changefeedbase.SinkParamPartitionFormat, tc.format)
608+
sinkURIWithParam.AddParam(changefeedbase.SinkParamPartitionFormat, tc.format)
609609
t.Logf("format=%s sinkgWithParam: %s", tc.format, sinkURIWithParam.String())
610610
s, err := makeCloudStorageSink(
611611
ctx, sinkURIWithParam, 1, settings, opts,
@@ -944,12 +944,12 @@ func testDir(t *testing.T) string {
944944
return strings.ReplaceAll(t.Name(), "/", ";")
945945
}
946946

947-
func sinkURI(t *testing.T, maxFileSize int64) sinkURL {
947+
func sinkURI(t *testing.T, maxFileSize int64) *changefeedbase.SinkURL {
948948
u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t)))
949949
require.NoError(t, err)
950-
sink := sinkURL{URL: u}
950+
sink := &changefeedbase.SinkURL{URL: u}
951951
if maxFileSize != unlimitedFileSize {
952-
sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10))
952+
sink.AddParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10))
953953
}
954954
return sink
955955
}

pkg/ccl/changefeedccl/sink_external_connection.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121

2222
func makeExternalConnectionSink(
2323
ctx context.Context,
24-
u sinkURL,
24+
u *changefeedbase.SinkURL,
2525
user username.SQLUsername,
2626
p externalConnectionProvider,
2727
serverCfg *execinfra.ServerConfig,

0 commit comments

Comments
 (0)