Skip to content

Commit 07324de

Browse files
committed
Add wildcard tests at different position
1 parent a31a225 commit 07324de

File tree

1 file changed

+180
-0
lines changed

1 file changed

+180
-0
lines changed

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,52 @@ public void testWildcardTransform() throws Exception {
539539
.deletePostTransformed(1000, "Alice", 17, 1017)
540540
.runTests()
541541
.destroyHarness();
542+
543+
UnifiedTransformTestCase.of(
544+
tableId,
545+
"id + age as computed, *",
546+
"id > 100",
547+
Schema.newBuilder()
548+
.physicalColumn("id", DataTypes.INT())
549+
.physicalColumn("name", DataTypes.STRING())
550+
.physicalColumn("age", DataTypes.INT())
551+
.primaryKey("id")
552+
.build(),
553+
Schema.newBuilder()
554+
.physicalColumn("id", DataTypes.INT())
555+
.physicalColumn("name", DataTypes.STRING())
556+
.physicalColumn("age", DataTypes.INT())
557+
.primaryKey("id")
558+
.build(),
559+
Schema.newBuilder()
560+
.physicalColumn("computed", DataTypes.INT())
561+
.physicalColumn("id", DataTypes.INT())
562+
.physicalColumn("name", DataTypes.STRING())
563+
.physicalColumn("age", DataTypes.INT())
564+
.primaryKey("id")
565+
.build())
566+
.initializeHarness()
567+
.insertSource(1000, "Alice", 17)
568+
.insertPreTransformed(1000, "Alice", 17)
569+
.insertPostTransformed(1017, 1000, "Alice", 17)
570+
.insertSource(2000, "Bob", 18)
571+
.insertPreTransformed(2000, "Bob", 18)
572+
.insertPostTransformed(2018, 2000, "Bob", 18)
573+
.updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
574+
.updatePreTransformed(
575+
new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
576+
.updatePostTransformed(
577+
new Object[] {2018, 2000, "Bob", 18},
578+
new Object[] {2016, 2000, "Barcarolle", 16})
579+
// filtered out data row
580+
.insertSource(50, "Carol", 19)
581+
.insertPreTransformed(50, "Carol", 19)
582+
.insertPostTransformed()
583+
.deleteSource(1000, "Alice", 17)
584+
.deletePreTransformed(1000, "Alice", 17)
585+
.deletePostTransformed(1017, 1000, "Alice", 17)
586+
.runTests()
587+
.destroyHarness();
542588
}
543589

544590
@Test
@@ -645,6 +691,54 @@ public void testCalculatedMetadataTransform() throws Exception {
645691
.deletePostTransformed(1000, "Alice", 17, "my_company.my_branch.metadata_transform")
646692
.runTests()
647693
.destroyHarness();
694+
695+
UnifiedTransformTestCase.of(
696+
tableId,
697+
"__namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, *",
698+
"id > 100",
699+
Schema.newBuilder()
700+
.physicalColumn("id", DataTypes.INT())
701+
.physicalColumn("name", DataTypes.STRING())
702+
.physicalColumn("age", DataTypes.INT())
703+
.primaryKey("id")
704+
.build(),
705+
Schema.newBuilder()
706+
.physicalColumn("id", DataTypes.INT())
707+
.physicalColumn("name", DataTypes.STRING())
708+
.physicalColumn("age", DataTypes.INT())
709+
.primaryKey("id")
710+
.build(),
711+
Schema.newBuilder()
712+
.physicalColumn("identifier_name", DataTypes.STRING())
713+
.physicalColumn("id", DataTypes.INT())
714+
.physicalColumn("name", DataTypes.STRING())
715+
.physicalColumn("age", DataTypes.INT())
716+
.primaryKey("id")
717+
.build())
718+
.initializeHarness()
719+
.insertSource(1000, "Alice", 17)
720+
.insertPreTransformed(1000, "Alice", 17)
721+
.insertPostTransformed("my_company.my_branch.metadata_transform", 1000, "Alice", 17)
722+
.insertSource(2000, "Bob", 18)
723+
.insertPreTransformed(2000, "Bob", 18)
724+
.insertPostTransformed("my_company.my_branch.metadata_transform", 2000, "Bob", 18)
725+
.updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
726+
.updatePreTransformed(
727+
new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
728+
.updatePostTransformed(
729+
new Object[] {"my_company.my_branch.metadata_transform", 2000, "Bob", 18},
730+
new Object[] {
731+
"my_company.my_branch.metadata_transform", 2000, "Barcarolle", 16
732+
})
733+
// filtered out data row
734+
.insertSource(50, "Carol", 19)
735+
.insertPreTransformed(50, "Carol", 19)
736+
.insertPostTransformed()
737+
.deleteSource(1000, "Alice", 17)
738+
.deletePreTransformed(1000, "Alice", 17)
739+
.deletePostTransformed("my_company.my_branch.metadata_transform", 1000, "Alice", 17)
740+
.runTests()
741+
.destroyHarness();
648742
}
649743

650744
@Test
@@ -735,5 +829,91 @@ public void testMetadataAndCalculatedTransform() throws Exception {
735829
"metadata_transform")
736830
.runTests()
737831
.destroyHarness();
832+
833+
UnifiedTransformTestCase.of(
834+
tableId,
835+
"__namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __namespace_name__, __schema_name__, __table_name__, *",
836+
"id > 100",
837+
Schema.newBuilder()
838+
.physicalColumn("id", DataTypes.INT())
839+
.physicalColumn("name", DataTypes.STRING())
840+
.physicalColumn("age", DataTypes.INT())
841+
.primaryKey("id")
842+
.build(),
843+
Schema.newBuilder()
844+
.physicalColumn("id", DataTypes.INT())
845+
.physicalColumn("name", DataTypes.STRING())
846+
.physicalColumn("age", DataTypes.INT())
847+
.primaryKey("id")
848+
.build(),
849+
Schema.newBuilder()
850+
.physicalColumn("identifier_name", DataTypes.STRING())
851+
.physicalColumn("__namespace_name__", DataTypes.STRING().notNull())
852+
.physicalColumn("__schema_name__", DataTypes.STRING().notNull())
853+
.physicalColumn("__table_name__", DataTypes.STRING().notNull())
854+
.physicalColumn("id", DataTypes.INT())
855+
.physicalColumn("name", DataTypes.STRING())
856+
.physicalColumn("age", DataTypes.INT())
857+
.primaryKey("id")
858+
.build())
859+
.initializeHarness()
860+
.insertSource(1000, "Alice", 17)
861+
.insertPreTransformed(1000, "Alice", 17)
862+
.insertPostTransformed(
863+
"my_company.my_branch.metadata_transform",
864+
"my_company",
865+
"my_branch",
866+
"metadata_transform",
867+
1000,
868+
"Alice",
869+
17)
870+
.insertSource(2000, "Bob", 18)
871+
.insertPreTransformed(2000, "Bob", 18)
872+
.insertPostTransformed(
873+
"my_company.my_branch.metadata_transform",
874+
"my_company",
875+
"my_branch",
876+
"metadata_transform",
877+
2000,
878+
"Bob",
879+
18)
880+
.updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
881+
.updatePreTransformed(
882+
new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
883+
.updatePostTransformed(
884+
new Object[] {
885+
"my_company.my_branch.metadata_transform",
886+
"my_company",
887+
"my_branch",
888+
"metadata_transform",
889+
2000,
890+
"Bob",
891+
18
892+
},
893+
new Object[] {
894+
"my_company.my_branch.metadata_transform",
895+
"my_company",
896+
"my_branch",
897+
"metadata_transform",
898+
2000,
899+
"Barcarolle",
900+
16
901+
})
902+
// filtered out data row
903+
.insertSource(50, "Carol", 19)
904+
.insertPreTransformed(50, "Carol", 19)
905+
.insertPostTransformed()
906+
.deleteSource(1000, "Alice", 17)
907+
.deletePreTransformed(1000, "Alice", 17)
908+
.deletePostTransformed(
909+
"my_company.my_branch.metadata_transform",
910+
"my_company",
911+
"my_branch",
912+
"metadata_transform",
913+
1000,
914+
"Alice",
915+
17)
916+
.runTests()
917+
.destroyHarness();
738918
}
739919
}

0 commit comments

Comments
 (0)