Skip to content

Commit 624bb89

Browse files
committed
changefeedccl: block db-level changefeed creation for non-empty tableset
On startup, the changefeed should not be started until there exist tables in the database to watch. If no tables exist at the time of startup, then a tableset watcher is created and polled; the changefeed will be started from where there is first a target table. Fixes: #147371 Epic: CRDB-1421 Release note: none
1 parent d222ca1 commit 624bb89

File tree

3 files changed

+117
-5
lines changed

3 files changed

+117
-5
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ go_library(
6464
"//pkg/ccl/changefeedccl/kvfeed",
6565
"//pkg/ccl/changefeedccl/resolvedspan",
6666
"//pkg/ccl/changefeedccl/schemafeed",
67+
"//pkg/ccl/changefeedccl/tableset",
6768
"//pkg/ccl/changefeedccl/timers",
6869
"//pkg/ccl/kvccl/kvfollowerreadsccl",
6970
"//pkg/ccl/utilccl",

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 75 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1919
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
2020
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
21+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/tableset"
2122
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
2223
"github.com/cockroachdb/cockroach/pkg/cloud"
2324
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
@@ -40,6 +41,7 @@ import (
4041
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
4142
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
4243
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
44+
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
4345
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
4446
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
4547
"github.com/cockroachdb/cockroach/pkg/sql/isql"
@@ -59,6 +61,7 @@ import (
5961
"github.com/cockroachdb/cockroach/pkg/util/log/channel"
6062
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
6163
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
64+
"github.com/cockroachdb/cockroach/pkg/util/mon"
6265
"github.com/cockroachdb/cockroach/pkg/util/randutil"
6366
"github.com/cockroachdb/cockroach/pkg/util/span"
6467
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -1724,6 +1727,9 @@ func (b *changefeedResumer) resumeWithRetries(
17241727

17251728
maxBackoff := changefeedbase.MaxRetryBackoff.Get(&execCfg.Settings.SV)
17261729
backoffReset := changefeedbase.RetryBackoffReset.Get(&execCfg.Settings.SV)
1730+
1731+
memMonitor := execinfra.NewMonitor(ctx, execCfg.DistSQLSrv.ChangefeedMonitor, mon.MakeName("changefeed-empty-watcher-mem"))
1732+
defer memMonitor.Stop(ctx)
17271733
for r := getRetry(ctx, maxBackoff, backoffReset); r.Next(); {
17281734
flowErr := maybeUpgradePreProductionReadyExpression(ctx, jobID, details, jobExec)
17291735

@@ -1737,7 +1743,7 @@ func (b *changefeedResumer) resumeWithRetries(
17371743
knobs.BeforeDistChangefeed()
17381744
}
17391745

1740-
confPoller := make(chan struct{})
1746+
runningChangefeedChan := make(chan struct{})
17411747
g := ctxgroup.WithContext(ctx)
17421748
initialHighWater, schemaTS, err := computeDistChangefeedTimestamps(ctx, jobExec, details, localState)
17431749
if err != nil {
@@ -1751,28 +1757,92 @@ func (b *changefeedResumer) resumeWithRetries(
17511757
if haveKnobs && maybeCfKnobs.TablesetChangingCallback != nil {
17521758
maybeCfKnobs.TablesetChangingCallback()
17531759
}
1760+
1761+
// Watcher is only used for db-level changefeeds with no watched tables.
1762+
var watcher *tableset.Watcher
1763+
watcherChan := make(chan []tableset.TableDiff, 1)
1764+
if len(details.TargetSpecifications) == 1 &&
1765+
details.TargetSpecifications[0].Type == jobspb.ChangefeedTargetSpecification_DATABASE &&
1766+
targets.NumUniqueTables() == 0 {
1767+
1768+
// Create watcher dependencies.
1769+
dbID := details.TargetSpecifications[0].DescID
1770+
filter := tableset.Filter{
1771+
DatabaseID: dbID,
1772+
}
1773+
var initialWatcherTS hlc.Timestamp
1774+
if h := localState.progress.GetHighWater(); h != nil && !h.IsEmpty() {
1775+
initialWatcherTS = *h
1776+
} else {
1777+
initialWatcherTS = details.StatementTime
1778+
}
1779+
1780+
// Create a watcher for the database.
1781+
watcher = tableset.NewWatcher(filter, execCfg, memMonitor, int64(jobID))
1782+
timestamp := initialWatcherTS
1783+
g.GoCtx(func(ctx context.Context) error {
1784+
// defer close(watcherChan)
1785+
return watcher.Start(ctx, timestamp)
1786+
})
1787+
// } else {
1788+
// close(watcherChan)
1789+
}
1790+
1791+
// Start the changefeed.
17541792
g.GoCtx(func(ctx context.Context) error {
1755-
defer close(confPoller)
1793+
defer close(runningChangefeedChan)
1794+
var schemaTSOverride hlc.Timestamp
1795+
if targets.NumUniqueTables() == 0 {
1796+
select {
1797+
case <-ctx.Done():
1798+
return ctx.Err()
1799+
case diffs := <-watcherChan:
1800+
// Get the diff with the earliest timestamp.
1801+
schemaTSOverride = diffs[0].AsOf
1802+
schemaTS, initialHighWater = schemaTSOverride, schemaTSOverride
1803+
targets, err = AllTargets(ctx, details, execCfg, schemaTSOverride)
1804+
if err != nil {
1805+
return err
1806+
}
1807+
}
1808+
}
1809+
// return distChangefeedFlow(ctx, jobExec, jobID, details, description, localState, startedCh, onTracingEvent, targets, schemaTSOverride)
17561810
return startDistChangefeed(ctx, jobExec, jobID, schemaTS, details, description, initialHighWater, localState, startedCh, onTracingEvent, targets)
1757-
17581811
})
1812+
1813+
// Poll for updated configuration or new database tables if hibernating.
17591814
g.GoCtx(func(ctx context.Context) error {
17601815
t := time.NewTicker(15 * time.Second)
17611816
defer t.Stop()
1817+
diffSent := false
17621818
for {
17631819
select {
17641820
case <-ctx.Done():
17651821
return ctx.Err()
1766-
case <-confPoller:
1822+
case <-runningChangefeedChan:
17671823
return nil
1768-
case <-t.C:
1824+
case tick := <-t.C:
17691825
newDest, err := reloadDest(ctx, jobID, execCfg)
17701826
if err != nil {
17711827
log.Changefeed.Warningf(ctx, "failed to check for updated configuration: %v", err)
17721828
} else if newDest != resolvedDest {
17731829
resolvedDest = newDest
17741830
return replanErr
17751831
}
1832+
if watcher != nil && !diffSent {
1833+
unchanged, diffs, err := watcher.PopUnchangedUpTo(ctx, hlc.Timestamp{WallTime: tick.UnixNano()})
1834+
if err != nil {
1835+
return err
1836+
}
1837+
if !unchanged {
1838+
select {
1839+
case watcherChan <- diffs:
1840+
diffSent = true
1841+
case <-ctx.Done():
1842+
return ctx.Err()
1843+
}
1844+
}
1845+
}
17761846
}
17771847
}
17781848
})

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12933,3 +12933,44 @@ func TestDatabaseLevelChangefeedChangingTableset(t *testing.T) {
1293312933
}
1293412934
cdcTest(t, testFnAddEnriched, withKnobsFn(knobsFn), feedTestForceSink("kafka"))
1293512935
}
12936+
12937+
func TestDatabaseLevelChangefeedEmptyTableset(t *testing.T) {
12938+
defer leaktest.AfterTest(t)()
12939+
defer log.Scope(t).Close(t)
12940+
12941+
testFnNoWait := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
12942+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
12943+
sqlDB.Exec(t, `CREATE DATABASE db`)
12944+
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)
12945+
dbcf := feed(t, f, `CREATE CHANGEFEED FOR DATABASE db`)
12946+
defer closeFeed(t, dbcf)
12947+
12948+
// create a table
12949+
sqlDB.Exec(t, `CREATE TABLE db.foo (a INT PRIMARY KEY, b STRING)`)
12950+
sqlDB.Exec(t, `INSERT INTO db.foo VALUES (0, 'initial')`)
12951+
12952+
assertPayloads(t, dbcf, []string{
12953+
`foo: [0]->{"after": {"a": 0, "b": "initial"}}`,
12954+
})
12955+
}
12956+
cdcTest(t, testFnNoWait, feedTestForceSink("enterprise"))
12957+
12958+
testFnWait := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
12959+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
12960+
sqlDB.Exec(t, `CREATE DATABASE db`)
12961+
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)
12962+
dbcf := feed(t, f, `CREATE CHANGEFEED FOR DATABASE db`)
12963+
defer closeFeed(t, dbcf)
12964+
12965+
time.Sleep(5 * time.Second)
12966+
12967+
// create a table
12968+
sqlDB.Exec(t, `CREATE TABLE db.foo (a INT PRIMARY KEY, b STRING)`)
12969+
sqlDB.Exec(t, `INSERT INTO db.foo VALUES (0, 'initial')`)
12970+
12971+
assertPayloads(t, dbcf, []string{
12972+
`foo: [0]->{"after": {"a": 0, "b": "initial"}}`,
12973+
})
12974+
}
12975+
cdcTest(t, testFnWait, feedTestForceSink("enterprise"))
12976+
}

0 commit comments

Comments
 (0)