diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index 3d0565df98b..68cdcd0e1d7 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -81,7 +81,7 @@ public static List restoreOriginalData(
     }
 
     /** Merge compatible upstream schemas. */
-    public static Schema mergeCompatibleSchemas(List<Schema> schemas) {
+    public static Schema inferWiderSchema(List<Schema> schemas) {
         if (schemas.isEmpty()) {
             return null;
         } else if (schemas.size() == 1) {
@@ -89,7 +89,7 @@ public static Schema mergeCompatibleSchemas(List<Schema> schemas) {
         } else {
             Schema outputSchema = null;
             for (Schema schema : schemas) {
-                outputSchema = mergeSchema(outputSchema, schema);
+                outputSchema = inferWiderSchema(outputSchema, schema);
             }
             return outputSchema;
         }
@@ -97,7 +97,7 @@ public static Schema mergeCompatibleSchemas(List<Schema> schemas) {
 
     /** Try to combine two schemas with potential incompatible type. */
     @VisibleForTesting
-    public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) {
+    public static Schema inferWiderSchema(@Nullable Schema lSchema, Schema rSchema) {
         if (lSchema == null) {
             return rSchema;
         }
@@ -137,7 +137,7 @@ public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) {
         List<Column> mergedColumns =
                 IntStream.range(0, lSchema.getColumnCount())
-                        .mapToObj(i -> mergeColumn(leftColumns.get(i), rightColumns.get(i)))
+                        .mapToObj(i -> inferWiderColumn(leftColumns.get(i), rightColumns.get(i)))
                         .collect(Collectors.toList());
 
         return lSchema.copy(mergedColumns);
@@ -145,7 +145,7 @@ public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) {
 
     /** Try to combine two columns with potential incompatible type. */
     @VisibleForTesting
-    public static Column mergeColumn(Column lColumn, Column rColumn) {
+    public static Column inferWiderColumn(Column lColumn, Column rColumn) {
         if (!Objects.equals(lColumn.getName(), rColumn.getName())) {
             throw new IllegalStateException(
                     String.format(
@@ -158,12 +158,12 @@ public static Column mergeColumn(Column lColumn, Column rColumn) {
                             "Unable to merge column %s and %s with different comments.",
                             lColumn, rColumn));
         }
-        return lColumn.copy(mergeDataType(lColumn.getType(), rColumn.getType()));
+        return lColumn.copy(inferWiderType(lColumn.getType(), rColumn.getType()));
     }
 
     /** Try to combine given data types to a compatible wider data type. */
     @VisibleForTesting
-    public static DataType mergeDataType(DataType lType, DataType rType) {
+    public static DataType inferWiderType(DataType lType, DataType rType) {
         // Ignore nullability during data type merge
         boolean nullable = lType.isNullable() || rType.isNullable();
         lType = lType.notNull();
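Editor's note: the rename from mergeSchema/mergeColumn/mergeDataType to inferWiderSchema/inferWiderColumn/inferWiderType makes the widening semantics explicit. Below is a minimal usage sketch of the renamed API; the calls are exactly those exercised by the updated tests that follow, but the surrounding class and values are illustrative only.

```java
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

public class WideningSketch {
    public static void main(String[] args) {
        // Identical types are returned unchanged.
        DataType a = SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.INT());    // INT

        // Compatible types widen: every INT value fits inside BIGINT.
        DataType b = SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.BIGINT()); // BIGINT

        // Nullability is OR-ed: the result is nullable if either input is.
        DataType c =
                SchemaUtils.inferWiderType(
                        DataTypes.INT().nullable(), DataTypes.INT().notNull());       // nullable INT

        // Incompatible pairs (e.g. INT vs. STRING) throw IllegalStateException.
        System.out.println(a + " / " + b + " / " + c);
    }
}
```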
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
index 726e3f1c2a2..8a508a8908e 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
@@ -180,140 +180,143 @@ public void testGetNumericPrecision() {
     }
 
     @Test
-    public void testMergeDataType() {
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.BINARY(17), DataTypes.BINARY(17)))
+    public void testInferWiderType() {
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(DataTypes.BINARY(17), DataTypes.BINARY(17)))
                 .isEqualTo(DataTypes.BINARY(17));
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(DataTypes.VARBINARY(17), DataTypes.VARBINARY(17)))
+                        SchemaUtils.inferWiderType(
+                                DataTypes.VARBINARY(17), DataTypes.VARBINARY(17)))
                 .isEqualTo(DataTypes.VARBINARY(17));
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.BYTES(), DataTypes.BYTES()))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.BYTES(), DataTypes.BYTES()))
                 .isEqualTo(DataTypes.BYTES());
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.BOOLEAN(), DataTypes.BOOLEAN()))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.BOOLEAN(), DataTypes.BOOLEAN()))
                 .isEqualTo(DataTypes.BOOLEAN());
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.INT()))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.INT()))
                 .isEqualTo(DataTypes.INT());
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.TINYINT(), DataTypes.TINYINT()))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.TINYINT(), DataTypes.TINYINT()))
                 .isEqualTo(DataTypes.TINYINT());
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.SMALLINT(), DataTypes.SMALLINT()))
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(DataTypes.SMALLINT(), DataTypes.SMALLINT()))
                 .isEqualTo(DataTypes.SMALLINT());
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.BIGINT(), DataTypes.BIGINT()))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.BIGINT(), DataTypes.BIGINT()))
                 .isEqualTo(DataTypes.BIGINT());
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.FLOAT(), DataTypes.FLOAT()))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.FLOAT(), DataTypes.FLOAT()))
                 .isEqualTo(DataTypes.FLOAT());
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.DOUBLE(), DataTypes.DOUBLE()))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.DOUBLE(), DataTypes.DOUBLE()))
                 .isEqualTo(DataTypes.DOUBLE());
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.CHAR(17), DataTypes.CHAR(17)))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.CHAR(17), DataTypes.CHAR(17)))
                 .isEqualTo(DataTypes.CHAR(17));
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(DataTypes.VARCHAR(17), DataTypes.VARCHAR(17)))
+                        SchemaUtils.inferWiderType(DataTypes.VARCHAR(17), DataTypes.VARCHAR(17)))
                 .isEqualTo(DataTypes.VARCHAR(17));
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.STRING(), DataTypes.STRING()))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.STRING(), DataTypes.STRING()))
                 .isEqualTo(DataTypes.STRING());
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(
+                        SchemaUtils.inferWiderType(
                                 DataTypes.DECIMAL(17, 7), DataTypes.DECIMAL(17, 7)))
                 .isEqualTo(DataTypes.DECIMAL(17, 7));
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.DATE(), DataTypes.DATE()))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.DATE(), DataTypes.DATE()))
                 .isEqualTo(DataTypes.DATE());
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.TIME(), DataTypes.TIME()))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.TIME(), DataTypes.TIME()))
                 .isEqualTo(DataTypes.TIME());
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.TIME(6), DataTypes.TIME(6)))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.TIME(6), DataTypes.TIME(6)))
                 .isEqualTo(DataTypes.TIME(6));
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(DataTypes.TIMESTAMP(), DataTypes.TIMESTAMP()))
+                        SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(), DataTypes.TIMESTAMP()))
                 .isEqualTo(DataTypes.TIMESTAMP());
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(3)))
+                        SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(3)))
                 .isEqualTo(DataTypes.TIMESTAMP(3));
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(
+                        SchemaUtils.inferWiderType(
                                 DataTypes.TIMESTAMP_TZ(), DataTypes.TIMESTAMP_TZ()))
                 .isEqualTo(DataTypes.TIMESTAMP_TZ());
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(
+                        SchemaUtils.inferWiderType(
                                 DataTypes.TIMESTAMP_TZ(3), DataTypes.TIMESTAMP_TZ(3)))
                 .isEqualTo(DataTypes.TIMESTAMP_TZ(3));
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(
+                        SchemaUtils.inferWiderType(
                                 DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_LTZ()))
                 .isEqualTo(DataTypes.TIMESTAMP_LTZ());
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(
+                        SchemaUtils.inferWiderType(
                                 DataTypes.TIMESTAMP_LTZ(3), DataTypes.TIMESTAMP_LTZ(3)))
                 .isEqualTo(DataTypes.TIMESTAMP_LTZ(3));
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(
+                        SchemaUtils.inferWiderType(
                                 DataTypes.ARRAY(DataTypes.INT()), DataTypes.ARRAY(DataTypes.INT())))
                 .isEqualTo(DataTypes.ARRAY(DataTypes.INT()));
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(
+                        SchemaUtils.inferWiderType(
                                 DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
                                 DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())))
                 .isEqualTo(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()));
 
         // Test compatible widening cast
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.BIGINT()))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.BIGINT()))
                 .isEqualTo(DataTypes.BIGINT());
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.VARCHAR(17), DataTypes.STRING()))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.VARCHAR(17), DataTypes.STRING()))
                 .isEqualTo(DataTypes.STRING());
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.FLOAT(), DataTypes.DOUBLE()))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.FLOAT(), DataTypes.DOUBLE()))
                 .isEqualTo(DataTypes.DOUBLE());
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.DECIMAL(4, 0)))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DECIMAL(4, 0)))
                 .isEqualTo(DataTypes.DECIMAL(10, 0));
-        Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.DECIMAL(10, 5)))
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DECIMAL(10, 5)))
                 .isEqualTo(DataTypes.DECIMAL(15, 5));
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(DataTypes.BIGINT(), DataTypes.DECIMAL(10, 5)))
+                        SchemaUtils.inferWiderType(DataTypes.BIGINT(), DataTypes.DECIMAL(10, 5)))
                 .isEqualTo(DataTypes.DECIMAL(24, 5));
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(
+                        SchemaUtils.inferWiderType(
                                 DataTypes.DECIMAL(5, 4), DataTypes.DECIMAL(10, 2)))
                 .isEqualTo(DataTypes.DECIMAL(12, 4));
 
         // Test merging with nullability
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(
+                        SchemaUtils.inferWiderType(
                                 DataTypes.INT().notNull(), DataTypes.INT().notNull()))
                 .isEqualTo(DataTypes.INT().notNull());
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(
+                        SchemaUtils.inferWiderType(
                                 DataTypes.INT().nullable(), DataTypes.INT().notNull()))
                 .isEqualTo(DataTypes.INT().nullable());
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(
+                        SchemaUtils.inferWiderType(
                                 DataTypes.INT().notNull(), DataTypes.INT().nullable()))
                 .isEqualTo(DataTypes.INT().nullable());
         Assertions.assertThat(
-                        SchemaUtils.mergeDataType(
+                        SchemaUtils.inferWiderType(
                                 DataTypes.INT().nullable(), DataTypes.INT().nullable()))
                 .isEqualTo(DataTypes.INT().nullable());
 
         // incompatible type merges test
         Assertions.assertThatThrownBy(
-                        () -> SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.DOUBLE()))
+                        () -> SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DOUBLE()))
                 .isExactlyInstanceOf(IllegalStateException.class);
         Assertions.assertThatThrownBy(
                         () ->
-                                SchemaUtils.mergeDataType(
+                                SchemaUtils.inferWiderType(
                                         DataTypes.DECIMAL(17, 0), DataTypes.DOUBLE()))
                 .isExactlyInstanceOf(IllegalStateException.class);
         Assertions.assertThatThrownBy(
-                        () -> SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.STRING()))
+                        () -> SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.STRING()))
                 .isExactlyInstanceOf(IllegalStateException.class);
     }
 
     @Test
-    public void testMergeColumn() {
+    public void testInferWiderColumn() {
         // Test normal merges
         Assertions.assertThat(
-                        SchemaUtils.mergeColumn(
+                        SchemaUtils.inferWiderColumn(
                                 Column.physicalColumn("Column1", DataTypes.INT()),
                                 Column.physicalColumn("Column1", DataTypes.BIGINT())))
                 .isEqualTo(Column.physicalColumn("Column1", DataTypes.BIGINT()));
         Assertions.assertThat(
-                        SchemaUtils.mergeColumn(
+                        SchemaUtils.inferWiderColumn(
                                 Column.physicalColumn("Column2", DataTypes.FLOAT()),
                                 Column.physicalColumn("Column2", DataTypes.DOUBLE())))
                 .isEqualTo(Column.physicalColumn("Column2", DataTypes.DOUBLE()));
@@ -321,7 +324,7 @@ public void testMergeColumn() {
         // Test merging columns with incompatible types
         Assertions.assertThatThrownBy(
                         () ->
-                                SchemaUtils.mergeColumn(
+                                SchemaUtils.inferWiderColumn(
                                         Column.physicalColumn("Column3", DataTypes.INT()),
                                         Column.physicalColumn("Column3", DataTypes.STRING())))
                 .isExactlyInstanceOf(IllegalStateException.class);
@@ -329,17 +332,17 @@ public void testMergeColumn() {
         // Test merging with incompatible names
         Assertions.assertThatThrownBy(
                         () ->
-                                SchemaUtils.mergeColumn(
+                                SchemaUtils.inferWiderColumn(
                                         Column.physicalColumn("Column4", DataTypes.INT()),
                                         Column.physicalColumn("AnotherColumn4", DataTypes.INT())))
                 .isExactlyInstanceOf(IllegalStateException.class);
     }
 
     @Test
-    public void testMergeSchema() {
+    public void testInferWiderSchema() {
         // Test normal merges
         Assertions.assertThat(
-                        SchemaUtils.mergeSchema(
+                        SchemaUtils.inferWiderSchema(
                                 Schema.newBuilder()
                                         .physicalColumn("Column1", DataTypes.INT())
                                         .physicalColumn("Column2", DataTypes.DOUBLE())
@@ -363,7 +366,7 @@ public void testMergeSchema() {
         // Test merging with incompatible types
         Assertions.assertThatThrownBy(
                         () ->
-                                SchemaUtils.mergeSchema(
+                                SchemaUtils.inferWiderSchema(
                                         Schema.newBuilder()
                                                 .physicalColumn("Column1", DataTypes.INT())
                                                 .physicalColumn("Column2", DataTypes.DOUBLE())
@@ -381,7 +384,7 @@ public void testMergeSchema() {
         // Test merging with incompatible column names
         Assertions.assertThatThrownBy(
                         () ->
-                                SchemaUtils.mergeSchema(
+                                SchemaUtils.inferWiderSchema(
                                         Schema.newBuilder()
                                                 .physicalColumn("Column1", DataTypes.INT())
                                                 .physicalColumn("Column2", DataTypes.DOUBLE())
@@ -399,7 +402,7 @@ public void testMergeSchema() {
         // Test merging with different column counts
         Assertions.assertThatThrownBy(
                         () ->
-                                SchemaUtils.mergeSchema(
+                                SchemaUtils.inferWiderSchema(
                                         Schema.newBuilder()
                                                 .physicalColumn("Column1", DataTypes.INT())
                                                 .physicalColumn("Column2", DataTypes.DOUBLE())
@@ -418,7 +421,7 @@ public void testMergeSchema() {
         // Test merging with incompatible schema metadata
         Assertions.assertThatThrownBy(
                         () ->
-                                SchemaUtils.mergeSchema(
+                                SchemaUtils.inferWiderSchema(
                                         Schema.newBuilder()
                                                 .physicalColumn("Column1", DataTypes.INT())
                                                 .physicalColumn("Column2", DataTypes.DOUBLE())
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index b155cdf6519..60e644cee1f 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -359,7 +359,7 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
                         new TransformDef(
                                 "default_namespace.default_schema.table1",
                                 "*,concat(col1,'1') as col12",
-                                "col1 = '1'",
+                                "col1 = '1' OR col1 = '999'",
                                 "col1",
                                 "col12",
                                 "key1=value1",
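Editor's note: the exact-numeric assertions in SchemaUtilsTest above pin down a precision-widening rule. Treating INT as DECIMAL(10, 0) and BIGINT as DECIMAL(19, 0), the merged type is DECIMAL(max(p1 - s1, p2 - s2) + max(s1, s2), max(s1, s2)): keep every integer digit and every fractional digit either side can carry. A standalone sketch of that rule, reconstructed from the test expectations rather than copied from the library:

```java
import java.util.Arrays;

public class DecimalWidening {
    /** Widened (precision, scale) for two exact-numeric types, per the test expectations. */
    static int[] widen(int p1, int s1, int p2, int s2) {
        int scale = Math.max(s1, s2);                   // keep every fractional digit
        int integerDigits = Math.max(p1 - s1, p2 - s2); // keep every integer digit
        return new int[] {integerDigits + scale, scale};
    }

    public static void main(String[] args) {
        // INT ~ DECIMAL(10, 0) merged with DECIMAL(10, 5) -> DECIMAL(15, 5)
        System.out.println(Arrays.toString(widen(10, 0, 10, 5)));
        // BIGINT ~ DECIMAL(19, 0) merged with DECIMAL(10, 5) -> DECIMAL(24, 5)
        System.out.println(Arrays.toString(widen(19, 0, 10, 5)));
        // DECIMAL(5, 4) merged with DECIMAL(10, 2) -> DECIMAL(12, 4)
        System.out.println(Arrays.toString(widen(5, 4, 10, 2)));
    }
}
```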
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/ComplexDataTypesE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/ComplexDataTypesE2eITCase.java
index 954e359edb6..d3671032aa3 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/ComplexDataTypesE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/ComplexDataTypesE2eITCase.java
@@ -46,6 +46,7 @@
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -159,7 +160,7 @@ public void after() {
     }
 
     @Test
-    public void testSyncWholeDatabase() throws Exception {
+    public void testComplexDataTypes() throws Exception {
         String pipelineJob =
                 String.format(
                         "source:\n"
@@ -183,6 +184,7 @@ public void testComplexDataTypes() throws Exception {
                                 + "transform:\n"
                                 + "  - source-table: %s.DATA_TYPES_TABLE\n"
                                 + "    projection: \\*, 'fine' AS FINE\n"
+                                + "    filter: id <> 3 AND id <> 4\n"
                                 + "pipeline:\n"
                                 + "  parallelism: 1",
                         MYSQL_TEST_USER,
@@ -200,11 +202,9 @@ public void testComplexDataTypes() throws Exception {
         validateSinkResult(
                 complexDataTypesDatabase.getDatabaseName(),
                 "DATA_TYPES_TABLE",
-                4,
-                Arrays.asList(
-                        "1001 | 2012-12-21 17:00:02 | 100.00 | fine",
-                        "1002 | 2012-12-21 17:00:03 | 100.10 | fine",
-                        "1003 | 2012-12-21 17:00:05 | 100.86 | fine"));
+                52,
+                Collections.singletonList(
+                        "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine"));
 
         LOG.info("Begin incremental reading stage.");
         // generate binlogs
@@ -219,18 +219,37 @@
                         mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
                 Statement stat = conn.createStatement()) {
-            stat.execute(
-                    "INSERT INTO DATA_TYPES_TABLE VALUES (1004, '2012-12-21 17:00:07', 110.37);");
+            // Insert id = 2, 3, 4, 5
+            for (int i = 2; i < 6; i++) {
+                stat.execute(
+                        "INSERT INTO DATA_TYPES_TABLE\n"
+                                + "VALUES ("
+                                + i
+                                + ", 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215, 2147483647,\n"
+                                + "        4294967295, 4294967295, 2147483647, 9223372036854775807,\n"
+                                + "        'Hello World', 'abc', 123.102, 123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445,\n"
+                                + "        123.4567, 123.4568, 123.4569, 345.6, 34567892.1, 0, 1, true,\n"
+                                + "        '2020-07-17', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22',\n"
+                                + "        'text', UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), 2021,\n"
+                                + "        'red',\n"
+                                + "        ST_GeomFromText('POINT(1 1)'),\n"
+                                + "        ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),\n"
+                                + "        ST_GeomFromText('LINESTRING(3 0, 3 3, 3 5)'),\n"
+                                + "        ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),\n"
+                                + "        ST_GeomFromText('MULTIPOINT((1 1),(2 2))'),\n"
+                                + "        ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'),\n"
+                                + "        ST_GeomFromText('MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))'),\n"
+                                + "        ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))'));");
+            }
 
             validateSinkResult(
                     complexDataTypesDatabase.getDatabaseName(),
                     "DATA_TYPES_TABLE",
-                    4,
+                    52,
                     Arrays.asList(
-                            "1001 | 2012-12-21 17:00:02 | 100.00 | fine",
-                            "1002 | 2012-12-21 17:00:03 | 100.10 | fine",
-                            "1003 | 2012-12-21 17:00:05 | 100.86 | fine",
-                            "1004 | 2012-12-21 17:00:07 | 110.37 | fine"));
+                            "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine",
+                            "2 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine",
+                            "5 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine"));
         } catch (SQLException e) {
             LOG.error("Update table for CDC failed.", e);
             throw e;
@@ -279,12 +298,17 @@ private void validateSinkResult(
             String databaseName, String tableName, int columnCount, List<String> expected)
             throws Exception {
         long startWaitingTimestamp = System.currentTimeMillis();
+        List<String> results = new ArrayList<>();
         while (true) {
             if (System.currentTimeMillis() - startWaitingTimestamp
                     > TESTCASE_TIMEOUT_SECONDS * 1000) {
-                throw new RuntimeException("Doris backend startup timed out.");
+                LOG.error(
+                        "Failed to retrieve {} expected entries before timeout.\nResults so far: {}",
+                        expected.size(),
+                        results);
+                throw new RuntimeException("Failed to retrieve enough entries before timeout.");
             }
-            List<String> results = new ArrayList<>();
+            results.clear();
             try (Connection conn =
                     DriverManager.getConnection(
                             DORIS.getJdbcUrl(databaseName, DORIS.getUsername()));
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql
index a080b645e27..4c7621250f9 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql
@@ -13,16 +13,79 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 
-DROP TABLE IF EXISTS DATA_TYPES_TABLE;
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE:  column_type_test
+-- ----------------------------------------------------------------------------------------------------------------
 
-CREATE TABLE DATA_TYPES_TABLE (
-    ID INT NOT NULL,
-    TS DATETIME(0),
-    NUM DECIMAL(10, 2),
-    PRIMARY KEY (ID)
-);
-
-INSERT INTO DATA_TYPES_TABLE VALUES (1001, '2012-12-21 17:00:02', 100.00);
-INSERT INTO DATA_TYPES_TABLE VALUES (1002, '2012-12-21 17:00:03', 100.10);
-INSERT INTO DATA_TYPES_TABLE VALUES (1003, '2012-12-21 17:00:05', 100.86);
+CREATE TABLE DATA_TYPES_TABLE
+(
+    id                   INT,
+    tiny_c               TINYINT,
+    tiny_un_c            TINYINT UNSIGNED,
+    tiny_un_z_c          TINYINT UNSIGNED ZEROFILL,
+    small_c              SMALLINT,
+    small_un_c           SMALLINT UNSIGNED,
+    small_un_z_c         SMALLINT UNSIGNED ZEROFILL,
+    medium_c             MEDIUMINT,
+    medium_un_c          MEDIUMINT UNSIGNED,
+    medium_un_z_c        MEDIUMINT UNSIGNED ZEROFILL,
+    int_c                INTEGER,
+    int_un_c             INTEGER UNSIGNED,
+    int_un_z_c           INTEGER UNSIGNED ZEROFILL,
+    int11_c              INT(11),
+    big_c                BIGINT,
+    varchar_c            VARCHAR(255),
+    char_c               CHAR(3),
+    real_c               REAL,
+    float_c              FLOAT,
+    float_un_c           FLOAT UNSIGNED,
+    float_un_z_c         FLOAT UNSIGNED ZEROFILL,
+    double_c             DOUBLE,
+    double_un_c          DOUBLE UNSIGNED,
+    double_un_z_c        DOUBLE UNSIGNED ZEROFILL,
+    decimal_c            DECIMAL(8, 4),
+    decimal_un_c         DECIMAL(8, 4) UNSIGNED,
+    decimal_un_z_c       DECIMAL(8, 4) UNSIGNED ZEROFILL,
+    numeric_c            NUMERIC(6, 0),
+    big_decimal_c        DECIMAL(65, 1),
+    bit1_c               BIT,
+    tiny1_c              TINYINT(1),
+    boolean_c            BOOLEAN,
+    date_c               DATE,
+    datetime3_c          DATETIME(3),
+    datetime6_c          DATETIME(6),
+    timestamp_c          TIMESTAMP,
+    text_c               TEXT,
+    tiny_blob_c          TINYBLOB,
+    blob_c               BLOB,
+    medium_blob_c        MEDIUMBLOB,
+    long_blob_c          LONGBLOB,
+    year_c               YEAR,
+    enum_c               enum('red', 'white') default 'red',
+    point_c              POINT,
+    geometry_c           GEOMETRY,
+    linestring_c         LINESTRING,
+    polygon_c            POLYGON,
+    multipoint_c         MULTIPOINT,
+    multiline_c          MULTILINESTRING,
+    multipolygon_c       MULTIPOLYGON,
+    geometrycollection_c GEOMCOLLECTION,
+    PRIMARY KEY (id)
+) DEFAULT CHARSET=utf8;
+
+INSERT INTO DATA_TYPES_TABLE
+VALUES (1, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215, 2147483647,
+        4294967295, 4294967295, 2147483647, 9223372036854775807,
+        'Hello World', 'abc', 123.102, 123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445,
+        123.4567, 123.4568, 123.4569, 345.6, 34567892.1, 0, 1, true,
+        '2020-07-17', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22',
+        'text', UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), 2021,
+        'red',
+        ST_GeomFromText('POINT(1 1)'),
+        ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),
+        ST_GeomFromText('LINESTRING(3 0, 3 3, 3 5)'),
+        ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),
+        ST_GeomFromText('MULTIPOINT((1 1),(2 2))'),
+        ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'),
+        ST_GeomFromText('MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))'),
+        ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))'));
\ No newline at end of file
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index d1f468bfe7c..355978222c3 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -66,6 +66,7 @@
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -83,7 +84,7 @@
  */
 @Internal
 public class SchemaOperator extends AbstractStreamOperator
-        implements OneInputStreamOperator {
+        implements OneInputStreamOperator, Serializable {
 
     private static final long serialVersionUID = 1L;
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java
index 438c3f302d5..3cb3899b69d 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java
@@ -34,6 +34,7 @@
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
+import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
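Editor's note: the Serializable changes here and in the operator hunks below all follow one pattern: operator instances are serialized into the job graph, so serializable configuration is kept in final fields, while parsed state is marked transient and rebuilt in open(). A sketch of the shape with placeholder types (MyOperator/ParsedRule are illustrative, not CDC classes):

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Serializable config travels with the job graph; transient state is rebuilt per task.
class MyOperator implements Serializable {
    private static final long serialVersionUID = 1L;

    private final List<String> rules;          // serializable configuration
    private transient List<ParsedRule> parsed; // never serialized, rebuilt in open()

    MyOperator(List<String> rules) {
        this.rules = rules;
    }

    /** Analogous to an operator's open(): re-derive transient state after deserialization. */
    void open() {
        parsed = new ArrayList<>();
        for (String rule : rules) {
            parsed.add(new ParsedRule(rule));
        }
    }

    static class ParsedRule {
        final String expression;

        ParsedRule(String expression) {
            this.expression = expression;
        }
    }
}
```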
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java
index c6accd488b3..989af2e75ad 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java
@@ -26,13 +26,16 @@
 import java.util.List;
 
-/** The TableInfo applies to cache schema and fieldGetters. */
+/**
+ * PostTransformChangeInfo caches pre-transformed / post-transformed schemas, schema field getters,
+ * and the binary record data generator for the post-transform schema.
+ */
 public class PostTransformChangeInfo {
     private TableId tableId;
+    private Schema preTransformedSchema;
     private Schema postTransformedSchema;
-    private RecordData.FieldGetter[] postTransformedFieldGetters;
-    private Schema inputSchema;
     private RecordData.FieldGetter[] preTransformedFieldGetters;
+    private RecordData.FieldGetter[] postTransformedFieldGetters;
     private BinaryRecordDataGenerator recordDataGenerator;
 
     public PostTransformChangeInfo(
@@ -45,7 +48,7 @@ public PostTransformChangeInfo(
         this.tableId = tableId;
         this.postTransformedSchema = postTransformedSchema;
         this.postTransformedFieldGetters = postTransformedFieldGetters;
-        this.inputSchema = preTransformedSchema;
+        this.preTransformedSchema = preTransformedSchema;
         this.preTransformedFieldGetters = preTransformedFieldGetters;
         this.recordDataGenerator = recordDataGenerator;
     }
@@ -74,8 +77,8 @@ public Schema getPostTransformedSchema() {
         return postTransformedSchema;
     }
 
-    public Schema getInputSchema() {
-        return inputSchema;
+    public Schema getPreTransformedSchema() {
+        return preTransformedSchema;
     }
 
     public RecordData.FieldGetter[] getPostTransformedFieldGetters() {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 2ad8af77261..b280bf90168 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -36,6 +36,7 @@
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
@@ -50,11 +51,13 @@
  * projection.
  */
 public class PostTransformOperator extends AbstractStreamOperator
-        implements OneInputStreamOperator {
+        implements OneInputStreamOperator, Serializable {
+
+    private static final long serialVersionUID = 1L;
 
     private final String timezone;
     private final List transformRules;
-    private transient List<PostTransformers> transforms;
+    private transient List<PostTransformer> transforms;
 
     /** keep the relationship of TableId and table information. */
     private final Map postTransformChangeInfoMap;
@@ -140,7 +143,7 @@ public void open() throws Exception {
                             new Selectors.SelectorsBuilder()
                                     .includeTables(tableInclusions)
                                     .build();
-                    return new PostTransformers(
+                    return new PostTransformer(
                             selectors,
                             TransformProjection.of(projection).orElse(null),
                             TransformFilter.of(filterExpression).orElse(null));
@@ -190,7 +193,7 @@ private SchemaChangeEvent cacheSchema(SchemaChangeEvent event) throws Exception
         } else {
             schema =
                     SchemaUtils.applySchemaChangeEvent(
-                            getPostTransformChangeInfo(tableId).getInputSchema(), event);
+                            getPostTransformChangeInfo(tableId).getPreTransformedSchema(), event);
         }
 
         Schema projectedSchema = transformSchema(tableId, schema);
@@ -214,7 +217,7 @@ private PostTransformChangeInfo getPostTransformChangeInfo(TableId tableId) {
 
     private Schema transformSchema(TableId tableId, Schema schema) throws Exception {
         List<Schema> newSchemas = new ArrayList<>();
-        for (PostTransformers transform : transforms) {
+        for (PostTransformer transform : transforms) {
             Selectors selectors = transform.getSelectors();
             if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
                 TransformProjection transformProjection = transform.getProjection().get();
@@ -237,7 +240,7 @@ private Schema transformSchema(TableId tableId, Schema schema) throws Exception
             return schema;
         }
 
-        return SchemaUtils.mergeCompatibleSchemas(newSchemas);
+        return SchemaUtils.inferWiderSchema(newSchemas);
     }
 
     private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataChangeEvent)
@@ -246,7 +249,7 @@ private Optional processDataChangeEvent(DataChangeEvent dataCha
         PostTransformChangeInfo tableInfo = getPostTransformChangeInfo(tableId);
         List<Optional<DataChangeEvent>> transformedDataChangeEventOptionalList = new ArrayList<>();
         long epochTime = System.currentTimeMillis();
-        for (PostTransformers transform : transforms) {
+        for (PostTransformer transform : transforms) {
             Selectors selectors = transform.getSelectors();
             if (selectors.isMatch(tableId)) {
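Editor's note: PostTransformers is renamed to PostTransformer below, which fits since each instance represents a single rule: one table selector plus an optional projection and an optional filter. A hedged construction sketch using only the calls visible in the hunks above; the table pattern and expressions are illustrative, and the import paths assume this PR's package layout:

```java
// A sketch, not library documentation; Selectors/TransformProjection/TransformFilter
// usage is copied from the diff, the literal values are made up.
Selectors selectors =
        new Selectors.SelectorsBuilder()
                .includeTables("default_namespace.default_schema.table1")
                .build();

PostTransformer rule =
        new PostTransformer(
                selectors,
                TransformProjection.of("*,concat(col1,'1') as col12").orElse(null),
                TransformFilter.of("col1 = '1' OR col1 = '999'").orElse(null));
```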
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformer.java
similarity index 92%
rename from flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java
rename to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformer.java
index 9daea231e70..33d4395c194 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformer.java
@@ -23,14 +23,14 @@
 
 import java.util.Optional;
 
-/** Transformation rules used by {@link PostTransformOperator}. */
-public class PostTransformers {
+/** Post-transformation rule used by {@link PostTransformOperator}. */
+public class PostTransformer {
     private final Selectors selectors;
 
     private final Optional<TransformProjection> projection;
     private final Optional<TransformFilter> filter;
 
-    public PostTransformers(
+    public PostTransformer(
             Selectors selectors,
             @Nullable TransformProjection projection,
             @Nullable TransformFilter filter) {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformChangeInfo.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformChangeInfo.java
index 4abec8d3959..b1eca835e9c 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformChangeInfo.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformChangeInfo.java
@@ -36,7 +36,10 @@
 import java.io.IOException;
 import java.util.List;
 
-/** The TableInfo applies to cache schema change and fieldGetters. */
+/**
+ * PreTransformChangeInfo caches source / pre-transformed schemas, source schema field getters, and
+ * the binary record data generator for the pre-transform schema.
+ */
 public class PreTransformChangeInfo {
     private TableId tableId;
     private Schema sourceSchema;
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index f4a6c41bebe..868583830b5 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -40,6 +40,7 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -50,10 +51,12 @@
  * A data process function that filters out columns which aren't (directly & indirectly) referenced.
  */
 public class PreTransformOperator extends AbstractStreamOperator
-        implements OneInputStreamOperator {
+        implements OneInputStreamOperator, Serializable {
+
+    private static final long serialVersionUID = 1L;
 
     private final List transformRules;
-    private transient List<PreTransformers> transforms;
+    private transient List<PreTransformer> transforms;
     private final Map preTransformChangeInfoMap;
     private final List<Tuple2<Selectors, SchemaMetadataTransform>> schemaMetadataTransformers;
     private transient ListState state;
@@ -118,7 +121,7 @@ public void open() throws Exception {
             Selectors selectors =
                     new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
             transforms.add(
-                    new PreTransformers(
+                    new PreTransformer(
                             selectors,
                             TransformProjection.of(projection).orElse(null),
                             TransformFilter.of(filter).orElse(null)));
@@ -243,7 +246,7 @@ private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableE
             }
         }
 
-        for (PreTransformers transform : transforms) {
+        for (PreTransformer transform : transforms) {
             Selectors selectors = transform.getSelectors();
             if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
                 TransformProjection transformProjection = transform.getProjection().get();
@@ -289,7 +292,7 @@ private Schema transformSchemaMetaData(
     private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent)
             throws Exception {
         TableId tableId = dataChangeEvent.tableId();
-        for (PreTransformers transform : transforms) {
+        for (PreTransformer transform : transforms) {
             Selectors selectors = transform.getSelectors();
             if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
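Editor's note: the pre/post naming now mirrors the two operators. PreTransformOperator prunes columns that no projection or filter references, producing the pre-transformed schema; PostTransformOperator evaluates projections against that, producing the post-transformed schema. A worked example using the builder calls from the tests above (column names are illustrative):

```java
// Source schema: three columns, of which "unused" is referenced by no transform rule.
Schema source =
        Schema.newBuilder()
                .physicalColumn("col1", DataTypes.STRING())
                .physicalColumn("col2", DataTypes.DOUBLE())
                .physicalColumn("unused", DataTypes.STRING())
                .build();

// Pre-transformed schema: PreTransformOperator drops "unused".
Schema preTransformed =
        Schema.newBuilder()
                .physicalColumn("col1", DataTypes.STRING())
                .physicalColumn("col2", DataTypes.DOUBLE())
                .build();

// Post-transformed schema: a projection like "*, concat(col1,'1') AS col12"
// appends the computed column to the pre-transformed schema.
Schema postTransformed =
        Schema.newBuilder()
                .physicalColumn("col1", DataTypes.STRING())
                .physicalColumn("col2", DataTypes.DOUBLE())
                .physicalColumn("col12", DataTypes.STRING())
                .build();
```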
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformers.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformer.java
similarity index 93%
rename from flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformers.java
rename to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformer.java
index 9df7103b244..305d7f3874f 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformers.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformer.java
@@ -23,14 +23,14 @@
 
 import java.util.Optional;
 
-/** Transformation rules used by {@link PreTransformOperator}. */
-public class PreTransformers {
+/** Pre-transformation rule used by {@link PreTransformOperator}. */
+public class PreTransformer {
     private final Selectors selectors;
 
     private final Optional<TransformProjection> projection;
     private final Optional<TransformFilter> filter;
 
-    public PreTransformers(
+    public PreTransformer(
             Selectors selectors,
             @Nullable TransformProjection projection,
             @Nullable TransformFilter filter) {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index 67c9b4c2a97..0d7df06ab01 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -79,7 +79,7 @@ public Object evaluate(BinaryRecordData after, long epochTime) {
 
     private Object[] generateParams(BinaryRecordData after, long epochTime) {
         List<Object> params = new ArrayList<>();
-        List<Column> columns = tableInfo.getInputSchema().getColumns();
+        List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
         RecordData.FieldGetter[] fieldGetters = tableInfo.getPreTransformedFieldGetters();
         for (String originalColumnName : projectionColumn.getOriginalColumnNames()) {
             switch (originalColumnName) {
@@ -118,7 +118,7 @@ private Object[] generateParams(BinaryRecordData after, long epochTime) {
     private TransformExpressionKey generateTransformExpressionKey() {
         List<String> argumentNames = new ArrayList<>();
         List<Class<?>> paramTypes = new ArrayList<>();
-        List<Column> columns = tableInfo.getInputSchema().getColumns();
+        List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
         String scriptExpression = projectionColumn.getScriptExpression();
         List<String> originalColumnNames = projectionColumn.getOriginalColumnNames();
         for (String originalColumnName : originalColumnNames) {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
index 6c6782ae821..9bae6a57309 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
@@ -17,11 +17,11 @@
 
 package org.apache.flink.cdc.runtime.operators.transform;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
-import org.apache.flink.cdc.runtime.parser.TransformParser;
 import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
 
 import org.codehaus.janino.ExpressionEvaluator;
@@ -31,6 +31,11 @@
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.flink.cdc.runtime.parser.TransformParser.DEFAULT_NAMESPACE_NAME;
+import static org.apache.flink.cdc.runtime.parser.TransformParser.DEFAULT_SCHEMA_NAME;
+import static org.apache.flink.cdc.runtime.parser.TransformParser.DEFAULT_TABLE_NAME;
 
 /** The processor of the transform filter. It processes the data change event of matched table. */
 public class TransformFilterProcessor {
@@ -68,22 +73,52 @@ public boolean process(BinaryRecordData after, long epochTime) {
         }
     }
 
+    private Tuple2<List<String>, List<Class<?>>> generateArguments() {
+        List<String> argNames = new ArrayList<>();
+        List<Class<?>> argTypes = new ArrayList<>();
+        String scriptExpression = transformFilter.getScriptExpression();
+        List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
+        List<String> columnNames = transformFilter.getColumnNames();
+        for (String columnName : columnNames) {
+            for (Column column : columns) {
+                if (column.getName().equals(columnName)) {
+                    if (!argNames.contains(columnName)) {
+                        argNames.add(columnName);
+                        argTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
+                    }
+                    break;
+                }
+            }
+        }
+        Stream.of(DEFAULT_NAMESPACE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_TABLE_NAME)
+                .forEach(
+                        metadataColumn -> {
+                            if (scriptExpression.contains(metadataColumn)
+                                    && !argNames.contains(metadataColumn)) {
+                                argNames.add(metadataColumn);
+                                argTypes.add(String.class);
+                            }
+                        });
+        return Tuple2.of(argNames, argTypes);
+    }
+
     private Object[] generateParams(BinaryRecordData after, long epochTime) {
         List<Object> params = new ArrayList<>();
-        List<Column> columns = tableInfo.getInputSchema().getColumns();
+        List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
+
+        Tuple2<List<String>, List<Class<?>>> args = generateArguments();
         RecordData.FieldGetter[] fieldGetters = tableInfo.getPreTransformedFieldGetters();
-        for (String columnName : transformFilter.getColumnNames()) {
-            if (columnName.equals(TransformParser.DEFAULT_NAMESPACE_NAME)) {
-                params.add(tableInfo.getNamespace());
-                continue;
-            }
-            if (columnName.equals(TransformParser.DEFAULT_SCHEMA_NAME)) {
-                params.add(tableInfo.getSchemaName());
-                continue;
-            }
-            if (columnName.equals(TransformParser.DEFAULT_TABLE_NAME)) {
-                params.add(tableInfo.getTableName());
-                continue;
+        for (String columnName : args.f0) {
+            switch (columnName) {
+                case DEFAULT_NAMESPACE_NAME:
+                    params.add(tableInfo.getNamespace());
+                    continue;
+                case DEFAULT_SCHEMA_NAME:
+                    params.add(tableInfo.getSchemaName());
+                    continue;
+                case DEFAULT_TABLE_NAME:
+                    params.add(tableInfo.getTableName());
+                    continue;
             }
             for (int i = 0; i < columns.size(); i++) {
                 Column column = columns.get(i);
@@ -101,47 +136,17 @@ private Object[] generateParams(BinaryRecordData after, long epochTime) {
     }
 
     private TransformExpressionKey generateTransformExpressionKey() {
-        List<String> argumentNames = new ArrayList<>();
-        List<Class<?>> paramTypes = new ArrayList<>();
-        List<Column> columns = tableInfo.getInputSchema().getColumns();
-        String scriptExpression = transformFilter.getScriptExpression();
-        List<String> columnNames = transformFilter.getColumnNames();
-        for (String columnName : columnNames) {
-            for (Column column : columns) {
-                if (column.getName().equals(columnName)) {
-                    argumentNames.add(columnName);
-                    paramTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
-                    break;
-                }
-            }
-        }
-        if (scriptExpression.contains(TransformParser.DEFAULT_NAMESPACE_NAME)
-                && !argumentNames.contains(TransformParser.DEFAULT_NAMESPACE_NAME)) {
-            argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME);
-            paramTypes.add(String.class);
-        }
-
-        if (scriptExpression.contains(TransformParser.DEFAULT_SCHEMA_NAME)
-                && !argumentNames.contains(TransformParser.DEFAULT_SCHEMA_NAME)) {
-            argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME);
-            paramTypes.add(String.class);
-        }
-
-        if (scriptExpression.contains(TransformParser.DEFAULT_TABLE_NAME)
-                && !argumentNames.contains(TransformParser.DEFAULT_TABLE_NAME)) {
-            argumentNames.add(TransformParser.DEFAULT_TABLE_NAME);
-            paramTypes.add(String.class);
-        }
+        Tuple2<List<String>, List<Class<?>>> args = generateArguments();
 
-        argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE);
-        paramTypes.add(String.class);
-        argumentNames.add(JaninoCompiler.DEFAULT_EPOCH_TIME);
-        paramTypes.add(Long.class);
+        args.f0.add(JaninoCompiler.DEFAULT_TIME_ZONE);
+        args.f1.add(String.class);
+        args.f0.add(JaninoCompiler.DEFAULT_EPOCH_TIME);
+        args.f1.add(Long.class);
 
         return TransformExpressionKey.of(
-                JaninoCompiler.loadSystemFunction(scriptExpression),
-                argumentNames,
-                paramTypes,
+                JaninoCompiler.loadSystemFunction(transformFilter.getScriptExpression()),
+                args.f0,
+                args.f1,
                 Boolean.class);
     }
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
index e942ad1e1be..307c890fd6d 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
@@ -114,7 +114,7 @@ public BinaryRecordData processData(BinaryRecordData payload, long epochTime) {
                                 column.getName(),
                                 column.getType(),
                                 payload,
-                                postTransformChangeInfo.getInputSchema().getColumns(),
+                                postTransformChangeInfo.getPreTransformedSchema().getColumns(),
                                 postTransformChangeInfo.getPreTransformedFieldGetters()));
             }
         }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
index 0c11005653f..2a14c6affdc 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
@@ -39,13 +39,14 @@
 import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
 import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
 
+import java.io.Serializable;
 import java.time.Duration;
 import java.util.Optional;
 
 /** Operator for processing events from {@link SchemaOperator} before {@link EventPartitioner}. */
 @Internal
 public class PrePartitionOperator extends AbstractStreamOperator
-        implements OneInputStreamOperator {
+        implements OneInputStreamOperator, Serializable {
 
     private static final long serialVersionUID = 1L;
     private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);
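Editor's note: in the TransformFilterProcessor hunks, generateParams and generateTransformExpressionKey previously duplicated the argument-collection logic, and the old loop could register the same column twice if the filter referenced it twice. The extracted generateArguments deduplicates with a contains check and returns names and types together as one Tuple2. Reduced to a standalone sketch (the metadata token values are assumptions, not verified constants):

```java
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ArgumentCollection {
    public static void main(String[] args) {
        List<String> schemaColumns = Arrays.asList("id", "name");
        // A filter like "id <> 3 AND id <> 4" references "id" twice.
        List<String> referenced = Arrays.asList("id", "id", "name");
        String expression = "id <> 3 AND __table_name__ = 't'"; // assumed metadata token

        List<String> argNames = new ArrayList<>();
        List<Class<?>> argTypes = new ArrayList<>();
        for (String column : referenced) {
            if (schemaColumns.contains(column) && !argNames.contains(column)) { // dedup
                argNames.add(column);
                argTypes.add(Object.class); // stand-in for the column's converted Java class
            }
        }
        // Metadata arguments are appended once, and only if the expression uses them.
        Stream.of("__namespace_name__", "__schema_name__", "__table_name__")
                .forEach(
                        meta -> {
                            if (expression.contains(meta) && !argNames.contains(meta)) {
                                argNames.add(meta);
                                argTypes.add(String.class);
                            }
                        });

        Tuple2<List<String>, List<Class<?>>> result = Tuple2.of(argNames, argTypes);
        System.out.println(result.f0 + " / " + result.f1); // [id, name, __table_name__] / ...
    }
}
```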