Skip to content

Commit 401f168

Browse files
committed
refactor: delete row selection uses roaring treemap iterator and advance_to
1 parent eb0e93f commit 401f168

File tree

4 files changed

+94
-78
lines changed

4 files changed

+94
-78
lines changed

Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ port_scanner = "0.1.5"
8383
rand = "0.8.5"
8484
regex = "1.10.5"
8585
reqwest = { version = "0.12.2", default-features = false, features = ["json"] }
86-
roaring = "0.10"
86+
roaring = { version = "0.10", git = "https://github.com/RoaringBitmap/roaring-rs.git" }
8787
rust_decimal = "1.31"
8888
serde = { version = "1.0.204", features = ["rc"] }
8989
serde_bytes = "0.11.15"

crates/iceberg/src/arrow/reader.rs

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -370,17 +370,19 @@ impl ArrowReader {
370370
/// as having been deleted by a positional delete, taking into account any row groups that have
371371
/// been skipped entirely by the filter predicate
372372
fn build_deletes_row_selection(
373-
row_group_metadata: &[RowGroupMetaData],
373+
row_group_metadata_list: &[RowGroupMetaData],
374374
selected_row_groups: &Option<Vec<usize>>,
375-
mut positional_deletes: &DeleteVector,
375+
positional_deletes: &DeleteVector,
376376
) -> Result<RowSelection> {
377377
let mut results: Vec<RowSelector> = Vec::new();
378378
let mut selected_row_groups_idx = 0;
379-
let mut current_page_base_idx: u64 = 0;
379+
let mut current_row_group_base_idx: u64 = 0;
380+
let mut delete_vector_iter = positional_deletes.iter();
381+
let mut next_deleted_row_idx_opt = delete_vector_iter.next();
380382

381-
for (idx, row_group_metadata) in row_group_metadata.iter().enumerate() {
382-
let page_num_rows = row_group_metadata.num_rows() as u64;
383-
let next_page_base_idx = current_page_base_idx + page_num_rows;
383+
for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
384+
let row_group_num_rows = row_group_metadata.num_rows() as u64;
385+
let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
384386

385387
// if row group selection is enabled,
386388
if let Some(selected_row_groups) = selected_row_groups {
@@ -397,36 +399,37 @@ impl ArrowReader {
397399
} else {
398400
// skip past any positional deletes that fall inside the skipped row group so that
399401
// the next deleted row index taken from the iterator belongs to a later row group
400-
positional_deletes.remove_range(current_page_base_idx..next_page_base_idx);
402+
delete_vector_iter.advance_to(next_row_group_base_idx);
403+
next_deleted_row_idx_opt = delete_vector_iter.next();
401404

402405
// still increment the current row group base index but then skip to the next row group
403406
// in the file
404-
current_page_base_idx += page_num_rows;
407+
current_row_group_base_idx += row_group_num_rows;
405408
continue;
406409
}
407410
}
408411

409-
let mut next_deleted_row_idx = match positional_deletes.min() {
412+
let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
410413
Some(next_deleted_row_idx) => {
411-
// if the index of the next deleted row is beyond this page, add a selection for
412-
// the remainder of this page and skip to the next page
413-
if next_deleted_row_idx >= next_page_base_idx {
414-
results.push(RowSelector::select(page_num_rows as usize));
414+
// if the index of the next deleted row is beyond this row group, add a selection for
415+
// the remainder of this row group and skip to the next row group
416+
if next_deleted_row_idx >= next_row_group_base_idx {
417+
results.push(RowSelector::select(row_group_num_rows as usize));
415418
continue;
416419
}
417420

418421
next_deleted_row_idx
419422
}
420423

421-
// If there are no more pos deletes, add a selector for the entirety of this page.
424+
// If there are no more pos deletes, add a selector for the entirety of this row group.
422425
_ => {
423-
results.push(RowSelector::select(page_num_rows as usize));
426+
results.push(RowSelector::select(row_group_num_rows as usize));
424427
continue;
425428
}
426429
};
427430

428-
let mut current_idx = current_page_base_idx;
429-
'chunks: while next_deleted_row_idx < next_page_base_idx {
431+
let mut current_idx = current_row_group_base_idx;
432+
'chunks: while next_deleted_row_idx < next_row_group_base_idx {
430433
// `select` all rows that precede the next delete index
431434
if current_idx < next_deleted_row_idx {
432435
let run_length = next_deleted_row_idx - current_idx;
@@ -437,18 +440,18 @@ impl ArrowReader {
437440
// `skip` all consecutive deleted rows in the current row group
438441
let mut run_length = 0;
439442
while next_deleted_row_idx == current_idx
440-
&& next_deleted_row_idx < next_page_base_idx
443+
&& next_deleted_row_idx < next_row_group_base_idx
441444
{
442445
run_length += 1;
443446
current_idx += 1;
444-
positional_deletes.remove(next_deleted_row_idx);
445447

446-
next_deleted_row_idx = match positional_deletes.min() {
448+
next_deleted_row_idx_opt = delete_vector_iter.next();
449+
next_deleted_row_idx = match next_deleted_row_idx_opt {
447450
Some(next_deleted_row_idx) => next_deleted_row_idx,
448451
_ => {
449452
// We've processed the final positional delete.
450453
// Conclude the skip and then break so that we select the remaining
451-
// rows in the page and move on to the next row group
454+
// rows in the row group and move on to the next row group
452455
results.push(RowSelector::skip(run_length));
453456
break 'chunks;
454457
}
@@ -457,13 +460,13 @@ impl ArrowReader {
457460
results.push(RowSelector::skip(run_length));
458461
}
459462

460-
if current_idx < next_page_base_idx {
463+
if current_idx < next_row_group_base_idx {
461464
results.push(RowSelector::select(
462-
(next_page_base_idx - current_idx) as usize,
465+
(next_row_group_base_idx - current_idx) as usize,
463466
));
464467
}
465468

466-
current_page_base_idx += page_num_rows;
469+
current_row_group_base_idx += row_group_num_rows;
467470
}
468471

469472
Ok(results.into())
@@ -1400,18 +1403,19 @@ mod tests {
14001403
use arrow_array::{ArrayRef, RecordBatch, StringArray};
14011404
use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
14021405
use futures::TryStreamExt;
1406+
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
14031407
use parquet::arrow::{ArrowWriter, ProjectionMask};
14041408
use parquet::basic::Compression;
1405-
use parquet::file::properties::WriterProperties;
1406-
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
14071409
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1410+
use parquet::file::properties::WriterProperties;
14081411
use parquet::schema::parser::parse_message_type;
1409-
use tempfile::TempDir;
14101412
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
14111413
use roaring::RoaringTreemap;
1414+
use tempfile::TempDir;
14121415

14131416
use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
14141417
use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1418+
use crate::delete_vector::DeleteVector;
14151419
use crate::expr::visitors::bound_predicate_visitor::visit;
14161420
use crate::expr::{Bind, Predicate, Reference};
14171421
use crate::io::FileIO;
@@ -1758,16 +1762,14 @@ message schema {
17581762
2999, // single item at end of selected rg3 (1)
17591763
3000, // single item at start of skipped rg4
17601764
]);
1761-
1762-
let positional_deletes = DeleteVector {
1763-
inner: positional_deletes
1764-
};
1765+
1766+
let positional_deletes = DeleteVector::new(positional_deletes);
17651767

17661768
// using selected row groups 1 and 3
17671769
let result = ArrowReader::build_deletes_row_selection(
17681770
&row_groups_metadata,
17691771
&selected_row_groups,
1770-
positional_deletes.clone(),
1772+
&positional_deletes,
17711773
)
17721774
.unwrap();
17731775

@@ -1791,7 +1793,7 @@ message schema {
17911793
let result = ArrowReader::build_deletes_row_selection(
17921794
&row_groups_metadata,
17931795
&None,
1794-
positional_deletes,
1796+
&positional_deletes,
17951797
)
17961798
.unwrap();
17971799

crates/iceberg/src/delete_vector.rs

Lines changed: 57 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
use std::ops::BitOrAssign;
1919

20+
use roaring::bitmap::Iter;
21+
use roaring::treemap::BitmapIter;
2022
use roaring::RoaringTreemap;
2123

2224
#[derive(Debug, Default)]
@@ -25,22 +27,18 @@ pub struct DeleteVector {
2527
}
2628

2729
impl DeleteVector {
28-
pub fn iter(&self) -> DeleteVectorIterator {
29-
let mut iter = self.inner.bitmaps();
30-
match iter.next() {
31-
Some((high_bits, bitmap)) => {
32-
DeleteVectorIterator {
33-
inner: Some(DeleteVectorIteratorInner {
34-
// iter,
35-
high_bits: (high_bits as u64) << 32,
36-
bitmap_iter: bitmap.iter(),
37-
}),
38-
}
39-
}
40-
_ => DeleteVectorIterator { inner: None },
30+
#[allow(unused)]
31+
pub(crate) fn new(roaring_treemap: RoaringTreemap) -> DeleteVector {
32+
DeleteVector {
33+
inner: roaring_treemap,
4134
}
4235
}
4336

37+
pub fn iter(&self) -> DeleteVectorIterator {
38+
let outer = self.inner.bitmaps();
39+
DeleteVectorIterator { outer, inner: None }
40+
}
41+
4442
pub fn intersect_assign(&mut self, other: &DeleteVector) {
4543
self.inner.bitor_assign(&other.inner);
4644
}
@@ -54,51 +52,68 @@ impl DeleteVector {
5452
}
5553
}
5654

55+
// Ideally, we'd just wrap `roaring::RoaringTreemap`'s iterator, `roaring::treemap::Iter` here.
56+
// But right now, it does not have a corresponding implementation of `roaring::bitmap::Iter::advance_to`,
57+
// which is very handy in ArrowReader::build_deletes_row_selection.
58+
// There is a PR open on roaring to add this (https://github.com/RoaringBitmap/roaring-rs/pull/314)
59+
// and if that gets merged then we can simplify `DeleteVectorIterator` here, refactoring `advance_to`
60+
// to just a wrapper around the underlying iterator's method.
5761
pub struct DeleteVectorIterator<'a> {
62+
// NB: `BitmapIter` was only exposed publicly in https://github.com/RoaringBitmap/roaring-rs/pull/316
63+
// which is not yet released. As a consequence our Cargo.toml temporarily uses a git reference for
64+
// the roaring dependency.
65+
outer: BitmapIter<'a>,
5866
inner: Option<DeleteVectorIteratorInner<'a>>,
5967
}
6068

6169
struct DeleteVectorIteratorInner<'a> {
62-
// TODO: roaring::treemap::iter::BitmapIter is currently private.
63-
// See https://github.com/RoaringBitmap/roaring-rs/issues/312
64-
// iter: roaring::treemap::iter::BitmapIter<'a>,
65-
high_bits: u64,
66-
bitmap_iter: roaring::bitmap::Iter<'a>,
70+
high_bits: u32,
71+
bitmap_iter: Iter<'a>,
6772
}
6873

6974
impl Iterator for DeleteVectorIterator<'_> {
7075
type Item = u64;
7176

7277
fn next(&mut self) -> Option<Self::Item> {
73-
let Some(ref mut inner) = &mut self.inner else {
74-
return None;
75-
};
78+
if let Some(ref mut inner) = &mut self.inner {
79+
if let Some(inner_next) = inner.bitmap_iter.next() {
80+
return Some(u64::from(inner.high_bits) << 32 | u64::from(inner_next));
81+
}
82+
}
7683

77-
if let Some(lower) = inner.bitmap_iter.next() {
78-
return Some(inner.high_bits & lower as u64);
79-
};
84+
if let Some((high_bits, next_bitmap)) = self.outer.next() {
85+
self.inner = Some(DeleteVectorIteratorInner {
86+
high_bits,
87+
bitmap_iter: next_bitmap.iter(),
88+
})
89+
} else {
90+
return None;
91+
}
8092

81-
// TODO: roaring::treemap::iter::BitmapIter is currently private.
82-
// See https://github.com/RoaringBitmap/roaring-rs/issues/312
83-
84-
// replace with commented-out code below once BitmapIter is pub,
85-
// or use RoaringTreemap::iter if `advance_to` gets implemented natively
86-
None
87-
88-
// let Some((high_bits, bitmap)) = inner.iter.next() else {
89-
// self.inner = None;
90-
// return None;
91-
// };
92-
//
93-
// inner.high_bits = (high_bits as u64) << 32;
94-
// inner.bitmap_iter = bitmap.iter();
95-
//
96-
// self.next()
93+
self.next()
9794
}
9895
}
9996

10097
impl<'a> DeleteVectorIterator<'a> {
101-
pub fn advance_to(&'a mut self, _pos: u64) {
102-
// TODO
98+
pub fn advance_to(&mut self, pos: u64) {
99+
let hi = (pos >> 32) as u32;
100+
let lo = pos as u32;
101+
102+
let Some(ref mut inner) = self.inner else {
103+
return;
104+
};
105+
106+
while inner.high_bits < hi {
107+
let Some((next_hi, next_bitmap)) = self.outer.next() else {
108+
return;
109+
};
110+
111+
*inner = DeleteVectorIteratorInner {
112+
high_bits: next_hi,
113+
bitmap_iter: next_bitmap.iter(),
114+
}
115+
}
116+
117+
inner.bitmap_iter.advance_to(lo);
103118
}
104119
}

0 commit comments

Comments
 (0)