[WIP] Column filtering

adragomir committed Apr 19, 2024
1 parent ee13264 commit 1adaf62

Showing 35 changed files with 180 additions and 40 deletions.
1 change: 1 addition & 0 deletions datafusion-examples/examples/csv_opener.rs
@@ -67,6 +67,7 @@ async fn main() -> Result<()> {
limit: Some(5),
table_partition_cols: vec![],
output_ordering: vec![],
column_hints: None,
};

let result =
2 changes: 1 addition & 1 deletion datafusion-examples/examples/flight/flight_server.rs
@@ -59,7 +59,7 @@ impl FlightService for FlightServiceImpl {

let ctx = SessionContext::new();
let schema = listing_options
.infer_schema(&ctx.state(), &table_path)
.infer_schema(&ctx.state(), &table_path, None)
.await
.unwrap();

1 change: 1 addition & 0 deletions datafusion-examples/examples/json_opener.rs
@@ -70,6 +70,7 @@ async fn main() -> Result<()> {
limit: Some(5),
table_partition_cols: vec![],
output_ordering: vec![],
column_hints: None,
};

let result =
2 changes: 1 addition & 1 deletion datafusion/core/benches/sql_query_with_io.rs
@@ -131,7 +131,7 @@ async fn setup_context(object_store: Arc<dyn ObjectStore>) -> SessionContext {
// make sure we actually find the data
let path = format!("data://my_store/{table_name}/");
let schema2 = options
.infer_schema(&context.state(), &ListingTableUrl::parse(&path).unwrap())
.infer_schema(&context.state(), &ListingTableUrl::parse(&path).unwrap(), None)
.await
.unwrap();
assert_eq!(schema2, schema());
3 changes: 3 additions & 0 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -76,6 +76,7 @@ impl FileFormat for ArrowFormat {
_state: &SessionState,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
_columns: Option<Vec<String>>,
) -> Result<SchemaRef> {
let mut schemas = vec![];
for object in objects {
@@ -417,6 +418,7 @@ mod tests {
&state,
&(store.clone() as Arc<dyn ObjectStore>),
&[object_meta.clone()],
None,
)
.await?;
let actual_fields = inferred_schema
@@ -456,6 +458,7 @@
&state,
&(store.clone() as Arc<dyn ObjectStore>),
&[object_meta.clone()],
None,
)
.await;

1 change: 1 addition & 0 deletions datafusion/core/src/datasource/file_format/avro.rs
@@ -50,6 +50,7 @@ impl FileFormat for AvroFormat {
_state: &SessionState,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
_columns: Option<Vec<String>>,
) -> Result<SchemaRef> {
let mut schemas = vec![];
for object in objects {
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -203,6 +203,7 @@ impl FileFormat for CsvFormat {
_state: &SessionState,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
_columns: Option<Vec<String>>,
) -> Result<SchemaRef> {
let mut schemas = vec![];

@@ -677,6 +678,7 @@ mod tests {
&state,
&(variable_object_store.clone() as Arc<dyn ObjectStore>),
&[object_meta],
None,
)
.await?;

3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/file_format/json.rs
@@ -100,6 +100,7 @@ impl FileFormat for JsonFormat {
_state: &SessionState,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
_columns: Option<Vec<String>>,
) -> Result<SchemaRef> {
let mut schemas = Vec::new();
let mut records_to_read = self.options.schema_infer_max_rec;
@@ -425,7 +426,7 @@ mod tests {
let format = JsonFormat::default().with_schema_infer_max_rec(3);

let file_schema = format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(filename)])
.infer_schema(&ctx, &store, &[local_unpartitioned_file(filename)], None)
.await
.expect("Schema inference");

4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
@@ -67,6 +67,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
state: &SessionState,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
column_hints: Option<Vec<String>>,
) -> Result<SchemaRef>;

/// Infer the statistics for the provided object. The cost and accuracy of the
@@ -140,7 +141,7 @@ pub(crate) mod test_util {
let store = Arc::new(LocalFileSystem::new()) as _;
let meta = local_unpartitioned_file(format!("{store_root}/{file_name}"));

let file_schema = format.infer_schema(state, &store, &[meta.clone()]).await?;
let file_schema = format.infer_schema(state, &store, &[meta.clone()], None).await?;

let statistics = format
.infer_stats(state, &store, file_schema.clone(), &meta)
@@ -165,6 +166,7 @@ pub(crate) mod test_util {
limit,
table_partition_cols: vec![],
output_ordering: vec![],
column_hints: None,
},
None,
)
6 changes: 5 additions & 1 deletion datafusion/core/src/datasource/file_format/options.rs
@@ -202,6 +202,8 @@ pub struct ParquetReadOptions<'a> {
pub schema: Option<&'a Schema>,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
/// Optional column names; formats that support projection (currently Parquet) restrict schema inference to these columns
pub column_hints: Option<Vec<String>>,
}

impl<'a> Default for ParquetReadOptions<'a> {
@@ -213,6 +215,7 @@ impl<'a> Default for ParquetReadOptions<'a> {
skip_metadata: None,
schema: None,
file_sort_order: vec![],
column_hints: None,
}
}
}
@@ -462,7 +465,7 @@ pub trait ReadOptions<'a> {
}

self.to_listing_options(config, state.default_table_options())
.infer_schema(&state, &table_path)
.infer_schema(&state, &table_path, None)
.await
}
}
@@ -523,6 +526,7 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
.with_column_hints(self.column_hints.clone())
}

async fn get_resolved_schema(
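A minimal sketch of how the new option could be exercised through the DataFrame API, assuming this branch is built; the file path and column names below are hypothetical:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Restrict schema inference to the hinted leaf columns. `column_hints`
    // is the field added by this commit; "a" and "b" are made-up names.
    let options = ParquetReadOptions {
        column_hints: Some(vec!["a".to_string(), "b".to_string()]),
        ..Default::default()
    };

    let df = ctx.read_parquet("data/example.parquet", options).await?;
    df.show().await?;
    Ok(())
}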
64 changes: 52 additions & 12 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -61,9 +61,7 @@ use parquet::arrow::arrow_writer::{
compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
ArrowLeafColumn,
};
use parquet::arrow::{
arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
};
use parquet::arrow::{
    arrow_to_parquet_schema, parquet_to_arrow_schema,
    parquet_to_arrow_schema_by_columns, AsyncArrowWriter, ProjectionMask,
};
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
@@ -78,6 +76,7 @@ use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::schema::types::SchemaDescriptor;

/// Initial writing buffer size. Note this is just a size hint for efficiency. It
/// will grow beyond the set value if needed.
@@ -176,9 +175,10 @@ async fn fetch_schema_with_location(
store: &dyn ObjectStore,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
columns: Option<Vec<String>>,
) -> Result<(Path, Schema)> {
let loc_path = file.location.clone();
let schema = fetch_schema(store, file, metadata_size_hint).await?;
let schema = fetch_schema(store, file, metadata_size_hint, columns).await?;
Ok((loc_path, schema))
}

@@ -193,13 +193,16 @@ impl FileFormat for ParquetFormat {
state: &SessionState,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
columns: Option<Vec<String>>,
) -> Result<SchemaRef> {
let mut schemas: Vec<_> = futures::stream::iter(objects)
.map(|object| {
fetch_schema_with_location(
store.as_ref(),
object,
self.metadata_size_hint(),
columns.clone(),
)
})
.boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
@@ -436,18 +439,55 @@ pub async fn fetch_parquet_metadata(
}
}

/// Returns the leaf index in the schema descriptor for a given column path
pub fn find_leaf_id(schema_desc_ptr: &SchemaDescriptor, name: &str) -> Result<usize> {
    schema_desc_ptr
        .columns()
        .iter()
        .position(|col| col.path().string() == name)
        .ok_or_else(|| DataFusionError::Internal(format!("column {name} not found")))
}

/// Returns vector of indices from the schema descriptor for some column paths
pub fn find_leaf_ids(
schema_desc_ptr: &SchemaDescriptor,
names: Vec<String>,
) -> Result<Vec<usize>> {
names
.iter()
.map(|name| find_leaf_id(schema_desc_ptr, name.as_str()))
.collect()
}

/// Read and parse the schema of the Parquet file at location `path`
async fn fetch_schema(
store: &dyn ObjectStore,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
column_hints: Option<Vec<String>>,
) -> Result<Schema> {
let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
let file_metadata = metadata.file_metadata();
let schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)?;
    let schema = match column_hints {
        Some(cols) => {
            let ids = find_leaf_ids(file_metadata.schema_descr(), cols)?;
            let mask = ProjectionMask::leaves(file_metadata.schema_descr(), ids);
            parquet_to_arrow_schema_by_columns(
                file_metadata.schema_descr(),
                mask,
                file_metadata.key_value_metadata(),
            )?
        }
        None => parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )?,
    };
Ok(schema)
}

@@ -1157,7 +1197,7 @@ mod tests {
let session = SessionContext::new();
let ctx = session.state();
let format = ParquetFormat::default();
let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();
let schema = format.infer_schema(&ctx, &store, &meta, None).await.unwrap();

let stats =
fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None).await?;
@@ -1206,7 +1246,7 @@ mod tests {
let session = SessionContext::new();
let ctx = session.state();
let format = ParquetFormat::default();
let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();
let schema = format.infer_schema(&ctx, &store, &meta, None).await.unwrap();

let order: Vec<_> = ["a", "b", "c", "d"]
.into_iter()
@@ -1351,7 +1391,7 @@ mod tests {
let ctx = session.state();
let format = ParquetFormat::default().with_metadata_size_hint(Some(9));
let schema = format
.infer_schema(&ctx, &store.upcast(), &meta)
.infer_schema(&ctx, &store.upcast(), &meta, None)
.await
.unwrap();

@@ -1381,7 +1421,7 @@ mod tests {

let format = ParquetFormat::default().with_metadata_size_hint(Some(size_hint));
let schema = format
.infer_schema(&ctx, &store.upcast(), &meta)
.infer_schema(&ctx, &store.upcast(), &meta, None)
.await
.unwrap();
let stats = fetch_statistics(
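The pruning in fetch_schema can be illustrated standalone with the parquet crate: resolve leaf indices by dotted column path (as find_leaf_ids does), build a ProjectionMask, and convert only the masked leaves to an Arrow schema. A sketch assuming a parquet crate version matching the imports in this diff; the message type and column names are made up:

use std::sync::Arc;

use parquet::arrow::{parquet_to_arrow_schema_by_columns, ProjectionMask};
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stand-in for the schema descriptor normally read from a file footer.
    let message = "
        message example {
            required int64 id;
            optional binary name (UTF8);
            optional double score;
        }";
    let descr = SchemaDescriptor::new(Arc::new(parse_message_type(message)?));

    // Same lookup as find_leaf_id: match leaves on their dotted path.
    let hints = ["id", "score"];
    let ids: Vec<usize> = hints
        .iter()
        .filter_map(|name| {
            descr.columns().iter().position(|c| c.path().string() == *name)
        })
        .collect();

    // Only the masked leaves survive into the Arrow schema.
    let mask = ProjectionMask::leaves(&descr, ids);
    let pruned = parquet_to_arrow_schema_by_columns(&descr, mask, None)?;
    assert_eq!(pruned.fields().len(), 2);
    Ok(())
}

Because the mask is applied before Arrow conversion, wide files with many unneeded columns never pay for building their Arrow fields.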
20 changes: 15 additions & 5 deletions datafusion/core/src/datasource/listing/table.rs
@@ -197,7 +197,7 @@ impl ListingTableConfig {
match self.options {
Some(options) => {
let schema = if let Some(url) = self.table_paths.first() {
options.infer_schema(state, url).await?
options.infer_schema(state, url, None).await?
} else {
Arc::new(Schema::empty())
};
@@ -251,6 +251,8 @@ pub struct ListingOptions {
/// multiple equivalent orderings, the outer `Vec` will have a
/// single element.
pub file_sort_order: Vec<Vec<Expr>>,
/// Optional column load hints passed to the underlying file format
pub column_hints: Option<Vec<String>>,
}

impl ListingOptions {
@@ -268,6 +270,7 @@ impl ListingOptions {
collect_stat: true,
target_partitions: 1,
file_sort_order: vec![],
column_hints: None,
}
}

@@ -418,6 +421,11 @@ impl ListingOptions {
self
}

/// Set the column hints used during schema inference, returning `Self` for chaining
pub fn with_column_hints(mut self, column_hints: Option<Vec<String>>) -> Self {
self.column_hints = column_hints;
self
}

/// Infer the schema of the files at the given path on the provided object store.
/// The inferred schema does not include the partitioning columns.
///
@@ -428,6 +436,7 @@
&'a self,
state: &SessionState,
table_path: &'a ListingTableUrl,
column_hints: Option<Vec<String>>,
) -> Result<SchemaRef> {
let store = state.runtime_env().object_store(table_path)?;

@@ -437,7 +446,7 @@
.try_collect()
.await?;

self.format.infer_schema(state, &store, &files).await
self.format.infer_schema(state, &store, &files, column_hints).await
}

/// Infers the partition columns stored in `LOCATION` and compares
@@ -783,6 +792,7 @@ impl TableProvider for ListingTable {
file_groups: partitioned_file_lists,
statistics,
projection: projection.cloned(),
column_hints: self.options.column_hints.clone(),
limit,
output_ordering: self.try_create_output_ordering()?,
table_partition_cols,
Expand Down Expand Up @@ -1063,7 +1073,7 @@ mod tests {
let state = ctx.state();

let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
let schema = opt.infer_schema(&state, &table_path).await?;
let schema = opt.infer_schema(&state, &table_path, None).await?;
let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(schema);
Expand All @@ -1088,7 +1098,7 @@ mod tests {

let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
.with_collect_stat(false);
let schema = opt.infer_schema(&state, &table_path).await?;
let schema = opt.infer_schema(&state, &table_path, None).await?;
let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(schema);
Expand All @@ -1111,7 +1121,7 @@ mod tests {
let ctx = SessionContext::new();
let state = ctx.state();
let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
let schema = options.infer_schema(&state, &table_path).await.unwrap();
let schema = options.infer_schema(&state, &table_path, None).await.unwrap();

use crate::physical_plan::expressions::col as physical_col;
use std::ops::Add;
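A sketch of the ListingOptions flow with hints, assuming this branch; the table location and column names are hypothetical:

use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let state = ctx.state();
    let table_path = ListingTableUrl::parse("file:///data/my_table/")?;

    // Hints stored on the options are forwarded to the scan by ListingTable;
    // they can also be passed per call to infer_schema.
    let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_column_hints(Some(vec!["a".to_string(), "b".to_string()]));

    let schema = opt
        .infer_schema(&state, &table_path, opt.column_hints.clone())
        .await?;
    println!("{schema:#?}");
    Ok(())
}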