-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36701][cdc-runtime] Add steps to get and emit schemaManager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent #3802
[FLINK-36701][cdc-runtime] Add steps to get and emit schemaManager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent #3802
Conversation
...untime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
Outdated
Show resolved
Hide resolved
20a00d4
to
705d763
Compare
Thanks for @Jzjsnow's clear and detailed diagram! Just noticed that we still lack failover tests for pipeline jobs. Could we add some recovering tests like |
596fa99
to
468feec
Compare
@yuxiqian Thanks for the suggestion, I have added some tests to the |
Here it looks like we have once again encountered a |
It's worrying to notice similar OceanBase test case failures here, but seems irrelevant to this PR. Will investigate this. |
@Jzjsnow Should be fixed now... please rebase to master branch. |
…latest evolvedSchema when SinkDataWriterOperator handles FlushEvent.
468feec
to
84bf4d0
Compare
@yuxiqian Thanks for the quick fix, now we've rebased to the master branch. |
The Here is the error log:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @Jzjsnow's great work, left some minor comments.
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java
Outdated
Show resolved
Hide resolved
...source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
Outdated
Show resolved
Hide resolved
e0a3e30
to
c7de55b
Compare
…ager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent.
…ager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent.
…ager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @Jzjsnow's quick response! Just left some trivial comments, please take a look when available.
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java
Outdated
Show resolved
Hide resolved
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java
Outdated
Show resolved
Hide resolved
/** Flag indicating whether the FlushEvent is sent before a create table event. */ | ||
private final SchemaChangeEventType schemaChangeEventType; | ||
|
||
public FlushEvent(int sourceSubTaskId, SchemaChangeEventType schemaChangeEventType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems this constructor should be @VisibleForTesting
. But I'm not quite sure why BucketWrapperFlushEvent
is also calling this, since it might lose track of the tableIds
field if sink declares an extra bucket assigning topology.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the careful review. In order for the BucketWrapperFlushEvent
to retain the full FlushEvent
's information, the sink table IDs and schema change event type are passed in its constructor. So this constructor for FlushEvent
is no longer necessary and is removed.
...java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
Outdated
Show resolved
Hide resolved
...runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java
Outdated
Show resolved
Hide resolved
...test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java
Outdated
Show resolved
Hide resolved
…ager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Would @leonardBang like to take a look?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, directly after a failover, when the pipeline first handles a schema change event (e.g. AddColumnEvent) and then a DataChangeEvent, it may cause the job to fail again as sink has repeatedly applied that schema change.
The issue is revealed as follows:
![before](https://private-user-images.githubusercontent.com/25243041/396093948-c7675072-00fb-4d95-a577-a333d05d3dc0.jpg?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MzkzNzU0ODcsIm5iZiI6MTczOTM3NTE4NywicGF0aCI6Ii8yNTI0MzA0MS8zOTYwOTM5NDgtYzc2NzUwNzItMDBmYi00ZDk1LWE1NzctYTMzM2QwNWQzZGMwLmpwZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNTAyMTIlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjUwMjEyVDE1NDYyN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWE1MDc4M2NhMjQ5MmNiYjA0OTE3ODliY2Q5YzQ3ZjYwMzgwZGNkZWE2YzlhZjVkMTZmYjQyOTJlNWU0ODFkZmImWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0In0.bln3lDXCu9ypN51tuPGho84HV8rHfNUQWFP7_2yQZ7M)
I add steps to get and emit schemaManager's latest evolvedSchema when SinkDataWriterOperator handles FlushEvent. If the operator doesn't have a local cache of the schema when handling the flushevent, it will request Schema Manager's latest evolved schema. At this point the evolved schema is the same with the sink table's, as the schema change event hasn't been applied to the evolved schema by the master yet.
Now the new process is changed as follows:
![after](https://private-user-images.githubusercontent.com/25243041/396094024-23b54e3d-132b-4b15-a777-02d96c9f009a.jpg?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MzkzNzU0ODcsIm5iZiI6MTczOTM3NTE4NywicGF0aCI6Ii8yNTI0MzA0MS8zOTYwOTQwMjQtMjNiNTRlM2QtMTMyYi00YjE1LWE3NzctMDJkOTZjOWYwMDlhLmpwZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNTAyMTIlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjUwMjEyVDE1NDYyN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTljN2FjOWYyOWY1ZTJjMTQ0YWU4MTY5NjVjZjgwNjEyN2Q4MTAzNjllOThjMTMzZjhjYjhjNWUxMDQ0MzcxNzUmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0In0.M9LlcWLZQ-wIaSHLXCV7zfaJBxJ5aZPnzS0X-oAKXaQ)