
[FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column #3285

Merged: 8 commits, Aug 8, 2024
docs/content.zh/docs/core-concept/transform.md (4 changes: 3 additions & 1 deletion)

@@ -252,7 +252,9 @@ transform:
  Tips: The format of table-options is `key1=value1,key2=value2`.

  ## Classification mapping
- Multiple transform rules can be defined to classify input data rows and apply different processings. For example, we may define a transform rule as follows:
+ Multiple transform rules can be defined to classify input data rows and apply different processing.
+ Only the first matched transform rule will apply.
+ For example, we may define a transform rule as follows:

  ```yaml
  transform:
  # ... (rest of the example collapsed in the diff view)
docs/content/docs/core-concept/transform.md (4 changes: 3 additions & 1 deletion)

@@ -252,7 +252,9 @@ transform:
  Tips: The format of table-options is `key1=value1,key2=value2`.

  ## Classification mapping
- Multiple transform rules can be defined to classify input data rows and apply different processings. For example, we may define a transform rule as follows:
+ Multiple transform rules can be defined to classify input data rows and apply different processing.
+ Only the first matched transform rule will apply.
+ For example, we may define a transform rule as follows:

  ```yaml
  transform:
  # ... (rest of the example collapsed in the diff view)
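The remainder of the YAML example is collapsed in the diff above. A minimal sketch of first-match classification, using hypothetical table and column names with the transform options this doc page describes, could look like:

```yaml
transform:
  # Matched first: rows of mydb.web_order with id > 10 are consumed here.
  - source-table: mydb.web_order
    projection: id, order_id, UPPER(product_name) AS product_name
    filter: id > 10
    description: rule for rows with large ids
  # Fallback: only rows that did not match the first rule reach this one.
  - source-table: mydb.web_order
    projection: \*
    description: catch-all rule
```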
org/apache/flink/cdc/common/utils/SchemaUtils.java

@@ -18,6 +18,7 @@
package org.apache.flink.cdc.common.utils;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
@@ -26,11 +27,21 @@
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.DecimalType;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Utils for {@link Schema} to perform the ability of evolution. */
@PublicEvolving
@@ -56,6 +67,181 @@ public static List<RecordData.FieldGetter> createFieldGetters(List<Column> colum
return fieldGetters;
}

/** Restore original data fields from RecordData structure. */
public static List<Object> restoreOriginalData(
@Nullable RecordData recordData, List<RecordData.FieldGetter> fieldGetters) {
if (recordData == null) {
return Collections.emptyList();
}
List<Object> actualFields = new ArrayList<>();
for (RecordData.FieldGetter fieldGetter : fieldGetters) {
actualFields.add(fieldGetter.getFieldOrNull(recordData));
}
return actualFields;
}
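// Hypothetical usage sketch: with getters built via createFieldGetters(schema.getColumns()),
// restoreOriginalData(row, getters) unpacks a RecordData row into plain Java objects
// in schema column order, e.g. [1L, "Alice", null] for a three-column row.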

/** Merge compatible upstream schemas. */
public static Schema inferWiderSchema(List<Schema> schemas) {
if (schemas.isEmpty()) {
return null;
} else if (schemas.size() == 1) {
return schemas.get(0);
} else {
Schema outputSchema = null;
for (Schema schema : schemas) {
outputSchema = inferWiderSchema(outputSchema, schema);
}
return outputSchema;
}
}

/** Try to combine two schemas with potentially incompatible types. */
@VisibleForTesting
public static Schema inferWiderSchema(@Nullable Schema lSchema, Schema rSchema) {
if (lSchema == null) {
return rSchema;
}
if (lSchema.getColumnCount() != rSchema.getColumnCount()) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different column counts.",
lSchema, rSchema));
}
if (!lSchema.primaryKeys().equals(rSchema.primaryKeys())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different primary keys.",
lSchema, rSchema));
}
if (!lSchema.partitionKeys().equals(rSchema.partitionKeys())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different partition keys.",
lSchema, rSchema));
}
if (!lSchema.options().equals(rSchema.options())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different options.",
lSchema, rSchema));
}
if (!Objects.equals(lSchema.comment(), rSchema.comment())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different comments.",
lSchema, rSchema));
}

List<Column> leftColumns = lSchema.getColumns();
List<Column> rightColumns = rSchema.getColumns();

List<Column> mergedColumns =
IntStream.range(0, lSchema.getColumnCount())
.mapToObj(i -> inferWiderColumn(leftColumns.get(i), rightColumns.get(i)))
.collect(Collectors.toList());

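// Keys, options, and comments were verified identical above, so the merged schema
// can safely reuse the left schema's metadata with only the column types widened.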
return lSchema.copy(mergedColumns);
}

/** Try to combine two columns with potentially incompatible types. */
@VisibleForTesting
public static Column inferWiderColumn(Column lColumn, Column rColumn) {
if (!Objects.equals(lColumn.getName(), rColumn.getName())) {
throw new IllegalStateException(
String.format(
"Unable to merge column %s and %s with different name.",
lColumn, rColumn));
}
if (!Objects.equals(lColumn.getComment(), rColumn.getComment())) {
throw new IllegalStateException(
String.format(
"Unable to merge column %s and %s with different comments.",
lColumn, rColumn));
}
return lColumn.copy(inferWiderType(lColumn.getType(), rColumn.getType()));
}

/** Try to combine given data types to a compatible wider data type. */
@VisibleForTesting
public static DataType inferWiderType(DataType lType, DataType rType) {
// Ignore nullability during data type merge
boolean nullable = lType.isNullable() || rType.isNullable();
lType = lType.notNull();
rType = rType.notNull();

DataType mergedType;
if (lType.equals(rType)) {
// identical type
mergedType = rType;
} else if (lType.is(DataTypeFamily.INTEGER_NUMERIC)
&& rType.is(DataTypeFamily.INTEGER_NUMERIC)) {
mergedType = DataTypes.BIGINT();
} else if (lType.is(DataTypeFamily.CHARACTER_STRING)
&& rType.is(DataTypeFamily.CHARACTER_STRING)) {
mergedType = DataTypes.STRING();
} else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
&& rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
mergedType = DataTypes.DOUBLE();
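// Hypothetical pairs: INT + BIGINT -> BIGINT, CHAR + VARCHAR -> STRING,
// FLOAT + DOUBLE -> DOUBLE; each family widens to its most general member.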
} else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeRoot.DECIMAL)) {
// Merge two decimal types
DecimalType lhsDecimal = (DecimalType) lType;
DecimalType rhsDecimal = (DecimalType) rType;
int resultIntDigits =
Math.max(
lhsDecimal.getPrecision() - lhsDecimal.getScale(),
rhsDecimal.getPrecision() - rhsDecimal.getScale());
int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
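// Worked example with hypothetical inputs: DECIMAL(5, 2) and DECIMAL(7, 4) keep
// max(3, 3) = 3 integer digits and max(2, 4) = 4 fraction digits -> DECIMAL(7, 4).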
} else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
DecimalType lhsDecimal = (DecimalType) lType;
mergedType =
DataTypes.DECIMAL(
Math.max(
lhsDecimal.getPrecision(),
lhsDecimal.getScale() + getNumericPrecision(rType)),
lhsDecimal.getScale());
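// Worked example with hypothetical inputs: DECIMAL(5, 2) and INT (precision 10)
// -> DECIMAL(max(5, 2 + 10), 2) = DECIMAL(12, 2), wide enough for either value.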
} else if (rType.is(DataTypeRoot.DECIMAL) && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
DecimalType rhsDecimal = (DecimalType) rType;
mergedType =
DataTypes.DECIMAL(
Math.max(
rhsDecimal.getPrecision(),
rhsDecimal.getScale() + getNumericPrecision(lType)),
rhsDecimal.getScale());
} else {
throw new IllegalStateException(
String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType));
}

if (nullable) {
return mergedType.nullable();
} else {
return mergedType.notNull();
}
}

@VisibleForTesting
public static int getNumericPrecision(DataType dataType) {
if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {
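// Decimal digits needed for each type's widest value: 127 -> 3, 32767 -> 5,
// 2147483647 -> 10, 9223372036854775807 -> 19.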
if (dataType.is(DataTypeRoot.TINYINT)) {
return 3;
} else if (dataType.is(DataTypeRoot.SMALLINT)) {
return 5;
} else if (dataType.is(DataTypeRoot.INTEGER)) {
return 10;
} else if (dataType.is(DataTypeRoot.BIGINT)) {
return 19;
} else if (dataType.is(DataTypeRoot.DECIMAL)) {
return ((DecimalType) dataType).getPrecision();
}
}

throw new IllegalArgumentException(
"Failed to get precision of non-exact decimal type " + dataType);
}

/** Apply SchemaChangeEvent to the old schema and return the schema after the change. */
public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent event) {
if (event instanceof AddColumnEvent) {
... (remainder of applySchemaChangeEvent collapsed in the diff view)
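To make the widening rules concrete, here is a minimal usage sketch. It assumes the flink-cdc-common module is on the classpath; `SchemaUtils.inferWiderType` and the `DataTypes` factories for INT/BIGINT/DECIMAL appear in the diff above, while `DataTypes.VARCHAR`/`DataTypes.CHAR` are assumed to exist alongside them:

```java
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

public class WideningSketch {
    public static void main(String[] args) {
        // Integer family widens to BIGINT.
        DataType ints = SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.BIGINT());

        // Decimal precision math: DECIMAL(5, 2) + DECIMAL(7, 4) -> DECIMAL(7, 4).
        DataType decimals =
                SchemaUtils.inferWiderType(DataTypes.DECIMAL(5, 2), DataTypes.DECIMAL(7, 4));

        // Character strings widen to STRING; one nullable side makes the result nullable.
        DataType strings =
                SchemaUtils.inferWiderType(
                        DataTypes.VARCHAR(10).nullable(), DataTypes.CHAR(5).notNull());

        // Expected per the rules above: BIGINT / DECIMAL(7, 4) / STRING (nullable).
        System.out.println(ints + " / " + decimals + " / " + strings);
    }
}
```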