Skip to content

Commit 44dc6f3

Browse files
committed
changefeedccl: block db-level changefeed start until tableset is non-empty
On startup, a database-level changefeed should not be started until the database contains at least one table to watch. If no target tables exist at startup, a tableset watcher is created and polled, and the changefeed is started as of the timestamp at which the first target table appears. This patch also changes SHOW CHANGEFEED JOBS to show changefeed jobs with no target tables (i.e., a db-level changefeed waiting for a table to be created). Fixes: #147371 Epic: CRDB-1421 Release note: none
1 parent b4ddc73 commit 44dc6f3

File tree

4 files changed

+270
-7
lines changed

4 files changed

+270
-7
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ go_library(
6464
"//pkg/ccl/changefeedccl/kvfeed",
6565
"//pkg/ccl/changefeedccl/resolvedspan",
6666
"//pkg/ccl/changefeedccl/schemafeed",
67+
"//pkg/ccl/changefeedccl/tableset",
6768
"//pkg/ccl/changefeedccl/timers",
6869
"//pkg/ccl/kvccl/kvfollowerreadsccl",
6970
"//pkg/ccl/utilccl",

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 101 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1919
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
2020
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
21+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/tableset"
2122
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
2223
"github.com/cockroachdb/cockroach/pkg/cloud"
2324
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
@@ -40,9 +41,11 @@ import (
4041
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
4142
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
4243
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
44+
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
4345
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
4446
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
4547
"github.com/cockroachdb/cockroach/pkg/sql/isql"
48+
"github.com/cockroachdb/cockroach/pkg/sql/parser"
4649
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
4750
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
4851
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
@@ -59,6 +62,7 @@ import (
5962
"github.com/cockroachdb/cockroach/pkg/util/log/channel"
6063
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
6164
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
65+
"github.com/cockroachdb/cockroach/pkg/util/mon"
6266
"github.com/cockroachdb/cockroach/pkg/util/randutil"
6367
"github.com/cockroachdb/cockroach/pkg/util/span"
6468
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -1750,6 +1754,12 @@ func (b *changefeedResumer) resumeWithRetries(
17501754

17511755
maxBackoff := changefeedbase.MaxRetryBackoff.Get(&execCfg.Settings.SV)
17521756
backoffReset := changefeedbase.RetryBackoffReset.Get(&execCfg.Settings.SV)
1757+
1758+
// Create memory monitor for tableset watcher. Similar to how processors create
1759+
// their monitors at function start, we create this unconditionally.
1760+
watcherMemMonitor := execinfra.NewMonitor(ctx, execCfg.DistSQLSrv.ChangefeedMonitor, mon.MakeName("changefeed-tableset-watcher-mem"))
1761+
defer watcherMemMonitor.Stop(ctx)
1762+
17531763
for r := getRetry(ctx, maxBackoff, backoffReset); r.Next(); {
17541764
flowErr := maybeUpgradePreProductionReadyExpression(ctx, jobID, details, jobExec)
17551765

@@ -1763,7 +1773,7 @@ func (b *changefeedResumer) resumeWithRetries(
17631773
knobs.BeforeDistChangefeed()
17641774
}
17651775

1766-
confPoller := make(chan struct{})
1776+
runningChangefeedChan := make(chan struct{})
17671777
g := ctxgroup.WithContext(ctx)
17681778
initialHighWater, schemaTS, err := computeDistChangefeedTimestamps(ctx, jobExec, details, localState)
17691779
if err != nil {
@@ -1777,28 +1787,113 @@ func (b *changefeedResumer) resumeWithRetries(
17771787
if err != nil {
17781788
return err
17791789
}
1790+
1791+
// This watcher is only used for db-level changefeeds with no watched tables.
1792+
var watcher *tableset.Watcher
1793+
watcherChan := make(chan []tableset.TableDiff, 1)
1794+
var cancelWatcher context.CancelFunc
1795+
var watcherCtx context.Context
1796+
if len(details.TargetSpecifications) == 1 &&
1797+
details.TargetSpecifications[0].Type == jobspb.ChangefeedTargetSpecification_DATABASE &&
1798+
targets.NumUniqueTables() == 0 {
1799+
1800+
// Create watcher dependencies.
1801+
dbID := details.TargetSpecifications[0].DescID
1802+
filter := tableset.Filter{
1803+
DatabaseID: dbID,
1804+
}
1805+
if details.TargetSpecifications[0].FilterList != nil && details.TargetSpecifications[0].FilterList.Tables != nil {
1806+
if details.TargetSpecifications[0].FilterList.FilterType == tree.IncludeFilter {
1807+
filter.IncludeTables = make(map[string]struct{})
1808+
for fqTableName := range details.TargetSpecifications[0].FilterList.Tables {
1809+
// Extract just the table name from the fully qualified name (e.g., "db.public.table" -> "table")
1810+
tn, err := parser.ParseQualifiedTableName(fqTableName)
1811+
if err != nil {
1812+
log.Changefeed.Warningf(ctx, "failed to parse table name %q: %v", fqTableName, err)
1813+
continue
1814+
}
1815+
filter.IncludeTables[tn.Object()] = struct{}{}
1816+
}
1817+
} else if details.TargetSpecifications[0].FilterList.FilterType == tree.ExcludeFilter {
1818+
filter.ExcludeTables = make(map[string]struct{})
1819+
for fqTableName := range details.TargetSpecifications[0].FilterList.Tables {
1820+
// Extract just the table name from the fully qualified name (e.g., "db.public.table" -> "table")
1821+
tn, err := parser.ParseQualifiedTableName(fqTableName)
1822+
if err != nil {
1823+
log.Changefeed.Warningf(ctx, "failed to parse table name %q: %v", fqTableName, err)
1824+
continue
1825+
}
1826+
filter.ExcludeTables[tn.Object()] = struct{}{}
1827+
}
1828+
}
1829+
}
1830+
1831+
// Create a watcher for the database.
1832+
watcher = tableset.NewWatcher(filter, execCfg, watcherMemMonitor, int64(jobID))
1833+
g.GoCtx(func(ctx context.Context) error {
1834+
watcherCtx, cancelWatcher = context.WithCancel(ctx)
1835+
defer cancelWatcher()
1836+
err := watcher.Start(watcherCtx, schemaTS)
1837+
if err != nil && !errors.Is(err, context.Canceled) {
1838+
return err
1839+
}
1840+
return nil
1841+
})
1842+
}
1843+
1844+
// Start the changefeed.
17801845
g.GoCtx(func(ctx context.Context) error {
1781-
defer close(confPoller)
1782-
return startDistChangefeed(ctx, jobExec, jobID, schemaTS, details, description,
1783-
initialHighWater, localState, startedCh, onTracingEvent, targets)
1846+
defer close(runningChangefeedChan)
1847+
if watcher != nil {
1848+
select {
1849+
case <-ctx.Done():
1850+
return ctx.Err()
1851+
case diffs := <-watcherChan:
1852+
schemaTS = diffs[0].AsOf
1853+
initialHighWater = diffs[0].AsOf
1854+
targets, err = AllTargets(ctx, details, execCfg, schemaTS)
1855+
if err != nil {
1856+
return err
1857+
}
1858+
}
1859+
}
1860+
return startDistChangefeed(ctx, jobExec, jobID, schemaTS, details, description, initialHighWater, localState, startedCh, onTracingEvent, targets)
17841861
})
1862+
1863+
// Poll for updated configuration or new database tables if hibernating.
17851864
g.GoCtx(func(ctx context.Context) error {
17861865
t := time.NewTicker(15 * time.Second)
17871866
defer t.Stop()
1867+
diffSent := false
17881868
for {
17891869
select {
17901870
case <-ctx.Done():
17911871
return ctx.Err()
1792-
case <-confPoller:
1872+
case <-runningChangefeedChan:
17931873
return nil
1794-
case <-t.C:
1874+
case tick := <-t.C:
17951875
newDest, err := reloadDest(ctx, jobID, execCfg)
17961876
if err != nil {
17971877
log.Changefeed.Warningf(ctx, "failed to check for updated configuration: %v", err)
17981878
} else if newDest != resolvedDest {
17991879
resolvedDest = newDest
18001880
return replanErr
18011881
}
1882+
if watcher != nil && !diffSent {
1883+
unchanged, diffs, err := watcher.PopUnchangedUpTo(ctx, hlc.Timestamp{WallTime: tick.UnixNano()})
1884+
if err != nil {
1885+
return err
1886+
}
1887+
if !unchanged && len(diffs) > 0 {
1888+
select {
1889+
case watcherChan <- diffs:
1890+
diffSent = true
1891+
cancelWatcher()
1892+
case <-ctx.Done():
1893+
return ctx.Err()
1894+
}
1895+
}
1896+
}
18021897
}
18031898
}
18041899
})

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12913,3 +12913,170 @@ func TestDatabaseLevelChangefeedChangingTableset(t *testing.T) {
1291312913
}
1291412914
})
1291512915
}
12916+
12917+
// TestDatabaseLevelChangefeedEmptyTableset tests that a database-level changefeed
12918+
// hibernates while there are no tables in the database.
12919+
func TestDatabaseLevelChangefeedEmptyTableset(t *testing.T) {
12920+
defer leaktest.AfterTest(t)()
12921+
defer log.Scope(t).Close(t)
12922+
12923+
testFnNoWait := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
12924+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
12925+
sqlDB.Exec(t, `CREATE DATABASE db`)
12926+
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)
12927+
dbcf := feed(t, f, `CREATE CHANGEFEED FOR DATABASE db`)
12928+
defer closeFeed(t, dbcf)
12929+
12930+
// create a table
12931+
sqlDB.Exec(t, `CREATE TABLE db.foo (a INT PRIMARY KEY, b STRING)`)
12932+
sqlDB.Exec(t, `INSERT INTO db.foo VALUES (0, 'initial')`)
12933+
12934+
assertPayloads(t, dbcf, []string{
12935+
`foo: [0]->{"after": {"a": 0, "b": "initial"}}`,
12936+
})
12937+
}
12938+
cdcTest(t, testFnNoWait, feedTestEnterpriseSinks)
12939+
12940+
testFnWait := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
12941+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
12942+
sqlDB.Exec(t, `CREATE DATABASE db`)
12943+
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)
12944+
dbcf := feed(t, f, `CREATE CHANGEFEED FOR DATABASE db`)
12945+
defer closeFeed(t, dbcf)
12946+
12947+
time.Sleep(5 * time.Second)
12948+
12949+
// create a table
12950+
sqlDB.Exec(t, `CREATE TABLE db.foo (a INT PRIMARY KEY, b STRING)`)
12951+
sqlDB.Exec(t, `INSERT INTO db.foo VALUES (0, 'initial')`)
12952+
12953+
assertPayloads(t, dbcf, []string{
12954+
`foo: [0]->{"after": {"a": 0, "b": "initial"}}`,
12955+
})
12956+
}
12957+
cdcTest(t, testFnWait, feedTestEnterpriseSinks)
12958+
}
12959+
12960+
// TestDatabaseLevelChangefeedFiltersHibernation tests that a database-level changefeed
12961+
// with include/exclude filters correctly handles hibernation:
12962+
// - With EXCLUDE filter: creating an excluded table should not wake the changefeed,
12963+
// but creating a non-excluded table should wake it.
12964+
// - With INCLUDE filter: creating a non-included table should not wake the changefeed,
12965+
// but creating an included table should wake it.
12966+
func TestDatabaseLevelChangefeedFiltersHibernation(t *testing.T) {
12967+
defer leaktest.AfterTest(t)()
12968+
defer log.Scope(t).Close(t)
12969+
12970+
full_filter := map[string]string{
12971+
"include": "EXCLUDE TABLES excluded_table",
12972+
"exclude": "INCLUDE TABLES included_table",
12973+
}
12974+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory, filterType string) {
12975+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
12976+
sqlDB.Exec(t, `CREATE DATABASE db`)
12977+
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)
12978+
12979+
// Create changefeed with exclude filter - should hibernate since no tables exist
12980+
createStmt := fmt.Sprintf(`CREATE CHANGEFEED FOR DATABASE db %s`, full_filter[filterType])
12981+
dbcf := feed(t, f, createStmt)
12982+
defer closeFeed(t, dbcf)
12983+
12984+
var jobID jobspb.JobID
12985+
if ef, ok := dbcf.(cdctest.EnterpriseTestFeed); ok {
12986+
jobID = ef.JobID()
12987+
} else {
12988+
t.Fatal("expected EnterpriseTestFeed")
12989+
}
12990+
12991+
// Get initial diagram count (should be 0 when hibernating, as no diagram is written yet)
12992+
getDiagramCount := func() int {
12993+
var count int
12994+
sqlDB.QueryRow(t,
12995+
`SELECT count(*) FROM system.job_info WHERE job_id = $1 AND info_key LIKE '~dsp-diag-url-%'`,
12996+
jobID,
12997+
).Scan(&count)
12998+
return count
12999+
}
13000+
13001+
testutils.SucceedsSoon(t, func() error {
13002+
var count int
13003+
sqlDB.QueryRow(t,
13004+
`SELECT count(*) FROM [SHOW CHANGEFEED JOB $1] WHERE running_status = 'running'`,
13005+
jobID,
13006+
).Scan(&count)
13007+
return nil
13008+
})
13009+
require.Equal(t, 0, getDiagramCount(), "changefeed should be hibernating (no diagram written)")
13010+
time.Sleep(20 * time.Second)
13011+
require.Equal(t, 0, getDiagramCount(), "changefeed should stay hibernating (no diagram written)")
13012+
13013+
// Create a table that is excluded - changefeed should stay hibernating
13014+
sqlDB.Exec(t, `CREATE TABLE db.excluded_table (a INT PRIMARY KEY, b STRING)`)
13015+
sqlDB.Exec(t, `INSERT INTO db.excluded_table VALUES (0, 'excluded')`)
13016+
13017+
// // Verify changefeed stays hibernating (no new diagram written)
13018+
time.Sleep(20 * time.Second)
13019+
require.Equal(t, 0, getDiagramCount(), "changefeed should stay hibernating (no new diagram written)")
13020+
13021+
// Create a table that is NOT excluded - changefeed should wake up
13022+
sqlDB.Exec(t, `CREATE TABLE db.included_table (a INT PRIMARY KEY, b STRING)`)
13023+
sqlDB.Exec(t, `INSERT INTO db.included_table VALUES (0, 'included')`)
13024+
13025+
// Wait for a new diagram to be written (indicating changefeed woke up)
13026+
time.Sleep(20 * time.Second)
13027+
require.Equal(t, 1, getDiagramCount(), "changefeed should wake up (new diagram written)")
13028+
13029+
// Should only receive events from the included table
13030+
assertPayloads(t, dbcf, []string{
13031+
`included_table: [0]->{"after": {"a": 0, "b": "included"}}`,
13032+
})
13033+
}
13034+
testutils.RunValues(t, "filterType", []string{"include", "exclude"}, func(t *testing.T, filterType string) {
13035+
runTestFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
13036+
testFn(t, s, f, filterType)
13037+
}
13038+
cdcTest(t, runTestFn, feedTestEnterpriseSinks)
13039+
})
13040+
}
13041+
13042+
// TestChangefeedWatcherCleanupOnStop verifies that the watcher context is properly
13043+
// cleaned up when a changefeed is stopped before receiving any table diffs.
13044+
// This is a regression test for context leaks in the watcher lifecycle.
13045+
func TestChangefeedWatcherCleanupOnStop(t *testing.T) {
13046+
defer leaktest.AfterTest(t)()
13047+
defer log.Scope(t).Close(t)
13048+
13049+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
13050+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
13051+
13052+
// Create a database with no tables. This will trigger the watcher
13053+
// since the changefeed targets the database but there are no tables yet.
13054+
sqlDB.Exec(t, `CREATE DATABASE db`)
13055+
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)
13056+
13057+
// Create a changefeed on the empty database. This will start the watcher
13058+
// but won't receive any diffs until a table is created.
13059+
feed, err := f.Feed(`CREATE CHANGEFEED FOR DATABASE db`)
13060+
require.NoError(t, err)
13061+
enterpriseFeed := feed.(cdctest.EnterpriseTestFeed)
13062+
13063+
// Wait for the changefeed job to be running to ensure the watcher
13064+
// goroutine has started.
13065+
waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateRunning)
13066+
13067+
// Cancel the changefeed job before any tables are created. This tests the
13068+
// scenario where the changefeed is stopped before the watcher receives diffs,
13069+
// which previously could lead to a context leak.
13070+
sqlDB.Exec(t, `CANCEL JOB $1`, enterpriseFeed.JobID())
13071+
13072+
// Wait for the job to be canceled.
13073+
waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateCanceled)
13074+
13075+
// Close the feed to ensure all resources are released.
13076+
require.NoError(t, feed.Close())
13077+
13078+
// The leaktest.AfterTest will verify that no goroutines or contexts leaked.
13079+
}
13080+
13081+
cdcTest(t, testFn, feedTestEnterpriseSinks)
13082+
}

pkg/sql/delegate/show_changefeed_jobs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ SELECT
104104
database.name AS database_name
105105
FROM
106106
crdb_internal.jobs
107-
INNER JOIN targets ON job_id = targets.id
107+
LEFT JOIN targets ON job_id = targets.id
108108
INNER JOIN payload ON job_id = payload.id
109109
LEFT JOIN database ON job_id = database.id
110110
`

0 commit comments

Comments
 (0)