From cb119cdf608df06ddc5d434e92e85a37bf639f6b Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Mon, 1 Jul 2024 10:05:22 +0800
Subject: [PATCH] Address comments

---
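Notes: the runtime refactors below replace per-record nested scans over
projection columns with a map/set computed once per schema. The SchemaUtils
changes only rename parameters; the decimal-widening rule itself is unchanged.
As a quick illustration of that rule -- an illustrative sketch calling the
patched APIs, not code added by this patch:

    // DECIMAL(5, 2) can hold 3 integer digits, DECIMAL(4, 3) can hold 1, so
    // the merged type keeps max(3, 1) = 3 integer digits and max(2, 3) = 3
    // fractional digits: DECIMAL(3 + 3, 3) = DECIMAL(6, 3).
    DataType merged =
            SchemaUtils.mergeDataType(DataTypes.DECIMAL(5, 2), DataTypes.DECIMAL(4, 3));
    // Nullability is OR-ed: the merged type is nullable if either input is.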
 .../flink/cdc/common/utils/SchemaUtils.java   |  94 ++++++++--------
 .../cdc/pipeline/tests/RouteE2eITCase.java    |   3 +-
 .../transform/PostTransformProcessor.java     | 104 +++++++-----------
 .../transform/PreTransformProcessor.java      |  48 +++++++---
 4 files changed, 123 insertions(+), 126 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index ea4ce46e337..3d0565df98b 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -97,118 +97,122 @@ public static Schema mergeCompatibleSchemas(List<Schema> schemas) {
 
     /** Try to combine two schemas with potential incompatible type. */
     @VisibleForTesting
-    public static Schema mergeSchema(@Nullable Schema lhs, Schema rhs) {
-        if (lhs == null) {
-            return rhs;
+    public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) {
+        if (lSchema == null) {
+            return rSchema;
         }
-        if (lhs.getColumnCount() != rhs.getColumnCount()) {
+        if (lSchema.getColumnCount() != rSchema.getColumnCount()) {
             throw new IllegalStateException(
                     String.format(
                             "Unable to merge schema %s and %s with different column counts.",
-                            lhs, rhs));
+                            lSchema, rSchema));
         }
-        if (!lhs.primaryKeys().equals(rhs.primaryKeys())) {
+        if (!lSchema.primaryKeys().equals(rSchema.primaryKeys())) {
             throw new IllegalStateException(
                     String.format(
                             "Unable to merge schema %s and %s with different primary keys.",
-                            lhs, rhs));
+                            lSchema, rSchema));
         }
-        if (!lhs.partitionKeys().equals(rhs.partitionKeys())) {
+        if (!lSchema.partitionKeys().equals(rSchema.partitionKeys())) {
             throw new IllegalStateException(
                     String.format(
                             "Unable to merge schema %s and %s with different partition keys.",
-                            lhs, rhs));
+                            lSchema, rSchema));
         }
-        if (!lhs.options().equals(rhs.options())) {
+        if (!lSchema.options().equals(rSchema.options())) {
             throw new IllegalStateException(
                     String.format(
-                            "Unable to merge schema %s and %s with different options.", lhs, rhs));
+                            "Unable to merge schema %s and %s with different options.",
+                            lSchema, rSchema));
         }
-        if (!Objects.equals(lhs.comment(), rhs.comment())) {
+        if (!Objects.equals(lSchema.comment(), rSchema.comment())) {
             throw new IllegalStateException(
                     String.format(
-                            "Unable to merge schema %s and %s with different comments.", lhs, rhs));
+                            "Unable to merge schema %s and %s with different comments.",
+                            lSchema, rSchema));
         }
 
-        List<Column> leftColumns = lhs.getColumns();
-        List<Column> rightColumns = rhs.getColumns();
+        List<Column> leftColumns = lSchema.getColumns();
+        List<Column> rightColumns = rSchema.getColumns();
 
         List<Column> mergedColumns =
-                IntStream.range(0, lhs.getColumnCount())
+                IntStream.range(0, lSchema.getColumnCount())
                         .mapToObj(i -> mergeColumn(leftColumns.get(i), rightColumns.get(i)))
                         .collect(Collectors.toList());
 
-        return lhs.copy(mergedColumns);
+        return lSchema.copy(mergedColumns);
     }
 
     /** Try to combine two columns with potential incompatible type. */
     @VisibleForTesting
-    public static Column mergeColumn(Column lhs, Column rhs) {
-        if (!Objects.equals(lhs.getName(), rhs.getName())) {
+    public static Column mergeColumn(Column lColumn, Column rColumn) {
+        if (!Objects.equals(lColumn.getName(), rColumn.getName())) {
             throw new IllegalStateException(
                     String.format(
-                            "Unable to merge column %s and %s with different name.", lhs, rhs));
+                            "Unable to merge column %s and %s with different name.",
+                            lColumn, rColumn));
         }
-        if (!Objects.equals(lhs.getComment(), rhs.getComment())) {
+        if (!Objects.equals(lColumn.getComment(), rColumn.getComment())) {
             throw new IllegalStateException(
                     String.format(
-                            "Unable to merge column %s and %s with different comments.", lhs, rhs));
+                            "Unable to merge column %s and %s with different comments.",
+                            lColumn, rColumn));
         }
-        return lhs.copy(mergeDataType(lhs.getType(), rhs.getType()));
+        return lColumn.copy(mergeDataType(lColumn.getType(), rColumn.getType()));
     }
 
     /** Try to combine given data types to a compatible wider data type. */
     @VisibleForTesting
-    public static DataType mergeDataType(DataType lhs, DataType rhs) {
+    public static DataType mergeDataType(DataType lType, DataType rType) {
         // Ignore nullability during data type merge
-        boolean nullable = lhs.isNullable() || rhs.isNullable();
-        lhs = lhs.notNull();
-        rhs = rhs.notNull();
+        boolean nullable = lType.isNullable() || rType.isNullable();
+        lType = lType.notNull();
+        rType = rType.notNull();
 
         DataType mergedType;
-        if (lhs.equals(rhs)) {
+        if (lType.equals(rType)) {
             // identical type
-            mergedType = rhs;
-        } else if (lhs.is(DataTypeFamily.INTEGER_NUMERIC)
-                && rhs.is(DataTypeFamily.INTEGER_NUMERIC)) {
+            mergedType = rType;
+        } else if (lType.is(DataTypeFamily.INTEGER_NUMERIC)
+                && rType.is(DataTypeFamily.INTEGER_NUMERIC)) {
             mergedType = DataTypes.BIGINT();
-        } else if (lhs.is(DataTypeFamily.CHARACTER_STRING)
-                && rhs.is(DataTypeFamily.CHARACTER_STRING)) {
+        } else if (lType.is(DataTypeFamily.CHARACTER_STRING)
+                && rType.is(DataTypeFamily.CHARACTER_STRING)) {
             mergedType = DataTypes.STRING();
-        } else if (lhs.is(DataTypeFamily.APPROXIMATE_NUMERIC)
-                && rhs.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
+        } else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
+                && rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
             mergedType = DataTypes.DOUBLE();
-        } else if (lhs.is(DataTypeRoot.DECIMAL) && rhs.is(DataTypeRoot.DECIMAL)) {
+        } else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeRoot.DECIMAL)) {
             // Merge two decimal types
-            DecimalType lhsDecimal = (DecimalType) lhs;
-            DecimalType rhsDecimal = (DecimalType) rhs;
+            DecimalType lhsDecimal = (DecimalType) lType;
+            DecimalType rhsDecimal = (DecimalType) rType;
             int resultIntDigits =
                     Math.max(
                             lhsDecimal.getPrecision() - lhsDecimal.getScale(),
                             rhsDecimal.getPrecision() - rhsDecimal.getScale());
             int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
             mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
-        } else if (lhs.is(DataTypeRoot.DECIMAL) && rhs.is(DataTypeFamily.EXACT_NUMERIC)) {
+        } else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
-            DecimalType lhsDecimal = (DecimalType) lhs;
+            DecimalType lhsDecimal = (DecimalType) lType;
             mergedType =
                     DataTypes.DECIMAL(
                             Math.max(
                                     lhsDecimal.getPrecision(),
-                                    lhsDecimal.getScale() + getNumericPrecision(rhs)),
+                                    lhsDecimal.getScale() + getNumericPrecision(rType)),
                             lhsDecimal.getScale());
-        } else if (rhs.is(DataTypeRoot.DECIMAL) && lhs.is(DataTypeFamily.EXACT_NUMERIC)) {
+        } else if (rType.is(DataTypeRoot.DECIMAL) && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
-            DecimalType rhsDecimal = (DecimalType) rhs;
+            DecimalType rhsDecimal = (DecimalType) rType;
             mergedType =
                     DataTypes.DECIMAL(
                             Math.max(
                                     rhsDecimal.getPrecision(),
-                                    rhsDecimal.getScale() + getNumericPrecision(lhs)),
+                                    rhsDecimal.getScale() + getNumericPrecision(lType)),
                             rhsDecimal.getScale());
         } else {
             throw new IllegalStateException(
-                    String.format("Incompatible types: \"%s\" and \"%s\"", lhs, rhs));
+                    String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType));
         }
 
         if (nullable) {
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
index 4ff001e79ed..92f0c5ffad3 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
@@ -22,7 +22,6 @@
 import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
 import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
 import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
-import org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -43,7 +42,7 @@
 import java.time.Duration;
 import java.util.concurrent.TimeoutException;
 
-/** E2e tests for the {@link TransformSchemaOperator}. */
+/** E2e tests for routing features. */
 @RunWith(Parameterized.class)
 public class RouteE2eITCase extends PipelineTestEnvironment {
     private static final Logger LOG = LoggerFactory.getLogger(RouteE2eITCase.class);
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java
index 7b533aa7317..049031f48ab 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java
@@ -31,6 +31,7 @@
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -56,6 +57,7 @@ public class PostTransformProcessor {
     private @Nullable TransformFilter transformFilter;
     private String timezone;
     private Map<String, ProjectionColumnProcessor> projectionColumnProcessorMap;
+    private Map<String, ProjectionColumn> cachedProjectionColumns;
 
     public PostTransformProcessor(
             TableInfo tableInfo,
@@ -69,6 +71,7 @@ public PostTransformProcessor(
         this.transformFilter = transformFilter;
         this.timezone = timezone;
         this.projectionColumnProcessorMap = new ConcurrentHashMap<>();
+        this.cachedProjectionColumns = cacheProjectionColumnMap(tableInfo, transformProjection);
     }
 
     public boolean hasTableChangeInfo() {
@@ -112,82 +115,31 @@ public Schema processSchemaChangeEvent(Schema schema) {
                         .collect(Collectors.toList()));
     }
 
-    public BinaryRecordData processData(BinaryRecordData after, long epochTime) {
+    public BinaryRecordData processData(BinaryRecordData payload, long epochTime) {
         List<Object> valueList = new ArrayList<>();
         for (Column column : tableInfo.getSchema().getColumns()) {
-            boolean isProjectionColumn = false;
-            for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
-                if (column.getName().equals(projectionColumn.getColumnName())
-                        && projectionColumn.isValidTransformedProjectionColumn()) {
-                    if (!projectionColumnProcessorMap.containsKey(
-                            projectionColumn.getColumnName())) {
-                        projectionColumnProcessorMap.put(
-                                projectionColumn.getColumnName(),
-                                ProjectionColumnProcessor.of(
-                                        tableInfo, projectionColumn, timezone));
-                    }
-                    ProjectionColumnProcessor projectionColumnProcessor =
-                            projectionColumnProcessorMap.get(projectionColumn.getColumnName());
-                    valueList.add(
-                            DataTypeConverter.convert(
-                                    projectionColumnProcessor.evaluate(after, epochTime),
-                                    projectionColumn.getDataType()));
-                    isProjectionColumn = true;
-                    break;
-                }
-            }
-            if (!isProjectionColumn) {
+            ProjectionColumn projectionColumn = cachedProjectionColumns.get(column.getName());
+            if (projectionColumn != null) {
+                projectionColumnProcessorMap.putIfAbsent(
+                        projectionColumn.getColumnName(),
+                        ProjectionColumnProcessor.of(tableInfo, projectionColumn, timezone));
+                ProjectionColumnProcessor projectionColumnProcessor =
+                        projectionColumnProcessorMap.get(projectionColumn.getColumnName());
+                valueList.add(
+                        DataTypeConverter.convert(
+                                projectionColumnProcessor.evaluate(payload, epochTime),
+                                projectionColumn.getDataType()));
+            } else {
                 valueList.add(
                         getValueFromBinaryRecordData(
                                 column.getName(),
                                 column.getType(),
-                                after,
+                                payload,
                                 tableInfo.getOriginalSchema().getColumns(),
                                 tableInfo.getOriginalFieldGetters()));
             }
         }
-        return tableInfo
-                .getRecordDataGenerator()
-                .generate(valueList.toArray(new Object[valueList.size()]));
-    }
-
-    public BinaryRecordData preProcessData(BinaryRecordData after, long epochTime) {
-        List<Object> valueList = new ArrayList<>();
-        for (Column column : tableInfo.getSchema().getColumns()) {
-            boolean isProjectionColumn = false;
-            for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
-                if (column.getName().equals(projectionColumn.getColumnName())
-                        && projectionColumn.isValidTransformedProjectionColumn()) {
-                    if (!projectionColumnProcessorMap.containsKey(
-                            projectionColumn.getColumnName())) {
-                        projectionColumnProcessorMap.put(
-                                projectionColumn.getColumnName(),
-                                ProjectionColumnProcessor.of(
-                                        tableInfo, projectionColumn, timezone));
-                    }
-                    ProjectionColumnProcessor projectionColumnProcessor =
-                            projectionColumnProcessorMap.get(projectionColumn.getColumnName());
-                    valueList.add(
-                            DataTypeConverter.convert(
-                                    projectionColumnProcessor.evaluate(after, epochTime),
-                                    projectionColumn.getDataType()));
-                    isProjectionColumn = true;
-                    break;
-                }
-            }
-            if (!isProjectionColumn) {
-                valueList.add(
-                        getValueFromBinaryRecordData(
-                                column.getName(),
-                                column.getType(),
-                                after,
-                                tableInfo.getSchema().getColumns(),
-                                tableInfo.getFieldGetters()));
-            }
-        }
-        return tableInfo
-                .getRecordDataGenerator()
-                .generate(valueList.toArray(new Object[valueList.size()]));
+        return tableInfo.getRecordDataGenerator().generate(valueList.toArray(new Object[0]));
     }
 
     private Object getValueFromBinaryRecordData(
@@ -204,4 +156,24 @@ private Object getValueFromBinaryRecordData(
         }
         return null;
     }
+
+    private Map<String, ProjectionColumn> cacheProjectionColumnMap(
+            TableInfo tableInfo, TransformProjection transformProjection) {
+        Map<String, ProjectionColumn> cachedMap = new HashMap<>();
+        if (!hasTableInfo()) {
+            return cachedMap;
+        }
+
+        for (Column column : tableInfo.getSchema().getColumns()) {
+            for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
+                if (column.getName().equals(projectionColumn.getColumnName())
+                        && projectionColumn.isValidTransformedProjectionColumn()) {
+                    cachedMap.put(column.getName(), projectionColumn);
+                    break;
+                }
+            }
+        }
+
+        return cachedMap;
+    }
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
index cd985dce731..be6956ebb6c 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
@@ -28,7 +28,9 @@
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * The processor of pre-transform projection in {@link PreTransformOperator}.
@@ -45,6 +47,7 @@ public class PreTransformProcessor {
     private TableChangeInfo tableChangeInfo;
     private TransformProjection transformProjection;
     private @Nullable TransformFilter transformFilter;
+    private Set<String> cachedProjectionColumns;
 
     public PreTransformProcessor(
             TableChangeInfo tableChangeInfo,
@@ -53,12 +56,20 @@ public PreTransformProcessor(
         this.tableChangeInfo = tableChangeInfo;
         this.transformProjection = transformProjection;
         this.transformFilter = transformFilter;
+        this.cachedProjectionColumns =
+                cacheIsProjectionColumnMap(tableChangeInfo, transformProjection);
     }
 
     public boolean hasTableChangeInfo() {
         return this.tableChangeInfo != null;
     }
 
+    /**
+     * This method analyzes directly and indirectly referenced columns, and peels unused columns
+     * off the schema. For example, given an original schema (A, B, C, D, E), a projection rule
+     * (A, B + 1 as newB), and a filtering rule (C > 0), a peeled schema containing only
+     * (A, B, C) is sent downstream, while columns (D, E) and their data are trimmed.
+     */
     public CreateTableEvent preTransformCreateTableEvent(CreateTableEvent createTableEvent) {
         List<Column> preTransformColumns =
                 TransformParser.generateReferencedColumns(
@@ -73,15 +84,8 @@ public BinaryRecordData processFillDataField(BinaryRecordData data) {
         List<Object> valueList = new ArrayList<>();
         for (Column column : tableChangeInfo.getTransformedSchema().getColumns()) {
-            boolean isProjectionColumn = false;
-            for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
-                if (column.getName().equals(projectionColumn.getColumnName())
-                        && projectionColumn.isValidTransformedProjectionColumn()) {
-                    valueList.add(null);
-                    isProjectionColumn = true;
-                    break;
-                }
-            }
-            if (!isProjectionColumn) {
+            if (cachedProjectionColumns.contains(column.getName())) {
+                valueList.add(null);
+            } else {
                 valueList.add(
                         getValueFromBinaryRecordData(
                                 column.getName(),
@@ -90,9 +94,7 @@ public BinaryRecordData processFillDataField(BinaryRecordData data) {
                                 tableChangeInfo.getFieldGetters()));
             }
         }
-        return tableChangeInfo
-                .getRecordDataGenerator()
-                .generate(valueList.toArray(new Object[valueList.size()]));
+        return tableChangeInfo.getRecordDataGenerator().generate(valueList.toArray(new Object[0]));
     }
 
     private Object getValueFromBinaryRecordData(
@@ -108,4 +110,24 @@ private Object getValueFromBinaryRecordData(
         }
         return null;
     }
+
+    private Set<String> cacheIsProjectionColumnMap(
+            TableChangeInfo tableChangeInfo, TransformProjection transformProjection) {
+        Set<String> cachedMap = new HashSet<>();
+        if (!hasTableChangeInfo()) {
+            return cachedMap;
+        }
+
+        for (Column column : tableChangeInfo.getTransformedSchema().getColumns()) {
+            for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
+                if (column.getName().equals(projectionColumn.getColumnName())
+                        && projectionColumn.isValidTransformedProjectionColumn()) {
+                    cachedMap.add(column.getName());
+                    break;
+                }
+            }
+        }
+
+        return cachedMap;
+    }
 }
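--
For reviewers: a self-contained sketch of the column-peeling behavior that the
new preTransformCreateTableEvent javadoc describes (hypothetical class and
method names, not code from this patch):

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    class PeelSketch {
        // Keep only the columns referenced by projection or filter expressions,
        // preserving their original order.
        static List<String> peel(List<String> schema, Set<String> referenced) {
            return schema.stream().filter(referenced::contains).collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<String> schema = Arrays.asList("A", "B", "C", "D", "E");
            // Projection (A, B + 1 AS newB) references A and B; filter (C > 0) references C.
            Set<String> referenced = new LinkedHashSet<>(Arrays.asList("A", "B", "C"));
            System.out.println(peel(schema, referenced)); // prints [A, B, C]; D and E are trimmed
        }
    }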