From 8922004598bd19ea8da01dc16f4cc3d101321daa Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Fri, 7 Feb 2025 15:38:08 -0500 Subject: [PATCH] Merge AvroConverters (#101) --- hoptimator-avro/build.gradle | 3 + .../hoptimator/avro/AvroConverter.java | 85 +++++---- .../hoptimator/avro}/AvroConverterTest.java | 2 +- hoptimator-catalog/build.gradle | 1 + .../hoptimator/catalog/AvroConverter.java | 163 ------------------ .../catalog/ScriptImplementorTest.java | 2 + hoptimator-operator/build.gradle | 5 +- .../subscription/SubscriptionEnvironment.java | 2 +- 8 files changed, 61 insertions(+), 202 deletions(-) rename {hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog => hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro}/AvroConverterTest.java (98%) delete mode 100644 hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java diff --git a/hoptimator-avro/build.gradle b/hoptimator-avro/build.gradle index 3311448d..7d569c92 100644 --- a/hoptimator-avro/build.gradle +++ b/hoptimator-avro/build.gradle @@ -7,6 +7,9 @@ dependencies { implementation project(':hoptimator-api') implementation libs.avro implementation libs.calcite.core + + testImplementation libs.junit + testImplementation libs.assertj } publishing { diff --git a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java index 520b343f..770d4eb7 100644 --- a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java +++ b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java @@ -14,7 +14,6 @@ import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.type.SqlTypeName; - /** Converts between Avro and Calcite's RelDataType */ public final class AvroConverter { @@ -23,13 +22,11 @@ private AvroConverter() { public static Schema avro(String namespace, String name, RelDataType dataType) { if (dataType.isStruct()) { - List fields = dataType.getFieldList() - .stream() - .map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), - null)) + List fields = dataType.getFieldList().stream() + .map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), null)) .collect(Collectors.toList()); - return createAvroSchemaWithNullability( - Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields), dataType.isNullable()); + return createAvroSchemaWithNullability(Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields), + dataType.isNullable()); } else { switch (dataType.getSqlTypeName()) { case INTEGER: @@ -51,10 +48,9 @@ public static Schema avro(String namespace, String name, RelDataType dataType) { case ARRAY: return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())), dataType.isNullable()); - // TODO support map types - // Appears to require a Calcite version bump - // case MAP: - // return createAvroSchemaWithNullability(Schema.createMap(avroPrimitive(dataType.getValueType())), dataType.isNullable()); + case MAP: + return createAvroSchemaWithNullability(Schema.createMap(avro(null, null, dataType.getValueType())), + dataType.isNullable()); case UNKNOWN: case NULL: return Schema.createUnion(Schema.create(Schema.Type.NULL)); @@ -82,45 +78,59 @@ private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean } public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) { + return rel(schema, typeFactory, false); + } + + /** Converts Avro Schema to RelDataType. + * Nullability is preserved except for array types, JDBC is incapable of interpreting e.g. "FLOAT NOT NULL ARRAY" + * causing "NOT NULL" arrays to get demoted to "ANY ARRAY" which is not desired. + */ + public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boolean nullable) { RelDataType unknown = typeFactory.createUnknownType(); switch (schema.getType()) { case RECORD: - return typeFactory.createStructType(schema.getFields() - .stream() - .map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory))) + return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getFields().stream() + .map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory, nullable))) .filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL) .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName()) - .collect(Collectors.toList())); + .collect(Collectors.toList())), nullable); case INT: - return createRelType(typeFactory, SqlTypeName.INTEGER); + return createRelType(typeFactory, SqlTypeName.INTEGER, nullable); case LONG: - return createRelType(typeFactory, SqlTypeName.BIGINT); + return createRelType(typeFactory, SqlTypeName.BIGINT, nullable); case ENUM: - case FIXED: case STRING: - return createRelType(typeFactory, SqlTypeName.VARCHAR); + return createRelType(typeFactory, SqlTypeName.VARCHAR, nullable); + case FIXED: + return createRelType(typeFactory, SqlTypeName.VARBINARY, schema.getFixedSize(), nullable); + case BYTES: + return createRelType(typeFactory, SqlTypeName.VARBINARY, nullable); case FLOAT: - return createRelType(typeFactory, SqlTypeName.FLOAT); + return createRelType(typeFactory, SqlTypeName.FLOAT, nullable); case DOUBLE: - return createRelType(typeFactory, SqlTypeName.DOUBLE); + return createRelType(typeFactory, SqlTypeName.DOUBLE, nullable); case BOOLEAN: - return createRelType(typeFactory, SqlTypeName.BOOLEAN); + return createRelType(typeFactory, SqlTypeName.BOOLEAN, nullable); case ARRAY: - return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory), -1); -// TODO support map types -// Appears to require a Calcite version bump -// case MAP: -// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory)); + return typeFactory.createTypeWithNullability( + typeFactory.createArrayType(rel(schema.getElementType(), typeFactory, true), -1), nullable); + case MAP: + return typeFactory.createTypeWithNullability( + typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory, nullable)), nullable); case UNION: + boolean isNullable = schema.isNullable(); if (schema.isNullable() && schema.getTypes().size() == 2) { Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get(); - return typeFactory.createTypeWithNullability(rel(innerType, typeFactory), true); - } else { - // TODO support more elaborate union types - return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true); + return typeFactory.createTypeWithNullability(rel(innerType, typeFactory, true), true); } + return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getTypes().stream() + .filter(x -> x.getType() != Schema.Type.NULL) + .map(x -> new AbstractMap.SimpleEntry<>(x.getName(), rel(x, typeFactory, isNullable))) + .filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL) + .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName()) + .collect(Collectors.toList())), isNullable); default: - return typeFactory.createUnknownType(); + return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true); } } @@ -128,9 +138,14 @@ public static RelDataType rel(Schema schema) { return rel(schema, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT)); } - private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName) { - RelDataType rawType = typeFactory.createSqlType(typeName); - return typeFactory.createTypeWithNullability(rawType, false); + private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) { + return createRelType(typeFactory, typeName, RelDataType.PRECISION_NOT_SPECIFIED, nullable); + } + + private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName, + int precision, boolean nullable) { + RelDataType rawType = typeFactory.createSqlType(typeName, precision); + return typeFactory.createTypeWithNullability(rawType, nullable); } public static RelProtoDataType proto(Schema schema) { diff --git a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java b/hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroConverterTest.java similarity index 98% rename from hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java rename to hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroConverterTest.java index 6290553c..e36f7749 100644 --- a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java +++ b/hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroConverterTest.java @@ -1,4 +1,4 @@ -package com.linkedin.hoptimator.catalog; +package com.linkedin.hoptimator.avro; import org.apache.avro.Schema; import org.apache.calcite.plan.RelOptUtil; diff --git a/hoptimator-catalog/build.gradle b/hoptimator-catalog/build.gradle index 01dfd9fd..17cd40d3 100644 --- a/hoptimator-catalog/build.gradle +++ b/hoptimator-catalog/build.gradle @@ -5,6 +5,7 @@ plugins { } dependencies { + implementation project(':hoptimator-avro') implementation project(':hoptimator-util') implementation libs.avro implementation libs.calcite.core diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java deleted file mode 100644 index 74c37308..00000000 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java +++ /dev/null @@ -1,163 +0,0 @@ -package com.linkedin.hoptimator.catalog; - -import java.util.AbstractMap; -import java.util.List; -import java.util.stream.Collectors; - -import org.apache.avro.Schema; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rel.type.RelDataTypeImpl; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeFactoryImpl; -import org.apache.calcite.sql.type.SqlTypeName; - - -/** Converts between Avro and Calcite's RelDataType */ -public final class AvroConverter { - - private AvroConverter() { - } - - public static Schema avro(String namespace, String name, RelDataType dataType) { - if (dataType.isStruct()) { - List fields = - dataType.getFieldList().stream().filter(x -> !x.getName().startsWith("__")) // don't write out hidden fields - .map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), - null)).collect(Collectors.toList()); - return createAvroSchemaWithNullability( - Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields), dataType.isNullable()); - } else { - switch (dataType.getSqlTypeName()) { - case INTEGER: - return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable()); - case SMALLINT: - return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable()); - case BIGINT: - return createAvroTypeWithNullability(Schema.Type.LONG, dataType.isNullable()); - case VARCHAR: - return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable()); - case FLOAT: - return createAvroTypeWithNullability(Schema.Type.FLOAT, dataType.isNullable()); - case DOUBLE: - return createAvroTypeWithNullability(Schema.Type.DOUBLE, dataType.isNullable()); - case CHAR: - return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable()); - case BOOLEAN: - return createAvroTypeWithNullability(Schema.Type.BOOLEAN, dataType.isNullable()); - case ARRAY: - return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())), - dataType.isNullable()); - case MAP: - return createAvroSchemaWithNullability(Schema.createMap(avro(null, null, dataType.getValueType())), - dataType.isNullable()); - case UNKNOWN: - case NULL: - return Schema.createUnion(Schema.create(Schema.Type.NULL)); - default: - throw new UnsupportedOperationException("No support yet for " + dataType.getSqlTypeName().toString()); - } - } - } - - public static Schema avro(String namespace, String name, RelProtoDataType relProtoDataType) { - RelDataTypeFactory factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); - return avro(namespace, name, relProtoDataType.apply(factory)); - } - - private static Schema createAvroSchemaWithNullability(Schema schema, boolean nullable) { - if (nullable) { - return Schema.createUnion(Schema.create(Schema.Type.NULL), schema); - } else { - return schema; - } - } - - private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean nullable) { - return createAvroSchemaWithNullability(Schema.create(rawType), nullable); - } - - public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) { - return rel(schema, typeFactory, false); - } - - /** Converts Avro Schema to RelDataType. - * Nullability is preserved except for array types, JDBC is incapable of interpreting e.g. "FLOAT NOT NULL ARRAY" - * causing "NOT NULL" arrays to get demoted to "ANY ARRAY" which is not desired. - */ - public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boolean nullable) { - RelDataType unknown = typeFactory.createUnknownType(); - switch (schema.getType()) { - case RECORD: - return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getFields().stream() - .map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory, nullable))) - .filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL) - .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName()) - .collect(Collectors.toList())), nullable); - case INT: - return createRelType(typeFactory, SqlTypeName.INTEGER, nullable); - case LONG: - return createRelType(typeFactory, SqlTypeName.BIGINT, nullable); - case ENUM: - case FIXED: - case STRING: - return createRelType(typeFactory, SqlTypeName.VARCHAR, nullable); - case BYTES: - return createRelType(typeFactory, SqlTypeName.VARBINARY, nullable); - case FLOAT: - return createRelType(typeFactory, SqlTypeName.FLOAT, nullable); - case DOUBLE: - return createRelType(typeFactory, SqlTypeName.DOUBLE, nullable); - case BOOLEAN: - return createRelType(typeFactory, SqlTypeName.BOOLEAN, nullable); - case ARRAY: - return typeFactory.createTypeWithNullability( - typeFactory.createArrayType(rel(schema.getElementType(), typeFactory, true), -1), nullable); - case MAP: - return typeFactory.createTypeWithNullability( - typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory, nullable)), nullable); - case UNION: - boolean isNullable = schema.isNullable(); - if (schema.isNullable() && schema.getTypes().size() == 2) { - Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get(); - return typeFactory.createTypeWithNullability(rel(innerType, typeFactory, true), true); - } - return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getTypes().stream() - .filter(x -> x.getType() != Schema.Type.NULL) - .map(x -> new AbstractMap.SimpleEntry<>(x.getName(), rel(x, typeFactory, isNullable))) - .filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL) - .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName()) - .collect(Collectors.toList())), isNullable); - default: - return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true); - } - } - - public static RelDataType rel(Schema schema) { - return rel(schema, DataType.DEFAULT_TYPE_FACTORY); - } - - private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) { - RelDataType rawType = typeFactory.createSqlType(typeName); - return typeFactory.createTypeWithNullability(rawType, nullable); - } - - public static RelProtoDataType proto(Schema schema) { - return RelDataTypeImpl.proto(rel(schema, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT))); - } - - private static String describe(RelDataTypeField dataType) { - return dataType.getName() + " " + dataType.getType().getFullTypeString(); - } - - private static String sanitize(String name) { - if (name.matches("^[^A-Za-z_]")) { - // avoid starting with numbers, etc - return sanitize("_" + name); - } - // avoid $, etc - return name.replaceAll("\\W", ""); - } -} diff --git a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java index 6e4aaeac..b8541385 100644 --- a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java +++ b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java @@ -6,6 +6,8 @@ import org.apache.calcite.sql.pretty.SqlPrettyWriter; import org.junit.Test; +import com.linkedin.hoptimator.avro.AvroConverter; + import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; diff --git a/hoptimator-operator/build.gradle b/hoptimator-operator/build.gradle index 684fb0f5..5f319069 100644 --- a/hoptimator-operator/build.gradle +++ b/hoptimator-operator/build.gradle @@ -5,19 +5,20 @@ plugins { } dependencies { + implementation project(':hoptimator-avro') implementation project(':hoptimator-planner') implementation project(':hoptimator-catalog') // <-- marked for deletion implementation project(':hoptimator-util') implementation project(':hoptimator-k8s') implementation project(':hoptimator-models') // <-- marked for deletion - + implementation libs.calcite.core implementation libs.kubernetes.client implementation libs.kubernetes.extended.client implementation libs.slf4j.api implementation libs.commons.cli implementation libs.avro - + testImplementation libs.junit testImplementation libs.assertj } diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java index 36733adc..18b452ee 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java @@ -1,6 +1,6 @@ package com.linkedin.hoptimator.operator.subscription; -import com.linkedin.hoptimator.catalog.AvroConverter; +import com.linkedin.hoptimator.avro.AvroConverter; import com.linkedin.hoptimator.catalog.Resource; import com.linkedin.hoptimator.planner.Pipeline;