Skip to content

Commit 672d61a

Browse files
committed
Store upstream schema & evolved schema separately
1 parent ac5c947 commit 672d61a

File tree

4 files changed

+71
-9
lines changed

4 files changed

+71
-9
lines changed

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java

+68
Original file line number | Diff line number | Diff line change
@@ -470,6 +470,74 @@ void testOneToOneRouting() throws Exception {
470470
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
471471
}
472472

473+
@Test
474+
void testIdenticalOneToOneRouting() throws Exception {
475+
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
476+
477+
// Setup value source
478+
Configuration sourceConfig = new Configuration();
479+
sourceConfig.set(
480+
ValuesDataSourceOptions.EVENT_SET_ID,
481+
ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
482+
SourceDef sourceDef =
483+
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
484+
485+
// Setup value sink
486+
Configuration sinkConfig = new Configuration();
487+
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
488+
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
489+
490+
// Setup route
491+
TableId routedTable1 = TABLE_1;
492+
TableId routedTable2 = TABLE_2;
493+
List<RouteDef> routeDef =
494+
Arrays.asList(
495+
new RouteDef(TABLE_1.toString(), routedTable1.toString(), null, null),
496+
new RouteDef(TABLE_2.toString(), routedTable2.toString(), null, null));
497+
498+
// Setup pipeline
499+
Configuration pipelineConfig = new Configuration();
500+
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
501+
PipelineDef pipelineDef =
502+
new PipelineDef(
503+
sourceDef, sinkDef, routeDef, Collections.emptyList(), pipelineConfig);
504+
505+
// Execute the pipeline
506+
PipelineExecution execution = composer.compose(pipelineDef);
507+
execution.execute();
508+
509+
// Check result in ValuesDatabase
510+
List<String> routed1Results = ValuesDatabase.getResults(routedTable1);
511+
assertThat(routed1Results)
512+
.contains(
513+
"default_namespace.default_schema.table1:col1=2;newCol3=x",
514+
"default_namespace.default_schema.table1:col1=3;newCol3=");
515+
List<String> routed2Results = ValuesDatabase.getResults(routedTable2);
516+
assertThat(routed2Results)
517+
.contains(
518+
"default_namespace.default_schema.table2:col1=1;col2=1",
519+
"default_namespace.default_schema.table2:col1=2;col2=2",
520+
"default_namespace.default_schema.table2:col1=3;col2=3");
521+
522+
// Check the order and content of all received events
523+
String[] outputEvents = outCaptor.toString().trim().split("\n");
524+
assertThat(outputEvents)
525+
.containsExactly(
526+
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
527+
"CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
528+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
529+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
530+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
531+
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
532+
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}",
533+
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}",
534+
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}",
535+
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
536+
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
537+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
538+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
539+
}
540+
473541
@Test
474542
void testMergingWithRoute() throws Exception {
475543
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java

+1 -6
Original file line number | Diff line number | Diff line change
@@ -106,12 +106,11 @@ public List<SchemaChangeEvent> applySchemaChange(SchemaChangeEvent schemaChangeE
106106
// single source mapping, replace the table ID directly
107107
SchemaChangeEvent derivedSchemaChangeEvent =
108108
ChangeEventUtils.recreateSchemaChangeEvent(schemaChangeEvent, derivedTable);
109-
schemaManager.applyUpstreamSchemaChange(derivedSchemaChangeEvent);
110109
events.add(derivedSchemaChangeEvent);
111110
} else {
112111
// multiple source mapping (merging tables)
113112
Schema derivedTableSchema =
114-
schemaManager.getLatestUpstreamSchema(derivedTable).get();
113+
schemaManager.getLatestEvolvedSchema(derivedTable).get();
115114
if (schemaChangeEvent instanceof CreateTableEvent) {
116115
events.addAll(
117116
handleCreateTableEvent(
@@ -230,7 +229,6 @@ private List<SchemaChangeEvent> handleRenameColumnEvent(
230229
AddColumnEvent derivedSchemaChangeEvent = new AddColumnEvent(derivedTable, newColumns);
231230
schemaChangeEvents.add(derivedSchemaChangeEvent);
232231
}
233-
schemaChangeEvents.forEach(schemaManager::applyUpstreamSchemaChange);
234232
return schemaChangeEvents;
235233
}
236234

@@ -262,7 +260,6 @@ private List<SchemaChangeEvent> handleAlterColumnTypeEvent(
262260
new AlterColumnTypeEvent(derivedTable, typeDifference);
263261
schemaChangeEvents.add(derivedSchemaChangeEvent);
264262
}
265-
schemaChangeEvents.forEach(schemaManager::applyUpstreamSchemaChange);
266263
return schemaChangeEvents;
267264
}
268265

@@ -301,7 +298,6 @@ private List<SchemaChangeEvent> handleAddColumnEvent(
301298
if (!newTypeMapping.isEmpty()) {
302299
schemaChangeEvents.add(new AlterColumnTypeEvent(derivedTable, newTypeMapping));
303300
}
304-
schemaChangeEvents.forEach(schemaManager::applyUpstreamSchemaChange);
305301
return schemaChangeEvents;
306302
}
307303

@@ -337,7 +333,6 @@ private List<SchemaChangeEvent> handleCreateTableEvent(
337333
if (!newTypeMapping.isEmpty()) {
338334
schemaChangeEvents.add(new AlterColumnTypeEvent(derivedTable, newTypeMapping));
339335
}
340-
schemaChangeEvents.forEach(schemaManager::applyUpstreamSchemaChange);
341336
return schemaChangeEvents;
342337
}
343338

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java

+1 -2
Original file line number | Diff line number | Diff line change
@@ -265,13 +265,12 @@ private void startNextSchemaChangeRequest() {
265265
PendingSchemaChange pendingSchemaChange = pendingSchemaChanges.get(0);
266266
SchemaChangeRequest request = pendingSchemaChange.changeRequest;
267267
if (request.getSchemaChangeEvent() instanceof CreateTableEvent
268-
&& schemaManager.upstreamSchemaExists(request.getTableId())) {
268+
&& schemaManager.evolvedSchemaExists(request.getTableId())) {
269269
pendingSchemaChange
270270
.getResponseFuture()
271271
.complete(wrap(new SchemaChangeResponse(Collections.emptyList())));
272272
pendingSchemaChanges.remove(0);
273273
} else {
274-
schemaManager.applyUpstreamSchemaChange(request.getSchemaChangeEvent());
275274
List<SchemaChangeEvent> derivedSchemaChangeEvents =
276275
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
277276
pendingSchemaChange

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java

+1 -1
Original file line number | Diff line number | Diff line change
@@ -114,7 +114,7 @@ private void broadcastEvent(Event toBroadcast) {
114114
private Schema loadLatestSchemaFromRegistry(TableId tableId) {
115115
Optional<Schema> schema;
116116
try {
117-
schema = schemaEvolutionClient.getLatestUpstreamSchema(tableId);
117+
schema = schemaEvolutionClient.getLatestEvolvedSchema(tableId);
118118
} catch (Exception e) {
119119
throw new RuntimeException(
120120
String.format("Failed to request latest schema for table \"%s\"", tableId), e);

0 commit comments

Comments
 (0)