From a258d4a0837df1c0940270fc71a643b0870bcc63 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Wed, 25 Sep 2024 07:57:28 +0100 Subject: [PATCH 01/11] feat: add delete_file_index and populate in table scan --- crates/iceberg/src/arrow/reader.rs | 20 +- crates/iceberg/src/delete_file_index.rs | 110 +++++++ crates/iceberg/src/expr/predicate.rs | 6 + crates/iceberg/src/lib.rs | 2 + crates/iceberg/src/scan.rs | 276 ++++++++++++++---- .../testdata/example_table_metadata_v2.json | 16 +- 6 files changed, 372 insertions(+), 58 deletions(-) create mode 100644 crates/iceberg/src/delete_file_index.rs diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index b4e15821f..7e6df8782 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -47,8 +47,12 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator; use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator; use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; -use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; -use crate::spec::{Datum, PrimitiveType, Schema}; +use crate::scan::{ + ArrowRecordBatchStream, FileScanTask, FileScanTaskStream, +}; +use crate::spec::{ + Datum, PrimitiveType, Schema, +}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; @@ -712,10 +716,14 @@ impl PredicateConverter<'_> { let index = self .column_indices .iter() - .position(|&idx| idx == *column_idx).ok_or(Error::new(ErrorKind::DataInvalid, format!( - "Leave column `{}` in predicates cannot be found in the required column indices.", - reference.field().name - )))?; + .position(|&idx| idx == *column_idx) + .ok_or(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column `{}` in predicates cannot be found in the required column indices.", + reference.field().name + ), + ))?; Ok(Some(index)) } else { diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs new file mode 100644 index 000000000..5265c08f0 --- /dev/null +++ b/crates/iceberg/src/delete_file_index.rs @@ -0,0 +1,110 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use futures::channel::mpsc; +use futures::{StreamExt, TryStreamExt}; +use tokio::sync::watch; + +use crate::scan::FileScanTaskDeleteFile; +use crate::spec::{DataContentType, DataFile}; +use crate::Result; + +type DeleteFileIndexRef = Arc>; +pub(crate) type DeleteFileIndexRefReceiver = watch::Receiver>; + +/// Index of delete files +#[derive(Debug)] +pub(crate) struct DeleteFileIndex { + #[allow(dead_code)] + global_deletes: Vec>, + #[allow(dead_code)] + equality_deletes_by_partition: HashMap>>, + #[allow(dead_code)] + positional_deletes_by_partition: HashMap>>, + positional_deletes_by_path: HashMap>>, +} + +impl DeleteFileIndex { + pub(crate) fn from_receiver( + receiver: mpsc::Receiver>, + ) -> watch::Receiver> { + let (tx, rx) = watch::channel(None); + + let delete_file_stream = receiver.boxed(); + tokio::spawn(async move { + let delete_files = delete_file_stream.try_collect::>().await; + let delete_file_index = delete_files.map(DeleteFileIndex::from_delete_files); + let delete_file_index = Arc::new(delete_file_index); + tx.send(Some(delete_file_index)) + }); + + rx + } + + fn from_delete_files(files: Vec) -> Self { + let mut equality_deletes_by_partition: HashMap>> = + HashMap::default(); + let mut positional_deletes_by_partition: HashMap>> = + HashMap::default(); + let mut 
positional_deletes_by_path: HashMap>> = + HashMap::default(); + + files.into_iter().for_each(|file| { + let arc_file = Arc::new(file); + match arc_file.file_type { + DataContentType::PositionDeletes => { + // TODO: implement logic from ContentFileUtil.referencedDataFile + // see https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L54 + let referenced_data_file_path = "TODO".to_string(); + + positional_deletes_by_path + .entry(referenced_data_file_path) + .and_modify(|entry| { + entry.push(arc_file.clone()); + }) + .or_insert(vec![arc_file.clone()]); + + positional_deletes_by_partition + .entry(arc_file.partition_spec_id) + .and_modify(|entry| { + entry.push(arc_file.clone()); + }) + .or_insert(vec![arc_file.clone()]); + } + DataContentType::EqualityDeletes => { + equality_deletes_by_partition + .entry(arc_file.partition_spec_id) + .and_modify(|entry| { + entry.push(arc_file.clone()); + }) + .or_insert(vec![arc_file.clone()]); + } + _ => unreachable!(), + } + }); + + DeleteFileIndex { + global_deletes: vec![], + equality_deletes_by_partition, + positional_deletes_by_partition, + positional_deletes_by_path, + } + } + + /// Determine all the delete files that apply to the provided `DataFile`. + pub(crate) fn get_deletes_for_data_file( + &self, + data_file: &DataFile, + ) -> Vec { + let mut results = vec![]; + + if let Some(positional_deletes) = self.positional_deletes_by_path.get(data_file.file_path()) + { + results.extend(positional_deletes.iter().map(|i| i.as_ref().clone())) + } + + // TODO: equality deletes + + results + } +} diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index e0f6a7845..f38a0ccad 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -724,6 +724,12 @@ pub enum BoundPredicate { Set(SetExpression), } +impl BoundPredicate { + pub(crate) fn and(self, other: BoundPredicate) -> BoundPredicate { + BoundPredicate::And(LogicalExpression::new([Box::new(self), Box::new(other)])) + } +} + impl Display for BoundPredicate { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index fe5a52999..466592bb8 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -83,6 +83,8 @@ pub mod transform; mod runtime; pub mod arrow; +// pub(crate) mod delete_file_index; +pub(crate) mod delete_file_index; mod utils; pub mod writer; diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 5a97e74e7..acf10f5e6 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -27,6 +27,7 @@ use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use crate::arrow::ArrowReaderBuilder; +use crate::delete_file_index::{DeleteFileIndex, DeleteFileIndexRefReceiver}; use crate::expr::visitors::expression_evaluator::ExpressionEvaluator; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::visitors::inclusive_projection::InclusiveProjection; @@ -62,6 +63,11 @@ pub struct TableScanBuilder<'a> { concurrency_limit_manifest_files: usize, row_group_filtering_enabled: bool, row_selection_enabled: bool, + + // TODO: defaults to false for now whilst delete file processing + // is still being worked on but will switch to a default of true + // once this work is complete + delete_file_processing_enabled: bool, } impl<'a> 
TableScanBuilder<'a> { @@ -80,6 +86,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: num_cpus, row_group_filtering_enabled: true, row_selection_enabled: false, + delete_file_processing_enabled: false, } } @@ -186,6 +193,17 @@ impl<'a> TableScanBuilder<'a> { self } + /// Determines whether to enable delete file processing (currently disabled by default) + /// + /// When disabled, delete files are ignored. + pub fn with_delete_file_processing_enabled( + mut self, + delete_file_processing_enabled: bool, + ) -> Self { + self.delete_file_processing_enabled = delete_file_processing_enabled; + self + } + /// Build the table scan. pub fn build(self) -> Result { let snapshot = match self.snapshot_id { @@ -304,6 +322,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + delete_file_processing_enabled: self.delete_file_processing_enabled, }) } } @@ -329,6 +348,7 @@ pub struct TableScan { row_group_filtering_enabled: bool, row_selection_enabled: bool, + delete_file_processing_enabled: bool, } /// PlanContext wraps a [`SnapshotRef`] alongside all the other @@ -357,18 +377,39 @@ impl TableScan { let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries; // used to stream ManifestEntryContexts between stages of the file plan operation - let (manifest_entry_ctx_tx, manifest_entry_ctx_rx) = + let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) = + channel(concurrency_limit_manifest_files); + let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) = channel(concurrency_limit_manifest_files); + // used to stream the results back to the caller let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); + let delete_file_idx_and_tx: Option<( + DeleteFileIndexRefReceiver, + Sender>, + )> = if self.delete_file_processing_enabled { + // used to stream delete files into the DeleteFileIndex + let (delete_file_tx, delete_file_rx) = channel(concurrency_limit_manifest_entries); + + let delete_file_index_rx = DeleteFileIndex::from_receiver(delete_file_rx); + Some((delete_file_index_rx, delete_file_tx)) + } else { + None + }; + let manifest_list = self.plan_context.get_manifest_list().await?; - // get the [`ManifestFile`]s from the [`ManifestList`], filtering out - // partitions cannot match the scan's filter - let manifest_file_contexts = self - .plan_context - .build_manifest_file_contexts(manifest_list, manifest_entry_ctx_tx)?; + // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any + // whose partitions cannot match this + // scan's filter + let manifest_file_contexts = self.plan_context.build_manifest_file_contexts( + manifest_list, + manifest_entry_data_ctx_tx, + delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| { + (delete_file_idx.clone(), manifest_entry_delete_ctx_tx) + }), + )?; let mut channel_for_manifest_error = file_scan_task_tx.clone(); @@ -385,17 +426,45 @@ impl TableScan { } }); - let mut channel_for_manifest_entry_error = file_scan_task_tx.clone(); + let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); + + if let Some((_, delete_file_tx)) = delete_file_idx_and_tx { + let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); + + // Process the delete file [`ManifestEntry`] stream in parallel + spawn(async move { + let result = manifest_entry_delete_ctx_rx + .map(|me_ctx| 
Ok((me_ctx, delete_file_tx.clone()))) + .try_for_each_concurrent( + concurrency_limit_manifest_entries, + |(manifest_entry_context, tx)| async move { + spawn(async move { + Self::process_delete_manifest_entry(manifest_entry_context, tx) + .await + }) + .await + }, + ) + .await; + + if let Err(error) = result { + let _ = channel_for_delete_manifest_entry_error + .send(Err(error)) + .await; + } + }) + .await; + } - // Process the [`ManifestEntry`] stream in parallel + // Process the data file [`ManifestEntry`] stream in parallel spawn(async move { - let result = manifest_entry_ctx_rx + let result = manifest_entry_data_ctx_rx .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) .try_for_each_concurrent( concurrency_limit_manifest_entries, |(manifest_entry_context, tx)| async move { spawn(async move { - Self::process_manifest_entry(manifest_entry_context, tx).await + Self::process_data_manifest_entry(manifest_entry_context, tx).await }) .await }, @@ -403,7 +472,7 @@ impl TableScan { .await; if let Err(error) = result { - let _ = channel_for_manifest_entry_error.send(Err(error)).await; + let _ = channel_for_data_manifest_entry_error.send(Err(error)).await; } }); @@ -437,7 +506,7 @@ impl TableScan { &self.plan_context.snapshot } - async fn process_manifest_entry( + async fn process_data_manifest_entry( manifest_entry_context: ManifestEntryContext, mut file_scan_task_tx: Sender>, ) -> Result<()> { @@ -446,12 +515,11 @@ impl TableScan { return Ok(()); } - // abort the plan if we encounter a manifest entry whose data file's - // content type is currently unsupported + // abort the plan if we encounter a manifest entry for a delete file if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data { return Err(Error::new( ErrorKind::FeatureUnsupported, - "Only Data files currently supported", + "Encountered an entry for a delete file in a data file manifest", )); } @@ -489,7 +557,54 @@ impl TableScan { // entire plan without getting filtered out. Create a corresponding // FileScanTask and push it to the result stream file_scan_task_tx - .send(Ok(manifest_entry_context.into_file_scan_task())) + .send(Ok(manifest_entry_context.into_file_scan_task().await?)) + .await?; + + Ok(()) + } + + async fn process_delete_manifest_entry( + manifest_entry_context: ManifestEntryContext, + mut file_scan_task_delete_file_tx: Sender>, + ) -> Result<()> { + // skip processing this manifest entry if it has been marked as deleted + if !manifest_entry_context.manifest_entry.is_alive() { + return Ok(()); + } + + // abort the plan if we encounter a manifest entry that is not for a delete file + if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Encountered an entry for a data file in a delete manifest", + )); + } + + if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates { + let expression_evaluator_cache = + manifest_entry_context.expression_evaluator_cache.as_ref(); + + let expression_evaluator = expression_evaluator_cache.get( + manifest_entry_context.partition_spec_id, + &bound_predicates.partition_bound_predicate, + )?; + + // skip any data file whose partition data indicates that it can't contain + // any data that matches this scan's filter + if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? 
{ + return Ok(()); + } + } + + file_scan_task_delete_file_tx + .send(Ok(FileScanTaskDeleteFile { + file_path: manifest_entry_context + .manifest_entry + .file_path() + .to_string(), + file_type: manifest_entry_context.manifest_entry.content_type(), + partition_spec_id: manifest_entry_context.partition_spec_id, + })) .await?; Ok(()) @@ -513,6 +628,7 @@ struct ManifestFileContext { object_cache: Arc, snapshot_schema: SchemaRef, expression_evaluator_cache: Arc, + delete_file_index: Option, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -525,6 +641,7 @@ struct ManifestEntryContext { bound_predicates: Option>, partition_spec_id: i32, snapshot_schema: SchemaRef, + delete_file_index: Option, } impl ManifestFileContext { @@ -539,6 +656,7 @@ impl ManifestFileContext { field_ids, mut sender, expression_evaluator_cache, + delete_file_index, .. } = self; @@ -546,13 +664,14 @@ impl ManifestFileContext { for manifest_entry in manifest.entries() { let manifest_entry_context = ManifestEntryContext { - // TODO: refactor to avoid clone + // TODO: refactor to avoid the expensive ManifestEntry clone manifest_entry: manifest_entry.clone(), expression_evaluator_cache: expression_evaluator_cache.clone(), field_ids: field_ids.clone(), partition_spec_id: manifest_file.partition_spec_id, bound_predicates: bound_predicates.clone(), snapshot_schema: snapshot_schema.clone(), + delete_file_index: delete_file_index.clone(), }; sender @@ -568,8 +687,34 @@ impl ManifestFileContext { impl ManifestEntryContext { /// consume this `ManifestEntryContext`, returning a `FileScanTask` /// created from it - fn into_file_scan_task(self) -> FileScanTask { - FileScanTask { + async fn into_file_scan_task(self) -> Result { + // let deletes = self.get_deletes().await?; + + let deletes = if let Some(mut delete_file_index_rx) = self.delete_file_index { + let del_file_idx_opt = delete_file_index_rx + .wait_for(Option::is_some) + .await + .map_err(|_| Error::new(ErrorKind::Unexpected, "DeleteFileIndex recv error"))?; + + match del_file_idx_opt.as_ref() { + Some(del_file_idx) => match del_file_idx.as_ref() { + Ok(delete_file_idx) => { + delete_file_idx.get_deletes_for_data_file(self.manifest_entry.data_file()) + } + Err(err) => { + return Err(Error::new(ErrorKind::Unexpected, err.message())); + } + }, + + // the `wait_for(Option::is_some)` above means that we can + // never get a `None` here + None => unreachable!(), + } + } else { + vec![] + }; + + Ok(FileScanTask { start: 0, length: self.manifest_entry.file_size_in_bytes(), record_count: Some(self.manifest_entry.record_count()), @@ -583,7 +728,9 @@ impl ManifestEntryContext { predicate: self .bound_predicates .map(|x| x.as_ref().snapshot_bound_predicate.clone()), - } + + deletes, + }) } } @@ -619,29 +766,33 @@ impl PlanContext { fn build_manifest_file_contexts( &self, manifest_list: Arc, - sender: Sender, + tx_data: Sender, + delete_file_idx_and_tx: Option<(DeleteFileIndexRefReceiver, Sender)>, ) -> Result>>> { - let entries = manifest_list.entries(); - - if entries - .iter() - .any(|e| e.content != ManifestContentType::Data) - { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - "Merge-on-read is not yet supported", - )); - } + let manifest_files = manifest_list.entries().iter(); // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. 
let mut filtered_mfcs = vec![]; - if self.predicate.is_some() { - for manifest_file in entries { + + for manifest_file in manifest_files { + let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes { + let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else { + continue; + }; + (Some(delete_file_idx.clone()), tx.clone()) + } else { + ( + delete_file_idx_and_tx.as_ref().map(|x| x.0.clone()), + tx_data.clone(), + ) + }; + + let partition_bound_predicate = if self.predicate.is_some() { let partition_bound_predicate = self.get_partition_filter(manifest_file)?; // evaluate the ManifestFile against the partition filter. Skip // if it cannot contain any matching rows - if self + if !self .manifest_evaluator_cache .get( manifest_file.partition_spec_id, @@ -649,19 +800,22 @@ impl PlanContext { ) .eval(manifest_file)? { - let mfc = self.create_manifest_file_context( - manifest_file, - Some(partition_bound_predicate), - sender.clone(), - ); - filtered_mfcs.push(Ok(mfc)); + continue; } - } - } else { - for manifest_file in entries { - let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone()); - filtered_mfcs.push(Ok(mfc)); - } + + Some(partition_bound_predicate) + } else { + None + }; + + let mfc = self.create_manifest_file_context( + manifest_file, + partition_bound_predicate, + tx, + delete_file_idx, + ); + + filtered_mfcs.push(Ok(mfc)); } Ok(Box::new(filtered_mfcs.into_iter())) @@ -672,6 +826,7 @@ impl PlanContext { manifest_file: &ManifestFile, partition_filter: Option>, sender: Sender, + delete_file_index: Option, ) -> ManifestFileContext { let bound_predicates = if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) = @@ -693,6 +848,7 @@ impl PlanContext { snapshot_schema: self.snapshot_schema.clone(), field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), + delete_file_index, } } } @@ -919,8 +1075,10 @@ pub struct FileScanTask { /// The data file path corresponding to the task. pub data_file_path: String, + /// The content type of the file to scan. pub data_file_content: DataContentType, + /// The format of the file to scan. pub data_file_format: DataFileFormat, @@ -931,6 +1089,22 @@ pub struct FileScanTask { /// The predicate to filter. #[serde(skip_serializing_if = "Option::is_none")] pub predicate: Option, + + /// The list of delete files that may need to be applied to this data file + pub deletes: Vec, +} + +/// A task to scan part of file. 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct FileScanTaskDeleteFile { + /// The delete file path + pub file_path: String, + + /// delete file type + pub file_type: DataContentType, + + /// partition id + pub partition_spec_id: i32, } impl FileScanTask { @@ -1416,16 +1590,16 @@ pub mod tests { .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) .await .unwrap(); - let batche1: Vec<_> = batch_stream.try_collect().await.unwrap(); + let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap(); let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build(); let batch_stream = reader .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) .await .unwrap(); - let batche2: Vec<_> = batch_stream.try_collect().await.unwrap(); + let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap(); - assert_eq!(batche1, batche2); + assert_eq!(batch_1, batch_2); } #[tokio::test] @@ -1867,6 +2041,7 @@ pub mod tests { schema: schema.clone(), record_count: Some(100), data_file_format: DataFileFormat::Parquet, + deletes: vec![], }; test_fn(task); @@ -1881,6 +2056,7 @@ pub mod tests { schema, record_count: None, data_file_format: DataFileFormat::Avro, + deletes: vec![], }; test_fn(task); } diff --git a/crates/iceberg/testdata/example_table_metadata_v2.json b/crates/iceberg/testdata/example_table_metadata_v2.json index 35230966a..17bbd7d99 100644 --- a/crates/iceberg/testdata/example_table_metadata_v2.json +++ b/crates/iceberg/testdata/example_table_metadata_v2.json @@ -7,7 +7,12 @@ "last-column-id": 3, "current-schema-id": 1, "schemas": [ - {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": true, "type": "long"}]}, + { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"} + ]}, { "type": "struct", "schema-id": 1, @@ -25,7 +30,14 @@ } ], "default-spec-id": 0, - "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}], + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + {"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000} + ] + } + ], "last-partition-id": 1000, "default-sort-order-id": 3, "sort-orders": [ From 93112187fe810ba93dc6ed03be0e4f600494e4ca Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Sat, 21 Dec 2024 20:46:06 +0000 Subject: [PATCH 02/11] refactor: introduce DeleteFileContext. 
Used to encapsulate enough information for DeleteFileIndex to be able
to perform the required filtering of delete files
---
 crates/iceberg/src/delete_file_index.rs | 129 +++++++++++++++++-------
 crates/iceberg/src/scan.rs              |  33 +++---
 2 files changed, 110 insertions(+), 52 deletions(-)

diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs
index 5265c08f0..6bda14a8a 100644
--- a/crates/iceberg/src/delete_file_index.rs
+++ b/crates/iceberg/src/delete_file_index.rs
@@ -2,11 +2,11 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use futures::channel::mpsc;
-use futures::{StreamExt, TryStreamExt};
+use futures::{StreamExt, TryFutureExt, TryStreamExt};
 use tokio::sync::watch;
 
-use crate::scan::FileScanTaskDeleteFile;
-use crate::spec::{DataContentType, DataFile};
+use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
+use crate::spec::{DataContentType, DataFile, Struct};
 use crate::Result;
 
 type DeleteFileIndexRef = Arc<Result<DeleteFileIndex>>;
 pub(crate) type DeleteFileIndexRefReceiver = watch::Receiver<Option<DeleteFileIndexRef>>;
 
 /// Index of delete files
@@ -16,17 +16,15 @@ pub(crate) type DeleteFileIndexRefReceiver = watch::Receiver<Option<DeleteFileIndexRef>>;
 #[derive(Debug)]
 pub(crate) struct DeleteFileIndex {
     #[allow(dead_code)]
-    global_deletes: Vec<Arc<DataFile>>,
-    #[allow(dead_code)]
-    equality_deletes_by_partition: HashMap<i32, Vec<Arc<DataFile>>>,
-    #[allow(dead_code)]
-    positional_deletes_by_partition: HashMap<i32, Vec<Arc<DataFile>>>,
-    positional_deletes_by_path: HashMap<String, Vec<Arc<DataFile>>>,
+    global_deletes: Vec<Arc<DeleteFileContext>>,
+    eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
+    pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
+    pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
 }
 
 impl DeleteFileIndex {
-    pub(crate) fn from_receiver(
-        receiver: mpsc::Receiver<Result<DataFile>>,
+    pub(crate) fn from_del_file_chan(
+        receiver: mpsc::Receiver<Result<DeleteFileContext>>,
     ) -> watch::Receiver<Option<DeleteFileIndexRef>> {
         let (tx, rx) = watch::channel(None);
 
         let delete_file_stream = receiver.boxed();
@@ -41,43 +39,55 @@ impl DeleteFileIndex {
         rx
     }
 
-    fn from_delete_files(files: Vec<DataFile>) -> Self {
-        let mut equality_deletes_by_partition: HashMap<i32, Vec<Arc<DataFile>>> =
+    fn from_delete_files(files: Vec<DeleteFileContext>) -> Self {
+        let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
             HashMap::default();
-        let mut positional_deletes_by_partition: HashMap<i32, Vec<Arc<DataFile>>> =
+        let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
             HashMap::default();
-        let mut positional_deletes_by_path: HashMap<String, Vec<Arc<DataFile>>> =
+        let mut pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>> =
             HashMap::default();
 
-        files.into_iter().for_each(|file| {
-            let arc_file = Arc::new(file);
-            match arc_file.file_type {
+        files.into_iter().for_each(|del_file_ctx| {
+            let arc_del_file_ctx = Arc::new(del_file_ctx);
+            match arc_del_file_ctx.manifest_entry.content_type() {
                 DataContentType::PositionDeletes => {
                     // TODO: implement logic from ContentFileUtil.referencedDataFile
                     // see https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L54
                     let referenced_data_file_path = "TODO".to_string();
 
-                    positional_deletes_by_path
+                    pos_deletes_by_path
                         .entry(referenced_data_file_path)
                         .and_modify(|entry| {
-                            entry.push(arc_file.clone());
+                            entry.push(arc_del_file_ctx.clone());
                         })
-                        .or_insert(vec![arc_file.clone()]);
-
-                    positional_deletes_by_partition
-                        .entry(arc_file.partition_spec_id)
+                        .or_insert(vec![arc_del_file_ctx.clone()]);
+
+                    pos_deletes_by_partition
+                        .entry(
+                            arc_del_file_ctx
+                                .manifest_entry
+                                .data_file()
+                                .partition()
+                                .clone(),
+                        )
                         .and_modify(|entry| {
-                            entry.push(arc_file.clone());
+                            entry.push(arc_del_file_ctx.clone());
                         })
-                        .or_insert(vec![arc_file.clone()]);
+                        .or_insert(vec![arc_del_file_ctx.clone()]);
                 }
                 DataContentType::EqualityDeletes => {
-                    equality_deletes_by_partition
-                        .entry(arc_file.partition_spec_id)
+                    eq_deletes_by_partition
+                        .entry(
+                            arc_del_file_ctx
+                                .manifest_entry
+                                .data_file()
+                                .partition()
+                                .clone(),
+                        )
                         .and_modify(|entry| {
-                            entry.push(arc_file.clone());
+                            entry.push(arc_del_file_ctx.clone());
                         })
-                        .or_insert(vec![arc_file.clone()]);
+                        .or_insert(vec![arc_del_file_ctx.clone()]);
                 }
                 _ => unreachable!(),
             }
         });
 
         DeleteFileIndex {
             global_deletes: vec![],
-            equality_deletes_by_partition,
-            positional_deletes_by_partition,
-            positional_deletes_by_path,
+            eq_deletes_by_partition,
+            pos_deletes_by_partition,
+            pos_deletes_by_path,
         }
     }
 
     /// Determine all the delete files that apply to the provided `DataFile`.
     pub(crate) fn get_deletes_for_data_file(
         &self,
         data_file: &DataFile,
+        seq_num: Option<i64>,
     ) -> Vec<FileScanTaskDeleteFile> {
         let mut results = vec![];
 
-        if let Some(positional_deletes) = self.positional_deletes_by_path.get(data_file.file_path())
-        {
-            results.extend(positional_deletes.iter().map(|i| i.as_ref().clone()))
+        if let Some(deletes) = self.pos_deletes_by_path.get(data_file.file_path()) {
+            deletes
+                .iter()
+                .filter(|&delete| {
+                    seq_num
+                        .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
+                        .unwrap_or_else(|| true)
+                })
+                .for_each(|delete| {
+                    results.push(FileScanTaskDeleteFile {
+                        file_path: delete.manifest_entry.file_path().to_string(),
+                        file_type: delete.manifest_entry.content_type(),
+                        partition_spec_id: delete.partition_spec_id,
+                    })
+                });
         }
 
-        // TODO: equality deletes
+        if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) {
+            deletes
+                .iter()
+                .filter(|&delete| {
+                    seq_num
+                        .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
+                        .unwrap_or_else(|| true)
+                })
+                .for_each(|delete| {
+                    results.push(FileScanTaskDeleteFile {
+                        file_path: delete.manifest_entry.file_path().to_string(),
+                        file_type: delete.manifest_entry.content_type(),
+                        partition_spec_id: delete.partition_spec_id,
+                    })
+                });
+        }
+
+        if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) {
+            deletes
+                .iter()
+                .filter(|&delete| {
+                    seq_num
+                        .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
+                        .unwrap_or_else(|| true)
+                })
+                .for_each(|delete| {
+                    results.push(FileScanTaskDeleteFile {
+                        file_path: delete.manifest_entry.file_path().to_string(),
+                        file_type: delete.manifest_entry.content_type(),
+                        partition_spec_id: delete.partition_spec_id,
+                    })
+                });
+        }
 
         results
     }
 }
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index acf10f5e6..b3bbb5517 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -385,14 +385,14 @@ impl TableScan {
         // used to stream the results back to the caller
         let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
 
+        // DeleteFileIndexRefReceiver is a watch channel receiver that will
+        // be notified when the DeleteFileIndex is ready.
let delete_file_idx_and_tx: Option<( DeleteFileIndexRefReceiver, - Sender>, + Sender>, )> = if self.delete_file_processing_enabled { - // used to stream delete files into the DeleteFileIndex let (delete_file_tx, delete_file_rx) = channel(concurrency_limit_manifest_entries); - - let delete_file_index_rx = DeleteFileIndex::from_receiver(delete_file_rx); + let delete_file_index_rx = DeleteFileIndex::from_del_file_chan(delete_file_rx); Some((delete_file_index_rx, delete_file_tx)) } else { None @@ -565,7 +565,7 @@ impl TableScan { async fn process_delete_manifest_entry( manifest_entry_context: ManifestEntryContext, - mut file_scan_task_delete_file_tx: Sender>, + mut delete_file_ctx_tx: Sender>, ) -> Result<()> { // skip processing this manifest entry if it has been marked as deleted if !manifest_entry_context.manifest_entry.is_alive() { @@ -596,13 +596,9 @@ impl TableScan { } } - file_scan_task_delete_file_tx - .send(Ok(FileScanTaskDeleteFile { - file_path: manifest_entry_context - .manifest_entry - .file_path() - .to_string(), - file_type: manifest_entry_context.manifest_entry.content_type(), + delete_file_ctx_tx + .send(Ok(DeleteFileContext { + manifest_entry: manifest_entry_context.manifest_entry.clone(), partition_spec_id: manifest_entry_context.partition_spec_id, })) .await?; @@ -698,9 +694,10 @@ impl ManifestEntryContext { match del_file_idx_opt.as_ref() { Some(del_file_idx) => match del_file_idx.as_ref() { - Ok(delete_file_idx) => { - delete_file_idx.get_deletes_for_data_file(self.manifest_entry.data_file()) - } + Ok(delete_file_idx) => delete_file_idx.get_deletes_for_data_file( + self.manifest_entry.data_file(), + self.manifest_entry.sequence_number(), + ), Err(err) => { return Err(Error::new(ErrorKind::Unexpected, err.message())); } @@ -1107,6 +1104,12 @@ pub struct FileScanTaskDeleteFile { pub partition_spec_id: i32, } +#[derive(Debug)] +pub(crate) struct DeleteFileContext { + pub(crate) manifest_entry: ManifestEntryRef, + pub(crate) partition_spec_id: i32, +} + impl FileScanTask { /// Returns the data file path of this file scan task. 
pub fn data_file_path(&self) -> &str { From e2a21c1013b4a8b03837074b435b2318f9e9fb7c Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Sat, 21 Dec 2024 20:51:55 +0000 Subject: [PATCH 03/11] chore: remove unused BoundPredicate::and method and unused import --- crates/iceberg/src/delete_file_index.rs | 2 +- crates/iceberg/src/expr/predicate.rs | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 6bda14a8a..df3ff7f0a 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use futures::channel::mpsc; -use futures::{StreamExt, TryFutureExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use tokio::sync::watch; use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index f38a0ccad..e0f6a7845 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -724,12 +724,6 @@ pub enum BoundPredicate { Set(SetExpression), } -impl BoundPredicate { - pub(crate) fn and(self, other: BoundPredicate) -> BoundPredicate { - BoundPredicate::And(LogicalExpression::new([Box::new(self), Box::new(other)])) - } -} - impl Display for BoundPredicate { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { From df9c9dff1dd4b566f7ae150ade22229cea6e4664 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Sat, 21 Dec 2024 21:04:08 +0000 Subject: [PATCH 04/11] refactor: get_deletes_for_data_file. Change tokio::spawn to crate::runtime::spawn. Add missing license --- crates/iceberg/src/delete_file_index.rs | 83 ++++++++++++------------- 1 file changed, 39 insertions(+), 44 deletions(-) diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index df3ff7f0a..80aceb45f 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ use std::collections::HashMap; use std::sync::Arc; @@ -5,6 +22,7 @@ use futures::channel::mpsc; use futures::{StreamExt, TryStreamExt}; use tokio::sync::watch; +use crate::runtime::spawn; use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; use crate::spec::{DataContentType, DataFile, Struct}; use crate::Result; @@ -29,7 +47,7 @@ impl DeleteFileIndex { let (tx, rx) = watch::channel(None); let delete_file_stream = receiver.boxed(); - tokio::spawn(async move { + spawn(async move { let delete_files = delete_file_stream.try_collect::>().await; let delete_file_index = delete_files.map(DeleteFileIndex::from_delete_files); let delete_file_index = Arc::new(delete_file_index); @@ -107,59 +125,36 @@ impl DeleteFileIndex { data_file: &DataFile, seq_num: Option, ) -> Vec { - let mut results = vec![]; + let mut deletes_queue = vec![]; if let Some(deletes) = self.pos_deletes_by_path.get(data_file.file_path()) { - deletes - .iter() - .filter(|&delete| { - seq_num - .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num)) - .unwrap_or_else(|| true) - }) - .for_each(|delete| { - results.push(FileScanTaskDeleteFile { - file_path: delete.manifest_entry.file_path().to_string(), - file_type: delete.manifest_entry.content_type(), - partition_spec_id: delete.partition_spec_id, - }) - }); + deletes_queue.extend(deletes.iter()); } if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) { - deletes - .iter() - .filter(|&delete| { - seq_num - .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num)) - .unwrap_or_else(|| true) - }) - .for_each(|delete| { - results.push(FileScanTaskDeleteFile { - file_path: delete.manifest_entry.file_path().to_string(), - file_type: delete.manifest_entry.content_type(), - partition_spec_id: delete.partition_spec_id, - }) - }); + deletes_queue.extend(deletes.iter()); } if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) { - deletes - .iter() - .filter(|&delete| { - seq_num - .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num)) - .unwrap_or_else(|| true) - }) - .for_each(|delete| { - results.push(FileScanTaskDeleteFile { - file_path: delete.manifest_entry.file_path().to_string(), - file_type: delete.manifest_entry.content_type(), - partition_spec_id: delete.partition_spec_id, - }) - }); + deletes_queue.extend(deletes.iter()); } + let mut results = vec![]; + deletes_queue + .iter() + .filter(|&delete| { + seq_num + .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num)) + .unwrap_or_else(|| true) + }) + .for_each(|delete| { + results.push(FileScanTaskDeleteFile { + file_path: delete.manifest_entry.file_path().to_string(), + file_type: delete.manifest_entry.content_type(), + partition_spec_id: delete.partition_spec_id, + }) + }); + results } } From c4a9339a690670bfd5b6f9713691b925cb575f1f Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Mon, 23 Dec 2024 11:38:58 +0000 Subject: [PATCH 05/11] refactor: DeleteFileIndex cloneable, with an awaitable method to get deletes for data file. --- crates/iceberg/src/delete_file_index.rs | 105 ++++++++++++++++++------ crates/iceberg/src/scan.rs | 62 +++++--------- 2 files changed, 101 insertions(+), 66 deletions(-) diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 80aceb45f..3f1a092fc 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -16,23 +16,27 @@ // under the License. 
 use std::collections::HashMap;
-use std::sync::Arc;
+use std::future::Future;
+use std::ops::Deref;
+use std::pin::Pin;
+use std::sync::{Arc, RwLock};
+use std::task::{Context, Poll};
 
-use futures::channel::mpsc;
-use futures::{StreamExt, TryStreamExt};
-use tokio::sync::watch;
+use futures::channel::mpsc::{channel, Sender};
+use futures::StreamExt;
 
 use crate::runtime::spawn;
 use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
 use crate::spec::{DataContentType, DataFile, Struct};
-use crate::Result;
 
-type DeleteFileIndexRef = Arc<Result<DeleteFileIndex>>;
-pub(crate) type DeleteFileIndexRefReceiver = watch::Receiver<Option<DeleteFileIndexRef>>;
+#[derive(Debug)]
+enum DeleteFileIndexState {
+    Populating,
+    Populated(PopulatedDeleteFileIndex),
+}
 
-/// Index of delete files
 #[derive(Debug)]
-pub(crate) struct DeleteFileIndex {
+struct PopulatedDeleteFileIndex {
     #[allow(dead_code)]
     global_deletes: Vec<Arc<DeleteFileContext>>,
     eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
     pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
     pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
 }
 
+/// Index of delete files
+#[derive(Clone, Debug)]
+pub(crate) struct DeleteFileIndex {
+    state: Arc<RwLock<DeleteFileIndexState>>,
+}
+
 impl DeleteFileIndex {
-    pub(crate) fn from_del_file_chan(
-        receiver: mpsc::Receiver<Result<DeleteFileContext>>,
-    ) -> watch::Receiver<Option<DeleteFileIndexRef>> {
-        let (tx, rx) = watch::channel(None);
-
-        let delete_file_stream = receiver.boxed();
-        spawn(async move {
-            let delete_files = delete_file_stream.try_collect::<Vec<_>>().await;
-            let delete_file_index = delete_files.map(DeleteFileIndex::from_delete_files);
-            let delete_file_index = Arc::new(delete_file_index);
-            tx.send(Some(delete_file_index))
+    /// create a new `DeleteFileIndex` along with the sender that populates it with delete files
+    pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
+        // TODO: what should the channel limit be?
+        let (tx, rx) = channel(10);
+        let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating));
+        let delete_file_stream = rx.boxed();
+
+        spawn({
+            let state = state.clone();
+            async move {
+                let delete_files = delete_file_stream.collect::<Vec<_>>().await;
+
+                let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);
+
+                let mut guard = state.write().unwrap();
+                *guard = DeleteFileIndexState::Populated(populated_delete_file_index);
+            }
         });
 
-        rx
+        (DeleteFileIndex { state }, tx)
+    }
+
+    /// Gets all the delete files that apply to the specified data file.
+    ///
+    /// Returns a future that resolves to a Vec<FileScanTaskDeleteFile>
+    pub(crate) fn get_deletes_for_data_file<'a>(
+        &self,
+        data_file: &'a DataFile,
+        seq_num: Option<i64>,
+    ) -> DeletesForDataFile<'a> {
+        DeletesForDataFile {
+            state: self.state.clone(),
+            data_file,
+            seq_num,
+        }
     }
+}
 
-    fn from_delete_files(files: Vec<DeleteFileContext>) -> Self {
+impl PopulatedDeleteFileIndex {
+    fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
         let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
             HashMap::default();
         let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
             HashMap::default();
@@ -107,7 +144,7 @@ impl DeleteFileIndex {
             }
         });
 
-        DeleteFileIndex {
+        PopulatedDeleteFileIndex {
             global_deletes: vec![],
             eq_deletes_by_partition,
             pos_deletes_by_partition,
@@ -118,7 +155,7 @@ impl DeleteFileIndex {
     }
 
     /// Determine all the delete files that apply to the provided `DataFile`.
-    pub(crate) fn get_deletes_for_data_file(
+    fn get_deletes_for_data_file(
         &self,
         data_file: &DataFile,
         seq_num: Option<i64>,
@@ -155,3 +192,27 @@ impl PopulatedDeleteFileIndex {
         results
     }
 }
+
+/// Future for the `DeleteFileIndex::get_deletes_for_data_file` method
+pub(crate) struct DeletesForDataFile<'a> {
+    state: Arc<RwLock<DeleteFileIndexState>>,
+    data_file: &'a DataFile,
+    seq_num: Option<i64>,
+}
+
+impl Future for DeletesForDataFile<'_> {
+    type Output = Vec<FileScanTaskDeleteFile>;
+
+    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let Ok(guard) = self.state.try_read() else {
+            return Poll::Pending;
+        };
+
+        match guard.deref() {
+            DeleteFileIndexState::Populated(idx) => {
+                Poll::Ready(idx.get_deletes_for_data_file(self.data_file, self.seq_num))
+            }
+            _ => Poll::Pending,
+        }
+    }
+}
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index b3bbb5517..0656fba13 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -27,7 +27,7 @@ use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
 use serde::{Deserialize, Serialize};
 
 use crate::arrow::ArrowReaderBuilder;
-use crate::delete_file_index::{DeleteFileIndex, DeleteFileIndexRefReceiver};
+use crate::delete_file_index::DeleteFileIndex;
 use crate::expr::visitors::expression_evaluator::ExpressionEvaluator;
 use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
 use crate::expr::visitors::inclusive_projection::InclusiveProjection;
@@ -385,18 +385,12 @@ impl TableScan {
         // used to stream the results back to the caller
         let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
 
-        // DeleteFileIndexRefReceiver is a watch channel receiver that will
-        // be notified when the DeleteFileIndex is ready.
-        let delete_file_idx_and_tx: Option<(
-            DeleteFileIndexRefReceiver,
-            Sender<Result<DeleteFileContext>>,
-        )> = if self.delete_file_processing_enabled {
-            let (delete_file_tx, delete_file_rx) = channel(concurrency_limit_manifest_entries);
-            let delete_file_index_rx = DeleteFileIndex::from_del_file_chan(delete_file_rx);
-            Some((delete_file_index_rx, delete_file_tx))
-        } else {
-            None
-        };
+        let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
+            if self.delete_file_processing_enabled {
+                Some(DeleteFileIndex::new())
+            } else {
+                None
+            };
@@ -559,7 +559,7 @@ impl TableScan {
 
     async fn process_delete_manifest_entry(
         manifest_entry_context: ManifestEntryContext,
-        mut delete_file_ctx_tx: Sender<Result<DeleteFileContext>>,
+        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
     ) -> Result<()> {
         // skip processing this manifest entry if it has been marked as deleted
         if !manifest_entry_context.manifest_entry.is_alive() {
             return Ok(());
@@ -591,10 +591,10 @@ impl TableScan {
         }
 
         delete_file_ctx_tx
-            .send(Ok(DeleteFileContext {
+            .send(DeleteFileContext {
                 manifest_entry: manifest_entry_context.manifest_entry.clone(),
                 partition_spec_id: manifest_entry_context.partition_spec_id,
-            }))
+            })
             .await?;
 
         Ok(())
@@ -618,7 +618,7 @@ struct ManifestFileContext {
     object_cache: Arc<ObjectCache>,
     snapshot_schema: SchemaRef,
     expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
-    delete_file_index: Option<DeleteFileIndexRefReceiver>,
+    delete_file_index: Option<DeleteFileIndex>,
 }
 
 /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -631,7 +631,7 @@ struct ManifestEntryContext {
     bound_predicates: Option<Arc<BoundPredicates>>,
     partition_spec_id: i32,
     snapshot_schema: SchemaRef,
-    delete_file_index: Option<DeleteFileIndexRefReceiver>,
+    delete_file_index: Option<DeleteFileIndex>,
 }
@@ -678,18 +678,13 @@ impl ManifestEntryContext {
     /// consume this `ManifestEntryContext`, returning a `FileScanTask`
     /// created from it
     async fn
into_file_scan_task(self) -> Result { - // let deletes = self.get_deletes().await?; - - let deletes = if let Some(mut delete_file_index_rx) = self.delete_file_index { - let del_file_idx_opt = delete_file_index_rx - .wait_for(Option::is_some) + let deletes = if let Some(delete_file_index) = self.delete_file_index { + delete_file_index + .get_deletes_for_data_file( + self.manifest_entry.data_file(), + self.manifest_entry.sequence_number(), + ) .await - .map_err(|_| Error::new(ErrorKind::Unexpected, "DeleteFileIndex recv error"))?; - - match del_file_idx_opt.as_ref() { - Some(del_file_idx) => match del_file_idx.as_ref() { - Ok(delete_file_idx) => delete_file_idx.get_deletes_for_data_file( - self.manifest_entry.data_file(), - self.manifest_entry.sequence_number(), - ), - Err(err) => { - return Err(Error::new(ErrorKind::Unexpected, err.message())); - } - }, - - // the `wait_for(Option::is_some)` above means that we can - // never get a `None` here - None => unreachable!(), - } } else { vec![] }; @@ -764,7 +742,7 @@ impl PlanContext { &self, manifest_list: Arc, tx_data: Sender, - delete_file_idx_and_tx: Option<(DeleteFileIndexRefReceiver, Sender)>, + delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender)>, ) -> Result>>> { let manifest_files = manifest_list.entries().iter(); @@ -823,7 +801,7 @@ impl PlanContext { manifest_file: &ManifestFile, partition_filter: Option>, sender: Sender, - delete_file_index: Option, + delete_file_index: Option, ) -> ManifestFileContext { let bound_predicates = if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) = From 5308d597a17ba0a025daf7afababd57874d6cf53 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Mon, 23 Dec 2024 11:40:57 +0000 Subject: [PATCH 06/11] style: reorder --- crates/iceberg/src/delete_file_index.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 3f1a092fc..53a508651 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -29,6 +29,12 @@ use crate::runtime::spawn; use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; use crate::spec::{DataContentType, DataFile, Struct}; +/// Index of delete files +#[derive(Clone, Debug)] +pub(crate) struct DeleteFileIndex { + state: Arc>, +} + #[derive(Debug)] enum DeleteFileIndexState { Populating, @@ -44,12 +50,6 @@ struct PopulatedDeleteFileIndex { pos_deletes_by_path: HashMap>>, } -/// Index of delete files -#[derive(Clone, Debug)] -pub(crate) struct DeleteFileIndex { - state: Arc>, -} - impl DeleteFileIndex { /// create a new `DeleteFileIndex` along with the sender that populates it with delete files pub(crate) fn new() -> (DeleteFileIndex, Sender) { From 73ede9b47b34dbb9b63457e38cf95c6c2b58d2e2 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Mon, 23 Dec 2024 11:47:36 +0000 Subject: [PATCH 07/11] style: remove unneeded commented-out code --- crates/iceberg/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 466592bb8..d684be54c 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -83,7 +83,6 @@ pub mod transform; mod runtime; pub mod arrow; -// pub(crate) mod delete_file_index; pub(crate) mod delete_file_index; mod utils; pub mod writer; From a66364afb55ebfa6cfb260216aa2686cfcc9933a Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Mon, 23 Dec 2024 12:50:11 +0000 Subject: [PATCH 08/11] refactor: collect delete file 
results into vec --- crates/iceberg/src/delete_file_index.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 53a508651..341690639 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -172,7 +172,6 @@ impl PopulatedDeleteFileIndex { deletes_queue.extend(deletes.iter()); } - let mut results = vec![]; deletes_queue .iter() .filter(|&delete| { @@ -180,15 +179,12 @@ impl PopulatedDeleteFileIndex { .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num)) .unwrap_or_else(|| true) }) - .for_each(|delete| { - results.push(FileScanTaskDeleteFile { - file_path: delete.manifest_entry.file_path().to_string(), - file_type: delete.manifest_entry.content_type(), - partition_spec_id: delete.partition_spec_id, - }) - }); - - results + .map(|delete| FileScanTaskDeleteFile { + file_path: delete.manifest_entry.file_path().to_string(), + file_type: delete.manifest_entry.content_type(), + partition_spec_id: delete.partition_spec_id, + }) + .collect() } } From 9ce23ac10189210a49199aef6bdde13d7d168cf1 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Mon, 23 Dec 2024 14:06:53 +0000 Subject: [PATCH 09/11] fix: ensure arrow reader returns Unsupported when delete files present in a file scan task --- crates/iceberg/src/arrow/reader.rs | 8 +++++++ .../tests/read_positional_deletes.rs | 24 +++++++++++++++---- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 7e6df8782..ffe4f5d3a 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -167,6 +167,14 @@ impl ArrowReader { row_group_filtering_enabled: bool, row_selection_enabled: bool, ) -> Result { + // TODO: add support for delete files + if !task.deletes.is_empty() { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Delete files are not yet supported", + )); + } + // Get the metadata for the Parquet file we need to read and build // a reader for the data within let parquet_file = file_io.new_input(&task.data_file_path)?; diff --git a/crates/integration_tests/tests/read_positional_deletes.rs b/crates/integration_tests/tests/read_positional_deletes.rs index 41ca057a6..9c6e3689d 100644 --- a/crates/integration_tests/tests/read_positional_deletes.rs +++ b/crates/integration_tests/tests/read_positional_deletes.rs @@ -17,6 +17,7 @@ //! Integration tests for rest catalog. +use futures::TryStreamExt; use iceberg::ErrorKind::FeatureUnsupported; use iceberg::{Catalog, TableIdent}; use iceberg_integration_tests::set_test_fixture; @@ -35,16 +36,31 @@ async fn test_read_table_with_positional_deletes() { .await .unwrap(); - let scan = table.scan().build().unwrap(); + let scan = table + .scan() + .with_delete_file_processing_enabled(true) + .build() + .unwrap(); println!("{:?}", scan); - assert!(scan - .to_arrow() + let plan: Vec<_> = scan + .plan_files() + .await + .unwrap() + .try_collect() .await - .is_err_and(|e| e.kind() == FeatureUnsupported)); + .unwrap(); + println!("{:?}", plan); + + // Scan plan phase should include delete files in file plan + // when with_delete_file_processing_enabled == true + assert_eq!(plan[0].deletes.len(), 2); // 😱 If we don't support positional deletes, we should fail when we try to read a table that // has positional deletes. 
The table has 12 rows, and 2 are deleted, see provision.py
+    let result = scan.to_arrow().await.unwrap().try_collect::<Vec<_>>().await;
+
+    assert!(result.is_err_and(|e| e.kind() == FeatureUnsupported));
 
     // When we get support for it:
     // let batch_stream = scan.to_arrow().await.unwrap();

From 54d4ae243fa625d0a3977e1187b47f372addca8e Mon Sep 17 00:00:00 2001
From: Scott Donnelly
Date: Mon, 23 Dec 2024 17:27:41 +0000
Subject: [PATCH 10/11] feat: global delete support, DeleteFileIndex

---
 crates/iceberg/src/delete_file_index.rs | 156 ++++++++++++------------
 crates/iceberg/src/scan.rs              |  12 +-
 2 files changed, 88 insertions(+), 80 deletions(-)

diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs
index 341690639..8a534d1fd 100644
--- a/crates/iceberg/src/delete_file_index.rs
+++ b/crates/iceberg/src/delete_file_index.rs
@@ -28,6 +28,7 @@ use futures::StreamExt;
 use crate::runtime::spawn;
 use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
 use crate::spec::{DataContentType, DataFile, Struct};
+use crate::{Error, ErrorKind, Result};
 
 /// Index of delete files
 #[derive(Clone, Debug)]
@@ -47,7 +48,10 @@ struct PopulatedDeleteFileIndex {
     #[allow(dead_code)]
     global_deletes: Vec<Arc<DeleteFileContext>>,
     eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
     pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
-    pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
+    // TODO: do we need this?
+    // pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
+
+    // TODO: Deletion Vector support
 }
 
 impl DeleteFileIndex {
@@ -75,7 +79,7 @@ impl DeleteFileIndex {
 
     /// Gets all the delete files that apply to the specified data file.
     ///
-    /// Returns a future that resolves to a Vec<FileScanTaskDeleteFile>
+    /// Returns a future that resolves to a Result<Vec<FileScanTaskDeleteFile>>
     pub(crate) fn get_deletes_for_data_file<'a>(
         &self,
         data_file: &'a DataFile,
         seq_num: Option<i64>,
@@ -95,62 +99,39 @@ impl PopulatedDeleteFileIndex {
             HashMap::default();
         let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
             HashMap::default();
-        let mut pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>> =
-            HashMap::default();
 
-        files.into_iter().for_each(|del_file_ctx| {
-            let arc_del_file_ctx = Arc::new(del_file_ctx);
-            match arc_del_file_ctx.manifest_entry.content_type() {
-                DataContentType::PositionDeletes => {
-                    // TODO: implement logic from ContentFileUtil.referencedDataFile
-                    // see https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L54
-                    let referenced_data_file_path = "TODO".to_string();
-
-                    pos_deletes_by_path
-                        .entry(referenced_data_file_path)
-                        .and_modify(|entry| {
-                            entry.push(arc_del_file_ctx.clone());
-                        })
-                        .or_insert(vec![arc_del_file_ctx.clone()]);
-
-                    pos_deletes_by_partition
-                        .entry(
-                            arc_del_file_ctx
-                                .manifest_entry
-                                .data_file()
-                                .partition()
-                                .clone(),
-                        )
-                        .and_modify(|entry| {
-                            entry.push(arc_del_file_ctx.clone());
-                        })
-                        .or_insert(vec![arc_del_file_ctx.clone()]);
-                }
-                DataContentType::EqualityDeletes => {
-                    eq_deletes_by_partition
-                        .entry(
-                            arc_del_file_ctx
-                                .manifest_entry
-                                .data_file()
-                                .partition()
-                                .clone(),
-                        )
-                        .and_modify(|entry| {
-                            entry.push(arc_del_file_ctx.clone());
-                        })
-                        .or_insert(vec![arc_del_file_ctx.clone()]);
-                }
-                _ => unreachable!(),
-            }
-        });
+        let mut global_deletes: Vec<Arc<DeleteFileContext>> = vec![];
+
+        files.into_iter().for_each(|ctx| {
+            let arc_ctx = Arc::new(ctx);
+
+            let partition = arc_ctx.manifest_entry.data_file().partition();
+
+            // The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes".
+            if partition.fields().is_empty() {
+                // TODO: confirm we're good to skip here if we encounter a pos del
+                if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
+                    global_deletes.push(arc_ctx);
+                    return;
+                }
+            }
+
+            let destination_map = match arc_ctx.manifest_entry.content_type() {
+                DataContentType::PositionDeletes => &mut pos_deletes_by_partition,
+                DataContentType::EqualityDeletes => &mut eq_deletes_by_partition,
+                _ => unreachable!(),
+            };
+
+            destination_map
+                .entry(partition.clone())
+                .and_modify(|entry| {
+                    entry.push(arc_ctx.clone());
+                })
+                .or_insert(vec![arc_ctx.clone()]);
+        });
 
         PopulatedDeleteFileIndex {
-            global_deletes: vec![],
+            global_deletes,
             eq_deletes_by_partition,
             pos_deletes_by_partition,
-            pos_deletes_by_path,
         }
     }
 
@@ -143,33 +147,47 @@ impl PopulatedDeleteFileIndex {
         data_file: &DataFile,
         seq_num: Option<i64>,
     ) -> Vec<FileScanTaskDeleteFile> {
-        let mut deletes_queue = vec![];
-
-        if let Some(deletes) = self.pos_deletes_by_path.get(data_file.file_path()) {
-            deletes_queue.extend(deletes.iter());
-        }
+        let mut results = vec![];
 
-        if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) {
-            deletes_queue.extend(deletes.iter());
-        }
+        self.global_deletes
+            .iter()
+            // filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num`
+            .filter(|&delete| {
+                seq_num
+                    .map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
+                    .unwrap_or_else(|| true)
+            })
+            .for_each(|delete| results.push(delete.as_ref().into()));
 
-        if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) {
-            deletes_queue.extend(deletes.iter());
+        if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) {
+            deletes
+                .iter()
+                // filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num`
+                .filter(|&delete| {
+                    seq_num
+                        .map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
+                        .unwrap_or_else(|| true)
+                })
+                .for_each(|delete| results.push(delete.as_ref().into()));
         }
 
-        deletes_queue
-            .iter()
-            .filter(|&delete| {
-                seq_num
-                    .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
-                    .unwrap_or_else(|| true)
-            })
-            .map(|delete| FileScanTaskDeleteFile {
-                file_path: delete.manifest_entry.file_path().to_string(),
-                file_type: delete.manifest_entry.content_type(),
-                partition_spec_id: delete.partition_spec_id,
-            })
-            .collect()
+        // TODO: the spec states that:
+        //   "The data file's file_path is equal to the delete file's referenced_data_file if it is non-null".
+        // we're not yet doing that here. The referenced data file's name will also be present in the positional
+        // delete file's file path column.
+        if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) {
+            deletes
+                .iter()
+                // filter that returns true if the provided delete file's sequence number is **greater than** `seq_num`
+                .filter(|&delete| {
+                    seq_num
+                        .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
+                        .unwrap_or_else(|| true)
+                })
+                .for_each(|delete| results.push(delete.as_ref().into()));
+        }
+
+        results
     }
 }
@@ -195,17 +194,18 @@ pub(crate) struct DeletesForDataFile<'a> {
 }
 
 impl Future for DeletesForDataFile<'_> {
-    type Output = Vec<FileScanTaskDeleteFile>;
+    type Output = Result<Vec<FileScanTaskDeleteFile>>;
 
     fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
-        let Ok(guard) = self.state.try_read() else {
-            return Poll::Pending;
-        };
-
-        match guard.deref() {
-            DeleteFileIndexState::Populated(idx) => {
-                Poll::Ready(idx.get_deletes_for_data_file(self.data_file, self.seq_num))
-            }
-            _ => Poll::Pending,
+        match self.state.try_read() {
+            Ok(guard) => match guard.deref() {
+                DeleteFileIndexState::Populated(idx) => Poll::Ready(Ok(
+                    idx.get_deletes_for_data_file(self.data_file, self.seq_num)
+                )),
+                _ => Poll::Pending,
+            },
+            Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Unexpected, err.to_string()))),
        }
     }
 }
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 0656fba13..f1afeb2f4 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -684,7 +684,7 @@ impl ManifestEntryContext {
                     self.manifest_entry.data_file(),
                     self.manifest_entry.sequence_number(),
                 )
-                .await
+                .await?
         } else {
             vec![]
         };
@@ -1088,6 +1088,16 @@ pub(crate) struct DeleteFileContext {
     pub(crate) partition_spec_id: i32,
 }
 
+impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
+    fn from(ctx: &DeleteFileContext) -> Self {
+        FileScanTaskDeleteFile {
+            file_path: ctx.manifest_entry.file_path().to_string(),
+            file_type: ctx.manifest_entry.content_type(),
+            partition_spec_id: ctx.partition_spec_id,
+        }
+    }
+}
+
 impl FileScanTask {
     /// Returns the data file path of this file scan task.
     pub fn data_file_path(&self) -> &str {

From b82831528b1093ddde35cfa5def3c5b1f1cad8f4 Mon Sep 17 00:00:00 2001
From: Scott Donnelly
Date: Tue, 24 Dec 2024 08:38:41 +0000
Subject: [PATCH 11/11] style: reformat with cargo fmt

---
 crates/iceberg/src/arrow/reader.rs | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index ffe4f5d3a..b35beb1f9 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -47,12 +47,8 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::scan::{
-    ArrowRecordBatchStream, FileScanTask, FileScanTaskStream,
-};
-use crate::spec::{
-    Datum, PrimitiveType, Schema,
-};
+use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
+use crate::spec::{Datum, PrimitiveType, Schema};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
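
Taken together, the series leaves the scan API in the shape sketched below. This is a minimal usage sketch based on the integration test in read_positional_deletes.rs above, not part of the patch series itself; the show_deletes function name and the table handle are illustrative assumptions:

    use futures::TryStreamExt;
    use iceberg::table::Table;
    use iceberg::Result;

    // Opt in to delete file processing; it currently defaults to false
    // while merge-on-read support is still being completed.
    async fn show_deletes(table: &Table) -> Result<()> {
        let scan = table
            .scan()
            .with_delete_file_processing_enabled(true)
            .build()?;

        // Each FileScanTask in the plan now carries the delete files that
        // the DeleteFileIndex resolved for its data file.
        let plan: Vec<_> = scan.plan_files().await?.try_collect().await?;
        for task in &plan {
            println!(
                "{}: {} applicable delete file(s)",
                task.data_file_path(),
                task.deletes.len()
            );
        }

        // Reading the data still returns ErrorKind::FeatureUnsupported until
        // the ArrowReader learns to apply the deletes (see patch 09).
        Ok(())
    }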