Skip to content

Commit 6f2ce71

Browse files
sddFokko
andauthored
Support {Positional,Equality}Deletes (#652)
This PR adds support for handling of both positional and equality delete files within table scans. The approach taken is to include a list of delete file paths in every `FileScanTask`. At the moment it is assumed that this list refers to delete files that **may** apply to the data file in the scan, rather than having been filtered to only contain delete files that definitely do apply to this data file. Further optimisation of `plan_files` is expected in the future to ensure that this list is pre-filtered before being included in the FileScanTask. This PR has been refactored so that this work can be split into multiple PRs. This PR now contains solely the scan plan changes. --------- Co-authored-by: Fokko Driesprong <[email protected]>
1 parent 6e2ef32 commit 6f2ce71

File tree

6 files changed

+479
-60
lines changed

6 files changed

+479
-60
lines changed

crates/iceberg/src/arrow/reader.rs

+16-4
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,14 @@ impl ArrowReader {
163163
row_group_filtering_enabled: bool,
164164
row_selection_enabled: bool,
165165
) -> Result<ArrowRecordBatchStream> {
166+
// TODO: add support for delete files
167+
if !task.deletes.is_empty() {
168+
return Err(Error::new(
169+
ErrorKind::FeatureUnsupported,
170+
"Delete files are not yet supported",
171+
));
172+
}
173+
166174
// Get the metadata for the Parquet file we need to read and build
167175
// a reader for the data within
168176
let parquet_file = file_io.new_input(&task.data_file_path)?;
@@ -751,10 +759,14 @@ impl PredicateConverter<'_> {
751759
let index = self
752760
.column_indices
753761
.iter()
754-
.position(|&idx| idx == *column_idx).ok_or(Error::new(ErrorKind::DataInvalid, format!(
755-
"Leave column `{}` in predicates cannot be found in the required column indices.",
756-
reference.field().name
757-
)))?;
762+
.position(|&idx| idx == *column_idx)
763+
.ok_or(Error::new(
764+
ErrorKind::DataInvalid,
765+
format!(
766+
"Leave column `{}` in predicates cannot be found in the required column indices.",
767+
reference.field().name
768+
),
769+
))?;
758770

759771
Ok(Some(index))
760772
} else {
+211
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
use std::future::Future;
20+
use std::ops::Deref;
21+
use std::pin::Pin;
22+
use std::sync::{Arc, RwLock};
23+
use std::task::{Context, Poll};
24+
25+
use futures::channel::mpsc::{channel, Sender};
26+
use futures::StreamExt;
27+
28+
use crate::runtime::spawn;
29+
use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
30+
use crate::spec::{DataContentType, DataFile, Struct};
31+
use crate::{Error, ErrorKind, Result};
32+
33+
/// Index of delete files
34+
#[derive(Clone, Debug)]
35+
pub(crate) struct DeleteFileIndex {
36+
state: Arc<RwLock<DeleteFileIndexState>>,
37+
}
38+
39+
#[derive(Debug)]
40+
enum DeleteFileIndexState {
41+
Populating,
42+
Populated(PopulatedDeleteFileIndex),
43+
}
44+
45+
#[derive(Debug)]
46+
struct PopulatedDeleteFileIndex {
47+
#[allow(dead_code)]
48+
global_deletes: Vec<Arc<DeleteFileContext>>,
49+
eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
50+
pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
51+
// TODO: do we need this?
52+
// pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
53+
54+
// TODO: Deletion Vector support
55+
}
56+
57+
impl DeleteFileIndex {
58+
/// create a new `DeleteFileIndex` along with the sender that populates it with delete files
59+
pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
60+
// TODO: what should the channel limit be?
61+
let (tx, rx) = channel(10);
62+
let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating));
63+
let delete_file_stream = rx.boxed();
64+
65+
spawn({
66+
let state = state.clone();
67+
async move {
68+
let delete_files = delete_file_stream.collect::<Vec<_>>().await;
69+
70+
let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);
71+
72+
let mut guard = state.write().unwrap();
73+
*guard = DeleteFileIndexState::Populated(populated_delete_file_index);
74+
}
75+
});
76+
77+
(DeleteFileIndex { state }, tx)
78+
}
79+
80+
/// Gets all the delete files that apply to the specified data file.
81+
///
82+
/// Returns a future that resolves to a Result<Vec<FileScanTaskDeleteFile>>
83+
pub(crate) fn get_deletes_for_data_file<'a>(
84+
&self,
85+
data_file: &'a DataFile,
86+
seq_num: Option<i64>,
87+
) -> DeletesForDataFile<'a> {
88+
DeletesForDataFile {
89+
state: self.state.clone(),
90+
data_file,
91+
seq_num,
92+
}
93+
}
94+
}
95+
96+
impl PopulatedDeleteFileIndex {
97+
fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
98+
let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
99+
HashMap::default();
100+
let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
101+
HashMap::default();
102+
103+
let mut global_deletes: Vec<Arc<DeleteFileContext>> = vec![];
104+
105+
files.into_iter().for_each(|ctx| {
106+
let arc_ctx = Arc::new(ctx);
107+
108+
let partition = arc_ctx.manifest_entry.data_file().partition();
109+
110+
// The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes".
111+
if partition.fields().is_empty() {
112+
// TODO: confirm we're good to skip here if we encounter a pos del
113+
if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
114+
global_deletes.push(arc_ctx);
115+
return;
116+
}
117+
}
118+
119+
let destination_map = match arc_ctx.manifest_entry.content_type() {
120+
DataContentType::PositionDeletes => &mut pos_deletes_by_partition,
121+
DataContentType::EqualityDeletes => &mut eq_deletes_by_partition,
122+
_ => unreachable!(),
123+
};
124+
125+
destination_map
126+
.entry(partition.clone())
127+
.and_modify(|entry| {
128+
entry.push(arc_ctx.clone());
129+
})
130+
.or_insert(vec![arc_ctx.clone()]);
131+
});
132+
133+
PopulatedDeleteFileIndex {
134+
global_deletes,
135+
eq_deletes_by_partition,
136+
pos_deletes_by_partition,
137+
}
138+
}
139+
140+
/// Determine all the delete files that apply to the provided `DataFile`.
141+
fn get_deletes_for_data_file(
142+
&self,
143+
data_file: &DataFile,
144+
seq_num: Option<i64>,
145+
) -> Vec<FileScanTaskDeleteFile> {
146+
let mut results = vec![];
147+
148+
self.global_deletes
149+
.iter()
150+
// filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num`
151+
.filter(|&delete| {
152+
seq_num
153+
.map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
154+
.unwrap_or_else(|| true)
155+
})
156+
.for_each(|delete| results.push(delete.as_ref().into()));
157+
158+
if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) {
159+
deletes
160+
.iter()
161+
// filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num`
162+
.filter(|&delete| {
163+
seq_num
164+
.map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
165+
.unwrap_or_else(|| true)
166+
})
167+
.for_each(|delete| results.push(delete.as_ref().into()));
168+
}
169+
170+
// TODO: the spec states that:
171+
// "The data file's file_path is equal to the delete file's referenced_data_file if it is non-null".
172+
// we're not yet doing that here. The referenced data file's name will also be present in the positional
173+
// delete file's file path column.
174+
if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) {
175+
deletes
176+
.iter()
177+
// filter that returns true if the provided delete file's sequence number is **greater thano** `seq_num`
178+
.filter(|&delete| {
179+
seq_num
180+
.map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
181+
.unwrap_or_else(|| true)
182+
})
183+
.for_each(|delete| results.push(delete.as_ref().into()));
184+
}
185+
186+
results
187+
}
188+
}
189+
190+
/// Future for the `DeleteFileIndex::get_deletes_for_data_file` method
191+
pub(crate) struct DeletesForDataFile<'a> {
192+
state: Arc<RwLock<DeleteFileIndexState>>,
193+
data_file: &'a DataFile,
194+
seq_num: Option<i64>,
195+
}
196+
197+
impl Future for DeletesForDataFile<'_> {
198+
type Output = Result<Vec<FileScanTaskDeleteFile>>;
199+
200+
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
201+
match self.state.try_read() {
202+
Ok(guard) => match guard.deref() {
203+
DeleteFileIndexState::Populated(idx) => Poll::Ready(Ok(
204+
idx.get_deletes_for_data_file(self.data_file, self.seq_num)
205+
)),
206+
_ => Poll::Pending,
207+
},
208+
Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Unexpected, err.to_string()))),
209+
}
210+
}
211+
}

crates/iceberg/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ pub mod transform;
8383
mod runtime;
8484

8585
pub mod arrow;
86+
pub(crate) mod delete_file_index;
8687
mod utils;
8788
pub mod writer;
8889

0 commit comments

Comments
 (0)