Commit f821e4f (parent 8f273f3)

feat: add positional delete parsing. Add tests for end-to-end positional delete functionality

5 files changed: +144 additions, −161 deletions

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 56 additions & 13 deletions
@@ -17,6 +17,7 @@
 
 use std::collections::HashMap;
 
+use arrow_array::{Int64Array, StringArray};
 use futures::channel::oneshot;
 use futures::future::join_all;
 use futures::{StreamExt, TryStreamExt};
@@ -283,18 +284,51 @@ impl CachingDeleteFileLoader {
         }
     }
 
+    /// Parses a record batch stream coming from positional delete files
+    ///
+    /// Returns a map of data file path to a delete vector
     /// Parses a record batch stream coming from positional delete files
     ///
     /// Returns a map of data file path to a delete vector
     async fn parse_positional_deletes_record_batch_stream(
-        stream: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
     ) -> Result<HashMap<String, DeleteVector>> {
-        // TODO
+        let mut result: HashMap<String, DeleteVector> = HashMap::default();
+
+        while let Some(batch) = stream.next().await {
+            let batch = batch?;
+            let schema = batch.schema();
+            let columns = batch.columns();
+
+            let Some(file_paths) = columns[0].as_any().downcast_ref::<StringArray>() else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Could not downcast file paths array to StringArray",
+                ));
+            };
+            let Some(positions) = columns[1].as_any().downcast_ref::<Int64Array>() else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Could not downcast positions array to Int64Array",
+                ));
+            };
+
+            for (file_path, pos) in file_paths.iter().zip(positions.iter()) {
+                let (Some(file_path), Some(pos)) = (file_path, pos) else {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "null values in delete file",
+                    ));
+                };
 
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "parsing of positional deletes is not yet supported",
-        ))
+                result
+                    .entry(file_path.to_string())
+                    .or_default()
+                    .insert(pos as u64);
+            }
+        }
+
+        Ok(result)
     }
 
     /// Parses record batch streams from individual equality delete files
@@ -317,28 +351,37 @@ mod tests {
     use tempfile::TempDir;
 
     use super::*;
-    use crate::arrow::delete_file_loader::tests::setup;
+    use crate::arrow::delete_filter::tests::setup;
 
     #[tokio::test]
-    async fn test_delete_file_manager_load_deletes() {
+    async fn test_caching_delete_file_loader_load_deletes() {
         let tmp_dir = TempDir::new().unwrap();
         let table_location = tmp_dir.path();
         let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
             .unwrap()
             .build()
             .unwrap();
 
-        // Note that with the delete file parsing not yet in place, all we can test here is that
-        // the call to the loader fails with the expected FeatureUnsupportedError.
-        let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
 
         let file_scan_tasks = setup(table_location);
 
-        let result = delete_file_manager
+        let delete_filter = delete_file_loader
            .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
            .await
+           .unwrap()
            .unwrap();
 
-        assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
+        let result = delete_filter
+            .get_delete_vector(&file_scan_tasks[0])
+            .unwrap();
+
+        // union of pos dels from pos del file 1 and 2, ie
+        // [0, 1, 3, 5, 6, 8, 1022, 1023] | [0, 1, 3, 5, 20, 21, 22, 23]
+        // = [0, 1, 3, 5, 6, 8, 20, 21, 22, 23, 1022, 1023]
+        assert_eq!(result.lock().unwrap().len(), 12);
+
+        let result = delete_filter.get_delete_vector(&file_scan_tasks[1]);
+        assert!(result.is_none()); // no pos dels for file 3
     }
 }
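In essence, the new parse_positional_deletes_record_batch_stream walks each batch of the delete-file stream, expects a (file_path: Utf8, pos: Int64) column pair, and accumulates the deleted row positions per data file. Below is a condensed, standalone sketch of that per-batch parsing step; it uses a HashSet<u64> as a stand-in for the crate-internal DeleteVector, and the parse_batch name and example paths are illustrative, not part of the commit.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use arrow_array::{Array, Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};

// Collect deleted row positions per data file from one positional-delete batch.
fn parse_batch(batch: &RecordBatch) -> Result<HashMap<String, HashSet<u64>>, String> {
    let mut result: HashMap<String, HashSet<u64>> = HashMap::new();

    // Column 0 must be the data file path, column 1 the deleted row position.
    let file_paths = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or("file_path column is not a StringArray")?;
    let positions = batch
        .column(1)
        .as_any()
        .downcast_ref::<Int64Array>()
        .ok_or("pos column is not an Int64Array")?;

    for (path, pos) in file_paths.iter().zip(positions.iter()) {
        // Positional delete files must not contain nulls in either column.
        let (Some(path), Some(pos)) = (path, pos) else {
            return Err("null values in delete file".to_string());
        };
        result.entry(path.to_string()).or_default().insert(pos as u64);
    }
    Ok(result)
}

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("file_path", DataType::Utf8, false),
        Field::new("pos", DataType::Int64, false),
    ]));
    let batch = RecordBatch::try_new(schema, vec![
        Arc::new(StringArray::from(vec!["s3://bucket/1.parquet"; 3])),
        Arc::new(Int64Array::from(vec![0_i64, 5, 1022])),
    ])
    .unwrap();

    let deletes = parse_batch(&batch).unwrap();
    assert_eq!(deletes["s3://bucket/1.parquet"].len(), 3);
}

In the commit itself, the per-file sets are then merged across all positional delete files for a scan task, which is what the 12-entry union assertion in the test above exercises.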

crates/iceberg/src/arrow/delete_file_loader.rs

Lines changed: 2 additions & 131 deletions
@@ -110,27 +110,11 @@ impl DeleteFileLoader for BasicDeleteFileLoader {
 }
 
 #[cfg(test)]
-pub(crate) mod tests {
-    use std::collections::HashMap;
-    use std::fs::File;
-    use std::path::Path;
-    use std::sync::Arc;
-
-    use arrow_array::{Int64Array, RecordBatch, StringArray};
-    use arrow_schema::Schema as ArrowSchema;
-    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
-    use parquet::basic::Compression;
-    use parquet::file::properties::WriterProperties;
+mod tests {
     use tempfile::TempDir;
 
     use super::*;
-    use crate::scan::FileScanTask;
-    use crate::spec::{DataContentType, DataFileFormat, Schema};
-
-    type ArrowSchemaRef = Arc<ArrowSchema>;
-
-    const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
-    const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
+    use crate::arrow::delete_filter::tests::setup;
 
     #[tokio::test]
     async fn test_basic_delete_file_loader_read_delete_file() {
@@ -141,8 +125,6 @@ pub(crate) mod tests {
             .build()
             .unwrap();
 
-        // Note that with the delete file parsing not yet in place, all we can test here is that
-        // the call to the loader fails with the expected FeatureUnsupportedError.
         let delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
 
         let file_scan_tasks = setup(table_location);
@@ -159,115 +141,4 @@ pub(crate) mod tests {
 
         assert_eq!(result.len(), 1);
     }
-
-    pub(crate) fn setup(table_location: &Path) -> Vec<FileScanTask> {
-        let data_file_schema = Arc::new(Schema::builder().build().unwrap());
-        let positional_delete_schema = create_pos_del_schema();
-
-        let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8];
-        let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023];
-
-        let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values));
-        let pos_col = Arc::new(Int64Array::from_iter_values(pos_values));
-
-        let props = WriterProperties::builder()
-            .set_compression(Compression::SNAPPY)
-            .build();
-
-        for n in 1..=3 {
-            let positional_deletes_to_write =
-                RecordBatch::try_new(positional_delete_schema.clone(), vec![
-                    file_path_col.clone(),
-                    pos_col.clone(),
-                ])
-                .unwrap();
-
-            let file = File::create(format!(
-                "{}/pos-del-{}.parquet",
-                table_location.to_str().unwrap(),
-                n
-            ))
-            .unwrap();
-            let mut writer = ArrowWriter::try_new(
-                file,
-                positional_deletes_to_write.schema(),
-                Some(props.clone()),
-            )
-            .unwrap();
-
-            writer
-                .write(&positional_deletes_to_write)
-                .expect("Writing batch");
-
-            // writer must be closed to write footer
-            writer.close().unwrap();
-        }
-
-        let pos_del_1 = FileScanTaskDeleteFile {
-            file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()),
-            file_type: DataContentType::PositionDeletes,
-            partition_spec_id: 0,
-            equality_ids: vec![],
-        };
-
-        let pos_del_2 = FileScanTaskDeleteFile {
-            file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()),
-            file_type: DataContentType::PositionDeletes,
-            partition_spec_id: 0,
-            equality_ids: vec![],
-        };
-
-        let pos_del_3 = FileScanTaskDeleteFile {
-            file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()),
-            file_type: DataContentType::PositionDeletes,
-            partition_spec_id: 0,
-            equality_ids: vec![],
-        };
-
-        let file_scan_tasks = vec![
-            FileScanTask {
-                start: 0,
-                length: 0,
-                record_count: None,
-                data_file_path: "".to_string(),
-                data_file_content: DataContentType::Data,
-                data_file_format: DataFileFormat::Parquet,
-                schema: data_file_schema.clone(),
-                project_field_ids: vec![],
-                predicate: None,
-                deletes: vec![pos_del_1, pos_del_2.clone()],
-            },
-            FileScanTask {
-                start: 0,
-                length: 0,
-                record_count: None,
-                data_file_path: "".to_string(),
-                data_file_content: DataContentType::Data,
-                data_file_format: DataFileFormat::Parquet,
-                schema: data_file_schema.clone(),
-                project_field_ids: vec![],
-                predicate: None,
-                deletes: vec![pos_del_2, pos_del_3],
-            },
-        ];
-
-        file_scan_tasks
-    }
-
-    pub(crate) fn create_pos_del_schema() -> ArrowSchemaRef {
-        let fields = vec![
-            arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false)
-                .with_metadata(HashMap::from([(
-                    PARQUET_FIELD_ID_META_KEY.to_string(),
-                    FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
-                )])),
-            arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false).with_metadata(
-                HashMap::from([(
-                    PARQUET_FIELD_ID_META_KEY.to_string(),
-                    FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
-                )]),
-            ),
-        ];
-        Arc::new(arrow_schema::Schema::new(fields))
-    }
 }
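The net effect in this file is that the duplicated test fixture (setup, create_pos_del_schema, and the positional-delete field-ID constants) is deleted and the test module now imports the shared fixture from delete_filter::tests instead. A minimal sketch of that sharing pattern is below, with illustrative module and function names standing in for the real ones (the actual fixture lives in crate::arrow::delete_filter::tests); it compiles as a lib.rs under cargo test.

// One module exposes its test helpers as pub(crate) under #[cfg(test)] ...
mod delete_filter {
    #[cfg(test)]
    pub(crate) mod tests {
        // Illustrative stand-in for the real setup() fixture.
        pub(crate) fn setup() -> Vec<String> {
            vec!["pos-del-1.parquet".to_string()]
        }
    }
}

// ... and sibling test modules import the shared fixture instead of keeping a copy.
mod delete_file_loader {
    #[cfg(test)]
    mod tests {
        use crate::delete_filter::tests::setup;

        #[test]
        fn uses_shared_fixture() {
            assert_eq!(setup().len(), 1);
        }
    }
}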

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 41 additions & 17 deletions
@@ -179,7 +179,7 @@ pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool {
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use std::fs::File;
     use std::path::Path;
     use std::sync::Arc;
@@ -202,43 +202,67 @@ mod tests {
     const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
 
     #[tokio::test]
-    async fn test_delete_file_manager_load_deletes() {
+    async fn test_delete_file_filter_load_deletes() {
         let tmp_dir = TempDir::new().unwrap();
         let table_location = tmp_dir.path();
         let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
             .unwrap()
             .build()
             .unwrap();
 
-        // Note that with the delete file parsing not yet in place, all we can test here is that
-        // the call to the loader fails with the expected FeatureUnsupportedError.
-        let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
 
         let file_scan_tasks = setup(table_location);
 
-        let result = delete_file_manager
+        let delete_filter = delete_file_loader
             .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
             .await
+            .unwrap()
+            .unwrap();
+
+        let result = delete_filter
+            .get_delete_vector(&file_scan_tasks[0])
+            .unwrap();
+        assert_eq!(result.lock().unwrap().len(), 12); // pos dels from pos del file 1 and 2
+
+        let delete_filter = delete_file_loader
+            .load_deletes(&file_scan_tasks[1].deletes, file_scan_tasks[1].schema_ref())
+            .await
+            .unwrap()
            .unwrap();
 
-        assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
+        let result = delete_filter
+            .get_delete_vector(&file_scan_tasks[1])
+            .unwrap();
+        assert_eq!(result.lock().unwrap().len(), 8); // no pos dels for file 3
     }
 
-    fn setup(table_location: &Path) -> Vec<FileScanTask> {
+    pub(crate) fn setup(table_location: &Path) -> Vec<FileScanTask> {
         let data_file_schema = Arc::new(Schema::builder().build().unwrap());
         let positional_delete_schema = create_pos_del_schema();
 
-        let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8];
-        let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023];
-
-        let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values));
-        let pos_col = Arc::new(Int64Array::from_iter_values(pos_values));
+        let file_path_values = [
+            vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8],
+            vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8],
+            vec![format!("{}/2.parquet", table_location.to_str().unwrap()); 8],
+        ];
+        let pos_values = [
+            vec![0i64, 1, 3, 5, 6, 8, 1022, 1023],
+            vec![0i64, 1, 3, 5, 20, 21, 22, 23],
+            vec![0i64, 1, 3, 5, 6, 8, 1022, 1023],
+        ];
 
         let props = WriterProperties::builder()
             .set_compression(Compression::SNAPPY)
            .build();
 
         for n in 1..=3 {
+            let file_path_vals = file_path_values.get(n - 1).unwrap();
+            let file_path_col = Arc::new(StringArray::from_iter_values(file_path_vals));
+
+            let pos_vals = pos_values.get(n - 1).unwrap();
+            let pos_col = Arc::new(Int64Array::from_iter_values(pos_vals.clone()));
+
             let positional_deletes_to_write =
                 RecordBatch::try_new(positional_delete_schema.clone(), vec![
                     file_path_col.clone(),
@@ -293,7 +317,7 @@ mod tests {
                 start: 0,
                 length: 0,
                 record_count: None,
-                data_file_path: "".to_string(),
+                data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()),
                 data_file_content: DataContentType::Data,
                 data_file_format: DataFileFormat::Parquet,
                 schema: data_file_schema.clone(),
@@ -305,20 +329,20 @@ mod tests {
                 start: 0,
                 length: 0,
                 record_count: None,
-                data_file_path: "".to_string(),
+                data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()),
                 data_file_content: DataContentType::Data,
                 data_file_format: DataFileFormat::Parquet,
                 schema: data_file_schema.clone(),
                 project_field_ids: vec![],
                 predicate: None,
-                deletes: vec![pos_del_2, pos_del_3],
+                deletes: vec![pos_del_3],
             },
         ];
 
         file_scan_tasks
     }
 
-    fn create_pos_del_schema() -> ArrowSchemaRef {
+    pub(crate) fn create_pos_del_schema() -> ArrowSchemaRef {
         let fields = vec![
             arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false)
                 .with_metadata(HashMap::from([(
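For reference, the shared fixture writes each of the three positional-delete files the same way: build a two-column record batch whose fields carry the reserved positional-delete field IDs in their Parquet metadata (2147483546 for file_path, 2147483545 for pos, per the test constants above) and write it with a Snappy-compressed ArrowWriter. A condensed, standalone sketch of that step follows; the output path and row values are illustrative, not taken from the commit.

use std::collections::HashMap;
use std::fs::File;
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

fn main() {
    // Positional delete schema: file_path (Utf8) and pos (Int64), each tagged
    // with the Iceberg-reserved field ID via Parquet field metadata.
    let schema = Arc::new(Schema::new(vec![
        Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            "2147483546".to_string(),
        )])),
        Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            "2147483545".to_string(),
        )])),
    ]));

    // One delete row per (data file path, row position) pair.
    let batch = RecordBatch::try_new(schema.clone(), vec![
        Arc::new(StringArray::from(vec!["/tmp/1.parquet"; 4])),
        Arc::new(Int64Array::from(vec![0_i64, 1, 3, 5])),
    ])
    .unwrap();

    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();

    let file = File::create("/tmp/pos-del-1.parquet").unwrap();
    let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap();
    writer.write(&batch).expect("Writing batch");
    // The writer must be closed to write the Parquet footer.
    writer.close().unwrap();
}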
