Skip to content

Commit

Permalink
Add support for complex union types and maps (#98)
Browse files Browse the repository at this point in the history
* Add support for complex union types and maps

* Add AvroConverter changes

* Add tests
  • Loading branch information
jogrogan authored Feb 3, 2025
1 parent 00e7864 commit 2e84b5a
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 40 deletions.
15 changes: 7 additions & 8 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[libraries]
assertj = "org.assertj:assertj-core:3.12.0"
avro = "org.apache.avro:avro:1.10.2"
calcite-avatica = "org.apache.calcite.avatica:avatica:1.23.0"
calcite-core = "org.apache.calcite:calcite-core:1.34.0"
calcite-server = "org.apache.calcite:calcite-server:1.34.0"
calcite-avatica = "org.apache.calcite.avatica:avatica:1.25.0"
calcite-core = "org.apache.calcite:calcite-core:1.38.0"
calcite-server = "org.apache.calcite:calcite-server:1.38.0"
commons-cli = "commons-cli:commons-cli:1.4"
flink-clients = "org.apache.flink:flink-clients:1.18.1"
flink-connector-base = "org.apache.flink:flink-connector-base:1.18.1"
Expand All @@ -19,16 +19,15 @@ flink-table-common = "org.apache.flink:flink-table-common:1.18.1"
flink-table-planner = "org.apache.flink:flink-table-planner_2.12:1.18.1"
flink-table-runtime = "org.apache.flink:flink-table-runtime:1.18.1"
gson = "com.google.code.gson:gson:2.9.0"
jackson = "com.fasterxml.jackson.core:jackson-core:2.14.1"
jackson-dataformat-yaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.14.1"
jackson = "com.fasterxml.jackson.core:jackson-core:2.15.0"
jackson-dataformat-yaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.0"
javax-annotation-api = "javax.annotation:javax.annotation-api:1.3.2"
junit = "junit:junit:4.12"
kafka-clients = "org.apache.kafka:kafka-clients:3.2.0"
kubernetes-client = "io.kubernetes:client-java:16.0.2"
kubernetes-extended-client = "io.kubernetes:client-java-extended:16.0.2"
kubernetes-client = "io.kubernetes:client-java:18.0.0"
kubernetes-extended-client = "io.kubernetes:client-java-extended:18.0.0"
slf4j-simple = "org.slf4j:slf4j-simple:1.7.30"
slf4j-api = "org.slf4j:slf4j-api:1.7.30"
snakeyaml = "org.yaml:snakeyaml:1.33"
sqlline = "sqlline:sqlline:1.12.0"
quidem = "net.hydromatic:quidem:0.11"
venice = "com.linkedin.venice:venice-common:0.4.376"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
switch (dataType.getSqlTypeName()) {
case INTEGER:
return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
case SMALLINT:
return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
case BIGINT:
return createAvroTypeWithNullability(Schema.Type.LONG, dataType.isNullable());
case VARCHAR:
Expand All @@ -48,10 +50,9 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
case ARRAY:
return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())),
dataType.isNullable());
// TODO support map types
// Appears to require a Calcite version bump
// case MAP:
// return createAvroSchemaWithNullability(Schema.createMap(avroPrimitive(dataType.getValueType())), dataType.isNullable());
case MAP:
return createAvroSchemaWithNullability(Schema.createMap(avro(null, null, dataType.getValueType())),
dataType.isNullable());
case UNKNOWN:
case NULL:
return Schema.createUnion(Schema.create(Schema.Type.NULL));
Expand Down Expand Up @@ -90,12 +91,11 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boo
RelDataType unknown = typeFactory.createUnknownType();
switch (schema.getType()) {
case RECORD:
return typeFactory.createStructType(schema.getFields()
.stream()
return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getFields().stream()
.map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory, nullable)))
.filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
.filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
.collect(Collectors.toList()));
.collect(Collectors.toList())), nullable);
case INT:
return createRelType(typeFactory, SqlTypeName.INTEGER, nullable);
case LONG:
Expand All @@ -113,21 +113,25 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boo
case BOOLEAN:
return createRelType(typeFactory, SqlTypeName.BOOLEAN, nullable);
case ARRAY:
return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory, true), -1);
// TODO support map types
// Appears to require a Calcite version bump
// case MAP:
// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory, nullable))
return typeFactory.createTypeWithNullability(
typeFactory.createArrayType(rel(schema.getElementType(), typeFactory, true), -1), nullable);
case MAP:
return typeFactory.createTypeWithNullability(
typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory, nullable)), nullable);
case UNION:
boolean isNullable = schema.isNullable();
if (schema.isNullable() && schema.getTypes().size() == 2) {
Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get();
return typeFactory.createTypeWithNullability(rel(innerType, typeFactory, true), true);
} else {
// TODO support more elaborate union types
return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
}
return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getTypes().stream()
.filter(x -> x.getType() != Schema.Type.NULL)
.map(x -> new AbstractMap.SimpleEntry<>(x.getName(), rel(x, typeFactory, isNullable)))
.filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
.filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
.collect(Collectors.toList())), isNullable);
default:
return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), nullable);
return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,23 @@ public void convertsNestedSchemas() {
RelDataType rel4 = AvroConverter.rel(avroSchema4);
assertTrue("types match", RelOptUtil.eq("rel4", rel4, "rel1", rel1, Litmus.THROW));
}

@Test
public void convertsNestedUnionSchemas() {
  // Record with a single field ("event") whose type is a union of two records
  // (record_event1, record_event2) with no NULL branch.
  String json = "{\"type\":\"record\",\"name\":\"record\",\"namespace\":\"ns\",\"fields\":[{\"name\":\"event\",\"type\":[{\"type\":\"record\",\"name\":\"record_event1\",\"fields\":[{\"name\":\"strField\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"record_event2\",\"fields\":[{\"name\":\"strField\",\"type\":\"string\"}]}]}]}";
  Schema unionSchema = new Schema.Parser().parse(json);

  // Avro -> Rel: the top-level record converts field-for-field.
  RelDataType recordType = AvroConverter.rel(unionSchema);
  assertEquals(recordType.toString(), recordType.getFieldCount(), unionSchema.getFields().size());
  assertNotNull(recordType.toString(), recordType.getField("event", false, false));

  // The union field becomes a struct with one member per union branch.
  RelDataType eventType = recordType.getField("event", false, false).getType();
  assertTrue(eventType.isStruct());
  Schema eventSchema = unionSchema.getField("event").schema();
  assertEquals(eventType.toString(), eventType.getFieldCount(), eventSchema.getTypes().size());

  // Each branch record keeps its own fields.
  RelDataType branchType = eventType.getField("record_event1", false, false).getType();
  Schema branchSchema = eventSchema.getTypes().get(0);
  assertEquals(branchType.toString(), branchType.getFieldCount(), branchSchema.getFields().size());

  // Rel -> Avro round trip: top level stays non-nullable with the same field count.
  Schema roundTripped = AvroConverter.avro("NS", "R", recordType);
  assertFalse("!avroSchema4.isNullable()", roundTripped.isNullable());
  assertEquals(roundTripped.toString(), roundTripped.getFields().size(), recordType.getFieldCount());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

public final class DataTypeUtils {

private static final String MAP_KEY_TYPE = "keyType";
private static final String MAP_VALUE_TYPE = "valueType";

private DataTypeUtils() {
}

Expand Down Expand Up @@ -46,13 +49,17 @@ private static void flattenInto(RelDataTypeFactory typeFactory, RelDataType data
flattenInto(typeFactory, field.getType(), builder, Stream.concat(path.stream(),
Stream.of(field.getName())).collect(Collectors.toList()));
}
} else if (!dataType.isStruct()) {
builder.add(String.join("$", path), dataType);
} else {
} else if (dataType.isStruct()) {
for (RelDataTypeField field : dataType.getFieldList()) {
flattenInto(typeFactory, field.getType(), builder, Stream.concat(path.stream(),
Stream.of(field.getName())).collect(Collectors.toList()));
}
} else if (dataType.getKeyType() != null && dataType.getValueType() != null) {
builder.add(String.join("$", path) + "$" + MAP_KEY_TYPE, dataType.getKeyType());
flattenInto(typeFactory, dataType.getValueType(), builder, Stream.concat(path.stream(),
Stream.of(MAP_VALUE_TYPE)).collect(Collectors.toList()));
} else {
builder.add(String.join("$", path), dataType);
}
}

Expand Down Expand Up @@ -86,6 +93,13 @@ private static RelDataType buildRecord(Node node, RelDataTypeFactory typeFactory
return node.dataType;
}
RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
// Placeholders to handle MAP type
if (node.children.size() == 2
&& node.children.containsKey(MAP_KEY_TYPE) && node.children.containsKey(MAP_VALUE_TYPE)) {
RelDataType keyType = buildRecord(node.children.get(MAP_KEY_TYPE), typeFactory);
RelDataType valueType = buildRecord(node.children.get(MAP_VALUE_TYPE), typeFactory);
return typeFactory.createMapType(keyType, valueType);
}
for (Map.Entry<String, Node> child : node.children.entrySet()) {
builder.add(child.getKey(), buildRecord(child.getValue(), typeFactory));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.util.Pair;
import org.apache.calcite.runtime.ImmutablePairList;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;

import com.linkedin.hoptimator.Deployable;
import com.linkedin.hoptimator.Job;
Expand Down Expand Up @@ -49,15 +48,15 @@ public interface PipelineRel extends RelNode {

/** Implements a deployable Pipeline. */
class Implementor {
private final ImmutableList<Pair<Integer, String>> targetFields;
private final ImmutablePairList<Integer, String> targetFields;
private final Map<Source, RelDataType> sources = new LinkedHashMap<>();
private RelNode query;
private String sinkDatabase = "pipeline";
private List<String> sinkPath = Arrays.asList(new String[]{"PIPELINE", "SINK"});
private RelDataType sinkRowType = null;
private Map<String, String> sinkOptions = Collections.emptyMap();

public Implementor(ImmutableList<Pair<Integer, String>> targetFields) {
public Implementor(ImmutablePairList<Integer, String> targetFields) {
this.targetFields = targetFields;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.RelFactories;
Expand All @@ -20,13 +19,15 @@
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.runtime.ImmutablePairList;
import org.apache.calcite.sql.SqlBasicTypeNameSpec;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlCollectionTypeNameSpec;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlMapTypeNameSpec;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlRowTypeNameSpec;
Expand All @@ -42,11 +43,8 @@
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.SqlShuttle;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;

import com.google.common.collect.ImmutableList;


/**
* An abstract way to write SQL scripts.
Expand Down Expand Up @@ -103,7 +101,7 @@ default ScriptImplementor database(String database) {
}

/** Append an insert statement, e.g. `INSERT INTO ... SELECT ...` */
default ScriptImplementor insert(String schema, String table, RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
default ScriptImplementor insert(String schema, String table, RelNode relNode, ImmutablePairList<Integer, String> targetFields) {
return with(new InsertImplementor(schema, table, relNode, targetFields));
}

Expand Down Expand Up @@ -262,9 +260,9 @@ class InsertImplementor implements ScriptImplementor {
private final String schema;
private final String table;
private final RelNode relNode;
private final ImmutableList<Pair<Integer, String>> targetFields;
private final ImmutablePairList<Integer, String> targetFields;

public InsertImplementor(String schema, String table, RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
public InsertImplementor(String schema, String table, RelNode relNode, ImmutablePairList<Integer, String> targetFields) {
this.schema = schema;
this.table = table;
this.relNode = relNode;
Expand All @@ -283,10 +281,10 @@ public void implement(SqlWriter w) {

// Drops NULL fields
// Drops non-target columns, for use case: INSERT INTO (col1, col2) SELECT * FROM ...
private static RelNode dropFields(RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
private static RelNode dropFields(RelNode relNode, ImmutablePairList<Integer, String> targetFields) {
List<Integer> cols = new ArrayList<>();
int i = 0;
Set<String> targetFieldNames = targetFields.stream().map(x -> x.right).collect(Collectors.toSet());
Set<String> targetFieldNames = targetFields.stream().map(Map.Entry::getValue).collect(Collectors.toSet());
for (RelDataTypeField field : relNode.getRowType().getFieldList()) {
if (targetFieldNames.contains(field.getName())
&& !field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
Expand Down Expand Up @@ -463,6 +461,9 @@ private static SqlDataTypeSpec toSpec(RelDataType dataType) {
.map(RelDataType::getSqlTypeName)
.orElseThrow(() -> new IllegalArgumentException("not a collection?")), SqlParserPos.ZERO),
dataType.getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO));
} else if (dataType.getKeyType() != null && dataType.getValueType() != null) {
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlMapTypeNameSpec(
toSpec(dataType.getKeyType()), toSpec(dataType.getValueType()), SqlParserPos.ZERO), SqlParserPos.ZERO));
} else {
return maybeNullable(dataType,
new SqlDataTypeSpec(new SqlBasicTypeNameSpec(dataType.getSqlTypeName(), SqlParserPos.ZERO),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,46 @@ public void flattenUnflattenNestedArrays() {
+ "`CAR` FLOAT ARRAY) WITH ();", unflattenedConnector,
"Flattened-unflattened connector should be correct");
}

@Test
public void flattenUnflattenComplexMap() {
  RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);

  // Innermost row carried by the map's value: ROW(BAR VARCHAR, CAR INTEGER).
  RelDataTypeFactory.Builder innerRow = new RelDataTypeFactory.Builder(typeFactory);
  innerRow.add("BAR", SqlTypeName.VARCHAR);
  innerRow.add("CAR", SqlTypeName.INTEGER);

  // Map key is ROW(QUX VARCHAR); map value is ROW(QIZ <innerRow>).
  RelDataTypeFactory.Builder keyRow = new RelDataTypeFactory.Builder(typeFactory);
  keyRow.add("QUX", SqlTypeName.VARCHAR);
  RelDataTypeFactory.Builder valueRow = new RelDataTypeFactory.Builder(typeFactory);
  valueRow.add("QIZ", innerRow.build());

  // Top-level row has one field FOO of type MAP<keyRow, valueRow>.
  RelDataTypeFactory.Builder topRow = new RelDataTypeFactory.Builder(typeFactory);
  topRow.add("FOO", typeFactory.createMapType(keyRow.build(), valueRow.build()));
  RelDataType mapRowType = topRow.build();
  Assertions.assertEquals(1, mapRowType.getFieldList().size());

  // Flattening expands the map into a keyType column plus one column per value leaf.
  RelDataType flattened = DataTypeUtils.flatten(mapRowType, typeFactory);
  Assertions.assertEquals(3, flattened.getFieldList().size());
  List<String> fieldNames = flattened.getFieldList().stream()
      .map(RelDataTypeField::getName)
      .collect(Collectors.toList());
  Assertions.assertIterableEquals(Arrays.asList("FOO$keyType", "FOO$valueType$QIZ$BAR", "FOO$valueType$QIZ$CAR"), fieldNames);
  String flatSql = new ScriptImplementor.ConnectorImplementor("S", "T1",
      flattened, Collections.emptyMap()).sql();
  Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` ("
      + "`FOO_keyType` ROW(`QUX` VARCHAR), "
      + "`FOO_valueType_QIZ_BAR` VARCHAR, "
      + "`FOO_valueType_QIZ_CAR` INTEGER) WITH ();", flatSql,
      "Flattened connector should have simplified map");

  // Unflattening must reconstruct the original map type exactly.
  RelDataType restored = DataTypeUtils.unflatten(flattened, typeFactory);
  RelOptUtil.eq("original", mapRowType, "flattened-unflattened", restored, Litmus.THROW);
  String originalSql = new ScriptImplementor.ConnectorImplementor("S", "T1",
      mapRowType, Collections.emptyMap()).sql();
  String restoredSql = new ScriptImplementor.ConnectorImplementor("S", "T1",
      restored, Collections.emptyMap()).sql();
  Assertions.assertEquals(originalSql, restoredSql,
      "Flattening and unflattening data types should have no impact on connector");
  Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` ("
      + "`FOO` MAP< ROW(`QUX` VARCHAR), ROW(`QIZ` ROW(`BAR` VARCHAR, `CAR` INTEGER)) >) WITH ();", restoredSql,
      "Flattened-unflattened connector should be correct");
}
}

0 comments on commit 2e84b5a

Please sign in to comment.