diff --git a/pom.xml b/pom.xml
index 530680dcb..de9dcc058 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
     <groupId>io.confluent</groupId>
     <artifactId>kafka-connect-storage-common-parent</artifactId>
-    <version>11.0.16</version>
+    <version>11.1.0</version>
   </parent>
 
   <artifactId>kafka-connect-hdfs</artifactId>
@@ -52,12 +52,11 @@
-    <confluent.maven.repo>https://packages.confluent.io/maven/</confluent.maven.repo>
     2.0.0-M2
     1.2.17-cp8
     0.11.1
     2.5.3
-    11.0.16
+    11.1.0
     3.2.2
     0.13.0
     2.17.1
@@ -70,7 +69,7 @@
     <repository>
       <id>confluent</id>
       <name>Confluent</name>
-      <url>${confluent.maven.repo}</url>
+      <url>https://packages.confluent.io/maven/</url>
     </repository>
diff --git a/src/main/java/io/confluent/connect/hdfs/orc/OrcFileReader.java b/src/main/java/io/confluent/connect/hdfs/orc/OrcFileReader.java
index 4ccb18222..40db808e1 100644
--- a/src/main/java/io/confluent/connect/hdfs/orc/OrcFileReader.java
+++ b/src/main/java/io/confluent/connect/hdfs/orc/OrcFileReader.java
@@ -22,13 +22,15 @@
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveTypeEntry;
 import org.apache.kafka.connect.data.ConnectSchema;
 import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Timestamp;
@@ -53,40 +55,8 @@ public Schema getSchema(HdfsSinkConnectorConfig conf, Path path) {
       Reader reader = OrcFile.createReader(path, readerOptions);
 
       if (reader.getObjectInspector().getCategory() == ObjectInspector.Category.STRUCT) {
-        SchemaBuilder schemaBuilder = SchemaBuilder.struct().name("record").version(1);
         StructObjectInspector objectInspector = (StructObjectInspector) reader.getObjectInspector();
-
-        for (StructField schema : objectInspector.getAllStructFieldRefs()) {
-          ObjectInspector fieldObjectInspector = schema.getFieldObjectInspector();
-          String typeName = fieldObjectInspector.getTypeName();
-          Schema.Type schemaType;
-
-          switch (fieldObjectInspector.getCategory()) {
-            case PRIMITIVE:
-              PrimitiveTypeEntry typeEntry = PrimitiveObjectInspectorUtils
-                  .getTypeEntryFromTypeName(typeName);
-              if (java.sql.Date.class.isAssignableFrom(typeEntry.primitiveJavaClass)) {
-                schemaType = Date.SCHEMA.type();
-              } else if (java.sql.Timestamp.class.isAssignableFrom(typeEntry.primitiveJavaClass)) {
-                schemaType = Timestamp.SCHEMA.type();
-              } else {
-                schemaType = ConnectSchema.schemaType(typeEntry.primitiveJavaClass);
-              }
-              break;
-            case LIST:
-              schemaType = Schema.Type.ARRAY;
-              break;
-            case MAP:
-              schemaType = Schema.Type.MAP;
-              break;
-            default:
-              throw new DataException("Unknown type " + fieldObjectInspector.getCategory().name());
-          }
-
-          schemaBuilder.field(schema.getFieldName(), SchemaBuilder.type(schemaType).build());
-        }
-
-        return schemaBuilder.build();
+        return deriveStruct(objectInspector);
       } else {
         throw new ConnectException(
             "Top level type must be of type STRUCT, but was "
@@ -98,6 +68,59 @@ public Schema getSchema(HdfsSinkConnectorConfig conf, Path path) {
     }
   }
+  private Schema derivePrimitive(PrimitiveObjectInspector inspector) {
+    Class klass = inspector.getTypeInfo().getPrimitiveJavaClass();
+    if (java.sql.Date.class.isAssignableFrom(klass)) {
+      return Date.SCHEMA;
+    } else if (java.sql.Timestamp.class.isAssignableFrom(klass)) {
+      return Timestamp.SCHEMA;
+    } else if (org.apache.hadoop.hive.common.type.HiveDecimal.class.isAssignableFrom(klass)) {
+      return Decimal.schema(inspector.scale());
+    }
+    return SchemaBuilder.type(ConnectSchema.schemaType(klass)).build();
+
+  }
+
+  private Schema deriveSchema(ObjectInspector inspector) {
+
+    switch (inspector.getCategory()) {
+      case PRIMITIVE:
+        return derivePrimitive((PrimitiveObjectInspector) inspector);
+      case MAP:
+        return deriveMap((MapObjectInspector) inspector);
+      case LIST:
+        return deriveList((ListObjectInspector) inspector);
+      case STRUCT:
+        return deriveStruct((StructObjectInspector) inspector);
+      default:
+        throw new DataException("Unknown type " + inspector.getCategory()
+            .name());
+    }
+  }
+
+  private Schema deriveStruct(StructObjectInspector inspector) {
+
+    SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+    for (StructField field: inspector.getAllStructFieldRefs()) {
+      ObjectInspector fieldInspector = field.getFieldObjectInspector();
+      schemaBuilder.field(field.getFieldName(), deriveSchema(fieldInspector));
+    }
+    schemaBuilder.name("record").version(1);
+    return schemaBuilder.build();
+  }
+
+
+  private Schema deriveMap(MapObjectInspector inspector) {
+    return SchemaBuilder.map(
+        deriveSchema(inspector.getMapKeyObjectInspector()),
+        deriveSchema(inspector.getMapValueObjectInspector())
+    ).build();
+  }
+
+  private Schema deriveList(ListObjectInspector inspector) {
+    return SchemaBuilder.array(deriveSchema(inspector.getListElementObjectInspector())).build();
+  }
+
   @Override
   public boolean hasNext() {
     throw new UnsupportedOperationException();
diff --git a/src/main/java/io/confluent/connect/hdfs/orc/OrcHiveUtil.java b/src/main/java/io/confluent/connect/hdfs/orc/OrcHiveUtil.java
index 526b83891..a66b6d6b9 100644
--- a/src/main/java/io/confluent/connect/hdfs/orc/OrcHiveUtil.java
+++ b/src/main/java/io/confluent/connect/hdfs/orc/OrcHiveUtil.java
@@ -44,7 +44,7 @@ public OrcHiveUtil(HdfsSinkConnectorConfig config, HiveMetaStore hiveMetaStore)
   @Override
   public void alterSchema(String database, String tableName, Schema schema) {
     Table table = hiveMetaStore.getTable(database, tableName);
-    List<FieldSchema> columns = HiveSchemaConverter.convertSchema(schema);
+    List<FieldSchema> columns = HiveSchemaConverter.convertSchemaMaybeLogical(schema);
     table.setFields(columns);
     hiveMetaStore.alterTable(table);
   }
@@ -85,7 +85,7 @@ private Table constructOrcTable(
     }
 
     // convert Connect schema schema to Hive columns
-    List<FieldSchema> columns = HiveSchemaConverter.convertSchema(schema);
+    List<FieldSchema> columns = HiveSchemaConverter.convertSchemaMaybeLogical(schema);
     table.setFields(columns);
     table.setPartCols(partitioner.partitionFields());
     return table;
diff --git a/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
index aba47b9e5..53b612d22 100644
--- a/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
@@ -70,7 +70,7 @@ public void preFooterWrite(OrcFile.WriterContext writerContext) {
             }
           };
 
-          typeInfo = HiveSchemaConverter.convert(schema);
+          typeInfo = HiveSchemaConverter.convertMaybeLogical(schema);
           ObjectInspector objectInspector = OrcStruct.createObjectInspector(typeInfo);
 
           log.info("Opening ORC record writer for: {}", filename);
@@ -90,7 +90,7 @@ public void preFooterWrite(OrcFile.WriterContext writerContext) {
           );
 
           Struct struct = (Struct) record.value();
-          OrcStruct row = OrcUtil.createOrcStruct(typeInfo, OrcUtil.convertStruct(struct));
+          OrcStruct row = (OrcStruct) OrcUtil.convert(typeInfo, struct.schema(), struct);
 
           writer.addRow(row);
         } else {
diff --git a/src/main/java/io/confluent/connect/hdfs/orc/OrcUtil.java b/src/main/java/io/confluent/connect/hdfs/orc/OrcUtil.java
index 25d783cb9..66d6fc054 100644
--- a/src/main/java/io/confluent/connect/hdfs/orc/OrcUtil.java
+++ b/src/main/java/io/confluent/connect/hdfs/orc/OrcUtil.java
@@ -15,7 +15,6 @@
 package io.confluent.connect.hdfs.orc;
 
-import static org.apache.kafka.connect.data.Schema.Type.ARRAY;
 import static org.apache.kafka.connect.data.Schema.Type.BOOLEAN;
 import static org.apache.kafka.connect.data.Schema.Type.BYTES;
 import static org.apache.kafka.connect.data.Schema.Type.FLOAT32;
@@ -24,32 +23,38 @@
 import static org.apache.kafka.connect.data.Schema.Type.INT32;
 import static org.apache.kafka.connect.data.Schema.Type.INT64;
 import static org.apache.kafka.connect.data.Schema.Type.INT8;
-import static org.apache.kafka.connect.data.Schema.Type.MAP;
 import static org.apache.kafka.connect.data.Schema.Type.STRING;
-import static org.apache.kafka.connect.data.Schema.Type.STRUCT;
 
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.AbstractMap;
+import java.util.AbstractMap.SimpleEntry;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.function.BiFunction;
+
+import java.util.stream.Collectors;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.io.ArrayPrimitiveWritable;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Schema.Type;
@@ -59,24 +64,23 @@
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.function.BiFunction;
 
 public final class OrcUtil {
 
-  private static Map<Schema.Type, BiFunction<Struct, Field, Object>> CONVERSION_MAP = new HashMap<>();
+  private static final Map<Schema.Type, BiFunction<Schema, Object, Object>> PRIMITIVE_CONVERSION_MAP =
+      new HashMap<>();
 
   static {
-    CONVERSION_MAP.put(ARRAY, OrcUtil::convertArray);
-    CONVERSION_MAP.put(BOOLEAN, OrcUtil::convertBoolean);
-    CONVERSION_MAP.put(BYTES, OrcUtil::convertBytes);
-    CONVERSION_MAP.put(FLOAT32, OrcUtil::convertFloat32);
-    CONVERSION_MAP.put(FLOAT64, OrcUtil::convertFloat64);
-    CONVERSION_MAP.put(INT8, OrcUtil::convertInt8);
-    CONVERSION_MAP.put(INT16, OrcUtil::convertInt16);
-    CONVERSION_MAP.put(INT32, OrcUtil::convertInt32);
-    CONVERSION_MAP.put(INT64, OrcUtil::convertInt64);
-    CONVERSION_MAP.put(MAP, OrcUtil::convertMap);
-    CONVERSION_MAP.put(STRING, OrcUtil::convertString);
-    CONVERSION_MAP.put(STRUCT, OrcUtil::convertStruct);
+    PRIMITIVE_CONVERSION_MAP.put(BOOLEAN, OrcUtil::convertBoolean);
+    PRIMITIVE_CONVERSION_MAP.put(BYTES, OrcUtil::convertBytes);
+    PRIMITIVE_CONVERSION_MAP.put(FLOAT32, OrcUtil::convertFloat32);
+    PRIMITIVE_CONVERSION_MAP.put(FLOAT64, OrcUtil::convertFloat64);
+    PRIMITIVE_CONVERSION_MAP.put(INT8, OrcUtil::convertInt8);
+    PRIMITIVE_CONVERSION_MAP.put(INT16, OrcUtil::convertInt16);
+    PRIMITIVE_CONVERSION_MAP.put(INT32, OrcUtil::convertInt32);
+    PRIMITIVE_CONVERSION_MAP.put(INT64, OrcUtil::convertInt64);
+    PRIMITIVE_CONVERSION_MAP.put(STRING, OrcUtil::convertString);
   }
 
   /**
@@ -87,8 +91,8 @@ public final class OrcUtil {
    * @return the struct object
    */
   @SuppressWarnings("unchecked")
-  public static OrcStruct createOrcStruct(TypeInfo typeInfo, Object... objs) {
-    SettableStructObjectInspector oi = (SettableStructObjectInspector)
+  public static OrcStruct createOrcStruct(TypeInfo typeInfo, Object[] objs) {
+    SettableStructObjectInspector oi = (SettableStructObjectInspector)
         OrcStruct.createObjectInspector(typeInfo);
 
     List<StructField> fields = (List<StructField>) oi.getAllStructFieldRefs();
@@ -107,87 +111,114 @@ public static OrcStruct createOrcStruct(TypeInfo typeInfo, Object... objs) {
    * @param struct the struct to convert
    * @return the struct as a writable array
    */
-  public static Object[] convertStruct(Struct struct) {
+  public static Object[] convertStruct(TypeInfo typeInfo, Struct struct) {
     List<Object> data = new LinkedList<>();
     for (Field field : struct.schema().fields()) {
       if (struct.get(field) == null) {
         data.add(null);
       } else {
-        Schema.Type schemaType = field.schema().type();
-        data.add(CONVERSION_MAP.get(schemaType).apply(struct, field));
+        TypeInfo fieldTypeInfo = ((StructTypeInfo) typeInfo).getStructFieldTypeInfo(field.name());
+        data.add(convert(fieldTypeInfo, field.schema(), struct.get(field)));
       }
     }
 
     return data.toArray();
   }
 
-  private static Object convertStruct(Struct struct, Field field) {
-    return convertStruct(struct.getStruct(field.name()));
+  public static Object convert(TypeInfo typeInfo, Schema schema, Object obj) {
+
+    switch (schema.type()) {
+      case STRUCT:
+        return createOrcStruct(typeInfo, convertStruct(typeInfo, (Struct) obj));
+      case ARRAY:
+        return convertArray(typeInfo, schema, (List) obj);
+      case MAP:
+        return convertMap(typeInfo, schema, (Map) obj);
+      default:
+        return PRIMITIVE_CONVERSION_MAP.get(schema.type()).apply(schema, obj);
+    }
   }
 
-  private static Object convertArray(Struct struct, Field field) {
-    return new ArrayPrimitiveWritable(struct.getArray(field.name()).toArray());
+  private static Object convertArray(TypeInfo typeInfo, Schema schema, List<?> objects) {
+
+    TypeInfo elementTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo();
+    Schema valueSchema = schema.valueSchema();
+    return objects.stream().map(o -> convert(elementTypeInfo, valueSchema, o))
+        .collect(Collectors.toList());
   }
 
-  private static Object convertBoolean(Struct struct, Field field) {
-    return new BooleanWritable(struct.getBoolean(field.name()));
+  private static Object convertBoolean(Schema schema, Object obj) {
+    return new BooleanWritable((Boolean) obj);
   }
 
-  private static Object convertBytes(Struct struct, Field field) {
-    return new BytesWritable(struct.getBytes(field.name()));
+  private static Object convertBytes(Schema schema, Object obj) {
+
+    if (Decimal.LOGICAL_NAME.equals(schema.name())) {
+      BigDecimal bigDecimal = (BigDecimal) obj;
+      return new HiveDecimalWritable(HiveDecimal.create(bigDecimal));
+    }
+
+    // taken from Struct.getBytes()
+    byte[] bytes = obj instanceof ByteBuffer ? ((ByteBuffer)obj).array() : (byte[])((byte[])obj);
+    return new BytesWritable(bytes);
   }
 
-  private static Object convertFloat32(Struct struct, Field field) {
-    return new FloatWritable(struct.getFloat32(field.name()));
+  private static Object convertFloat32(Schema schema, Object obj) {
+    return new FloatWritable((Float) obj);
   }
 
-  private static Object convertFloat64(Struct struct, Field field) {
-    return new DoubleWritable(struct.getFloat64(field.name()));
+  private static Object convertFloat64(Schema schema, Object obj) {
+    return new DoubleWritable((Double) obj);
   }
 
-  private static Object convertInt8(Struct struct, Field field) {
-    return new ByteWritable(struct.getInt8(field.name()));
+  private static Object convertInt8(Schema schema, Object obj) {
+    return new ByteWritable((Byte) obj);
  }
 
-  private static Object convertInt16(Struct struct, Field field) {
-    return new ShortWritable(struct.getInt16(field.name()));
+  private static Object convertInt16(Schema schema, Object obj) {
+    return new ShortWritable((Short) obj);
   }
 
-  private static Object convertInt32(Struct struct, Field field) {
+  private static Object convertInt32(Schema schema, Object obj) {
 
-    if (Date.LOGICAL_NAME.equals(field.schema().name())) {
-      java.util.Date date = (java.util.Date) struct.get(field);
+    if (Date.LOGICAL_NAME.equals(schema.name())) {
+      java.util.Date date = (java.util.Date) obj;
       return new DateWritable(new java.sql.Date(date.getTime()));
     }
 
-    if (Time.LOGICAL_NAME.equals(field.schema().name())) {
-      java.util.Date date = (java.util.Date) struct.get(field);
-      return new TimestampWritable(new java.sql.Timestamp(date.getTime()));
+    if (Time.LOGICAL_NAME.equals(schema.name())) {
+      java.util.Date date = (java.util.Date) obj;
+      return new IntWritable((int) date.getTime());
     }
 
-    return new IntWritable(struct.getInt32(field.name()));
+    return new IntWritable((Integer) obj);
   }
 
-  private static Object convertInt64(Struct struct, Field field) {
+  private static Object convertInt64(Schema schema, Object obj) {
 
-    if (Timestamp.LOGICAL_NAME.equals(field.schema().name())) {
-      java.util.Date date = (java.util.Date) struct.get(field);
+    if (Timestamp.LOGICAL_NAME.equals(schema.name())) {
+      java.util.Date date = (java.util.Date) obj;
       return new TimestampWritable(new java.sql.Timestamp(date.getTime()));
     }
 
-    return new LongWritable(struct.getInt64(field.name()));
+    if (Time.LOGICAL_NAME.equals(schema.name())) {
+      java.util.Date date = (java.util.Date) obj;
+      return new LongWritable(date.getTime());
+    }
+
+    return new LongWritable((Long) obj);
   }
 
-  private static Object convertMap(Struct struct, Field field) {
-    MapWritable mapWritable = new MapWritable();
-    struct.getMap(field.name()).forEach(
-        (key, value) -> mapWritable.put(new ObjectWritable(key), new ObjectWritable(value))
-    );
+  private static Object convertMap(TypeInfo typeInfo, Schema schema, Map<?, ?> obj) {
 
-    return mapWritable;
+    MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+    return obj.entrySet().stream().map(e -> new AbstractMap.SimpleEntry<>(
+        convert(mapTypeInfo.getMapKeyTypeInfo(), schema.keySchema(), e.getKey()),
+        convert(mapTypeInfo.getMapValueTypeInfo(), schema.valueSchema(), e.getValue()))
+    ).collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
   }
 
-  private static Object convertString(Struct struct, Field field) {
-    return new Text(struct.getString(field.name()));
+  private static Object convertString(Schema schema, Object obj) {
+    return new Text((String) obj);
   }
 }
diff --git a/src/test/java/io/confluent/connect/hdfs/TestWithMiniDFSCluster.java b/src/test/java/io/confluent/connect/hdfs/TestWithMiniDFSCluster.java
index 7038ab5d5..dca864636 100644
--- a/src/test/java/io/confluent/connect/hdfs/TestWithMiniDFSCluster.java
+++ b/src/test/java/io/confluent/connect/hdfs/TestWithMiniDFSCluster.java
@@ -15,17 +15,27 @@
 package io.confluent.connect.hdfs;
 
+import com.google.common.collect.ImmutableMap;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.types.StructBuilder;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.SchemaProjector;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.After;
@@ -251,6 +261,77 @@ protected List<SinkRecord> createSinkRecordsWithTimestamp(
     return sinkRecords;
   }
 
+  private Schema createLogicalSchema() {
+    return SchemaBuilder.struct().name("record").version(1)
+        .field("time", Time.SCHEMA)
+        .field("timestamp", Timestamp.SCHEMA)
+        .field("date", Date.SCHEMA)
+        .field("decimal", Decimal.schema(2))
+        .build();
+  }
+
+  protected Struct createLogicalStruct() {
+
+    Struct struct = new Struct(createLogicalSchema());
+    struct.put("time", Time.toLogical(Time.SCHEMA, 167532));
+    struct.put("timestamp", Timestamp.toLogical(Timestamp.SCHEMA, 1675323210));
+    struct.put("date", Date.toLogical(Date.SCHEMA, 12345));
+    struct.put("decimal", BigDecimal.valueOf(5000, 2));
+    return struct;
+  }
+
+  private Schema createArraySchema() {
+    return SchemaBuilder.struct().name("record").version(1)
+        .field("struct_array", SchemaBuilder.array(createSchema()).build())
+        .field("int_array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
+        .field("logical_array", SchemaBuilder.array(Date.SCHEMA).build())
+        .field("array_array", SchemaBuilder.array(SchemaBuilder.array(createLogicalSchema()).build()).build())
+        .build();
+
+  }
+  protected Struct createArrayStruct() {
+
+    Struct record = createRecord(createSchema());
+    Struct logicalStruct = createLogicalStruct();
+    java.util.Date today = new java.util.Date(Instant.now().toEpochMilli());
+    Struct struct = new Struct(createArraySchema());
+
+    struct.put("struct_array", Arrays.asList(record, record));
+    struct.put("int_array", Arrays.asList(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3)));
+    struct.put("logical_array", Arrays.asList(today, today));
+    struct.put("array_array", Arrays.asList(Arrays.asList(logicalStruct, logicalStruct), Arrays.asList(logicalStruct, logicalStruct)));
+    return struct;
+  }
+
+  protected Struct createNestedStruct() {
+
+    Schema nestedSchema = SchemaBuilder.struct().name("record").version(1)
+        .field("struct", createSchema())
+        .field("int", Schema.INT32_SCHEMA)
+        .field("array", SchemaBuilder.array(createSchema()).build())
+        .field("map", SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA, SchemaBuilder.STRING_SCHEMA).build())
+        .build();
+    Schema schema = SchemaBuilder.struct().name("record").version(1)
+        .field("struct", createLogicalSchema())
+        .field("nested", nestedSchema)
+        .field("string", Schema.STRING_SCHEMA)
+        .field("map", SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA, createLogicalSchema()).build())
+        .build();
+
+    Struct struct = new Struct(schema);
+    struct.put("struct", createLogicalStruct());
+    Struct nested = new Struct(nestedSchema);
+    nested.put("struct", createRecord(createSchema()));
+    nested.put("int", 10);
+    nested.put("array", Arrays.asList(createRecord(createSchema()), createRecord(createSchema())));
+    nested.put("map", ImmutableMap.of("a", "b", "c", "d"));
+    struct.put("nested", nested);
+    struct.put("string", "test");
+    struct.put("map", ImmutableMap.of("s1", createLogicalStruct(), "s2", createLogicalStruct()));
+
+    return struct;
+  }
+
   protected String getDirectory() {
     return getDirectory(TOPIC, PARTITION);
   }
@@ -388,4 +469,20 @@ protected int getFileSystemCacheSize() throws Exception {
     return cacheMap.size();
   }
 
+  protected void writeAndVerify(List<SinkRecord> sinkRecords) throws Exception {
+    DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
+    partitioner = hdfsWriter.getPartitioner();
+    hdfsWriter.recover(TOPIC_PARTITION);
+
+
+    hdfsWriter.write(sinkRecords);
+    hdfsWriter.close();
+    hdfsWriter.stop();
+
+    List<Long> validOffsets = new ArrayList<>();
+    int flushSize = connectorConfig.getInt(HdfsSinkConnectorConfig.FLUSH_SIZE_CONFIG);
+    for (long i = 0; i < sinkRecords.size(); i += flushSize) validOffsets.add(i);
+    verify(sinkRecords, validOffsets.stream().mapToLong(l -> l).toArray());
+  }
+
 }
diff --git a/src/test/java/io/confluent/connect/hdfs/orc/DataWriterOrcTest.java b/src/test/java/io/confluent/connect/hdfs/orc/DataWriterOrcTest.java
index 778a0b9d8..31240f480 100644
--- a/src/test/java/io/confluent/connect/hdfs/orc/DataWriterOrcTest.java
+++ b/src/test/java/io/confluent/connect/hdfs/orc/DataWriterOrcTest.java
@@ -20,8 +20,12 @@
 import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
 import io.confluent.connect.hdfs.TestWithMiniDFSCluster;
 import io.confluent.connect.storage.hive.HiveSchemaConverter;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaProjector;
 import org.apache.kafka.connect.data.Struct;
@@ -54,19 +58,13 @@ protected Map<String, String> createProps() {
   @Test
   public void testWriteRecord() throws Exception {
-    DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
-    partitioner = hdfsWriter.getPartitioner();
-    hdfsWriter.recover(TOPIC_PARTITION);
-
-    List<SinkRecord> sinkRecords = createSinkRecords(7);
-
-    hdfsWriter.write(sinkRecords);
-    hdfsWriter.close();
-    hdfsWriter.stop();
+    writeAndVerify(createSinkRecords(7));
+  }
 
-    // Last file (offset 6) doesn't satisfy size requirement and gets discarded on close
-    long[] validOffsets = {0, 3, 6};
-    verify(sinkRecords, validOffsets);
+  @Test
+  public void testWriteNestedRecord() throws Exception {
+    Struct struct = createNestedStruct();
+    writeAndVerify(createSinkRecords(Collections.nCopies(7, struct), struct.schema()));
   }
 
   @Override
@@ -80,16 +78,11 @@ protected void verifyContents(List<SinkRecord> expectedRecords, int startIndex,
           expectedRecords.get(startIndex++).value(),
           expectedSchema);
 
-      TypeInfo typeInfo = HiveSchemaConverter.convert(expectedSchema);
-
-      ArrayList<Object> objs = new ArrayList<>();
-      for (Field field : expectedSchema.fields()) {
-        objs.add(((Struct) expectedValue).get(field));
-      }
+      TypeInfo typeInfo = HiveSchemaConverter.convertMaybeLogical(expectedSchema);
 
-      expectedValue = OrcUtil.createOrcStruct(typeInfo, objs.toArray(new Object[0]));
+      OrcStruct orcStruct = (OrcStruct) OrcUtil.convert(typeInfo, expectedSchema, expectedValue);
 
-      assertEquals(expectedValue.toString(), orcRecord.toString());
+      assertEquals(orcStruct.toString(), orcRecord.toString());
     }
   }
diff --git a/src/test/java/io/confluent/connect/hdfs/orc/HiveIntegrationOrcTest.java b/src/test/java/io/confluent/connect/hdfs/orc/HiveIntegrationOrcTest.java
index 3874c2166..efcbfeb19 100644
--- a/src/test/java/io/confluent/connect/hdfs/orc/HiveIntegrationOrcTest.java
+++ b/src/test/java/io/confluent/connect/hdfs/orc/HiveIntegrationOrcTest.java
@@ -25,7 +25,9 @@
 import io.confluent.connect.hdfs.partitioner.FieldPartitioner;
 import io.confluent.connect.hdfs.partitioner.TimeUtils;
 import io.confluent.connect.storage.hive.HiveConfig;
+import io.confluent.connect.storage.hive.HiveSchemaConverter;
 import io.confluent.connect.storage.partitioner.PartitionerConfig;
+import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -33,10 +35,16 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.joda.time.DateTime;
@@ -147,6 +155,7 @@ public void testHiveIntegrationOrc() throws Exception {
     String hiveTableName = connectorConfig.getHiveTableName(TOPIC);
     Table table = hiveMetaStore.getTable(hiveDatabase, hiveTableName);
     List<String> expectedColumnNames = new ArrayList<>();
+
     for (Field field : schema.fields()) {
       expectedColumnNames.add(field.name());
     }
@@ -164,6 +173,96 @@ public void testHiveIntegrationOrc() throws Exception {
     assertEquals(expectedPartitions, partitions);
   }
 
+  @Test
+  public void testHiveIntegrationWithLogicalTypesOrc() throws Exception {
+    localProps.put(HiveConfig.HIVE_INTEGRATION_CONFIG, "true");
+    setUp();
+
+    DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
+    hdfsWriter.recover(TOPIC_PARTITION);
+
+    Struct struct = createLogicalStruct();
+    List<SinkRecord> sinkRecords = createSinkRecords(Arrays.asList(struct, struct, struct), struct.schema());
+
+    hdfsWriter.write(sinkRecords);
+    hdfsWriter.close();
+    hdfsWriter.stop();
+
+    String hiveTableName = connectorConfig.getHiveTableName(TOPIC);
+    Table table = hiveMetaStore.getTable(hiveDatabase, hiveTableName);
+
+    List<StructField> hiveFields = table.getFields();
+    List<Field> connectFields = struct.schema().fields();
+    for (int i = 0; i < connectFields.size(); i++) {
+      assertEquals(connectFields.get(i).name(), hiveFields.get(i).getFieldName());
+      assertEquals(HiveSchemaConverter.convertPrimitiveMaybeLogical(connectFields.get(i).schema()).getTypeName(),
+          hiveFields.get(i).getFieldObjectInspector().getTypeName());
+    }
+  }
+
+  @Test
+  public void testHiveIntegrationWithArrays() throws Exception {
+    localProps.put(HiveConfig.HIVE_INTEGRATION_CONFIG, "true");
+    setUp();
+
+    DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
+    hdfsWriter.recover(TOPIC_PARTITION);
+
+    Struct struct = createArrayStruct();
+    List<SinkRecord> sinkRecords = createSinkRecords(Arrays.asList(struct, struct, struct), struct.schema());
+
+    hdfsWriter.write(sinkRecords);
+    hdfsWriter.close();
+    hdfsWriter.stop();
+
+    String hiveTableName = connectorConfig.getHiveTableName(TOPIC);
+    Table table = hiveMetaStore.getTable(hiveDatabase, hiveTableName);
+
+    StructTypeInfo typeInfo = (StructTypeInfo) HiveSchemaConverter.convertMaybeLogical(struct.schema());
+    String expectedTypeInfo = typeInfo.getAllStructFieldTypeInfos().stream().map(TypeInfo::toString).collect(
+        Collectors.joining(","));
+    String tableTypeInfo = table.getSd().getCols().stream().map(FieldSchema::getType).collect(
+        Collectors.joining(","));
+    assertEquals(expectedTypeInfo, tableTypeInfo);
+
+    List<String> expectedPartitions = Arrays.asList(partitionLocation(TOPIC, PARTITION));
+    List<String> partitions = hiveMetaStore.listPartitions(hiveDatabase, hiveTableName, (short)-1);
+    assertEquals(expectedPartitions, partitions);
+
+  }
+
+  @Test
+  public void testHiveIntegrationWithNestedStruct() throws Exception {
+    localProps.put(HiveConfig.HIVE_INTEGRATION_CONFIG, "true");
+    setUp();
+
+    DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
+    hdfsWriter.recover(TOPIC_PARTITION);
+
+
+    Struct struct = createNestedStruct();
+
+    List<SinkRecord> sinkRecords = createSinkRecords(Arrays.asList(struct, struct, struct), struct.schema());
+
+    hdfsWriter.write(sinkRecords);
+    hdfsWriter.close();
+    hdfsWriter.stop();
+
+    String hiveTableName = connectorConfig.getHiveTableName(TOPIC);
+    Table table = hiveMetaStore.getTable(hiveDatabase, hiveTableName);
+
+    StructTypeInfo typeInfo = (StructTypeInfo) HiveSchemaConverter.convertMaybeLogical(struct.schema());
+    String expectedTypeInfo = typeInfo.getAllStructFieldTypeInfos().stream().map(TypeInfo::toString).collect(
+        Collectors.joining(","));
+    String tableTypeInfo = table.getSd().getCols().stream().map(FieldSchema::getType).collect(
+        Collectors.joining(","));
+    assertEquals(expectedTypeInfo, tableTypeInfo);
+
+    List<String> expectedPartitions = Arrays.asList(partitionLocation(TOPIC, PARTITION));
+    List<String> partitions = hiveMetaStore.listPartitions(hiveDatabase, hiveTableName, (short)-1);
+    assertEquals(expectedPartitions, partitions);
+  }
+
   @Test
   public void testHiveIntegrationFieldPartitionerOrc() throws Exception {
     localProps.put(HiveConfig.HIVE_INTEGRATION_CONFIG, "true");
diff --git a/src/test/java/io/confluent/connect/hdfs/orc/OrcFileReaderTest.java b/src/test/java/io/confluent/connect/hdfs/orc/OrcFileReaderTest.java
new file mode 100644
index 000000000..501316d09
--- /dev/null
+++ b/src/test/java/io/confluent/connect/hdfs/orc/OrcFileReaderTest.java
@@ -0,0 +1,79 @@
+package io.confluent.connect.hdfs.orc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import io.confluent.connect.hdfs.FileUtils;
+import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
+import io.confluent.connect.hdfs.TestWithMiniDFSCluster;
+import io.confluent.connect.storage.hive.HiveSchemaConverter;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+public class OrcFileReaderTest extends TestWithMiniDFSCluster {
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    dataFileReader = new OrcDataFileReader();
+    extension = ".orc";
+  }
+
+  @Override
+  protected Map<String, String> createProps() {
+    Map<String, String> props = super.createProps();
+    props.put(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG, OrcFormat.class.getName());
+    return props;
+  }
+
+  @Test
+  public void testSchemaRead() throws Exception {
+    writeAndVerify(createSinkRecords(7));
+  }
+
+  @Test
+  public void testSchemaReadWithNestedStruct() throws Exception {
+    Struct struct = createNestedStruct();
+    writeAndVerify(createSinkRecords(Collections.nCopies(7, struct), struct.schema()));
+  }
+
+  @Override
+  protected void verify(List<SinkRecord> sinkRecords, long[] validOffsets,
+      Set<TopicPartition> partitions, boolean skipFileListing)
+      throws IOException {
+
+    if (!skipFileListing) {
+      verifyFileListing(validOffsets, partitions);
+    }
+
+    for (TopicPartition tp : partitions) {
+      for (int i = 1, j = 0; i < validOffsets.length; ++i) {
+        long startOffset = validOffsets[i - 1];
+        long endOffset = validOffsets[i] - 1;
+
+        String topicsDir = this.topicsDir.get(tp.topic());
+        String filename = FileUtils.committedFileName(url, topicsDir,
+            getDirectory(tp.topic(), tp.partition()), tp,
+            startOffset, endOffset, extension, zeroPadFormat);
+        Path path = new Path(filename);
+
+        Schema fileSchema = new OrcFileReader().getSchema(connectorConfig, path);
+        String hiveSchema = HiveSchemaConverter.convertMaybeLogical(fileSchema).toString();
+        String connectSchema = HiveSchemaConverter
+            .convertMaybeLogical(sinkRecords.get(0).valueSchema()).toString();
+        assertEquals(hiveSchema, connectSchema);
+      }
+    }
+
+  }
+}
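
Usage note (illustrative sketch, not part of the patch): the snippet below shows how a Connect struct with logical types flows through the conversion path touched above, calling `HiveSchemaConverter.convertMaybeLogical` and `OrcUtil.convert` the same way `OrcRecordWriterProvider` now does. The schema, field names, and class name here are invented for illustration only.

// OrcLogicalTypeSketch.java -- illustrative only; schema and field names are made up.
import java.math.BigDecimal;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import io.confluent.connect.hdfs.orc.OrcUtil;
import io.confluent.connect.storage.hive.HiveSchemaConverter;

public class OrcLogicalTypeSketch {
  public static void main(String[] args) {
    // A Connect schema mixing a logical Timestamp, a logical Decimal, and a plain string.
    Schema schema = SchemaBuilder.struct().name("record").version(1)
        .field("ts", Timestamp.SCHEMA)
        .field("amount", Decimal.schema(2))
        .field("name", Schema.STRING_SCHEMA)
        .build();

    Struct value = new Struct(schema)
        .put("ts", new java.util.Date())
        .put("amount", new BigDecimal("12.34"))
        .put("name", "example");

    // Map the Connect schema (including logical types) to a Hive TypeInfo.
    TypeInfo typeInfo = HiveSchemaConverter.convertMaybeLogical(schema);

    // Recursively convert the struct: Timestamp -> TimestampWritable,
    // Decimal -> HiveDecimalWritable, String -> Text, nested structs/arrays/maps as needed.
    OrcStruct row = (OrcStruct) OrcUtil.convert(typeInfo, schema, value);
    System.out.println(row);
  }
}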