Skip to content

Commit 8ec7eb1

Browse files
committed
changefeedccl: add sink_io_workers option to CREATE statement
Previously, the number of SinkIOWorkers could only be defined in cluster-wide setting. This patch allows to set the number of SinkIOWorkers per changefeed. If both (cluster-wide, per-changefeed) values are given, the changefeed value will be given precedence. Release note (sql change): The CREATE CHANGEFEED statement was extended with an optional `io_sink_workers` setting that can be used to set the number of SinkIOWorkers per changefeed. Fixes: #154546
1 parent 0f2bc7b commit 8ec7eb1

File tree

10 files changed

+322
-16
lines changed

10 files changed

+322
-16
lines changed

WORKSPACE

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ load(
185185
go_download_sdk(
186186
name = "go_sdk",
187187
sdks = {
188+
"darwin_amd64": ("go1.25.3.darwin-amd64.tar.gz", "2ba409dd10128f529c2b44623b559e0b7890e5f210a311325215a5ae5879474d"),
188189
"darwin_arm64": ("go1.25.3.darwin-arm64.tar.gz", "746818c3703980b10279f9afec54145b35b0e3d5801fe6f5bdceeea53bcb6792"),
189190
"linux_amd64": ("go1.25.3.linux-amd64.tar.gz", "821f2ede78f535fe10f95c38e2c6c87bbc200649dee20068ae4424de400196c7"),
190191
"linux_arm64": ("go1.25.3.linux-arm64.tar.gz", "f6537bed6500b20d3ac1c8fd2e27609cb3c89b5e54579e344c09febb651e9598"),
@@ -582,14 +583,17 @@ register_toolchains(
582583
"//build/toolchains:cross_arm64_windows_toolchain",
583584
"//build/toolchains:cross_arm64_macos_toolchain",
584585
"//build/toolchains:cross_arm64_macos_arm_toolchain",
586+
"@copy_directory_toolchains//:darwin_amd64_toolchain",
585587
"@copy_directory_toolchains//:darwin_arm64_toolchain",
586588
"@copy_directory_toolchains//:linux_amd64_toolchain",
587589
"@copy_directory_toolchains//:linux_arm64_toolchain",
588590
"@copy_directory_toolchains//:windows_amd64_toolchain",
591+
"@copy_to_directory_toolchains//:darwin_amd64_toolchain",
589592
"@copy_to_directory_toolchains//:darwin_arm64_toolchain",
590593
"@copy_to_directory_toolchains//:linux_amd64_toolchain",
591594
"@copy_to_directory_toolchains//:linux_arm64_toolchain",
592595
"@copy_to_directory_toolchains//:windows_amd64_toolchain",
596+
"@nodejs_toolchains//:darwin_amd64_toolchain",
593597
"@nodejs_toolchains//:darwin_arm64_toolchain",
594598
"@nodejs_toolchains//:linux_amd64_toolchain",
595599
"@nodejs_toolchains//:linux_arm64_toolchain",

build/bazelutil/distdir_files.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1186,6 +1186,7 @@ DISTDIR_FILES = {
11861186
"https://storage.googleapis.com/public-bazel-artifacts/c-deps/20250801-193032/libproj_foreign.macos.20250801-193032.tar.gz": "8d28434cd175f0a32dfdd8ba8a5fa44c3d04d1e53cccfe9dbb3c6e301a03a47c",
11871187
"https://storage.googleapis.com/public-bazel-artifacts/c-deps/20250801-193032/libproj_foreign.macosarm.20250801-193032.tar.gz": "a4b0bbb056bb462682b49ec34816f02c71047b38733d50d8de78b737c892db61",
11881188
"https://storage.googleapis.com/public-bazel-artifacts/c-deps/20250801-193032/libproj_foreign.windows.20250801-193032.tar.gz": "a61f4faf7a7d017a194c64b453a38c982423ef3678fa049dbf114920759da59c",
1189+
"https://storage.googleapis.com/public-bazel-artifacts/go/20251028-171726/go1.25.3.darwin-amd64.tar.gz": "2ba409dd10128f529c2b44623b559e0b7890e5f210a311325215a5ae5879474d",
11891190
"https://storage.googleapis.com/public-bazel-artifacts/go/20251028-171726/go1.25.3.darwin-arm64.tar.gz": "746818c3703980b10279f9afec54145b35b0e3d5801fe6f5bdceeea53bcb6792",
11901191
"https://storage.googleapis.com/public-bazel-artifacts/go/20251028-171726/go1.25.3.linux-amd64.tar.gz": "821f2ede78f535fe10f95c38e2c6c87bbc200649dee20068ae4424de400196c7",
11911192
"https://storage.googleapis.com/public-bazel-artifacts/go/20251028-171726/go1.25.3.linux-arm64.tar.gz": "f6537bed6500b20d3ac1c8fd2e27609cb3c89b5e54579e344c09febb651e9598",

build/nodejs.bzl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ load("@rules_nodejs//nodejs/private:toolchains_repo.bzl", "toolchains_repo")
66

77
_NODE_VERSION = "16.14.2"
88
_NODE_VERSIONS = {
9+
"darwin_amd64": ("node-v16.14.2-darwin-x64.tar.gz", "node-v16.14.2-darwin-x64", "d3076ca7fcc7269c8ff9b03fe7d1c277d913a7e84a46a14eff4af7791ff9d055"),
910
"darwin_arm64": ("node-v16.14.2-darwin-arm64.tar.gz", "node-v16.14.2-darwin-arm64", "a66d9217d2003bd416d3dd06dfd2c7a044c4c9ff2e43a27865790bd0d59c682d"),
1011
"linux_amd64": ("node-v16.14.2-linux-x64.tar.xz", "node-v16.14.2-linux-x64", "e40c6f81bfd078976d85296b5e657be19e06862497741ad82902d0704b34bb1b"),
1112
"linux_arm64": ("node-v16.14.2-linux-arm64.tar.xz", "node-v16.14.2-linux-arm64", "f7c5a573c06a520d6c2318f6ae204141b8420386553a692fc359f8ae3d88df96"),
@@ -20,6 +21,7 @@ _URL_PREFIX = "https://storage.googleapis.com/public-bazel-artifacts/js/aspect-b
2021
_COPY_DIRECTORY_URL_PREFIX = _URL_PREFIX + "/copy_directory-"
2122

2223
_COPY_DIRECTORY_VERSIONS = {
24+
"darwin_amd64": "2f4befad49d25f867221f9beb7d03b174c03af5395fc860c24447c85fbdd2d7d",
2325
"darwin_arm64": "e8ca2ab1655cc71fab3106d433dd4274389bdf9f143f586ab83a6f8ab7aeabad",
2426
"linux_amd64": "fae863215e3acc6e5e50ac2979e6d9d29c95b57fa1eb719de801926db57e5941",
2527
"linux_arm64": "7dff652aa2b1e4d5ab163cf2be841038da1cc3530e9875d5fb2cef0ef4e9efb8",
@@ -30,6 +32,7 @@ _COPY_DIRECTORY_VERSIONS = {
3032
_COPY_TO_DIRECTORY_URL_PREFIX = _URL_PREFIX + "/copy_to_directory-"
3133

3234
_COPY_TO_DIRECTORY_VERSIONS = {
35+
"darwin_amd64": "6132ce07141ed17d658acb72a777bf619b17a18b6f3950b3689ac057f81ebdb0",
3336
"darwin_arm64": "fe8ba630878178adcebe52097dde407b1554c6118c3a17b67c0f47f461c7b3d5",
3437
"linux_amd64": "6ce36555a198a42fa1642b19fcd685d9584fb71e0da9b9fec15dc14f43527171",
3538
"linux_arm64": "17015a948a3d106222c157925a4b5edc832d336beb5fc7f8d34a7ee0c827809d",

build/scripts/build-bazel-lib-helpers.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ set -euxo pipefail
1010
THIS_DIR=$(cd "$(dirname "$0")" && pwd)
1111
BIN_DIR=$(realpath $THIS_DIR/../../bin/aspect-bazel-lib-utils-$(date +%Y%m%d-%H%M%S))
1212

13-
PLATS=(darwin_arm64 linux_amd64 linux_arm64 linux_s390x windows_amd64)
13+
PLATS=(darwin_amd64 darwin_arm64 linux_amd64 linux_arm64 linux_s390x windows_amd64)
1414

1515
mkdir -p $BIN_DIR
1616

build/teamcity/internal/release/build-and-publish-patched-go/impl.sh

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ cd go
6969
git checkout $GOCOMMIT
7070
cd ..
7171
72-
CONFIGS="linux_amd64 linux_arm64 linux_s390x darwin_arm64 windows_amd64"
72+
CONFIGS="linux_amd64 linux_arm64 linux_s390x darwin_amd64 darwin_arm64 windows_amd64"
7373
7474
for CONFIG in $CONFIGS; do
7575
case $CONFIG in
@@ -85,6 +85,10 @@ for CONFIG in $CONFIGS; do
8585
CC_FOR_TARGET=/x-tools/s390x-ibm-linux-gnu/bin/s390x-ibm-linux-gnu-cc
8686
CXX_FOR_TARGET=/x-tools/s390x-ibm-linux-gnu/bin/s390x-ibm-linux-gnu-c++
8787
;;
88+
darwin_amd64)
89+
CC_FOR_TARGET=/x-tools/x86_64-apple-darwin21.2/bin/x86_64-apple-darwin21.2-cc
90+
CXX_FOR_TARGET=/x-tools/x86_64-apple-darwin21.2/bin/x86_64-apple-darwin21.2-c++
91+
;;
8892
darwin_arm64)
8993
CC_FOR_TARGET=/x-tools/x86_64-apple-darwin21.2/bin/aarch64-apple-darwin21.2-cc
9094
CXX_FOR_TARGET=/x-tools/x86_64-apple-darwin21.2/bin/aarch64-apple-darwin21.2-c++

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"path"
2222
"reflect"
2323
"regexp"
24+
"runtime"
2425
"slices"
2526
"sort"
2627
"strconv"
@@ -13084,3 +13085,117 @@ func TestDatabaseLevelChangefeedWithInitialScanOptions(t *testing.T) {
1308413085
})
1308513086
}
1308613087
}
13088+
13089+
// TestChangefeedNumSinkWorkersPrecedence verifies that the sink_io_workers
13090+
// option works correctly and that the precedence logic (changefeed option is used when set)
13091+
// is applied when both cluster setting and per-changefeed option are set.
13092+
func TestChangefeedNumSinkWorkersPrecedence(t *testing.T) {
13093+
defer leaktest.AfterTest(t)()
13094+
defer log.Scope(t).Close(t)
13095+
13096+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
13097+
registry := s.Server.JobRegistry().(*jobs.Registry)
13098+
metrics := registry.MetricsStruct().Changefeed.(*Metrics).AggMetrics
13099+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
13100+
13101+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
13102+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
13103+
KafkaV2Enabled.Override(context.Background(), &s.Server.ClusterSettings().SV, true)
13104+
13105+
t.Run("uses per-changefeed option when cluster setting is zero", func(t *testing.T) {
13106+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 0`)
13107+
cf := feed(t, f, `CREATE CHANGEFEED FOR foo WITH sink_io_workers = '5'`)
13108+
defer closeFeed(t, cf)
13109+
13110+
testutils.SucceedsSoon(t, func() error {
13111+
workers := metrics.ParallelIOWorkers.Value()
13112+
if workers != 5 {
13113+
return errors.Newf("expected 5 workers, got %d", workers)
13114+
}
13115+
return nil
13116+
})
13117+
})
13118+
13119+
t.Run("uses per-changefeed option when it is set", func(t *testing.T) {
13120+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 10`)
13121+
cf := feed(t, f, `CREATE CHANGEFEED FOR foo WITH sink_io_workers = '3'`)
13122+
defer closeFeed(t, cf)
13123+
13124+
testutils.SucceedsSoon(t, func() error {
13125+
workers := metrics.ParallelIOWorkers.Value()
13126+
if workers != 3 {
13127+
return errors.Newf("expected 3 workers (from changefeed setting), got %d", workers)
13128+
}
13129+
return nil
13130+
})
13131+
})
13132+
13133+
t.Run("uses cluster setting when per-changefeed option is not set", func(t *testing.T) {
13134+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 7`)
13135+
cf := feed(t, f, `CREATE CHANGEFEED FOR foo`)
13136+
defer closeFeed(t, cf)
13137+
13138+
testutils.SucceedsSoon(t, func() error {
13139+
workers := metrics.ParallelIOWorkers.Value()
13140+
if workers != 7 {
13141+
return errors.Newf("expected 7 workers (from cluster setting), got %d", workers)
13142+
}
13143+
return nil
13144+
})
13145+
})
13146+
13147+
t.Run("uses GOMAXPROCS default when changefeed setting is negative", func(t *testing.T) {
13148+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 10`)
13149+
cf := feed(t, f, `CREATE CHANGEFEED FOR foo WITH sink_io_workers = '-1'`)
13150+
defer closeFeed(t, cf)
13151+
13152+
testutils.SucceedsSoon(t, func() error {
13153+
workers := metrics.ParallelIOWorkers.Value()
13154+
expectedWorkers := runtime.GOMAXPROCS(0)
13155+
if workers != int64(expectedWorkers) {
13156+
return errors.Newf("expected %d workers (GOMAXPROCS-based), got %d", expectedWorkers, workers)
13157+
}
13158+
return nil
13159+
})
13160+
})
13161+
13162+
t.Run("uses GOMAXPROCS default when cluster setting is negative and no changefeed option is set", func(t *testing.T) {
13163+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = -1`)
13164+
cf := feed(t, f, `CREATE CHANGEFEED FOR foo`)
13165+
defer closeFeed(t, cf)
13166+
13167+
testutils.SucceedsSoon(t, func() error {
13168+
workers := metrics.ParallelIOWorkers.Value()
13169+
expectedWorkers := runtime.GOMAXPROCS(0)
13170+
if workers != int64(expectedWorkers) {
13171+
return errors.Newf("expected %d workers (GOMAXPROCS-based), got %d", expectedWorkers, workers)
13172+
}
13173+
return nil
13174+
})
13175+
})
13176+
13177+
t.Run("uses GOMAXPROCS default when both sink_io_workers settings are negative", func(t *testing.T) {
13178+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = -1`)
13179+
cf := feed(t, f, `CREATE CHANGEFEED FOR foo WITH sink_io_workers = '-1'`)
13180+
defer closeFeed(t, cf)
13181+
13182+
testutils.SucceedsSoon(t, func() error {
13183+
workers := metrics.ParallelIOWorkers.Value()
13184+
expectedWorkers := runtime.GOMAXPROCS(0)
13185+
if workers != int64(expectedWorkers) {
13186+
return errors.Newf("expected %d workers (GOMAXPROCS-based), got %d", expectedWorkers, workers)
13187+
}
13188+
return nil
13189+
})
13190+
})
13191+
13192+
t.Run("rejects invalid sink_io_workers value", func(t *testing.T) {
13193+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 5`)
13194+
sqlDB.ExpectErrWithTimeout(t, "invalid integer value",
13195+
`CREATE CHANGEFEED FOR foo WITH sink_io_workers = 'invalid'`)
13196+
})
13197+
}
13198+
13199+
// Test with sinks that support parallel IO.
13200+
cdcTest(t, testFn, feedTestRestrictSinks("pubsub", "webhook", "kafka"))
13201+
}

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"fmt"
1111
"net/url"
12+
"strconv"
1213
"strings"
1314
"time"
1415

@@ -132,6 +133,7 @@ const (
132133
OptLaggingRangesPollingInterval = `lagging_ranges_polling_interval`
133134
OptIgnoreDisableChangefeedReplication = `ignore_disable_changefeed_replication`
134135
OptEncodeJSONValueNullAsObject = `encode_json_value_null_as_object`
136+
OptSinkIOWorkers = `sink_io_workers`
135137
// TODO(#142273): look into whether we want to add headers to pub/sub, and other
136138
// sinks as well (eg cloudstorage, webhook, ..). Currently it's kafka-only.
137139
OptHeadersJSONColumnName = `headers_json_column_name`
@@ -430,6 +432,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
430432
OptLaggingRangesPollingInterval: durationOption,
431433
OptIgnoreDisableChangefeedReplication: flagOption,
432434
OptEncodeJSONValueNullAsObject: flagOption,
435+
OptSinkIOWorkers: stringOption,
433436
OptEnrichedProperties: csv(string(EnrichedPropertySource), string(EnrichedPropertySchema)),
434437
OptRangeDistributionStrategy: enum(string(ChangefeedRangeDistributionStrategyDefault), string(ChangefeedRangeDistributionStrategyBalancedSimple)),
435438
OptHeadersJSONColumnName: stringOption,
@@ -449,6 +452,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
449452
OptExecutionLocality, OptLaggingRangesThreshold, OptLaggingRangesPollingInterval,
450453
OptIgnoreDisableChangefeedReplication, OptEncodeJSONValueNullAsObject, OptEnrichedProperties,
451454
OptRangeDistributionStrategy,
455+
OptSinkIOWorkers,
452456
)
453457

454458
// SQLValidOptions is options exclusive to SQL sink
@@ -1235,6 +1239,21 @@ func (s StatementOptions) GetPTSExpiration() (time.Duration, error) {
12351239
return *exp, nil
12361240
}
12371241

1242+
// GetNumSinkWorkers returns the number of sink IO workers to use.
1243+
// Returns nil if not set.
1244+
// Returns an error if the set value is not a valid integer.
1245+
func (s StatementOptions) GetNumSinkWorkers() (*int64, error) {
1246+
v, ok := s.m[OptSinkIOWorkers]
1247+
if !ok {
1248+
return nil, nil
1249+
}
1250+
result, err := strconv.ParseInt(v, 10, 64)
1251+
if err != nil {
1252+
return nil, errors.Newf("invalid integer value for %s: %q", OptSinkIOWorkers, v)
1253+
}
1254+
return &result, nil
1255+
}
1256+
12381257
// ForceKeyInValue sets the encoding option KeyInValue to true and then validates the
12391258
// resoluting encoding options.
12401259
func (s StatementOptions) ForceKeyInValue() error {
@@ -1377,6 +1396,10 @@ func (s StatementOptions) ValidateForCreateChangefeed(isPredicateChangefeed bool
13771396
}
13781397
}
13791398
}
1399+
// Validate that sink_io_workers is an integer value if it is set.
1400+
if _, err := s.GetNumSinkWorkers(); err != nil {
1401+
return err
1402+
}
13801403
return nil
13811404
}
13821405

pkg/ccl/changefeedccl/changefeedbase/options_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,68 @@ func TestOptionsValidations(t *testing.T) {
5151
}
5252
}
5353

54+
func TestNumSinkWorkersOption(t *testing.T) {
55+
defer leaktest.AfterTest(t)()
56+
defer log.Scope(t).Close(t)
57+
58+
tests := []struct {
59+
name string
60+
input map[string]string
61+
expected int64
62+
expectErr bool
63+
}{
64+
{
65+
name: "positive value",
66+
input: map[string]string{"sink_io_workers": "5"},
67+
expected: 5,
68+
expectErr: false,
69+
},
70+
{
71+
name: "zero value (default)",
72+
input: map[string]string{"sink_io_workers": "0"},
73+
expected: 0,
74+
expectErr: false,
75+
},
76+
{
77+
name: "negative value (disable)",
78+
input: map[string]string{"sink_io_workers": "-1"},
79+
expected: -1,
80+
expectErr: false,
81+
},
82+
{
83+
name: "not set",
84+
input: map[string]string{},
85+
expected: 0,
86+
expectErr: false,
87+
},
88+
{
89+
name: "invalid non-integer",
90+
input: map[string]string{"sink_io_workers": "abc"},
91+
expected: 0,
92+
expectErr: true,
93+
},
94+
{
95+
name: "invalid float",
96+
input: map[string]string{"sink_io_workers": "3.14"},
97+
expected: 0,
98+
expectErr: true,
99+
},
100+
}
101+
102+
for _, test := range tests {
103+
t.Run(test.name, func(t *testing.T) {
104+
o := MakeStatementOptions(test.input)
105+
val, err := o.GetNumSinkWorkers()
106+
if test.expectErr {
107+
require.Error(t, err)
108+
} else {
109+
require.NoError(t, err)
110+
require.Equal(t, test.expected, val)
111+
}
112+
})
113+
}
114+
}
115+
54116
func TestEncodingOptionsValidations(t *testing.T) {
55117
defer leaktest.AfterTest(t)()
56118
defer log.Scope(t).Close(t)

0 commit comments

Comments
 (0)