Commit

[FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul
yuxiqian committed Jul 23, 2024
1 parent 313bace commit e58cfd0
Showing 30 changed files with 4,214 additions and 911 deletions.
4 changes: 3 additions & 1 deletion docs/content.zh/docs/core-concept/transform.md
@@ -252,7 +252,9 @@ transform:
Tips: The format of table-options is `key1=value1,key2=value2`.

## Classification mapping
Multiple transform rules can be defined to classify input data rows and apply different processings. For example, we may define a transform rule as follows:
Multiple transform rules can be defined to classify input data rows and apply different processing.
Only the first matched transform rule will apply.
For example, we may define a transform rule as follows:

```yaml
transform:
4 changes: 3 additions & 1 deletion docs/content/docs/core-concept/transform.md
@@ -252,7 +252,9 @@ transform:
Tips: The format of table-options is `key1=value1,key2=value2`.

## Classification mapping
Multiple transform rules can be defined to classify input data rows and apply different processings. For example, we may define a transform rule as follows:
Multiple transform rules can be defined to classify input data rows and apply different processing.
Only the first matched transform rule will apply.
For example, we may define a transform rule as follows:

```yaml
transform:
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.common.utils;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
@@ -26,11 +27,21 @@
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.DecimalType;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Utils for {@link Schema} to perform the ability of evolution. */
@PublicEvolving
Expand All @@ -56,6 +67,181 @@ public static List<RecordData.FieldGetter> createFieldGetters(List<Column> colum
return fieldGetters;
}

/** Restore original data fields from RecordData structure. */
public static List<Object> restoreOriginalData(
@Nullable RecordData recordData, List<RecordData.FieldGetter> fieldGetters) {
if (recordData == null) {
return Collections.emptyList();
}
List<Object> actualFields = new ArrayList<>();
for (RecordData.FieldGetter fieldGetter : fieldGetters) {
actualFields.add(fieldGetter.getFieldOrNull(recordData));
}
return actualFields;
}
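
A minimal usage sketch for the two helpers above. The `BinaryRecordDataGenerator` and `BinaryStringData` classes used to build a `RecordData` instance are assumed from the Flink CDC runtime/common utilities and are not part of this change; the class name and column layout are hypothetical:

```java
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;

import java.util.List;

public class RestoreOriginalDataExample {
    public static void main(String[] args) {
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.INT())
                        .physicalColumn("name", DataTypes.STRING())
                        .build();

        // Field getters are created once per schema and reused for every record.
        List<RecordData.FieldGetter> fieldGetters =
                SchemaUtils.createFieldGetters(schema.getColumns());

        // Build a binary record matching the schema (generator assumed from runtime utilities).
        RecordData record =
                new BinaryRecordDataGenerator(RowType.of(DataTypes.INT(), DataTypes.STRING()))
                        .generate(new Object[] {1, BinaryStringData.fromString("Alice")});

        // Turns the packed RecordData back into plain Java objects: [1, Alice].
        System.out.println(SchemaUtils.restoreOriginalData(record, fieldGetters));

        // A null record yields an empty list rather than throwing.
        System.out.println(SchemaUtils.restoreOriginalData(null, fieldGetters));
    }
}
```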

/** Merge compatible upstream schemas. */
public static Schema mergeCompatibleSchemas(List<Schema> schemas) {
if (schemas.isEmpty()) {
return null;
} else if (schemas.size() == 1) {
return schemas.get(0);
} else {
Schema outputSchema = null;
for (Schema schema : schemas) {
outputSchema = mergeSchema(outputSchema, schema);
}
return outputSchema;
}
}

/** Try to combine two schemas with potential incompatible type. */
@VisibleForTesting
public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) {
if (lSchema == null) {
return rSchema;
}
if (lSchema.getColumnCount() != rSchema.getColumnCount()) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different column counts.",
lSchema, rSchema));
}
if (!lSchema.primaryKeys().equals(rSchema.primaryKeys())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different primary keys.",
lSchema, rSchema));
}
if (!lSchema.partitionKeys().equals(rSchema.partitionKeys())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different partition keys.",
lSchema, rSchema));
}
if (!lSchema.options().equals(rSchema.options())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different options.",
lSchema, rSchema));
}
if (!Objects.equals(lSchema.comment(), rSchema.comment())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different comments.",
lSchema, rSchema));
}

List<Column> leftColumns = lSchema.getColumns();
List<Column> rightColumns = rSchema.getColumns();

List<Column> mergedColumns =
IntStream.range(0, lSchema.getColumnCount())
.mapToObj(i -> mergeColumn(leftColumns.get(i), rightColumns.get(i)))
.collect(Collectors.toList());

return lSchema.copy(mergedColumns);
}
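
A hypothetical sketch of how the schema merge behaves; the table schemas and column names below are made up for illustration:

```java
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

import java.util.Arrays;

public class MergeSchemaExample {
    public static void main(String[] args) {
        // Two upstream tables share the same column layout but disagree on column types.
        Schema ordersV1 =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.INT())
                        .physicalColumn("amount", DataTypes.DECIMAL(5, 2))
                        .primaryKey("id")
                        .build();
        Schema ordersV2 =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("amount", DataTypes.DECIMAL(7, 4))
                        .primaryKey("id")
                        .build();

        // Column counts, primary keys, partition keys, options and comments must all match;
        // only column types are widened: id -> BIGINT, amount -> DECIMAL(7, 4).
        Schema merged = SchemaUtils.mergeCompatibleSchemas(Arrays.asList(ordersV1, ordersV2));
        System.out.println(merged);

        // Schemas that differ in primary keys, column counts, options, or comments
        // make mergeSchema throw an IllegalStateException instead.
    }
}
```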

/** Try to combine two columns with potential incompatible type. */
@VisibleForTesting
public static Column mergeColumn(Column lColumn, Column rColumn) {
if (!Objects.equals(lColumn.getName(), rColumn.getName())) {
throw new IllegalStateException(
String.format(
"Unable to merge column %s and %s with different name.",
lColumn, rColumn));
}
if (!Objects.equals(lColumn.getComment(), rColumn.getComment())) {
throw new IllegalStateException(
String.format(
"Unable to merge column %s and %s with different comments.",
lColumn, rColumn));
}
return lColumn.copy(mergeDataType(lColumn.getType(), rColumn.getType()));
}

/** Try to combine given data types to a compatible wider data type. */
@VisibleForTesting
public static DataType mergeDataType(DataType lType, DataType rType) {
// Ignore nullability during data type merge
boolean nullable = lType.isNullable() || rType.isNullable();
lType = lType.notNull();
rType = rType.notNull();

DataType mergedType;
if (lType.equals(rType)) {
// identical type
mergedType = rType;
} else if (lType.is(DataTypeFamily.INTEGER_NUMERIC)
&& rType.is(DataTypeFamily.INTEGER_NUMERIC)) {
mergedType = DataTypes.BIGINT();
} else if (lType.is(DataTypeFamily.CHARACTER_STRING)
&& rType.is(DataTypeFamily.CHARACTER_STRING)) {
mergedType = DataTypes.STRING();
} else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
&& rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
mergedType = DataTypes.DOUBLE();
} else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeRoot.DECIMAL)) {
// Merge two decimal types
DecimalType lhsDecimal = (DecimalType) lType;
DecimalType rhsDecimal = (DecimalType) rType;
int resultIntDigits =
Math.max(
lhsDecimal.getPrecision() - lhsDecimal.getScale(),
rhsDecimal.getPrecision() - rhsDecimal.getScale());
int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
} else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
DecimalType lhsDecimal = (DecimalType) lType;
mergedType =
DataTypes.DECIMAL(
Math.max(
lhsDecimal.getPrecision(),
lhsDecimal.getScale() + getNumericPrecision(rType)),
lhsDecimal.getScale());
} else if (rType.is(DataTypeRoot.DECIMAL) && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
DecimalType rhsDecimal = (DecimalType) rType;
mergedType =
DataTypes.DECIMAL(
Math.max(
rhsDecimal.getPrecision(),
rhsDecimal.getScale() + getNumericPrecision(lType)),
rhsDecimal.getScale());
} else {
throw new IllegalStateException(
String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType));
}

if (nullable) {
return mergedType.nullable();
} else {
return mergedType.notNull();
}
}
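
A few worked examples of the widening rules above, as a sketch of the results that follow from the branches in `mergeDataType`; the class name is illustrative:

```java
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

public class MergeDataTypeExample {
    public static void main(String[] args) {
        // Identical types are returned as-is.
        System.out.println(SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.INT()));        // INT

        // Any two integer types widen to BIGINT, any two string types to STRING,
        // and any two approximate numerics to DOUBLE.
        System.out.println(SchemaUtils.mergeDataType(DataTypes.SMALLINT(), DataTypes.INT()));   // BIGINT
        System.out.println(SchemaUtils.mergeDataType(DataTypes.CHAR(8), DataTypes.VARCHAR(32))); // STRING
        System.out.println(SchemaUtils.mergeDataType(DataTypes.FLOAT(), DataTypes.DOUBLE()));   // DOUBLE

        // DECIMAL(5, 2) has 3 integer digits and DECIMAL(7, 4) has 3; the merged type keeps
        // max(3, 3) integer digits and max(2, 4) fractional digits: DECIMAL(7, 4).
        System.out.println(
                SchemaUtils.mergeDataType(DataTypes.DECIMAL(5, 2), DataTypes.DECIMAL(7, 4)));

        // Nullability is merged separately: the result is nullable if either side is nullable.
        System.out.println(
                SchemaUtils.mergeDataType(DataTypes.INT().notNull(), DataTypes.BIGINT()));      // BIGINT
    }
}
```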

@VisibleForTesting
public static int getNumericPrecision(DataType dataType) {
if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {
if (dataType.is(DataTypeRoot.TINYINT)) {
return 3;
} else if (dataType.is(DataTypeRoot.SMALLINT)) {
return 5;
} else if (dataType.is(DataTypeRoot.INTEGER)) {
return 10;
} else if (dataType.is(DataTypeRoot.BIGINT)) {
return 19;
} else if (dataType.is(DataTypeRoot.DECIMAL)) {
return ((DecimalType) dataType).getPrecision();
}
}

throw new IllegalArgumentException(
"Failed to get precision of non-exact decimal type " + dataType);
}
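
A short sketch showing how these precisions feed the decimal/integer branch of `mergeDataType`; the example values are illustrative:

```java
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

public class NumericPrecisionExample {
    public static void main(String[] args) {
        // Fixed precisions for the integer types: TINYINT=3, SMALLINT=5, INT=10, BIGINT=19.
        System.out.println(SchemaUtils.getNumericPrecision(DataTypes.INT()));    // 10
        System.out.println(SchemaUtils.getNumericPrecision(DataTypes.BIGINT())); // 19

        // Merging DECIMAL(6, 2) with INT must keep 2 fractional digits and still fit any
        // 10-digit integer, so the result is DECIMAL(max(6, 2 + 10), 2) = DECIMAL(12, 2).
        System.out.println(SchemaUtils.mergeDataType(DataTypes.DECIMAL(6, 2), DataTypes.INT()));

        // Non-exact numeric types are rejected.
        try {
            SchemaUtils.getNumericPrecision(DataTypes.DOUBLE());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```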

/** apply SchemaChangeEvent to the old schema and return the schema after changing. */
public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent event) {
if (event instanceof AddColumnEvent) {