-
Notifications
You must be signed in to change notification settings - Fork 4k
changefeedccl: block db-level changefeed creation for non-empty tableset #156771
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
changefeedccl: block db-level changefeed creation for non-empty tableset #156771
Conversation
c12885a to
fe6ddd7
Compare
Potential Bug(s) DetectedThe three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Next Steps: Note: When viewing the workflow output, scroll to the bottom to find the Final Analysis Summary. After you review the findings, please tag the issue as follows:
|
fe6ddd7 to
624bb89
Compare
Potential Bug(s) DetectedThe three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Next Steps: Note: When viewing the workflow output, scroll to the bottom to find the Final Analysis Summary. After you review the findings, please tag the issue as follows:
|
624bb89 to
109f8b0
Compare
e4b7882 to
80d8f1b
Compare
Potential Bug(s) DetectedThe three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Next Steps: Note: When viewing the workflow output, scroll to the bottom to find the Final Analysis Summary. After you review the findings, please tag the issue as follows:
|
80d8f1b to
1933e2b
Compare
Potential Bug(s) DetectedThe three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Next Steps: Note: When viewing the workflow output, scroll to the bottom to find the Final Analysis Summary. After you review the findings, please tag the issue as follows:
|
1933e2b to
ace2477
Compare
97fbe59 to
3bd2f5b
Compare
| DatabaseID: dbID, | ||
| } | ||
| if details.TargetSpecifications[0].FilterList != nil { | ||
| if details.TargetSpecifications[0].FilterList.FilterType == tree.IncludeFilter && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: let's have this as the first conditional since we're doing it for both cases anyway.
if details.TargetSpecifications[0].FilterList.Tables != nil {
continue
}
| details.TargetSpecifications[0].FilterList.Tables != nil { | ||
| filter.IncludeTables = make(map[string]struct{}) | ||
| for fqTableName := range details.TargetSpecifications[0].FilterList.Tables { | ||
| // Extract just the table name from the fully qualified name (e.g., "db.public.table" -> "table") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We definitely want to keep at least the schema name. Will need to wait for @asg0451 's change to the tableset watcher to know what to pass in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll update this once that inflight change is ready
| return err | ||
| } | ||
|
|
||
| // Watcher is only used for db-level changefeeds with no watched tables. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But eventually a db-level changefeed should always have a watcher right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes right, but there will be a different watcher for running changefeeds. Changed this comment to make clear I'm talking about this watcher.
| // Create a watcher for the database. | ||
| watcher = tableset.NewWatcher(filter, execCfg, watcherMemMonitor, int64(jobID)) | ||
| g.GoCtx(func(ctx context.Context) error { | ||
| go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not great with gorutines, is there a reason why we do this instead of calling cancelWatcher when watcher.Start(watcherCtx, schemaTS) returns without error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A tableset watcher runs continuously until its context is canceled.
The goroutines in resumeWithRetries share a context from their context group. The watcher is given its own, different context, so it can be canceled (shutting down the watcher) when a diff is found and the other goroutines will continue running.
This goroutine is to stop the watcher when the context of the group is canceled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in crl source code, we should not be spinning off raw goroutines. But to step back, i'm a bit confused by the purpose of this pr. The title is " block db-level changefeed creation for non-empty tableset". Can't we simply fast fail the changefeed if there are no tables in the database at creation time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To the purpose of this PR, for a database-level changefeed, we don't want the changefeed to fail if there are no tables in the database. We want to wait until a table is added and then plan/run the changefeed. Either in the case of a changefeed created on an initially-empty database, or a running db-level changefeed that restarts when a table in the database is dropped.
3bd2f5b to
773a7b7
Compare
Potential Bug(s) DetectedThe three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Next Steps: Note: When viewing the workflow output, scroll to the bottom to find the Final Analysis Summary. After you review the findings, please tag the issue as follows:
|
65179dc to
44dc6f3
Compare
Potential Bug(s) DetectedThe three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Next Steps: Note: When viewing the workflow output, scroll to the bottom to find the Final Analysis Summary. After you review the findings, please tag the issue as follows:
|
| filter := tableset.Filter{ | ||
| DatabaseID: dbID, | ||
| } | ||
| if details.TargetSpecifications[0].FilterList != nil && details.TargetSpecifications[0].FilterList.Tables != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FilterList.Tables is non nullable so you should check for the length instead
| } else if details.TargetSpecifications[0].FilterList.FilterType == tree.ExcludeFilter { | ||
| filter.ExcludeTables = make(map[string]struct{}) | ||
| for fqTableName := range details.TargetSpecifications[0].FilterList.Tables { | ||
| // Extract just the table name from the fully qualified name (e.g., "db.public.table" -> "table") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flagging this for removal before merge.
44dc6f3 to
5d333ec
Compare
|
|
||
| // This watcher is only used for db-level changefeeds with no watched tables. | ||
| var watcher *tableset.Watcher | ||
| watcherChan := make(chan []tableset.TableDiff, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why buffer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the poller goroutine doesn't block on sending the diff and it can keep polling. Though the goroutine waiting for the diff, before starting the changefeed, should already be listening on this channel.
| if len(details.TargetSpecifications) == 1 && | ||
| details.TargetSpecifications[0].Type == jobspb.ChangefeedTargetSpecification_DATABASE && | ||
| targets.NumUniqueTables() == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you make this a helper fn or method so we can standardize the check
| tn, err := parser.ParseQualifiedTableName(fqTableName) | ||
| if err != nil { | ||
| log.Changefeed.Warningf(ctx, "failed to parse table name %q: %v", fqTableName, err) | ||
| continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why ignore the error? i know its temporary but etc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Changed to propogate.
| if err != nil && !errors.Is(err, context.Canceled) { | ||
| return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why return nil on ctx cancel
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the poller goroutine is calling cancelWatcher to stop the watcher. Then we don't want to propagate that error up and cancel the goroutines in this context group that're running startDistChangefeed and the poller. Although maybe then it's better for the poller to signal this goroutine and it will know whether to propagate the context canceled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm calling cancelWatcher to stop the watcher, once a diff has been found. But the other goroutines in this context group need to keep running. So I was trying to not fail the whole context group when the watcher is canceled.
I just changed to move the variable tracking whether a diff was sent from the watcher to the changefeedResumer struct, so it can be shared between the goroutines.
| }) | ||
| } | ||
|
|
||
| // Start the changefeed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you improve this comment by talking about how we might wait for etc
| select { | ||
| case <-ctx.Done(): | ||
| return ctx.Err() | ||
| case diffs := <-watcherChan: | ||
| schemaTS = diffs[0].AsOf | ||
| initialHighWater = diffs[0].AsOf | ||
| targets, err = AllTargets(ctx, details, execCfg, schemaTS) | ||
| if err != nil { | ||
| return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so we wait for the first event from the watcher before starting? i dont think that's right... what if the tables are already there and none change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
startDistChangefeed is outside of the if watcher != nil. There's only a watcher created if needed, so it would otherwise go right into running the changefeed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the conditional check for this if statement so it's more clear. But it will only enter into this select statement when there was an empty tableset and the watcher was created. If there's already tables, it will just go straight to running startDistChangefeed.
| case <-ctx.Done(): | ||
| return ctx.Err() | ||
| case <-confPoller: | ||
| case <-runningChangefeedChan: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: i dont love this rename
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to change since it isn't just polling for updated configuration now. It's also polling the changefeed watcher. This channel is just blocking until the goroutine running the changefeed returns. How do you feel about changefeedDoneCh?
| if watcher != nil && !diffSent { | ||
| unchanged, diffs, err := watcher.PopUnchangedUpTo(ctx, hlc.Timestamp{WallTime: tick.UnixNano()}) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if !unchanged && len(diffs) > 0 { | ||
| select { | ||
| case watcherChan <- diffs: | ||
| diffSent = true | ||
| cancelWatcher() | ||
| case <-ctx.Done(): | ||
| return ctx.Err() | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why start up the watcher here so it runs every tick unless diffsent? surely its simpler to do it another way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PopUnchangedUpTo is the correct way to consume from the tableset watcher, right? I clarified the condition here, replacing watcher != nil with a variable to make clear that this and other code is only run if there was an empty tableset for a db-level changefeed.
So, if this is a db-level changefeed waiting for a non-empty tableset, it will keep checking for diffs until one is found.
| // Close the feed to ensure all resources are released. | ||
| require.NoError(t, feed.Close()) | ||
|
|
||
| // The leaktest.AfterTest will verify that no goroutines or contexts leaked. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unnecessary comment, and also throughout this test. is this ai generated? if so please give it a once over after to make sure it complies with our code guidelines (in this case, comments should say why not what (unless its really unclear))
5d333ec to
9f0ce38
Compare
Potential Bug(s) DetectedThe three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Next Steps: Note: When viewing the workflow output, scroll to the bottom to find the Final Analysis Summary. After you review the findings, please tag the issue as follows:
|
|
Most recent Claude code review raised concern about a race condition: cancelWatcher invoked in one goroutine before being set by another goroutine. This can't happen, since there's the additional synchronization that watcher.PopUnchangedTo won't return anything before watcher.Start is invoked. The goroutine which invokes cancelWatcher can't do so until it receives diff, which must be preceded by creating the watcher context and then starting the watcher. |
On startup, the changefeed should not be started until there exist tables in the database to watch. If no tables exist at the time of startup, then a tableset watcher is created and polled; the changefeed will be started from where there is first a target table. This patch also makes a change to SHOW CHANGEFEED JOBS to show changefeed jobs with no target tables (ie db-level changefeed waiting for a table to be created). Fixes: cockroachdb#147371 Epic: CRDB-1421 Release note: none
9f0ce38 to
488d871
Compare
Potential Bug(s) DetectedThe three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Next Steps: Note: When viewing the workflow output, scroll to the bottom to find the Final Analysis Summary. After you review the findings, please tag the issue as follows:
|
On startup, the changefeed should not be started until there exist tables in the database to watch. If no tables exist at the time of startup, then a tableset watcher is created and polled; the changefeed will be started from where there is first a target table.