
Commit 52d3e2c

refactor: disabling delete file support will error in the read phase instead of the plan phase if a delete file is encountered
1 parent 6a5a421 commit 52d3e2c
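
In practice, a scan that disables delete file processing still plans tasks that list their delete files, but reading those tasks now fails. Below is a condensed sketch of that behavior, adapted from the updated integration test in this commit; the `table` setup and the exact plan-collection calls are assumed from the surrounding test rather than shown in this diff.

```rust
// Condensed sketch adapted from the updated integration test below; `table`
// is assumed to reference a table that carries positional delete files.
let scan = table
    .scan()
    .with_delete_file_processing_enabled(false)
    .build()
    .unwrap();

// The plan phase still succeeds and still attaches the delete files to tasks...
let plan: Vec<_> = scan.plan_files().await.unwrap().try_collect().await.unwrap();
assert_eq!(plan[0].deletes.len(), 2);

// ...while the read phase is where the unsupported feature is now reported.
let result = scan.to_arrow().await.unwrap().try_collect::<Vec<_>>().await;
assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
```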

3 files changed (+59, -90 lines)

crates/iceberg/src/arrow/reader.rs

Lines changed: 1 addition & 5 deletions
@@ -202,11 +202,7 @@ impl ArrowReader {
         // concurrently retrieve delete files and create RecordBatchStreamBuilder
         let (_, mut record_batch_stream_builder) = try_join!(
             delete_file_manager.load_deletes(
-                if delete_file_support_enabled {
-                    &task.deletes
-                } else {
-                    &[]
-                },
+                &task.deletes,
                 file_io.clone(),
                 concurrency_limit_data_files,
             ),
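
With the conditional stripped out here, `load_deletes` always receives `&task.deletes`, so the `delete_file_support_enabled` flag has to be enforced elsewhere in the read path, which is what the commit message describes. The following is a hypothetical sketch of such a guard, not the actual reader code; the field and flag names are reused from this hunk, and `Error`/`ErrorKind` are the crate's error types.

```rust
// Hypothetical guard (not the actual reader code): if delete file support is
// disabled but the scan task still carries delete files, fail the read phase
// instead of silently ignoring the deletes.
if !delete_file_support_enabled && !task.deletes.is_empty() {
    return Err(Error::new(
        ErrorKind::FeatureUnsupported,
        "delete file support is disabled, but the scan task contains delete files",
    ));
}
```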

crates/iceberg/src/scan.rs

Lines changed: 51 additions & 73 deletions
@@ -375,12 +375,7 @@ impl TableScan {
         // used to stream the results back to the caller
         let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
 
-        let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
-            if self.delete_file_processing_enabled {
-                Some(DeleteFileIndex::new())
-            } else {
-                None
-            };
+        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();
 
         let manifest_list = self.plan_context.get_manifest_list().await?;
 
@@ -390,9 +385,8 @@ impl TableScan {
         let manifest_file_contexts = self.plan_context.build_manifest_file_contexts(
             manifest_list,
             manifest_entry_data_ctx_tx,
-            delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
-                (delete_file_idx.clone(), manifest_entry_delete_ctx_tx)
-            }),
+            delete_file_idx.clone(),
+            manifest_entry_delete_ctx_tx,
         )?;
 
         let mut channel_for_manifest_error = file_scan_task_tx.clone();
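
The planning path now always builds a delete file index: `DeleteFileIndex::new()` hands back the index together with the sender that feeds it, and the two halves go to different stages — the sender to the delete manifest processing task, and clones of the index into each manifest file context. Below is a minimal stand-in for that shape, assuming a tokio runtime and using a plain `Vec<String>` in place of the crate's real index state; it is not the crate's implementation.

```rust
use std::sync::{Arc, RwLock};

use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

/// Simplified stand-in for `DeleteFileIndex`: a cheaply cloneable handle to
/// shared state, created together with the sender used to populate it.
#[derive(Clone)]
struct DeleteIndex {
    state: Arc<RwLock<Vec<String>>>, // delete file paths, for illustration only
}

impl DeleteIndex {
    fn new() -> (Self, UnboundedSender<String>) {
        let (tx, mut rx) = unbounded_channel();
        let state = Arc::new(RwLock::new(Vec::new()));
        let index = DeleteIndex {
            state: Arc::clone(&state),
        };
        // Background task drains the channel into the shared state while the
        // index handle is cloned freely into per-entry contexts.
        tokio::spawn(async move {
            while let Some(path) = rx.recv().await {
                state.write().unwrap().push(path);
            }
        });
        (index, tx)
    }
}
```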
@@ -411,34 +405,30 @@ impl TableScan {
         });
 
         let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
+        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
 
-        if let Some((_, delete_file_tx)) = delete_file_idx_and_tx {
-            let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
-
-            // Process the delete file [`ManifestEntry`] stream in parallel
-            spawn(async move {
-                let result = manifest_entry_delete_ctx_rx
-                    .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
-                    .try_for_each_concurrent(
-                        concurrency_limit_manifest_entries,
-                        |(manifest_entry_context, tx)| async move {
-                            spawn(async move {
-                                Self::process_delete_manifest_entry(manifest_entry_context, tx)
-                                    .await
-                            })
-                            .await
-                        },
-                    )
-                    .await;
+        // Process the delete file [`ManifestEntry`] stream in parallel
+        spawn(async move {
+            let result = manifest_entry_delete_ctx_rx
+                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
+                .try_for_each_concurrent(
+                    concurrency_limit_manifest_entries,
+                    |(manifest_entry_context, tx)| async move {
+                        spawn(async move {
+                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
+                        })
+                        .await
+                    },
+                )
+                .await;
 
-            if let Err(error) = result {
-                let _ = channel_for_delete_manifest_entry_error
-                    .send(Err(error))
-                    .await;
-            }
-            })
-            .await;
-        }
+            if let Err(error) = result {
+                let _ = channel_for_delete_manifest_entry_error
+                    .send(Err(error))
+                    .await;
+            }
+        })
+        .await;
 
         // Process the data file [`ManifestEntry`] stream in parallel
         spawn(async move {
@@ -460,15 +450,16 @@ impl TableScan {
             }
         });
 
-        return Ok(file_scan_task_rx.boxed());
+        Ok(file_scan_task_rx.boxed())
     }
 
     /// Returns an [`ArrowRecordBatchStream`].
     pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
         let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
             .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
             .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
-            .with_row_selection_enabled(self.row_selection_enabled);
+            .with_row_selection_enabled(self.row_selection_enabled)
+            .with_delete_file_support_enabled(self.delete_file_processing_enabled);
 
         if let Some(batch_size) = self.batch_size {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
@@ -608,7 +599,7 @@ struct ManifestFileContext {
     object_cache: Arc<ObjectCache>,
     snapshot_schema: SchemaRef,
     expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
-    delete_file_index: Option<DeleteFileIndex>,
+    delete_file_index: DeleteFileIndex,
 }
 
 /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -621,7 +612,7 @@ struct ManifestEntryContext {
     bound_predicates: Option<Arc<BoundPredicates>>,
     partition_spec_id: i32,
     snapshot_schema: SchemaRef,
-    delete_file_index: Option<DeleteFileIndex>,
+    delete_file_index: DeleteFileIndex,
 }
 
 impl ManifestFileContext {
@@ -668,16 +659,13 @@ impl ManifestEntryContext {
     /// consume this `ManifestEntryContext`, returning a `FileScanTask`
     /// created from it
     async fn into_file_scan_task(self) -> Result<FileScanTask> {
-        let deletes = if let Some(delete_file_index) = self.delete_file_index {
-            delete_file_index
-                .get_deletes_for_data_file(
-                    self.manifest_entry.data_file(),
-                    self.manifest_entry.sequence_number(),
-                )
-                .await?
-        } else {
-            vec![]
-        };
+        let deletes = self
+            .delete_file_index
+            .get_deletes_for_data_file(
+                self.manifest_entry.data_file(),
+                self.manifest_entry.sequence_number(),
+            )
+            .await?;
 
         Ok(FileScanTask {
             start: 0,
@@ -732,24 +720,19 @@ impl PlanContext {
         &self,
         manifest_list: Arc<ManifestList>,
         tx_data: Sender<ManifestEntryContext>,
-        delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<ManifestEntryContext>)>,
+        delete_file_idx: DeleteFileIndex,
+        delete_file_tx: Sender<ManifestEntryContext>,
     ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
         let manifest_files = manifest_list.entries().iter();
 
         // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
         let mut filtered_mfcs = vec![];
 
         for manifest_file in manifest_files {
-            let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes {
-                let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else {
-                    continue;
-                };
-                (Some(delete_file_idx.clone()), tx.clone())
+            let tx = if manifest_file.content == ManifestContentType::Deletes {
+                delete_file_tx.clone()
             } else {
-                (
-                    delete_file_idx_and_tx.as_ref().map(|x| x.0.clone()),
-                    tx_data.clone(),
-                )
+                tx_data.clone()
             };
 
             let partition_bound_predicate = if self.predicate.is_some() {
@@ -777,7 +760,7 @@ impl PlanContext {
                 manifest_file,
                 partition_bound_predicate,
                 tx,
-                delete_file_idx,
+                delete_file_idx.clone(),
             );
 
             filtered_mfcs.push(Ok(mfc));
@@ -791,7 +774,7 @@ impl PlanContext {
         manifest_file: &ManifestFile,
         partition_filter: Option<Arc<BoundPredicate>>,
         sender: Sender<ManifestEntryContext>,
-        delete_file_index: Option<DeleteFileIndex>,
+        delete_file_index: DeleteFileIndex,
     ) -> ManifestFileContext {
         let bound_predicates =
             if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =
@@ -1003,23 +986,18 @@ impl ExpressionEvaluatorCache {
                     ErrorKind::Unexpected,
                     "ManifestEvaluatorCache RwLock was poisoned",
                 )
-            })
-            .unwrap()
+            })?
             .insert(
                 spec_id,
                 Arc::new(ExpressionEvaluator::new(partition_filter.clone())),
             );
 
-        let read = self
-            .0
-            .read()
-            .map_err(|_| {
-                Error::new(
-                    ErrorKind::Unexpected,
-                    "ManifestEvaluatorCache RwLock was poisoned",
-                )
-            })
-            .unwrap();
+        let read = self.0.read().map_err(|_| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "ManifestEvaluatorCache RwLock was poisoned",
+            )
+        })?;
 
         Ok(read.get(&spec_id).unwrap().clone())
     }
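
This last hunk also swaps the `.unwrap()` calls after `map_err` for `?`, so a poisoned `RwLock` now surfaces as an ordinary `Error` from the cache lookup instead of a panic. A standalone illustration of the same pattern follows; the names are illustrative, not the crate's.

```rust
use std::sync::RwLock;

// Map a poisoned RwLock into an ordinary error and propagate it with `?`
// instead of panicking via `.unwrap()`.
fn cached_len(cache: &RwLock<Vec<u32>>) -> Result<usize, String> {
    let read = cache
        .read()
        .map_err(|_| "cache RwLock was poisoned".to_string())?;
    Ok(read.len())
}
```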

crates/integration_tests/tests/shared_tests/read_positional_deletes.rs

Lines changed: 7 additions & 12 deletions
@@ -25,7 +25,7 @@ use iceberg_catalog_rest::RestCatalog;
 use crate::get_shared_containers;
 
 #[tokio::test]
-async fn test_read_table_with_positional_deletes() {
+async fn test_read_table_with_positional_deletes_with_delete_support_disabled() {
     let fixture = get_shared_containers();
     let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());
 
@@ -39,7 +39,7 @@ async fn test_read_table_with_positional_deletes() {
 
     let scan = table
         .scan()
-        .with_delete_file_processing_enabled(true)
+        .with_delete_file_processing_enabled(false)
         .build()
         .unwrap();
     println!("{:?}", scan);
@@ -53,21 +53,16 @@ async fn test_read_table_with_positional_deletes() {
         .unwrap();
     println!("{:?}", plan);
 
-    // Scan plan phase should include delete files in file plan
-    // when with_delete_file_processing_enabled == true
+    // Scan plan phase stills include delete files in file plan
+    // when with_delete_file_processing_enabled == false. We instead
+    // fail at the read phase after this.
     assert_eq!(plan[0].deletes.len(), 2);
 
-    // 😱 If we don't support positional deletes, we should fail when we try to read a table that
-    // has positional deletes. The table has 12 rows, and 2 are deleted, see provision.py
+    // with delete_file_processing_enabled == false, we should fail when we
+    // try to read a table that has positional deletes.
     let result = scan.to_arrow().await.unwrap().try_collect::<Vec<_>>().await;
 
     assert!(result.is_err_and(|e| e.kind() == FeatureUnsupported));
-
-    // When we get support for it:
-    // let batch_stream = scan.to_arrow().await.unwrap();
-    // let batches: Vec<_> = batch_stream.try_collect().await.is_err();
-    // let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
-    // assert_eq!(num_rows, 10);
 }
 
 #[tokio::test]
