Skip to content

Commit 8cfc923

Browse files
committed
refactor: DeleteFileIndex cloneable, with an awaitable method to get deletes for data file.
1 parent 7c1bd09 commit 8cfc923

File tree

2 files changed

+101
-66
lines changed

2 files changed

+101
-66
lines changed

crates/iceberg/src/delete_file_index.rs

Lines changed: 81 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,48 +16,81 @@
1616
// under the License.
1717

1818
use std::collections::HashMap;
19-
use std::sync::Arc;
19+
use std::future::Future;
20+
use std::ops::Deref;
21+
use std::pin::Pin;
22+
use std::sync::{Arc, RwLock};
23+
use std::task::{Context, Poll};
2024

21-
use futures::channel::mpsc;
22-
use futures::{StreamExt, TryStreamExt};
23-
use tokio::sync::watch;
25+
use futures::channel::mpsc::{channel, Sender};
26+
use futures::StreamExt;
2427

2528
use crate::runtime::spawn;
2629
use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
2730
use crate::spec::{DataContentType, DataFile, Struct};
28-
use crate::Result;
2931

30-
type DeleteFileIndexRef = Arc<Result<DeleteFileIndex>>;
31-
pub(crate) type DeleteFileIndexRefReceiver = watch::Receiver<Option<DeleteFileIndexRef>>;
32+
#[derive(Debug)]
33+
enum DeleteFileIndexState {
34+
Populating,
35+
Populated(PopulatedDeleteFileIndex),
36+
}
3237

33-
/// Index of delete files
3438
#[derive(Debug)]
35-
pub(crate) struct DeleteFileIndex {
39+
struct PopulatedDeleteFileIndex {
3640
#[allow(dead_code)]
3741
global_deletes: Vec<Arc<DeleteFileContext>>,
3842
eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
3943
pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
4044
pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
4145
}
4246

47+
/// Index of delete files
48+
#[derive(Clone, Debug)]
49+
pub(crate) struct DeleteFileIndex {
50+
state: Arc<RwLock<DeleteFileIndexState>>,
51+
}
52+
4353
impl DeleteFileIndex {
44-
pub(crate) fn from_del_file_chan(
45-
receiver: mpsc::Receiver<Result<DeleteFileContext>>,
46-
) -> watch::Receiver<Option<DeleteFileIndexRef>> {
47-
let (tx, rx) = watch::channel(None);
48-
49-
let delete_file_stream = receiver.boxed();
50-
spawn(async move {
51-
let delete_files = delete_file_stream.try_collect::<Vec<_>>().await;
52-
let delete_file_index = delete_files.map(DeleteFileIndex::from_delete_files);
53-
let delete_file_index = Arc::new(delete_file_index);
54-
tx.send(Some(delete_file_index))
54+
/// create a new `DeleteFileIndex` along with the sender that populates it with delete files
55+
pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
56+
// TODO: what should the channel limit be?
57+
let (tx, rx) = channel(10);
58+
let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating));
59+
let delete_file_stream = rx.boxed();
60+
61+
spawn({
62+
let state = state.clone();
63+
async move {
64+
let delete_files = delete_file_stream.collect::<Vec<_>>().await;
65+
66+
let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);
67+
68+
let mut guard = state.write().unwrap();
69+
*guard = DeleteFileIndexState::Populated(populated_delete_file_index);
70+
}
5571
});
5672

57-
rx
73+
(DeleteFileIndex { state }, tx)
74+
}
75+
76+
/// Gets all the delete files that apply to the specified data file.
77+
///
78+
/// Returns a future that resolves to a Vec<FileScanTaskDeleteFile>
79+
pub(crate) fn get_deletes_for_data_file<'a>(
80+
&self,
81+
data_file: &'a DataFile,
82+
seq_num: Option<i64>,
83+
) -> DeletesForDataFile<'a> {
84+
DeletesForDataFile {
85+
state: self.state.clone(),
86+
data_file,
87+
seq_num,
88+
}
5889
}
90+
}
5991

60-
fn from_delete_files(files: Vec<DeleteFileContext>) -> Self {
92+
impl PopulatedDeleteFileIndex {
93+
fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
6194
let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
6295
HashMap::default();
6396
let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
@@ -111,7 +144,7 @@ impl DeleteFileIndex {
111144
}
112145
});
113146

114-
DeleteFileIndex {
147+
PopulatedDeleteFileIndex {
115148
global_deletes: vec![],
116149
eq_deletes_by_partition,
117150
pos_deletes_by_partition,
@@ -120,7 +153,7 @@ impl DeleteFileIndex {
120153
}
121154

122155
/// Determine all the delete files that apply to the provided `DataFile`.
123-
pub(crate) fn get_deletes_for_data_file(
156+
fn get_deletes_for_data_file(
124157
&self,
125158
data_file: &DataFile,
126159
seq_num: Option<i64>,
@@ -158,3 +191,27 @@ impl DeleteFileIndex {
158191
results
159192
}
160193
}
194+
195+
/// Future for the `DeleteFileIndex::get_deletes_for_data_file` method
196+
pub(crate) struct DeletesForDataFile<'a> {
197+
state: Arc<RwLock<DeleteFileIndexState>>,
198+
data_file: &'a DataFile,
199+
seq_num: Option<i64>,
200+
}
201+
202+
impl Future for DeletesForDataFile<'_> {
203+
type Output = Vec<FileScanTaskDeleteFile>;
204+
205+
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
206+
let Ok(guard) = self.state.try_read() else {
207+
return Poll::Pending;
208+
};
209+
210+
match guard.deref() {
211+
DeleteFileIndexState::Populated(idx) => {
212+
Poll::Ready(idx.get_deletes_for_data_file(self.data_file, self.seq_num))
213+
}
214+
_ => Poll::Pending,
215+
}
216+
}
217+
}

crates/iceberg/src/scan.rs

Lines changed: 20 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
2727
use serde::{Deserialize, Serialize};
2828

2929
use crate::arrow::ArrowReaderBuilder;
30-
use crate::delete_file_index::{DeleteFileIndex, DeleteFileIndexRefReceiver};
30+
use crate::delete_file_index::DeleteFileIndex;
3131
use crate::expr::visitors::expression_evaluator::ExpressionEvaluator;
3232
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
3333
use crate::expr::visitors::inclusive_projection::InclusiveProjection;
@@ -385,18 +385,12 @@ impl TableScan {
385385
// used to stream the results back to the caller
386386
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
387387

388-
// DeleteFileIndexRefReceiver is a watch channel receiver that will
389-
// be notified when the DeleteFileIndex is ready.
390-
let delete_file_idx_and_tx: Option<(
391-
DeleteFileIndexRefReceiver,
392-
Sender<Result<DeleteFileContext>>,
393-
)> = if self.delete_file_processing_enabled {
394-
let (delete_file_tx, delete_file_rx) = channel(concurrency_limit_manifest_entries);
395-
let delete_file_index_rx = DeleteFileIndex::from_del_file_chan(delete_file_rx);
396-
Some((delete_file_index_rx, delete_file_tx))
397-
} else {
398-
None
399-
};
388+
let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
389+
if self.delete_file_processing_enabled {
390+
Some(DeleteFileIndex::new())
391+
} else {
392+
None
393+
};
400394

401395
let manifest_list = self.plan_context.get_manifest_list().await?;
402396

@@ -565,7 +559,7 @@ impl TableScan {
565559

566560
async fn process_delete_manifest_entry(
567561
manifest_entry_context: ManifestEntryContext,
568-
mut delete_file_ctx_tx: Sender<Result<DeleteFileContext>>,
562+
mut delete_file_ctx_tx: Sender<DeleteFileContext>,
569563
) -> Result<()> {
570564
// skip processing this manifest entry if it has been marked as deleted
571565
if !manifest_entry_context.manifest_entry.is_alive() {
@@ -597,10 +591,10 @@ impl TableScan {
597591
}
598592

599593
delete_file_ctx_tx
600-
.send(Ok(DeleteFileContext {
594+
.send(DeleteFileContext {
601595
manifest_entry: manifest_entry_context.manifest_entry.clone(),
602596
partition_spec_id: manifest_entry_context.partition_spec_id,
603-
}))
597+
})
604598
.await?;
605599

606600
Ok(())
@@ -624,7 +618,7 @@ struct ManifestFileContext {
624618
object_cache: Arc<ObjectCache>,
625619
snapshot_schema: SchemaRef,
626620
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
627-
delete_file_index: Option<DeleteFileIndexRefReceiver>,
621+
delete_file_index: Option<DeleteFileIndex>,
628622
}
629623

630624
/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -637,7 +631,7 @@ struct ManifestEntryContext {
637631
bound_predicates: Option<Arc<BoundPredicates>>,
638632
partition_spec_id: i32,
639633
snapshot_schema: SchemaRef,
640-
delete_file_index: Option<DeleteFileIndexRefReceiver>,
634+
delete_file_index: Option<DeleteFileIndex>,
641635
}
642636

643637
impl ManifestFileContext {
@@ -684,29 +678,13 @@ impl ManifestEntryContext {
684678
/// consume this `ManifestEntryContext`, returning a `FileScanTask`
685679
/// created from it
686680
async fn into_file_scan_task(self) -> Result<FileScanTask> {
687-
// let deletes = self.get_deletes().await?;
688-
689-
let deletes = if let Some(mut delete_file_index_rx) = self.delete_file_index {
690-
let del_file_idx_opt = delete_file_index_rx
691-
.wait_for(Option::is_some)
681+
let deletes = if let Some(delete_file_index) = self.delete_file_index {
682+
delete_file_index
683+
.get_deletes_for_data_file(
684+
self.manifest_entry.data_file(),
685+
self.manifest_entry.sequence_number(),
686+
)
692687
.await
693-
.map_err(|_| Error::new(ErrorKind::Unexpected, "DeleteFileIndex recv error"))?;
694-
695-
match del_file_idx_opt.as_ref() {
696-
Some(del_file_idx) => match del_file_idx.as_ref() {
697-
Ok(delete_file_idx) => delete_file_idx.get_deletes_for_data_file(
698-
self.manifest_entry.data_file(),
699-
self.manifest_entry.sequence_number(),
700-
),
701-
Err(err) => {
702-
return Err(Error::new(ErrorKind::Unexpected, err.message()));
703-
}
704-
},
705-
706-
// the `wait_for(Option::is_some)` above means that we can
707-
// never get a `None` here
708-
None => unreachable!(),
709-
}
710688
} else {
711689
vec![]
712690
};
@@ -764,7 +742,7 @@ impl PlanContext {
764742
&self,
765743
manifest_list: Arc<ManifestList>,
766744
tx_data: Sender<ManifestEntryContext>,
767-
delete_file_idx_and_tx: Option<(DeleteFileIndexRefReceiver, Sender<ManifestEntryContext>)>,
745+
delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<ManifestEntryContext>)>,
768746
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
769747
let manifest_files = manifest_list.entries().iter();
770748

@@ -823,7 +801,7 @@ impl PlanContext {
823801
manifest_file: &ManifestFile,
824802
partition_filter: Option<Arc<BoundPredicate>>,
825803
sender: Sender<ManifestEntryContext>,
826-
delete_file_index: Option<DeleteFileIndexRefReceiver>,
804+
delete_file_index: Option<DeleteFileIndex>,
827805
) -> ManifestFileContext {
828806
let bound_predicates =
829807
if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =

0 commit comments

Comments
 (0)