Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed Jul 1, 2024
1 parent 934a251 commit cb119cd
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,118 +97,122 @@ public static Schema mergeCompatibleSchemas(List<Schema> schemas) {

/** Try to combine two schemas with potential incompatible type. */
@VisibleForTesting
public static Schema mergeSchema(@Nullable Schema lhs, Schema rhs) {
if (lhs == null) {
return rhs;
public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) {
if (lSchema == null) {
return rSchema;
}
if (lhs.getColumnCount() != rhs.getColumnCount()) {
if (lSchema.getColumnCount() != rSchema.getColumnCount()) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different column counts.",
lhs, rhs));
lSchema, rSchema));
}
if (!lhs.primaryKeys().equals(rhs.primaryKeys())) {
if (!lSchema.primaryKeys().equals(rSchema.primaryKeys())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different primary keys.",
lhs, rhs));
lSchema, rSchema));
}
if (!lhs.partitionKeys().equals(rhs.partitionKeys())) {
if (!lSchema.partitionKeys().equals(rSchema.partitionKeys())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different partition keys.",
lhs, rhs));
lSchema, rSchema));
}
if (!lhs.options().equals(rhs.options())) {
if (!lSchema.options().equals(rSchema.options())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different options.", lhs, rhs));
"Unable to merge schema %s and %s with different options.",
lSchema, rSchema));
}
if (!Objects.equals(lhs.comment(), rhs.comment())) {
if (!Objects.equals(lSchema.comment(), rSchema.comment())) {
throw new IllegalStateException(
String.format(
"Unable to merge schema %s and %s with different comments.", lhs, rhs));
"Unable to merge schema %s and %s with different comments.",
lSchema, rSchema));
}

List<Column> leftColumns = lhs.getColumns();
List<Column> rightColumns = rhs.getColumns();
List<Column> leftColumns = lSchema.getColumns();
List<Column> rightColumns = rSchema.getColumns();

List<Column> mergedColumns =
IntStream.range(0, lhs.getColumnCount())
IntStream.range(0, lSchema.getColumnCount())
.mapToObj(i -> mergeColumn(leftColumns.get(i), rightColumns.get(i)))
.collect(Collectors.toList());

return lhs.copy(mergedColumns);
return lSchema.copy(mergedColumns);
}

/** Try to combine two columns with potential incompatible type. */
@VisibleForTesting
public static Column mergeColumn(Column lhs, Column rhs) {
if (!Objects.equals(lhs.getName(), rhs.getName())) {
public static Column mergeColumn(Column lColumn, Column rColumn) {
if (!Objects.equals(lColumn.getName(), rColumn.getName())) {
throw new IllegalStateException(
String.format(
"Unable to merge column %s and %s with different name.", lhs, rhs));
"Unable to merge column %s and %s with different name.",
lColumn, rColumn));
}
if (!Objects.equals(lhs.getComment(), rhs.getComment())) {
if (!Objects.equals(lColumn.getComment(), rColumn.getComment())) {
throw new IllegalStateException(
String.format(
"Unable to merge column %s and %s with different comments.", lhs, rhs));
"Unable to merge column %s and %s with different comments.",
lColumn, rColumn));
}
return lhs.copy(mergeDataType(lhs.getType(), rhs.getType()));
return lColumn.copy(mergeDataType(lColumn.getType(), rColumn.getType()));
}

/** Try to combine given data types to a compatible wider data type. */
@VisibleForTesting
public static DataType mergeDataType(DataType lhs, DataType rhs) {
public static DataType mergeDataType(DataType lType, DataType rType) {
// Ignore nullability during data type merge
boolean nullable = lhs.isNullable() || rhs.isNullable();
lhs = lhs.notNull();
rhs = rhs.notNull();
boolean nullable = lType.isNullable() || rType.isNullable();
lType = lType.notNull();
rType = rType.notNull();

DataType mergedType;
if (lhs.equals(rhs)) {
if (lType.equals(rType)) {
// identical type
mergedType = rhs;
} else if (lhs.is(DataTypeFamily.INTEGER_NUMERIC)
&& rhs.is(DataTypeFamily.INTEGER_NUMERIC)) {
mergedType = rType;
} else if (lType.is(DataTypeFamily.INTEGER_NUMERIC)
&& rType.is(DataTypeFamily.INTEGER_NUMERIC)) {
mergedType = DataTypes.BIGINT();
} else if (lhs.is(DataTypeFamily.CHARACTER_STRING)
&& rhs.is(DataTypeFamily.CHARACTER_STRING)) {
} else if (lType.is(DataTypeFamily.CHARACTER_STRING)
&& rType.is(DataTypeFamily.CHARACTER_STRING)) {
mergedType = DataTypes.STRING();
} else if (lhs.is(DataTypeFamily.APPROXIMATE_NUMERIC)
&& rhs.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
} else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
&& rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
mergedType = DataTypes.DOUBLE();
} else if (lhs.is(DataTypeRoot.DECIMAL) && rhs.is(DataTypeRoot.DECIMAL)) {
} else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeRoot.DECIMAL)) {
// Merge two decimal types
DecimalType lhsDecimal = (DecimalType) lhs;
DecimalType rhsDecimal = (DecimalType) rhs;
DecimalType lhsDecimal = (DecimalType) lType;
DecimalType rhsDecimal = (DecimalType) rType;
int resultIntDigits =
Math.max(
lhsDecimal.getPrecision() - lhsDecimal.getScale(),
rhsDecimal.getPrecision() - rhsDecimal.getScale());
int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
} else if (lhs.is(DataTypeRoot.DECIMAL) && rhs.is(DataTypeFamily.EXACT_NUMERIC)) {
} else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
DecimalType lhsDecimal = (DecimalType) lhs;
DecimalType lhsDecimal = (DecimalType) lType;
mergedType =
DataTypes.DECIMAL(
Math.max(
lhsDecimal.getPrecision(),
lhsDecimal.getScale() + getNumericPrecision(rhs)),
lhsDecimal.getScale() + getNumericPrecision(rType)),
lhsDecimal.getScale());
} else if (rhs.is(DataTypeRoot.DECIMAL) && lhs.is(DataTypeFamily.EXACT_NUMERIC)) {
} else if (rType.is(DataTypeRoot.DECIMAL) && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
DecimalType rhsDecimal = (DecimalType) rhs;
DecimalType rhsDecimal = (DecimalType) rType;
mergedType =
DataTypes.DECIMAL(
Math.max(
rhsDecimal.getPrecision(),
rhsDecimal.getScale() + getNumericPrecision(lhs)),
rhsDecimal.getScale() + getNumericPrecision(lType)),
rhsDecimal.getScale());
} else {
throw new IllegalStateException(
String.format("Incompatible types: \"%s\" and \"%s\"", lhs, rhs));
String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType));
}

if (nullable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
import org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator;

import org.junit.After;
import org.junit.Assert;
Expand All @@ -43,7 +42,7 @@
import java.time.Duration;
import java.util.concurrent.TimeoutException;

/** E2e tests for the {@link TransformSchemaOperator}. */
/** E2e tests for routing features. */
@RunWith(Parameterized.class)
public class RouteE2eITCase extends PipelineTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(RouteE2eITCase.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -56,6 +57,7 @@ public class PostTransformProcessor {
private @Nullable TransformFilter transformFilter;
private String timezone;
private Map<String, ProjectionColumnProcessor> projectionColumnProcessorMap;
private Map<String, ProjectionColumn> cachedProjectionColumns;

public PostTransformProcessor(
TableInfo tableInfo,
Expand All @@ -69,6 +71,7 @@ public PostTransformProcessor(
this.transformFilter = transformFilter;
this.timezone = timezone;
this.projectionColumnProcessorMap = new ConcurrentHashMap<>();
this.cachedProjectionColumns = cacheProjectionColumnMap(tableInfo, transformProjection);
}

public boolean hasTableChangeInfo() {
Expand Down Expand Up @@ -112,82 +115,31 @@ public Schema processSchemaChangeEvent(Schema schema) {
.collect(Collectors.toList()));
}

public BinaryRecordData processData(BinaryRecordData after, long epochTime) {
public BinaryRecordData processData(BinaryRecordData payload, long epochTime) {
List<Object> valueList = new ArrayList<>();
for (Column column : tableInfo.getSchema().getColumns()) {
boolean isProjectionColumn = false;
for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
if (column.getName().equals(projectionColumn.getColumnName())
&& projectionColumn.isValidTransformedProjectionColumn()) {
if (!projectionColumnProcessorMap.containsKey(
projectionColumn.getColumnName())) {
projectionColumnProcessorMap.put(
projectionColumn.getColumnName(),
ProjectionColumnProcessor.of(
tableInfo, projectionColumn, timezone));
}
ProjectionColumnProcessor projectionColumnProcessor =
projectionColumnProcessorMap.get(projectionColumn.getColumnName());
valueList.add(
DataTypeConverter.convert(
projectionColumnProcessor.evaluate(after, epochTime),
projectionColumn.getDataType()));
isProjectionColumn = true;
break;
}
}
if (!isProjectionColumn) {
ProjectionColumn projectionColumn = cachedProjectionColumns.get(column.getName());
if (cachedProjectionColumns.containsKey(column.getName())) {
projectionColumnProcessorMap.putIfAbsent(
projectionColumn.getColumnName(),
ProjectionColumnProcessor.of(tableInfo, projectionColumn, timezone));
ProjectionColumnProcessor projectionColumnProcessor =
projectionColumnProcessorMap.get(projectionColumn.getColumnName());
valueList.add(
DataTypeConverter.convert(
projectionColumnProcessor.evaluate(payload, epochTime),
projectionColumn.getDataType()));
} else {
valueList.add(
getValueFromBinaryRecordData(
column.getName(),
column.getType(),
after,
payload,
tableInfo.getOriginalSchema().getColumns(),
tableInfo.getOriginalFieldGetters()));
}
}
return tableInfo
.getRecordDataGenerator()
.generate(valueList.toArray(new Object[valueList.size()]));
}

public BinaryRecordData preProcessData(BinaryRecordData after, long epochTime) {
List<Object> valueList = new ArrayList<>();
for (Column column : tableInfo.getSchema().getColumns()) {
boolean isProjectionColumn = false;
for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
if (column.getName().equals(projectionColumn.getColumnName())
&& projectionColumn.isValidTransformedProjectionColumn()) {
if (!projectionColumnProcessorMap.containsKey(
projectionColumn.getColumnName())) {
projectionColumnProcessorMap.put(
projectionColumn.getColumnName(),
ProjectionColumnProcessor.of(
tableInfo, projectionColumn, timezone));
}
ProjectionColumnProcessor projectionColumnProcessor =
projectionColumnProcessorMap.get(projectionColumn.getColumnName());
valueList.add(
DataTypeConverter.convert(
projectionColumnProcessor.evaluate(after, epochTime),
projectionColumn.getDataType()));
isProjectionColumn = true;
break;
}
}
if (!isProjectionColumn) {
valueList.add(
getValueFromBinaryRecordData(
column.getName(),
column.getType(),
after,
tableInfo.getSchema().getColumns(),
tableInfo.getFieldGetters()));
}
}
return tableInfo
.getRecordDataGenerator()
.generate(valueList.toArray(new Object[valueList.size()]));
return tableInfo.getRecordDataGenerator().generate(valueList.toArray(new Object[0]));
}

private Object getValueFromBinaryRecordData(
Expand All @@ -204,4 +156,24 @@ private Object getValueFromBinaryRecordData(
}
return null;
}

private Map<String, ProjectionColumn> cacheProjectionColumnMap(
TableInfo tableInfo, TransformProjection transformProjection) {
Map<String, ProjectionColumn> cachedMap = new HashMap<>();
if (!hasTableInfo()) {
return cachedMap;
}

for (Column column : tableInfo.getSchema().getColumns()) {
for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
if (column.getName().equals(projectionColumn.getColumnName())
&& projectionColumn.isValidTransformedProjectionColumn()) {
cachedMap.put(column.getName(), projectionColumn);
break;
}
}
}

return cachedMap;
}
}
Loading

0 comments on commit cb119cd

Please sign in to comment.