[FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys #3448
Conversation
PTAL @leonardBang @lvyanquan @PatrickRen 🙇🏻‍♂️
I was wondering whether to allow a column that is not a primary key as a chunk key.
Perhaps the `testAssignCompositePkTableWithWrongChunkKeyColumn` test case should be updated, too?
Yes! I will update those test cases.
…WrongChunkKeyColumn
…lSnapshotSplitAssignerTest
…eWithoutPrimaryKeyWithChunkKeyColumn
Generally LGTM, but my concern is that arbitrarily chosen columns can be risky, since chunking by a column without an index is very slow. I think we need to update the related documentation and add warnings about this. Looking forward to thoughts from @SML0127.
Since Flink CDC v2.4.0, incremental snapshots have been supported for tables without primary keys. I will update the documentation in this PR, too.
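For reference, the option being discussed is set in the table definition. Below is a minimal Flink SQL sketch; the table name and connection values are made up for illustration, and only the `scan.incremental.snapshot.chunk-key-column` option name comes from the MySQL CDC connector:

```sql
-- Hypothetical table and connection values; only the chunk-key option
-- name is taken from the MySQL CDC connector documentation.
CREATE TABLE customers_no_pk (
    id INT,
    name STRING
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flink',
    'password' = '***',
    'database-name' = 'mydb',
    'table-name' = 'customers_no_pk',
    -- split snapshot chunks on a column that is not (part of) the primary key
    'scan.incremental.snapshot.chunk-key-column' = 'id'
);
```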
…heck splitKeyRange using chunkKeyStruct (force-pushed from 45106d4 to 09e65e1)
…r table with primary key
@leonardBang PTAL
@leonardBang @yuxiqian PTAL
Looks good! Just left a minor comment about unit testing.
```java
@Test
public void testAssignTableWithoutPrimaryKeyWithChunkKeyColumn() {
    String tableWithoutPrimaryKey = "customers_no_pk";
    List<String> expected =
            Arrays.asList(
                    "customers_no_pk null [462]",
                    "customers_no_pk [462] [823]",
                    "customers_no_pk [823] [1184]",
                    "customers_no_pk [1184] [1545]",
                    "customers_no_pk [1545] [1906]",
                    "customers_no_pk [1906] null");
    List<String> splits =
            getTestAssignSnapshotSplits(
                    customerDatabase,
                    4,
                    CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                    CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                    new String[] {tableWithoutPrimaryKey},
                    "id");
    assertEquals(expected, splits);
}
```
It would be nice if we could also test using non-primary-key columns as chunk keys for tables that do have primary keys.
…ColumnNotInPrimaryKey
...a/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@leonardBang @ruanhang1993
@SML0127 Thanks for the update, looks generally good to me. Just one minor comment: could you update the doc under the docs/content.zh directory as well?
@leonardBang
Thanks @SML0127 for the update, LGTM
…'s not primary key This closes apache#3448.
Allow a column as the chunk key even if it is not in the primary keys.

There are cases where the primary key is not a numeric type, such as varchar or varbinary. In these cases, the distribution factor and chunk range may be calculated incorrectly, resulting in one chunk containing too many records. Since there was no restriction on the column type of the primary key or the user-supplied chunk key, this could cause an out-of-memory error in the Flink TaskManager.

In fact, at our company, there was a MySQL table whose primary key is a varbinary column, and the above situation occurred.
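To make the failure mode concrete, here is a minimal, simplified sketch of even-distribution chunk splitting over a numeric key. This is not the connector's actual code; the `splitEvenly` method and all values are illustrative. The point is that the distribution factor assumes a meaningful numeric span between min and max key values, which does not exist for varchar/varbinary keys:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of even-distribution chunk splitting (simplified).
public class ChunkSplitSketch {

    // distribution factor = key span / row count; 1.0 means perfectly dense keys.
    static double distributionFactor(long min, long max, long rowCount) {
        return rowCount == 0 ? Double.MAX_VALUE : (max - min + 1) / (double) rowCount;
    }

    // Split [min, max] into chunks whose size is scaled by the distribution factor.
    static List<String> splitEvenly(long min, long max, long rows, int chunkSize) {
        double factor = distributionFactor(min, max, rows);
        int step = Math.max((int) (factor * chunkSize), 1);
        List<String> chunks = new ArrayList<>();
        for (long start = min; start <= max; start += step) {
            long end = Math.min(start + step - 1, max);
            chunks.add("[" + start + ", " + end + "]");
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Dense numeric key: factor is 1.0, so chunks come out evenly sized.
        System.out.println(splitEvenly(101, 2000, 1900, 380));
    }
}
```

With a non-numeric key there is no usable min/max span, so the factor (and hence the chunk boundaries) can be wildly wrong, collapsing most rows into a single oversized chunk.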
https://issues.apache.org/jira/browse/FLINK-35740