Skip to content

Commit 431bc68

Browse files
committed
add test
1 parent e116802 commit 431bc68

File tree

1 file changed

+202
-5
lines changed

1 file changed

+202
-5
lines changed

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java

Lines changed: 202 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,8 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
331331
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
332332
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
333333
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
334-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10], after=[], op=DELETE, meta=()}",
335-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=()}");
334+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 10, 1], after=[], op=DELETE, meta=()}",
335+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 20, ], after=[2, 20, x], op=UPDATE, meta=()}");
336336
}
337337

338338
@ParameterizedTest
@@ -398,8 +398,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
398398
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
399399
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
400400
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
401-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 11], after=[], op=DELETE, meta=()}",
402-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=()}");
401+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 11, 1], after=[], op=DELETE, meta=()}",
402+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 22, ], after=[2, 22, x], op=UPDATE, meta=()}");
403403
}
404404

405405
@Test
@@ -503,7 +503,7 @@ void testMergingWithRoute() throws Exception {
503503
// Table 1: +I[1, Alice, 18]
504504
// Table 1: +I[2, Bob, 20]
505505
// Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
506-
// Create table 2 [id, name, age]
506+
// Create table 2 [id, name, age, description]
507507
// Table 2: +I[3, Charlie, 15, student]
508508
// Table 2: +I[4, Donald, 25, student]
509509
// Table 2: -D[4, Donald, 25, student]
@@ -716,4 +716,201 @@ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception
716716
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
717717
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
718718
}
719+
720+
@Test
721+
void testTransformMergingWithRoute() throws Exception {
722+
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
723+
724+
// Setup value source
725+
Configuration sourceConfig = new Configuration();
726+
sourceConfig.set(
727+
ValuesDataSourceOptions.EVENT_SET_ID,
728+
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
729+
730+
TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
731+
TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2");
732+
Schema table1Schema =
733+
Schema.newBuilder()
734+
.physicalColumn("id", DataTypes.INT())
735+
.physicalColumn("name", DataTypes.STRING())
736+
.physicalColumn("age", DataTypes.INT())
737+
.primaryKey("id")
738+
.build();
739+
Schema table2Schema =
740+
Schema.newBuilder()
741+
.physicalColumn("id", DataTypes.BIGINT())
742+
.physicalColumn("name", DataTypes.VARCHAR(255))
743+
.physicalColumn("age", DataTypes.TINYINT())
744+
.physicalColumn("description", DataTypes.STRING())
745+
.primaryKey("id")
746+
.build();
747+
748+
// Create test dataset:
749+
// Create table 1 [id, name, age]
750+
// Table 1: +I[1, Alice, 18]
751+
// Table 1: +I[2, Bob, 20]
752+
// Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
753+
// Create table 2 [id, name, age, description]
754+
// Table 2: +I[3, Charlie, 15, student]
755+
// Table 2: +I[4, Donald, 25, student]
756+
// Table 2: -D[4, Donald, 25, student]
757+
// Add column for table 2: gender
758+
// Table 1: +I[5, Eliza, 24]
759+
// Table 2: +I[6, Frank, 30, student, male]
760+
List<Event> events = new ArrayList<>();
761+
BinaryRecordDataGenerator table1dataGenerator =
762+
new BinaryRecordDataGenerator(
763+
table1Schema.getColumnDataTypes().toArray(new DataType[0]));
764+
BinaryRecordDataGenerator table2dataGenerator =
765+
new BinaryRecordDataGenerator(
766+
table2Schema.getColumnDataTypes().toArray(new DataType[0]));
767+
events.add(new CreateTableEvent(myTable1, table1Schema));
768+
events.add(
769+
DataChangeEvent.insertEvent(
770+
myTable1,
771+
table1dataGenerator.generate(
772+
new Object[] {1, BinaryStringData.fromString("Alice"), 18})));
773+
events.add(
774+
DataChangeEvent.insertEvent(
775+
myTable1,
776+
table1dataGenerator.generate(
777+
new Object[] {2, BinaryStringData.fromString("Bob"), 20})));
778+
events.add(
779+
DataChangeEvent.updateEvent(
780+
myTable1,
781+
table1dataGenerator.generate(
782+
new Object[] {2, BinaryStringData.fromString("Bob"), 20}),
783+
table1dataGenerator.generate(
784+
new Object[] {2, BinaryStringData.fromString("Bob"), 30})));
785+
events.add(new CreateTableEvent(myTable2, table2Schema));
786+
events.add(
787+
DataChangeEvent.insertEvent(
788+
myTable2,
789+
table2dataGenerator.generate(
790+
new Object[] {
791+
3L,
792+
BinaryStringData.fromString("Charlie"),
793+
(byte) 15,
794+
BinaryStringData.fromString("student")
795+
})));
796+
events.add(
797+
DataChangeEvent.insertEvent(
798+
myTable2,
799+
table2dataGenerator.generate(
800+
new Object[] {
801+
4L,
802+
BinaryStringData.fromString("Donald"),
803+
(byte) 25,
804+
BinaryStringData.fromString("student")
805+
})));
806+
events.add(
807+
DataChangeEvent.deleteEvent(
808+
myTable2,
809+
table2dataGenerator.generate(
810+
new Object[] {
811+
4L,
812+
BinaryStringData.fromString("Donald"),
813+
(byte) 25,
814+
BinaryStringData.fromString("student")
815+
})));
816+
events.add(
817+
new AddColumnEvent(
818+
myTable2,
819+
Collections.singletonList(
820+
new AddColumnEvent.ColumnWithPosition(
821+
Column.physicalColumn("gender", DataTypes.STRING())))));
822+
events.add(
823+
DataChangeEvent.insertEvent(
824+
myTable1,
825+
table1dataGenerator.generate(
826+
new Object[] {5, BinaryStringData.fromString("Eliza"), 24})));
827+
events.add(
828+
DataChangeEvent.insertEvent(
829+
myTable2,
830+
new BinaryRecordDataGenerator(
831+
new DataType[] {
832+
DataTypes.BIGINT(),
833+
DataTypes.VARCHAR(255),
834+
DataTypes.TINYINT(),
835+
DataTypes.STRING(),
836+
DataTypes.STRING()
837+
})
838+
.generate(
839+
new Object[] {
840+
6L,
841+
BinaryStringData.fromString("Frank"),
842+
(byte) 30,
843+
BinaryStringData.fromString("student"),
844+
BinaryStringData.fromString("male")
845+
})));
846+
847+
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
848+
849+
SourceDef sourceDef =
850+
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
851+
852+
// Setup value sink
853+
Configuration sinkConfig = new Configuration();
854+
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
855+
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
856+
857+
// Setup transform
858+
List<TransformDef> transformDef =
859+
Collections.singletonList(
860+
new TransformDef(
861+
"default_namespace.default_schema.mytable[0-9]",
862+
"*,'last_name' as last_name",
863+
null,
864+
null,
865+
null,
866+
null,
867+
""));
868+
869+
// Setup route
870+
TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged");
871+
List<RouteDef> routeDef =
872+
Collections.singletonList(
873+
new RouteDef(
874+
"default_namespace.default_schema.mytable[0-9]",
875+
mergedTable.toString(),
876+
null,
877+
null));
878+
879+
// Setup pipeline
880+
Configuration pipelineConfig = new Configuration();
881+
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
882+
PipelineDef pipelineDef =
883+
new PipelineDef(sourceDef, sinkDef, routeDef, transformDef, pipelineConfig);
884+
885+
// Execute the pipeline
886+
PipelineExecution execution = composer.compose(pipelineDef);
887+
execution.execute();
888+
Schema mergedTableSchema = ValuesDatabase.getTableSchema(mergedTable);
889+
assertThat(mergedTableSchema)
890+
.isEqualTo(
891+
Schema.newBuilder()
892+
.physicalColumn("id", DataTypes.BIGINT())
893+
.physicalColumn("name", DataTypes.STRING())
894+
.physicalColumn("age", DataTypes.BIGINT())
895+
.physicalColumn("last_name", DataTypes.STRING())
896+
.physicalColumn("description", DataTypes.STRING())
897+
.physicalColumn("gender", DataTypes.STRING())
898+
.primaryKey("id")
899+
.build());
900+
String[] outputEvents = outCaptor.toString().trim().split("\n");
901+
assertThat(outputEvents)
902+
.containsExactly(
903+
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}",
904+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name], op=INSERT, meta=()}",
905+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name], op=INSERT, meta=()}",
906+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}",
907+
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}",
908+
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, nameMapping={age=BIGINT, id=BIGINT}}",
909+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}",
910+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}",
911+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}",
912+
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`gender` STRING, position=LAST, existedColumnName=null}]}",
913+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, Eliza, 24, last_name, null, null], op=INSERT, meta=()}",
914+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, last_name, student, male], op=INSERT, meta=()}");
915+
}
719916
}

0 commit comments

Comments
 (0)