
Commit 4c0c7f9

feat: introduce delete file manager skeleton. Use in ArrowReader
1 parent 32404bf commit 4c0c7f9

4 files changed: +197 -42 lines changed

crates/iceberg/src/arrow/delete_file_manager.rs

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::expr::BoundPredicate;
use crate::io::FileIO;
use crate::scan::FileScanTaskDeleteFile;
use crate::spec::SchemaRef;
use crate::{Error, ErrorKind, Result};

pub(crate) struct DeleteFileManager {}

#[allow(unused_variables)]
impl DeleteFileManager {
    pub(crate) async fn load_deletes(
        delete_file_entries: Vec<FileScanTaskDeleteFile>,
        file_io: FileIO,
        concurrency_limit_data_files: usize,
    ) -> Result<DeleteFileManager> {
        // TODO

        if !delete_file_entries.is_empty() {
            Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Reading delete files is not yet supported",
            ))
        } else {
            Ok(DeleteFileManager {})
        }
    }

    pub(crate) fn build_delete_predicate(
        &self,
        snapshot_schema: SchemaRef,
    ) -> Result<Option<BoundPredicate>> {
        // TODO

        Ok(None)
    }

    pub(crate) fn get_positional_delete_indexes_for_data_file(
        &self,
        data_file_path: &str,
    ) -> Option<Vec<usize>> {
        // TODO

        None
    }
}
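
For orientation, here is a minimal sketch of how the reader side is expected to drive this skeleton, mirroring the calls added to crates/iceberg/src/arrow/reader.rs further down in this commit. The free-standing function and its name are illustrative only and not part of the change:

use crate::arrow::delete_file_manager::DeleteFileManager;
use crate::io::FileIO;
use crate::scan::FileScanTask;
use crate::Result;

// Hypothetical crate-internal helper, shown only to illustrate the call order.
async fn apply_deletes_sketch(
    task: &FileScanTask,
    file_io: FileIO,
    concurrency_limit_data_files: usize,
) -> Result<()> {
    // Fetch and parse every delete file referenced by the scan task.
    let delete_file_manager = DeleteFileManager::load_deletes(
        task.deletes.clone(),
        file_io,
        concurrency_limit_data_files,
    )
    .await?;

    // Equality deletes surface as an extra filter predicate (always `None` for now).
    let _delete_predicate = delete_file_manager.build_delete_predicate(task.schema.clone())?;

    // Positional deletes surface as row indexes to drop from this data file (always `None` for now).
    let _positional_deletes = delete_file_manager
        .get_positional_delete_indexes_for_data_file(&task.data_file_path);

    Ok(())
}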

crates/iceberg/src/arrow/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@
 
 mod schema;
 pub use schema::*;
+pub(crate) mod delete_file_manager;
 mod reader;
 pub(crate) mod record_batch_projector;
 pub(crate) mod record_batch_transformer;

crates/iceberg/src/arrow/reader.rs

Lines changed: 128 additions & 42 deletions
@@ -39,6 +39,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI
 use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
+use crate::arrow::delete_file_manager::DeleteFileManager;
 use crate::arrow::record_batch_transformer::RecordBatchTransformer;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::error::Result;
@@ -145,6 +146,7 @@ impl ArrowReader {
                     file_io,
                     row_group_filtering_enabled,
                     row_selection_enabled,
+                    concurrency_limit_data_files,
                 )
             })
             .map_err(|err| {
@@ -162,30 +164,24 @@ impl ArrowReader {
         file_io: FileIO,
         row_group_filtering_enabled: bool,
         row_selection_enabled: bool,
+        concurrency_limit_data_files: usize,
     ) -> Result<ArrowRecordBatchStream> {
-        // TODO: add support for delete files
-        if !task.deletes.is_empty() {
-            return Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                "Delete files are not yet supported",
-            ));
-        }
-
-        // Get the metadata for the Parquet file we need to read and build
-        // a reader for the data within
-        let parquet_file = file_io.new_input(&task.data_file_path)?;
-        let (parquet_metadata, parquet_reader) =
-            try_join!(parquet_file.metadata(), parquet_file.reader())?;
-        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
-
-        let should_load_page_index = row_selection_enabled && task.predicate.is_some();
-
-        // Start creating the record batch stream, which wraps the parquet file reader
-        let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
-            parquet_file_reader,
-            ArrowReaderOptions::new().with_page_index(should_load_page_index),
-        )
-        .await?;
+        let should_load_page_index =
+            (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
+
+        // concurrently retrieve delete files and create RecordBatchStreamBuilder
+        let (delete_file_manager, mut record_batch_stream_builder) = try_join!(
+            DeleteFileManager::load_deletes(
+                task.deletes.clone(),
+                file_io.clone(),
+                concurrency_limit_data_files
+            ),
+            Self::create_parquet_record_batch_stream_builder(
+                &task.data_file_path,
+                file_io.clone(),
+                should_load_page_index,
+            )
+        )?;
 
         // Create a projection mask for the batch stream to select which columns in the
         // Parquet file that we want in the response
@@ -197,7 +193,7 @@ impl ArrowReader {
         )?;
         record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);
 
-        // RecordBatchTransformer performs any required transformations on the RecordBatches
+        // RecordBatchTransformer performs any transformations required on the RecordBatches
         // that come back from the file, such as type promotion, default column insertion
         // and column re-ordering
         let mut record_batch_transformer =
@@ -207,49 +203,102 @@ impl ArrowReader {
             record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
         }
 
-        if let Some(predicate) = &task.predicate {
+        let delete_predicate = delete_file_manager.build_delete_predicate(task.schema.clone())?;
+
+        // In addition to the optional predicate supplied in the `FileScanTask`,
+        // we also have an optional predicate resulting from equality delete files.
+        // If both are present, we logical-AND them together to form a single filter
+        // predicate that we can pass to the `RecordBatchStreamBuilder`.
+        let final_predicate = match (&task.predicate, delete_predicate) {
+            (None, None) => None,
+            (Some(predicate), None) => Some(predicate.clone()),
+            (None, Some(ref predicate)) => Some(predicate.clone()),
+            (Some(filter_predicate), Some(delete_predicate)) => {
+                Some(filter_predicate.clone().and(delete_predicate))
+            }
+        };
+
+        // There are two possible sources both for potential lists of selected RowGroup indices,
+        // and for `RowSelection`s.
+        // Selected RowGroup index lists can come from two sources:
+        // * When there are equality delete files that are applicable;
+        // * When there is a scan predicate and row_group_filtering_enabled = true.
+        // `RowSelection`s can be created in either or both of the following cases:
+        // * When there are positional delete files that are applicable;
+        // * When there is a scan predicate and row_selection_enabled = true
+        // Note that, in the former case we only perform row group filtering when
+        // there is a scan predicate AND row_group_filtering_enabled = true,
+        // but we perform row selection filtering if there are applicable
+        // equality delete files OR (there is a scan predicate AND row_selection_enabled),
+        // since the only implemented method of applying positional deletes is
+        // by using a `RowSelection`.
+        let mut selected_row_group_indices = None;
+        let mut row_selection = None;
+
+        if let Some(predicate) = final_predicate {
             let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
                 record_batch_stream_builder.parquet_schema(),
-                predicate,
+                &predicate,
             )?;
 
             let row_filter = Self::get_row_filter(
-                predicate,
+                &predicate,
                 record_batch_stream_builder.parquet_schema(),
                 &iceberg_field_ids,
                 &field_id_map,
            )?;
            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
 
-            let mut selected_row_groups = None;
            if row_group_filtering_enabled {
                let result = Self::get_selected_row_group_indices(
-                    predicate,
+                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &field_id_map,
                    &task.schema,
                )?;
 
-                selected_row_groups = Some(result);
+                selected_row_group_indices = Some(result);
            }
 
            if row_selection_enabled {
-                let row_selection = Self::get_row_selection(
-                    predicate,
+                row_selection = Some(Self::get_row_selection_for_filter_predicate(
+                    &predicate,
                    record_batch_stream_builder.metadata(),
-                    &selected_row_groups,
+                    &selected_row_group_indices,
                    &field_id_map,
                    &task.schema,
-                )?;
-
-                record_batch_stream_builder =
-                    record_batch_stream_builder.with_row_selection(row_selection);
+                )?);
            }
+        }
 
-        if let Some(selected_row_groups) = selected_row_groups {
-            record_batch_stream_builder =
-                record_batch_stream_builder.with_row_groups(selected_row_groups);
-        }
+        let positional_delete_indexes =
+            delete_file_manager.get_positional_delete_indexes_for_data_file(&task.data_file_path);
+
+        if let Some(positional_delete_indexes) = positional_delete_indexes {
+            let delete_row_selection = Self::build_deletes_row_selection(
+                record_batch_stream_builder.metadata(),
+                &selected_row_group_indices,
+                &positional_delete_indexes,
+            )?;
+
+            // merge the row selection from the delete files with the row selection
+            // from the filter predicate, if there is one from the filter predicate
+            row_selection = match row_selection {
+                None => Some(delete_row_selection),
+                Some(filter_row_selection) => {
+                    Some(filter_row_selection.intersection(&delete_row_selection))
+                }
+            };
+        }
+
+        if let Some(row_selection) = row_selection {
+            record_batch_stream_builder =
+                record_batch_stream_builder.with_row_selection(row_selection);
+        }
+
+        if let Some(selected_row_group_indices) = selected_row_group_indices {
+            record_batch_stream_builder =
+                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
        }
 
         // Build the batch stream and send all the RecordBatches that it generates
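
The merge step above relies on RowSelection::intersection from the parquet crate: a row survives only if both the filter-derived selection and the delete-derived selection keep it. A standalone illustration of that semantics (assuming the parquet crate's RowSelection/RowSelector API; not part of this commit):

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Selection produced by the filter predicate: keep rows 0..4, skip rows 4..10.
    let from_filter = RowSelection::from(vec![RowSelector::select(4), RowSelector::skip(6)]);

    // Selection produced by positional deletes: row 2 is deleted, the other 9 rows are kept.
    let from_deletes = RowSelection::from(vec![
        RowSelector::select(2),
        RowSelector::skip(1),
        RowSelector::select(7),
    ]);

    // Only rows 0, 1 and 3 are kept by both selections.
    let merged = from_filter.intersection(&from_deletes);
    println!("{:?}", merged);
}
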
@@ -265,6 +314,43 @@ impl ArrowReader {
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
+    async fn create_parquet_record_batch_stream_builder(
+        data_file_path: &str,
+        file_io: FileIO,
+        should_load_page_index: bool,
+    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
+        // Get the metadata for the Parquet file we need to read and build
+        // a reader for the data within
+        let parquet_file = file_io.new_input(data_file_path)?;
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+
+        // Create the record batch stream builder, which wraps the parquet file reader
+        let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
+            parquet_file_reader,
+            ArrowReaderOptions::new().with_page_index(should_load_page_index),
+        )
+        .await?;
+        Ok(record_batch_stream_builder)
+    }
+
+    /// computes a `RowSelection` from positional delete indices.
+    ///
+    /// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
+    /// as having been deleted by a positional delete, taking into account any row groups that have
+    /// been skipped entirely by the filter predicate
+    #[allow(unused)]
+    fn build_deletes_row_selection(
+        parquet_metadata: &Arc<ParquetMetaData>,
+        selected_row_groups: &Option<Vec<usize>>,
+        positional_deletes: &[usize],
+    ) -> Result<RowSelection> {
+        // TODO
+
+        Ok(RowSelection::default())
+    }
+
     fn build_field_id_set_and_map(
         parquet_schema: &SchemaDescriptor,
         predicate: &BoundPredicate,
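
build_deletes_row_selection is still a TODO above. Purely as an illustration of the direction its doc comment describes, here is a simplified sketch that turns a sorted, de-duplicated list of deleted row positions into skip/select runs. It ignores row-group skipping and the page index, so it shows the core idea only, not the eventual implementation:

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

/// Hypothetical helper: `total_rows` is the row count of the data file and
/// `deleted_positions` must be sorted and free of duplicates.
fn selection_from_positional_deletes(
    total_rows: usize,
    deleted_positions: &[usize],
) -> RowSelection {
    let mut selectors = Vec::new();
    let mut cursor = 0;

    for &pos in deleted_positions {
        if pos > cursor {
            // Keep the run of rows between the previous delete and this one.
            selectors.push(RowSelector::select(pos - cursor));
        }
        // Drop the deleted row itself.
        selectors.push(RowSelector::skip(1));
        cursor = pos + 1;
    }

    if cursor < total_rows {
        // Keep everything after the last deleted row.
        selectors.push(RowSelector::select(total_rows - cursor));
    }

    RowSelection::from(selectors)
}

With 10 rows and deletes at positions 2 and 7, this yields select(2), skip(1), select(4), skip(1), select(2).
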
@@ -475,7 +561,7 @@ impl ArrowReader {
         Ok(results)
     }
 
-    fn get_row_selection(
+    fn get_row_selection_for_filter_predicate(
         predicate: &BoundPredicate,
         parquet_metadata: &Arc<ParquetMetaData>,
         selected_row_groups: &Option<Vec<usize>>,

crates/iceberg/src/expr/predicate.rs

Lines changed: 6 additions & 0 deletions
@@ -724,6 +724,12 @@ pub enum BoundPredicate {
     Set(SetExpression<BoundReference>),
 }
 
+impl BoundPredicate {
+    pub(crate) fn and(self, other: BoundPredicate) -> BoundPredicate {
+        BoundPredicate::And(LogicalExpression::new([Box::new(self), Box::new(other)]))
+    }
+}
+
 impl Display for BoundPredicate {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
