-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36964] Fix potential exception when SchemaChange in parallel w… #3818
Conversation
Hi @lvyanquan, I have some concern about how BucketAssignOperator works with schema evolution stuff.
However, the bucket assigning strategy might break that assumption, where data sink writers might receive data change events with stale schema, even after external schema evolution processes have finished.
Basically same as described in #3680. In short, if a broadcast / custom partitioning topology is applied, then blocking one upstream partition will implicitly block all downstream partitions from handling events.
AFAIK all data change events have been hashed & distributed in PartitionOperator. Since adding a BucketAssignOperator does not change the parallelism, is there any reason we can't calculate the correct bucket partition ID in advance, instead of creating another partitioning topology? |
A little off topic, but we're really lacking E2e tests to real-life data sinks with complicated writing topologies. #3491 (with much higher testing pressure) might be necessary to expose more issues like this in the future. |
Hi @yuxiqian, could you help to review this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @lvyanquan's great work, just left some comments.
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
Outdated
Show resolved
Hide resolved
...tor-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
Outdated
Show resolved
Hide resolved
...r-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
Outdated
Show resolved
Hide resolved
...r-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/FlushEventAlignmentOperator.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/FlushEventAlignmentOperator.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/FlushEventAlignmentOperator.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's nice to have corresponding E2e cases to verify changes in this PR. Could @leonardBang @ruanhang1993 please trigger the CI workflow?
Last CI passed and rebase master to fix the conflict. |
@lvyanquan Would you like to rebase to latest master branch as conflicts happens? |
cf08b66
to
566fc6a
Compare
566fc6a
to
3a83d05
Compare
Rebased to master. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @lvyanquan for the contribution and @yuxiqian for the review, LGTM
Fix potential exception when SchemaChange in parallel with Paimon Sink.
This close FLINK-36964 and FLINK-35888.