Skip to content

Commit cc5dba4

Browse files
committed
refactor: improve design of DeleteFileManager
1 parent 7a8d297 commit cc5dba4

File tree

1 file changed

+93
-78
lines changed

1 file changed

+93
-78
lines changed

crates/iceberg/src/scan.rs

+93-78
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
//! Table scan api.
1919
2020
use std::collections::HashMap;
21+
use std::future::Future;
22+
use std::pin::Pin;
2123
use std::sync::{Arc, RwLock};
24+
use std::task::{Context, Poll};
2225

2326
use arrow_array::RecordBatch;
2427
use futures::channel::mpsc::{channel, Receiver, Sender};
@@ -292,7 +295,6 @@ impl<'a> TableScanBuilder<'a> {
292295
partition_filter_cache: Arc::new(PartitionFilterCache::new()),
293296
manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
294297
expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
295-
delete_file_manager: Arc::new(DeleteFileManager::new()),
296298
};
297299

298300
Ok(TableScan {
@@ -332,68 +334,6 @@ pub struct TableScan {
332334
row_selection_enabled: bool,
333335
}
334336

335-
/// Manages async retrieval of all of a given snapshot's delete files
336-
/// from FileIO and then subsequently serving filtered references to them
337-
/// up for inclusion within FileScanTasks
338-
#[derive(Debug)]
339-
struct DeleteFileManager {
340-
file_scan_task_delete_files: RwLock<Option<Arc<Vec<FileScanTaskDeleteFile>>>>,
341-
}
342-
343-
impl DeleteFileManager {
344-
pub(crate) fn new() -> Self {
345-
DeleteFileManager {
346-
file_scan_task_delete_files: RwLock::new(None),
347-
}
348-
}
349-
350-
pub(crate) async fn handle_delete_file_stream(
351-
&self,
352-
delete_file_rx: Receiver<Result<FileScanTaskDeleteFile>>,
353-
) -> Result<()> {
354-
let mut delete_file_stream = delete_file_rx.boxed();
355-
356-
let mut delete_files = vec![];
357-
358-
while let Some(delete_file) = delete_file_stream.try_next().await? {
359-
delete_files.push(delete_file);
360-
}
361-
362-
if !delete_files.is_empty() {
363-
let mut guard = self
364-
.file_scan_task_delete_files
365-
.write()
366-
.map_err(|_| {
367-
Error::new(
368-
ErrorKind::Unexpected,
369-
"DeleteFileManager RwLock was poisoned",
370-
)
371-
})
372-
.unwrap();
373-
374-
*guard = Some(Arc::new(delete_files));
375-
}
376-
377-
Ok(())
378-
}
379-
380-
pub(crate) async fn get_deletes_for_data_file(
381-
&self,
382-
_data_file: &DataFile,
383-
) -> Option<Arc<Vec<FileScanTaskDeleteFile>>> {
384-
self.file_scan_task_delete_files
385-
.read()
386-
.map_err(|_| {
387-
Error::new(
388-
ErrorKind::Unexpected,
389-
"DeleteFileManager RwLock was poisoned",
390-
)
391-
})
392-
.unwrap()
393-
.clone()
394-
}
395-
}
396-
397337
/// PlanContext wraps a [`SnapshotRef`] alongside all the other
398338
/// objects that are required to perform a scan file plan.
399339
#[derive(Debug)]
@@ -407,7 +347,6 @@ struct PlanContext {
407347
snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
408348
object_cache: Arc<ObjectCache>,
409349
field_ids: Arc<Vec<i32>>,
410-
delete_file_manager: Arc<DeleteFileManager>,
411350

412351
partition_filter_cache: Arc<PartitionFilterCache>,
413352
manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
@@ -425,10 +364,13 @@ impl TableScan {
425364
channel(concurrency_limit_manifest_files);
426365
let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
427366
channel(concurrency_limit_manifest_files);
367+
428368
// used to stream the results back to the caller
429369
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
370+
430371
// used to stream delete files into the DeleteFileManager
431372
let (delete_file_tx, delete_file_rx) = channel(concurrency_limit_manifest_entries);
373+
let delete_file_manager = Arc::new(DeleteFileManager::from_receiver(delete_file_rx));
432374

433375
let manifest_list = self.plan_context.get_manifest_list().await?;
434376

@@ -439,7 +381,7 @@ impl TableScan {
439381
manifest_list,
440382
manifest_entry_data_ctx_tx,
441383
manifest_entry_delete_ctx_tx,
442-
self.plan_context.delete_file_manager.clone(),
384+
delete_file_manager.clone(),
443385
)?;
444386

445387
let mut channel_for_manifest_error = file_scan_task_tx.clone();
@@ -483,11 +425,6 @@ impl TableScan {
483425
})
484426
.await;
485427

486-
self.plan_context
487-
.delete_file_manager
488-
.handle_delete_file_stream(delete_file_rx)
489-
.await?;
490-
491428
// Process the data file [`ManifestEntry`] stream in parallel
492429
spawn(async move {
493430
let result = manifest_entry_data_ctx_rx
@@ -586,7 +523,7 @@ impl TableScan {
586523
// entire plan without getting filtered out. Create a corresponding
587524
// FileScanTask and push it to the result stream
588525
file_scan_task_tx
589-
.send(Ok(manifest_entry_context.into_file_scan_task().await))
526+
.send(Ok(manifest_entry_context.into_file_scan_task().await?))
590527
.await?;
591528

592529
Ok(())
@@ -715,8 +652,13 @@ impl ManifestFileContext {
715652
impl ManifestEntryContext {
716653
/// consume this `ManifestEntryContext`, returning a `FileScanTask`
717654
/// created from it
718-
async fn into_file_scan_task(self) -> FileScanTask {
719-
FileScanTask {
655+
async fn into_file_scan_task(self) -> Result<FileScanTask> {
656+
let deletes = self
657+
.delete_file_manager
658+
.get_deletes_for_data_file(self.manifest_entry.data_file())
659+
.await?;
660+
661+
Ok(FileScanTask {
720662
start: 0,
721663
length: self.manifest_entry.file_size_in_bytes(),
722664
record_count: Some(self.manifest_entry.record_count()),
@@ -731,11 +673,8 @@ impl ManifestEntryContext {
731673
.bound_predicates
732674
.map(|x| x.as_ref().snapshot_bound_predicate.clone()),
733675

734-
deletes: self
735-
.delete_file_manager
736-
.get_deletes_for_data_file(self.manifest_entry.data_file())
737-
.await,
738-
}
676+
deletes,
677+
})
739678
}
740679
}
741680

@@ -1138,6 +1077,82 @@ impl FileScanTask {
11381077
}
11391078
}
11401079

1080+
type DeleteFileManagerResult = Result<Option<Arc<Vec<FileScanTaskDeleteFile>>>>;
1081+
1082+
/// Manages async retrieval of all the delete files from FileIO that are
1083+
/// applicable to the scan. Provides references to them for inclusion within FileScanTasks
1084+
#[derive(Debug, Clone)]
1085+
struct DeleteFileManager {
1086+
files: Arc<RwLock<Option<DeleteFileManagerResult>>>,
1087+
}
1088+
1089+
#[derive(Debug, Clone)]
1090+
struct DeleteFileManagerFuture {
1091+
files: Arc<RwLock<Option<DeleteFileManagerResult>>>,
1092+
}
1093+
1094+
impl Future for DeleteFileManagerFuture {
1095+
type Output = DeleteFileManagerResult;
1096+
1097+
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
1098+
let Ok(guard) = self.files.try_read() else {
1099+
return Poll::Pending;
1100+
};
1101+
1102+
if let Some(value) = guard.as_ref() {
1103+
Poll::Ready(match value.as_ref() {
1104+
Ok(deletes) => Ok(deletes.clone()),
1105+
Err(err) => Err(Error::new(err.kind(), err.message())),
1106+
})
1107+
} else {
1108+
Poll::Pending
1109+
}
1110+
}
1111+
}
1112+
1113+
impl DeleteFileManager {
1114+
pub(crate) fn from_receiver(receiver: Receiver<Result<FileScanTaskDeleteFile>>) -> Self {
1115+
let delete_file_stream = receiver.boxed();
1116+
let files = Arc::new(RwLock::new(None));
1117+
1118+
spawn({
1119+
let files = files.clone();
1120+
async move {
1121+
let _ = spawn(async move {
1122+
let result = delete_file_stream.try_collect::<Vec<_>>().await;
1123+
let result = result.map(|files| {
1124+
if files.is_empty() {
1125+
None
1126+
} else {
1127+
Some(Arc::new(files))
1128+
}
1129+
});
1130+
1131+
// Unwrap is ok here since this is the only place where a write lock
1132+
// can be acquired, so the lock can't already have been poisoned
1133+
let mut guard = files.write().unwrap();
1134+
*guard = Some(result);
1135+
})
1136+
.await;
1137+
}
1138+
});
1139+
1140+
DeleteFileManager { files }
1141+
}
1142+
1143+
pub(crate) fn get_deletes_for_data_file(
1144+
&self,
1145+
_data_file: &DataFile,
1146+
) -> DeleteFileManagerFuture {
1147+
// TODO: in the future we may want to filter out delete files
1148+
// that are not applicable to the DataFile?
1149+
1150+
DeleteFileManagerFuture {
1151+
files: self.files.clone(),
1152+
}
1153+
}
1154+
}
1155+
11411156
#[cfg(test)]
11421157
mod tests {
11431158
use std::collections::HashMap;

0 commit comments

Comments
 (0)