diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index a8acf798996..942ff1f72e3 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -321,7 +321,15 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
         // Execute the pipeline
         PipelineExecution execution = composer.compose(pipelineDef);
         execution.execute();
-
+        Schema transformedTableSchema = ValuesDatabase.getTableSchema(TABLE_1);
+        assertThat(transformedTableSchema)
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .physicalColumn("col1", DataTypes.STRING())
+                                .physicalColumn("col12", DataTypes.STRING())
+                                .physicalColumn("newCol3", DataTypes.STRING())
+                                .primaryKey("col1")
+                                .build());
         // Check the order and content of all received events
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
@@ -332,8 +340,8 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
                         "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
                         "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
                         "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
-                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10], after=[], op=DELETE, meta=()}",
-                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=()}");
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 10, 1], after=[], op=DELETE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 20, ], after=[2, 20, x], op=UPDATE, meta=()}");
     }
 
     @ParameterizedTest
@@ -388,7 +396,15 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
         // Execute the pipeline
         PipelineExecution execution = composer.compose(pipelineDef);
         execution.execute();
-
+        Schema transformedTableSchema = ValuesDatabase.getTableSchema(TABLE_1);
+        assertThat(transformedTableSchema)
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .physicalColumn("col1", DataTypes.STRING())
+                                .physicalColumn("col12", DataTypes.STRING())
+                                .physicalColumn("newCol3", DataTypes.STRING())
+                                .primaryKey("col1")
+                                .build());
         // Check the order and content of all received events
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
@@ -399,8 +415,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
                         "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
                         "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
                         "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
-                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 11], after=[], op=DELETE, meta=()}",
-                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=()}");
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 11, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 11, 1], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 22, ], after=[2, 22, x], op=UPDATE, meta=()}"); } @Test @@ -504,7 +520,7 @@ void testMergingWithRoute() throws Exception { // Table 1: +I[1, Alice, 18] // Table 1: +I[2, Bob, 20] // Table 1: -U[2, Bob, 20] +U[2, Bob, 30] - // Create table 2 [id, name, age] + // Create table 2 [id, name, age, description] // Table 2: +I[3, Charlie, 15, student] // Table 2: +I[4, Donald, 25, student] // Table 2: -D[4, Donald, 25, student] @@ -717,4 +733,201 @@ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}", "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}"); } + + @Test + void testTransformMergingWithRoute() throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS); + + TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1"); + TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2"); + Schema table1Schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(); + Schema table2Schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT()) + .physicalColumn("name", DataTypes.VARCHAR(255)) + .physicalColumn("age", DataTypes.TINYINT()) + .physicalColumn("description", DataTypes.STRING()) + .primaryKey("id") + .build(); + + // Create test dataset: + // Create table 1 [id, name, age] + // Table 1: +I[1, Alice, 18] + // Table 1: +I[2, Bob, 20] + // Table 1: -U[2, Bob, 20] +U[2, Bob, 30] + // Create table 2 [id, name, age, description] + // Table 2: +I[3, Charlie, 15, student] + // Table 2: +I[4, Donald, 25, student] + // Table 2: -D[4, Donald, 25, student] + // Add column for table 2: gender + // Table 1: +I[5, Eliza, 24] + // Table 2: +I[6, Frank, 30, student, male] + List events = new ArrayList<>(); + BinaryRecordDataGenerator table1dataGenerator = + new BinaryRecordDataGenerator( + table1Schema.getColumnDataTypes().toArray(new DataType[0])); + BinaryRecordDataGenerator table2dataGenerator = + new BinaryRecordDataGenerator( + table2Schema.getColumnDataTypes().toArray(new DataType[0])); + events.add(new CreateTableEvent(myTable1, table1Schema)); + events.add( + DataChangeEvent.insertEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {1, BinaryStringData.fromString("Alice"), 18}))); + events.add( + DataChangeEvent.insertEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 20}))); + events.add( + DataChangeEvent.updateEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 20}), + table1dataGenerator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 30}))); + events.add(new CreateTableEvent(myTable2, table2Schema)); + events.add( + DataChangeEvent.insertEvent( + myTable2, + table2dataGenerator.generate( + new Object[] { + 3L, + 
BinaryStringData.fromString("Charlie"), + (byte) 15, + BinaryStringData.fromString("student") + }))); + events.add( + DataChangeEvent.insertEvent( + myTable2, + table2dataGenerator.generate( + new Object[] { + 4L, + BinaryStringData.fromString("Donald"), + (byte) 25, + BinaryStringData.fromString("student") + }))); + events.add( + DataChangeEvent.deleteEvent( + myTable2, + table2dataGenerator.generate( + new Object[] { + 4L, + BinaryStringData.fromString("Donald"), + (byte) 25, + BinaryStringData.fromString("student") + }))); + events.add( + new AddColumnEvent( + myTable2, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("gender", DataTypes.STRING()))))); + events.add( + DataChangeEvent.insertEvent( + myTable1, + table1dataGenerator.generate( + new Object[] {5, BinaryStringData.fromString("Eliza"), 24}))); + events.add( + DataChangeEvent.insertEvent( + myTable2, + new BinaryRecordDataGenerator( + new DataType[] { + DataTypes.BIGINT(), + DataTypes.VARCHAR(255), + DataTypes.TINYINT(), + DataTypes.STRING(), + DataTypes.STRING() + }) + .generate( + new Object[] { + 6L, + BinaryStringData.fromString("Frank"), + (byte) 30, + BinaryStringData.fromString("student"), + BinaryStringData.fromString("male") + }))); + + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); + + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup transform + List transformDef = + Collections.singletonList( + new TransformDef( + "default_namespace.default_schema.mytable[0-9]", + "*,'last_name' as last_name", + null, + null, + null, + null, + "")); + + // Setup route + TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged"); + List routeDef = + Collections.singletonList( + new RouteDef( + "default_namespace.default_schema.mytable[0-9]", + mergedTable.toString(), + null, + null)); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + PipelineDef pipelineDef = + new PipelineDef(sourceDef, sinkDef, routeDef, transformDef, pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + Schema mergedTableSchema = ValuesDatabase.getTableSchema(mergedTable); + assertThat(mergedTableSchema) + .isEqualTo( + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.BIGINT()) + .physicalColumn("last_name", DataTypes.STRING()) + .physicalColumn("description", DataTypes.STRING()) + .physicalColumn("gender", DataTypes.STRING()) + .primaryKey("id") + .build()); + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name], op=INSERT, meta=()}", + 
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, nameMapping={age=BIGINT, id=BIGINT}}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`gender` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, Eliza, 24, last_name, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, last_name, student, male], op=INSERT, meta=()}"); + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java index 20479c87322..47c73bf0a71 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java @@ -234,13 +234,13 @@ private void transformSchema(TableId tableId, Schema schema) throws Exception { Tuple2.of(tableId, transformProjection))) { transformProjectionProcessorMap.put( Tuple2.of(tableId, transformProjection), - TransformProjectionProcessor.of(transformProjection)); + TransformProjectionProcessor.of(transformProjection, timezone)); } TransformProjectionProcessor transformProjectionProcessor = transformProjectionProcessorMap.get( Tuple2.of(tableId, transformProjection)); // update the columns of projection and add the column of projection into Schema - transformProjectionProcessor.processSchemaChangeEvent(schema); + transformProjectionProcessor.processSchemaChangeEvent(tableId, schema); } } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java index 7049bbdfda5..d5e6f60cbe3 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.runtime.parser.TransformParser; @@ -83,6 +84,11 @@ public static TransformProjectionProcessor of( return 
         return new TransformProjectionProcessor(null, tableChangeInfo, transformProjection, null);
     }
 
+    public static TransformProjectionProcessor of(
+            TransformProjection transformProjection, String timezone) {
+        return new TransformProjectionProcessor(null, null, transformProjection, timezone);
+    }
+
     public static TransformProjectionProcessor of(TransformProjection transformProjection) {
         return new TransformProjectionProcessor(null, null, transformProjection, null);
     }
@@ -99,7 +105,8 @@ public CreateTableEvent processCreateTableEvent(CreateTableEvent createTableEven
         return new CreateTableEvent(createTableEvent.tableId(), schema);
     }
 
-    public void processSchemaChangeEvent(Schema schema) {
+    public void processSchemaChangeEvent(TableId tableId, Schema schema) {
+        tableInfo = TableInfo.of(tableId, schema);
         List<ProjectionColumn> projectionColumns =
                 TransformParser.generateProjectionColumns(
                         transformProjection.getProjection(), schema.getColumns());
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java
index ca8949ed6d6..29663b5eba0 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java
@@ -193,6 +193,7 @@ private SchemaChangeEvent cacheChangeSchema(SchemaChangeEvent event) {
         Schema newSchema =
                 SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getTransformedSchema(), event);
         tableChangeInfoMap.put(tableId, TableChangeInfo.of(tableId, originalSchema, newSchema));
+        transformSchemaChange(tableId, originalSchema, newSchema);
         return event;
     }
 
@@ -229,6 +230,24 @@ private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableE
         return createTableEvent;
     }
 
+    private void transformSchemaChange(TableId tableId, Schema originalSchema, Schema schema) {
+        for (Tuple2<Selectors, Optional<TransformProjection>> transform : transforms) {
+            Selectors selectors = transform.f0;
+            if (selectors.isMatch(tableId) && transform.f1.isPresent()) {
+                TransformProjection transformProjection = transform.f1.get();
+                if (transformProjection.isValid()) {
+                    if (processorMap.containsKey(tableId)) {
+                        processorMap.put(
+                                tableId,
+                                TransformProjectionProcessor.of(
+                                        TableChangeInfo.of(tableId, originalSchema, schema),
+                                        transformProjection));
+                    }
+                }
+            }
+        }
+    }
+
     private Schema transformSchemaMetaData(
             Schema schema, SchemaMetadataTransform schemaMetadataTransform) {
         Schema.Builder schemaBuilder = Schema.newBuilder().setColumns(schema.getColumns());