Skip to content

Commit 68d627e

Browse files
committed
feat: remove flag to selectively enable delete file processing in favour of it being always on
1 parent fe62959 commit 68d627e

File tree

5 files changed

+48
-116
lines changed

5 files changed

+48
-116
lines changed

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,21 +73,21 @@ struct DeleteFileFilterState {
7373
}
7474

7575
#[derive(Clone, Debug, Default)]
76-
pub struct DeleteFilter {
76+
pub(crate) struct DeleteFilter {
7777
state: Arc<RwLock<DeleteFileFilterState>>,
7878
}
7979

8080
impl DeleteFilter {
8181
/// Retrieve a delete vector for the data file associated with a given file scan task
82-
pub fn get_delete_vector(
82+
pub(crate) fn get_delete_vector(
8383
&self,
8484
file_scan_task: &FileScanTask,
8585
) -> Option<Arc<Mutex<DeleteVector>>> {
8686
self.get_delete_vector_for_path(file_scan_task.data_file_path())
8787
}
8888

8989
/// Retrieve a delete vector for a data file
90-
pub fn get_delete_vector_for_path(
90+
pub(crate) fn get_delete_vector_for_path(
9191
&self,
9292
delete_file_path: &str,
9393
) -> Option<Arc<Mutex<DeleteVector>>> {
@@ -111,7 +111,7 @@ impl DeleteFilter {
111111
}
112112

113113
/// Builds eq delete predicate for the provided task.
114-
pub async fn build_equality_delete_predicate(
114+
pub(crate) async fn build_equality_delete_predicate(
115115
&self,
116116
file_scan_task: &FileScanTask,
117117
) -> Result<Option<BoundPredicate>> {

crates/iceberg/src/arrow/reader.rs

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ pub struct ArrowReaderBuilder {
6464
concurrency_limit_data_files: usize,
6565
row_group_filtering_enabled: bool,
6666
row_selection_enabled: bool,
67-
delete_file_support_enabled: bool,
6867
}
6968

7069
impl ArrowReaderBuilder {
@@ -78,7 +77,6 @@ impl ArrowReaderBuilder {
7877
concurrency_limit_data_files: num_cpus,
7978
row_group_filtering_enabled: true,
8079
row_selection_enabled: false,
81-
delete_file_support_enabled: false,
8280
}
8381
}
8482

@@ -107,12 +105,6 @@ impl ArrowReaderBuilder {
107105
self
108106
}
109107

110-
/// Determines whether to enable delete file support.
111-
pub fn with_delete_file_support_enabled(mut self, delete_file_support_enabled: bool) -> Self {
112-
self.delete_file_support_enabled = delete_file_support_enabled;
113-
self
114-
}
115-
116108
/// Build the ArrowReader.
117109
pub fn build(self) -> ArrowReader {
118110
ArrowReader {
@@ -125,7 +117,6 @@ impl ArrowReaderBuilder {
125117
concurrency_limit_data_files: self.concurrency_limit_data_files,
126118
row_group_filtering_enabled: self.row_group_filtering_enabled,
127119
row_selection_enabled: self.row_selection_enabled,
128-
delete_file_support_enabled: self.delete_file_support_enabled,
129120
}
130121
}
131122
}
@@ -142,7 +133,6 @@ pub struct ArrowReader {
142133

143134
row_group_filtering_enabled: bool,
144135
row_selection_enabled: bool,
145-
delete_file_support_enabled: bool,
146136
}
147137

148138
impl ArrowReader {
@@ -154,7 +144,6 @@ impl ArrowReader {
154144
let concurrency_limit_data_files = self.concurrency_limit_data_files;
155145
let row_group_filtering_enabled = self.row_group_filtering_enabled;
156146
let row_selection_enabled = self.row_selection_enabled;
157-
let delete_file_support_enabled = self.delete_file_support_enabled;
158147

159148
let stream = tasks
160149
.map_ok(move |task| {
@@ -167,7 +156,6 @@ impl ArrowReader {
167156
self.delete_file_loader.clone(),
168157
row_group_filtering_enabled,
169158
row_selection_enabled,
170-
delete_file_support_enabled,
171159
)
172160
})
173161
.map_err(|err| {
@@ -187,26 +175,12 @@ impl ArrowReader {
187175
delete_file_loader: CachingDeleteFileLoader,
188176
row_group_filtering_enabled: bool,
189177
row_selection_enabled: bool,
190-
delete_file_support_enabled: bool,
191178
) -> Result<ArrowRecordBatchStream> {
192-
if !delete_file_support_enabled && !task.deletes.is_empty() {
193-
return Err(Error::new(
194-
ErrorKind::FeatureUnsupported,
195-
"Delete file support is not enabled",
196-
));
197-
}
198-
199179
let should_load_page_index =
200180
(row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
201181

202-
let delete_filter_rx = delete_file_loader.load_deletes(
203-
if delete_file_support_enabled {
204-
&task.deletes
205-
} else {
206-
&[]
207-
},
208-
task.schema.clone(),
209-
);
182+
let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, task.schema.clone());
183+
210184
let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder(
211185
&task.data_file_path,
212186
file_io.clone(),

crates/iceberg/src/scan/context.rs

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub(crate) struct ManifestFileContext {
4545
object_cache: Arc<ObjectCache>,
4646
snapshot_schema: SchemaRef,
4747
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
48-
delete_file_index: Option<DeleteFileIndex>,
48+
delete_file_index: DeleteFileIndex,
4949
}
5050

5151
/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -58,7 +58,7 @@ pub(crate) struct ManifestEntryContext {
5858
pub bound_predicates: Option<Arc<BoundPredicates>>,
5959
pub partition_spec_id: i32,
6060
pub snapshot_schema: SchemaRef,
61-
pub delete_file_index: Option<DeleteFileIndex>,
61+
pub delete_file_index: DeleteFileIndex,
6262
}
6363

6464
impl ManifestFileContext {
@@ -105,16 +105,13 @@ impl ManifestEntryContext {
105105
/// consume this `ManifestEntryContext`, returning a `FileScanTask`
106106
/// created from it
107107
pub(crate) async fn into_file_scan_task(self) -> Result<FileScanTask> {
108-
let deletes = if let Some(delete_file_index) = self.delete_file_index {
109-
delete_file_index
110-
.get_deletes_for_data_file(
111-
self.manifest_entry.data_file(),
112-
self.manifest_entry.sequence_number(),
113-
)
114-
.await?
115-
} else {
116-
vec![]
117-
};
108+
let deletes = self
109+
.delete_file_index
110+
.get_deletes_for_data_file(
111+
self.manifest_entry.data_file(),
112+
self.manifest_entry.sequence_number(),
113+
)
114+
.await?;
118115

119116
Ok(FileScanTask {
120117
start: 0,
@@ -188,24 +185,19 @@ impl PlanContext {
188185
&self,
189186
manifest_list: Arc<ManifestList>,
190187
tx_data: Sender<ManifestEntryContext>,
191-
delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<ManifestEntryContext>)>,
188+
delete_file_idx: DeleteFileIndex,
189+
delete_file_tx: Sender<ManifestEntryContext>,
192190
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
193191
let manifest_files = manifest_list.entries().iter();
194192

195193
// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
196194
let mut filtered_mfcs = vec![];
197195

198196
for manifest_file in manifest_files {
199-
let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes {
200-
let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else {
201-
continue;
202-
};
203-
(Some(delete_file_idx.clone()), tx.clone())
197+
let tx = if manifest_file.content == ManifestContentType::Deletes {
198+
delete_file_tx.clone()
204199
} else {
205-
(
206-
delete_file_idx_and_tx.as_ref().map(|x| x.0.clone()),
207-
tx_data.clone(),
208-
)
200+
tx_data.clone()
209201
};
210202

211203
let partition_bound_predicate = if self.predicate.is_some() {
@@ -233,7 +225,7 @@ impl PlanContext {
233225
manifest_file,
234226
partition_bound_predicate,
235227
tx,
236-
delete_file_idx,
228+
delete_file_idx.clone(),
237229
);
238230

239231
filtered_mfcs.push(Ok(mfc));
@@ -247,7 +239,7 @@ impl PlanContext {
247239
manifest_file: &ManifestFile,
248240
partition_filter: Option<Arc<BoundPredicate>>,
249241
sender: Sender<ManifestEntryContext>,
250-
delete_file_index: Option<DeleteFileIndex>,
242+
delete_file_index: DeleteFileIndex,
251243
) -> ManifestFileContext {
252244
let bound_predicates =
253245
if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =

crates/iceberg/src/scan/mod.rs

Lines changed: 25 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,6 @@ pub struct TableScanBuilder<'a> {
5959
concurrency_limit_manifest_files: usize,
6060
row_group_filtering_enabled: bool,
6161
row_selection_enabled: bool,
62-
63-
// TODO: defaults to false for now whilst delete file processing
64-
// is still being worked on but will switch to a default of true
65-
// once this work is complete
66-
delete_file_processing_enabled: bool,
6762
}
6863

6964
impl<'a> TableScanBuilder<'a> {
@@ -82,7 +77,6 @@ impl<'a> TableScanBuilder<'a> {
8277
concurrency_limit_manifest_files: num_cpus,
8378
row_group_filtering_enabled: true,
8479
row_selection_enabled: false,
85-
delete_file_processing_enabled: false,
8680
}
8781
}
8882

@@ -189,17 +183,6 @@ impl<'a> TableScanBuilder<'a> {
189183
self
190184
}
191185

192-
/// Determines whether to enable delete file processing (currently disabled by default)
193-
///
194-
/// When disabled, delete files are ignored.
195-
pub fn with_delete_file_processing_enabled(
196-
mut self,
197-
delete_file_processing_enabled: bool,
198-
) -> Self {
199-
self.delete_file_processing_enabled = delete_file_processing_enabled;
200-
self
201-
}
202-
203186
/// Build the table scan.
204187
pub fn build(self) -> Result<TableScan> {
205188
let snapshot = match self.snapshot_id {
@@ -226,7 +209,6 @@ impl<'a> TableScanBuilder<'a> {
226209
concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
227210
row_group_filtering_enabled: self.row_group_filtering_enabled,
228211
row_selection_enabled: self.row_selection_enabled,
229-
delete_file_processing_enabled: self.delete_file_processing_enabled,
230212
});
231213
};
232214
current_snapshot_id.clone()
@@ -317,7 +299,6 @@ impl<'a> TableScanBuilder<'a> {
317299
concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
318300
row_group_filtering_enabled: self.row_group_filtering_enabled,
319301
row_selection_enabled: self.row_selection_enabled,
320-
delete_file_processing_enabled: self.delete_file_processing_enabled,
321302
})
322303
}
323304
}
@@ -346,7 +327,6 @@ pub struct TableScan {
346327

347328
row_group_filtering_enabled: bool,
348329
row_selection_enabled: bool,
349-
delete_file_processing_enabled: bool,
350330
}
351331

352332
impl TableScan {
@@ -368,12 +348,7 @@ impl TableScan {
368348
// used to stream the results back to the caller
369349
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
370350

371-
let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
372-
if self.delete_file_processing_enabled {
373-
Some(DeleteFileIndex::new())
374-
} else {
375-
None
376-
};
351+
let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();
377352

378353
let manifest_list = plan_context.get_manifest_list().await?;
379354

@@ -383,9 +358,8 @@ impl TableScan {
383358
let manifest_file_contexts = plan_context.build_manifest_file_contexts(
384359
manifest_list,
385360
manifest_entry_data_ctx_tx,
386-
delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
387-
(delete_file_idx.clone(), manifest_entry_delete_ctx_tx)
388-
}),
361+
delete_file_idx.clone(),
362+
manifest_entry_delete_ctx_tx,
389363
)?;
390364

391365
let mut channel_for_manifest_error = file_scan_task_tx.clone();
@@ -404,34 +378,30 @@ impl TableScan {
404378
});
405379

406380
let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
381+
let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
407382

408-
if let Some((_, delete_file_tx)) = delete_file_idx_and_tx {
409-
let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
410-
411-
// Process the delete file [`ManifestEntry`] stream in parallel
412-
spawn(async move {
413-
let result = manifest_entry_delete_ctx_rx
414-
.map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
415-
.try_for_each_concurrent(
416-
concurrency_limit_manifest_entries,
417-
|(manifest_entry_context, tx)| async move {
418-
spawn(async move {
419-
Self::process_delete_manifest_entry(manifest_entry_context, tx)
420-
.await
421-
})
422-
.await
423-
},
424-
)
425-
.await;
383+
// Process the delete file [`ManifestEntry`] stream in parallel
384+
spawn(async move {
385+
let result = manifest_entry_delete_ctx_rx
386+
.map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
387+
.try_for_each_concurrent(
388+
concurrency_limit_manifest_entries,
389+
|(manifest_entry_context, tx)| async move {
390+
spawn(async move {
391+
Self::process_delete_manifest_entry(manifest_entry_context, tx).await
392+
})
393+
.await
394+
},
395+
)
396+
.await;
426397

427-
if let Err(error) = result {
428-
let _ = channel_for_delete_manifest_entry_error
429-
.send(Err(error))
430-
.await;
431-
}
432-
})
433-
.await;
434-
}
398+
if let Err(error) = result {
399+
let _ = channel_for_delete_manifest_entry_error
400+
.send(Err(error))
401+
.await;
402+
}
403+
})
404+
.await;
435405

436406
// Process the data file [`ManifestEntry`] stream in parallel
437407
spawn(async move {

crates/integration_tests/tests/shared_tests/read_positional_deletes.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,7 @@ async fn test_read_table_with_positional_deletes() {
3737
.await
3838
.unwrap();
3939

40-
let scan = table
41-
.scan()
42-
.with_delete_file_processing_enabled(true)
43-
.build()
44-
.unwrap();
40+
let scan = table.scan().build().unwrap();
4541
println!("{:?}", scan);
4642

4743
let plan: Vec<_> = scan

0 commit comments

Comments
 (0)