
Commit 00f47be

CCLOG-2401: Support for hierarchical ORC data and logical types. (#651)
* Fix an OrcUtil bug where convertStruct() returned an Array instead of an OrcStruct. Added support for creating DATE, TIMESTAMP and INT values from Kafka Connect Date, Timestamp and Time, respectively.
1 parent 2399995 commit 00f47be
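
As a rough illustration of what the commit message describes (not code from this commit), a Connect record carrying the three logical types now handled on the ORC write path could be built as follows; the schema and field names are hypothetical:

import java.util.Calendar;
import java.util.TimeZone;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;

public class LogicalTypeRecordSketch {
  public static void main(String[] args) {
    // Hypothetical schema; per the commit message these logical types now map to
    // ORC DATE, INT and TIMESTAMP values respectively.
    Schema schema = SchemaBuilder.struct().name("record").version(1)
        .field("event_date", Date.SCHEMA)     // Kafka Connect Date      -> ORC DATE
        .field("event_time", Time.SCHEMA)     // Kafka Connect Time      -> ORC INT
        .field("event_ts", Timestamp.SCHEMA)  // Kafka Connect Timestamp -> ORC TIMESTAMP
        .build();

    Calendar now = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
    Struct value = new Struct(schema)
        .put("event_date", now.getTime())
        .put("event_time", now.getTime())
        .put("event_ts", now.getTime());
    System.out.println(value);
  }
}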

File tree

9 files changed: +445 −124 lines changed

pom.xml

Lines changed: 3 additions & 4 deletions
@@ -20,7 +20,7 @@
     <parent>
         <groupId>io.confluent</groupId>
         <artifactId>kafka-connect-storage-common-parent</artifactId>
-        <version>11.0.16</version>
+        <version>11.1.0</version>
     </parent>

     <artifactId>kafka-connect-hdfs</artifactId>
@@ -52,12 +52,11 @@
     </scm>

     <properties>
-        <confluent.maven.repo>https://packages.confluent.io/maven/</confluent.maven.repo>
         <apacheds-jdbm1.version>2.0.0-M2</apacheds-jdbm1.version>
         <confluent-log4j.version>1.2.17-cp8</confluent-log4j.version>
         <kafka.connect.maven.plugin.version>0.11.1</kafka.connect.maven.plugin.version>
         <maven.release.plugin.version>2.5.3</maven.release.plugin.version>
-        <kafka.connect.storage.common.version>11.0.16</kafka.connect.storage.common.version>
+        <kafka.connect.storage.common.version>11.1.0</kafka.connect.storage.common.version>
         <commons.collections.version>3.2.2</commons.collections.version>
         <libthrift.version>0.13.0</libthrift.version>
         <log4j2-api.version>2.17.1</log4j2-api.version>
@@ -70,7 +69,7 @@
         <repository>
             <id>confluent</id>
             <name>Confluent</name>
-            <url>${confluent.maven.repo}</url>
+            <url>https://packages.confluent.io/maven/</url>
         </repository>
     </repositories>


src/main/java/io/confluent/connect/hdfs/orc/OrcFileReader.java

Lines changed: 58 additions & 35 deletions
@@ -22,13 +22,15 @@
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveTypeEntry;
 import org.apache.kafka.connect.data.ConnectSchema;
 import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Timestamp;
@@ -53,40 +55,8 @@ public Schema getSchema(HdfsSinkConnectorConfig conf, Path path) {
       Reader reader = OrcFile.createReader(path, readerOptions);

       if (reader.getObjectInspector().getCategory() == ObjectInspector.Category.STRUCT) {
-        SchemaBuilder schemaBuilder = SchemaBuilder.struct().name("record").version(1);
         StructObjectInspector objectInspector = (StructObjectInspector) reader.getObjectInspector();
-
-        for (StructField schema : objectInspector.getAllStructFieldRefs()) {
-          ObjectInspector fieldObjectInspector = schema.getFieldObjectInspector();
-          String typeName = fieldObjectInspector.getTypeName();
-          Schema.Type schemaType;
-
-          switch (fieldObjectInspector.getCategory()) {
-            case PRIMITIVE:
-              PrimitiveTypeEntry typeEntry = PrimitiveObjectInspectorUtils
-                  .getTypeEntryFromTypeName(typeName);
-              if (java.sql.Date.class.isAssignableFrom(typeEntry.primitiveJavaClass)) {
-                schemaType = Date.SCHEMA.type();
-              } else if (java.sql.Timestamp.class.isAssignableFrom(typeEntry.primitiveJavaClass)) {
-                schemaType = Timestamp.SCHEMA.type();
-              } else {
-                schemaType = ConnectSchema.schemaType(typeEntry.primitiveJavaClass);
-              }
-              break;
-            case LIST:
-              schemaType = Schema.Type.ARRAY;
-              break;
-            case MAP:
-              schemaType = Schema.Type.MAP;
-              break;
-            default:
-              throw new DataException("Unknown type " + fieldObjectInspector.getCategory().name());
-          }
-
-          schemaBuilder.field(schema.getFieldName(), SchemaBuilder.type(schemaType).build());
-        }
-
-        return schemaBuilder.build();
+        return deriveStruct(objectInspector);
       } else {
         throw new ConnectException(
             "Top level type must be of type STRUCT, but was "
@@ -98,6 +68,59 @@ public Schema getSchema(HdfsSinkConnectorConfig conf, Path path) {
     }
   }

+  private Schema derivePrimitive(PrimitiveObjectInspector inspector) {
+    Class<?> klass = inspector.getTypeInfo().getPrimitiveJavaClass();
+    if (java.sql.Date.class.isAssignableFrom(klass)) {
+      return Date.SCHEMA;
+    } else if (java.sql.Timestamp.class.isAssignableFrom(klass)) {
+      return Timestamp.SCHEMA;
+    } else if (org.apache.hadoop.hive.common.type.HiveDecimal.class.isAssignableFrom(klass)) {
+      return Decimal.schema(inspector.scale());
+    }
+    return SchemaBuilder.type(ConnectSchema.schemaType(klass)).build();
+
+  }
+
+  private Schema deriveSchema(ObjectInspector inspector) {
+
+    switch (inspector.getCategory()) {
+      case PRIMITIVE:
+        return derivePrimitive((PrimitiveObjectInspector) inspector);
+      case MAP:
+        return deriveMap((MapObjectInspector) inspector);
+      case LIST:
+        return deriveList((ListObjectInspector) inspector);
+      case STRUCT:
+        return deriveStruct((StructObjectInspector) inspector);
+      default:
+        throw new DataException("Unknown type " + inspector.getCategory()
+            .name());
+    }
+  }
+
+  private Schema deriveStruct(StructObjectInspector inspector) {
+
+    SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+    for (StructField field: inspector.getAllStructFieldRefs()) {
+      ObjectInspector fieldInspector = field.getFieldObjectInspector();
+      schemaBuilder.field(field.getFieldName(), deriveSchema(fieldInspector));
+    }
+    schemaBuilder.name("record").version(1);
+    return schemaBuilder.build();
+  }
+
+
+  private Schema deriveMap(MapObjectInspector inspector) {
+    return SchemaBuilder.map(
+        deriveSchema(inspector.getMapKeyObjectInspector()),
+        deriveSchema(inspector.getMapValueObjectInspector())
+    ).build();
+  }
+
+  private Schema deriveList(ListObjectInspector inspector) {
+    return SchemaBuilder.array(deriveSchema(inspector.getListElementObjectInspector())).build();
+  }
+
   @Override
   public boolean hasNext() {
     throw new UnsupportedOperationException();
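
For reference, a hand-built sketch of the Connect schema the recursive derivation above should produce for a nested ORC type such as struct&lt;id:bigint, tags:array&lt;string&gt;, attrs:map&lt;string,double&gt;, ts:timestamp&gt;. This is an illustration mirroring deriveStruct/deriveMap/deriveList/derivePrimitive, not code from the commit, and the field names are hypothetical:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Timestamp;

public class DerivedSchemaSketch {
  // Equivalent Connect schema for struct<id:bigint, tags:array<string>, attrs:map<string,double>, ts:timestamp>
  static final Schema EXPECTED = SchemaBuilder.struct().name("record").version(1)
      .field("id", Schema.INT64_SCHEMA)                                                      // bigint
      .field("tags", SchemaBuilder.array(Schema.STRING_SCHEMA).build())                      // array<string>
      .field("attrs", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.FLOAT64_SCHEMA).build()) // map<string,double>
      .field("ts", Timestamp.SCHEMA)                                                         // timestamp -> Connect Timestamp
      .build();
}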

src/main/java/io/confluent/connect/hdfs/orc/OrcHiveUtil.java

Lines changed: 2 additions & 2 deletions
@@ -44,7 +44,7 @@ public OrcHiveUtil(HdfsSinkConnectorConfig config, HiveMetaStore hiveMetaStore)
   @Override
   public void alterSchema(String database, String tableName, Schema schema) {
     Table table = hiveMetaStore.getTable(database, tableName);
-    List<FieldSchema> columns = HiveSchemaConverter.convertSchema(schema);
+    List<FieldSchema> columns = HiveSchemaConverter.convertSchemaMaybeLogical(schema);
     table.setFields(columns);
     hiveMetaStore.alterTable(table);
   }
@@ -85,7 +85,7 @@ private Table constructOrcTable(
     }

     // convert Connect schema schema to Hive columns
-    List<FieldSchema> columns = HiveSchemaConverter.convertSchema(schema);
+    List<FieldSchema> columns = HiveSchemaConverter.convertSchemaMaybeLogical(schema);
     table.setFields(columns);
     table.setPartCols(partitioner.partitionFields());
     return table;

src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java

Lines changed: 2 additions & 2 deletions
@@ -70,7 +70,7 @@ public void preFooterWrite(OrcFile.WriterContext writerContext) {
            }
          };

-         typeInfo = HiveSchemaConverter.convert(schema);
+         typeInfo = HiveSchemaConverter.convertMaybeLogical(schema);
          ObjectInspector objectInspector = OrcStruct.createObjectInspector(typeInfo);

          log.info("Opening ORC record writer for: {}", filename);
@@ -90,7 +90,7 @@ public void preFooterWrite(OrcFile.WriterContext writerContext) {
          );

          Struct struct = (Struct) record.value();
-         OrcStruct row = OrcUtil.createOrcStruct(typeInfo, OrcUtil.convertStruct(struct));
+         OrcStruct row = (OrcStruct) OrcUtil.convert(typeInfo, struct.schema(), struct);
          writer.addRow(row);

        } else {
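
A condensed sketch of the new write path shown in this diff: HiveSchemaConverter.convertMaybeLogical(...) and OrcUtil.convert(...) are the calls the commit introduces, while the import locations below are assumed from the repository layout rather than confirmed by the diff, and the helper itself is hypothetical:

import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.kafka.connect.data.Struct;

import io.confluent.connect.hdfs.orc.OrcUtil;
import io.confluent.connect.storage.hive.HiveSchemaConverter;

public class OrcRowConversionSketch {
  // Derive a logical-type-aware Hive TypeInfo from the Connect schema, then convert the
  // whole Connect Struct (including nested structs, arrays and maps) into an OrcStruct,
  // as the record writer above now does for each record.
  static OrcStruct toOrcRow(Struct struct) {
    TypeInfo typeInfo = HiveSchemaConverter.convertMaybeLogical(struct.schema());
    return (OrcStruct) OrcUtil.convert(typeInfo, struct.schema(), struct);
  }
}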
