Skip to content

Commit 0154829

Browse files
authored
feat: implement manifest filtering in TableScan (#323)
1 parent aba6209 commit 0154829

File tree

1 file changed

+79
-3
lines changed

1 file changed

+79
-3
lines changed

crates/iceberg/src/scan.rs

+79-3
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,20 @@
1818
//! Table scan api.
1919
2020
use crate::arrow::ArrowReaderBuilder;
21+
use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
22+
use crate::expr::{Bind, Predicate};
2123
use crate::io::FileIO;
22-
use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef};
24+
use crate::spec::{
25+
DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadata, TableMetadataRef,
26+
};
2327
use crate::table::Table;
2428
use crate::{Error, ErrorKind};
2529
use arrow_array::RecordBatch;
2630
use async_stream::try_stream;
2731
use futures::stream::{iter, BoxStream};
2832
use futures::StreamExt;
33+
use std::collections::HashMap;
34+
use std::sync::Arc;
2935

3036
/// Builder to create table scan.
3137
pub struct TableScanBuilder<'a> {
@@ -34,6 +40,8 @@ pub struct TableScanBuilder<'a> {
3440
column_names: Vec<String>,
3541
snapshot_id: Option<i64>,
3642
batch_size: Option<usize>,
43+
case_sensitive: bool,
44+
filter: Option<Predicate>,
3745
}
3846

3947
impl<'a> TableScanBuilder<'a> {
@@ -43,6 +51,8 @@ impl<'a> TableScanBuilder<'a> {
4351
column_names: vec![],
4452
snapshot_id: None,
4553
batch_size: None,
54+
case_sensitive: true,
55+
filter: None,
4656
}
4757
}
4858

@@ -53,6 +63,20 @@ impl<'a> TableScanBuilder<'a> {
5363
self
5464
}
5565

66+
/// Sets the scan's case sensitivity
67+
pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
68+
self.case_sensitive = case_sensitive;
69+
self
70+
}
71+
72+
/// Specifies a predicate to use as a filter
73+
pub fn with_filter(mut self, predicate: Predicate) -> Self {
74+
// calls rewrite_not to remove Not nodes, which must be absent
75+
// when applying the manifest evaluator
76+
self.filter = Some(predicate.rewrite_not());
77+
self
78+
}
79+
5680
/// Select all columns.
5781
pub fn select_all(mut self) -> Self {
5882
self.column_names.clear();
@@ -125,6 +149,8 @@ impl<'a> TableScanBuilder<'a> {
125149
column_names: self.column_names,
126150
schema,
127151
batch_size: self.batch_size,
152+
case_sensitive: self.case_sensitive,
153+
filter: self.filter.map(Arc::new),
128154
})
129155
}
130156
}
@@ -139,17 +165,29 @@ pub struct TableScan {
139165
column_names: Vec<String>,
140166
schema: SchemaRef,
141167
batch_size: Option<usize>,
168+
case_sensitive: bool,
169+
filter: Option<Arc<Predicate>>,
142170
}
143171

144172
/// A stream of [`FileScanTask`].
145173
pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
146174

147175
impl TableScan {
148176
/// Returns a stream of file scan tasks.
177+
149178
pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
179+
// Cache `ManifestEvaluatorFactory`s created as part of this scan
180+
let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> = HashMap::new();
181+
182+
// these variables needed to ensure that we don't need to pass a
183+
// reference to self into `try_stream`, as it expects references
184+
// passed in to outlive 'static
185+
let schema = self.schema.clone();
150186
let snapshot = self.snapshot.clone();
151187
let table_metadata = self.table_metadata.clone();
152188
let file_io = self.file_io.clone();
189+
let case_sensitive = self.case_sensitive;
190+
let filter = self.filter.clone();
153191

154192
Ok(try_stream! {
155193
let manifest_list = snapshot
@@ -158,8 +196,24 @@ impl TableScan {
158196
.await?;
159197

160198
// Generate data file stream
161-
let mut entries = iter(manifest_list.entries());
162-
while let Some(entry) = entries.next().await {
199+
for entry in manifest_list.entries() {
200+
// If this scan has a filter, check the partition evaluator cache for an existing
201+
// PartitionEvaluator that matches this manifest's partition spec ID.
202+
// Use one from the cache if there is one. If not, create one, put it in
203+
// the cache, and take a reference to it.
204+
#[allow(clippy::map_entry)]
205+
if let Some(filter) = filter.as_ref() {
206+
if !manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
207+
manifest_evaluator_cache.insert(entry.partition_spec_id, Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(), table_metadata.clone(), case_sensitive, filter)?);
208+
}
209+
let manifest_evaluator = &manifest_evaluator_cache[&entry.partition_spec_id];
210+
211+
// reject any manifest files whose partition values don't match the filter.
212+
if !manifest_evaluator.eval(entry)? {
213+
continue;
214+
}
215+
}
216+
163217
let manifest = entry.load_manifest(&file_io).await?;
164218

165219
let mut manifest_entries = iter(manifest.entries().iter().filter(|e| e.is_alive()));
@@ -186,6 +240,28 @@ impl TableScan {
186240
.boxed())
187241
}
188242

243+
fn create_manifest_evaluator(
244+
id: i32,
245+
schema: SchemaRef,
246+
table_metadata: Arc<TableMetadata>,
247+
case_sensitive: bool,
248+
filter: &Predicate,
249+
) -> crate::Result<ManifestEvaluator> {
250+
let bound_predicate = filter.bind(schema.clone(), case_sensitive)?;
251+
252+
let partition_spec = table_metadata.partition_spec_by_id(id).ok_or(Error::new(
253+
ErrorKind::Unexpected,
254+
format!("Could not find partition spec for id {id}"),
255+
))?;
256+
257+
ManifestEvaluator::new(
258+
partition_spec.clone(),
259+
schema.clone(),
260+
bound_predicate,
261+
case_sensitive,
262+
)
263+
}
264+
189265
pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {
190266
let mut arrow_reader_builder =
191267
ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());

0 commit comments

Comments
 (0)