
Commit decef64 (parent 52cf8b9)

feat: add positional delete parsing. Add tests for end-to-end positional delete functionality

3 files changed: +136, -20 lines

crates/iceberg/src/arrow/delete_file_manager.rs
Lines changed: 87 additions & 20 deletions

```diff
@@ -21,6 +21,7 @@ use std::pin::Pin;
 use std::sync::{Arc, OnceLock, RwLock};
 use std::task::{Context, Poll};
 
+use arrow_array::{Int64Array, StringArray};
 use futures::channel::oneshot;
 use futures::future::join_all;
 use futures::{StreamExt, TryStreamExt};
@@ -169,7 +170,7 @@ impl CachingDeleteFileManager {
     /// vector maps that resulted from any positional delete or delete vector files into a
     /// single map and persist it in the state.
     ///
-    ///
+    ///
     /// Conceptually, the data flow is like this:
     /// ```none
     /// FileScanTaskDeleteFile
@@ -393,14 +394,44 @@ impl CachingDeleteFileManager {
     ///
     /// Returns a map of data file path to a delete vector
     async fn parse_positional_deletes_record_batch_stream(
-        stream: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
     ) -> Result<HashMap<String, DeleteVector>> {
-        // TODO
+        let mut result: HashMap<String, DeleteVector> = HashMap::default();
+
+        while let Some(batch) = stream.next().await {
+            let batch = batch?;
+            let schema = batch.schema();
+            let columns = batch.columns();
+
+            let Some(file_paths) = columns[0].as_any().downcast_ref::<StringArray>() else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Could not downcast file paths array to StringArray",
+                ));
+            };
+            let Some(positions) = columns[1].as_any().downcast_ref::<Int64Array>() else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Could not downcast positions array to Int64Array",
+                ));
+            };
 
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "parsing of positional deletes is not yet supported",
-        ))
+            for (file_path, pos) in file_paths.iter().zip(positions.iter()) {
+                let (Some(file_path), Some(pos)) = (file_path, pos) else {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "null values in delete file",
+                    ));
+                };
+
+                result
+                    .entry(file_path.to_string())
+                    .or_default()
+                    .insert(pos as u64);
+            }
+        }
+
+        Ok(result)
     }
 
     /// Parses record batch streams from individual equality delete files
@@ -514,38 +545,74 @@ mod tests {
             .build()
             .unwrap();
 
-        // Note that with the delete file parsing not yet in place, all we can test here is that
-        // the call to the loader fails with the expected FeatureUnsupportedError.
         let delete_file_manager = CachingDeleteFileManager::new(file_io.clone(), 10);
 
         let file_scan_tasks = setup(table_location);
 
-        let result = delete_file_manager
+        let _ = delete_file_manager
             .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
             .await;
 
-        assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
+        let result = delete_file_manager
+            .get_delete_vector_for_task(&file_scan_tasks[0])
+            .unwrap();
+        assert_eq!(result.read().unwrap().len(), 3); // pos dels from pos del file 1 and 2
+
+        let result = delete_file_manager
+            .get_delete_vector_for_task(&file_scan_tasks[1])
+            .unwrap();
+        assert_eq!(result.read().unwrap().len(), 3); // pos dels from pos del file 3
     }
 
     fn setup(table_location: &Path) -> Vec<FileScanTask> {
         let data_file_schema = Arc::new(Schema::builder().build().unwrap());
         let positional_delete_schema = create_pos_del_schema();
 
-        let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8];
-        let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023];
-
-        let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values));
-        let pos_col = Arc::new(Int64Array::from_iter_values(pos_values));
+        let mut file_path_values = vec![];
+        let mut pos_values = vec![];
+
+        file_path_values.push(vec![
+            format!(
+                "{}/1.parquet",
+                table_location.to_str().unwrap()
+            );
+            3
+        ]);
+        pos_values.push(vec![0, 1, 3]);
+
+        file_path_values.push(vec![
+            format!(
+                "{}/1.parquet",
+                table_location.to_str().unwrap()
+            );
+            3
+        ]);
+        pos_values.push(vec![5, 6, 8]);
+
+        file_path_values.push(vec![
+            format!(
+                "{}/2.parquet",
+                table_location.to_str().unwrap()
+            );
+            3
+        ]);
+        pos_values.push(vec![1022, 1023, 1024]);
+        // 9 rows in total pos deleted across 3 files
 
         let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();
 
         for n in 1..=3 {
+            let file_path_col = Arc::new(StringArray::from_iter_values(
+                file_path_values.pop().unwrap(),
+            ));
+            let pos_col = Arc::new(Int64Array::from_iter_values(pos_values.pop().unwrap()));
+
             let positional_deletes_to_write =
                 RecordBatch::try_new(positional_delete_schema.clone(), vec![
                     file_path_col.clone(),
-                    pos_col.clone(),
+                    pos_col,
                 ])
                 .unwrap();
 
@@ -596,7 +663,7 @@ mod tests {
                 start: 0,
                 length: 0,
                 record_count: None,
-                data_file_path: "".to_string(),
+                data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()),
                 data_file_content: DataContentType::Data,
                 data_file_format: DataFileFormat::Parquet,
                 schema: data_file_schema.clone(),
@@ -608,13 +675,13 @@
                 start: 0,
                 length: 0,
                 record_count: None,
-                data_file_path: "".to_string(),
+                data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()),
                 data_file_content: DataContentType::Data,
                 data_file_format: DataFileFormat::Parquet,
                 schema: data_file_schema.clone(),
                 project_field_ids: vec![],
                 predicate: None,
-                deletes: vec![pos_del_2, pos_del_3],
+                deletes: vec![pos_del_3],
             },
         ];
 
```
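For context on what the new parsing consumes: an Iceberg positional delete file carries two columns, the path of the data file and the 0-based position of the row to delete. The standalone sketch below is not part of the commit; it illustrates the same per-batch pattern used in `parse_positional_deletes_record_batch_stream` (downcast the two columns, zip the rows, fold into a per-file accumulator), with a plain `HashMap<String, Vec<u64>>` standing in for the crate's map of `DeleteVector`s.

```rust
// Illustrative sketch only: `fold_batch` and the Vec<u64> accumulator are
// stand-ins, not the crate's API. Requires the `arrow-array` crate.
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch, StringArray};

fn fold_batch(batch: &RecordBatch, acc: &mut HashMap<String, Vec<u64>>) -> Result<(), String> {
    // Column 0: data file path; column 1: 0-based row position to delete.
    let file_paths = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or("expected a StringArray of file paths")?;
    let positions = batch
        .column(1)
        .as_any()
        .downcast_ref::<Int64Array>()
        .ok_or("expected an Int64Array of positions")?;

    for (path, pos) in file_paths.iter().zip(positions.iter()) {
        // Nulls are invalid in a positional delete file, mirroring the check in the diff.
        let (Some(path), Some(pos)) = (path, pos) else {
            return Err("null values in delete file".to_string());
        };
        acc.entry(path.to_string()).or_default().push(pos as u64);
    }
    Ok(())
}

fn main() {
    // Two deletes against 1.parquet and one against 2.parquet.
    let paths: ArrayRef = Arc::new(StringArray::from(vec!["1.parquet", "1.parquet", "2.parquet"]));
    let pos: ArrayRef = Arc::new(Int64Array::from(vec![0_i64, 3, 1022]));
    let batch = RecordBatch::try_from_iter(vec![("file_path", paths), ("pos", pos)]).unwrap();

    let mut acc = HashMap::new();
    fold_batch(&batch, &mut acc).unwrap();
    assert_eq!(acc["1.parquet"], vec![0, 3]);
    assert_eq!(acc["2.parquet"], vec![1022]);
}
```

The real method accumulates into `DeleteVector` rather than a `Vec`, so a position recorded more than once for the same data file is only counted once.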

crates/iceberg/src/delete_vector.rs
Lines changed: 9 additions & 0 deletions

```diff
@@ -42,6 +42,15 @@ impl DeleteVector {
     pub(crate) fn intersect_assign(&mut self, other: &DeleteVector) {
         self.inner.bitor_assign(&other.inner);
     }
+
+    pub fn insert(&mut self, pos: u64) -> bool {
+        self.inner.insert(pos)
+    }
+
+    #[allow(unused)]
+    pub fn len(&self) -> u64 {
+        self.inner.len()
+    }
 }
 
 // Ideally, we'd just wrap `roaring::RoaringTreemap`'s iterator, `roaring::treemap::Iter` here.
```
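To make the two new methods concrete, here is a minimal standalone stand-in for `DeleteVector`, assuming (as the comment in the diff suggests) that `inner` is a `roaring::RoaringTreemap`. It is a sketch for illustration, not the crate's actual type.

```rust
// Sketch only: assumes the `roaring` crate and mirrors the `insert`/`len`
// methods added in this commit.
use roaring::RoaringTreemap;

struct DeleteVector {
    inner: RoaringTreemap, // set of deleted row positions within one data file
}

impl DeleteVector {
    fn new() -> Self {
        Self {
            inner: RoaringTreemap::new(),
        }
    }

    // Returns true if the position was not already marked deleted.
    fn insert(&mut self, pos: u64) -> bool {
        self.inner.insert(pos)
    }

    // Number of distinct deleted positions.
    fn len(&self) -> u64 {
        self.inner.len()
    }
}

fn main() {
    let mut dv = DeleteVector::new();
    assert!(dv.insert(3));
    assert!(!dv.insert(3)); // duplicates are absorbed by the bitmap
    dv.insert(1022);
    assert_eq!(dv.len(), 2);
}
```

Because the positions live in a bitmap, `len()` counts distinct deleted rows rather than raw delete records.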

crates/integration_tests/tests/shared_tests/read_positional_deletes.rs
Lines changed: 40 additions & 0 deletions

```diff
@@ -69,3 +69,43 @@ async fn test_read_table_with_positional_deletes() {
     // let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
     // assert_eq!(num_rows, 10);
 }
+
+#[tokio::test]
+async fn test_read_table_with_positional_deletes_with_delete_support_enabled() {
+    let fixture = get_shared_containers();
+    let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());
+
+    let table = rest_catalog
+        .load_table(
+            &TableIdent::from_strs(["default", "test_positional_merge_on_read_double_deletes"])
+                .unwrap(),
+        )
+        .await
+        .unwrap();
+
+    let scan = table
+        .scan()
+        .with_delete_file_processing_enabled(true)
+        .build()
+        .unwrap();
+    println!("{:?}", scan);
+
+    let plan: Vec<_> = scan
+        .plan_files()
+        .await
+        .unwrap()
+        .try_collect()
+        .await
+        .unwrap();
+    println!("{:?}", plan);
+
+    // Scan plan phase should include delete files in file plan
+    // when with_delete_file_processing_enabled == true
+    assert_eq!(plan[0].deletes.len(), 2);
+
+    // we should see two rows deleted, returning 10 rows instead of 12
+    let batch_stream = scan.to_arrow().await.unwrap();
+    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+    let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
+    assert_eq!(num_rows, 10);
+}
```
