Skip to content

Commit a252a61

Browse files
committed
Add lenient schema evolve behavior & tests
1 parent 79be8f8 commit a252a61

File tree

7 files changed: +937 additions, -9 deletions

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,13 @@ public class PipelineOptions {
5555
.linebreak()
5656
.add(
5757
ListElement.list(
58+
text("IGNORE: Drop all schema change events."),
59+
text(
60+
"LENIENT: Apply schema changes to downstream tolerantly, and keeps executing if applying fails."),
61+
text(
62+
"TRY_EVOLVE: Apply schema changes to downstream, but keeps executing if applying fails."),
5863
text(
5964
"EVOLVE: Apply schema changes to downstream. This requires sink to support handling schema changes."),
60-
text("IGNORE: Drop all schema change events."),
6165
text(
6266
"EXCEPTION: Throw an exception to terminate the sync pipeline.")))
6367
.build());

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/SchemaChangeBehavior.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
@PublicEvolving
2424
public enum SchemaChangeBehavior {
2525
IGNORE,
26+
LENIENT,
2627
TRY_EVOLVE,
2728
EVOLVE,
2829
EXCEPTION

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,82 @@ public void testFineGrainedSchemaEvolution() throws Exception {
653653
jobManagerConsumer);
654654
}
655655

656+
@Test
657+
public void testLenientSchemaEvolution() throws Exception {
658+
String pipelineJob =
659+
String.format(
660+
"source:\n"
661+
+ " type: mysql\n"
662+
+ " hostname: %s\n"
663+
+ " port: 3306\n"
664+
+ " username: %s\n"
665+
+ " password: %s\n"
666+
+ " tables: %s.members\n"
667+
+ " server-id: 5400-5404\n"
668+
+ " server-time-zone: UTC\n"
669+
+ "\n"
670+
+ "sink:\n"
671+
+ " type: values\n"
672+
+ "\n"
673+
+ "pipeline:\n"
674+
+ " schema.change.behavior: lenient\n"
675+
+ " parallelism: 1",
676+
INTER_CONTAINER_MYSQL_ALIAS,
677+
MYSQL_TEST_USER,
678+
MYSQL_TEST_PASSWORD,
679+
schemaEvolveDatabase.getDatabaseName());
680+
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
681+
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
682+
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
683+
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
684+
waitUntilJobRunning(Duration.ofSeconds(30));
685+
LOG.info("Pipeline job is running");
686+
validateSnapshotData("members");
687+
688+
LOG.info("Starting schema evolution");
689+
String mysqlJdbcUrl =
690+
String.format(
691+
"jdbc:mysql://%s:%s/%s",
692+
MYSQL.getHost(),
693+
MYSQL.getDatabasePort(),
694+
schemaEvolveDatabase.getDatabaseName());
695+
696+
try (Connection conn =
697+
DriverManager.getConnection(
698+
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
699+
Statement stmt = conn.createStatement()) {
700+
701+
waitForIncrementalStage("members", stmt);
702+
703+
// triggers AddColumnEvent
704+
stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;");
705+
stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);");
706+
707+
// triggers AlterColumnTypeEvent and RenameColumnEvent
708+
stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age DOUBLE;");
709+
710+
// triggers RenameColumnEvent
711+
stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;");
712+
713+
// triggers DropColumnEvent
714+
stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
715+
stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
716+
}
717+
718+
List<String> expected =
719+
Stream.of(
720+
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
721+
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
722+
"AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}",
723+
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}",
724+
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}",
725+
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}")
726+
.map(s -> String.format(s, schemaEvolveDatabase.getDatabaseName()))
727+
.collect(Collectors.toList());
728+
729+
validateResult(expected, taskManagerConsumer);
730+
}
731+
656732
private void validateResult(List<String> expectedEvents, ToStringConsumer consumer)
657733
throws Exception {
658734
for (String event : expectedEvents) {

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -338,8 +338,13 @@ private RecordData regenerateRecordData(
338338
RecordData.FieldGetter fieldGetter =
339339
RecordData.createFieldGetter(
340340
originalSchema.getColumn(columnName).get().getType(), columnIndex);
341-
// Check type compatibility
342-
if (originalSchema.getColumn(columnName).get().getType().equals(column.getType())) {
341+
// Check type compatibility, ignoring nullability
342+
if (originalSchema
343+
.getColumn(columnName)
344+
.get()
345+
.getType()
346+
.nullable()
347+
.equals(column.getType().nullable())) {
343348
fieldGetters.add(fieldGetter);
344349
} else {
345350
fieldGetters.add(
@@ -408,14 +413,15 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh
408413
schemaEvolveResponse.getPrintableFailedSchemaChangeEvents()));
409414
}
410415
} else if (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE
416+
|| schemaChangeBehavior == SchemaChangeBehavior.LENIENT
411417
|| schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
412418
if (schemaEvolveResponse.hasException()) {
413419
schemaEvolveResponse
414420
.getFailedSchemaChangeEvents()
415421
.forEach(
416422
e ->
417423
LOG.warn(
418-
"Failed to apply event {}, but keeps running in TRY_EVOLVE mode. Caused by: {}",
424+
"Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}",
419425
e.f0,
420426
e.f1));
421427
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,20 @@
1919

2020
import org.apache.flink.api.java.tuple.Tuple2;
2121
import org.apache.flink.cdc.common.annotation.Internal;
22+
import org.apache.flink.cdc.common.event.AddColumnEvent;
23+
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
2224
import org.apache.flink.cdc.common.event.CreateTableEvent;
25+
import org.apache.flink.cdc.common.event.DropColumnEvent;
26+
import org.apache.flink.cdc.common.event.RenameColumnEvent;
2327
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
2428
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2529
import org.apache.flink.cdc.common.event.TableId;
2630
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
2731
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
32+
import org.apache.flink.cdc.common.schema.Column;
33+
import org.apache.flink.cdc.common.schema.Schema;
2834
import org.apache.flink.cdc.common.sink.MetadataApplier;
35+
import org.apache.flink.cdc.common.types.DataType;
2936
import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsResponse;
3037
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
3138
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
@@ -41,14 +48,19 @@
4148

4249
import java.io.Closeable;
4350
import java.io.IOException;
51+
import java.util.ArrayList;
4452
import java.util.Collections;
53+
import java.util.HashMap;
4554
import java.util.HashSet;
4655
import java.util.LinkedList;
4756
import java.util.List;
57+
import java.util.Map;
4858
import java.util.Set;
4959
import java.util.concurrent.CompletableFuture;
5060
import java.util.concurrent.ExecutorService;
5161
import java.util.concurrent.Executors;
62+
import java.util.stream.Collectors;
63+
import java.util.stream.Stream;
5264

5365
import static org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.RequestStatus.RECEIVED_RELEASE_REQUEST;
5466
import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;
@@ -170,7 +182,7 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
170182
}
171183
schemaManager.applyUpstreamSchemaChange(request.getSchemaChangeEvent());
172184
List<SchemaChangeEvent> derivedSchemaChangeEvents =
173-
schemaDerivation.applySchemaChange(request.getSchemaChangeEvent());
185+
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
174186
CompletableFuture<CoordinationResponse> response =
175187
CompletableFuture.completedFuture(
176188
wrap(new SchemaChangeResponse(derivedSchemaChangeEvents)));
@@ -261,7 +273,7 @@ private void startNextSchemaChangeRequest() {
261273
} else {
262274
schemaManager.applyUpstreamSchemaChange(request.getSchemaChangeEvent());
263275
List<SchemaChangeEvent> derivedSchemaChangeEvents =
264-
schemaDerivation.applySchemaChange(request.getSchemaChangeEvent());
276+
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
265277
pendingSchemaChange
266278
.getResponseFuture()
267279
.complete(wrap(new SchemaChangeResponse(derivedSchemaChangeEvents)));
@@ -301,6 +313,114 @@ public void close() throws IOException {
301313
}
302314
}
303315

316+
private List<SchemaChangeEvent> calculateDerivedSchemaChangeEvents(SchemaChangeEvent event) {
317+
if (SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
318+
return lenientizeSchemaChangeEvent(event).stream()
319+
.flatMap(evt -> schemaDerivation.applySchemaChange(evt).stream())
320+
.collect(Collectors.toList());
321+
} else {
322+
return schemaDerivation.applySchemaChange(event);
323+
}
324+
}
325+
326+
private List<SchemaChangeEvent> lenientizeSchemaChangeEvent(SchemaChangeEvent event) {
327+
if (event instanceof CreateTableEvent) {
328+
return Collections.singletonList(event);
329+
}
330+
TableId tableId = event.tableId();
331+
Schema evolvedSchema =
332+
schemaManager
333+
.getLatestEvolvedSchema(tableId)
334+
.orElseThrow(
335+
() ->
336+
new IllegalStateException(
337+
"Evolved schema does not exist, not ready for schema change event "
338+
+ event));
339+
switch (event.getType()) {
340+
case ADD_COLUMN:
341+
{
342+
AddColumnEvent addColumnEvent = (AddColumnEvent) event;
343+
return Collections.singletonList(
344+
new AddColumnEvent(
345+
tableId,
346+
addColumnEvent.getAddedColumns().stream()
347+
.map(
348+
col ->
349+
new AddColumnEvent.ColumnWithPosition(
350+
Column.physicalColumn(
351+
col.getAddColumn()
352+
.getName(),
353+
col.getAddColumn()
354+
.getType()
355+
.nullable(),
356+
col.getAddColumn()
357+
.getComment())))
358+
.collect(Collectors.toList())));
359+
}
360+
case DROP_COLUMN:
361+
{
362+
DropColumnEvent dropColumnEvent = (DropColumnEvent) event;
363+
Map<String, DataType> convertNullableColumns =
364+
dropColumnEvent.getDroppedColumnNames().stream()
365+
.map(evolvedSchema::getColumn)
366+
.flatMap(e -> e.map(Stream::of).orElse(Stream.empty()))
367+
.filter(col -> !col.getType().isNullable())
368+
.collect(
369+
Collectors.toMap(
370+
Column::getName,
371+
column -> column.getType().nullable()));
372+
373+
if (convertNullableColumns.isEmpty()) {
374+
return Collections.emptyList();
375+
} else {
376+
return Collections.singletonList(
377+
new AlterColumnTypeEvent(tableId, convertNullableColumns));
378+
}
379+
}
380+
case RENAME_COLUMN:
381+
{
382+
RenameColumnEvent renameColumnEvent = (RenameColumnEvent) event;
383+
List<AddColumnEvent.ColumnWithPosition> appendColumns = new ArrayList<>();
384+
Map<String, DataType> convertNullableColumns = new HashMap<>();
385+
renameColumnEvent
386+
.getNameMapping()
387+
.forEach(
388+
(key, value) -> {
389+
Column column =
390+
evolvedSchema
391+
.getColumn(key)
392+
.orElseThrow(
393+
() ->
394+
new IllegalArgumentException(
395+
"Non-existed column "
396+
+ key
397+
+ " in evolved schema."));
398+
if (!column.getType().isNullable()) {
399+
// It's a not-nullable column, we need to cast it to
400+
// nullable first
401+
convertNullableColumns.put(
402+
key, column.getType().nullable());
403+
}
404+
appendColumns.add(
405+
new AddColumnEvent.ColumnWithPosition(
406+
Column.physicalColumn(
407+
value,
408+
column.getType().nullable(),
409+
column.getComment())));
410+
});
411+
412+
List<SchemaChangeEvent> events = new ArrayList<>();
413+
events.add(new AddColumnEvent(tableId, appendColumns));
414+
if (!convertNullableColumns.isEmpty()) {
415+
events.add(new AlterColumnTypeEvent(tableId, convertNullableColumns));
416+
}
417+
return events;
418+
}
419+
default:
420+
return Collections.singletonList(event);
421+
}
422+
}
423+
304424
private static class PendingSchemaChange {
305425
private final SchemaChangeRequest changeRequest;
306426
private List<SchemaChangeEvent> derivedSchemaChangeEvents;

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/metrics/SchemaOperatorMetrics.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,10 @@ public class SchemaOperatorMetrics {
3535
new HashMap<SchemaChangeBehavior, Integer>() {
3636
{
3737
put(SchemaChangeBehavior.IGNORE, 0);
38-
put(SchemaChangeBehavior.TRY_EVOLVE, 1);
39-
put(SchemaChangeBehavior.EVOLVE, 2);
40-
put(SchemaChangeBehavior.EXCEPTION, 3);
38+
put(SchemaChangeBehavior.LENIENT, 1);
39+
put(SchemaChangeBehavior.TRY_EVOLVE, 2);
40+
put(SchemaChangeBehavior.EVOLVE, 3);
41+
put(SchemaChangeBehavior.EXCEPTION, 4);
4142
}
4243
};
4344

0 commit comments

Comments
 (0)