Skip to content

Commit e4b7882

Browse files
committed
changefeedccl: block db-level changefeed startup until tableset is non-empty
On startup, a db-level changefeed should not be started until the database contains at least one table to watch. If no tables exist at startup, a tableset watcher is created and polled; the changefeed is then started as of the timestamp at which the first target table appears. Fixes: #147371 Epic: CRDB-1421 Release note: none
1 parent d222ca1 commit e4b7882

File tree

4 files changed

+123
-5
lines changed

4 files changed

+123
-5
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ go_library(
6464
"//pkg/ccl/changefeedccl/kvfeed",
6565
"//pkg/ccl/changefeedccl/resolvedspan",
6666
"//pkg/ccl/changefeedccl/schemafeed",
67+
"//pkg/ccl/changefeedccl/tableset",
6768
"//pkg/ccl/changefeedccl/timers",
6869
"//pkg/ccl/kvccl/kvfollowerreadsccl",
6970
"//pkg/ccl/utilccl",

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ func startDistChangefeed(
252252
onTracingEvent func(ctx context.Context, meta *execinfrapb.TracingAggregatorEvents),
253253
targets changefeedbase.Targets,
254254
) error {
255+
fmt.Println("startDistChangefeed: schemaTS: ", schemaTS)
255256
execCfg := execCtx.ExecCfg()
256257
tableDescs, err := fetchTableDescriptors(ctx, execCfg, targets, schemaTS)
257258
if err != nil {

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1919
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
2020
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
21+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/tableset"
2122
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
2223
"github.com/cockroachdb/cockroach/pkg/cloud"
2324
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
@@ -40,6 +41,7 @@ import (
4041
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
4142
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
4243
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
44+
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
4345
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
4446
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
4547
"github.com/cockroachdb/cockroach/pkg/sql/isql"
@@ -59,6 +61,7 @@ import (
5961
"github.com/cockroachdb/cockroach/pkg/util/log/channel"
6062
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
6163
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
64+
"github.com/cockroachdb/cockroach/pkg/util/mon"
6265
"github.com/cockroachdb/cockroach/pkg/util/randutil"
6366
"github.com/cockroachdb/cockroach/pkg/util/span"
6467
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -1724,6 +1727,12 @@ func (b *changefeedResumer) resumeWithRetries(
17241727

17251728
maxBackoff := changefeedbase.MaxRetryBackoff.Get(&execCfg.Settings.SV)
17261729
backoffReset := changefeedbase.RetryBackoffReset.Get(&execCfg.Settings.SV)
1730+
1731+
// Create memory monitor for tableset watcher. Similar to how processors create
1732+
// their monitors at function start, we create this unconditionally.
1733+
watcherMemMonitor := execinfra.NewMonitor(ctx, execCfg.DistSQLSrv.ChangefeedMonitor, mon.MakeName("changefeed-tableset-watcher-mem"))
1734+
defer watcherMemMonitor.Stop(ctx)
1735+
17271736
for r := getRetry(ctx, maxBackoff, backoffReset); r.Next(); {
17281737
flowErr := maybeUpgradePreProductionReadyExpression(ctx, jobID, details, jobExec)
17291738

@@ -1737,7 +1746,7 @@ func (b *changefeedResumer) resumeWithRetries(
17371746
knobs.BeforeDistChangefeed()
17381747
}
17391748

1740-
confPoller := make(chan struct{})
1749+
runningChangefeedChan := make(chan struct{})
17411750
g := ctxgroup.WithContext(ctx)
17421751
initialHighWater, schemaTS, err := computeDistChangefeedTimestamps(ctx, jobExec, details, localState)
17431752
if err != nil {
@@ -1751,28 +1760,94 @@ func (b *changefeedResumer) resumeWithRetries(
17511760
if haveKnobs && maybeCfKnobs.TablesetChangingCallback != nil {
17521761
maybeCfKnobs.TablesetChangingCallback()
17531762
}
1763+
1764+
// Watcher is only used for db-level changefeeds with no watched tables.
1765+
var watcher *tableset.Watcher
1766+
watcherChan := make(chan []tableset.TableDiff, 1)
1767+
var watcherCancel context.CancelFunc
1768+
if len(details.TargetSpecifications) == 1 &&
1769+
details.TargetSpecifications[0].Type == jobspb.ChangefeedTargetSpecification_DATABASE &&
1770+
targets.NumUniqueTables() == 0 {
1771+
1772+
// Create watcher dependencies.
1773+
dbID := details.TargetSpecifications[0].DescID
1774+
filter := tableset.Filter{
1775+
DatabaseID: dbID,
1776+
}
1777+
1778+
var watcherCtx context.Context
1779+
// Create a separate cancellable context for the watcher so that
1780+
// it can be stopped when the changefeed is stopped.
1781+
watcherCtx, watcherCancel = context.WithCancel(ctx)
1782+
1783+
// Create a watcher for the database.
1784+
watcher = tableset.NewWatcher(filter, execCfg, watcherMemMonitor, int64(jobID))
1785+
g.GoCtx(func(ctx context.Context) error {
1786+
err := watcher.Start(watcherCtx, schemaTS)
1787+
if err != nil && errors.Is(err, context.Canceled) {
1788+
// Check if watcherCtx was canceled (expected) vs parent ctx (error)
1789+
if errors.Is(watcherCtx.Err(), context.Canceled) && ctx.Err() == nil {
1790+
// Watcher was intentionally canceled, not an error
1791+
return nil
1792+
}
1793+
}
1794+
return err
1795+
})
1796+
}
1797+
1798+
// Start the changefeed.
17541799
g.GoCtx(func(ctx context.Context) error {
1755-
defer close(confPoller)
1800+
defer close(runningChangefeedChan)
1801+
if watcher != nil {
1802+
select {
1803+
case <-ctx.Done():
1804+
return ctx.Err()
1805+
case diffs := <-watcherChan:
1806+
schemaTS = diffs[0].AsOf
1807+
initialHighWater = diffs[0].AsOf
1808+
targets, err = AllTargets(ctx, details, execCfg, schemaTS)
1809+
if err != nil {
1810+
return err
1811+
}
1812+
watcherCancel()
1813+
}
1814+
}
17561815
return startDistChangefeed(ctx, jobExec, jobID, schemaTS, details, description, initialHighWater, localState, startedCh, onTracingEvent, targets)
1757-
17581816
})
1817+
1818+
// Poll for updated configuration or new database tables if hibernating.
17591819
g.GoCtx(func(ctx context.Context) error {
17601820
t := time.NewTicker(15 * time.Second)
17611821
defer t.Stop()
1822+
diffSent := false
17621823
for {
17631824
select {
17641825
case <-ctx.Done():
17651826
return ctx.Err()
1766-
case <-confPoller:
1827+
case <-runningChangefeedChan:
17671828
return nil
1768-
case <-t.C:
1829+
case tick := <-t.C:
17691830
newDest, err := reloadDest(ctx, jobID, execCfg)
17701831
if err != nil {
17711832
log.Changefeed.Warningf(ctx, "failed to check for updated configuration: %v", err)
17721833
} else if newDest != resolvedDest {
17731834
resolvedDest = newDest
17741835
return replanErr
17751836
}
1837+
if watcher != nil && !diffSent {
1838+
unchanged, diffs, err := watcher.PopUnchangedUpTo(ctx, hlc.Timestamp{WallTime: tick.UnixNano()})
1839+
if err != nil {
1840+
return err
1841+
}
1842+
if !unchanged && len(diffs) > 0 {
1843+
select {
1844+
case watcherChan <- diffs:
1845+
diffSent = true
1846+
case <-ctx.Done():
1847+
return ctx.Err()
1848+
}
1849+
}
1850+
}
17761851
}
17771852
}
17781853
})

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12933,3 +12933,44 @@ func TestDatabaseLevelChangefeedChangingTableset(t *testing.T) {
1293312933
}
1293412934
cdcTest(t, testFnAddEnriched, withKnobsFn(knobsFn), feedTestForceSink("kafka"))
1293512935
}
12936+
12937+
// TestDatabaseLevelChangefeedEmptyTableset creates a database-level changefeed
// on a database that has no tables yet, then creates and populates a table and
// asserts that the inserted row is emitted by the feed. It runs two variants:
// one that creates the table right after the changefeed, and one that first
// sleeps for five seconds.
func TestDatabaseLevelChangefeedEmptyTableset(t *testing.T) {
12938+
defer leaktest.AfterTest(t)()
12939+
defer log.Scope(t).Close(t)
12940+
12941+
testFnNoWait := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
12942+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
12943+
sqlDB.Exec(t, `CREATE DATABASE db`)
12944+
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)
12945+
dbcf := feed(t, f, `CREATE CHANGEFEED FOR DATABASE db`)
12946+
defer closeFeed(t, dbcf)
12947+
12948+
// Create the first table only after the changefeed statement has run, so
// the feed is created while the database is still empty.
12949+
sqlDB.Exec(t, `CREATE TABLE db.foo (a INT PRIMARY KEY, b STRING)`)
12950+
sqlDB.Exec(t, `INSERT INTO db.foo VALUES (0, 'initial')`)
12951+
12952+
assertPayloads(t, dbcf, []string{
12953+
`foo: [0]->{"after": {"a": 0, "b": "initial"}}`,
12954+
})
12955+
}
12956+
cdcTest(t, testFnNoWait, feedTestEnterpriseSinks)
12957+
12958+
testFnWait := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
12959+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
12960+
sqlDB.Exec(t, `CREATE DATABASE db`)
12961+
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)
12962+
dbcf := feed(t, f, `CREATE CHANGEFEED FOR DATABASE db`)
12963+
defer closeFeed(t, dbcf)
12964+
12965+
time.Sleep(5 * time.Second)
12966+
12967+
// Create the first table after the sleep above, so the changefeed has
// already existed for a while with no tables in the database.
12968+
sqlDB.Exec(t, `CREATE TABLE db.foo (a INT PRIMARY KEY, b STRING)`)
12969+
sqlDB.Exec(t, `INSERT INTO db.foo VALUES (0, 'initial')`)
12970+
12971+
assertPayloads(t, dbcf, []string{
12972+
`foo: [0]->{"after": {"a": 0, "b": "initial"}}`,
12973+
})
12974+
}
12975+
cdcTest(t, testFnWait, feedTestEnterpriseSinks)
12976+
}

0 commit comments

Comments
 (0)