Skip to content

Commit dfe7d92

Browse files
committed
feat: add positional delete parsing. Add tests for end-to-end positional delete functionality
1 parent db47085 commit dfe7d92

File tree

2 files changed

+125
-12
lines changed

2 files changed

+125
-12
lines changed

crates/iceberg/src/arrow/delete_file_manager.rs

Lines changed: 85 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -329,11 +329,44 @@ impl DeleteFileManager {
329329
///
330330
/// Returns a map of data file path to a delete vector
331331
async fn parse_positional_deletes_record_batch_stream(
332-
stream: ArrowRecordBatchStream,
332+
mut stream: ArrowRecordBatchStream,
333333
) -> Result<HashMap<String, RoaringTreemap>> {
334-
// TODO
334+
let mut result: HashMap<String, RoaringTreemap> = HashMap::default();
335+
336+
while let Some(batch) = stream.next().await {
337+
let batch = batch?;
338+
let schema = batch.schema();
339+
let columns = batch.columns();
340+
341+
let Some(file_paths) = columns[0].as_any().downcast_ref::<StringArray>() else {
342+
return Err(Error::new(
343+
ErrorKind::DataInvalid,
344+
"Could not downcast file paths array to StringArray",
345+
));
346+
};
347+
let Some(positions) = columns[1].as_any().downcast_ref::<Int64Array>() else {
348+
return Err(Error::new(
349+
ErrorKind::DataInvalid,
350+
"Could not downcast positions array to Int64Array",
351+
));
352+
};
353+
354+
for (file_path, pos) in file_paths.iter().zip(positions.iter()) {
355+
let (Some(file_path), Some(pos)) = (file_path, pos) else {
356+
return Err(Error::new(
357+
ErrorKind::DataInvalid,
358+
"null values in delete file",
359+
));
360+
};
361+
362+
result
363+
.entry(file_path.to_string())
364+
.or_default()
365+
.insert(pos as u64);
366+
}
367+
}
335368

336-
Ok(HashMap::default())
369+
Ok(result)
337370
}
338371

339372
/// Parses record batch streams from individual equality delete files
@@ -453,27 +486,67 @@ mod tests {
453486
.load_deletes(&file_scan_tasks[0].deletes, file_io, 5)
454487
.await
455488
.unwrap();
489+
490+
let result = delete_file_manager
491+
.get_delete_vector_for_task(&file_scan_tasks[0])
492+
.unwrap();
493+
assert_eq!(result.len(), 3); // 3 deleted positions apply to data file 1.parquet
494+
495+
let result = delete_file_manager
496+
.get_delete_vector_for_task(&file_scan_tasks[1])
497+
.unwrap();
498+
assert_eq!(result.len(), 3); // 3 deleted positions apply to data file 2.parquet (1022, 1023, 1024)
456499
}
457500

458501
fn setup(table_location: &Path) -> Vec<FileScanTask> {
459502
let data_file_schema = Arc::new(Schema::builder().build().unwrap());
460503
let positional_delete_schema = create_pos_del_schema();
461504

462-
let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8];
463-
let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023];
464-
465-
let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values));
466-
let pos_col = Arc::new(Int64Array::from_iter_values(pos_values));
505+
let mut file_path_values = vec![];
506+
let mut pos_values = vec![];
507+
508+
file_path_values.push(vec![
509+
format!(
510+
"{}/1.parquet",
511+
table_location.to_str().unwrap()
512+
);
513+
3
514+
]);
515+
pos_values.push(vec![0, 1, 3]);
516+
517+
file_path_values.push(vec![
518+
format!(
519+
"{}/1.parquet",
520+
table_location.to_str().unwrap()
521+
);
522+
3
523+
]);
524+
pos_values.push(vec![5, 6, 8]);
525+
526+
file_path_values.push(vec![
527+
format!(
528+
"{}/2.parquet",
529+
table_location.to_str().unwrap()
530+
);
531+
3
532+
]);
533+
pos_values.push(vec![1022, 1023, 1024]);
534+
// 9 rows in total pos deleted across 3 files
467535

468536
let props = WriterProperties::builder()
469537
.set_compression(Compression::SNAPPY)
470538
.build();
471539

472540
for n in 1..=3 {
541+
let file_path_col = Arc::new(StringArray::from_iter_values(
542+
file_path_values.pop().unwrap(),
543+
));
544+
let pos_col = Arc::new(Int64Array::from_iter_values(pos_values.pop().unwrap()));
545+
473546
let positional_deletes_to_write =
474547
RecordBatch::try_new(positional_delete_schema.clone(), vec![
475548
file_path_col.clone(),
476-
pos_col.clone(),
549+
pos_col,
477550
])
478551
.unwrap();
479552

@@ -521,7 +594,7 @@ mod tests {
521594
start: 0,
522595
length: 0,
523596
record_count: None,
524-
data_file_path: "".to_string(),
597+
data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()),
525598
data_file_content: DataContentType::Data,
526599
data_file_format: DataFileFormat::Parquet,
527600
schema: data_file_schema.clone(),
@@ -533,13 +606,13 @@ mod tests {
533606
start: 0,
534607
length: 0,
535608
record_count: None,
536-
data_file_path: "".to_string(),
609+
data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()),
537610
data_file_content: DataContentType::Data,
538611
data_file_format: DataFileFormat::Parquet,
539612
schema: data_file_schema.clone(),
540613
project_field_ids: vec![],
541614
predicate: None,
542-
deletes: vec![pos_del_2, pos_del_3],
615+
deletes: vec![pos_del_3],
543616
},
544617
];
545618

crates/integration_tests/tests/shared_tests/read_positional_deletes.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,43 @@ async fn test_read_table_with_positional_deletes() {
6969
// let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
7070
// assert_eq!(num_rows, 10);
7171
}
72+
73+
#[tokio::test]
74+
async fn test_read_table_with_positional_deletes_with_delete_support_enabled() {
75+
let fixture = get_shared_containers();
76+
let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());
77+
78+
let table = rest_catalog
79+
.load_table(
80+
&TableIdent::from_strs(["default", "test_positional_merge_on_read_double_deletes"])
81+
.unwrap(),
82+
)
83+
.await
84+
.unwrap();
85+
86+
let scan = table
87+
.scan()
88+
.with_delete_file_processing_enabled(true)
89+
.build()
90+
.unwrap();
91+
println!("{:?}", scan);
92+
93+
let plan: Vec<_> = scan
94+
.plan_files()
95+
.await
96+
.unwrap()
97+
.try_collect()
98+
.await
99+
.unwrap();
100+
println!("{:?}", plan);
101+
102+
// Scan plan phase should include delete files in file plan
103+
// when with_delete_file_processing_enabled == true
104+
assert_eq!(plan[0].deletes.len(), 2);
105+
106+
// we should see two rows deleted, returning 10 rows instead of 12
107+
let batch_stream = scan.to_arrow().await.unwrap();
108+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
109+
let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
110+
assert_eq!(num_rows, 10);
111+
}

0 commit comments

Comments
 (0)