Skip to content

Commit

Permalink
Address comments
Browse files: browse the repository at this point in the history
  • Loading branch information
yuxiqian committed Jul 23, 2024
1 parent 6e6a991 commit e1bcab8
Show file tree
Hide file tree
Showing 16 changed files with 276 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,23 +81,23 @@ public static List<Object> restoreOriginalData(
}

/** Merge compatible upstream schemas into one wider schema, or return null if the list is empty. */
public static Schema mergeCompatibleSchemas(List<Schema> schemas) {
public static Schema inferWiderSchema(List<Schema> schemas) {
if (schemas.isEmpty()) {
return null;
} else if (schemas.size() == 1) {
return schemas.get(0);
} else {
Schema outputSchema = null;
for (Schema schema : schemas) {
outputSchema = mergeSchema(outputSchema, schema);
outputSchema = inferWiderSchema(outputSchema, schema);
}
return outputSchema;
}
}

/** Try to combine two schemas with potentially incompatible column types. */
@VisibleForTesting
public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) {
public static Schema inferWiderSchema(@Nullable Schema lSchema, Schema rSchema) {
if (lSchema == null) {
return rSchema;
}
Expand Down Expand Up @@ -137,15 +137,15 @@ public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) {

List<Column> mergedColumns =
IntStream.range(0, lSchema.getColumnCount())
.mapToObj(i -> mergeColumn(leftColumns.get(i), rightColumns.get(i)))
.mapToObj(i -> inferWiderColumn(leftColumns.get(i), rightColumns.get(i)))
.collect(Collectors.toList());

return lSchema.copy(mergedColumns);
}

/** Try to combine two columns with potentially incompatible types. */
@VisibleForTesting
public static Column mergeColumn(Column lColumn, Column rColumn) {
public static Column inferWiderColumn(Column lColumn, Column rColumn) {
if (!Objects.equals(lColumn.getName(), rColumn.getName())) {
throw new IllegalStateException(
String.format(
Expand All @@ -158,12 +158,12 @@ public static Column mergeColumn(Column lColumn, Column rColumn) {
"Unable to merge column %s and %s with different comments.",
lColumn, rColumn));
}
return lColumn.copy(mergeDataType(lColumn.getType(), rColumn.getType()));
return lColumn.copy(inferWiderType(lColumn.getType(), rColumn.getType()));
}

/** Try to combine the given data types into a compatible wider data type. */
@VisibleForTesting
public static DataType mergeDataType(DataType lType, DataType rType) {
public static DataType inferWiderType(DataType lType, DataType rType) {
// Ignore nullability during data type merge
boolean nullable = lType.isNullable() || rType.isNullable();
lType = lType.notNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,166 +180,169 @@ public void testGetNumericPrecision() {
}

@Test
public void testMergeDataType() {
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.BINARY(17), DataTypes.BINARY(17)))
public void testInferWiderType() {
Assertions.assertThat(
SchemaUtils.inferWiderType(DataTypes.BINARY(17), DataTypes.BINARY(17)))
.isEqualTo(DataTypes.BINARY(17));
Assertions.assertThat(
SchemaUtils.mergeDataType(DataTypes.VARBINARY(17), DataTypes.VARBINARY(17)))
SchemaUtils.inferWiderType(
DataTypes.VARBINARY(17), DataTypes.VARBINARY(17)))
.isEqualTo(DataTypes.VARBINARY(17));
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.BYTES(), DataTypes.BYTES()))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.BYTES(), DataTypes.BYTES()))
.isEqualTo(DataTypes.BYTES());
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.BOOLEAN(), DataTypes.BOOLEAN()))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.BOOLEAN(), DataTypes.BOOLEAN()))
.isEqualTo(DataTypes.BOOLEAN());
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.INT()))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.INT()))
.isEqualTo(DataTypes.INT());
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.TINYINT(), DataTypes.TINYINT()))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.TINYINT(), DataTypes.TINYINT()))
.isEqualTo(DataTypes.TINYINT());
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.SMALLINT(), DataTypes.SMALLINT()))
Assertions.assertThat(
SchemaUtils.inferWiderType(DataTypes.SMALLINT(), DataTypes.SMALLINT()))
.isEqualTo(DataTypes.SMALLINT());
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.BIGINT(), DataTypes.BIGINT()))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.BIGINT(), DataTypes.BIGINT()))
.isEqualTo(DataTypes.BIGINT());
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.FLOAT(), DataTypes.FLOAT()))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.FLOAT(), DataTypes.FLOAT()))
.isEqualTo(DataTypes.FLOAT());
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.DOUBLE(), DataTypes.DOUBLE()))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.DOUBLE(), DataTypes.DOUBLE()))
.isEqualTo(DataTypes.DOUBLE());
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.CHAR(17), DataTypes.CHAR(17)))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.CHAR(17), DataTypes.CHAR(17)))
.isEqualTo(DataTypes.CHAR(17));
Assertions.assertThat(
SchemaUtils.mergeDataType(DataTypes.VARCHAR(17), DataTypes.VARCHAR(17)))
SchemaUtils.inferWiderType(DataTypes.VARCHAR(17), DataTypes.VARCHAR(17)))
.isEqualTo(DataTypes.VARCHAR(17));
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.STRING(), DataTypes.STRING()))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.STRING(), DataTypes.STRING()))
.isEqualTo(DataTypes.STRING());
Assertions.assertThat(
SchemaUtils.mergeDataType(
SchemaUtils.inferWiderType(
DataTypes.DECIMAL(17, 7), DataTypes.DECIMAL(17, 7)))
.isEqualTo(DataTypes.DECIMAL(17, 7));
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.DATE(), DataTypes.DATE()))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.DATE(), DataTypes.DATE()))
.isEqualTo(DataTypes.DATE());
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.TIME(), DataTypes.TIME()))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.TIME(), DataTypes.TIME()))
.isEqualTo(DataTypes.TIME());
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.TIME(6), DataTypes.TIME(6)))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.TIME(6), DataTypes.TIME(6)))
.isEqualTo(DataTypes.TIME(6));
Assertions.assertThat(
SchemaUtils.mergeDataType(DataTypes.TIMESTAMP(), DataTypes.TIMESTAMP()))
SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(), DataTypes.TIMESTAMP()))
.isEqualTo(DataTypes.TIMESTAMP());
Assertions.assertThat(
SchemaUtils.mergeDataType(DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(3)))
SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(3)))
.isEqualTo(DataTypes.TIMESTAMP(3));
Assertions.assertThat(
SchemaUtils.mergeDataType(
SchemaUtils.inferWiderType(
DataTypes.TIMESTAMP_TZ(), DataTypes.TIMESTAMP_TZ()))
.isEqualTo(DataTypes.TIMESTAMP_TZ());
Assertions.assertThat(
SchemaUtils.mergeDataType(
SchemaUtils.inferWiderType(
DataTypes.TIMESTAMP_TZ(3), DataTypes.TIMESTAMP_TZ(3)))
.isEqualTo(DataTypes.TIMESTAMP_TZ(3));
Assertions.assertThat(
SchemaUtils.mergeDataType(
SchemaUtils.inferWiderType(
DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_LTZ()))
.isEqualTo(DataTypes.TIMESTAMP_LTZ());
Assertions.assertThat(
SchemaUtils.mergeDataType(
SchemaUtils.inferWiderType(
DataTypes.TIMESTAMP_LTZ(3), DataTypes.TIMESTAMP_LTZ(3)))
.isEqualTo(DataTypes.TIMESTAMP_LTZ(3));
Assertions.assertThat(
SchemaUtils.mergeDataType(
SchemaUtils.inferWiderType(
DataTypes.ARRAY(DataTypes.INT()), DataTypes.ARRAY(DataTypes.INT())))
.isEqualTo(DataTypes.ARRAY(DataTypes.INT()));
Assertions.assertThat(
SchemaUtils.mergeDataType(
SchemaUtils.inferWiderType(
DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())))
.isEqualTo(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()));

// Test compatible widening cast
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.BIGINT()))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.BIGINT()))
.isEqualTo(DataTypes.BIGINT());
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.VARCHAR(17), DataTypes.STRING()))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.VARCHAR(17), DataTypes.STRING()))
.isEqualTo(DataTypes.STRING());
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.FLOAT(), DataTypes.DOUBLE()))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.FLOAT(), DataTypes.DOUBLE()))
.isEqualTo(DataTypes.DOUBLE());
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.DECIMAL(4, 0)))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DECIMAL(4, 0)))
.isEqualTo(DataTypes.DECIMAL(10, 0));
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.DECIMAL(10, 5)))
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DECIMAL(10, 5)))
.isEqualTo(DataTypes.DECIMAL(15, 5));
Assertions.assertThat(
SchemaUtils.mergeDataType(DataTypes.BIGINT(), DataTypes.DECIMAL(10, 5)))
SchemaUtils.inferWiderType(DataTypes.BIGINT(), DataTypes.DECIMAL(10, 5)))
.isEqualTo(DataTypes.DECIMAL(24, 5));
Assertions.assertThat(
SchemaUtils.mergeDataType(
SchemaUtils.inferWiderType(
DataTypes.DECIMAL(5, 4), DataTypes.DECIMAL(10, 2)))
.isEqualTo(DataTypes.DECIMAL(12, 4));

// Test merging with nullability
Assertions.assertThat(
SchemaUtils.mergeDataType(
SchemaUtils.inferWiderType(
DataTypes.INT().notNull(), DataTypes.INT().notNull()))
.isEqualTo(DataTypes.INT().notNull());
Assertions.assertThat(
SchemaUtils.mergeDataType(
SchemaUtils.inferWiderType(
DataTypes.INT().nullable(), DataTypes.INT().notNull()))
.isEqualTo(DataTypes.INT().nullable());
Assertions.assertThat(
SchemaUtils.mergeDataType(
SchemaUtils.inferWiderType(
DataTypes.INT().notNull(), DataTypes.INT().nullable()))
.isEqualTo(DataTypes.INT().nullable());
Assertions.assertThat(
SchemaUtils.mergeDataType(
SchemaUtils.inferWiderType(
DataTypes.INT().nullable(), DataTypes.INT().nullable()))
.isEqualTo(DataTypes.INT().nullable());

// Test merging incompatible types
Assertions.assertThatThrownBy(
() -> SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.DOUBLE()))
() -> SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DOUBLE()))
.isExactlyInstanceOf(IllegalStateException.class);

Assertions.assertThatThrownBy(
() ->
SchemaUtils.mergeDataType(
SchemaUtils.inferWiderType(
DataTypes.DECIMAL(17, 0), DataTypes.DOUBLE()))
.isExactlyInstanceOf(IllegalStateException.class);
Assertions.assertThatThrownBy(
() -> SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.STRING()))
() -> SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.STRING()))
.isExactlyInstanceOf(IllegalStateException.class);
}

@Test
public void testMergeColumn() {
public void testInferWiderColumn() {
// Test normal merges
Assertions.assertThat(
SchemaUtils.mergeColumn(
SchemaUtils.inferWiderColumn(
Column.physicalColumn("Column1", DataTypes.INT()),
Column.physicalColumn("Column1", DataTypes.BIGINT())))
.isEqualTo(Column.physicalColumn("Column1", DataTypes.BIGINT()));

Assertions.assertThat(
SchemaUtils.mergeColumn(
SchemaUtils.inferWiderColumn(
Column.physicalColumn("Column2", DataTypes.FLOAT()),
Column.physicalColumn("Column2", DataTypes.DOUBLE())))
.isEqualTo(Column.physicalColumn("Column2", DataTypes.DOUBLE()));

// Test merging columns with incompatible types
Assertions.assertThatThrownBy(
() ->
SchemaUtils.mergeColumn(
SchemaUtils.inferWiderColumn(
Column.physicalColumn("Column3", DataTypes.INT()),
Column.physicalColumn("Column3", DataTypes.STRING())))
.isExactlyInstanceOf(IllegalStateException.class);

// Test merging with incompatible names
Assertions.assertThatThrownBy(
() ->
SchemaUtils.mergeColumn(
SchemaUtils.inferWiderColumn(
Column.physicalColumn("Column4", DataTypes.INT()),
Column.physicalColumn("AnotherColumn4", DataTypes.INT())))
.isExactlyInstanceOf(IllegalStateException.class);
}

@Test
public void testMergeSchema() {
public void testInferWiderSchema() {
// Test normal merges
Assertions.assertThat(
SchemaUtils.mergeSchema(
SchemaUtils.inferWiderSchema(
Schema.newBuilder()
.physicalColumn("Column1", DataTypes.INT())
.physicalColumn("Column2", DataTypes.DOUBLE())
Expand All @@ -363,7 +366,7 @@ public void testMergeSchema() {
// Test merging with incompatible types
Assertions.assertThatThrownBy(
() ->
SchemaUtils.mergeSchema(
SchemaUtils.inferWiderSchema(
Schema.newBuilder()
.physicalColumn("Column1", DataTypes.INT())
.physicalColumn("Column2", DataTypes.DOUBLE())
Expand All @@ -381,7 +384,7 @@ public void testMergeSchema() {
// Test merging with incompatible column names
Assertions.assertThatThrownBy(
() ->
SchemaUtils.mergeSchema(
SchemaUtils.inferWiderSchema(
Schema.newBuilder()
.physicalColumn("Column1", DataTypes.INT())
.physicalColumn("Column2", DataTypes.DOUBLE())
Expand All @@ -399,7 +402,7 @@ public void testMergeSchema() {
// Test merging with different column counts
Assertions.assertThatThrownBy(
() ->
SchemaUtils.mergeSchema(
SchemaUtils.inferWiderSchema(
Schema.newBuilder()
.physicalColumn("Column1", DataTypes.INT())
.physicalColumn("Column2", DataTypes.DOUBLE())
Expand All @@ -418,7 +421,7 @@ public void testMergeSchema() {
// Test merging with incompatible schema metadata
Assertions.assertThatThrownBy(
() ->
SchemaUtils.mergeSchema(
SchemaUtils.inferWiderSchema(
Schema.newBuilder()
.physicalColumn("Column1", DataTypes.INT())
.physicalColumn("Column2", DataTypes.DOUBLE())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
new TransformDef(
"default_namespace.default_schema.table1",
"*,concat(col1,'1') as col12",
"col1 = '1'",
"col1 = '1' OR col1 = '999'",
"col1",
"col12",
"key1=value1",
Expand Down
Loading

0 comments on commit e1bcab8

Please sign in to comment.