From 2e1049fcca9512ed382457f44b1def00ff640a90 Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Fri, 7 Feb 2025 15:01:51 -0500 Subject: [PATCH] Support maps/nested unions, fix nullability --- .../hoptimator/avro/AvroConverter.java | 85 +++++++++++-------- 1 file changed, 50 insertions(+), 35 deletions(-) diff --git a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java index 520b343f..770d4eb7 100644 --- a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java +++ b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java @@ -14,7 +14,6 @@ import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.type.SqlTypeName; - /** Converts between Avro and Calcite's RelDataType */ public final class AvroConverter { @@ -23,13 +22,11 @@ private AvroConverter() { public static Schema avro(String namespace, String name, RelDataType dataType) { if (dataType.isStruct()) { - List fields = dataType.getFieldList() - .stream() - .map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), - null)) + List fields = dataType.getFieldList().stream() + .map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), null)) .collect(Collectors.toList()); - return createAvroSchemaWithNullability( - Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields), dataType.isNullable()); + return createAvroSchemaWithNullability(Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields), + dataType.isNullable()); } else { switch (dataType.getSqlTypeName()) { case INTEGER: @@ -51,10 +48,9 @@ public static Schema avro(String namespace, String name, RelDataType dataType) { case ARRAY: return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())), dataType.isNullable()); - // TODO support map types - // Appears to require a Calcite version bump - // case MAP: - // return createAvroSchemaWithNullability(Schema.createMap(avroPrimitive(dataType.getValueType())), dataType.isNullable()); + case MAP: + return createAvroSchemaWithNullability(Schema.createMap(avro(null, null, dataType.getValueType())), + dataType.isNullable()); case UNKNOWN: case NULL: return Schema.createUnion(Schema.create(Schema.Type.NULL)); @@ -82,45 +78,59 @@ private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean } public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) { + return rel(schema, typeFactory, false); + } + + /** Converts Avro Schema to RelDataType. + * Nullability is preserved except for array types, JDBC is incapable of interpreting e.g. "FLOAT NOT NULL ARRAY" + * causing "NOT NULL" arrays to get demoted to "ANY ARRAY" which is not desired. + */ + public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boolean nullable) { RelDataType unknown = typeFactory.createUnknownType(); switch (schema.getType()) { case RECORD: - return typeFactory.createStructType(schema.getFields() - .stream() - .map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory))) + return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getFields().stream() + .map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory, nullable))) .filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL) .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName()) - .collect(Collectors.toList())); + .collect(Collectors.toList())), nullable); case INT: - return createRelType(typeFactory, SqlTypeName.INTEGER); + return createRelType(typeFactory, SqlTypeName.INTEGER, nullable); case LONG: - return createRelType(typeFactory, SqlTypeName.BIGINT); + return createRelType(typeFactory, SqlTypeName.BIGINT, nullable); case ENUM: - case FIXED: case STRING: - return createRelType(typeFactory, SqlTypeName.VARCHAR); + return createRelType(typeFactory, SqlTypeName.VARCHAR, nullable); + case FIXED: + return createRelType(typeFactory, SqlTypeName.VARBINARY, schema.getFixedSize(), nullable); + case BYTES: + return createRelType(typeFactory, SqlTypeName.VARBINARY, nullable); case FLOAT: - return createRelType(typeFactory, SqlTypeName.FLOAT); + return createRelType(typeFactory, SqlTypeName.FLOAT, nullable); case DOUBLE: - return createRelType(typeFactory, SqlTypeName.DOUBLE); + return createRelType(typeFactory, SqlTypeName.DOUBLE, nullable); case BOOLEAN: - return createRelType(typeFactory, SqlTypeName.BOOLEAN); + return createRelType(typeFactory, SqlTypeName.BOOLEAN, nullable); case ARRAY: - return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory), -1); -// TODO support map types -// Appears to require a Calcite version bump -// case MAP: -// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory)); + return typeFactory.createTypeWithNullability( + typeFactory.createArrayType(rel(schema.getElementType(), typeFactory, true), -1), nullable); + case MAP: + return typeFactory.createTypeWithNullability( + typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory, nullable)), nullable); case UNION: + boolean isNullable = schema.isNullable(); if (schema.isNullable() && schema.getTypes().size() == 2) { Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get(); - return typeFactory.createTypeWithNullability(rel(innerType, typeFactory), true); - } else { - // TODO support more elaborate union types - return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true); + return typeFactory.createTypeWithNullability(rel(innerType, typeFactory, true), true); } + return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getTypes().stream() + .filter(x -> x.getType() != Schema.Type.NULL) + .map(x -> new AbstractMap.SimpleEntry<>(x.getName(), rel(x, typeFactory, isNullable))) + .filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL) + .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName()) + .collect(Collectors.toList())), isNullable); default: - return typeFactory.createUnknownType(); + return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true); } } @@ -128,9 +138,14 @@ public static RelDataType rel(Schema schema) { return rel(schema, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT)); } - private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName) { - RelDataType rawType = typeFactory.createSqlType(typeName); - return typeFactory.createTypeWithNullability(rawType, false); + private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) { + return createRelType(typeFactory, typeName, RelDataType.PRECISION_NOT_SPECIFIED, nullable); + } + + private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName, + int precision, boolean nullable) { + RelDataType rawType = typeFactory.createSqlType(typeName, precision); + return typeFactory.createTypeWithNullability(rawType, nullable); } public static RelProtoDataType proto(Schema schema) {