[FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column #3285
This PR is still at a very early stage; looking for comments from @aiwenmo and @lvyanquan.
c448e89 to 31a7c1d
Updated based on previous comments, cc @aiwenmo
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
...src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java
Thanks for @aiwenmo's kind review; I've addressed the comments above.
...me/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
...me/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
...runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
ec700df to eb1dac3
Thanks @aiwenmo for reviewing, I've addressed your comments.
LGTM
Thanks @yuxiqian for this, I've left some comments.
...me/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
...me/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
...e/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
9274868 to cb119cd
Thanks for @lvyanquan's comments! I've addressed them in the latest commits.
...src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
abb397f to 86515a1
2859296 to d221efa
Rebased to latest.
Thanks @yuxiqian for the update, LGTM now
…rhaul # Conflicts: # flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java # Conflicts: # flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java # flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
Somehow this has been fixed in FLINK-35272. Just added an E2e case to verify if it works as expected.
It seems CI is failing due to an expired link in the Doris docs. Pushed another commit to fix this.
…omputed column This closes apache#3285.
This closes FLINK-35272, FLINK-35774, FLINK-35852.
Currently, pipeline jobs with transform rules (including projection and filtering) are constructed with the following topology:

where schema projections are applied in `SchemaTransformOp`, and data projection and filtering are applied in `DataTransformOp`. The idea is that `SchemaTransformOp` might be embedded in `Sources` in the future to reduce the size of the payload transferred within the Flink job.

However, the current implementation has a known defect: it omits unused columns too early, so columns that downstream operators rely on have already been removed by the time records arrive at `DataTransformOp`. See an example as follows:

Such transformation rules will fail because the `name` and `age` columns are removed in `SchemaTransformOp`, so their values cannot be retrieved in `DataTransformOp`, where the actual expression evaluation and filtering take effect.

This PR introduces a new design, renaming the transform topology as follows:
where the `PreTransformOp` filters out columns, but only if:

Referenced columns will be generated in exactly the same order as in the original schema. All schema and data events about those temporarily referenced columns will be omitted after `PostTransformOp`. For example, given the following transform rule:

`PreTransformOp` will yield an intermediate schema `(ID INT NOT NULL, AGE INT)` and the correspondingly trimmed data records to downstream. Calculated columns (`newage` here) will not be created at this stage since they haven't been evaluated yet; unused columns (`name` here) will be removed as early as possible.

If a column is explicitly written down, it will be passed downstream as-is. For referenced columns, however, a special prefix will be added to their names. In the example above, a schema like `[id, newname, __PREFIX__name, __PREFIX__age]` will be sent downstream. Notice that expression evaluation and filtering do not take effect at this point, so a `DataChangeEvent` would look like `[1, null, 'Alice', 19]`.

Adding the prefix is meant to deal with cases such as:

Here we need to distinguish the calculated column `(new) name` from the referenced original column `(old) name`. After the name-mangling process, the schema would be `[id, name, __PREFIX__name]`.

Also, the filtering process is still done in `PostTransformOp`, since users could write a filter expression that references calculated columns, whose values won't be available until `PostTransformOp` evaluates them. It also means that in the following somewhat ambiguous case:

The filtering expression is applied to the calculated `age` column (doubled!) instead of the original one.

Now, any calculated column referenced in a filter expression will be rewritten as its original definition. For example, the following transform rule:
...will be rewritten as follows:
Hence, no calculated columns need to be evaluated before the filtering process.
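
The filter rewrite described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's actual implementation: the class `FilterRewriteSketch`, the method `rewriteFilter`, and the sample rules (`age * 2 AS newage`, `age * 2 AS age`) are hypothetical, and identifiers are substituted with a plain regular expression rather than through a SQL parser, so string literals and quoted identifiers are not handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: expands calculated-column names inside a filter expression.
class FilterRewriteSketch {

    // Naive identifier pattern (assumption): letters, digits, underscores.
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    /**
     * Replaces every identifier that names a calculated column with its
     * parenthesized definition. The filter is scanned exactly once, so a
     * definition that mentions its own alias (e.g. "age * 2 AS age") is
     * not expanded a second time.
     */
    static String rewriteFilter(String filter, Map<String, String> calculatedColumns) {
        Matcher matcher = IDENTIFIER.matcher(filter);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String definition = calculatedColumns.get(matcher.group());
            String replacement =
                    definition == null ? matcher.group() : "(" + definition + ")";
            // quoteReplacement keeps '$' and '\' in definitions literal.
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Projection "id, age * 2 AS newage" with filter "newage > 18 AND id < 100".
        Map<String, String> calculated = new LinkedHashMap<>();
        calculated.put("newage", "age * 2");
        System.out.println(rewriteFilter("newage > 18 AND id < 100", calculated));
        // prints: (age * 2) > 18 AND id < 100

        // Ambiguous case: the alias shadows the original column name.
        Map<String, String> shadowing = new LinkedHashMap<>();
        shadowing.put("age", "age * 2");
        System.out.println(rewriteFilter("age > 18", shadowing));
        // prints: (age * 2) > 18
    }
}
```

After the rewrite, the filter refers only to original columns, which is why it can be evaluated without first materializing any calculated column. The second call mirrors the ambiguous case discussed earlier: the filter ends up testing the doubled value, not the original `age`.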