Skip to content

Commit 3804bda

Browse files
committed
refactor: split DeleteFileManager into DeleteFileLoader and DeleteFilter
1 parent 5f6036d commit 3804bda

File tree

5 files changed

+454
-227
lines changed

5 files changed

+454
-227
lines changed

crates/iceberg/src/arrow/delete_file_manager.rs renamed to crates/iceberg/src/arrow/delete_file_loader.rs

+83-193
Original file line numberDiff line numberDiff line change
@@ -16,100 +16,57 @@
1616
// under the License.
1717

1818
use std::collections::HashMap;
19-
use std::future::Future;
20-
use std::pin::Pin;
21-
use std::sync::{Arc, OnceLock, RwLock};
22-
use std::task::{Context, Poll};
19+
use std::sync::Arc;
2320

2421
use futures::channel::oneshot;
2522
use futures::future::join_all;
2623
use futures::{StreamExt, TryStreamExt};
24+
use tokio::sync::oneshot::{channel, Receiver};
2725

26+
use super::delete_filter::{DeleteFilter, EqDelFuture};
2827
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
2928
use crate::arrow::ArrowReader;
3029
use crate::delete_vector::DeleteVector;
31-
use crate::expr::Predicate::AlwaysTrue;
32-
use crate::expr::{Bind, BoundPredicate, Predicate};
30+
use crate::expr::Predicate;
3331
use crate::io::FileIO;
34-
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile};
32+
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
3533
use crate::spec::{DataContentType, Schema, SchemaRef};
3634
use crate::{Error, ErrorKind, Result};
3735

3836
#[allow(unused)]
39-
pub trait DeleteFileManager {
37+
pub trait DeleteFileLoader {
4038
/// Read the delete file referred to in the task
4139
///
42-
/// Returns the raw contents of the delete file as a RecordBatch stream
43-
fn read_delete_file(task: &FileScanTaskDeleteFile) -> Result<ArrowRecordBatchStream>;
40+
/// Returns the contents of the delete file as a RecordBatch stream. Applies schema evolution.
41+
async fn read_delete_file(
42+
&self,
43+
task: &FileScanTaskDeleteFile,
44+
schema: SchemaRef,
45+
) -> Result<ArrowRecordBatchStream>;
4446
}
4547

4648
#[allow(unused)]
4749
#[derive(Clone, Debug)]
48-
pub(crate) struct CachingDeleteFileManager {
50+
pub(crate) struct CachingDeleteFileLoader {
4951
file_io: FileIO,
5052
concurrency_limit_data_files: usize,
51-
state: Arc<RwLock<DeleteFileManagerState>>,
52-
}
53-
54-
impl DeleteFileManager for CachingDeleteFileManager {
55-
fn read_delete_file(_task: &FileScanTaskDeleteFile) -> Result<ArrowRecordBatchStream> {
56-
// TODO, implementation in https://github.com/apache/iceberg-rust/pull/982
57-
58-
Err(Error::new(
59-
ErrorKind::FeatureUnsupported,
60-
"Reading delete files is not yet supported",
61-
))
62-
}
63-
}
64-
// Equality deletes may apply to more than one DataFile in a scan, and so
65-
// the same equality delete file may be present in more than one invocation of
66-
// DeleteFileManager::load_deletes in the same scan. We want to deduplicate these
67-
// to avoid having to load them twice, so we immediately store cloneable futures in the
68-
// state that can be awaited upon to get the EQ deletes. That way we can check to see if
69-
// a load of each Eq delete file is already in progress and avoid starting another one.
70-
#[derive(Debug, Clone)]
71-
struct EqDelFuture {
72-
result: OnceLock<Predicate>,
73-
}
74-
75-
impl EqDelFuture {
76-
pub fn new() -> (oneshot::Sender<Predicate>, Self) {
77-
let (tx, rx) = oneshot::channel();
78-
let result = OnceLock::new();
79-
80-
crate::runtime::spawn({
81-
let result = result.clone();
82-
async move { result.set(rx.await.unwrap()) }
83-
});
84-
85-
(tx, Self { result })
86-
}
53+
del_filter: DeleteFilter,
8754
}
8855

89-
impl Future for EqDelFuture {
90-
type Output = Predicate;
56+
impl DeleteFileLoader for CachingDeleteFileLoader {
57+
async fn read_delete_file(
58+
&self,
59+
task: &FileScanTaskDeleteFile,
60+
schema: SchemaRef,
61+
) -> Result<ArrowRecordBatchStream> {
62+
let raw_batch_stream =
63+
CachingDeleteFileLoader::parquet_to_batch_stream(&task.file_path, self.file_io.clone())
64+
.await?;
9165

92-
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
93-
match self.result.get() {
94-
None => Poll::Pending,
95-
Some(predicate) => Poll::Ready(predicate.clone()),
96-
}
66+
Self::evolve_schema(raw_batch_stream, schema).await
9767
}
9868
}
9969

100-
#[derive(Debug, Default)]
101-
struct DeleteFileManagerState {
102-
// delete vectors and positional deletes get merged when loaded into a single delete vector
103-
// per data file
104-
delete_vectors: HashMap<String, Arc<RwLock<DeleteVector>>>,
105-
106-
// equality delete files are parsed into unbound `Predicate`s. We store them here as
107-
// cloneable futures (see note below)
108-
equality_deletes: HashMap<String, EqDelFuture>,
109-
}
110-
111-
type StateRef = Arc<RwLock<DeleteFileManagerState>>;
112-
11370
// Intermediate context during processing of a delete file task.
11471
enum DeleteFileContext {
11572
// TODO: Delete Vector loader from Puffin files
@@ -130,12 +87,12 @@ enum ParsedDeleteFileContext {
13087
}
13188

13289
#[allow(unused_variables)]
133-
impl CachingDeleteFileManager {
90+
impl CachingDeleteFileLoader {
13491
pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> Self {
135-
CachingDeleteFileManager {
92+
CachingDeleteFileLoader {
13693
file_io,
13794
concurrency_limit_data_files,
138-
state: Arc::new(Default::default()),
95+
del_filter: DeleteFilter::default(),
13996
}
14097
}
14198

@@ -169,7 +126,7 @@ impl CachingDeleteFileManager {
169126
/// vector maps that resulted from any positional delete or delete vector files into a
170127
/// single map and persist it in the state.
171128
///
172-
///
129+
///
173130
/// Conceptually, the data flow is like this:
174131
/// ```none
175132
/// FileScanTaskDeleteFile
@@ -204,59 +161,74 @@ impl CachingDeleteFileManager {
204161
/// |
205162
/// [join!]
206163
/// ```
207-
pub(crate) async fn load_deletes(
164+
pub(crate) fn load_deletes(
208165
&self,
209166
delete_file_entries: &[FileScanTaskDeleteFile],
210167
schema: SchemaRef,
211-
) -> Result<()> {
168+
) -> Receiver<Result<DeleteFilter>> {
169+
let (tx, rx) = channel();
170+
212171
let stream_items = delete_file_entries
213172
.iter()
214173
.map(|t| {
215174
(
216175
t.clone(),
217176
self.file_io.clone(),
218-
self.state.clone(),
177+
self.del_filter.clone(),
219178
schema.clone(),
220179
)
221180
})
222181
.collect::<Vec<_>>();
223-
// NOTE: removing the collect and just passing the iterator to futures::stream::iter
224-
// results in an error 'implementation of `std::ops::FnOnce` is not general enough'
225-
226-
let task_stream = futures::stream::iter(stream_items.into_iter());
182+
let task_stream = futures::stream::iter(stream_items);
183+
let del_filter = self.del_filter.clone();
184+
let concurrency_limit_data_files = self.concurrency_limit_data_files;
185+
crate::runtime::spawn(async move {
186+
let result = async move {
187+
let mut del_filter = del_filter;
188+
189+
let results: Vec<ParsedDeleteFileContext> = task_stream
190+
.map(move |(task, file_io, del_filter, schema)| async move {
191+
Self::load_file_for_task(&task, file_io, del_filter, schema).await
192+
})
193+
.map(move |ctx| {
194+
Ok(async { Self::parse_file_content_for_task(ctx.await?).await })
195+
})
196+
.try_buffer_unordered(concurrency_limit_data_files)
197+
.try_collect::<Vec<_>>()
198+
.await?;
199+
200+
// wait for all in-progress EQ deletes from other tasks
201+
let _ = join_all(results.iter().filter_map(|i| {
202+
if let ParsedDeleteFileContext::InProgEqDel(fut) = i {
203+
Some(fut.clone())
204+
} else {
205+
None
206+
}
207+
}))
208+
.await;
209+
210+
for item in results {
211+
if let ParsedDeleteFileContext::DelVecs(hash_map) = item {
212+
for (data_file_path, delete_vector) in hash_map.into_iter() {
213+
del_filter.upsert_delete_vector(data_file_path, delete_vector);
214+
}
215+
}
216+
}
227217

228-
let results: Vec<ParsedDeleteFileContext> = task_stream
229-
.map(move |(task, file_io, state_ref, schema)| async {
230-
Self::load_file_for_task(task, file_io, state_ref, schema).await
231-
})
232-
.map(move |ctx| Ok(async { Self::parse_file_content_for_task(ctx.await?).await }))
233-
.try_buffer_unordered(self.concurrency_limit_data_files)
234-
.try_collect::<Vec<_>>()
235-
.await?;
236-
237-
// wait for all in-progress EQ deletes from other tasks
238-
let _ = join_all(results.iter().filter_map(|i| {
239-
if let ParsedDeleteFileContext::InProgEqDel(fut) = i {
240-
Some(fut.clone())
241-
} else {
242-
None
218+
Ok(del_filter)
243219
}
244-
}))
245-
.await;
246-
247-
let merged_delete_vectors = results
248-
.into_iter()
249-
.fold(HashMap::default(), Self::merge_delete_vectors);
220+
.await;
250221

251-
self.state.write().unwrap().delete_vectors = merged_delete_vectors;
222+
let _ = tx.send(result);
223+
});
252224

253-
Ok(())
225+
rx
254226
}
255227

256228
async fn load_file_for_task(
257-
task: FileScanTaskDeleteFile,
229+
task: &FileScanTaskDeleteFile,
258230
file_io: FileIO,
259-
state: StateRef,
231+
del_filter: DeleteFilter,
260232
schema: SchemaRef,
261233
) -> Result<DeleteFileContext> {
262234
match task.file_type {
@@ -266,16 +238,15 @@ impl CachingDeleteFileManager {
266238

267239
DataContentType::EqualityDeletes => {
268240
let sender = {
269-
let mut state = state.write().unwrap();
270-
if let Some(existing) = state.equality_deletes.get(&task.file_path) {
241+
if let Some(existing) = del_filter
242+
.get_equality_delete_predicate_for_delete_file_path(&task.file_path)
243+
{
271244
return Ok(DeleteFileContext::InProgEqDel(existing.clone()));
272245
}
273246

274247
let (sender, fut) = EqDelFuture::new();
275248

276-
state
277-
.equality_deletes
278-
.insert(task.file_path.to_string(), fut);
249+
del_filter.insert_equality_delete(task.file_path.to_string(), fut);
279250

280251
sender
281252
};
@@ -327,23 +298,6 @@ impl CachingDeleteFileManager {
327298
}
328299
}
329300

330-
fn merge_delete_vectors(
331-
mut merged_delete_vectors: HashMap<String, Arc<RwLock<DeleteVector>>>,
332-
item: ParsedDeleteFileContext,
333-
) -> HashMap<String, Arc<RwLock<DeleteVector>>> {
334-
if let ParsedDeleteFileContext::DelVecs(del_vecs) = item {
335-
del_vecs.into_iter().for_each(|(key, val)| {
336-
let entry = merged_delete_vectors.entry(key).or_default();
337-
{
338-
let mut inner = entry.write().unwrap();
339-
(*inner).intersect_assign(&val);
340-
}
341-
});
342-
}
343-
344-
merged_delete_vectors
345-
}
346-
347301
/// Loads a RecordBatchStream for a given datafile.
348302
async fn parquet_to_batch_stream(
349303
data_file_path: &str,
@@ -416,72 +370,6 @@ impl CachingDeleteFileManager {
416370
"parsing of equality deletes is not yet supported",
417371
))
418372
}
419-
420-
/// Builds eq delete predicate for the provided task.
421-
///
422-
/// Must await on load_deletes before calling this.
423-
pub(crate) async fn build_delete_predicate_for_task(
424-
&self,
425-
file_scan_task: &FileScanTask,
426-
) -> Result<Option<BoundPredicate>> {
427-
// * Filter the task's deletes into just the Equality deletes
428-
// * Retrieve the unbound predicate for each from self.state.equality_deletes
429-
// * Logical-AND them all together to get a single combined `Predicate`
430-
// * Bind the predicate to the task's schema to get a `BoundPredicate`
431-
432-
let mut combined_predicate = AlwaysTrue;
433-
for delete in &file_scan_task.deletes {
434-
if !is_equality_delete(delete) {
435-
continue;
436-
}
437-
438-
let predicate = {
439-
let state = self.state.read().unwrap();
440-
441-
let Some(predicate) = state.equality_deletes.get(&delete.file_path) else {
442-
return Err(Error::new(
443-
ErrorKind::Unexpected,
444-
format!(
445-
"Missing predicate for equality delete file '{}'",
446-
delete.file_path
447-
),
448-
));
449-
};
450-
451-
predicate.clone()
452-
};
453-
454-
combined_predicate = combined_predicate.and(predicate.await);
455-
}
456-
457-
if combined_predicate == AlwaysTrue {
458-
return Ok(None);
459-
}
460-
461-
// TODO: handle case-insensitive case
462-
let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?;
463-
Ok(Some(bound_predicate))
464-
}
465-
466-
/// Retrieve a delete vector for the data file associated with a given file scan task
467-
///
468-
/// Should only be called after awaiting on load_deletes. Returns a clone of the shared
469-
/// `Arc` handle to the delete vector; the entry itself is left in place in the state map.
470-
pub(crate) fn get_delete_vector_for_task(
471-
&self,
472-
file_scan_task: &FileScanTask,
473-
) -> Option<Arc<RwLock<DeleteVector>>> {
474-
self.state
475-
.write()
476-
.unwrap()
477-
.delete_vectors
478-
.get(file_scan_task.data_file_path())
479-
.map(Clone::clone)
480-
}
481-
}
482-
483-
pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool {
484-
matches!(f.file_type, DataContentType::EqualityDeletes)
485373
}
486374

487375
#[cfg(test)]
@@ -498,6 +386,7 @@ mod tests {
498386
use tempfile::TempDir;
499387

500388
use super::*;
389+
use crate::scan::FileScanTask;
501390
use crate::spec::{DataFileFormat, Schema};
502391

503392
type ArrowSchemaRef = Arc<ArrowSchema>;
@@ -516,13 +405,14 @@ mod tests {
516405

517406
// Note that with the delete file parsing not yet in place, all we can test here is that
518407
// the call to the loader fails with the expected FeatureUnsupportedError.
519-
let delete_file_manager = CachingDeleteFileManager::new(file_io.clone(), 10);
408+
let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);
520409

521410
let file_scan_tasks = setup(table_location);
522411

523412
let result = delete_file_manager
524413
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
525-
.await;
414+
.await
415+
.unwrap();
526416

527417
assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
528418
}

0 commit comments

Comments
 (0)