-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-35242] Supports per-SE type configuration & "lenient" evolution behavior #3339
Conversation
case EXCEPTION: | ||
return exceptionOnSchemaChange(input, parallelism); | ||
return exceptionOnSchemaChange( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have some questions about the behavior for IGNORE
and EXCEPTION
.
Now when we setting the behavior as IGNORE or EXCEPTION, the job will be failed, should it be fixed it or create a new PR for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing it out! I think it would be better fix it in this PR since SchemaOperator & Registry will be greatly modified to implement FLINK-35242.
(Seems #3352 / FLINK-35432 is about an irrelevant problem about MySQL ddl parsing, and should be fine to be reviewed and merged independently.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree it, and i also point it in the issue with JIRA / https://issues.apache.org/jira/browse/FLINK-35436 and have another #3355
I think we can merge it into this PR.
4e4dbf5
to
a3c9efe
Compare
@PatrickRen @leonardBang PTAL |
6b13fe7
to
6959c47
Compare
7470fc4
to
079d531
Compare
a252a61
to
9ac25ef
Compare
Just added Since this PR has grown too big, hopefully this could be reviewed & merged soon to avoid any conflicts hassle in the future. cc @leonardBang |
@yuxiqian Thanks for the great work, could you rebase to latest master? |
Done, rebased with master branch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @yuxiqian for the great work, I finished the first round review and left some comments and I think you can start to address my comments.
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java
Outdated
Show resolved
Hide resolved
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java
Outdated
Show resolved
Hide resolved
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java
Show resolved
Hide resolved
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java
Show resolved
Hide resolved
public static final SchemaChangeEventType[] ADD = {SchemaChangeEventType.ADD_COLUMN}; | ||
|
||
public static final SchemaChangeEventType[] ALTER = {SchemaChangeEventType.ALTER_COLUMN_TYPE}; | ||
|
||
public static final SchemaChangeEventType[] CREATE = {SchemaChangeEventType.CREATE_TABLE}; | ||
|
||
public static final SchemaChangeEventType[] DROP = {SchemaChangeEventType.DROP_COLUMN}; | ||
|
||
public static final SchemaChangeEventType[] RENAME = {SchemaChangeEventType.RENAME_COLUMN}; | ||
|
||
public static final SchemaChangeEventType[] TABLE = {SchemaChangeEventType.CREATE_TABLE}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you share the basis of the categories ? current hierarchy confuse me a bit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For class names like AddColumnEvent
, I've splitten them like add.column
, and puts them into both add
and column
family.
...k-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java
Outdated
Show resolved
Hide resolved
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/SchemaChangeBehavior.java
Show resolved
Hide resolved
...-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSink.java
Outdated
Show resolved
Hide resolved
import java.util.Map; | ||
|
||
/** A collection class for handling metrics in {@link SchemaOperator}. */ | ||
public class SchemaOperatorMetrics { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can split the metric logic to another PR in next time, it's totally independent with SE behavior. But, I like the idea to report metrics for SchemaOperator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the reminder, I'll take notice on this next time.
b3cc000
to
1c6face
Compare
Squashed & Rebased with Since this also changes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @yuxiqian for the great work, the PR generally looks good to me, I just left some minor comments
...-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
Show resolved
Hide resolved
...k-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java
Outdated
Show resolved
Hide resolved
...k-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java
Outdated
Show resolved
Hide resolved
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java
Outdated
Show resolved
Hide resolved
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java
Show resolved
Hide resolved
...va/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeResponse.java
Outdated
Show resolved
Hide resolved
...va/org/apache/flink/cdc/runtime/operators/schema/event/ApplyUpstreamSchemaChangeRequest.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeRequest.java
Show resolved
Hide resolved
...eline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
Outdated
Show resolved
Hide resolved
… evolution behavior & Add schema operator metrics # Conflicts: # flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java # flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java
...va/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeRequest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @yuxiqian for the great work, LGTM
… schema evolution behavior This closes apache#3339.
… schema evolution behavior This closes apache#3339.
This closes FLINK-35242.
TRY_EVOLVE
behavior that tolerates exception during metadata applying processinclude.schema.changes
andexclude.schema.changes
sink optionWith this change, Schema Operator now stores both "upstream" schema (which always keep up with the structure coming from pipeline source after the transformation) and "evolved" schema (the schema that actually applied to sink). They might differ when:
IGNORE
behavior)TRY_EVOLVE
mode)If these cases occur,
SchemaOperator
needs to cast upstream schema to match the actual evolved version (by adding / removing columns, converting types, etc.) with tolerance (fill innull
if meaningful casts are not possible.)