Commit 7b755fd (1 parent: 55f656b)

feat: add positional delete parsing. Add tests for end-to-end positional delete functionality

4 files changed (+176, −33 lines)
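For context: the Iceberg spec defines positional delete files as two-column data files, `file_path` (reserved field ID 2147483546) and `pos` (reserved field ID 2147483545). As a rough sketch (not this commit's code), the `create_pos_del_schema` helper used in the tests below presumably builds an Arrow schema along these lines, carrying the reserved IDs in `PARQUET:field_id` metadata:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};

// Sketch of a positional-delete schema; the reserved Iceberg field IDs are
// from the spec, everything else here is illustrative.
fn pos_del_schema() -> Arc<Schema> {
    let file_path = Field::new("file_path", DataType::Utf8, false).with_metadata(
        HashMap::from([("PARQUET:field_id".to_string(), "2147483546".to_string())]),
    );
    let pos = Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
        "PARQUET:field_id".to_string(),
        "2147483545".to_string(),
    )]));
    Arc::new(Schema::new(vec![file_path, pos]))
}
```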

crates/iceberg/src/arrow/delete_file_loader.rs

Lines changed: 89 additions & 19 deletions

```diff
@@ -18,6 +18,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use arrow_array::{Int64Array, StringArray};
 use futures::channel::oneshot;
 use futures::future::join_all;
 use futures::{StreamExt, TryStreamExt};
@@ -347,14 +348,44 @@ impl CachingDeleteFileLoader {
     ///
     /// Returns a map of data file path to a delete vector
     async fn parse_positional_deletes_record_batch_stream(
-        stream: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
     ) -> Result<HashMap<String, DeleteVector>> {
-        // TODO
+        let mut result: HashMap<String, DeleteVector> = HashMap::default();
+
+        while let Some(batch) = stream.next().await {
+            let batch = batch?;
+            let schema = batch.schema();
+            let columns = batch.columns();
+
+            let Some(file_paths) = columns[0].as_any().downcast_ref::<StringArray>() else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Could not downcast file paths array to StringArray",
+                ));
+            };
+            let Some(positions) = columns[1].as_any().downcast_ref::<Int64Array>() else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Could not downcast positions array to Int64Array",
+                ));
+            };
+
+            for (file_path, pos) in file_paths.iter().zip(positions.iter()) {
+                let (Some(file_path), Some(pos)) = (file_path, pos) else {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "null values in delete file",
+                    ));
+                };
 
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "parsing of positional deletes is not yet supported",
-        ))
+                result
+                    .entry(file_path.to_string())
+                    .or_default()
+                    .insert(pos as u64);
+            }
+        }
+
+        Ok(result)
     }
 
     /// Parses record batch streams from individual equality delete files
```
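The heart of the new parser is the grouping step above: every record batch yields `(file_path, pos)` pairs, and positions accumulate into one `DeleteVector` per data file. A minimal standalone sketch of the same pattern, with `roaring::RoaringTreemap` (the bitmap backing `DeleteVector`) standing in for it and made-up paths:

```rust
use std::collections::HashMap;

use roaring::RoaringTreemap;

fn main() {
    // (data file path, deleted row position) pairs, as decoded from a
    // positional delete file; the paths here are hypothetical.
    let rows = [
        ("s3://bucket/1.parquet", 0u64),
        ("s3://bucket/1.parquet", 3),
        ("s3://bucket/2.parquet", 1022),
    ];

    // Group positions per data file, the same shape the parser returns.
    let mut deletes: HashMap<String, RoaringTreemap> = HashMap::new();
    for (path, pos) in rows {
        deletes.entry(path.to_string()).or_default().insert(pos);
    }

    assert_eq!(deletes["s3://bucket/1.parquet"].len(), 2);
    assert_eq!(deletes["s3://bucket/2.parquet"].len(), 1);
}
```

The unit test in this file is updated below to drive the new path end to end: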
```diff
@@ -395,7 +426,7 @@ mod tests {
     const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
 
     #[tokio::test]
-    async fn test_delete_file_manager_load_deletes() {
+    async fn test_delete_file_loader_load_deletes() {
         let tmp_dir = TempDir::new().unwrap();
         let table_location = tmp_dir.path();
         let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
@@ -405,37 +436,76 @@
 
         // Note that with the delete file parsing not yet in place, all we can test here is that
         // the call to the loader fails with the expected FeatureUnsupportedError.
-        let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
 
         let file_scan_tasks = setup(table_location);
 
-        let result = delete_file_manager
+        let delete_filter = delete_file_loader
             .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
             .await
+            .unwrap()
+            .unwrap();
+
+        let result = delete_filter
+            .get_delete_vector(&file_scan_tasks[0])
             .unwrap();
+        assert_eq!(result.lock().unwrap().len(), 3); // pos dels from pos del file 1 and 2
 
-        assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
+        let result = delete_filter
+            .get_delete_vector(&file_scan_tasks[1])
+            .unwrap();
+        assert_eq!(result.lock().unwrap().len(), 3); // pos dels from pos del file 3
     }
 
     fn setup(table_location: &Path) -> Vec<FileScanTask> {
         let data_file_schema = Arc::new(Schema::builder().build().unwrap());
         let positional_delete_schema = create_pos_del_schema();
 
-        let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8];
-        let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023];
-
-        let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values));
-        let pos_col = Arc::new(Int64Array::from_iter_values(pos_values));
+        let mut file_path_values = vec![];
+        let mut pos_values = vec![];
+
+        file_path_values.push(vec![
+            format!(
+                "{}/1.parquet",
+                table_location.to_str().unwrap()
+            );
+            3
+        ]);
+        pos_values.push(vec![0, 1, 3]);
+
+        file_path_values.push(vec![
+            format!(
+                "{}/1.parquet",
+                table_location.to_str().unwrap()
+            );
+            3
+        ]);
+        pos_values.push(vec![5, 6, 8]);
+
+        file_path_values.push(vec![
+            format!(
+                "{}/2.parquet",
+                table_location.to_str().unwrap()
+            );
+            3
+        ]);
+        pos_values.push(vec![1022, 1023, 1024]);
+        // 9 rows in total pos deleted across 3 files
 
         let props = WriterProperties::builder()
             .set_compression(Compression::SNAPPY)
             .build();
 
         for n in 1..=3 {
+            let file_path_col = Arc::new(StringArray::from_iter_values(
+                file_path_values.pop().unwrap(),
+            ));
+            let pos_col = Arc::new(Int64Array::from_iter_values(pos_values.pop().unwrap()));
+
             let positional_deletes_to_write =
                 RecordBatch::try_new(positional_delete_schema.clone(), vec![
                     file_path_col.clone(),
-                    pos_col.clone(),
+                    pos_col,
                 ])
                 .unwrap();
@@ -486,7 +556,7 @@
                 start: 0,
                 length: 0,
                 record_count: None,
-                data_file_path: "".to_string(),
+                data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()),
                 data_file_content: DataContentType::Data,
                 data_file_format: DataFileFormat::Parquet,
                 schema: data_file_schema.clone(),
@@ -498,13 +568,13 @@
                 start: 0,
                 length: 0,
                 record_count: None,
-                data_file_path: "".to_string(),
+                data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()),
                 data_file_content: DataContentType::Data,
                 data_file_format: DataFileFormat::Parquet,
                 schema: data_file_schema.clone(),
                 project_field_ids: vec![],
                 predicate: None,
-                deletes: vec![pos_del_2, pos_del_3],
+                deletes: vec![pos_del_3],
             },
         ];
```
crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 38 additions & 14 deletions

```diff
@@ -202,43 +202,67 @@ mod tests {
     const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
 
     #[tokio::test]
-    async fn test_delete_file_manager_load_deletes() {
+    async fn test_delete_file_loader_load_deletes() {
         let tmp_dir = TempDir::new().unwrap();
         let table_location = tmp_dir.path();
         let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
             .unwrap()
             .build()
             .unwrap();
 
-        // Note that with the delete file parsing not yet in place, all we can test here is that
-        // the call to the loader fails with the expected FeatureUnsupportedError.
-        let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
 
         let file_scan_tasks = setup(table_location);
 
-        let result = delete_file_manager
+        let delete_filter = delete_file_loader
             .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
             .await
+            .unwrap()
+            .unwrap();
+
+        let _ = delete_file_loader
+            .load_deletes(&file_scan_tasks[1].deletes, file_scan_tasks[1].schema_ref())
+            .await
+            .unwrap()
+            .unwrap();
+
+        let result = delete_filter
+            .get_delete_vector(&file_scan_tasks[0])
             .unwrap();
+        assert_eq!(result.lock().unwrap().len(), 12); // pos dels from pos del file 1 and 2
 
-        assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
+        let result = delete_filter
+            .get_delete_vector(&file_scan_tasks[1])
+            .unwrap();
+        assert_eq!(result.lock().unwrap().len(), 8); // pos dels from pos del file 3
     }
 
     fn setup(table_location: &Path) -> Vec<FileScanTask> {
         let data_file_schema = Arc::new(Schema::builder().build().unwrap());
         let positional_delete_schema = create_pos_del_schema();
 
-        let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8];
-        let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023];
-
-        let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values));
-        let pos_col = Arc::new(Int64Array::from_iter_values(pos_values));
+        let file_path_values = [
+            vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8],
+            vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8],
+            vec![format!("{}/2.parquet", table_location.to_str().unwrap()); 8],
+        ];
+        let pos_values = [
+            vec![0i64, 1, 3, 5, 6, 8, 1022, 1023],
+            vec![0i64, 1, 3, 5, 20, 21, 22, 23],
+            vec![0i64, 1, 3, 5, 6, 8, 1022, 1023],
+        ];
 
         let props = WriterProperties::builder()
             .set_compression(Compression::SNAPPY)
             .build();
 
         for n in 1..=3 {
+            let file_path_vals = file_path_values.get(n - 1).unwrap();
+            let file_path_col = Arc::new(StringArray::from_iter_values(file_path_vals));
+
+            let pos_vals = pos_values.get(n - 1).unwrap();
+            let pos_col = Arc::new(Int64Array::from_iter_values(pos_vals.clone()));
+
             let positional_deletes_to_write =
                 RecordBatch::try_new(positional_delete_schema.clone(), vec![
                     file_path_col.clone(),
@@ -293,7 +317,7 @@
                 start: 0,
                 length: 0,
                 record_count: None,
-                data_file_path: "".to_string(),
+                data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()),
                 data_file_content: DataContentType::Data,
                 data_file_format: DataFileFormat::Parquet,
                 schema: data_file_schema.clone(),
@@ -305,13 +329,13 @@
                 start: 0,
                 length: 0,
                 record_count: None,
-                data_file_path: "".to_string(),
+                data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()),
                 data_file_content: DataContentType::Data,
                 data_file_format: DataFileFormat::Parquet,
                 schema: data_file_schema.clone(),
                 project_field_ids: vec![],
                 predicate: None,
-                deletes: vec![pos_del_2, pos_del_3],
+                deletes: vec![pos_del_3],
            },
        ];
```
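The expected counts in this test follow from bitmap deduplication rather than raw row counts: the two delete files targeting `1.parquet` overlap on positions 0, 1, 3 and 5, so their 16 rows collapse to 12 distinct positions, while delete file 3 contributes 8 distinct positions for `2.parquet`. A quick sanity check of that arithmetic with plain sets:

```rust
use std::collections::BTreeSet;

fn main() {
    // Positions targeting 1.parquet from delete files 1 and 2 in `setup`.
    let del_file_1: BTreeSet<i64> = [0, 1, 3, 5, 6, 8, 1022, 1023].into();
    let del_file_2: BTreeSet<i64> = [0, 1, 3, 5, 20, 21, 22, 23].into();

    // 0, 1, 3 and 5 appear in both files, so the union holds 12 distinct
    // positions, matching the assertion on file_scan_tasks[0]'s delete vector.
    assert_eq!(del_file_1.union(&del_file_2).count(), 12);
}
```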
crates/iceberg/src/delete_vector.rs

Lines changed: 9 additions & 0 deletions

```diff
@@ -38,6 +38,15 @@ impl DeleteVector {
         let outer = self.inner.bitmaps();
         DeleteVectorIterator { outer, inner: None }
     }
+
+    pub fn insert(&mut self, pos: u64) -> bool {
+        self.inner.insert(pos)
+    }
+
+    #[allow(unused)]
+    pub fn len(&self) -> u64 {
+        self.inner.len()
+    }
 }
 
 // Ideally, we'd just wrap `roaring::RoaringTreemap`'s iterator, `roaring::treemap::Iter` here.
```
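The new `insert` and `len` simply delegate to the underlying `roaring::RoaringTreemap`. One useful consequence (a small sketch against the roaring crate, not this commit's code): `insert` reports whether the position was newly added, so recording the same positional delete from two delete files is harmless:

```rust
use roaring::RoaringTreemap;

fn main() {
    let mut positions = RoaringTreemap::new();

    // First insert records the position and returns true.
    assert!(positions.insert(1022));
    // A duplicate delete for the same row is a no-op and returns false.
    assert!(!positions.insert(1022));

    assert_eq!(positions.len(), 1);
}
```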

crates/integration_tests/tests/shared_tests/read_positional_deletes.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,43 @@ async fn test_read_table_with_positional_deletes() {
6969
// let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
7070
// assert_eq!(num_rows, 10);
7171
}
72+
73+
#[tokio::test]
74+
async fn test_read_table_with_positional_deletes_with_delete_support_enabled() {
75+
let fixture = get_shared_containers();
76+
let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());
77+
78+
let table = rest_catalog
79+
.load_table(
80+
&TableIdent::from_strs(["default", "test_positional_merge_on_read_double_deletes"])
81+
.unwrap(),
82+
)
83+
.await
84+
.unwrap();
85+
86+
let scan = table
87+
.scan()
88+
.with_delete_file_processing_enabled(true)
89+
.build()
90+
.unwrap();
91+
println!("{:?}", scan);
92+
93+
let plan: Vec<_> = scan
94+
.plan_files()
95+
.await
96+
.unwrap()
97+
.try_collect()
98+
.await
99+
.unwrap();
100+
println!("{:?}", plan);
101+
102+
// Scan plan phase should include delete files in file plan
103+
// when with_delete_file_processing_enabled == true
104+
assert_eq!(plan[0].deletes.len(), 2);
105+
106+
// we should see two rows deleted, returning 10 rows instead of 12
107+
let batch_stream = scan.to_arrow().await.unwrap();
108+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
109+
let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
110+
assert_eq!(num_rows, 10);
111+
}
