diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index e1df204..c8c950b 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -1,9 +1,9 @@
 [libraries]
 assertj = "org.assertj:assertj-core:3.12.0"
 avro = "org.apache.avro:avro:1.10.2"
-calcite-avatica = "org.apache.calcite.avatica:avatica:1.23.0"
-calcite-core = "org.apache.calcite:calcite-core:1.34.0"
-calcite-server = "org.apache.calcite:calcite-server:1.34.0"
+calcite-avatica = "org.apache.calcite.avatica:avatica:1.25.0"
+calcite-core = "org.apache.calcite:calcite-core:1.38.0"
+calcite-server = "org.apache.calcite:calcite-server:1.38.0"
 commons-cli = "commons-cli:commons-cli:1.4"
 flink-clients = "org.apache.flink:flink-clients:1.18.1"
 flink-connector-base = "org.apache.flink:flink-connector-base:1.18.1"
@@ -19,16 +19,15 @@ flink-table-common = "org.apache.flink:flink-table-common:1.18.1"
 flink-table-planner = "org.apache.flink:flink-table-planner_2.12:1.18.1"
 flink-table-runtime = "org.apache.flink:flink-table-runtime:1.18.1"
 gson = "com.google.code.gson:gson:2.9.0"
-jackson = "com.fasterxml.jackson.core:jackson-core:2.14.1"
-jackson-dataformat-yaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.14.1"
+jackson = "com.fasterxml.jackson.core:jackson-core:2.15.0"
+jackson-dataformat-yaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.0"
 javax-annotation-api = "javax.annotation:javax.annotation-api:1.3.2"
 junit = "junit:junit:4.12"
 kafka-clients = "org.apache.kafka:kafka-clients:3.2.0"
-kubernetes-client = "io.kubernetes:client-java:16.0.2"
-kubernetes-extended-client = "io.kubernetes:client-java-extended:16.0.2"
+kubernetes-client = "io.kubernetes:client-java:18.0.0"
+kubernetes-extended-client = "io.kubernetes:client-java-extended:18.0.0"
 slf4j-simple = "org.slf4j:slf4j-simple:1.7.30"
 slf4j-api = "org.slf4j:slf4j-api:1.7.30"
-snakeyaml = "org.yaml:snakeyaml:1.33"
 sqlline = "sqlline:sqlline:1.12.0"
 quidem = "net.hydromatic:quidem:0.11"
 venice = "com.linkedin.venice:venice-common:0.4.376"
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java
index feec012..74c3730 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java
@@ -33,6 +33,8 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
     switch (dataType.getSqlTypeName()) {
       case INTEGER:
         return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
+      case SMALLINT:
+        return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
       case BIGINT:
         return createAvroTypeWithNullability(Schema.Type.LONG, dataType.isNullable());
       case VARCHAR:
@@ -48,10 +50,9 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
       case ARRAY:
         return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())),
             dataType.isNullable());
-      // TODO support map types
-      // Appears to require a Calcite version bump
-      // case MAP:
-      //   return createAvroSchemaWithNullability(Schema.createMap(avroPrimitive(dataType.getValueType())), dataType.isNullable());
+      case MAP:
+        return createAvroSchemaWithNullability(Schema.createMap(avro(null, null, dataType.getValueType())),
+            dataType.isNullable());
       case UNKNOWN:
       case NULL:
         return Schema.createUnion(Schema.create(Schema.Type.NULL));
@@ -90,12 +91,11 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boo
     RelDataType unknown = typeFactory.createUnknownType();
     switch (schema.getType()) {
       case RECORD:
-        return typeFactory.createStructType(schema.getFields()
-            .stream()
+        return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getFields().stream()
            .map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory, nullable)))
            .filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
            .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
-            .collect(Collectors.toList()));
+            .collect(Collectors.toList())), nullable);
       case INT:
         return createRelType(typeFactory, SqlTypeName.INTEGER, nullable);
       case LONG:
@@ -113,21 +113,25 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boo
       case BOOLEAN:
         return createRelType(typeFactory, SqlTypeName.BOOLEAN, nullable);
       case ARRAY:
-        return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory, true), -1);
-// TODO support map types
-// Appears to require a Calcite version bump
-// case MAP:
-//   return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory, nullable))
+        return typeFactory.createTypeWithNullability(
+            typeFactory.createArrayType(rel(schema.getElementType(), typeFactory, true), -1), nullable);
+      case MAP:
+        return typeFactory.createTypeWithNullability(
+            typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory, nullable)), nullable);
       case UNION:
+        boolean isNullable = schema.isNullable();
         if (schema.isNullable() && schema.getTypes().size() == 2) {
           Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get();
           return typeFactory.createTypeWithNullability(rel(innerType, typeFactory, true), true);
-        } else {
-          // TODO support more elaborate union types
-          return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
         }
+        return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getTypes().stream()
+            .filter(x -> x.getType() != Schema.Type.NULL)
+            .map(x -> new AbstractMap.SimpleEntry<>(x.getName(), rel(x, typeFactory, isNullable)))
+            .filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
+            .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
+            .collect(Collectors.toList())), isNullable);
       default:
-        return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), nullable);
+        return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
     }
   }
 
diff --git a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java
index 8558bac..6290553 100644
--- a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java
+++ b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java
@@ -45,4 +45,23 @@ public void convertsNestedSchemas() {
     RelDataType rel4 = AvroConverter.rel(avroSchema4);
     assertTrue("types match", RelOptUtil.eq("rel4", rel4, "rel1", rel1, Litmus.THROW));
   }
+
+  @Test
+  public void convertsNestedUnionSchemas() {
+    String schemaString = "{\"type\":\"record\",\"name\":\"record\",\"namespace\":\"ns\",\"fields\":[{\"name\":\"event\",\"type\":[{\"type\":\"record\",\"name\":\"record_event1\",\"fields\":[{\"name\":\"strField\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"record_event2\",\"fields\":[{\"name\":\"strField\",\"type\":\"string\"}]}]}]}";
+    Schema avroSchema1 = (new Schema.Parser()).parse(schemaString);
+    RelDataType rel1 = AvroConverter.rel(avroSchema1);
+    assertEquals(rel1.toString(), rel1.getFieldCount(), avroSchema1.getFields().size());
+    assertNotNull(rel1.toString(), rel1.getField("event", false, false));
+    RelDataType rel2 = rel1.getField("event", false, false).getType();
+    assertTrue(rel2.isStruct());
+    Schema avroSchema2 = avroSchema1.getField("event").schema();
+    assertEquals(rel2.toString(), rel2.getFieldCount(), avroSchema2.getTypes().size());
+    RelDataType rel3 = rel2.getField("record_event1", false, false).getType();
+    Schema avroSchema3 = avroSchema2.getTypes().get(0);
+    assertEquals(rel3.toString(), rel3.getFieldCount(), avroSchema3.getFields().size());
+    Schema avroSchema4 = AvroConverter.avro("NS", "R", rel1);
+    assertFalse("!avroSchema4.isNullable()", avroSchema4.isNullable());
+    assertEquals(avroSchema4.toString(), avroSchema4.getFields().size(), rel1.getFieldCount());
+  }
 }
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java
index f0d9bb9..816c901 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java
@@ -15,6 +15,9 @@
 
 public final class DataTypeUtils {
 
+  private static final String MAP_KEY_TYPE = "keyType";
+  private static final String MAP_VALUE_TYPE = "valueType";
+
   private DataTypeUtils() {
   }
 
@@ -46,13 +49,17 @@ private static void flattenInto(RelDataTypeFactory typeFactory, RelDataType data
         flattenInto(typeFactory, field.getType(), builder, Stream.concat(path.stream(),
             Stream.of(field.getName())).collect(Collectors.toList()));
       }
-    } else if (!dataType.isStruct()) {
-      builder.add(String.join("$", path), dataType);
-    } else {
+    } else if (dataType.isStruct()) {
       for (RelDataTypeField field : dataType.getFieldList()) {
         flattenInto(typeFactory, field.getType(), builder, Stream.concat(path.stream(),
             Stream.of(field.getName())).collect(Collectors.toList()));
       }
+    } else if (dataType.getKeyType() != null && dataType.getValueType() != null) {
+      builder.add(String.join("$", path) + "$" + MAP_KEY_TYPE, dataType.getKeyType());
+      flattenInto(typeFactory, dataType.getValueType(), builder, Stream.concat(path.stream(),
+          Stream.of(MAP_VALUE_TYPE)).collect(Collectors.toList()));
+    } else {
+      builder.add(String.join("$", path), dataType);
     }
   }
 
@@ -86,6 +93,13 @@ private static RelDataType buildRecord(Node node, RelDataTypeFactory typeFactory
       return node.dataType;
     }
     RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
+    // Placeholders to handle MAP type
+    if (node.children.size() == 2
+        && node.children.containsKey(MAP_KEY_TYPE) && node.children.containsKey(MAP_VALUE_TYPE)) {
+      RelDataType keyType = buildRecord(node.children.get(MAP_KEY_TYPE), typeFactory);
+      RelDataType valueType = buildRecord(node.children.get(MAP_VALUE_TYPE), typeFactory);
+      return typeFactory.createMapType(keyType, valueType);
+    }
     for (Map.Entry<String, Node> child : node.children.entrySet()) {
       builder.add(child.getKey(), buildRecord(child.getValue(), typeFactory));
     }
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java
index 38a5018..ee02114 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java
@@ -14,10 +14,9 @@
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.util.Pair;
+import org.apache.calcite.runtime.ImmutablePairList;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
 
 import com.linkedin.hoptimator.Deployable;
 import com.linkedin.hoptimator.Job;
@@ -49,7 +48,7 @@ public interface PipelineRel extends RelNode {
 
   /** Implements a deployable Pipeline. */
   class Implementor {
-    private final ImmutableList<Pair<Integer, String>> targetFields;
+    private final ImmutablePairList<Integer, String> targetFields;
     private final Map sources = new LinkedHashMap<>();
     private RelNode query;
     private String sinkDatabase = "pipeline";
@@ -57,7 +56,7 @@ class Implementor {
     private RelDataType sinkRowType = null;
     private Map sinkOptions = Collections.emptyMap();
 
-    public Implementor(ImmutableList<Pair<Integer, String>> targetFields) {
+    public Implementor(ImmutablePairList<Integer, String> targetFields) {
       this.targetFields = targetFields;
     }
 
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java
index 118ca50..e5ff6bc 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java
@@ -9,7 +9,6 @@
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.RelFactories;
@@ -20,6 +19,7 @@
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.runtime.ImmutablePairList;
 import org.apache.calcite.sql.SqlBasicTypeNameSpec;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlCollectionTypeNameSpec;
@@ -27,6 +27,7 @@
 import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlMapTypeNameSpec;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlRowTypeNameSpec;
@@ -42,11 +43,8 @@
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.util.SqlShuttle;
 import org.apache.calcite.tools.RelBuilder;
-import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 
-import com.google.common.collect.ImmutableList;
-
 
 /**
  * An abstract way to write SQL scripts.
@@ -103,7 +101,7 @@ default ScriptImplementor database(String database) {
   }
 
   /** Append an insert statement, e.g. `INSERT INTO ... SELECT ...` */
-  default ScriptImplementor insert(String schema, String table, RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
+  default ScriptImplementor insert(String schema, String table, RelNode relNode, ImmutablePairList<Integer, String> targetFields) {
    return with(new InsertImplementor(schema, table, relNode, targetFields));
   }
 
@@ -262,9 +260,9 @@ class InsertImplementor implements ScriptImplementor {
    private final String schema;
    private final String table;
    private final RelNode relNode;
-    private final ImmutableList<Pair<Integer, String>> targetFields;
+    private final ImmutablePairList<Integer, String> targetFields;
 
-    public InsertImplementor(String schema, String table, RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
+    public InsertImplementor(String schema, String table, RelNode relNode, ImmutablePairList<Integer, String> targetFields) {
      this.schema = schema;
      this.table = table;
      this.relNode = relNode;
@@ -283,10 +281,10 @@ public void implement(SqlWriter w) {
 
    // Drops NULL fields
    // Drops non-target columns, for use case: INSERT INTO (col1, col2) SELECT * FROM ...
-    private static RelNode dropFields(RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
+    private static RelNode dropFields(RelNode relNode, ImmutablePairList<Integer, String> targetFields) {
      List cols = new ArrayList<>();
      int i = 0;
-      Set<String> targetFieldNames = targetFields.stream().map(x -> x.right).collect(Collectors.toSet());
+      Set<String> targetFieldNames = targetFields.stream().map(Map.Entry::getValue).collect(Collectors.toSet());
      for (RelDataTypeField field : relNode.getRowType().getFieldList()) {
        if (targetFieldNames.contains(field.getName())
            && !field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
@@ -463,6 +461,9 @@ private static SqlDataTypeSpec toSpec(RelDataType dataType) {
          .map(RelDataType::getSqlTypeName)
          .orElseThrow(() -> new IllegalArgumentException("not a collection?")), SqlParserPos.ZERO),
          dataType.getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO));
+    } else if (dataType.getKeyType() != null && dataType.getValueType() != null) {
+      return maybeNullable(dataType, new SqlDataTypeSpec(new SqlMapTypeNameSpec(
+          toSpec(dataType.getKeyType()), toSpec(dataType.getValueType()), SqlParserPos.ZERO), SqlParserPos.ZERO));
    } else {
      return maybeNullable(dataType, new SqlDataTypeSpec(new SqlBasicTypeNameSpec(dataType.getSqlTypeName(),
          SqlParserPos.ZERO),
diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java
index e8d78de..090dd33 100644
--- a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java
+++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java
@@ -97,4 +97,46 @@ public void flattenUnflattenNestedArrays() {
         + "`CAR` FLOAT ARRAY) WITH ();", unflattenedConnector,
         "Flattened-unflattened connector should be correct");
   }
+
+  @Test
+  public void flattenUnflattenComplexMap() {
+    RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    RelDataTypeFactory.Builder mapValue = new RelDataTypeFactory.Builder(typeFactory);
+    mapValue.add("BAR", SqlTypeName.VARCHAR);
+    mapValue.add("CAR", SqlTypeName.INTEGER);
+
+    RelDataTypeFactory.Builder keyBuilder = new RelDataTypeFactory.Builder(typeFactory);
+    RelDataTypeFactory.Builder valueBuilder = new RelDataTypeFactory.Builder(typeFactory);
+    keyBuilder.add("QUX", SqlTypeName.VARCHAR);
+    valueBuilder.add("QIZ", mapValue.build());
+
+    RelDataTypeFactory.Builder mapBuilder = new RelDataTypeFactory.Builder(typeFactory);
+    mapBuilder.add("FOO", typeFactory.createMapType(keyBuilder.build(), valueBuilder.build()));
+    RelDataType rowType = mapBuilder.build();
+    Assertions.assertEquals(1, rowType.getFieldList().size());
+    RelDataType flattenedType = DataTypeUtils.flatten(rowType, typeFactory);
+    Assertions.assertEquals(3, flattenedType.getFieldList().size());
+    List<String> flattenedNames = flattenedType.getFieldList().stream().map(RelDataTypeField::getName)
+        .collect(Collectors.toList());
+    Assertions.assertIterableEquals(Arrays.asList("FOO$keyType", "FOO$valueType$QIZ$BAR", "FOO$valueType$QIZ$CAR"), flattenedNames);
+    String flattenedConnector = new ScriptImplementor.ConnectorImplementor("S", "T1",
+        flattenedType, Collections.emptyMap()).sql();
+    Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` ("
+        + "`FOO_keyType` ROW(`QUX` VARCHAR), "
+        + "`FOO_valueType_QIZ_BAR` VARCHAR, "
+        + "`FOO_valueType_QIZ_CAR` INTEGER) WITH ();", flattenedConnector,
+        "Flattened connector should have simplified map");
+
+    RelDataType unflattenedType = DataTypeUtils.unflatten(flattenedType, typeFactory);
+    RelOptUtil.eq("original", rowType, "flattened-unflattened", unflattenedType, Litmus.THROW);
+    String originalConnector = new ScriptImplementor.ConnectorImplementor("S", "T1",
+        rowType, Collections.emptyMap()).sql();
+    String unflattenedConnector = new ScriptImplementor.ConnectorImplementor("S", "T1",
+        unflattenedType, Collections.emptyMap()).sql();
+    Assertions.assertEquals(originalConnector, unflattenedConnector,
+        "Flattening and unflattening data types should have no impact on connector");
+    Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` ("
+        + "`FOO` MAP< ROW(`QUX` VARCHAR), ROW(`QIZ` ROW(`BAR` VARCHAR, `CAR` INTEGER)) >) WITH ();", unflattenedConnector,
+        "Flattened-unflattened connector should be correct");
+  }
 }
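Reviewer note (not part of the patch): a minimal sketch of how the new MAP branches in AvroConverter are exercised end to end. The class and method names (AvroConverter.avro, AvroConverter.rel, SqlTypeFactoryImpl, createMapType) are taken from the diff and its tests above; the package imports, field names, and the standalone main wrapper are assumptions for illustration only.

// MapConversionSketch.java -- illustrative only, assumes hoptimator-catalog on the classpath
import org.apache.avro.Schema;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;

import com.linkedin.hoptimator.catalog.AvroConverter;

public class MapConversionSketch {
  public static void main(String[] args) {
    RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);

    // ROW(`TAGS` MAP<VARCHAR, INTEGER>) -- a shape that was previously TODO'd out and is now supported.
    RelDataTypeFactory.Builder row = new RelDataTypeFactory.Builder(typeFactory);
    row.add("TAGS", typeFactory.createMapType(
        typeFactory.createSqlType(SqlTypeName.VARCHAR),
        typeFactory.createSqlType(SqlTypeName.INTEGER)));
    RelDataType rowType = row.build();

    // RelDataType -> Avro: the new MAP case emits Schema.createMap(...) over the value type.
    Schema avroSchema = AvroConverter.avro("ns", "Record", rowType);
    System.out.println(avroSchema);

    // Avro -> RelDataType: map schemas come back as MAP<VARCHAR, ...>, since Avro map keys are strings.
    RelDataType roundTripped = AvroConverter.rel(avroSchema);
    System.out.println(roundTripped.getFullTypeString());
  }
}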