Skip to content

Commit

Permalink
Fix transformed schema merging logic & Fix missing rewrite in CASE WH…
Browse files Browse the repository at this point in the history
…EN statement & Remove unused `containFilteredComputedColumn` field
  • Loading branch information
yuxiqian committed May 11, 2024
1 parent 07324de commit eb1dac3
Show file tree
Hide file tree
Showing 6 changed files with 449 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.cdc.common.utils;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
Expand All @@ -26,14 +27,21 @@
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.DecimalType;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Utils for {@link Schema} to perform the ability of evolution. */
@PublicEvolving
Expand Down Expand Up @@ -72,6 +80,164 @@ public static List<Object> restoreOriginalData(
return actualFields;
}

/** Merge compatible upstream schemas. */
public static Schema mergeCompatibleSchemas(List<Schema> schemas) {
if (schemas.isEmpty()) {
return null;
} else if (schemas.size() == 1) {
return schemas.get(0);
} else {
Schema outputSchema = null;
for (Schema schema : schemas) {
outputSchema = mergeSchema(outputSchema, schema);
}
return outputSchema;
}
}

/** Try to combine two schemas with potential incompatible type. */
@VisibleForTesting
public static Schema mergeSchema(@Nullable Schema lhs, Schema rhs) {
if (lhs == null) {
return rhs;
}
if (lhs.getColumnCount() != rhs.getColumnCount()) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different column counts.",
lhs, rhs));
}
if (!lhs.primaryKeys().equals(rhs.primaryKeys())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different primary keys.",
lhs, rhs));
}
if (!lhs.partitionKeys().equals(rhs.partitionKeys())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different partition keys.",
lhs, rhs));
}
if (!lhs.options().equals(rhs.options())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different options.", lhs, rhs));
}
if (!Objects.equals(lhs.comment(), rhs.comment())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different comments.", lhs, rhs));
}

List<Column> leftColumns = lhs.getColumns();
List<Column> rightColumns = rhs.getColumns();

List<Column> mergedColumns =
IntStream.range(0, lhs.getColumnCount())
.mapToObj(i -> mergeColumn(leftColumns.get(i), rightColumns.get(i)))
.collect(Collectors.toList());

return lhs.copy(mergedColumns);
}

/** Try to combine two columns with potential incompatible type. */
@VisibleForTesting
public static Column mergeColumn(Column lhs, Column rhs) {
if (!Objects.equals(lhs.getName(), rhs.getName())) {
throw new IllegalStateException(
String.format(
"Unable to merge column %s and %s with different name.", lhs, rhs));
}
if (!Objects.equals(lhs.getComment(), rhs.getComment())) {
throw new IllegalStateException(
String.format(
"Unable to merge column %s and %s with different comments.", lhs, rhs));
}
return lhs.copy(mergeDataType(lhs.getType(), rhs.getType()));
}

/** Try to combine given data types to a compatible wider data type. */
@VisibleForTesting
public static DataType mergeDataType(DataType lhs, DataType rhs) {
// Ignore nullability during data type merge
boolean nullable = lhs.isNullable() || rhs.isNullable();
lhs = lhs.notNull();
rhs = rhs.notNull();

DataType mergedType;
if (lhs.equals(rhs)) {
// identical type
mergedType = rhs;
} else if (lhs.is(DataTypeFamily.INTEGER_NUMERIC)
&& rhs.is(DataTypeFamily.INTEGER_NUMERIC)) {
mergedType = DataTypes.BIGINT();
} else if (lhs.is(DataTypeFamily.CHARACTER_STRING)
&& rhs.is(DataTypeFamily.CHARACTER_STRING)) {
mergedType = DataTypes.STRING();
} else if (lhs.is(DataTypeFamily.APPROXIMATE_NUMERIC)
&& rhs.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
mergedType = DataTypes.DOUBLE();
} else if (lhs.is(DataTypeRoot.DECIMAL) && rhs.is(DataTypeRoot.DECIMAL)) {
// Merge two decimal types
DecimalType lhsDecimal = (DecimalType) lhs;
DecimalType rhsDecimal = (DecimalType) rhs;
int resultIntDigits =
Math.max(
lhsDecimal.getPrecision() - lhsDecimal.getScale(),
rhsDecimal.getPrecision() - rhsDecimal.getScale());
int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
} else if (lhs.is(DataTypeRoot.DECIMAL) && rhs.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
DecimalType lhsDecimal = (DecimalType) lhs;
mergedType =
DataTypes.DECIMAL(
Math.max(
lhsDecimal.getPrecision(),
lhsDecimal.getScale() + getNumericPrecision(rhs)),
lhsDecimal.getScale());
} else if (rhs.is(DataTypeRoot.DECIMAL) && lhs.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
DecimalType rhsDecimal = (DecimalType) rhs;
mergedType =
DataTypes.DECIMAL(
Math.max(
rhsDecimal.getPrecision(),
rhsDecimal.getScale() + getNumericPrecision(lhs)),
rhsDecimal.getScale());
} else {
throw new IllegalStateException(
String.format("Incompatible types: \"%s\" and \"%s\"", lhs, rhs));
}

if (nullable) {
return mergedType.nullable();
} else {
return mergedType.notNull();
}
}

@VisibleForTesting
public static int getNumericPrecision(DataType dataType) {
if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {
if (dataType.is(DataTypeRoot.TINYINT)) {
return 3;
} else if (dataType.is(DataTypeRoot.SMALLINT)) {
return 5;
} else if (dataType.is(DataTypeRoot.INTEGER)) {
return 10;
} else if (dataType.is(DataTypeRoot.BIGINT)) {
return 19;
} else if (dataType.is(DataTypeRoot.DECIMAL)) {
return ((DecimalType) dataType).getPrecision();
}
}

throw new IllegalArgumentException(
"Failed to get precision of non-exact decimal type " + dataType);
}

/** apply SchemaChangeEvent to the old schema and return the schema after changing. */
public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent event) {
if (event instanceof AddColumnEvent) {
Expand Down
Loading

0 comments on commit eb1dac3

Please sign in to comment.