Skip to content

Commit b27df48

Browse files
Arunachalam Thirupathishixuan-fan
Arunachalam Thirupathi
authored andcommitted
Support DWRF stripes above MAX_INT rows in reader
OrcReader fails when a stripe has more than INT32_MAX rows. Both ORC/DWRF format uses long for stripe number of rows. We ran into this edge case on a file with only string as a column and file is produced by a different writer compatible with DWRF spec.
1 parent b2f1936 commit b27df48

File tree

10 files changed

+63
-28
lines changed

10 files changed

+63
-28
lines changed

presto-orc/src/main/java/com/facebook/presto/orc/AbstractOrcRecordReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,7 @@ private void validateWrite(Predicate<OrcWriteValidation> test, String messageFor
689689
}
690690
}
691691

692-
private void validateWriteStripe(int rowCount)
692+
private void validateWriteStripe(long rowCount)
693693
{
694694
if (writeChecksumBuilder.isPresent()) {
695695
writeChecksumBuilder.get().addStripe(rowCount);

presto-orc/src/main/java/com/facebook/presto/orc/OrcWriteValidation.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -481,10 +481,10 @@ public static WriteChecksumBuilder createWriteChecksumBuilder(Map<Integer, Type>
481481
return new WriteChecksumBuilder(types.build());
482482
}
483483

484-
public void addStripe(int rowCount)
484+
public void addStripe(long rowCount)
485485
{
486-
longSlice.setInt(0, rowCount);
487-
stripeHash.update(longBuffer, 0, Integer.BYTES);
486+
longSlice.setLong(0, rowCount);
487+
stripeHash.update(longBuffer, 0, Long.BYTES);
488488
}
489489

490490
public void addPage(Page page)
@@ -888,7 +888,7 @@ public OrcWriteValidationBuilder addMetadataProperty(String key, Slice value)
888888
return this;
889889
}
890890

891-
public OrcWriteValidationBuilder addStripe(int rowCount)
891+
public OrcWriteValidationBuilder addStripe(long rowCount)
892892
{
893893
checksum.addStripe(rowCount);
894894
return this;

presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterFlushStats.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public DistributionStat getDictionaryBytes()
5757
return dictionaryBytes;
5858
}
5959

60-
public void recordStripeWritten(long stripeBytes, int stripeRows, int dictionaryBytes)
60+
public void recordStripeWritten(long stripeBytes, long stripeRows, int dictionaryBytes)
6161
{
6262
this.stripeBytes.add(stripeBytes);
6363
this.stripeRows.add(stripeRows);

presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterStats.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public void recordStripeWritten(
4545
StripeInformation stripeInformation)
4646
{
4747
long stripeBytes = stripeInformation.getTotalLength();
48-
int stripeRows = stripeInformation.getNumberOfRows();
48+
long stripeRows = stripeInformation.getNumberOfRows();
4949
getFlushStats(flushReason).recordStripeWritten(stripeBytes, stripeRows, dictionaryBytes);
5050
allFlush.recordStripeWritten(stripeBytes, stripeRows, dictionaryBytes);
5151
}

presto-orc/src/main/java/com/facebook/presto/orc/StripeReader.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ public InputStreamSources createDictionaryStreamSources(Map<StreamId, Stream> st
420420
}
421421

422422
private List<RowGroup> createRowGroups(
423-
int rowsInStripe,
423+
long rowsInStripe,
424424
Map<StreamId, Stream> streams,
425425
Map<StreamId, ValueInputStream<?>> valueStreams,
426426
Map<StreamId, List<RowGroupIndex>> columnIndexes,
@@ -433,7 +433,7 @@ private List<RowGroup> createRowGroups(
433433
for (int rowGroupId : selectedRowGroups) {
434434
Map<StreamId, StreamCheckpoint> checkpoints = getStreamCheckpoints(includedOrcColumns, types, decompressor.isPresent(), rowGroupId, encodings, streams, columnIndexes);
435435
int rowOffset = rowGroupId * rowsInRowGroup;
436-
int rowsInGroup = Math.min(rowsInStripe - rowOffset, rowsInRowGroup);
436+
int rowsInGroup = toIntExact(Math.min(rowsInStripe - rowOffset, rowsInRowGroup));
437437
long minAverageRowBytes = columnIndexes
438438
.entrySet()
439439
.stream()
@@ -536,13 +536,13 @@ private Map<StreamId, List<RowGroupIndex>> readColumnIndexes(Map<StreamId, Strea
536536

537537
private Set<Integer> selectRowGroups(StripeInformation stripe, Map<StreamId, List<RowGroupIndex>> columnIndexes)
538538
{
539-
int rowsInStripe = toIntExact(stripe.getNumberOfRows());
539+
long rowsInStripe = stripe.getNumberOfRows();
540540
int groupsInStripe = ceil(rowsInStripe, rowsInRowGroup);
541541

542542
ImmutableSet.Builder<Integer> selectedRowGroups = ImmutableSet.builder();
543-
int remainingRows = rowsInStripe;
543+
long remainingRows = rowsInStripe;
544544
for (int rowGroup = 0; rowGroup < groupsInStripe; ++rowGroup) {
545-
int rows = Math.min(remainingRows, rowsInRowGroup);
545+
int rows = toIntExact(Math.min(remainingRows, rowsInRowGroup));
546546
Map<Integer, ColumnStatistics> statistics = getRowGroupStatistics(types.get(0), columnIndexes, rowGroup);
547547
if (predicate.matches(rows, statistics)) {
548548
selectedRowGroups.add(rowGroup);
@@ -613,9 +613,10 @@ public static Map<StreamId, DiskRange> getDiskRanges(List<List<Stream>> streams)
613613
/**
614614
* Ceiling of integer division
615615
*/
616-
private static int ceil(int dividend, int divisor)
616+
private static int ceil(long dividend, int divisor)
617617
{
618-
return ((dividend + divisor) - 1) / divisor;
618+
long ceil = ((dividend + divisor) - 1) / divisor;
619+
return toIntExact(ceil);
619620
}
620621

621622
public static class StripeId

presto-orc/src/main/java/com/facebook/presto/orc/metadata/DwrfMetadataReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ private static StripeInformation toStripeInformation(DwrfProto.StripeInformation
286286
keyMetadata = previousKeyMetadata;
287287
}
288288
return new StripeInformation(
289-
toIntExact(stripeInformation.getNumberOfRows()),
289+
stripeInformation.getNumberOfRows(),
290290
stripeInformation.getOffset(),
291291
stripeInformation.getIndexLength(),
292292
stripeInformation.getDataLength(),

presto-orc/src/main/java/com/facebook/presto/orc/metadata/DwrfMetadataWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.facebook.presto.orc.proto.DwrfProto.UserMetadataItem;
2525
import com.facebook.presto.orc.protobuf.ByteString;
2626
import com.facebook.presto.orc.protobuf.MessageLite;
27+
import com.google.common.annotations.VisibleForTesting;
2728
import com.google.common.collect.ImmutableList;
2829
import com.google.common.collect.ImmutableMap;
2930
import com.google.common.io.CountingOutputStream;
@@ -136,7 +137,8 @@ public int writeFooter(SliceOutput output, Footer footer)
136137
return writeProtobufObject(output, footerProtobuf.build());
137138
}
138139

139-
private static DwrfProto.StripeInformation toStripeInformation(StripeInformation stripe)
140+
@VisibleForTesting
141+
static DwrfProto.StripeInformation toStripeInformation(StripeInformation stripe)
140142
{
141143
return DwrfProto.StripeInformation.newBuilder()
142144
.setNumberOfRows(stripe.getNumberOfRows())

presto-orc/src/main/java/com/facebook/presto/orc/metadata/OrcMetadataReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ private static List<StripeInformation> toStripeInformation(List<OrcProto.StripeI
163163
private static StripeInformation toStripeInformation(OrcProto.StripeInformation stripeInformation)
164164
{
165165
return new StripeInformation(
166-
toIntExact(stripeInformation.getNumberOfRows()),
166+
stripeInformation.getNumberOfRows(),
167167
stripeInformation.getOffset(),
168168
stripeInformation.getIndexLength(),
169169
stripeInformation.getDataLength(),

presto-orc/src/main/java/com/facebook/presto/orc/metadata/StripeInformation.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
public class StripeInformation
2525
{
26-
private final int numberOfRows;
26+
private final long numberOfRows;
2727
private final long offset;
2828
private final long indexLength;
2929
private final long dataLength;
@@ -34,7 +34,7 @@ public class StripeInformation
3434
// only set for run start, and reuse until next run
3535
private final List<byte[]> keyMetadata;
3636

37-
public StripeInformation(int numberOfRows, long offset, long indexLength, long dataLength, long footerLength, List<byte[]> keyMetadata)
37+
public StripeInformation(long numberOfRows, long offset, long indexLength, long dataLength, long footerLength, List<byte[]> keyMetadata)
3838
{
3939
// dataLength can be zero when the stripe only contains empty flat maps.
4040
checkArgument(numberOfRows > 0, "Stripe must have at least one row");
@@ -48,7 +48,7 @@ public StripeInformation(int numberOfRows, long offset, long indexLength, long d
4848
this.keyMetadata = ImmutableList.copyOf(requireNonNull(keyMetadata, "keyMetadata is null"));
4949
}
5050

51-
public int getNumberOfRows()
51+
public long getNumberOfRows()
5252
{
5353
return numberOfRows;
5454
}

presto-orc/src/test/java/com/facebook/presto/orc/metadata/TestDwrfMetadataReader.java

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import com.facebook.presto.orc.DwrfEncryptionProvider;
1717
import com.facebook.presto.orc.DwrfKeyProvider;
1818
import com.facebook.presto.orc.OrcCorruptionException;
19-
import com.facebook.presto.orc.OrcDataSource;
2019
import com.facebook.presto.orc.OrcDataSourceId;
2120
import com.facebook.presto.orc.metadata.PostScript.HiveWriterVersion;
2221
import com.facebook.presto.orc.metadata.statistics.StringStatistics;
@@ -30,6 +29,7 @@
3029
import java.io.ByteArrayInputStream;
3130
import java.io.IOException;
3231
import java.io.InputStream;
32+
import java.util.Collections;
3333
import java.util.List;
3434
import java.util.Optional;
3535

@@ -126,6 +126,32 @@ public void testToStripeCacheMode()
126126
assertEquals(DwrfMetadataReader.toStripeCacheMode(DwrfProto.StripeCacheMode.NA), DwrfStripeCacheMode.NONE);
127127
}
128128

129+
@Test
130+
public void testStripeInformationRows()
131+
throws IOException
132+
{
133+
long aLongNumber = Integer.MAX_VALUE + 1000L;
134+
StripeInformation expectedStripeInformation = new StripeInformation(aLongNumber, aLongNumber, aLongNumber, aLongNumber, aLongNumber, ImmutableList.of());
135+
DwrfProto.StripeInformation dwrfStripeInformation = DwrfMetadataWriter.toStripeInformation(expectedStripeInformation);
136+
137+
DwrfProto.Footer protoFooter = DwrfProto.Footer.newBuilder()
138+
.setNumberOfRows(aLongNumber)
139+
.setRowIndexStride(10_000)
140+
.addStripes(dwrfStripeInformation)
141+
.build();
142+
143+
Footer footer = convertToFooter(protoFooter);
144+
assertEquals(footer.getNumberOfRows(), aLongNumber);
145+
assertEquals(footer.getStripes().size(), 1);
146+
StripeInformation actualStripeInformation = footer.getStripes().get(0);
147+
assertEquals(actualStripeInformation.getOffset(), expectedStripeInformation.getOffset());
148+
assertEquals(actualStripeInformation.getNumberOfRows(), expectedStripeInformation.getNumberOfRows());
149+
assertEquals(actualStripeInformation.getIndexLength(), expectedStripeInformation.getIndexLength());
150+
assertEquals(actualStripeInformation.getDataLength(), expectedStripeInformation.getDataLength());
151+
assertEquals(actualStripeInformation.getFooterLength(), expectedStripeInformation.getFooterLength());
152+
assertEquals(actualStripeInformation.getTotalLength(), expectedStripeInformation.getTotalLength());
153+
}
154+
129155
@Test
130156
public void testReadFooter()
131157
throws IOException
@@ -139,20 +165,26 @@ public void testReadFooter()
139165
.setRowIndexStride(rowIndexStride)
140166
.addAllStripeCacheOffsets(stripeCacheOffsets)
141167
.build();
168+
Footer footer = convertToFooter(protoFooter);
169+
170+
assertEquals(footer.getNumberOfRows(), numberOfRows);
171+
assertEquals(footer.getRowsInRowGroup(), rowIndexStride);
172+
assertEquals(footer.getDwrfStripeCacheOffsets().get(), stripeCacheOffsets);
173+
assertEquals(footer.getStripes(), Collections.emptyList());
174+
}
175+
176+
private Footer convertToFooter(DwrfProto.Footer protoFooter)
177+
throws IOException
178+
{
142179
byte[] data = protoFooter.toByteArray();
143180
InputStream inputStream = new ByteArrayInputStream(data);
144-
OrcDataSource orcDataSource = null; // orcDataSource only needed for encrypted files
145181

146-
Footer footer = dwrfMetadataReader.readFooter(HiveWriterVersion.ORC_HIVE_8732,
182+
return dwrfMetadataReader.readFooter(HiveWriterVersion.ORC_HIVE_8732,
147183
inputStream,
148184
DwrfEncryptionProvider.NO_ENCRYPTION,
149185
DwrfKeyProvider.EMPTY,
150-
orcDataSource,
186+
null, // orcDataSource only needed for encrypted files
151187
Optional.empty());
152-
153-
assertEquals(footer.getNumberOfRows(), numberOfRows);
154-
assertEquals(footer.getRowsInRowGroup(), rowIndexStride);
155-
assertEquals(footer.getDwrfStripeCacheOffsets().get(), stripeCacheOffsets);
156188
}
157189

158190
@Test

0 commit comments

Comments
 (0)