Skip to content

Commit 6a5a421

Browse files
committed
feat: add positional delete parsing. Add tests for end-to-end positional delete functionality
1 parent ef827f1 commit 6a5a421

File tree

2 files changed

+125
-12
lines changed

2 files changed

+125
-12
lines changed

crates/iceberg/src/arrow/delete_file_manager.rs

Lines changed: 85 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -328,11 +328,44 @@ impl DeleteFileManager {
328328
///
329329
/// Returns a map of data file path to a delete vector
330330
async fn parse_positional_deletes_record_batch_stream(
331-
stream: ArrowRecordBatchStream,
331+
mut stream: ArrowRecordBatchStream,
332332
) -> Result<HashMap<String, RoaringTreemap>> {
333-
// TODO
333+
let mut result: HashMap<String, RoaringTreemap> = HashMap::default();
334+
335+
while let Some(batch) = stream.next().await {
336+
let batch = batch?;
337+
let schema = batch.schema();
338+
let columns = batch.columns();
339+
340+
let Some(file_paths) = columns[0].as_any().downcast_ref::<StringArray>() else {
341+
return Err(Error::new(
342+
ErrorKind::DataInvalid,
343+
"Could not downcast file paths array to StringArray",
344+
));
345+
};
346+
let Some(positions) = columns[1].as_any().downcast_ref::<Int64Array>() else {
347+
return Err(Error::new(
348+
ErrorKind::DataInvalid,
349+
"Could not downcast positions array to Int64Array",
350+
));
351+
};
352+
353+
for (file_path, pos) in file_paths.iter().zip(positions.iter()) {
354+
let (Some(file_path), Some(pos)) = (file_path, pos) else {
355+
return Err(Error::new(
356+
ErrorKind::DataInvalid,
357+
"null values in delete file",
358+
));
359+
};
360+
361+
result
362+
.entry(file_path.to_string())
363+
.or_default()
364+
.insert(pos as u64);
365+
}
366+
}
334367

335-
Ok(HashMap::default())
368+
Ok(result)
336369
}
337370

338371
/// Parses record batch streams from individual equality delete files
@@ -452,27 +485,67 @@ mod tests {
452485
.load_deletes(&file_scan_tasks[0].deletes, file_io, 5)
453486
.await
454487
.unwrap();
488+
489+
let result = delete_file_manager
490+
.get_delete_vector_for_task(&file_scan_tasks[0])
491+
.unwrap();
492+
assert_eq!(result.len(), 3); // pos dels from pos del file 1 and 2
493+
494+
let result = delete_file_manager
495+
.get_delete_vector_for_task(&file_scan_tasks[1])
496+
.unwrap();
497+
assert_eq!(result.len(), 3); // pos dels from pos del file 3
455498
}
456499

457500
fn setup(table_location: &Path) -> Vec<FileScanTask> {
458501
let data_file_schema = Arc::new(Schema::builder().build().unwrap());
459502
let positional_delete_schema = create_pos_del_schema();
460503

461-
let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8];
462-
let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023];
463-
464-
let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values));
465-
let pos_col = Arc::new(Int64Array::from_iter_values(pos_values));
504+
let mut file_path_values = vec![];
505+
let mut pos_values = vec![];
506+
507+
file_path_values.push(vec![
508+
format!(
509+
"{}/1.parquet",
510+
table_location.to_str().unwrap()
511+
);
512+
3
513+
]);
514+
pos_values.push(vec![0, 1, 3]);
515+
516+
file_path_values.push(vec![
517+
format!(
518+
"{}/1.parquet",
519+
table_location.to_str().unwrap()
520+
);
521+
3
522+
]);
523+
pos_values.push(vec![5, 6, 8]);
524+
525+
file_path_values.push(vec![
526+
format!(
527+
"{}/2.parquet",
528+
table_location.to_str().unwrap()
529+
);
530+
3
531+
]);
532+
pos_values.push(vec![1022, 1023, 1024]);
533+
// 9 rows in total pos deleted across 3 files
466534

467535
let props = WriterProperties::builder()
468536
.set_compression(Compression::SNAPPY)
469537
.build();
470538

471539
for n in 1..=3 {
540+
let file_path_col = Arc::new(StringArray::from_iter_values(
541+
file_path_values.pop().unwrap(),
542+
));
543+
let pos_col = Arc::new(Int64Array::from_iter_values(pos_values.pop().unwrap()));
544+
472545
let positional_deletes_to_write =
473546
RecordBatch::try_new(positional_delete_schema.clone(), vec![
474547
file_path_col.clone(),
475-
pos_col.clone(),
548+
pos_col,
476549
])
477550
.unwrap();
478551

@@ -520,7 +593,7 @@ mod tests {
520593
start: 0,
521594
length: 0,
522595
record_count: None,
523-
data_file_path: "".to_string(),
596+
data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()),
524597
data_file_content: DataContentType::Data,
525598
data_file_format: DataFileFormat::Parquet,
526599
schema: data_file_schema.clone(),
@@ -532,13 +605,13 @@ mod tests {
532605
start: 0,
533606
length: 0,
534607
record_count: None,
535-
data_file_path: "".to_string(),
608+
data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()),
536609
data_file_content: DataContentType::Data,
537610
data_file_format: DataFileFormat::Parquet,
538611
schema: data_file_schema.clone(),
539612
project_field_ids: vec![],
540613
predicate: None,
541-
deletes: vec![pos_del_2, pos_del_3],
614+
deletes: vec![pos_del_3],
542615
},
543616
];
544617

crates/integration_tests/tests/shared_tests/read_positional_deletes.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,43 @@ async fn test_read_table_with_positional_deletes() {
6969
// let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
7070
// assert_eq!(num_rows, 10);
7171
}
72+
73+
#[tokio::test]
74+
async fn test_read_table_with_positional_deletes_with_delete_support_enabled() {
75+
let fixture = get_shared_containers();
76+
let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());
77+
78+
let table = rest_catalog
79+
.load_table(
80+
&TableIdent::from_strs(["default", "test_positional_merge_on_read_double_deletes"])
81+
.unwrap(),
82+
)
83+
.await
84+
.unwrap();
85+
86+
let scan = table
87+
.scan()
88+
.with_delete_file_processing_enabled(true)
89+
.build()
90+
.unwrap();
91+
println!("{:?}", scan);
92+
93+
let plan: Vec<_> = scan
94+
.plan_files()
95+
.await
96+
.unwrap()
97+
.try_collect()
98+
.await
99+
.unwrap();
100+
println!("{:?}", plan);
101+
102+
// Scan plan phase should include delete files in file plan
103+
// when with_delete_file_processing_enabled == true
104+
assert_eq!(plan[0].deletes.len(), 2);
105+
106+
// we should see two rows deleted, returning 10 rows instead of 12
107+
let batch_stream = scan.to_arrow().await.unwrap();
108+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
109+
let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
110+
assert_eq!(num_rows, 10);
111+
}

0 commit comments

Comments (0)