diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 9590e1c4e13..d7da2c6150b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -438,6 +438,27 @@ public void testAssignTableWithoutPrimaryKeyWithChunkKeyColumn() { assertEquals(expected, splits); } + public void testAssignTableWithPrimaryKeyWithChunkKeyColumnNotInPrimaryKey() { + String tableWithoutPrimaryKey = "customers"; + List expected = + Arrays.asList( + "customers_no_pk null [user_5]", + "customers_no_pk [user_5] [user_9]", + "customers_no_pk [user_9] [user_13]", + "customers_no_pk [user_13] [user_17]", + "customers_no_pk [user_17] [user_20]", + "customers_no_pk [user_20] null"); + List splits = + getTestAssignSnapshotSplits( + customerDatabase, + 4, + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + new String[] {tableWithoutPrimaryKey}, + "name"); + assertEquals(expected, splits); + } + @Test public void testEnumerateTablesLazily() { final MySqlSourceConfig configuration =