
Commit f496527

feat: complete work to support positional deletes in table scans
1 parent 641a675 commit f496527

3 files changed (+687 -86)


crates/iceberg/src/arrow/reader.rs

+113 -67
```diff
@@ -194,53 +194,68 @@ impl ArrowReader {
         };
 
         let (tx, rx) = channel(concurrency_limit_data_files);
+        let mut channel_for_error = tx.clone();
 
-        #[allow(clippy::redundant_closure)] // clippy's recommendation fails to compile
-        futures::stream::iter(delete_file_entries.iter().map(|df| crate::Result::Ok(df)))
-            .try_for_each_concurrent(concurrency_limit_data_files, |entry| {
-                let file_io = file_io.clone();
-                let mut tx = tx.clone();
-                async move {
-                    let FileScanTaskDeleteFile {
-                        ref file_path,
-                        file_type,
-                    } = entry;
-
-                    let record_batch_stream =
-                        Self::create_parquet_record_batch_stream_builder(
-                            file_path,
-                            file_io.clone(),
-                            false,
-                        )
-                        .await?
-                        .build()?
-                        .map(|item| match item {
-                            Ok(val) => Ok(val),
-                            Err(err) => Err(Error::new(ErrorKind::DataInvalid, err.to_string())
-                                .with_source(err)),
-                        })
-                        .boxed();
-
-                    let result = match file_type {
-                        DataContentType::PositionDeletes => {
-                            parse_positional_delete_file(record_batch_stream).await
-                        }
-                        DataContentType::EqualityDeletes => {
-                            parse_equality_delete_file(record_batch_stream).await
+        spawn(async move {
+            #[allow(clippy::redundant_closure)] // clippy's recommendation fails to compile
+            let result =
+                futures::stream::iter(delete_file_entries.iter().map(|df| crate::Result::Ok(df)))
+                    .try_for_each_concurrent(concurrency_limit_data_files, |entry| {
+                        let file_io = file_io.clone();
+                        let mut tx = tx.clone();
+                        async move {
+                            let FileScanTaskDeleteFile {
+                                ref file_path,
+                                file_type,
+                            } = entry;
+
+                            let record_batch_stream =
+                                Self::create_parquet_record_batch_stream_builder(
+                                    file_path,
+                                    file_io.clone(),
+                                    false,
+                                )
+                                .await?
+                                .build()?
+                                .map(|item| match item {
+                                    Ok(val) => Ok(val),
+                                    Err(err) => {
+                                        Err(Error::new(ErrorKind::DataInvalid, err.to_string())
+                                            .with_source(err))
+                                    }
+                                })
+                                .boxed();
+
+                            let result = match file_type {
+                                DataContentType::PositionDeletes => {
+                                    parse_positional_delete_file(record_batch_stream).await
+                                }
+                                DataContentType::EqualityDeletes => {
+                                    parse_equality_delete_file(record_batch_stream).await
+                                }
+                                _ => Err(Error::new(
+                                    ErrorKind::Unexpected,
+                                    "Expected equality or positional delete",
+                                )),
+                            }?;
+
+                            tx.send(Ok(result)).await?;
+                            Ok(())
                         }
-                        _ => Err(Error::new(
-                            ErrorKind::Unexpected,
-                            "Expected equality or positional delete",
-                        )),
-                    }?;
-
-                    tx.send(result).await?;
-                    Ok(())
-                }
-            })
-            .await?;
+                    })
+                    .await;
+
+            if let Err(error) = result {
+                let _ = channel_for_error.send(Err(error)).await;
+            }
+        });
 
-        Ok(Some(rx.collect::<Vec<_>>().await))
+        let results = rx.try_collect::<Vec<_>>().await?;
+        if results.is_empty() {
+            Ok(None)
+        } else {
+            Ok(Some(results))
+        }
     }
 
     fn get_positional_delete_indexes(
```
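The reworked block above moves the concurrent delete-file parsing into a spawned task: each parsed delete file is sent through the mpsc channel as `Ok(result)`, and any failure on the producer side is forwarded into the same channel through the cloned sender, so the consumer's `try_collect` surfaces the first error rather than hanging on a half-closed channel. A minimal, self-contained sketch of that pattern (illustrative only: it uses `tokio::spawn` in place of the crate's `spawn` helper, and the per-item work is a dummy computation):

```rust
// Sketch of the spawn-and-forward-errors pattern, assuming the `futures` and
// `tokio` crates. Results and errors share one channel; the consumer fails
// fast on the first Err it receives.
use futures::{channel::mpsc::channel, SinkExt, TryStreamExt};

async fn collect_concurrently(inputs: Vec<u32>) -> Result<Option<Vec<u32>>, String> {
    let (tx, rx) = channel(10);
    let mut channel_for_error = tx.clone();

    tokio::spawn(async move {
        let result = futures::stream::iter(inputs.into_iter().map(Ok::<u32, String>))
            .try_for_each_concurrent(10, |n| {
                // Each concurrent unit gets its own sender clone.
                let mut tx = tx.clone();
                async move {
                    // Stand-in for parsing a positional/equality delete file.
                    let parsed = n.checked_mul(2).ok_or_else(|| "overflow".to_string())?;
                    tx.send(Ok(parsed)).await.map_err(|e| e.to_string())?;
                    Ok(())
                }
            })
            .await;

        // Forward a producer-side failure to the consumer instead of dropping it.
        if let Err(error) = result {
            let _ = channel_for_error.send(Err(error)).await;
        }
    });

    // All senders live inside the spawned task, so the stream ends once it finishes.
    let results = rx.try_collect::<Vec<_>>().await?;
    Ok(if results.is_empty() { None } else { Some(results) })
}
```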
```diff
@@ -299,7 +314,8 @@ impl ArrowReader {
             async move { Self::get_deletes(delete_file_tasks, file_io, 5).await }
         });
 
-        let should_load_page_index = row_selection_enabled && task.predicate.is_some();
+        let should_load_page_index =
+            (row_selection_enabled && task.predicate.is_some()) || task.deletes.is_some();
         let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder(
             &task.data_file_path,
             file_io,
```
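`should_load_page_index` now also turns on when the task carries delete files, since applying positional deletes needs the Parquet offset index (used by `get_deleted_row_selection` below) to map delete positions onto row groups. For reference, a rough sketch of the parquet-crate API that a flag like this typically maps onto (the helper name here is hypothetical, not this crate's code):

```rust
// Hedged sketch: request page-index loading from the parquet crate so that
// ParquetMetaData::offset_index() is populated on the resulting metadata.
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStreamBuilder};

async fn builder_with_optional_page_index<R: AsyncFileReader + Send + 'static>(
    input: R,
    should_load_page_index: bool,
) -> parquet::errors::Result<ParquetRecordBatchStreamBuilder<R>> {
    let options = ArrowReaderOptions::new().with_page_index(should_load_page_index);
    ParquetRecordBatchStreamBuilder::new_with_options(input, options).await
}
```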
```diff
@@ -663,7 +679,7 @@ impl ArrowReader {
     fn get_deleted_row_selection(
         parquet_metadata: &Arc<ParquetMetaData>,
         selected_row_groups: &Option<Vec<usize>>,
-        positional_delete_indexes: &[usize],
+        positional_deletes: &[usize],
     ) -> Result<RowSelection> {
         let Some(offset_index) = parquet_metadata.offset_index() else {
             return Err(Error::new(
@@ -672,47 +688,77 @@
             ));
         };
 
-        let /*mut*/ selected_row_groups_idx = 0;
+        let mut selected_row_groups_idx = 0;
+        let mut curr_pos_del_idx = 0;
+        let pos_del_len = positional_deletes.len();
 
         let page_index = offset_index
            .iter()
            .enumerate()
            .zip(parquet_metadata.row_groups());
 
-        let results: Vec<RowSelector> = Vec::new();
+        let mut results: Vec<RowSelector> = Vec::new();
         let mut current_page_base_idx: usize = 0;
         for ((idx, _offset_index), row_group_metadata) in page_index {
             let page_num_rows = row_group_metadata.num_rows() as usize;
-            let _next_page_base_idx = current_page_base_idx + page_num_rows;
+            let next_page_base_idx = current_page_base_idx + page_num_rows;
 
+            // skip any row groups that aren't in the row group selection
             if let Some(selected_row_groups) = selected_row_groups {
                 // skip row groups that aren't present in selected_row_groups
+                if selected_row_groups_idx == selected_row_groups.len() {
+                    break;
+                }
                 if idx == selected_row_groups[selected_row_groups_idx] {
-                    // selected_row_groups_idx += 1;
+                    selected_row_groups_idx += 1;
                 } else {
                     current_page_base_idx += page_num_rows;
                     continue;
                 }
             }
 
-            let Some(_next_delete_row_idx) = positional_delete_indexes.last() else {
-                break;
-            };
+            let mut next_deleted_row_idx = positional_deletes[curr_pos_del_idx];
+
+            // if the index of the next deleted row is beyond this page, skip
+            // to the next page
+            if next_deleted_row_idx >= next_page_base_idx {
+                continue;
+            }
+
+            let mut current_idx = current_page_base_idx;
+            while next_deleted_row_idx < next_page_base_idx {
+                // select all rows that precede the next delete index
+                if current_idx < next_deleted_row_idx {
+                    let run_length = next_deleted_row_idx - current_idx;
+                    results.push(RowSelector::select(run_length));
+                    current_idx += run_length;
+                }
+
+                // skip all consecutive deleted rows
+                let mut run_length = 1;
+                while curr_pos_del_idx < pos_del_len - 1
+                    && positional_deletes[curr_pos_del_idx + 1] == next_deleted_row_idx + 1
+                {
+                    run_length += 1;
+                    curr_pos_del_idx += 1;
+                }
+                results.push(RowSelector::skip(run_length));
+                current_idx += run_length;
+
+                curr_pos_del_idx += 1;
+                if curr_pos_del_idx >= pos_del_len {
+                    break;
+                }
+                next_deleted_row_idx = positional_deletes[curr_pos_del_idx];
+            }
+
+            if let Some(selected_row_groups) = selected_row_groups {
+                if selected_row_groups_idx == selected_row_groups.len() {
+                    break;
+                }
+            }
 
-            // TODO: logic goes here to create `RowSelection`s for the
-            // current page, based on popping delete indices that are
-            // on this page
-            break;
-            // while *next_delete_row_idx < next_page_base_idx {
-            // }
-
-            // if let Some(selected_row_groups) = selected_row_groups {
-            //     if selected_row_groups_idx == selected_row_groups.len() {
-            //         break;
-            //     }
-            // }
-            //
-            // current_page_base_idx += page_num_rows;
+            current_page_base_idx += page_num_rows;
         }
 
         Ok(results.into())
```
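The rewritten body above walks the sorted positional delete indexes once, emitting a `RowSelector::select` run for the rows preceding each delete and a `RowSelector::skip` run for each stretch of consecutive deletes. A simplified, single-row-group sketch of the same run-length idea (illustrative: the `Run` enum stands in for parquet's `RowSelector`, and unlike the code above it also selects any rows remaining after the last delete):

```rust
// Minimal sketch: turn sorted, de-duplicated deleted row positions into
// alternating select/skip run lengths over a single run of `num_rows` rows.
#[derive(Debug, PartialEq)]
enum Run {
    Select(usize),
    Skip(usize),
}

fn select_skip_runs(num_rows: usize, deletes: &[usize]) -> Vec<Run> {
    let mut runs = Vec::new();
    let mut current_idx = 0;
    let mut i = 0;

    while i < deletes.len() && deletes[i] < num_rows {
        // select all rows that precede the next deleted row
        if current_idx < deletes[i] {
            runs.push(Run::Select(deletes[i] - current_idx));
            current_idx = deletes[i];
        }
        // collapse consecutive deleted rows into a single skip run
        let mut run_length = 1;
        while i + 1 < deletes.len() && deletes[i + 1] == deletes[i] + 1 {
            run_length += 1;
            i += 1;
        }
        runs.push(Run::Skip(run_length));
        current_idx += run_length;
        i += 1;
    }
    // select whatever remains after the last delete
    if current_idx < num_rows {
        runs.push(Run::Select(num_rows - current_idx));
    }
    runs
}
```

For example, `select_skip_runs(12, &[3, 4, 5, 9])` returns `[Select(3), Skip(3), Select(3), Skip(1), Select(2)]`: rows 0..=2 kept, the consecutive run 3..=5 skipped, rows 6..=8 kept, row 9 skipped, and the final two rows kept.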
