Skip to content

Commit 19b7ba9

Browse files
author
Arunachalam Thirupathi
committed
Optimize non-null filter implementation in ORC readers
The ORC reader filters out map entries whose keys are null. To do this, selective map readers combine the key readers with non-null filters. When the key column does not have a present stream, this filter can be ignored because the key column contains no nulls. This change replaces the file-level filter with a row-group-level filter.
1 parent 29afafe commit 19b7ba9

File tree

6 files changed

+134
-76
lines changed

6 files changed

+134
-76
lines changed

presto-orc/src/main/java/com/facebook/presto/orc/reader/LongDictionarySelectiveStreamReader.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public class LongDictionarySelectiveStreamReader
5757
private InputStreamSource<BooleanInputStream> presentStreamSource = getBooleanMissingStreamSource();
5858
@Nullable
5959
private BooleanInputStream presentStream;
60+
@Nullable
61+
private TupleDomainFilter rowGroupFilter;
6062

6163
private LongDictionaryProvider dictionaryProvider;
6264
private int dictionarySize;
@@ -107,7 +109,7 @@ public int read(int offset, int[] positions, int positionCount)
107109

108110
// TODO In case of all nulls, the stream type will be LongDirect
109111
int streamPosition;
110-
if (context.getFilter() == null) {
112+
if (rowGroupFilter == null) {
111113
streamPosition = readNoFilter(positions, positionCount);
112114
}
113115
else {
@@ -158,7 +160,6 @@ private int readWithFilter(int[] positions, int positionCount)
158160
{
159161
outputPositionCount = 0;
160162
int streamPosition = 0;
161-
TupleDomainFilter filter = context.getFilter();
162163
for (int i = 0; i < positionCount; i++) {
163164
int position = positions[i];
164165
if (position > streamPosition) {
@@ -167,7 +168,7 @@ private int readWithFilter(int[] positions, int positionCount)
167168
}
168169

169170
if (presentStream != null && !presentStream.nextBit()) {
170-
if ((context.isNonDeterministicFilter() && context.getFilter().testNull()) || context.isNullsAllowed()) {
171+
if ((context.isNonDeterministicFilter() && rowGroupFilter.testNull()) || context.isNullsAllowed()) {
171172
if (context.isOutputRequired()) {
172173
nulls[outputPositionCount] = true;
173174
values[outputPositionCount] = 0;
@@ -185,7 +186,7 @@ private int readWithFilter(int[] positions, int positionCount)
185186

186187
if (dictionaryFilterStatus != null) {
187188
if (dictionaryFilterStatus[id] == FILTER_NOT_EVALUATED) {
188-
if (filter.testLong(value)) {
189+
if (rowGroupFilter.testLong(value)) {
189190
dictionaryFilterStatus[id] = FILTER_PASSED;
190191
}
191192
else {
@@ -195,11 +196,11 @@ private int readWithFilter(int[] positions, int positionCount)
195196
filterPassed = dictionaryFilterStatus[id] == FILTER_PASSED;
196197
}
197198
else {
198-
filterPassed = filter.testLong(value);
199+
filterPassed = rowGroupFilter.testLong(value);
199200
}
200201
}
201202
else {
202-
filterPassed = filter.testLong(value);
203+
filterPassed = rowGroupFilter.testLong(value);
203204
}
204205
if (filterPassed) {
205206
if (context.isOutputRequired()) {
@@ -215,9 +216,9 @@ private int readWithFilter(int[] positions, int positionCount)
215216

216217
streamPosition++;
217218

218-
outputPositionCount -= filter.getPrecedingPositionsToFail();
219+
outputPositionCount -= rowGroupFilter.getPrecedingPositionsToFail();
219220

220-
int succeedingPositionsToFail = filter.getSucceedingPositionsToFail();
221+
int succeedingPositionsToFail = rowGroupFilter.getSucceedingPositionsToFail();
221222
if (succeedingPositionsToFail > 0) {
222223
int positionsToSkip = 0;
223224
for (int j = 0; j < succeedingPositionsToFail; j++) {
@@ -256,12 +257,15 @@ private void skip(int items)
256257
private void openRowGroup()
257258
throws IOException
258259
{
260+
presentStream = presentStreamSource.openStream();
261+
rowGroupFilter = context.getRowGroupFilter(presentStream);
262+
259263
// read the dictionary
260264
if (!dictionaryOpen && dictionarySize > 0) {
261265
DictionaryResult dictionaryResult = dictionaryProvider.getDictionary(context.getStreamDescriptor(), dictionary, dictionarySize);
262266
dictionary = dictionaryResult.dictionaryBuffer();
263267
isDictionaryOwner = dictionaryResult.isBufferOwner();
264-
if (!context.isLowMemory() && context.getFilter() != null && !context.isNonDeterministicFilter()) {
268+
if (!context.isLowMemory() && rowGroupFilter != null && !context.isNonDeterministicFilter()) {
265269
dictionaryFilterStatus = ensureCapacity(dictionaryFilterStatus, dictionarySize);
266270
Arrays.fill(dictionaryFilterStatus, 0, dictionarySize, FILTER_NOT_EVALUATED);
267271
}
@@ -271,7 +275,6 @@ private void openRowGroup()
271275
}
272276
dictionaryOpen = true;
273277

274-
presentStream = presentStreamSource.openStream();
275278
inDictionaryStream = inDictionaryStreamSource.openStream();
276279
dataStream = dataStreamSource.openStream();
277280

@@ -314,6 +317,7 @@ public void startStripe(Stripe stripe)
314317
readOffset = 0;
315318

316319
presentStream = null;
320+
rowGroupFilter = null;
317321
inDictionaryStream = null;
318322
dataStream = null;
319323

@@ -329,6 +333,7 @@ public void startRowGroup(InputStreamSources dataStreamSources)
329333

330334
readOffset = 0;
331335
presentStream = null;
336+
rowGroupFilter = null;
332337
inDictionaryStream = null;
333338
dataStream = null;
334339

presto-orc/src/main/java/com/facebook/presto/orc/reader/LongDirectSelectiveStreamReader.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public class LongDirectSelectiveStreamReader
4747
private InputStreamSource<BooleanInputStream> presentStreamSource = getBooleanMissingStreamSource();
4848
@Nullable
4949
private BooleanInputStream presentStream;
50+
@Nullable
51+
private TupleDomainFilter rowGroupFilter;
5052

5153
private InputStreamSource<LongInputStream> dataStreamSource = getLongMissingStreamSource();
5254
@Nullable
@@ -91,7 +93,7 @@ public int read(int offset, int[] positions, int positionCount)
9193
if (dataStream == null && presentStream != null) {
9294
streamPosition = readAllNulls(positions, positionCount);
9395
}
94-
else if (context.getFilter() == null) {
96+
else if (rowGroupFilter == null) {
9597
streamPosition = readNoFilter(positions, positionCount);
9698
}
9799
else {
@@ -108,16 +110,15 @@ private int readAllNulls(int[] positions, int positionCount)
108110
{
109111
presentStream.skip(positions[positionCount - 1]);
110112

111-
TupleDomainFilter filter = context.getFilter();
112113
if (context.isNonDeterministicFilter()) {
113114
outputPositionCount = 0;
114115
for (int i = 0; i < positionCount; i++) {
115-
if (filter.testNull()) {
116+
if (rowGroupFilter.testNull()) {
116117
outputPositionCount++;
117118
}
118119
else {
119-
outputPositionCount -= filter.getPrecedingPositionsToFail();
120-
i += filter.getSucceedingPositionsToFail();
120+
outputPositionCount -= rowGroupFilter.getPrecedingPositionsToFail();
121+
i += rowGroupFilter.getSucceedingPositionsToFail();
121122
}
122123
}
123124
}
@@ -194,16 +195,15 @@ private int readNoFilter(int[] positions, int positionCount)
194195
private int readWithFilter(int[] positions, int positionCount)
195196
throws IOException
196197
{
197-
TupleDomainFilter filter = context.getFilter();
198198
if (positions[positionCount - 1] == positionCount - 1) {
199199
// no skipping
200200
if (presentStream == null) {
201201
// no nulls
202-
if (!context.isOutputRequired() && !filter.isPositionalFilter()) {
202+
if (!context.isOutputRequired() && !rowGroupFilter.isPositionalFilter()) {
203203
// no output; just filter
204204
for (int i = 0; i < positionCount; i++) {
205205
long value = dataStream.next();
206-
if (filter.testLong(value)) {
206+
if (rowGroupFilter.testLong(value)) {
207207
outputPositions[outputPositionCount] = positions[i];
208208
outputPositionCount++;
209209
}
@@ -223,7 +223,7 @@ private int readWithFilter(int[] positions, int positionCount)
223223
}
224224

225225
if (presentStream != null && !presentStream.nextBit()) {
226-
if ((context.isNonDeterministicFilter() && filter.testNull()) || context.isNullsAllowed()) {
226+
if ((context.isNonDeterministicFilter() && rowGroupFilter.testNull()) || context.isNullsAllowed()) {
227227
if (context.isOutputRequired()) {
228228
nulls[outputPositionCount] = true;
229229
values[outputPositionCount] = 0;
@@ -234,7 +234,7 @@ private int readWithFilter(int[] positions, int positionCount)
234234
}
235235
else {
236236
long value = dataStream.next();
237-
if (filter.testLong(value)) {
237+
if (rowGroupFilter.testLong(value)) {
238238
if (context.isOutputRequired()) {
239239
values[outputPositionCount] = value;
240240
if (context.isNullsAllowed() && presentStream != null) {
@@ -248,9 +248,9 @@ private int readWithFilter(int[] positions, int positionCount)
248248

249249
streamPosition++;
250250

251-
outputPositionCount -= filter.getPrecedingPositionsToFail();
251+
outputPositionCount -= rowGroupFilter.getPrecedingPositionsToFail();
252252

253-
int succeedingPositionsToFail = filter.getSucceedingPositionsToFail();
253+
int succeedingPositionsToFail = rowGroupFilter.getSucceedingPositionsToFail();
254254
if (succeedingPositionsToFail > 0) {
255255
int positionsToSkip = 0;
256256
for (int j = 0; j < succeedingPositionsToFail; j++) {
@@ -285,6 +285,7 @@ private void openRowGroup()
285285
throws IOException
286286
{
287287
presentStream = presentStreamSource.openStream();
288+
rowGroupFilter = context.getRowGroupFilter(presentStream);
288289
dataStream = dataStreamSource.openStream();
289290

290291
rowGroupOpen = true;
@@ -327,6 +328,7 @@ public void startStripe(Stripe stripe)
327328
readOffset = 0;
328329

329330
presentStream = null;
331+
rowGroupFilter = null;
330332
dataStream = null;
331333

332334
rowGroupOpen = false;
@@ -341,6 +343,7 @@ public void startRowGroup(InputStreamSources dataStreamSources)
341343
readOffset = 0;
342344

343345
presentStream = null;
346+
rowGroupFilter = null;
344347
dataStream = null;
345348

346349
rowGroupOpen = false;
@@ -353,6 +356,7 @@ public void close()
353356
outputPositions = null;
354357

355358
presentStream = null;
359+
rowGroupFilter = null;
356360
presentStreamSource = null;
357361
dataStream = null;
358362
dataStreamSource = null;

presto-orc/src/main/java/com/facebook/presto/orc/reader/SelectiveReaderContext.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.facebook.presto.common.type.Type;
1818
import com.facebook.presto.orc.OrcAggregatedMemoryContext;
1919
import com.facebook.presto.orc.StreamDescriptor;
20+
import com.facebook.presto.orc.stream.BooleanInputStream;
2021

2122
import javax.annotation.Nullable;
2223

@@ -75,8 +76,14 @@ public Type getOutputType()
7576
}
7677

7778
@Nullable
78-
public TupleDomainFilter getFilter()
79+
public TupleDomainFilter getRowGroupFilter(BooleanInputStream presentStream)
7980
{
81+
if (isOutputRequired() && presentStream == null && filter == TupleDomainFilter.IS_NOT_NULL) {
82+
// Readers don't handle the case where outputRequired == false and filter == null. When that is fixed,
83+
// outputRequired can be removed from the above condition.
84+
// When the present stream is null, there are no nulls in the column, so the IS_NOT_NULL filter is a no-op.
85+
return null;
86+
}
8087
return filter;
8188
}
8289

presto-orc/src/main/java/com/facebook/presto/orc/reader/SliceDictionarySelectiveReader.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,10 @@ public class SliceDictionarySelectiveReader
102102
private VariableWidthBlock dictionary = new VariableWidthBlock(1, wrappedBuffer(EMPTY_DICTIONARY_DATA), EMPTY_DICTIONARY_OFFSETS, Optional.of(new boolean[] {true}));
103103

104104
private InputStreamSource<BooleanInputStream> presentStreamSource = getBooleanMissingStreamSource();
105+
@Nullable
105106
private BooleanInputStream presentStream;
107+
@Nullable
108+
private TupleDomainFilter rowGroupFilter;
106109

107110
private BooleanInputStream inDictionaryStream;
108111

@@ -173,7 +176,7 @@ public int read(int offset, int[] positions, int positionCount)
173176
if (dataStream == null && presentStream != null) {
174177
streamPosition = readAllNulls(positions, positionCount);
175178
}
176-
else if (context.getFilter() == null) {
179+
else if (rowGroupFilter == null) {
177180
streamPosition = readNoFilter(positions, positionCount);
178181
}
179182
else {
@@ -222,7 +225,7 @@ private int readWithFilter(int[] positions, int positionCount)
222225
}
223226

224227
if (presentStream != null && !presentStream.nextBit()) {
225-
if ((context.isNonDeterministicFilter() && context.getFilter().testNull()) || context.isNullsAllowed()) {
228+
if ((context.isNonDeterministicFilter() && rowGroupFilter.testNull()) || context.isNullsAllowed()) {
226229
if (context.isOutputRequired()) {
227230
values[outputPositionCount] = currentDictionarySize - 1;
228231
}
@@ -269,9 +272,9 @@ private int readWithFilter(int[] positions, int positionCount)
269272
}
270273
streamPosition++;
271274

272-
if (context.getFilter() != null) {
273-
outputPositionCount -= context.getFilter().getPrecedingPositionsToFail();
274-
int succeedingPositionsToFail = context.getFilter().getSucceedingPositionsToFail();
275+
if (rowGroupFilter != null) {
276+
outputPositionCount -= rowGroupFilter.getPrecedingPositionsToFail();
277+
int succeedingPositionsToFail = rowGroupFilter.getSucceedingPositionsToFail();
275278
if (succeedingPositionsToFail > 0) {
276279
int positionsToSkip = 0;
277280
for (int j = 0; j < succeedingPositionsToFail; j++) {
@@ -289,19 +292,19 @@ private int readWithFilter(int[] positions, int positionCount)
289292

290293
private byte evaluateFilter(int position, int index, int length)
291294
{
292-
if (!context.getFilter().testLength(length)) {
295+
if (!rowGroupFilter.testLength(length)) {
293296
return FILTER_FAILED;
294297
}
295298

296299
int currentLength = dictionaryOffsetVector[index + 1] - dictionaryOffsetVector[index];
297300
if (isCharType && length != currentLength) {
298301
System.arraycopy(dictionaryData, dictionaryOffsetVector[index], valueWithPadding, 0, currentLength);
299302
Arrays.fill(valueWithPadding, currentLength, length, (byte) ' ');
300-
if (!context.getFilter().testBytes(valueWithPadding, 0, length)) {
303+
if (!rowGroupFilter.testBytes(valueWithPadding, 0, length)) {
301304
return FILTER_FAILED;
302305
}
303306
}
304-
else if (!context.getFilter().testBytes(dictionaryData, dictionaryOffsetVector[index], length)) {
307+
else if (!rowGroupFilter.testBytes(dictionaryData, dictionaryOffsetVector[index], length)) {
305308
return FILTER_FAILED;
306309
}
307310

@@ -318,16 +321,15 @@ private int readAllNulls(int[] positions, int positionCount)
318321
{
319322
presentStream.skip(positions[positionCount - 1]);
320323

321-
TupleDomainFilter filter = context.getFilter();
322324
if (context.isNonDeterministicFilter()) {
323325
outputPositionCount = 0;
324326
for (int i = 0; i < positionCount; i++) {
325-
if (filter.testNull()) {
327+
if (rowGroupFilter.testNull()) {
326328
outputPositionCount++;
327329
}
328330
else {
329-
outputPositionCount -= filter.getPrecedingPositionsToFail();
330-
i += filter.getSucceedingPositionsToFail();
331+
outputPositionCount -= rowGroupFilter.getPrecedingPositionsToFail();
332+
i += rowGroupFilter.getSucceedingPositionsToFail();
331333
}
332334
}
333335
}
@@ -476,6 +478,9 @@ public void throwAnyError(int[] positions, int positionCount)
476478
private void openRowGroup()
477479
throws IOException
478480
{
481+
presentStream = presentStreamSource.openStream();
482+
rowGroupFilter = context.getRowGroupFilter(presentStream);
483+
479484
// read the dictionary
480485
if (!stripeDictionaryOpen) {
481486
if (stripeDictionarySize > 0) {
@@ -554,7 +559,6 @@ private void openRowGroup()
554559

555560
dictionaryOffsetVector[currentDictionarySize] = dictionaryOffsetVector[currentDictionarySize - 1];
556561
stripeDictionaryOpen = true;
557-
presentStream = presentStreamSource.openStream();
558562
inDictionaryStream = inDictionaryStreamSource.openStream();
559563
dataStream = dataStreamSource.openStream();
560564

@@ -623,6 +627,7 @@ public void startStripe(Stripe stripe)
623627
readOffset = 0;
624628

625629
presentStream = null;
630+
rowGroupFilter = null;
626631
inDictionaryStream = null;
627632
dataStream = null;
628633

@@ -644,6 +649,7 @@ public void startRowGroup(InputStreamSources dataStreamSources)
644649
readOffset = 0;
645650

646651
presentStream = null;
652+
rowGroupFilter = null;
647653
inDictionaryStream = null;
648654
dataStream = null;
649655

0 commit comments

Comments
 (0)