CCLOG-2401: Support for hierarchical ORC data and logical types. #651


Merged: 11 commits, Feb 23, 2023
7 changes: 3 additions & 4 deletions pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-storage-common-parent</artifactId>
<version>11.0.16</version>
<version>11.1.0</version>
</parent>

<artifactId>kafka-connect-hdfs</artifactId>
@@ -52,12 +52,11 @@
</scm>

<properties>
<confluent.maven.repo>https://packages.confluent.io/maven/</confluent.maven.repo>
<apacheds-jdbm1.version>2.0.0-M2</apacheds-jdbm1.version>
<confluent-log4j.version>1.2.17-cp8</confluent-log4j.version>
<kafka.connect.maven.plugin.version>0.11.1</kafka.connect.maven.plugin.version>
<maven.release.plugin.version>2.5.3</maven.release.plugin.version>
<kafka.connect.storage.common.version>11.0.16</kafka.connect.storage.common.version>
<kafka.connect.storage.common.version>11.1.0</kafka.connect.storage.common.version>
<commons.collections.version>3.2.2</commons.collections.version>
<libthrift.version>0.13.0</libthrift.version>
<log4j2-api.version>2.17.1</log4j2-api.version>
@@ -70,7 +69,7 @@
<repository>
<id>confluent</id>
<name>Confluent</name>
<url>${confluent.maven.repo}</url>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

93 changes: 58 additions & 35 deletions src/main/java/io/confluent/connect/hdfs/orc/OrcFileReader.java
@@ -22,13 +22,15 @@
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveTypeEntry;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Timestamp;
@@ -53,40 +55,8 @@ public Schema getSchema(HdfsSinkConnectorConfig conf, Path path) {
Reader reader = OrcFile.createReader(path, readerOptions);

if (reader.getObjectInspector().getCategory() == ObjectInspector.Category.STRUCT) {
SchemaBuilder schemaBuilder = SchemaBuilder.struct().name("record").version(1);
StructObjectInspector objectInspector = (StructObjectInspector) reader.getObjectInspector();

for (StructField schema : objectInspector.getAllStructFieldRefs()) {
ObjectInspector fieldObjectInspector = schema.getFieldObjectInspector();
String typeName = fieldObjectInspector.getTypeName();
Schema.Type schemaType;

switch (fieldObjectInspector.getCategory()) {
case PRIMITIVE:
PrimitiveTypeEntry typeEntry = PrimitiveObjectInspectorUtils
.getTypeEntryFromTypeName(typeName);
if (java.sql.Date.class.isAssignableFrom(typeEntry.primitiveJavaClass)) {
schemaType = Date.SCHEMA.type();
} else if (java.sql.Timestamp.class.isAssignableFrom(typeEntry.primitiveJavaClass)) {
schemaType = Timestamp.SCHEMA.type();
} else {
schemaType = ConnectSchema.schemaType(typeEntry.primitiveJavaClass);
}
break;
case LIST:
schemaType = Schema.Type.ARRAY;
break;
case MAP:
schemaType = Schema.Type.MAP;
break;
default:
throw new DataException("Unknown type " + fieldObjectInspector.getCategory().name());
}

schemaBuilder.field(schema.getFieldName(), SchemaBuilder.type(schemaType).build());
}

return schemaBuilder.build();
return deriveStruct(objectInspector);
} else {
throw new ConnectException(
"Top level type must be of type STRUCT, but was "
@@ -98,6 +68,59 @@ public Schema getSchema(HdfsSinkConnectorConfig conf, Path path) {
}
}

private Schema derivePrimitive(PrimitiveObjectInspector inspector) {
Class<?> klass = inspector.getTypeInfo().getPrimitiveJavaClass();
if (java.sql.Date.class.isAssignableFrom(klass)) {
return Date.SCHEMA;
} else if (java.sql.Timestamp.class.isAssignableFrom(klass)) {
return Timestamp.SCHEMA;
} else if (org.apache.hadoop.hive.common.type.HiveDecimal.class.isAssignableFrom(klass)) {
return Decimal.schema(inspector.scale());
}
return SchemaBuilder.type(ConnectSchema.schemaType(klass)).build();

}

private Schema deriveSchema(ObjectInspector inspector) {

switch (inspector.getCategory()) {
case PRIMITIVE:
return derivePrimitive((PrimitiveObjectInspector) inspector);
case MAP:
return deriveMap((MapObjectInspector) inspector);
case LIST:
return deriveList((ListObjectInspector) inspector);
case STRUCT:
return deriveStruct((StructObjectInspector) inspector);
default:
throw new DataException("Unknown type " + inspector.getCategory()
.name());
}
}

private Schema deriveStruct(StructObjectInspector inspector) {

SchemaBuilder schemaBuilder = SchemaBuilder.struct();
for (StructField field: inspector.getAllStructFieldRefs()) {
ObjectInspector fieldInspector = field.getFieldObjectInspector();
schemaBuilder.field(field.getFieldName(), deriveSchema(fieldInspector));
}
schemaBuilder.name("record").version(1);
return schemaBuilder.build();
}


private Schema deriveMap(MapObjectInspector inspector) {
return SchemaBuilder.map(
deriveSchema(inspector.getMapKeyObjectInspector()),
deriveSchema(inspector.getMapValueObjectInspector())
).build();
}

private Schema deriveList(ListObjectInspector inspector) {
return SchemaBuilder.array(deriveSchema(inspector.getListElementObjectInspector())).build();
}

@Override
public boolean hasNext() {
throw new UnsupportedOperationException();
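Taken together, `deriveSchema`, `deriveStruct`, `deriveMap`, and `deriveList` walk the ORC `ObjectInspector` tree recursively, so nested structs, maps, and lists come back as nested Connect schemas, while date, timestamp, and decimal columns surface as the Connect `Date`, `Timestamp`, and `Decimal` logical types. As a rough illustration of the mapping (the column layout and class below are hypothetical, not part of this PR), an ORC file whose top-level type is `struct<id:int,updated:timestamp,tags:array<string>,attrs:map<string,decimal(10,2)>>` should come back from `getSchema` as a Connect schema equivalent to:

```java
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Timestamp;

public class OrcSchemaMappingSketch {
  public static void main(String[] args) {
    // Hypothetical ORC column layout:
    //   struct<id:int, updated:timestamp, tags:array<string>, attrs:map<string,decimal(10,2)>>
    Schema expected = SchemaBuilder.struct()
        .name("record").version(1)                  // deriveStruct names each struct "record", version 1
        .field("id", Schema.INT32_SCHEMA)           // Hive int -> Integer -> Connect INT32
        .field("updated", Timestamp.SCHEMA)         // java.sql.Timestamp -> Connect Timestamp logical type
        .field("tags", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
        .field("attrs", SchemaBuilder.map(
            Schema.STRING_SCHEMA,
            Decimal.schema(2)                       // HiveDecimal -> Connect Decimal; only the scale is carried over
        ).build())
        .build();

    expected.fields().forEach(f -> System.out.println(f.name() + " : " + f.schema()));
  }
}
```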
@@ -44,7 +44,7 @@ public OrcHiveUtil(HdfsSinkConnectorConfig config, HiveMetaStore hiveMetaStore)
@Override
public void alterSchema(String database, String tableName, Schema schema) {
Table table = hiveMetaStore.getTable(database, tableName);
List<FieldSchema> columns = HiveSchemaConverter.convertSchema(schema);
List<FieldSchema> columns = HiveSchemaConverter.convertSchemaMaybeLogical(schema);
table.setFields(columns);
hiveMetaStore.alterTable(table);
}
@@ -85,7 +85,7 @@ private Table constructOrcTable(
}

// convert Connect schema to Hive columns
List<FieldSchema> columns = HiveSchemaConverter.convertSchema(schema);
List<FieldSchema> columns = HiveSchemaConverter.convertSchemaMaybeLogical(schema);
table.setFields(columns);
table.setPartCols(partitioner.partitionFields());
return table;
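On the Hive side, both the create-table and alter-schema paths now go through `HiveSchemaConverter.convertSchemaMaybeLogical` from storage-common 11.1.0, which, going by its name and the goal of this PR, maps Connect logical types to the matching Hive column types instead of their underlying primitive representations. A minimal usage sketch (class and field names are invented, and the exact Hive type strings depend on the storage-common implementation):

```java
import java.util.List;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Timestamp;

import io.confluent.connect.storage.hive.HiveSchemaConverter;

public class HiveColumnsSketch {
  public static void main(String[] args) {
    // A Connect schema using the three logical types this PR cares about.
    Schema schema = SchemaBuilder.struct()
        .field("event_date", Date.SCHEMA)
        .field("event_ts", Timestamp.SCHEMA)
        .field("amount", Decimal.schema(2))
        .build();

    // Expected to yield Hive date/timestamp/decimal columns, where the plain
    // convertSchema call it replaces would fall back to the underlying
    // primitive (int/bigint/binary) column types.
    List<FieldSchema> columns = HiveSchemaConverter.convertSchemaMaybeLogical(schema);
    columns.forEach(c -> System.out.println(c.getName() + " : " + c.getType()));
  }
}
```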
@@ -70,7 +70,7 @@ public void preFooterWrite(OrcFile.WriterContext writerContext) {
}
};

typeInfo = HiveSchemaConverter.convert(schema);
typeInfo = HiveSchemaConverter.convertMaybeLogical(schema);
ObjectInspector objectInspector = OrcStruct.createObjectInspector(typeInfo);

log.info("Opening ORC record writer for: {}", filename);
@@ -90,7 +90,7 @@ public void preFooterWrite(OrcFile.WriterContext writerContext) {
);

Struct struct = (Struct) record.value();
OrcStruct row = OrcUtil.createOrcStruct(typeInfo, OrcUtil.convertStruct(struct));
OrcStruct row = (OrcStruct) OrcUtil.convert(typeInfo, struct.schema(), struct);
writer.addRow(row);

} else {
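The writer-side change mirrors the reader: instead of `OrcUtil.createOrcStruct(typeInfo, OrcUtil.convertStruct(struct))`, each row is now built with `OrcUtil.convert(typeInfo, struct.schema(), struct)`, a schema-driven conversion that can follow nested structs, maps, and arrays as well as logical values. A rough sketch of that conversion in isolation (the class and record layout are invented; it assumes, as the call sites above suggest, that `convertMaybeLogical` returns a Hive `TypeInfo` and that `OrcUtil.convert` returns an object castable to `OrcStruct`):

```java
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import io.confluent.connect.hdfs.orc.OrcUtil;
import io.confluent.connect.storage.hive.HiveSchemaConverter;

public class OrcRowConversionSketch {
  public static void main(String[] args) {
    // A Connect record with a nested struct, the kind of hierarchical value
    // this PR adds support for.
    Schema addressSchema = SchemaBuilder.struct().name("record").version(1)
        .field("city", Schema.STRING_SCHEMA)
        .field("zip", Schema.STRING_SCHEMA)
        .build();
    Schema schema = SchemaBuilder.struct().name("record").version(1)
        .field("id", Schema.INT32_SCHEMA)
        .field("address", addressSchema)
        .build();
    Struct value = new Struct(schema)
        .put("id", 42)
        .put("address", new Struct(addressSchema).put("city", "Austin").put("zip", "78701"));

    // Mirrors the writer above: derive a logical-type-aware Hive TypeInfo,
    // then convert the whole Struct (including the nested one) into an ORC row.
    TypeInfo typeInfo = HiveSchemaConverter.convertMaybeLogical(schema);
    OrcStruct row = (OrcStruct) OrcUtil.convert(typeInfo, value.schema(), value);
    System.out.println(row);
  }
}
```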