From eb1dac388c66c8501e8b5cea31e2d325f7f9aa07 Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Sat, 11 May 2024 12:14:49 +0800
Subject: [PATCH] Fix transformed schema merging logic & Fix missing rewrite in CASE WHEN statement & Remove unused `containFilteredComputedColumn` field

---
 .../flink/cdc/common/utils/SchemaUtils.java   | 166 +++++++++++
 .../cdc/common/utils/SchemaUtilsTest.java     | 268 ++++++++++++++++++
 .../transform/PostTransformOperator.java      |  36 +--
 .../transform/PostTransformProcessor.java     |   6 +-
 .../operators/transform/PostTransformers.java |  10 +-
 .../cdc/runtime/parser/TransformParser.java   |  13 +-
 6 files changed, 449 insertions(+), 50 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index 5c0d681a063..ea4ce46e337 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.common.utils;
 
 import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
@@ -26,6 +27,11 @@
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeFamily;
+import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.DecimalType;
 
 import javax.annotation.Nullable;
 
@@ -33,7 +39,9 @@
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /** Utils for {@link Schema} to perform the ability of evolution. */
 @PublicEvolving
@@ -72,6 +80,164 @@ public static List<Object> restoreOriginalData(
         return actualFields;
     }
 
+    /** Merge compatible upstream schemas. */
+    public static Schema mergeCompatibleSchemas(List<Schema> schemas) {
+        if (schemas.isEmpty()) {
+            return null;
+        } else if (schemas.size() == 1) {
+            return schemas.get(0);
+        } else {
+            Schema outputSchema = null;
+            for (Schema schema : schemas) {
+                outputSchema = mergeSchema(outputSchema, schema);
+            }
+            return outputSchema;
+        }
+    }
+
+    /** Try to combine two schemas with potential incompatible type. */
+    @VisibleForTesting
+    public static Schema mergeSchema(@Nullable Schema lhs, Schema rhs) {
+        if (lhs == null) {
+            return rhs;
+        }
+        if (lhs.getColumnCount() != rhs.getColumnCount()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unable to merge schema %s and %s with different column counts.",
+                            lhs, rhs));
+        }
+        if (!lhs.primaryKeys().equals(rhs.primaryKeys())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unable to merge schema %s and %s with different primary keys.",
+                            lhs, rhs));
+        }
+        if (!lhs.partitionKeys().equals(rhs.partitionKeys())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unable to merge schema %s and %s with different partition keys.",
+                            lhs, rhs));
+        }
+        if (!lhs.options().equals(rhs.options())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unable to merge schema %s and %s with different options.", lhs, rhs));
+        }
+        if (!Objects.equals(lhs.comment(), rhs.comment())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unable to merge schema %s and %s with different comments.", lhs, rhs));
+        }
+
+        List<Column> leftColumns = lhs.getColumns();
+        List<Column> rightColumns = rhs.getColumns();
+
+        List<Column> mergedColumns =
+                IntStream.range(0, lhs.getColumnCount())
+                        .mapToObj(i -> mergeColumn(leftColumns.get(i), rightColumns.get(i)))
+                        .collect(Collectors.toList());
+
+        return lhs.copy(mergedColumns);
+    }
+
+    /** Try to combine two columns with potential incompatible type. */
+    @VisibleForTesting
+    public static Column mergeColumn(Column lhs, Column rhs) {
+        if (!Objects.equals(lhs.getName(), rhs.getName())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unable to merge column %s and %s with different name.", lhs, rhs));
+        }
+        if (!Objects.equals(lhs.getComment(), rhs.getComment())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unable to merge column %s and %s with different comments.", lhs, rhs));
+        }
+        return lhs.copy(mergeDataType(lhs.getType(), rhs.getType()));
+    }
+
+    /** Try to combine given data types to a compatible wider data type. */
+    @VisibleForTesting
+    public static DataType mergeDataType(DataType lhs, DataType rhs) {
+        // Ignore nullability during data type merge
+        boolean nullable = lhs.isNullable() || rhs.isNullable();
+        lhs = lhs.notNull();
+        rhs = rhs.notNull();
+
+        DataType mergedType;
+        if (lhs.equals(rhs)) {
+            // identical type
+            mergedType = rhs;
+        } else if (lhs.is(DataTypeFamily.INTEGER_NUMERIC)
+                && rhs.is(DataTypeFamily.INTEGER_NUMERIC)) {
+            mergedType = DataTypes.BIGINT();
+        } else if (lhs.is(DataTypeFamily.CHARACTER_STRING)
+                && rhs.is(DataTypeFamily.CHARACTER_STRING)) {
+            mergedType = DataTypes.STRING();
+        } else if (lhs.is(DataTypeFamily.APPROXIMATE_NUMERIC)
+                && rhs.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
+            mergedType = DataTypes.DOUBLE();
+        } else if (lhs.is(DataTypeRoot.DECIMAL) && rhs.is(DataTypeRoot.DECIMAL)) {
+            // Merge two decimal types
+            DecimalType lhsDecimal = (DecimalType) lhs;
+            DecimalType rhsDecimal = (DecimalType) rhs;
+            int resultIntDigits =
+                    Math.max(
+                            lhsDecimal.getPrecision() - lhsDecimal.getScale(),
+                            rhsDecimal.getPrecision() - rhsDecimal.getScale());
+            int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
+            mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
+        } else if (lhs.is(DataTypeRoot.DECIMAL) && rhs.is(DataTypeFamily.EXACT_NUMERIC)) {
+            // Merge decimal and int
+            DecimalType lhsDecimal = (DecimalType) lhs;
+            mergedType =
+                    DataTypes.DECIMAL(
+                            Math.max(
+                                    lhsDecimal.getPrecision(),
+                                    lhsDecimal.getScale() + getNumericPrecision(rhs)),
+                            lhsDecimal.getScale());
+        } else if (rhs.is(DataTypeRoot.DECIMAL) && lhs.is(DataTypeFamily.EXACT_NUMERIC)) {
+            // Merge decimal and int
+            DecimalType rhsDecimal = (DecimalType) rhs;
+            mergedType =
+                    DataTypes.DECIMAL(
+                            Math.max(
+                                    rhsDecimal.getPrecision(),
+                                    rhsDecimal.getScale() + getNumericPrecision(lhs)),
+                            rhsDecimal.getScale());
+        } else {
+            throw new IllegalStateException(
+                    String.format("Incompatible types: \"%s\" and \"%s\"", lhs, rhs));
+        }
+
+        if (nullable) {
+            return mergedType.nullable();
+        } else {
+            return mergedType.notNull();
+        }
+    }
+
+    @VisibleForTesting
+    public static int getNumericPrecision(DataType dataType) {
+        if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {
+            if (dataType.is(DataTypeRoot.TINYINT)) {
+                return 3;
+            } else if (dataType.is(DataTypeRoot.SMALLINT)) {
+                return 5;
+            } else if (dataType.is(DataTypeRoot.INTEGER)) {
+                return 10;
+            } else if (dataType.is(DataTypeRoot.BIGINT)) {
+                return 19;
+            } else if (dataType.is(DataTypeRoot.DECIMAL)) {
+                return ((DecimalType) dataType).getPrecision();
+            }
+        }
+
+        throw new IllegalArgumentException(
+                "Failed to get precision of non-exact decimal type " + dataType);
+    }
+
     /** apply SchemaChangeEvent to the old schema and return the schema after changing. */
     public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent event) {
         if (event instanceof AddColumnEvent) {
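Note for reviewers: a minimal sketch (not part of the patch; the class name and main() are illustrative only, flink-cdc-common assumed on the classpath) of how the DECIMAL widening rules above play out:

    import org.apache.flink.cdc.common.types.DataType;
    import org.apache.flink.cdc.common.types.DataTypes;
    import org.apache.flink.cdc.common.utils.SchemaUtils;

    public class DecimalMergeSketch {
        public static void main(String[] args) {
            // DECIMAL(5, 4) vs DECIMAL(10, 2): integer digits = max(5 - 4, 10 - 2) = 8,
            // scale = max(4, 2) = 4, so the merged type is DECIMAL(8 + 4, 4) = DECIMAL(12, 4).
            DataType merged =
                    SchemaUtils.mergeDataType(DataTypes.DECIMAL(5, 4), DataTypes.DECIMAL(10, 2));
            System.out.println(merged); // prints DECIMAL(12, 4)

            // BIGINT vs DECIMAL(10, 5): a BIGINT may need up to 19 digits, so the
            // merged precision is max(10, 5 + 19) = 24 while the scale of 5 is kept.
            System.out.println(
                    SchemaUtils.mergeDataType(DataTypes.BIGINT(), DataTypes.DECIMAL(10, 5)));
            // prints DECIMAL(24, 5)
        }
    }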
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
index 83fb358b2ce..126ef90b6c0 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
@@ -163,4 +163,272 @@ public void testApplySchemaChangeEvent() {
                         .physicalColumn("newCol4", DataTypes.VARCHAR(10))
                         .build());
     }
+
+    @Test
+    public void testGetNumericPrecision() {
+        Assert.assertEquals(SchemaUtils.getNumericPrecision(DataTypes.TINYINT()), 3);
+        Assert.assertEquals(SchemaUtils.getNumericPrecision(DataTypes.SMALLINT()), 5);
+        Assert.assertEquals(SchemaUtils.getNumericPrecision(DataTypes.INT()), 10);
+        Assert.assertEquals(SchemaUtils.getNumericPrecision(DataTypes.BIGINT()), 19);
+        Assert.assertEquals(SchemaUtils.getNumericPrecision(DataTypes.DECIMAL(10, 2)), 10);
+        Assert.assertEquals(SchemaUtils.getNumericPrecision(DataTypes.DECIMAL(17, 0)), 17);
+        Assert.assertThrows(
+                IllegalArgumentException.class,
+                () -> SchemaUtils.getNumericPrecision(DataTypes.STRING()));
+    }
+
+    @Test
+    public void testMergeDataType() {
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.BINARY(17), DataTypes.BINARY(17)),
+                DataTypes.BINARY(17));
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.VARBINARY(17), DataTypes.VARBINARY(17)),
+                DataTypes.VARBINARY(17));
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.BYTES(), DataTypes.BYTES()), DataTypes.BYTES());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.BOOLEAN(), DataTypes.BOOLEAN()),
+                DataTypes.BOOLEAN());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.INT()), DataTypes.INT());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.TINYINT(), DataTypes.TINYINT()),
+                DataTypes.TINYINT());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.SMALLINT(), DataTypes.SMALLINT()),
+                DataTypes.SMALLINT());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.BIGINT(), DataTypes.BIGINT()),
+                DataTypes.BIGINT());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.FLOAT(), DataTypes.FLOAT()), DataTypes.FLOAT());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.DOUBLE(), DataTypes.DOUBLE()),
+                DataTypes.DOUBLE());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.CHAR(17), DataTypes.CHAR(17)),
+                DataTypes.CHAR(17));
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.VARCHAR(17), DataTypes.VARCHAR(17)),
+                DataTypes.VARCHAR(17));
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.STRING(), DataTypes.STRING()),
+                DataTypes.STRING());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.DECIMAL(17, 7), DataTypes.DECIMAL(17, 7)),
+                DataTypes.DECIMAL(17, 7));
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.DATE(), DataTypes.DATE()), DataTypes.DATE());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.TIME(), DataTypes.TIME()), DataTypes.TIME());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.TIME(6), DataTypes.TIME(6)), DataTypes.TIME(6));
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.TIMESTAMP(), DataTypes.TIMESTAMP()),
+                DataTypes.TIMESTAMP());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(3)),
+                DataTypes.TIMESTAMP(3));
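The tests above pin down the pairwise semantics; it may also help to note that mergeCompatibleSchemas simply left-folds mergeSchema over the input list, so each column ends up just wide enough to hold every input. A small sketch (hypothetical class and column values, flink-cdc-common assumed on the classpath):

    import java.util.Arrays;

    import org.apache.flink.cdc.common.schema.Schema;
    import org.apache.flink.cdc.common.types.DataTypes;
    import org.apache.flink.cdc.common.utils.SchemaUtils;

    public class SchemaFoldSketch {
        public static void main(String[] args) {
            // INT merged with BIGINT yields BIGINT, and BIGINT merged with TINYINT
            // stays BIGINT, since all three are in the INTEGER_NUMERIC family.
            Schema merged =
                    SchemaUtils.mergeCompatibleSchemas(
                            Arrays.asList(
                                    Schema.newBuilder().physicalColumn("id", DataTypes.INT()).build(),
                                    Schema.newBuilder().physicalColumn("id", DataTypes.BIGINT()).build(),
                                    Schema.newBuilder().physicalColumn("id", DataTypes.TINYINT()).build()));
            System.out.println(merged); // a single column: `id` BIGINT
        }
    }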
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.TIMESTAMP_TZ(), DataTypes.TIMESTAMP_TZ()),
+                DataTypes.TIMESTAMP_TZ());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.TIMESTAMP_TZ(3), DataTypes.TIMESTAMP_TZ(3)),
+                DataTypes.TIMESTAMP_TZ(3));
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_LTZ()),
+                DataTypes.TIMESTAMP_LTZ());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.TIMESTAMP_LTZ(3), DataTypes.TIMESTAMP_LTZ(3)),
+                DataTypes.TIMESTAMP_LTZ(3));
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(
+                        DataTypes.ARRAY(DataTypes.INT()), DataTypes.ARRAY(DataTypes.INT())),
+                DataTypes.ARRAY(DataTypes.INT()));
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(
+                        DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
+                        DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())),
+                DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()));
+
+        // Test compatible widening cast
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.BIGINT()), DataTypes.BIGINT());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.VARCHAR(17), DataTypes.STRING()),
+                DataTypes.STRING());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.FLOAT(), DataTypes.DOUBLE()),
+                DataTypes.DOUBLE());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.DECIMAL(4, 0)),
+                DataTypes.DECIMAL(10, 0));
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.DECIMAL(10, 5)),
+                DataTypes.DECIMAL(15, 5));
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.BIGINT(), DataTypes.DECIMAL(10, 5)),
+                DataTypes.DECIMAL(24, 5));
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.DECIMAL(5, 4), DataTypes.DECIMAL(10, 2)),
+                DataTypes.DECIMAL(12, 4));
+
+        // Test merging with nullability
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.INT().notNull(), DataTypes.INT().notNull()),
+                DataTypes.INT().notNull());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.INT().nullable(), DataTypes.INT().notNull()),
+                DataTypes.INT().nullable());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.INT().notNull(), DataTypes.INT().nullable()),
+                DataTypes.INT().nullable());
+        Assert.assertEquals(
+                SchemaUtils.mergeDataType(DataTypes.INT().nullable(), DataTypes.INT().nullable()),
+                DataTypes.INT().nullable());
+
+        // incompatible type merges test
+        Assert.assertThrows(
+                IllegalStateException.class,
+                () -> SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.DOUBLE()));
+        Assert.assertThrows(
+                IllegalStateException.class,
+                () -> SchemaUtils.mergeDataType(DataTypes.DECIMAL(17, 0), DataTypes.DOUBLE()));
+        Assert.assertThrows(
+                IllegalStateException.class,
+                () -> SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.STRING()));
+    }
+
+    @Test
+    public void testMergeColumn() {
+        // Test normal merges
+        Assert.assertEquals(
+                SchemaUtils.mergeColumn(
+                        Column.physicalColumn("Column1", DataTypes.INT()),
+                        Column.physicalColumn("Column1", DataTypes.BIGINT())),
+                Column.physicalColumn("Column1", DataTypes.BIGINT()));
+
+        Assert.assertEquals(
+                SchemaUtils.mergeColumn(
+                        Column.physicalColumn("Column2", DataTypes.FLOAT()),
+                        Column.physicalColumn("Column2", DataTypes.DOUBLE())),
+                Column.physicalColumn("Column2", DataTypes.DOUBLE()));
+
+        // Test merging columns with incompatible types
+        Assert.assertThrows(
+                IllegalStateException.class,
+                () ->
+                        SchemaUtils.mergeColumn(
+                                Column.physicalColumn("Column3", DataTypes.INT()),
+                                Column.physicalColumn("Column3", DataTypes.STRING())));
+
+        // Test merging with incompatible names
+        Assert.assertThrows(
+                IllegalStateException.class,
+                () ->
+                        SchemaUtils.mergeColumn(
+                                Column.physicalColumn("Column4", DataTypes.INT()),
+                                Column.physicalColumn("AnotherColumn4", DataTypes.INT())));
+    }
+
+    @Test
+    public void testMergeSchema() {
+        // Test normal merges
+        Assert.assertEquals(
+                SchemaUtils.mergeSchema(
+                        Schema.newBuilder()
+                                .physicalColumn("Column1", DataTypes.INT())
+                                .physicalColumn("Column2", DataTypes.DOUBLE())
+                                .primaryKey("Column1")
+                                .partitionKey("Column2")
+                                .build(),
+                        Schema.newBuilder()
+                                .physicalColumn("Column1", DataTypes.BIGINT())
+                                .physicalColumn("Column2", DataTypes.FLOAT())
+                                .primaryKey("Column1")
+                                .partitionKey("Column2")
+                                .build()),
+                Schema.newBuilder()
+                        .physicalColumn("Column1", DataTypes.BIGINT())
+                        .physicalColumn("Column2", DataTypes.DOUBLE())
+                        .primaryKey("Column1")
+                        .partitionKey("Column2")
+                        .build());
+
+        // Test merging with incompatible types
+        Assert.assertThrows(
+                IllegalStateException.class,
+                () ->
+                        SchemaUtils.mergeSchema(
+                                Schema.newBuilder()
+                                        .physicalColumn("Column1", DataTypes.INT())
+                                        .physicalColumn("Column2", DataTypes.DOUBLE())
+                                        .primaryKey("Column1")
+                                        .partitionKey("Column2")
+                                        .build(),
+                                Schema.newBuilder()
+                                        .physicalColumn("Column1", DataTypes.STRING())
+                                        .physicalColumn("Column2", DataTypes.STRING())
+                                        .primaryKey("Column1")
+                                        .partitionKey("Column2")
+                                        .build()));
+
+        // Test merging with incompatible column names
+        Assert.assertThrows(
+                IllegalStateException.class,
+                () ->
+                        SchemaUtils.mergeSchema(
+                                Schema.newBuilder()
+                                        .physicalColumn("Column1", DataTypes.INT())
+                                        .physicalColumn("Column2", DataTypes.DOUBLE())
+                                        .primaryKey("Column1")
+                                        .partitionKey("Column2")
+                                        .build(),
+                                Schema.newBuilder()
+                                        .physicalColumn("NotColumn1", DataTypes.INT())
+                                        .physicalColumn("NotColumn2", DataTypes.DOUBLE())
+                                        .primaryKey("NotColumn1")
+                                        .partitionKey("NotColumn2")
+                                        .build()));
+
+        // Test merging with different column counts
+        Assert.assertThrows(
+                IllegalStateException.class,
+                () ->
+                        SchemaUtils.mergeSchema(
+                                Schema.newBuilder()
+                                        .physicalColumn("Column1", DataTypes.INT())
+                                        .physicalColumn("Column2", DataTypes.DOUBLE())
+                                        .physicalColumn("Column3", DataTypes.STRING())
+                                        .primaryKey("Column1")
+                                        .partitionKey("Column2")
+                                        .build(),
+                                Schema.newBuilder()
+                                        .physicalColumn("NotColumn1", DataTypes.INT())
+                                        .physicalColumn("NotColumn2", DataTypes.DOUBLE())
+                                        .primaryKey("NotColumn1")
+                                        .partitionKey("NotColumn2")
+                                        .build()));
+
+        // Test merging with incompatible schema metadata
+        Assert.assertThrows(
+                IllegalStateException.class,
+                () ->
+                        SchemaUtils.mergeSchema(
+                                Schema.newBuilder()
+                                        .physicalColumn("Column1", DataTypes.INT())
+                                        .physicalColumn("Column2", DataTypes.DOUBLE())
+                                        .primaryKey("Column1")
+                                        .partitionKey("Column2")
+                                        .option("Key1", "Value1")
+                                        .build(),
+                                Schema.newBuilder()
+                                        .physicalColumn("Column1", DataTypes.INT())
+                                        .physicalColumn("Column2", DataTypes.DOUBLE())
+                                        .primaryKey("Column2")
+                                        .partitionKey("Column1")
+                                        .option("Key2", "Value2")
+                                        .build()));
+    }
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 217dea0cdb2..2a104cefac6 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -28,9 +28,7 @@
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
-import org.apache.flink.cdc.common.utils.StringUtils;
 import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
-import org.apache.flink.cdc.runtime.parser.TransformParser;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -163,9 +161,7 @@ public void open() throws Exception {
                                 return new PostTransformers(
                                         selectors,
                                         TransformProjection.of(projection).orElse(null),
-                                        TransformFilter.of(filterExpression).orElse(null),
-                                        containFilteredComputedColumn(
-                                                projection, filterExpression));
+                                        TransformFilter.of(filterExpression).orElse(null));
                             })
                     .collect(Collectors.toList());
         this.transformProjectionProcessorMap = new ConcurrentHashMap<>();
@@ -264,19 +260,7 @@ private Schema transformSchema(TableId tableId, Schema schema) throws Exception
             return schema;
         }
 
-        Schema firstSchema = newSchemas.get(0);
-        newSchemas.stream()
-                .skip(1)
-                .forEach(
-                        testSchema -> {
-                            if (!testSchema.equals(firstSchema)) {
-                                throw new IllegalArgumentException(
-                                        String.format(
-                                                "Incompatible transform rules result found. Inferred schema: %s and %s",
-                                                firstSchema, testSchema));
-                            }
-                        });
-        return firstSchema;
+        return SchemaUtils.mergeCompatibleSchemas(newSchemas);
     }
 
     private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataChangeEvent)
@@ -421,22 +405,6 @@ private BinaryRecordData projectRecord(TableInfo tableInfo, BinaryRecordData rec
                 .generate(valueList.toArray(new Object[valueList.size()]));
     }
 
-    private boolean containFilteredComputedColumn(String projection, String filter) {
-        boolean contain = false;
-        if (StringUtils.isNullOrWhitespaceOnly(projection)
-                || StringUtils.isNullOrWhitespaceOnly(filter)) {
-            return contain;
-        }
-        List<String> computedColumnNames = TransformParser.parseComputedColumnNames(projection);
-        List<String> filteredColumnNames = TransformParser.parseFilterColumnNameList(filter);
-        for (String computedColumnName : computedColumnNames) {
-            if (filteredColumnNames.contains(computedColumnName)) {
-                return true;
-            }
-        }
-        return contain;
-    }
-
     private void clearOperator() {
         this.transforms = null;
         this.transformProjectionProcessorMap = null;
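For context on the PostTransformProcessor change above: getValueFromBinaryRecordData used to convert the decoded field back to the original column's type, which breaks once the merged post-transform schema widens a column. A sketch of why the new expectedType parameter matters (illustrative values only; DataTypeConverter is the org.apache.flink.cdc.runtime.typeutils converter already used by this class):

    // A value decoded from upstream, where the column was declared INT...
    Object raw = 42;
    // ...must be converted to the merged post-transform type (e.g. BIGINT), or
    // downstream field getters built from the merged schema would see an Integer
    // where they expect a Long. Expected to yield 42L here.
    Object converted = DataTypeConverter.convert(raw, DataTypes.BIGINT());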
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java
index 9b382f34e01..7b533aa7317 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java
@@ -21,6 +21,7 @@
 import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.runtime.parser.TransformParser;
 import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
 
@@ -139,6 +140,7 @@ public BinaryRecordData processData(BinaryRecordData after, long epochTime) {
                 valueList.add(
                         getValueFromBinaryRecordData(
                                 column.getName(),
+                                column.getType(),
                                 after,
                                 tableInfo.getOriginalSchema().getColumns(),
                                 tableInfo.getOriginalFieldGetters()));
@@ -177,6 +179,7 @@ public BinaryRecordData preProcessData(BinaryRecordData after, long epochTime) {
                 valueList.add(
                         getValueFromBinaryRecordData(
                                 column.getName(),
+                                column.getType(),
                                 after,
                                 tableInfo.getSchema().getColumns(),
                                 tableInfo.getFieldGetters()));
@@ -189,13 +192,14 @@ public BinaryRecordData preProcessData(BinaryRecordData after, long epochTime) {
 
     private Object getValueFromBinaryRecordData(
             String columnName,
+            DataType expectedType,
             BinaryRecordData binaryRecordData,
             List<Column> columns,
             RecordData.FieldGetter[] fieldGetters) {
         for (int i = 0; i < columns.size(); i++) {
             if (columnName.equals(columns.get(i).getName())) {
                 return DataTypeConverter.convert(
-                        fieldGetters[i].getFieldOrNull(binaryRecordData), columns.get(i).getType());
+                        fieldGetters[i].getFieldOrNull(binaryRecordData), expectedType);
             }
         }
         return null;
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java
index 9f47ca92299..9daea231e70 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java
@@ -30,17 +30,13 @@ public class PostTransformers {
     private final Optional<TransformProjection> projection;
     private final Optional<TransformFilter> filter;
 
-    private final boolean containFilteredComputedColumn;
-
     public PostTransformers(
             Selectors selectors,
             @Nullable TransformProjection projection,
-            @Nullable TransformFilter filter,
-            boolean containFilteredComputedColumn) {
+            @Nullable TransformFilter filter) {
         this.selectors = selectors;
         this.projection = projection != null ? Optional.of(projection) : Optional.empty();
         this.filter = filter != null ? Optional.of(filter) : Optional.empty();
-        this.containFilteredComputedColumn = containFilteredComputedColumn;
     }
 
     public Selectors getSelectors() {
@@ -54,8 +50,4 @@ public Optional<TransformProjection> getProjection() {
     public Optional<TransformFilter> getFilter() {
         return filter;
     }
-
-    public boolean isContainFilteredComputedColumn() {
-        return containFilteredComputedColumn;
-    }
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index 0a39dfc61fd..fd8306249ef 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -40,6 +40,7 @@
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -489,16 +490,16 @@ public static SqlSelect parseFilterExpression(String filterExpression) {
     }
 
     public static SqlNode rewriteExpression(SqlNode sqlNode, Map<String, SqlNode> replaceMap) {
-        if (sqlNode instanceof SqlBasicCall) {
-            SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;
+        if (sqlNode instanceof SqlCall) {
+            SqlCall sqlCall = (SqlCall) sqlNode;
 
-            List<SqlNode> operands = sqlBasicCall.getOperandList();
-            IntStream.range(0, sqlBasicCall.operandCount())
+            List<SqlNode> operands = sqlCall.getOperandList();
+            IntStream.range(0, sqlCall.operandCount())
                     .forEach(
                             i ->
-                                    sqlBasicCall.setOperand(
+                                    sqlCall.setOperand(
                                             i, rewriteExpression(operands.get(i), replaceMap)));
-            return sqlBasicCall;
+            return sqlCall;
         } else if (sqlNode instanceof SqlIdentifier) {
             SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode;
             if (sqlIdentifier.names.size() == 1) {
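Finally, on the TransformParser fix: org.apache.calcite.sql.fun.SqlCase is a SqlCall but not a SqlBasicCall, so the old instanceof SqlBasicCall guard returned CASE WHEN nodes unchanged and the identifiers inside them were never rewritten. Broadening the check to SqlCall lets the recursion cover CASE WHEN and any other non-basic call. A minimal repro sketch (hypothetical class name; assumes Calcite on the classpath with its default parser config):

    import org.apache.calcite.sql.SqlBasicCall;
    import org.apache.calcite.sql.SqlCall;
    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.parser.SqlParser;

    public class CaseWhenSketch {
        public static void main(String[] args) throws Exception {
            SqlNode caseNode =
                    SqlParser.create("CASE WHEN age > 18 THEN 1 ELSE 0 END", SqlParser.config())
                            .parseExpression();
            // false: the old guard skipped CASE WHEN nodes entirely
            System.out.println(caseNode instanceof SqlBasicCall);
            // true: the new guard recurses into their operands
            System.out.println(caseNode instanceof SqlCall);
        }
    }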