Skip to content

Commit

Permalink
Add support for time type partitioning in ORC for iceberg with testing
Browse files Browse the repository at this point in the history
  • Loading branch information
auden-woolfson committed Jan 8, 2025
1 parent 8883d76 commit da8715f
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.TimeType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.EncryptionInformation;
Expand Down Expand Up @@ -572,6 +573,15 @@ private static ConnectorPageSourceWithRowPositions createBatchOrcPageSource(
isRowPositionList.add(column.isRowPositionColumn());
}

// Skip the time type columns in predicate, converted on page source level
ImmutableMap.Builder<IcebergColumnHandle, Domain> predicateExcludeTimeType = ImmutableMap.builder();
effectivePredicate.getDomains().get().forEach((columnHandle, domain) -> {
if (!(columnHandle.getType() instanceof TimeType)) {
predicateExcludeTimeType.put(columnHandle, domain);
}
});

effectivePredicate = TupleDomain.withColumnDomains(predicateExcludeTimeType.build());
TupleDomain<HiveColumnHandle> hiveColumnHandleTupleDomain = effectivePredicate.transform(column -> {
IcebergOrcColumn icebergOrcColumn;
if (fileOrcColumnByIcebergId.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ public static Optional<String> tryGetLocation(Table table)
}
}

private static boolean isValidPartitionType(FileFormat fileFormat, Type type)
private static boolean isValidPartitionType(Type type)
{
return type instanceof DecimalType ||
BOOLEAN.equals(type) ||
Expand All @@ -521,15 +521,15 @@ private static boolean isValidPartitionType(FileFormat fileFormat, Type type)
DOUBLE.equals(type) ||
DATE.equals(type) ||
type instanceof TimestampType ||
(TIME.equals(type) && fileFormat == PARQUET) ||
TIME.equals(type) ||
VARBINARY.equals(type) ||
isVarcharType(type) ||
isCharType(type);
}

private static void verifyPartitionTypeSupported(FileFormat fileFormat, String partitionName, Type type)
private static void verifyPartitionTypeSupported(String partitionName, Type type)
{
if (!isValidPartitionType(fileFormat, type)) {
if (!isValidPartitionType(type)) {
throw new PrestoException(NOT_SUPPORTED, format("Unsupported type [%s] for partition: %s", type, partitionName));
}
}
Expand All @@ -540,7 +540,7 @@ private static NullableValue parsePartitionValue(
Type prestoType,
String partitionName)
{
verifyPartitionTypeSupported(fileFormat, partitionName, prestoType);
verifyPartitionTypeSupported(partitionName, prestoType);

Object partitionValue = deserializePartitionValue(prestoType, partitionStringValue, partitionName);
return partitionValue == null ? NullableValue.asNull(prestoType) : NullableValue.of(prestoType, partitionValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ private static TypeInfo toHiveTypeInfo(Type type)
if (DOUBLE.equals(type)) {
return HIVE_DOUBLE.getTypeInfo();
}
if (TimeType.TIME.equals(type)) {
return HIVE_LONG.getTypeInfo();
}
if (type instanceof VarcharType) {
VarcharType varcharType = (VarcharType) type;
if (varcharType.isUnbounded()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,42 @@ public void testCreateNestedPartitionedTable(Session session, FileFormat fileFor
dropTable(session, "test_nested_table2");
}

@DataProvider(name = "testPartitionedByTimeProvider")
public Object[][] testPartitionedByTimeProvider()
{
return new Object[][] {
{false, FileFormat.PARQUET},
{false, FileFormat.ORC},
{true, FileFormat.PARQUET},
{true, FileFormat.ORC}
};
}

@Test(dataProvider = "testPartitionedByTimeProvider")
private void testSelectOrPartitionedByTime(boolean partitioned, FileFormat format)
{
String tableName = format("test_%s_by_time", partitioned ? "partitioned" : "selected");
try {
String partitioning = partitioned ? ", partitioning = ARRAY['x']" : "";
assertUpdate(format("CREATE TABLE %s (x TIME, y BIGINT) WITH (format = '%s'%s)", tableName, format, partitioning));
assertUpdate(format("INSERT INTO %s VALUES (TIME '10:12:34', 12345)", tableName), 1);
assertQuery(format("SELECT COUNT(*) FROM %s", tableName), "SELECT 1");
assertQuery(format("SELECT x FROM %s", tableName), "SELECT CAST('10:12:34' AS TIME)");
assertUpdate(format("INSERT INTO %s VALUES (TIME '9:00:00', 67890)", tableName), 1);
assertQuery(format("SELECT COUNT(*) FROM %s", tableName), "SELECT 2");
assertQuery(format("SELECT x FROM %s WHERE y = 12345", tableName), "SELECT CAST('10:12:34' AS TIME)");
assertQuery(format("SELECT x FROM %s WHERE y = 67890", tableName), "SELECT CAST('9:00:00' AS TIME)");
assertUpdate(format("INSERT INTO %s VALUES (TIME '10:12:34', 54321)", tableName), 1);
assertQuery(
format("SELECT x, COUNT(*) FROM %s GROUP BY x ORDER BY x", tableName),
"SELECT CAST('9:00:00' AS TIME), 1 UNION ALL SELECT CAST('10:12:34' AS TIME), 2");
assertQuery(format("SELECT y FROM %s WHERE x = time '10:12:34'", tableName), "values 12345, 54321");
}
finally {
dropTable(getSession(), tableName);
}
}

@Test
public void testReadEmptyTable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.common.type.DateType;
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.SmallintType;
import com.facebook.presto.common.type.TimeType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.orc.OrcCorruptionException;
import com.facebook.presto.orc.OrcLocalMemoryContext;
Expand Down Expand Up @@ -81,7 +82,7 @@ public LongDictionaryBatchStreamReader(Type type, StreamDescriptor streamDescrip
throws OrcCorruptionException
{
requireNonNull(type, "type is null");
verifyStreamType(streamDescriptor, type, t -> t instanceof BigintType || t instanceof IntegerType || t instanceof SmallintType || t instanceof DateType);
verifyStreamType(streamDescriptor, type, t -> t instanceof BigintType || t instanceof IntegerType || t instanceof SmallintType || t instanceof DateType || t instanceof TimeType);
this.type = type;
this.streamDescriptor = requireNonNull(streamDescriptor, "stream is null");
this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.common.type.DateType;
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.SmallintType;
import com.facebook.presto.common.type.TimeType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.orc.OrcCorruptionException;
import com.facebook.presto.orc.OrcLocalMemoryContext;
Expand All @@ -37,6 +38,7 @@

import java.io.IOException;
import java.util.Optional;
import java.util.function.LongFunction;

import static com.facebook.presto.orc.metadata.Stream.StreamKind.DATA;
import static com.facebook.presto.orc.metadata.Stream.StreamKind.PRESENT;
Expand All @@ -59,6 +61,7 @@ public class LongDirectBatchStreamReader

private final Type type;
private final StreamDescriptor streamDescriptor;
private final LongFunction<Long> convertUnits;

private int readOffset;
private int nextBatchSize;
Expand All @@ -84,10 +87,16 @@ public LongDirectBatchStreamReader(Type type, StreamDescriptor streamDescriptor,
throws OrcCorruptionException
{
requireNonNull(type, "type is null");
verifyStreamType(streamDescriptor, type, t -> t instanceof BigintType || t instanceof IntegerType || t instanceof SmallintType || t instanceof DateType);
verifyStreamType(streamDescriptor, type, t -> t instanceof BigintType || t instanceof IntegerType || t instanceof SmallintType || t instanceof DateType || t instanceof TimeType);
this.type = type;
this.streamDescriptor = requireNonNull(streamDescriptor, "stream is null");
this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null");
if (this.type instanceof TimeType) {
this.convertUnits = longValue -> Math.floorDiv(longValue, 1000L);
}
else {
this.convertUnits = longValue -> longValue;
}
}

@Override
Expand Down Expand Up @@ -158,6 +167,14 @@ private Block readNonNullBlock()
dataStream.next(values, nextBatchSize);
return new LongArrayBlock(nextBatchSize, Optional.empty(), values);
}
if (type instanceof TimeType) {
long[] values = new long[nextBatchSize];
dataStream.next(values, nextBatchSize);
for (int i = 0; i < values.length; i++) {
values[i] = convertUnits.apply(values[i]);
}
return new LongArrayBlock(nextBatchSize, Optional.empty(), values);
}
if (type instanceof IntegerType || type instanceof DateType) {
int[] values = new int[nextBatchSize];
dataStream.next(values, nextBatchSize);
Expand All @@ -177,6 +194,9 @@ private Block readNullBlock(boolean[] isNull, int nonNullCount)
if (type instanceof BigintType) {
return longReadNullBlock(isNull, nonNullCount);
}
if (type instanceof TimeType) {
return longReadNullBlock(isNull, nonNullCount);
}
if (type instanceof IntegerType || type instanceof DateType) {
return intReadNullBlock(isNull, nonNullCount);
}
Expand All @@ -199,6 +219,13 @@ private Block longReadNullBlock(boolean[] isNull, int nonNullCount)
dataStream.next(longNonNullValueTemp, nonNullCount);

long[] result = unpackLongNulls(longNonNullValueTemp, isNull);
if (type instanceof TimeType) {
for (int i = 0; i < result.length; i++) {
if (!isNull[i]) {
result[i] = convertUnits.apply(result[i]);
}
}
}

return new LongArrayBlock(nextBatchSize, Optional.of(isNull), result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.type.FixedWidthType;
import com.facebook.presto.common.type.TimeType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.orc.ColumnWriterOptions;
import com.facebook.presto.orc.DwrfDataEncryptor;
Expand Down Expand Up @@ -43,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongFunction;
import java.util.function.Supplier;

import static com.facebook.presto.orc.OrcEncoding.DWRF;
Expand All @@ -69,6 +71,7 @@ public class LongColumnWriter

private final List<ColumnStatistics> rowGroupColumnStatistics = new ArrayList<>();
private final CompressedMetadataWriter metadataWriter;
private final LongFunction<Long> convertUnits;
private long columnStatisticsRetainedSizeInBytes;
private PresentOutputStream presentStream;

Expand Down Expand Up @@ -111,6 +114,13 @@ public LongColumnWriter(
this.metadataWriter = new CompressedMetadataWriter(metadataWriter, columnWriterOptions, dwrfEncryptor);
this.statisticsBuilderSupplier = requireNonNull(statisticsBuilderSupplier, "statisticsBuilderSupplier is null");
this.statisticsBuilder = statisticsBuilderSupplier.get();

if (this.type instanceof TimeType) {
this.convertUnits = longValue -> Math.multiplyExact(longValue, 1000L);
}
else {
this.convertUnits = longValue -> longValue;
}
}

@Override
Expand Down Expand Up @@ -158,7 +168,7 @@ public long writeBlock(Block block)
int nonNullValueCount = 0;
for (int position = 0; position < block.getPositionCount(); position++) {
if (!block.isNull(position)) {
long value = type.getLong(block, position);
long value = this.convertUnits.apply(type.getLong(block, position));
writeValue(value);
nonNullValueCount++;
}
Expand Down

0 comments on commit da8715f

Please sign in to comment.