
Commit

[FLINK-35272][cdc-runtime] Transform supports omitting and renaming computed column

This closes apache#3285.
yuxiqian authored and qiaozongmi committed Sep 23, 2024
1 parent 93429db commit 9705c5d
Showing 37 changed files with 4,624 additions and 1,070 deletions.
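For context on the feature itself: a transform projection can now omit upstream columns and rename computed expressions. A minimal sketch of such a rule (table and column names are hypothetical, not taken from this commit):

```yaml
transform:
  - source-table: mydb.orders
    # Keep only `id` and `price`, omitting every other upstream column,
    # and add a computed column under a new name via `AS`.
    projection: id, price, price * 0.9 AS discounted_price
    description: omit-and-rename sketch
```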
2 changes: 1 addition & 1 deletion docs/content.zh/docs/connectors/pipeline-connectors/doris.md
@@ -186,7 +186,7 @@ pipeline:
<thead>
<tr>
<th class="text-left" style="width:10%;">CDC type</th>
<th class="text-left" style="width:30%;">Doris type<a href="https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-types/Data-Types/BOOLEAN/"></a></th>
<th class="text-left" style="width:30%;">Doris type</th>
<th class="text-left" style="width:60%;">NOTE</th>
</tr>
</thead>
4 changes: 3 additions & 1 deletion docs/content.zh/docs/core-concept/transform.md
@@ -252,7 +252,9 @@ transform:
Tips: The format of table-options is `key1=value1,key2=value2`.

## Classification mapping
Multiple transform rules can be defined to classify input data rows and apply different processings. For example, we may define a transform rule as follows:
Multiple transform rules can be defined to classify input data rows and apply different processing.
Only the first matched transform rule will apply.
For example, we may define a transform rule as follows:

```yaml
transform:
```
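The concrete rules are collapsed in this view; below is a minimal sketch of classification mapping under the first-match-wins behavior described above (table and column names are hypothetical):

```yaml
transform:
  - source-table: mydb.employee_\.*
    projection: id, name, age
    filter: age < 18
    description: first rule; rows matched here skip all later rules
  - source-table: mydb.employee_\.*
    projection: id, name, age, age - 18 AS adult_years
    filter: age >= 18
    description: second rule, applied to the remaining rows
```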
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/pipeline-connectors/doris.md
@@ -186,7 +186,7 @@ pipeline:
<thead>
<tr>
<th class="text-left" style="width:10%;">Flink CDC Type</th>
<th class="text-left" style="width:30%;"><a href="https://doris.apache.org/docs/dev/sql-manual/sql-types/Data-Types/BOOLEAN/">Doris Type</a></th>
<th class="text-left" style="width:30%;">Doris Type</th>
<th class="text-left" style="width:60%;">Note</th>
</tr>
</thead>
4 changes: 3 additions & 1 deletion docs/content/docs/core-concept/transform.md
@@ -252,7 +252,9 @@ transform:
Tips: The format of table-options is `key1=value1,key2=value2`.

## Classification mapping
Multiple transform rules can be defined to classify input data rows and apply different processings. For example, we may define a transform rule as follows:
Multiple transform rules can be defined to classify input data rows and apply different processing.
Only the first matched transform rule will apply.
For example, we may define a transform rule as follows:

```yaml
transform:
```
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.common.utils;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
@@ -26,11 +27,21 @@
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.DecimalType;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Utils for {@link Schema} to perform the ability of evolution. */
@PublicEvolving
Expand All @@ -56,6 +67,181 @@ public static List<RecordData.FieldGetter> createFieldGetters(List<Column> colum
return fieldGetters;
}

/** Restore original data fields from a {@link RecordData} structure. */
public static List<Object> restoreOriginalData(
@Nullable RecordData recordData, List<RecordData.FieldGetter> fieldGetters) {
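// A null RecordData (e.g. the `before` image of an insert event) has no fields to restore.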
if (recordData == null) {
return Collections.emptyList();
}
List<Object> actualFields = new ArrayList<>();
for (RecordData.FieldGetter fieldGetter : fieldGetters) {
actualFields.add(fieldGetter.getFieldOrNull(recordData));
}
return actualFields;
}

/** Merge compatible upstream schemas. */
public static Schema inferWiderSchema(List<Schema> schemas) {
if (schemas.isEmpty()) {
return null;
} else if (schemas.size() == 1) {
return schemas.get(0);
} else {
Schema outputSchema = null;
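// Fold the schemas pairwise: each iteration widens the running result with the next schema.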
for (Schema schema : schemas) {
outputSchema = inferWiderSchema(outputSchema, schema);
}
return outputSchema;
}
}

/** Try to combine two schemas with potentially incompatible types. */
@VisibleForTesting
public static Schema inferWiderSchema(@Nullable Schema lSchema, Schema rSchema) {
if (lSchema == null) {
return rSchema;
}
if (lSchema.getColumnCount() != rSchema.getColumnCount()) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different column counts.",
lSchema, rSchema));
}
if (!lSchema.primaryKeys().equals(rSchema.primaryKeys())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different primary keys.",
lSchema, rSchema));
}
if (!lSchema.partitionKeys().equals(rSchema.partitionKeys())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different partition keys.",
lSchema, rSchema));
}
if (!lSchema.options().equals(rSchema.options())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different options.",
lSchema, rSchema));
}
if (!Objects.equals(lSchema.comment(), rSchema.comment())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different comments.",
lSchema, rSchema));
}

List<Column> leftColumns = lSchema.getColumns();
List<Column> rightColumns = rSchema.getColumns();

List<Column> mergedColumns =
IntStream.range(0, lSchema.getColumnCount())
.mapToObj(i -> inferWiderColumn(leftColumns.get(i), rightColumns.get(i)))
.collect(Collectors.toList());

return lSchema.copy(mergedColumns);
}

/** Try to combine two columns with potentially incompatible types. */
@VisibleForTesting
public static Column inferWiderColumn(Column lColumn, Column rColumn) {
if (!Objects.equals(lColumn.getName(), rColumn.getName())) {
throw new IllegalStateException(
String.format(
"Unable to merge column %s and %s with different names.",
lColumn, rColumn));
}
if (!Objects.equals(lColumn.getComment(), rColumn.getComment())) {
throw new IllegalStateException(
String.format(
"Unable to merge column %s and %s with different comments.",
lColumn, rColumn));
}
return lColumn.copy(inferWiderType(lColumn.getType(), rColumn.getType()));
}

/** Try to combine the given data types into a compatible wider data type. */
@VisibleForTesting
public static DataType inferWiderType(DataType lType, DataType rType) {
// Ignore nullability during data type merge
boolean nullable = lType.isNullable() || rType.isNullable();
lType = lType.notNull();
rType = rType.notNull();

DataType mergedType;
if (lType.equals(rType)) {
// identical type
mergedType = rType;
} else if (lType.is(DataTypeFamily.INTEGER_NUMERIC)
&& rType.is(DataTypeFamily.INTEGER_NUMERIC)) {
mergedType = DataTypes.BIGINT();
} else if (lType.is(DataTypeFamily.CHARACTER_STRING)
&& rType.is(DataTypeFamily.CHARACTER_STRING)) {
mergedType = DataTypes.STRING();
} else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
&& rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
mergedType = DataTypes.DOUBLE();
} else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeRoot.DECIMAL)) {
// Merge two decimal types
DecimalType lhsDecimal = (DecimalType) lType;
DecimalType rhsDecimal = (DecimalType) rType;
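// Keep enough integer digits and fraction digits to hold both inputs:
// e.g. DECIMAL(5, 2) and DECIMAL(4, 3) widen to DECIMAL(6, 3).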
int resultIntDigits =
Math.max(
lhsDecimal.getPrecision() - lhsDecimal.getScale(),
rhsDecimal.getPrecision() - rhsDecimal.getScale());
int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
} else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
DecimalType lhsDecimal = (DecimalType) lType;
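// Widen the decimal until it can also hold any value of the integer type:
// e.g. DECIMAL(5, 2) merged with INT (numeric precision 10) becomes DECIMAL(12, 2).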
mergedType =
DataTypes.DECIMAL(
Math.max(
lhsDecimal.getPrecision(),
lhsDecimal.getScale() + getNumericPrecision(rType)),
lhsDecimal.getScale());
} else if (rType.is(DataTypeRoot.DECIMAL) && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
DecimalType rhsDecimal = (DecimalType) rType;
mergedType =
DataTypes.DECIMAL(
Math.max(
rhsDecimal.getPrecision(),
rhsDecimal.getScale() + getNumericPrecision(lType)),
rhsDecimal.getScale());
} else {
throw new IllegalStateException(
String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType));
}

if (nullable) {
return mergedType.nullable();
} else {
return mergedType.notNull();
}
}

@VisibleForTesting
public static int getNumericPrecision(DataType dataType) {
if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {
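// These are the maximum decimal digit counts of each integral type,
// e.g. TINYINT tops out at 127, which is 3 digits.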
if (dataType.is(DataTypeRoot.TINYINT)) {
return 3;
} else if (dataType.is(DataTypeRoot.SMALLINT)) {
return 5;
} else if (dataType.is(DataTypeRoot.INTEGER)) {
return 10;
} else if (dataType.is(DataTypeRoot.BIGINT)) {
return 19;
} else if (dataType.is(DataTypeRoot.DECIMAL)) {
return ((DecimalType) dataType).getPrecision();
}
}

throw new IllegalArgumentException(
"Failed to get precision of non-exact numeric type " + dataType);
}

/** Apply a {@link SchemaChangeEvent} to the old schema and return the schema after the change. */
public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent event) {
if (event instanceof AddColumnEvent) {