[FLINK-35774][cdc-runtime] Fix transform cache not being updated after processing a schema change event #3455

Closed · wants to merge 3 commits
@@ -321,7 +321,15 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();

Schema transformedTableSchema = ValuesDatabase.getTableSchema(TABLE_1);
assertThat(transformedTableSchema)
.isEqualTo(
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col12", DataTypes.STRING())
.physicalColumn("newCol3", DataTypes.STRING())
.primaryKey("col1")
.build());
// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
@@ -332,8 +340,8 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=()}");
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 10, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 20, ], after=[2, 20, x], op=UPDATE, meta=()}");
yuxiqian (Contributor) commented on Aug 8, 2024:

Could you please also add tests to verify whether the downstream schema (accessible via ValuesDatabase#getTableSchema) is as expected?

Author replied:

I will have a try tonight.
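For reference, a minimal sketch of such a check, mirroring the assertion added earlier in this diff and assuming the test class's existing imports and TABLE_1 constant; the expected columns are the transformed schema used in this test:

    // Sketch only: read the downstream schema kept by the values sink and
    // compare it with the expected transformed schema.
    Schema downstreamSchema = ValuesDatabase.getTableSchema(TABLE_1);
    assertThat(downstreamSchema)
            .isEqualTo(
                    Schema.newBuilder()
                            .physicalColumn("col1", DataTypes.STRING())
                            .physicalColumn("col12", DataTypes.STRING())
                            .physicalColumn("newCol3", DataTypes.STRING())
                            .primaryKey("col1")
                            .build());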

}

@ParameterizedTest
@@ -388,7 +396,15 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();

Schema transformedTableSchema = ValuesDatabase.getTableSchema(TABLE_1);
assertThat(transformedTableSchema)
.isEqualTo(
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col12", DataTypes.STRING())
.physicalColumn("newCol3", DataTypes.STRING())
.primaryKey("col1")
.build());
// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
@@ -399,8 +415,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 11], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=()}");
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 11, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 22, ], after=[2, 22, x], op=UPDATE, meta=()}");
yuxiqian (Contributor) commented on lines +418 to +419, Aug 8, 2024:

Are the transform semantics changed here? It seems that after the schema evolution the upstream schema will be [col1, newCol3], and the downstream schema should be inferred as [col1, newCol3, col12] instead of [col1, col12, newCol3].

Author replied:

Yes, the transform semantics have changed. As discussed previously, the current code architecture cannot achieve the expected semantics. It may become achievable based on the changes in #3285.
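To make the two orderings under discussion concrete, here is an illustrative sketch (assumed schemas for illustration only, not part of this change) of the reviewer's expected inference versus what the updated test asserts:

    // Reviewer's expectation: after the upstream schema evolves to [col1, newCol3],
    // the projection-only column col12 would be appended last.
    Schema expectedByReviewer =
            Schema.newBuilder()
                    .physicalColumn("col1", DataTypes.STRING())
                    .physicalColumn("newCol3", DataTypes.STRING())
                    .physicalColumn("col12", DataTypes.STRING())
                    .primaryKey("col1")
                    .build();

    // What the current architecture produces (and what this PR's tests assert):
    // the original projection order is kept, so col12 comes before newCol3.
    Schema producedByCurrentCode =
            Schema.newBuilder()
                    .physicalColumn("col1", DataTypes.STRING())
                    .physicalColumn("col12", DataTypes.STRING())
                    .physicalColumn("newCol3", DataTypes.STRING())
                    .primaryKey("col1")
                    .build();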

}

@Test
@@ -504,7 +520,7 @@ void testMergingWithRoute() throws Exception {
// Table 1: +I[1, Alice, 18]
// Table 1: +I[2, Bob, 20]
// Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
// Create table 2 [id, name, age]
// Create table 2 [id, name, age, description]
// Table 2: +I[3, Charlie, 15, student]
// Table 2: +I[4, Donald, 25, student]
// Table 2: -D[4, Donald, 25, student]
@@ -717,4 +733,201 @@ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
}

@Test
void testTransformMergingWithRoute() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);

TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2");
Schema table1Schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.INT())
.primaryKey("id")
.build();
Schema table2Schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.BIGINT())
.physicalColumn("name", DataTypes.VARCHAR(255))
.physicalColumn("age", DataTypes.TINYINT())
.physicalColumn("description", DataTypes.STRING())
.primaryKey("id")
.build();

// Create test dataset:
// Create table 1 [id, name, age]
// Table 1: +I[1, Alice, 18]
// Table 1: +I[2, Bob, 20]
// Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
// Create table 2 [id, name, age, description]
// Table 2: +I[3, Charlie, 15, student]
// Table 2: +I[4, Donald, 25, student]
// Table 2: -D[4, Donald, 25, student]
// Add column for table 2: gender
// Table 1: +I[5, Eliza, 24]
// Table 2: +I[6, Frank, 30, student, male]
List<Event> events = new ArrayList<>();
BinaryRecordDataGenerator table1dataGenerator =
new BinaryRecordDataGenerator(
table1Schema.getColumnDataTypes().toArray(new DataType[0]));
BinaryRecordDataGenerator table2dataGenerator =
new BinaryRecordDataGenerator(
table2Schema.getColumnDataTypes().toArray(new DataType[0]));
events.add(new CreateTableEvent(myTable1, table1Schema));
events.add(
DataChangeEvent.insertEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {1, BinaryStringData.fromString("Alice"), 18})));
events.add(
DataChangeEvent.insertEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 20})));
events.add(
DataChangeEvent.updateEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 20}),
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 30})));
events.add(new CreateTableEvent(myTable2, table2Schema));
events.add(
DataChangeEvent.insertEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
3L,
BinaryStringData.fromString("Charlie"),
(byte) 15,
BinaryStringData.fromString("student")
})));
events.add(
DataChangeEvent.insertEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
4L,
BinaryStringData.fromString("Donald"),
(byte) 25,
BinaryStringData.fromString("student")
})));
events.add(
DataChangeEvent.deleteEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
4L,
BinaryStringData.fromString("Donald"),
(byte) 25,
BinaryStringData.fromString("student")
})));
events.add(
new AddColumnEvent(
myTable2,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("gender", DataTypes.STRING())))));
events.add(
DataChangeEvent.insertEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {5, BinaryStringData.fromString("Eliza"), 24})));
events.add(
DataChangeEvent.insertEvent(
myTable2,
new BinaryRecordDataGenerator(
new DataType[] {
DataTypes.BIGINT(),
DataTypes.VARCHAR(255),
DataTypes.TINYINT(),
DataTypes.STRING(),
DataTypes.STRING()
})
.generate(
new Object[] {
6L,
BinaryStringData.fromString("Frank"),
(byte) 30,
BinaryStringData.fromString("student"),
BinaryStringData.fromString("male")
})));

ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));

SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup transform
List<TransformDef> transformDef =
Collections.singletonList(
new TransformDef(
"default_namespace.default_schema.mytable[0-9]",
"*,'last_name' as last_name",
null,
null,
null,
null,
""));

// Setup route
TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged");
List<RouteDef> routeDef =
Collections.singletonList(
new RouteDef(
"default_namespace.default_schema.mytable[0-9]",
mergedTable.toString(),
null,
null));

// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(sourceDef, sinkDef, routeDef, transformDef, pipelineConfig);

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();
Schema mergedTableSchema = ValuesDatabase.getTableSchema(mergedTable);
assertThat(mergedTableSchema)
.isEqualTo(
Schema.newBuilder()
.physicalColumn("id", DataTypes.BIGINT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.BIGINT())
.physicalColumn("last_name", DataTypes.STRING())
.physicalColumn("description", DataTypes.STRING())
.physicalColumn("gender", DataTypes.STRING())
.primaryKey("id")
.build());
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, nameMapping={age=BIGINT, id=BIGINT}}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`gender` STRING, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, Eliza, 24, last_name, null, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, last_name, student, male], op=INSERT, meta=()}");
}
}
@@ -234,13 +234,13 @@ private void transformSchema(TableId tableId, Schema schema) throws Exception {
Tuple2.of(tableId, transformProjection))) {
transformProjectionProcessorMap.put(
Tuple2.of(tableId, transformProjection),
TransformProjectionProcessor.of(transformProjection));
TransformProjectionProcessor.of(transformProjection, timezone));
}
TransformProjectionProcessor transformProjectionProcessor =
transformProjectionProcessorMap.get(
Tuple2.of(tableId, transformProjection));
// update the columns of projection and add the column of projection into Schema
transformProjectionProcessor.processSchemaChangeEvent(schema);
transformProjectionProcessor.processSchemaChangeEvent(tableId, schema);
}
}
}
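Restated as a simplified sketch (an assumed simplification of the hunk above, not the exact operator code): the projection processor is cached per (tableId, transformProjection), is now constructed with the pipeline timezone, and has its cached table info refreshed whenever a schema change arrives for that table.

    // Simplified sketch of the caching pattern in transformSchema(...).
    Tuple2<TableId, TransformProjection> cacheKey = Tuple2.of(tableId, transformProjection);
    transformProjectionProcessorMap.computeIfAbsent(
            cacheKey, key -> TransformProjectionProcessor.of(transformProjection, timezone));
    // Re-deriving the projection columns from the new schema is what keeps the
    // cached processor consistent after ADD/RENAME/DROP column events.
    transformProjectionProcessorMap.get(cacheKey).processSchemaChangeEvent(tableId, schema);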
@@ -20,6 +20,7 @@
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.parser.TransformParser;
@@ -83,6 +84,11 @@ public static TransformProjectionProcessor of(
return new TransformProjectionProcessor(null, tableChangeInfo, transformProjection, null);
}

public static TransformProjectionProcessor of(
TransformProjection transformProjection, String timezone) {
return new TransformProjectionProcessor(null, null, transformProjection, timezone);
}

public static TransformProjectionProcessor of(TransformProjection transformProjection) {
return new TransformProjectionProcessor(null, null, transformProjection, null);
}
@@ -99,7 +105,8 @@ public CreateTableEvent processCreateTableEvent(CreateTableEvent createTableEven
return new CreateTableEvent(createTableEvent.tableId(), schema);
}

public void processSchemaChangeEvent(Schema schema) {
public void processSchemaChangeEvent(TableId tableId, Schema schema) {
tableInfo = TableInfo.of(tableId, schema);
List<ProjectionColumn> projectionColumns =
TransformParser.generateProjectionColumns(
transformProjection.getProjection(), schema.getColumns());