
Commit 8c495d8

changefeedccl: add sink_io_workers option to CREATE statement
Previously, the number of SinkIOWorkers could only be defined via a cluster-wide setting. This patch allows the number of SinkIOWorkers to be set per changefeed. If both the cluster-wide and per-changefeed values are given, the per-changefeed value takes precedence.

Release note (sql change): The CREATE CHANGEFEED statement was extended with an optional `sink_io_workers` option that can be used to set the number of SinkIOWorkers per changefeed.

Fixes: #154546
1 parent 0f2bc7b commit 8c495d8
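
For illustration, the new option composes with the existing cluster setting as follows. This is a sketch based on the tests in this commit; the table name and Kafka URI are placeholders, not part of the patch:

    -- Cluster-wide default for all changefeeds.
    SET CLUSTER SETTING changefeed.sink_io_workers = 10;

    -- This changefeed overrides the cluster setting and runs 3 sink IO workers.
    CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://broker:9092' WITH sink_io_workers = '3';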

5 files changed (+308, -14 lines)


pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 115 additions & 0 deletions
@@ -21,6 +21,7 @@ import (
 	"path"
 	"reflect"
 	"regexp"
+	"runtime"
 	"slices"
 	"sort"
 	"strconv"
@@ -13084,3 +13085,117 @@ func TestDatabaseLevelChangefeedWithInitialScanOptions(t *testing.T) {
 		})
 	}
 }
+
+// TestChangefeedNumSinkWorkersPrecedence verifies that the sink_io_workers
+// option works correctly and that the per-changefeed option takes precedence
+// over the cluster setting when both are set.
+func TestChangefeedNumSinkWorkersPrecedence(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		registry := s.Server.JobRegistry().(*jobs.Registry)
+		metrics := registry.MetricsStruct().Changefeed.(*Metrics).AggMetrics
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+
+		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
+		sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
+		KafkaV2Enabled.Override(context.Background(), &s.Server.ClusterSettings().SV, true)
+
+		t.Run("uses per-changefeed option when cluster setting is zero", func(t *testing.T) {
+			sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 0`)
+			cf := feed(t, f, `CREATE CHANGEFEED FOR foo WITH sink_io_workers = '5'`)
+			defer closeFeed(t, cf)
+
+			testutils.SucceedsSoon(t, func() error {
+				workers := metrics.ParallelIOWorkers.Value()
+				if workers != 5 {
+					return errors.Newf("expected 5 workers, got %d", workers)
+				}
+				return nil
+			})
+		})
+
+		t.Run("uses per-changefeed option when it is set", func(t *testing.T) {
+			sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 10`)
+			cf := feed(t, f, `CREATE CHANGEFEED FOR foo WITH sink_io_workers = '3'`)
+			defer closeFeed(t, cf)
+
+			testutils.SucceedsSoon(t, func() error {
+				workers := metrics.ParallelIOWorkers.Value()
+				if workers != 3 {
+					return errors.Newf("expected 3 workers (from changefeed setting), got %d", workers)
+				}
+				return nil
+			})
+		})
+
+		t.Run("uses cluster setting when per-changefeed option is not set", func(t *testing.T) {
+			sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 7`)
+			cf := feed(t, f, `CREATE CHANGEFEED FOR foo`)
+			defer closeFeed(t, cf)
+
+			testutils.SucceedsSoon(t, func() error {
+				workers := metrics.ParallelIOWorkers.Value()
+				if workers != 7 {
+					return errors.Newf("expected 7 workers (from cluster setting), got %d", workers)
+				}
+				return nil
+			})
+		})
+
+		t.Run("uses GOMAXPROCS default when changefeed setting is negative", func(t *testing.T) {
+			sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 10`)
+			cf := feed(t, f, `CREATE CHANGEFEED FOR foo WITH sink_io_workers = '-1'`)
+			defer closeFeed(t, cf)
+
+			testutils.SucceedsSoon(t, func() error {
+				workers := metrics.ParallelIOWorkers.Value()
+				expectedWorkers := runtime.GOMAXPROCS(0)
+				if workers != int64(expectedWorkers) {
+					return errors.Newf("expected %d workers (GOMAXPROCS-based), got %d", expectedWorkers, workers)
+				}
+				return nil
+			})
+		})
+
+		t.Run("uses GOMAXPROCS default when cluster setting is negative and no changefeed option is set", func(t *testing.T) {
+			sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = -1`)
+			cf := feed(t, f, `CREATE CHANGEFEED FOR foo`)
+			defer closeFeed(t, cf)
+
+			testutils.SucceedsSoon(t, func() error {
+				workers := metrics.ParallelIOWorkers.Value()
+				expectedWorkers := runtime.GOMAXPROCS(0)
+				if workers != int64(expectedWorkers) {
+					return errors.Newf("expected %d workers (GOMAXPROCS-based), got %d", expectedWorkers, workers)
+				}
+				return nil
+			})
+		})
+
+		t.Run("uses GOMAXPROCS default when both sink_io_workers settings are negative", func(t *testing.T) {
+			sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = -1`)
+			cf := feed(t, f, `CREATE CHANGEFEED FOR foo WITH sink_io_workers = '-1'`)
+			defer closeFeed(t, cf)
+
+			testutils.SucceedsSoon(t, func() error {
+				workers := metrics.ParallelIOWorkers.Value()
+				expectedWorkers := runtime.GOMAXPROCS(0)
+				if workers != int64(expectedWorkers) {
+					return errors.Newf("expected %d workers (GOMAXPROCS-based), got %d", expectedWorkers, workers)
+				}
+				return nil
+			})
+		})
+
+		t.Run("rejects invalid sink_io_workers value", func(t *testing.T) {
+			sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 5`)
+			sqlDB.ExpectErrWithTimeout(t, "invalid integer value",
+				`CREATE CHANGEFEED FOR foo WITH sink_io_workers = 'invalid'`)
+		})
+	}
+
+	// Test with sinks that support parallel IO.
+	cdcTest(t, testFn, feedTestRestrictSinks("pubsub", "webhook", "kafka"))
+}

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 23 additions & 0 deletions
@@ -9,6 +9,7 @@ import (
 	"context"
 	"fmt"
 	"net/url"
+	"strconv"
 	"strings"
 	"time"
 

@@ -132,6 +133,7 @@ const (
 	OptLaggingRangesPollingInterval = `lagging_ranges_polling_interval`
 	OptIgnoreDisableChangefeedReplication = `ignore_disable_changefeed_replication`
 	OptEncodeJSONValueNullAsObject = `encode_json_value_null_as_object`
+	OptSinkIOWorkers = `sink_io_workers`
 	// TODO(#142273): look into whether we want to add headers to pub/sub, and other
 	// sinks as well (eg cloudstorage, webhook, ..). Currently it's kafka-only.
 	OptHeadersJSONColumnName = `headers_json_column_name`

@@ -430,6 +432,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
 	OptLaggingRangesPollingInterval: durationOption,
 	OptIgnoreDisableChangefeedReplication: flagOption,
 	OptEncodeJSONValueNullAsObject: flagOption,
+	OptSinkIOWorkers: stringOption,
 	OptEnrichedProperties: csv(string(EnrichedPropertySource), string(EnrichedPropertySchema)),
 	OptRangeDistributionStrategy: enum(string(ChangefeedRangeDistributionStrategyDefault), string(ChangefeedRangeDistributionStrategyBalancedSimple)),
 	OptHeadersJSONColumnName: stringOption,

@@ -449,6 +452,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
 	OptExecutionLocality, OptLaggingRangesThreshold, OptLaggingRangesPollingInterval,
 	OptIgnoreDisableChangefeedReplication, OptEncodeJSONValueNullAsObject, OptEnrichedProperties,
 	OptRangeDistributionStrategy,
+	OptSinkIOWorkers,
 )
 
 // SQLValidOptions is options exclusive to SQL sink

@@ -1235,6 +1239,21 @@ func (s StatementOptions) GetPTSExpiration() (time.Duration, error) {
 	return *exp, nil
 }
 
+// GetNumSinkWorkers returns the number of sink IO workers to use.
+// Returns nil if not set.
+// Returns an error if the set value is not a valid integer.
+func (s StatementOptions) GetNumSinkWorkers() (*int64, error) {
+	v, ok := s.m[OptSinkIOWorkers]
+	if !ok {
+		return nil, nil
+	}
+	result, err := strconv.ParseInt(v, 10, 64)
+	if err != nil {
+		return nil, errors.Newf("invalid integer value for %s: %q", OptSinkIOWorkers, v)
+	}
+	return &result, nil
+}
+
 // ForceKeyInValue sets the encoding option KeyInValue to true and then validates the
 // resoluting encoding options.
 func (s StatementOptions) ForceKeyInValue() error {

@@ -1377,6 +1396,10 @@ func (s StatementOptions) ValidateForCreateChangefeed(isPredicateChangefeed bool
 			}
 		}
 	}
+	// Validate that sink_io_workers is an integer value if it is set.
+	if _, err := s.GetNumSinkWorkers(); err != nil {
+		return err
+	}
 	return nil
 }
 
pkg/ccl/changefeedccl/changefeedbase/options_test.go

Lines changed: 62 additions & 0 deletions
@@ -51,6 +51,68 @@ func TestOptionsValidations(t *testing.T) {
 	}
 }
 
+func TestNumSinkWorkersOption(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	tests := []struct {
+		name      string
+		input     map[string]string
+		expected  int64
+		expectErr bool
+	}{
+		{
+			name:      "positive value",
+			input:     map[string]string{"sink_io_workers": "5"},
+			expected:  5,
+			expectErr: false,
+		},
+		{
+			name:      "zero value (default)",
+			input:     map[string]string{"sink_io_workers": "0"},
+			expected:  0,
+			expectErr: false,
+		},
+		{
+			name:      "negative value (disable)",
+			input:     map[string]string{"sink_io_workers": "-1"},
+			expected:  -1,
+			expectErr: false,
+		},
+		{
+			name:      "not set",
+			input:     map[string]string{},
+			expected:  0,
+			expectErr: false,
+		},
+		{
+			name:      "invalid non-integer",
+			input:     map[string]string{"sink_io_workers": "abc"},
+			expected:  0,
+			expectErr: true,
+		},
+		{
+			name:      "invalid float",
+			input:     map[string]string{"sink_io_workers": "3.14"},
+			expected:  0,
+			expectErr: true,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			o := MakeStatementOptions(test.input)
+			val, err := o.GetNumSinkWorkers()
+			if test.expectErr {
+				require.Error(t, err)
+			} else if val != nil { // val is nil when the option is not set
+				require.NoError(t, err)
+				require.Equal(t, test.expected, *val)
+			}
+		})
+	}
+}
+
 func TestEncodingOptionsValidations(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

pkg/ccl/changefeedccl/sink.go

Lines changed: 33 additions & 14 deletions
@@ -258,8 +258,12 @@ func getSink(
 			return nil, err
 		}
 		if KafkaV2Enabled.Get(&serverCfg.Settings.SV) {
+			numWorkers, err := numSinkIOWorkers(serverCfg, opts)
+			if err != nil {
+				return nil, err
+			}
 			return makeKafkaSinkV2(ctx, &changefeedbase.SinkURL{URL: u}, targets, sinkOpts,
-				numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
+				numWorkers, newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
 				serverCfg.Settings, metricsBuilder, kafkaSinkV2Knobs{})
 		} else {
 			return makeKafkaSink(ctx, &changefeedbase.SinkURL{URL: u}, targets, sinkOpts, serverCfg.Settings, metricsBuilder)

@@ -282,17 +286,25 @@ func getSink(
 			return nil, err
 		}
 		return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) {
+			numWorkers, err := numSinkIOWorkers(serverCfg, opts)
+			if err != nil {
+				return nil, err
+			}
 			return makeWebhookSink(ctx, &changefeedbase.SinkURL{URL: u}, encodingOpts, webhookOpts,
-				numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
+				numWorkers, newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
 				metricsBuilder, serverCfg.Settings)
 		})
 	case isPubsubSink(u):
 		var testingKnobs *TestingKnobs
 		if knobs, ok := serverCfg.TestingKnobs.Changefeed.(*TestingKnobs); ok {
 			testingKnobs = knobs
 		}
+		numWorkers, err := numSinkIOWorkers(serverCfg, opts)
+		if err != nil {
+			return nil, err
+		}
 		return makePubsubSink(ctx, u, encodingOpts, opts.GetPubsubConfigJSON(), targets,
-			opts.IsSet(changefeedbase.OptUnordered), numSinkIOWorkers(serverCfg),
+			opts.IsSet(changefeedbase.OptUnordered), numWorkers,
 			newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
 			metricsBuilder, serverCfg.Settings, testingKnobs)
 	case isCloudStorageSink(u):

@@ -446,9 +458,11 @@ type encDatumRowBuffer []rowenc.EncDatumRow
 func (b *encDatumRowBuffer) IsEmpty() bool {
 	return b == nil || len(*b) == 0
 }
+
 func (b *encDatumRowBuffer) Push(r rowenc.EncDatumRow) {
 	*b = append(*b, r)
 }
+
 func (b *encDatumRowBuffer) Pop() rowenc.EncDatumRow {
 	ret := (*b)[0]
 	*b = (*b)[1:]

@@ -745,7 +759,7 @@ func getSinkConfigFromJson(
 ) (batchCfg sinkBatchConfig, retryCfg retry.Options, err error) {
 	retryCfg = defaultRetryConfig()
 
-	var cfg = baseConfig
+	cfg := baseConfig
 	cfg.Retry.Max = jsonMaxRetries(retryCfg.MaxRetries)
 	cfg.Retry.Backoff = jsonDuration(retryCfg.InitialBackoff)
 	if jsonStr != `` {

@@ -802,20 +816,25 @@ func (j *jsonMaxRetries) UnmarshalJSON(b []byte) error {
 	return nil
 }
 
-func numSinkIOWorkers(cfg *execinfra.ServerConfig) int {
-	numWorkers := changefeedbase.SinkIOWorkers.Get(&cfg.Settings.SV)
-	if numWorkers > 0 {
-		return int(numWorkers)
+func numSinkIOWorkers(
+	cfg *execinfra.ServerConfig, opts changefeedbase.StatementOptions,
+) (int, error) {
+	changefeedWorkers, err := opts.GetNumSinkWorkers()
+	if err != nil {
+		return 0, errors.Wrap(err, "invalid sink config: sink_io_workers must be an integer")
 	}
 
-	idealNumber := runtime.GOMAXPROCS(0)
-	if idealNumber < 1 {
-		return 1
+	clusterWorkers := changefeedbase.SinkIOWorkers.Get(&cfg.Settings.SV)
+
+	value := int(clusterWorkers)
+	if changefeedWorkers != nil {
+		value = int(*changefeedWorkers) // the per-changefeed option takes precedence
 	}
-	if idealNumber > 32 {
-		return 32
+	if value <= 0 {
+		value = min(runtime.GOMAXPROCS(0), 32) // default: GOMAXPROCS, capped at 32
 	}
-	return idealNumber
+
+	return value, nil
 }
 
 func newCPUPacerFactory(ctx context.Context, cfg *execinfra.ServerConfig) func() *admission.Pacer {
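
For reference, the fallback behavior of numSinkIOWorkers above can be exercised end to end. A sketch grounded in the tests in this commit, using the same placeholder table and sink URI as before:

    -- No per-changefeed option: the cluster setting (7) applies.
    SET CLUSTER SETTING changefeed.sink_io_workers = 7;
    CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://broker:9092';

    -- Non-positive values fall back to min(GOMAXPROCS, 32).
    CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://broker:9092' WITH sink_io_workers = '-1';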
