From 13746a0eabaa3808d99ca1a4008668ac50eb7439 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 17 Jul 2024 18:06:27 +0800 Subject: [PATCH] Store upstream schema & evolved schema separately --- .../flink/FlinkPipelineComposerITCase.java | 68 +++++++++++++++++++ .../schema/coordinator/SchemaDerivation.java | 7 +- .../SchemaRegistryRequestHandler.java | 3 +- .../partitioning/PrePartitionOperator.java | 2 +- 4 files changed, 71 insertions(+), 9 deletions(-) diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 26c9c918751..192fdd96daa 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -470,6 +470,74 @@ void testOneToOneRouting() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}"); } + @Test + void testIdenticalOneToOneRouting() throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup route + TableId routedTable1 = TABLE_1; + TableId routedTable2 = TABLE_2; + List routeDef = + Arrays.asList( + new RouteDef(TABLE_1.toString(), routedTable1.toString(), null, null), + new RouteDef(TABLE_2.toString(), routedTable2.toString(), null, null)); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, sinkDef, routeDef, Collections.emptyList(), pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check result in ValuesDatabase + List routed1Results = ValuesDatabase.getResults(routedTable1); + assertThat(routed1Results) + .contains( + "default_namespace.default_schema.table1:col1=2;newCol3=x", + "default_namespace.default_schema.table1:col1=3;newCol3="); + List routed2Results = ValuesDatabase.getResults(routedTable2); + assertThat(routed2Results) + .contains( + "default_namespace.default_schema.table2:col1=1;col2=1", + "default_namespace.default_schema.table2:col1=2;col2=2", + "default_namespace.default_schema.table2:col1=3;col2=3"); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}", + "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", + "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}"); + } + @Test void testMergingWithRoute() throws Exception { FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java index e08193e60c4..e4b547b216b 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java @@ -106,12 +106,11 @@ public List applySchemaChange(SchemaChangeEvent schemaChangeE // single source mapping, replace the table ID directly SchemaChangeEvent derivedSchemaChangeEvent = ChangeEventUtils.recreateSchemaChangeEvent(schemaChangeEvent, derivedTable); - schemaManager.applyUpstreamSchemaChange(derivedSchemaChangeEvent); events.add(derivedSchemaChangeEvent); } else { // multiple source mapping (merging tables) Schema derivedTableSchema = - schemaManager.getLatestUpstreamSchema(derivedTable).get(); + schemaManager.getLatestEvolvedSchema(derivedTable).get(); if (schemaChangeEvent instanceof CreateTableEvent) { events.addAll( handleCreateTableEvent( @@ -230,7 +229,6 @@ private List handleRenameColumnEvent( AddColumnEvent derivedSchemaChangeEvent = new AddColumnEvent(derivedTable, newColumns); schemaChangeEvents.add(derivedSchemaChangeEvent); } - schemaChangeEvents.forEach(schemaManager::applyUpstreamSchemaChange); return schemaChangeEvents; } @@ -262,7 +260,6 @@ private List handleAlterColumnTypeEvent( new AlterColumnTypeEvent(derivedTable, typeDifference); schemaChangeEvents.add(derivedSchemaChangeEvent); } - schemaChangeEvents.forEach(schemaManager::applyUpstreamSchemaChange); return schemaChangeEvents; } @@ -301,7 +298,6 @@ private List handleAddColumnEvent( if (!newTypeMapping.isEmpty()) { schemaChangeEvents.add(new AlterColumnTypeEvent(derivedTable, newTypeMapping)); } - schemaChangeEvents.forEach(schemaManager::applyUpstreamSchemaChange); return schemaChangeEvents; } @@ -337,7 +333,6 @@ private List handleCreateTableEvent( if (!newTypeMapping.isEmpty()) { schemaChangeEvents.add(new AlterColumnTypeEvent(derivedTable, newTypeMapping)); } - schemaChangeEvents.forEach(schemaManager::applyUpstreamSchemaChange); return schemaChangeEvents; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 2bfd411f441..7c883e30ff6 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -265,13 +265,12 @@ private void startNextSchemaChangeRequest() { PendingSchemaChange pendingSchemaChange = pendingSchemaChanges.get(0); SchemaChangeRequest request = pendingSchemaChange.changeRequest; if (request.getSchemaChangeEvent() instanceof CreateTableEvent - && schemaManager.upstreamSchemaExists(request.getTableId())) { + && schemaManager.evolvedSchemaExists(request.getTableId())) { pendingSchemaChange .getResponseFuture() .complete(wrap(new SchemaChangeResponse(Collections.emptyList()))); pendingSchemaChanges.remove(0); } else { - schemaManager.applyUpstreamSchemaChange(request.getSchemaChangeEvent()); List derivedSchemaChangeEvents = calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); pendingSchemaChange diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java index 915b9206131..7640a7fdf92 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java @@ -113,7 +113,7 @@ private void broadcastEvent(Event toBroadcast) { private Schema loadLatestSchemaFromRegistry(TableId tableId) { Optional schema; try { - schema = schemaEvolutionClient.getLatestUpstreamSchema(tableId); + schema = schemaEvolutionClient.getLatestEvolvedSchema(tableId); } catch (Exception e) { throw new RuntimeException( String.format("Failed to request latest schema for table \"%s\"", tableId), e);