Commit 91fc7bb

[FLINK-35740][cdc-connector][mysql] Allow column as chunk key even it's not primary key
This closes #3448.
1 parent ea71b23 commit 91fc7bb

7 files changed (+69 / -28 lines)

docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 5 additions & 3 deletions
@@ -500,7 +500,7 @@ CREATE TABLE products (
 * (3) The source does not need the database lock permission before the snapshot reading.
 
 If you want the source to run in parallel, each parallel reader should have a unique server id, so the `server id` range must be like `5400-6400`,
-and the range must be larger than the parallelism. During incremental snapshot reading, the MySQL CDC Source first splits the table into multiple chunks by the table's primary key,
+and the range must be larger than the parallelism. During incremental snapshot reading, the MySQL CDC Source first splits the table into multiple chunks by the chunk key you specify,
 and then the MySQL CDC Source assigns the chunks to multiple readers to read the table data in parallel.
 
 #### Parallel reading
@@ -550,7 +550,7 @@ After the monitored server in your MySQL cluster fails, you only need to switch the monitored …
 
 When the MySQL CDC Source starts, it reads the snapshot of the table in parallel and then reads the binlog of the table with a single parallelism.
 
-In the snapshot phase, the snapshot is cut into multiple snapshot chunks according to the primary key of the table and the size of the table rows,
+In the snapshot phase, the snapshot is cut into multiple snapshot chunks according to the chunk key of the table and the size of the table rows,
 and the snapshot chunks are assigned to multiple snapshot readers. Each snapshot reader reads its chunks with the [chunk reading algorithm](#snapshot-chunk-reading) and sends the read data downstream.
 The source manages the progress status (finished or not) of chunks, so the source in the snapshot phase can support checkpoints at the chunk level.
 If a failure happens, the source can be restored and continue reading chunks from the last finished chunk.
@@ -565,7 +565,9 @@ Flink performs checkpoints for the Source periodically; in case of failover, the job …
 
 When performing incremental snapshot reading, the MySQL CDC source needs a criterion for splitting the table.
 The MySQL CDC Source uses a splitting column to split the table into multiple chunks. By default, the MySQL CDC source identifies the primary key columns of the table and uses the first column of the primary key as the splitting column.
-If there is no primary key in the table, incremental snapshot reading will fail; you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism.
+If there is no primary key in the table, you must specify `scan.incremental.snapshot.chunk.key-column`,
+otherwise incremental snapshot reading will fail; you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism.
+Please note that using a column that is not part of the primary key as the chunk key may degrade the query performance of the table.
 
 For numeric and auto-increment splitting columns, the MySQL CDC Source efficiently splits chunks by a fixed step length.
 For example, if you have a table whose primary key column `id` is an auto-increment BIGINT type with a minimum value of `0` and a maximum value of `100`,
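
As the paragraph above notes, a table without a primary key must set `scan.incremental.snapshot.chunk.key-column` for incremental snapshot reading to work. Below is a minimal sketch of such a configuration: the table name, columns, and connection settings are hypothetical, and the DDL is submitted through Flink's Table API so the example stays self-contained Java.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ChunkKeyColumnExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical source table without a primary key: the chunk key column is
        // set explicitly so the incremental snapshot can still be split into chunks.
        tEnv.executeSql(
                "CREATE TABLE orders_no_pk (\n"
                        + "  id BIGINT,\n"
                        + "  order_ref STRING,\n"
                        + "  amount DECIMAL(10, 2)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc',\n"
                        + "  'hostname' = 'localhost',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'flinkuser',\n"
                        + "  'password' = 'flinkpw',\n"
                        + "  'database-name' = 'mydb',\n"
                        + "  'table-name' = 'orders_no_pk',\n"
                        + "  'scan.incremental.snapshot.chunk.key-column' = 'id'\n"
                        + ")");
    }
}
```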

docs/content/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 5 additions & 3 deletions
@@ -530,7 +530,7 @@ Incremental snapshot reading is a new mechanism to read snapshot of a table. Com
 If you would like the source run in parallel, each parallel reader should have an unique server id, so the 'server-id' must be a range like '5400-6400',
 and the range must be larger than the parallelism.
 
-During the incremental snapshot reading, the MySQL CDC Source firstly splits snapshot chunks (splits) by primary key of table,
+During the incremental snapshot reading, the MySQL CDC Source firstly splits snapshot chunks (splits) by user specified chunk key of table,
 and then MySQL CDC Source assigns the chunks to multiple readers to read the data of snapshot chunk.
 
 #### Controlling Parallelism
@@ -580,7 +580,7 @@ The CDC job may restart fails in this case. So the heartbeat event will help upd
 
 When the MySQL CDC source is started, it reads snapshot of table parallelly and then reads binlog of table with single parallelism.
 
-In snapshot phase, the snapshot is cut into multiple snapshot chunks according to primary key of table and the size of table rows.
+In snapshot phase, the snapshot is cut into multiple snapshot chunks according to chunk key of table and the size of table rows.
 Snapshot chunks is assigned to multiple snapshot readers. Each snapshot reader reads its received chunks with [chunk reading algorithm](#snapshot-chunk-reading) and send the read data to downstream.
 The source manages the process status (finished or not) of chunks, thus the source of snapshot phase can support checkpoint in chunk level.
 If a failure happens, the source can be restored and continue to read chunks from last finished chunks.
@@ -596,7 +596,9 @@ Flink performs checkpoints for the source periodically, in case of failover, the
 
 When performing incremental snapshot reading, MySQL CDC source need a criterion which used to split the table.
 MySQL CDC Source use a splitting column to split the table to multiple splits (chunks). By default, MySQL CDC source will identify the primary key column of the table and use the first column in primary key as the splitting column.
-If there is no primary key in the table, incremental snapshot reading will fail and you can disable `scan.incremental.snapshot.enabled` to fallback to old snapshot reading mechanism.
+If there is no primary key in the table, user must specify `scan.incremental.snapshot.chunk.key-column`,
+otherwise incremental snapshot reading will fail and you can disable `scan.incremental.snapshot.enabled` to fallback to old snapshot reading mechanism.
+Please note that using a column not in primary key as a chunk key can result in slower table query performance.
 
 For numeric and auto incremental splitting column, MySQL CDC Source efficiently splits chunks by fixed step length.
 For example, if you had a table with a primary key column of `id` which is auto-incremental BIGINT type, the minimum value was `0` and maximum value was `100`,
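
For the fixed-step splitting mentioned in the last paragraph, the following is a small self-contained sketch, not the connector's actual implementation, of how an `id` range of 0 to 100 could be carved into chunks under an assumed chunk size of 25.

```java
import java.util.ArrayList;
import java.util.List;

public class FixedStepChunkSketch {

    /** Splits [min, max] into left-closed, right-open ranges of a fixed step. */
    static List<long[]> splitByFixedStep(long min, long max, long chunkSize) {
        List<long[]> chunks = new ArrayList<>();
        long start = min;
        while (start <= max) {
            long end = Math.min(start + chunkSize, max + 1);
            chunks.add(new long[] {start, end});
            start = end;
        }
        return chunks;
    }

    public static void main(String[] args) {
        // id in [0, 100] with an assumed chunk size of 25 yields
        // [0, 25), [25, 50), [50, 75), [75, 100), [100, 101)
        for (long[] chunk : splitByFixedStep(0, 100, 25)) {
            System.out.println("[" + chunk[0] + ", " + chunk[1] + ")");
        }
    }
}
```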

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/JdbcSourceOptions.java

Lines changed: 1 addition & 2 deletions
@@ -109,6 +109,5 @@ public class JdbcSourceOptions extends SourceOptions {
                     .noDefaultValue()
                     .withDescription(
                             "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
-                                    + "By default, the chunk key is the first column of the primary key and the chunk key is the RowId in oracle."
-                                    + "This column must be a column of the primary key.");
+                                    + "By default, the chunk key is the first column of the primary key and the chunk key is the RowId in oracle.");
 }

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java

Lines changed: 1 addition & 2 deletions
@@ -243,8 +243,7 @@ public class MySqlSourceOptions {
                     .noDefaultValue()
                     .withDescription(
                             "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
-                                    + "By default, the chunk key is the first column of the primary key."
-                                    + "This column must be a column of the primary key.");
+                                    + "By default, the chunk key is the first column of the primary key.");
 
     @Experimental
     public static final ConfigOption<Boolean> SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java

Lines changed: 5 additions & 6 deletions
@@ -57,9 +57,8 @@ public static RowType getChunkKeyColumnType(Column chunkKeyColumn) {
 
     /**
      * Get the chunk key column. This column could be set by `chunkKeyColumn`. If the table doesn't
-     * have primary keys, `chunkKeyColumn` must be set. If the table has primary keys,
-     * `chunkKeyColumn` must be a column of them or else null. When the parameter `chunkKeyColumn`
-     * is not set and the table has primary keys, return the first column of primary keys.
+     * have primary keys, `chunkKeyColumn` must be set. When the parameter `chunkKeyColumn` is not
+     * set and the table has primary keys, return the first column of primary keys.
      */
     public static Column getChunkKeyColumn(Table table, Map<ObjectPath, String> chunkKeyColumns) {
         List<Column> primaryKeys = table.primaryKeyColumns();
@@ -68,7 +67,8 @@ public static Column getChunkKeyColumn(Table table, Map<ObjectPath, String> chun
             throw new ValidationException(
                     "'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
         }
-        List<Column> searchColumns = primaryKeys.isEmpty() ? table.columns() : primaryKeys;
+
+        List<Column> searchColumns = table.columns();
         if (chunkKeyColumn != null) {
             Optional<Column> targetColumn =
                     searchColumns.stream()
@@ -79,9 +79,8 @@ public static Column getChunkKeyColumn(Table table, Map<ObjectPath, String> chun
             }
             throw new ValidationException(
                     String.format(
-                            "Chunk key column '%s' doesn't exist in the %s [%s] of the table %s.",
+                            "Chunk key column '%s' doesn't exist in the columns [%s] of the table %s.",
                             chunkKeyColumn,
-                            primaryKeys.isEmpty() ? "user specified columns" : "primary keys",
                            searchColumns.stream()
                                    .map(Column::name)
                                    .collect(Collectors.joining(",")),
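
Condensed from the diff above, the sketch below captures the resolution order after this change: a configured chunk key may be any column of the table, an unset chunk key falls back to the first primary-key column, and a table with neither is rejected. It is a simplified stand-in for `ChunkUtils.getChunkKeyColumn`, taking the already-resolved chunk key string as a parameter and throwing `IllegalArgumentException` in place of Flink's `ValidationException`.

```java
import io.debezium.relational.Column;
import io.debezium.relational.Table;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class ChunkKeyResolutionSketch {

    /** Simplified resolution of the chunk key column for a captured table. */
    static Column resolveChunkKeyColumn(Table table, String configuredChunkKey) {
        List<Column> primaryKeys = table.primaryKeyColumns();
        if (primaryKeys.isEmpty() && configuredChunkKey == null) {
            throw new IllegalArgumentException(
                    "'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
        }
        if (configuredChunkKey != null) {
            // Search all columns of the table, not only the primary-key columns.
            Optional<Column> target =
                    table.columns().stream()
                            .filter(column -> configuredChunkKey.equals(column.name()))
                            .findFirst();
            return target.orElseThrow(
                    () ->
                            new IllegalArgumentException(
                                    String.format(
                                            "Chunk key column '%s' doesn't exist in the columns [%s] of the table %s.",
                                            configuredChunkKey,
                                            table.columns().stream()
                                                    .map(Column::name)
                                                    .collect(Collectors.joining(",")),
                                            table.id())));
        }
        // Default: the first column of the primary key.
        return primaryKeys.get(0);
    }
}
```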

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java

Lines changed: 6 additions & 11 deletions
@@ -82,12 +82,7 @@ public static Object[] rowToArray(ResultSet rs, int size) throws SQLException {
     }
 
     public static Struct getStructContainsChunkKey(SourceRecord record) {
-        // If the table has primary keys, chunk key is in the record key struct
-        if (record.key() != null) {
-            return (Struct) record.key();
-        }
-
-        // If the table doesn't have primary keys, chunk key is in the after struct for insert or
+        // Use chunk key in the after struct for insert or
         // the before struct for delete/update
         Envelope.Operation op = Envelope.operationFor(record);
         Struct value = (Struct) record.value();
@@ -109,9 +104,9 @@ public static void upsertBinlog(
         if (isDataChangeRecord(binlogRecord)) {
             Struct value = (Struct) binlogRecord.value();
             if (value != null) {
-                Struct keyStruct = getStructContainsChunkKey(binlogRecord);
+                Struct chunkKeyStruct = getStructContainsChunkKey(binlogRecord);
                 if (splitKeyRangeContains(
-                        getSplitKey(splitBoundaryType, nameAdjuster, keyStruct),
+                        getSplitKey(splitBoundaryType, nameAdjuster, chunkKeyStruct),
                         splitStart,
                         splitEnd)) {
                     boolean hasPrimaryKey = binlogRecord.key() != null;
@@ -124,7 +119,7 @@ public static void upsertBinlog(
                                 snapshotRecords,
                                 binlogRecord,
                                 hasPrimaryKey
-                                        ? keyStruct
+                                        ? (Struct) binlogRecord.key()
                                         : createReadOpValue(
                                                 binlogRecord, Envelope.FieldName.AFTER),
                                 false);
@@ -152,15 +147,15 @@ public static void upsertBinlog(
                             upsertBinlog(
                                     snapshotRecords,
                                     binlogRecord,
-                                    hasPrimaryKey ? keyStruct : structFromAfter,
+                                    hasPrimaryKey ? (Struct) binlogRecord.key() : structFromAfter,
                                     false);
                             break;
                         case DELETE:
                             upsertBinlog(
                                     snapshotRecords,
                                     binlogRecord,
                                     hasPrimaryKey
-                                            ? keyStruct
+                                            ? (Struct) binlogRecord.key()
                                             : createReadOpValue(
                                                     binlogRecord, Envelope.FieldName.BEFORE),
                                     true);
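
The reworked `getStructContainsChunkKey` above always reads the chunk key from the change-event value, so a chunk key that is not part of the primary key can be located even though the record key would not contain it. A condensed sketch of that behavior, not the actual RecordUtils code, might look like this: the AFTER struct serves inserts and reads, the BEFORE struct serves updates and deletes.

```java
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class ChunkKeyStructSketch {

    /** Returns the value-side struct that carries the chunk key of a change event. */
    static Struct structContainingChunkKey(SourceRecord record) {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        if (op == Envelope.Operation.DELETE || op == Envelope.Operation.UPDATE) {
            // Deletes and updates carry the chunk key in the BEFORE image.
            return value.getStruct(Envelope.FieldName.BEFORE);
        }
        // Inserts (and READ events) carry the chunk key in the AFTER image.
        return value.getStruct(Envelope.FieldName.AFTER);
    }
}
```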

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java

Lines changed: 46 additions & 1 deletion
@@ -166,7 +166,7 @@ public void testAssignCompositePkTableWithWrongChunkKeyColumn() {
             assertTrue(
                     ExceptionUtils.findThrowableWithMessage(
                                     t,
-                                    "Chunk key column 'errorCol' doesn't exist in the primary keys [card_no,level] of the table")
+                                    "Chunk key column 'errorCol' doesn't exist in the columns [card_no,level,name,note] of the table")
                             .isPresent());
         }
     }
@@ -416,6 +416,51 @@ public void testTableWithoutPrimaryKey() {
         }
     }
 
+    @Test
+    public void testAssignTableWithoutPrimaryKeyWithChunkKeyColumn() {
+        String tableWithoutPrimaryKey = "customers_no_pk";
+        List<String> expected =
+                Arrays.asList(
+                        "customers_no_pk null [462]",
+                        "customers_no_pk [462] [823]",
+                        "customers_no_pk [823] [1184]",
+                        "customers_no_pk [1184] [1545]",
+                        "customers_no_pk [1545] [1906]",
+                        "customers_no_pk [1906] null");
+        List<String> splits =
+                getTestAssignSnapshotSplits(
+                        customerDatabase,
+                        4,
+                        CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        new String[] {tableWithoutPrimaryKey},
+                        "id");
+        assertEquals(expected, splits);
+    }
+
+    @Test
+    public void testAssignTableWithPrimaryKeyWithChunkKeyColumnNotInPrimaryKey() {
+        String tableWithoutPrimaryKey = "customers";
+        List<String> expected =
+                Arrays.asList(
+                        "customers null [user_12]",
+                        "customers [user_12] [user_15]",
+                        "customers [user_15] [user_18]",
+                        "customers [user_18] [user_20]",
+                        "customers [user_20] [user_4]",
+                        "customers [user_4] [user_7]",
+                        "customers [user_7] null");
+        List<String> splits =
+                getTestAssignSnapshotSplits(
+                        customerDatabase,
+                        4,
+                        CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        new String[] {tableWithoutPrimaryKey},
+                        "name");
+        assertEquals(expected, splits);
+    }
+
     @Test
     public void testEnumerateTablesLazily() {
         final MySqlSourceConfig configuration =
