Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36981][cdc] Fix the bug of transform merging with route in YAML task #3826

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
Expand Down Expand Up @@ -495,8 +494,9 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=({op_ts=5})}");
}

@Test
void testOneToOneRouting() throws Exception {
@ParameterizedTest
@EnumSource
void testOneToOneRouting(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Expand All @@ -510,6 +510,7 @@ void testOneToOneRouting() throws Exception {
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup route
Expand Down Expand Up @@ -570,8 +571,9 @@ void testOneToOneRouting() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
}

@Test
void testIdenticalOneToOneRouting() throws Exception {
@ParameterizedTest
@EnumSource
void testIdenticalOneToOneRouting(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Expand All @@ -585,6 +587,7 @@ void testIdenticalOneToOneRouting() throws Exception {
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup route
Expand Down Expand Up @@ -645,8 +648,9 @@ void testIdenticalOneToOneRouting() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
}

@Test
void testMergingWithRoute() throws Exception {
@ParameterizedTest
@EnumSource
void testMergingWithRoute(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Expand Down Expand Up @@ -678,7 +682,7 @@ void testMergingWithRoute() throws Exception {
// Table 1: +I[1, Alice, 18]
// Table 1: +I[2, Bob, 20]
// Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
// Create table 2 [id, name, age]
// Create table 2 [id, name, age, description]
// Table 2: +I[3, Charlie, 15, student]
// Table 2: +I[4, Donald, 25, student]
// Table 2: -D[4, Donald, 25, student]
Expand Down Expand Up @@ -782,6 +786,7 @@ void testMergingWithRoute() throws Exception {
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup route
Expand Down Expand Up @@ -841,8 +846,9 @@ void testMergingWithRoute() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, student, null, male], op=INSERT, meta=()}");
}

@Test
void testTransformMergingWithRoute() throws Exception {
@ParameterizedTest
@EnumSource
void testTransformMergingWithRoute(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Expand Down Expand Up @@ -978,6 +984,7 @@ void testTransformMergingWithRoute() throws Exception {
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup transform
Expand Down Expand Up @@ -1049,6 +1056,216 @@ void testTransformMergingWithRoute() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, last_name, student, male], op=INSERT, meta=()}");
}

@ParameterizedTest
@EnumSource
// Regression test for FLINK-36981: transform ("*,'last_name' as last_name") combined with a
// route that merges two differently-shaped tables into one sink table, where table 2's
// schema-change (AddColumn gender) arrives in a DIFFERENT order than in
// testTransformMergingWithRoute (table 1's rename is disabled below). Runs once per
// ValuesDataSink.SinkApi enum constant.
void testTransformMergingWithRouteChangeOrder(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);

// Two source tables with deliberately mismatched column types (INT vs BIGINT id,
// STRING vs VARCHAR(255) name, INT vs TINYINT age) to exercise schema merging.
TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2");
Schema table1Schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.INT())
.primaryKey("id")
.build();
Schema table2Schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.BIGINT())
.physicalColumn("name", DataTypes.VARCHAR(255))
.physicalColumn("age", DataTypes.TINYINT())
.physicalColumn("description", DataTypes.STRING())
.primaryKey("id")
.build();

// Create test dataset:
// Create table 1 [id, name, age]
// Create table 2 [id, name, age, description]
// Table 1: +I[1, Alice, 18]
// Table 1: +I[2, Bob, 20]
// Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
// Table 2: +I[3, Charlie, 15, student]
// Table 2: +I[4, Donald, 25, student]
// Table 2: -D[4, Donald, 25, student]
// (Rename column for table 1 "name -> last_name" is intentionally NOT emitted in this
// order-variant test — see the commented-out RenameColumnEvent below.)
// Add column for table 2: gender
// Table 1: +I[5, Eliza, 24]
// Table 2: +I[6, Frank, 30, student, male]
List<Event> events = new ArrayList<>();
BinaryRecordDataGenerator table1dataGenerator =
new BinaryRecordDataGenerator(
table1Schema.getColumnDataTypes().toArray(new DataType[0]));
BinaryRecordDataGenerator table2dataGenerator =
new BinaryRecordDataGenerator(
table2Schema.getColumnDataTypes().toArray(new DataType[0]));
events.add(new CreateTableEvent(myTable1, table1Schema));
events.add(new CreateTableEvent(myTable2, table2Schema));
events.add(
DataChangeEvent.insertEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {1, BinaryStringData.fromString("Alice"), 18})));
events.add(
DataChangeEvent.insertEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 20})));
events.add(
DataChangeEvent.updateEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 20}),
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 30})));
events.add(
DataChangeEvent.insertEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
3L,
BinaryStringData.fromString("Charlie"),
(byte) 15,
BinaryStringData.fromString("student")
})));
events.add(
DataChangeEvent.insertEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
4L,
BinaryStringData.fromString("Donald"),
(byte) 25,
BinaryStringData.fromString("student")
})));
events.add(
DataChangeEvent.deleteEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
4L,
BinaryStringData.fromString("Donald"),
(byte) 25,
BinaryStringData.fromString("student")
})));
// NOTE(review): rename deliberately disabled — this variant differs from
// testTransformMergingWithRoute only in which schema-change events occur and in
// what order. Confirm against the companion test before re-enabling.
// events.add(new RenameColumnEvent(myTable1, ImmutableMap.of("name", "last_name")));
events.add(
new AddColumnEvent(
myTable2,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("gender", DataTypes.STRING())))));
events.add(
DataChangeEvent.insertEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {5, BinaryStringData.fromString("Eliza"), 24})));
events.add(
DataChangeEvent.insertEvent(
myTable2,
// Fresh generator: table 2's schema now includes the appended gender column,
// so the cached table2dataGenerator (4 fields) no longer matches.
new BinaryRecordDataGenerator(
new DataType[] {
DataTypes.BIGINT(),
DataTypes.VARCHAR(255),
DataTypes.TINYINT(),
DataTypes.STRING(),
DataTypes.STRING()
})
.generate(
new Object[] {
6L,
BinaryStringData.fromString("Frank"),
(byte) 30,
BinaryStringData.fromString("student"),
BinaryStringData.fromString("male")
})));

ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));

SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup transform: appends a constant 'last_name' column to every row of both tables.
List<TransformDef> transformDef =
Collections.singletonList(
new TransformDef(
"default_namespace.default_schema.mytable[0-9]",
"*,'last_name' as last_name",
null,
null,
null,
null,
"",
null));

// Setup route: both mytable1 and mytable2 are merged into a single 'merged' table.
TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged");
List<RouteDef> routeDef =
Collections.singletonList(
new RouteDef(
"default_namespace.default_schema.mytable[0-9]",
mergedTable.toString(),
null,
null));

// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
routeDef,
transformDef,
Collections.emptyList(),
pipelineConfig);

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();
// The merged schema is the widened union of both source schemas plus the transform's
// last_name column: id widened INT -> BIGINT, plus description and gender appended.
Schema mergedTableSchema = ValuesDatabase.getTableSchema(mergedTable);
assertThat(mergedTableSchema)
.isEqualTo(
Schema.newBuilder()
.physicalColumn("id", DataTypes.BIGINT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.INT())
.physicalColumn("last_name", DataTypes.STRING())
.physicalColumn("description", DataTypes.STRING())
.physicalColumn("gender", DataTypes.STRING())
.primaryKey("id")
.build());
// Expected events in exact emission order: schema evolution (AddColumn description,
// then AlterColumnType id INT->BIGINT) is triggered lazily when table 2's first row
// reaches the merged table, and AddColumn gender only after the upstream schema change.
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, existedColumnName=last_name}]}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name, null], after=[2, Bob, 30, last_name, null], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`gender` STRING, position=AFTER, existedColumnName=description}]}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, Eliza, 24, last_name, null, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, last_name, student, male], op=INSERT, meta=()}");
}

@ParameterizedTest
@EnumSource
void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception {
Expand Down
Loading
Loading