preliminary changes for removing hive-dwrf #2

Closed · wants to merge 1 commit
presto-orc/pom.xml (0 additions, 6 deletions)

@@ -156,12 +156,6 @@
             <scope>test</scope>
         </dependency>
 
-        <dependency>
-            <groupId>com.facebook.hive</groupId>
-            <artifactId>hive-dwrf</artifactId>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>com.facebook.airlift</groupId>
             <artifactId>json</artifactId>
AbstractOrcRecordReader.java

@@ -86,7 +86,6 @@ abstract class AbstractOrcRecordReader<T extends StreamReader>
     private final long splitLength;
     private final Set<Integer> presentColumns;
     private final long maxBlockBytes;
-    private final Optional<EncryptionLibrary> encryptionLibrary;
     private final Map<Integer, Integer> dwrfEncryptionGroupMap;
     private final Map<Integer, Slice> intermediateKeyMetadata;
     private long currentPosition;

@@ -99,8 +98,6 @@ abstract class AbstractOrcRecordReader<T extends StreamReader>
     private final StripeReader stripeReader;
     private int currentStripe = -1;
     private OrcAggregatedMemoryContext currentStripeSystemMemoryContext;
-    private Optional<DwrfEncryptionInfo> dwrfEncryptionInfo = Optional.empty();
-
     private final long fileRowCount;
     private final List<Long> stripeFilePositions;
     private long filePosition;

@@ -115,15 +112,6 @@ abstract class AbstractOrcRecordReader<T extends StreamReader>
 
     private final Map<String, Slice> userMetadata;
 
-    private final Optional<OrcWriteValidation> writeValidation;
-    private final Optional<OrcWriteValidation.WriteChecksumBuilder> writeChecksumBuilder;
-    private final Optional<OrcWriteValidation.StatisticsValidation> rowGroupStatisticsValidation;
-    private final Optional<OrcWriteValidation.StatisticsValidation> stripeStatisticsValidation;
-    private final Optional<OrcWriteValidation.StatisticsValidation> fileStatisticsValidation;
-    private final Optional<OrcFileIntrospector> fileIntrospector;
-
-    private final RuntimeStats runtimeStats;
-
     public AbstractOrcRecordReader(
             Map<Integer, Type> includedColumns,
             Map<Integer, List<Subfield>> requiredSubfields,

@@ -139,8 +127,6 @@ public AbstractOrcRecordReader(
             List<OrcType> types,
             Optional<OrcDecompressor> decompressor,
             Optional<EncryptionLibrary> encryptionLibrary,
-            Map<Integer, Integer> dwrfEncryptionGroupMap,
-            Map<Integer, Slice> columnToIntermediateKeyMap,
             int rowsInRowGroup,
             DateTimeZone hiveStorageTimeZone,
             PostScript.HiveWriterVersion hiveWriterVersion,

@@ -152,10 +138,8 @@ public AbstractOrcRecordReader(
             OrcAggregatedMemoryContext systemMemoryUsage,
-            Optional<OrcWriteValidation> writeValidation,
             int initialBatchSize,
             StripeMetadataSource stripeMetadataSource,
             boolean cacheable,
-            RuntimeStats runtimeStats,
-            Optional<OrcFileIntrospector> fileIntrospector)
+            RuntimeStats runtimeStats)
     {
         requireNonNull(includedColumns, "includedColumns is null");
         requireNonNull(predicate, "predicate is null");
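For readers unfamiliar with the plumbing being removed here: dwrfEncryptionGroupMap maps a column node to its DWRF encryption group, and intermediateKeyMetadata carries the intermediate key material per group; DwrfEncryptionInfo resolves keys from them at read time. A minimal sketch of that two-step lookup, with illustrative names (EncryptionGroupSketch, intermediateKeyFor) rather than the real classes:

import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class EncryptionGroupSketch
{
    // column node -> encryption group (the role of the removed dwrfEncryptionGroupMap)
    private final Map<Integer, Integer> dwrfEncryptionGroupMap = new HashMap<>();
    // encryption group -> intermediate key material (the role of the removed intermediateKeyMetadata)
    private final Map<Integer, Slice> intermediateKeyMetadata = new HashMap<>();

    EncryptionGroupSketch()
    {
        dwrfEncryptionGroupMap.put(3, 0); // column node 3 belongs to encryption group 0
        intermediateKeyMetadata.put(0, Slices.utf8Slice("intermediate-key-for-group-0"));
    }

    // Look up the intermediate key for a column node; empty means the node is unencrypted.
    Optional<Slice> intermediateKeyFor(int node)
    {
        return Optional.ofNullable(dwrfEncryptionGroupMap.get(node))
                .map(intermediateKeyMetadata::get);
    }

    public static void main(String[] args)
    {
        EncryptionGroupSketch sketch = new EncryptionGroupSketch();
        System.out.println(sketch.intermediateKeyFor(3)); // key for group 0
        System.out.println(sketch.intermediateKeyFor(5)); // Optional.empty
    }
}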
OrcBatchRecordReader.java

@@ -55,18 +55,14 @@ public OrcBatchRecordReader(
             List<OrcType> types,
             Optional<OrcDecompressor> decompressor,
             Optional<EncryptionLibrary> encryptionLibrary,
-            Map<Integer, Integer> dwrfEncryptionGroupMap,
-            Map<Integer, Slice> intermediateKeyMetadata,
             int rowsInRowGroup,
             DateTimeZone hiveStorageTimeZone,
             OrcRecordReaderOptions options,
             HiveWriterVersion hiveWriterVersion,
             MetadataReader metadataReader,
             Map<String, Slice> userMetadata,
             OrcAggregatedMemoryContext systemMemoryUsage,
-            Optional<OrcWriteValidation> writeValidation,
             int initialBatchSize,
-            StripeMetadataSource stripeMetadataSource,
             boolean cacheable,
             RuntimeStats runtimeStats)
             throws OrcCorruptionException

@@ -91,8 +87,6 @@ public OrcBatchRecordReader(
                 types,
                 decompressor,
                 encryptionLibrary,
-                dwrfEncryptionGroupMap,
-                intermediateKeyMetadata,
                 rowsInRowGroup,
                 hiveStorageTimeZone,
                 hiveWriterVersion,

@@ -102,7 +96,6 @@ public OrcBatchRecordReader(
                 options.getMaxBlockSize(),
                 userMetadata,
                 systemMemoryUsage,
-                writeValidation,
                 initialBatchSize,
                 stripeMetadataSource,
                 cacheable,
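The writeValidation argument removed above feeds presto-orc's write validation, in which a freshly written file is read back and recomputed checksums and statistics are compared against what the writer recorded. A conceptual sketch of that read-back check, assuming a simple CRC32 stand-in (WriteChecksumSketch and its methods are hypothetical names, not the OrcWriteValidation API):

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Conceptual sketch of write validation: as the reader consumes a file it
// just wrote, it recomputes a checksum and compares it with the checksum
// the writer recorded. CRC32 is a stand-in for the real checksum builder.
final class WriteChecksumSketch
{
    private final CRC32 observed = new CRC32();

    void onRowBatch(byte[] serializedBatch)
    {
        observed.update(serializedBatch);
    }

    void validate(long checksumRecordedByWriter)
    {
        if (observed.getValue() != checksumRecordedByWriter) {
            throw new IllegalStateException("write validation failed: checksum mismatch");
        }
    }

    public static void main(String[] args)
    {
        WriteChecksumSketch sketch = new WriteChecksumSketch();
        byte[] batch = "some row data".getBytes(StandardCharsets.UTF_8);
        sketch.onRowBatch(batch);

        CRC32 writer = new CRC32();
        writer.update(batch);
        sketch.validate(writer.getValue()); // passes: reader saw exactly what the writer wrote
    }
}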
OrcOutputBuffer.java

@@ -59,7 +59,6 @@ public class OrcOutputBuffer
     private final int minCompressibleSize;
 
     private final CompressionBufferPool compressionBufferPool;
-    private final Optional<DwrfDataEncryptor> dwrfEncryptor;
     @Nullable
     private final Compressor compressor;
 
@@ -76,10 +75,9 @@ public class OrcOutputBuffer
      */
     private int bufferPosition;
 
-    public OrcOutputBuffer(ColumnWriterOptions columnWriterOptions, Optional<DwrfDataEncryptor> dwrfEncryptor)
+    public OrcOutputBuffer(ColumnWriterOptions columnWriterOptions)
     {
         requireNonNull(columnWriterOptions, "columnWriterOptions is null");
-        requireNonNull(dwrfEncryptor, "dwrfEncryptor is null");
         int maxBufferSize = columnWriterOptions.getCompressionMaxBufferSize();
         checkArgument(maxBufferSize > PAGE_HEADER_SIZE, "maximum buffer size should be greater than page header size");
 
@@ -93,7 +91,6 @@ public OrcOutputBuffer(ColumnWriterOptions columnWriterOptions, Optional<DwrfDataEncryptor> dwrfEncryptor)
         this.slice = wrappedBuffer(buffer);
 
         this.compressionBufferPool = columnWriterOptions.getCompressionBufferPool();
-        this.dwrfEncryptor = requireNonNull(dwrfEncryptor, "dwrfEncryptor is null");
 
         if (compressionKind == CompressionKind.NONE) {
             this.compressor = null;

@@ -144,7 +141,7 @@ public int writeDataTo(SliceOutput outputStream)
 
     public long getCheckpoint()
     {
-        if (compressor == null && !dwrfEncryptor.isPresent()) {
+        if (compressor == null) {
             return size();
         }
         return InputStreamCheckpoint.createInputStreamCheckpoint(getCompressedOutputSize(), bufferPosition);
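When the buffer is neither compressed nor (formerly) encrypted, a checkpoint is just the plain byte count; otherwise getCheckpoint() packs the compressed-stream offset and the position within the current uncompressed buffer into a single long. A minimal sketch of that packing, assuming InputStreamCheckpoint uses the usual high-bits/low-bits split (the encode/decode helpers here are illustrative):

// Sketch of the checkpoint encoding assumed above: compressed block offset
// in the high 32 bits, offset within the decompressed block in the low 32.
final class CheckpointSketch
{
    static long createInputStreamCheckpoint(long compressedBlockOffset, long decompressedOffset)
    {
        return (compressedBlockOffset << 32) | decompressedOffset;
    }

    static long decodeCompressedBlockOffset(long checkpoint)
    {
        return checkpoint >>> 32;
    }

    static long decodeDecompressedOffset(long checkpoint)
    {
        return checkpoint & 0xFFFF_FFFFL;
    }

    public static void main(String[] args)
    {
        long checkpoint = createInputStreamCheckpoint(1024, 37);
        System.out.println(decodeCompressedBlockOffset(checkpoint)); // 1024
        System.out.println(decodeDecompressedOffset(checkpoint));    // 37
    }
}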
@@ -482,7 +479,7 @@ private void writeChunkToOutputStream(byte[] chunk, int offset, int length)
             initCompressedOutputStream();
         }
 
-        if (compressor == null && !dwrfEncryptor.isPresent()) {
+        if (compressor == null) {
             compressedOutputStream.write(chunk, offset, length);
             return;
         }

@@ -503,15 +500,15 @@ private void writeChunkToOutputStream(byte[] chunk, int offset, int length)
                 offset = 0;
             }
         }
-        if (dwrfEncryptor.isPresent()) {
-            chunk = dwrfEncryptor.get().encrypt(chunk, offset, length);
-            length = chunk.length;
-            offset = 0;
-            // size after encryption should not exceed what the 3 byte header can hold (2^23)
-            if (length > 8388608) {
-                throw new OrcEncryptionException("Encrypted data size %s exceeds limit of 2^23", length);
-            }
-        }
+        // if (dwrfEncryptor.isPresent()) {
+        //     chunk = dwrfEncryptor.get().encrypt(chunk, offset, length);
+        //     length = chunk.length;
+        //     offset = 0;
+        //     // size after encryption should not exceed what the 3 byte header can hold (2^23)
+        //     if (length > 8388608) {
+        //         throw new OrcEncryptionException("Encrypted data size %s exceeds limit of 2^23", length);
+        //     }
+        // }
 
         int header = isCompressed ? length << 1 : (length << 1) + 1;
         writeChunkedOutput(chunk, offset, length, header);
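The header line after the (now commented-out) encryption block explains the 8388608 limit: the chunk length is shifted left one bit, the low bit flags an uncompressed ("original") chunk, and the result must fit in a 3-byte little-endian header, so the length must stay below 2^23 = 8,388,608 bytes. A runnable sketch of that arithmetic (ChunkHeaderSketch and writeHeader are illustrative names):

// Sketch of the 3-byte ORC chunk header implied by the code above:
// bit 0 flags an uncompressed ("original") chunk, bits 1-23 hold the
// chunk length, so the length is capped at 2^23 - 1 bytes.
final class ChunkHeaderSketch
{
    static final int MAX_CHUNK_LENGTH = 1 << 23; // 8388608, the limit in the removed encryption check

    static byte[] writeHeader(int length, boolean isCompressed)
    {
        if (length >= MAX_CHUNK_LENGTH) {
            throw new IllegalArgumentException("chunk length exceeds 2^23");
        }
        int header = isCompressed ? length << 1 : (length << 1) + 1;
        // three little-endian bytes
        return new byte[] {(byte) header, (byte) (header >>> 8), (byte) (header >>> 16)};
    }

    public static void main(String[] args)
    {
        byte[] header = writeHeader(100_000, true);
        System.out.printf("%02x %02x %02x%n", header[0] & 0xff, header[1] & 0xff, header[2] & 0xff);
    }
}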