Skip to content

Commit eb0e93f

Browse files
committed
feat: implementation for ArrowReader::build_deletes_row_selection
1 parent 84002f6 commit eb0e93f

File tree

1 file changed

+245
-5
lines changed

1 file changed

+245
-5
lines changed

crates/iceberg/src/arrow/reader.rs

Lines changed: 245 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ use bytes::Bytes;
3333
use fnv::FnvHashSet;
3434
use futures::future::BoxFuture;
3535
use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
36-
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
36+
use parquet::arrow::arrow_reader::{
37+
ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
38+
};
3739
use parquet::arrow::async_reader::AsyncFileReader;
3840
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
3941
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
@@ -367,15 +369,104 @@ impl ArrowReader {
367369
/// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
368370
/// as having been deleted by a positional delete, taking into account any row groups that have
369371
/// been skipped entirely by the filter predicate
370-
#[allow(unused)]
371372
fn build_deletes_row_selection(
372373
row_group_metadata: &[RowGroupMetaData],
373374
selected_row_groups: &Option<Vec<usize>>,
374375
mut positional_deletes: &DeleteVector,
375376
) -> Result<RowSelection> {
376-
// TODO
377+
let mut results: Vec<RowSelector> = Vec::new();
378+
let mut selected_row_groups_idx = 0;
379+
let mut current_page_base_idx: u64 = 0;
380+
381+
for (idx, row_group_metadata) in row_group_metadata.iter().enumerate() {
382+
let page_num_rows = row_group_metadata.num_rows() as u64;
383+
let next_page_base_idx = current_page_base_idx + page_num_rows;
384+
385+
// if row group selection is enabled,
386+
if let Some(selected_row_groups) = selected_row_groups {
387+
// if we've consumed all the selected row groups, we're done
388+
if selected_row_groups_idx == selected_row_groups.len() {
389+
break;
390+
}
391+
392+
if idx == selected_row_groups[selected_row_groups_idx] {
393+
// we're in a selected row group. Increment selected_row_groups_idx
394+
// so that next time around the for loop we're looking for the next
395+
// selected row group
396+
selected_row_groups_idx += 1;
397+
} else {
398+
// remove any positional deletes from the skipped page so that
399+
// `positional.deletes.min()` can be used
400+
positional_deletes.remove_range(current_page_base_idx..next_page_base_idx);
401+
402+
// still increment the current page base index but then skip to the next row group
403+
// in the file
404+
current_page_base_idx += page_num_rows;
405+
continue;
406+
}
407+
}
408+
409+
let mut next_deleted_row_idx = match positional_deletes.min() {
410+
Some(next_deleted_row_idx) => {
411+
// if the index of the next deleted row is beyond this page, add a selection for
412+
// the remainder of this page and skip to the next page
413+
if next_deleted_row_idx >= next_page_base_idx {
414+
results.push(RowSelector::select(page_num_rows as usize));
415+
continue;
416+
}
417+
418+
next_deleted_row_idx
419+
}
420+
421+
// If there are no more pos deletes, add a selector for the entirety of this page.
422+
_ => {
423+
results.push(RowSelector::select(page_num_rows as usize));
424+
continue;
425+
}
426+
};
427+
428+
let mut current_idx = current_page_base_idx;
429+
'chunks: while next_deleted_row_idx < next_page_base_idx {
430+
// `select` all rows that precede the next delete index
431+
if current_idx < next_deleted_row_idx {
432+
let run_length = next_deleted_row_idx - current_idx;
433+
results.push(RowSelector::select(run_length as usize));
434+
current_idx += run_length;
435+
}
436+
437+
// `skip` all consecutive deleted rows in the current row group
438+
let mut run_length = 0;
439+
while next_deleted_row_idx == current_idx
440+
&& next_deleted_row_idx < next_page_base_idx
441+
{
442+
run_length += 1;
443+
current_idx += 1;
444+
positional_deletes.remove(next_deleted_row_idx);
445+
446+
next_deleted_row_idx = match positional_deletes.min() {
447+
Some(next_deleted_row_idx) => next_deleted_row_idx,
448+
_ => {
449+
// We've processed the final positional delete.
450+
// Conclude the skip and then break so that we select the remaining
451+
// rows in the page and move on to the next row group
452+
results.push(RowSelector::skip(run_length));
453+
break 'chunks;
454+
}
455+
};
456+
}
457+
results.push(RowSelector::skip(run_length));
458+
}
459+
460+
if current_idx < next_page_base_idx {
461+
results.push(RowSelector::select(
462+
(next_page_base_idx - current_idx) as usize,
463+
));
464+
}
465+
466+
current_page_base_idx += page_num_rows;
467+
}
377468

378-
Ok(RowSelection::default())
469+
Ok(results.into())
379470
}
380471

381472
fn build_field_id_set_and_map(
@@ -1312,9 +1403,12 @@ mod tests {
13121403
use parquet::arrow::{ArrowWriter, ProjectionMask};
13131404
use parquet::basic::Compression;
13141405
use parquet::file::properties::WriterProperties;
1406+
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1407+
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
13151408
use parquet::schema::parser::parse_message_type;
1316-
use parquet::schema::types::SchemaDescriptor;
13171409
use tempfile::TempDir;
1410+
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1411+
use roaring::RoaringTreemap;
13181412

13191413
use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
13201414
use crate::arrow::{ArrowReader, ArrowReaderBuilder};
@@ -1618,4 +1712,150 @@ message schema {
16181712

16191713
(file_io, schema, table_location, tmp_dir)
16201714
}
1715+
1716+
#[test]
1717+
fn test_build_deletes_row_selection() {
1718+
let schema_descr = get_test_schema_descr();
1719+
1720+
let mut columns = vec![];
1721+
for ptr in schema_descr.columns() {
1722+
let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
1723+
columns.push(column);
1724+
}
1725+
1726+
let row_groups_metadata = vec![
1727+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
1728+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
1729+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
1730+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
1731+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
1732+
];
1733+
1734+
let selected_row_groups = Some(vec![1, 3]);
1735+
1736+
/* cases to cover:
1737+
* {skip|select} {first|intermediate|last} {one row|multiple rows} in
1738+
{first|imtermediate|last} {skipped|selected} row group
1739+
* row group selection disabled
1740+
*/
1741+
1742+
let positional_deletes = RoaringTreemap::from_iter(&[
1743+
1, // in skipped rg 0, should be ignored
1744+
3, // run of three consecutive items in skipped rg0
1745+
4, 5, 998, // two consecutive items at end of skipped rg0
1746+
999, 1000, // solitary row at start of selected rg1 (1, 9)
1747+
1010, // run of 3 rows in selected rg1
1748+
1011, 1012, // (3, 485)
1749+
1498, // run of two items at end of selected rg1
1750+
1499, 1500, // run of two items at start of skipped rg2
1751+
1501, 1600, // should ignore, in skipped rg2
1752+
1999, // single row at end of skipped rg2
1753+
2000, // run of two items at start of selected rg3
1754+
2001, // (4, 98)
1755+
2100, // single row in selected row group 3 (1, 99)
1756+
2200, // run of 3 consecutive rows in selected row group 3
1757+
2201, 2202, // (3, 796)
1758+
2999, // single item at end of selected rg3 (1)
1759+
3000, // single item at start of skipped rg4
1760+
]);
1761+
1762+
let positional_deletes = DeleteVector {
1763+
inner: positional_deletes
1764+
};
1765+
1766+
// using selected row groups 1 and 3
1767+
let result = ArrowReader::build_deletes_row_selection(
1768+
&row_groups_metadata,
1769+
&selected_row_groups,
1770+
positional_deletes.clone(),
1771+
)
1772+
.unwrap();
1773+
1774+
let expected = RowSelection::from(vec![
1775+
RowSelector::skip(1),
1776+
RowSelector::select(9),
1777+
RowSelector::skip(3),
1778+
RowSelector::select(485),
1779+
RowSelector::skip(4),
1780+
RowSelector::select(98),
1781+
RowSelector::skip(1),
1782+
RowSelector::select(99),
1783+
RowSelector::skip(3),
1784+
RowSelector::select(796),
1785+
RowSelector::skip(1),
1786+
]);
1787+
1788+
assert_eq!(result, expected);
1789+
1790+
// selecting all row groups
1791+
let result = ArrowReader::build_deletes_row_selection(
1792+
&row_groups_metadata,
1793+
&None,
1794+
positional_deletes,
1795+
)
1796+
.unwrap();
1797+
1798+
let expected = RowSelection::from(vec![
1799+
RowSelector::select(1),
1800+
RowSelector::skip(1),
1801+
RowSelector::select(1),
1802+
RowSelector::skip(3),
1803+
RowSelector::select(992),
1804+
RowSelector::skip(3),
1805+
RowSelector::select(9),
1806+
RowSelector::skip(3),
1807+
RowSelector::select(485),
1808+
RowSelector::skip(4),
1809+
RowSelector::select(98),
1810+
RowSelector::skip(1),
1811+
RowSelector::select(398),
1812+
RowSelector::skip(3),
1813+
RowSelector::select(98),
1814+
RowSelector::skip(1),
1815+
RowSelector::select(99),
1816+
RowSelector::skip(3),
1817+
RowSelector::select(796),
1818+
RowSelector::skip(2),
1819+
RowSelector::select(499),
1820+
]);
1821+
1822+
assert_eq!(result, expected);
1823+
}
1824+
1825+
fn build_test_row_group_meta(
1826+
schema_descr: SchemaDescPtr,
1827+
columns: Vec<ColumnChunkMetaData>,
1828+
num_rows: i64,
1829+
ordinal: i16,
1830+
) -> RowGroupMetaData {
1831+
RowGroupMetaData::builder(schema_descr.clone())
1832+
.set_num_rows(num_rows)
1833+
.set_total_byte_size(2000)
1834+
.set_column_metadata(columns)
1835+
.set_ordinal(ordinal)
1836+
.build()
1837+
.unwrap()
1838+
}
1839+
1840+
fn get_test_schema_descr() -> SchemaDescPtr {
1841+
use parquet::schema::types::Type as SchemaType;
1842+
1843+
let schema = SchemaType::group_type_builder("schema")
1844+
.with_fields(vec![
1845+
Arc::new(
1846+
SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
1847+
.build()
1848+
.unwrap(),
1849+
),
1850+
Arc::new(
1851+
SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
1852+
.build()
1853+
.unwrap(),
1854+
),
1855+
])
1856+
.build()
1857+
.unwrap();
1858+
1859+
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
1860+
}
16211861
}

0 commit comments

Comments
 (0)