From 111793250ca30b2f4c6e1c2289426c49e6d78642 Mon Sep 17 00:00:00 2001 From: Andrei Dragomir Date: Fri, 5 Apr 2024 15:20:17 +0300 Subject: [PATCH] [WIP] Column filtering --- datafusion-examples/examples/csv_opener.rs | 1 + .../examples/flight/flight_server.rs | 2 +- datafusion-examples/examples/json_opener.rs | 1 + datafusion/core/benches/sql_query_with_io.rs | 2 +- .../core/src/datasource/file_format/arrow.rs | 3 + .../core/src/datasource/file_format/avro.rs | 1 + .../core/src/datasource/file_format/csv.rs | 2 + .../core/src/datasource/file_format/json.rs | 3 +- .../core/src/datasource/file_format/mod.rs | 4 +- .../src/datasource/file_format/options.rs | 6 +- .../src/datasource/file_format/parquet.rs | 64 +++++++++++++++---- .../core/src/datasource/listing/table.rs | 22 +++++-- .../src/datasource/listing_table_factory.rs | 2 +- .../core/src/datasource/physical_plan/avro.rs | 9 ++- .../physical_plan/file_scan_config.rs | 3 + .../datasource/physical_plan/file_stream.rs | 1 + .../core/src/datasource/physical_plan/json.rs | 10 ++- .../datasource/physical_plan/parquet/mod.rs | 55 +++++++++++++--- datafusion/core/src/execution/context/mod.rs | 2 +- .../combine_partial_final_agg.rs | 1 + .../enforce_distribution.rs | 5 ++ .../physical_optimizer/projection_pushdown.rs | 2 + .../replace_with_order_preserving_variants.rs | 1 + .../core/src/physical_optimizer/test_utils.rs | 2 + datafusion/core/src/test/mod.rs | 3 + datafusion/core/src/test_util/parquet.rs | 1 + .../core/tests/parquet/custom_reader.rs | 1 + .../core/tests/parquet/file_statistics.rs | 1 + datafusion/core/tests/parquet/page_pruning.rs | 3 +- .../core/tests/parquet/schema_coercion.rs | 2 + datafusion/core/tests/path_partition.rs | 2 +- .../proto/src/physical_plan/from_proto.rs | 1 + .../tests/cases/roundtrip_physical_plan.rs | 2 + .../substrait/src/physical_plan/consumer.rs | 1 + .../tests/cases/roundtrip_physical_plan.rs | 1 + 35 files changed, 182 insertions(+), 40 deletions(-) diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs index 96753c8c52608..de0aa8191a806 100644 --- a/datafusion-examples/examples/csv_opener.rs +++ b/datafusion-examples/examples/csv_opener.rs @@ -67,6 +67,7 @@ async fn main() -> Result<()> { limit: Some(5), table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }; let result = diff --git a/datafusion-examples/examples/flight/flight_server.rs b/datafusion-examples/examples/flight/flight_server.rs index f9d1b8029f04b..ca2675f322b04 100644 --- a/datafusion-examples/examples/flight/flight_server.rs +++ b/datafusion-examples/examples/flight/flight_server.rs @@ -59,7 +59,7 @@ impl FlightService for FlightServiceImpl { let ctx = SessionContext::new(); let schema = listing_options - .infer_schema(&ctx.state(), &table_path) + .infer_schema(&ctx.state(), &table_path, None) .await .unwrap(); diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs index ee33f969caa9f..e9fc092467f5e 100644 --- a/datafusion-examples/examples/json_opener.rs +++ b/datafusion-examples/examples/json_opener.rs @@ -70,6 +70,7 @@ async fn main() -> Result<()> { limit: Some(5), table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }; let result = diff --git a/datafusion/core/benches/sql_query_with_io.rs b/datafusion/core/benches/sql_query_with_io.rs index 916f48ce40c67..8aa8cd39f696b 100644 --- a/datafusion/core/benches/sql_query_with_io.rs +++ b/datafusion/core/benches/sql_query_with_io.rs @@ -131,7 +131,7 @@ async fn setup_context(object_store: Arc) -> SessionContext { // make sure we actually find the data let path = format!("data://my_store/{table_name}/"); let schema2 = options - .infer_schema(&context.state(), &ListingTableUrl::parse(&path).unwrap()) + .infer_schema(&context.state(), &ListingTableUrl::parse(&path).unwrap(), None) .await .unwrap(); assert_eq!(schema2, schema()); diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 9d58465191e17..0dbe07889f6c9 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -76,6 +76,7 @@ impl FileFormat for ArrowFormat { _state: &SessionState, store: &Arc, objects: &[ObjectMeta], + _columns: Option> ) -> Result { let mut schemas = vec![]; for object in objects { @@ -417,6 +418,7 @@ mod tests { &state, &(store.clone() as Arc), &[object_meta.clone()], + None, ) .await?; let actual_fields = inferred_schema @@ -456,6 +458,7 @@ mod tests { &state, &(store.clone() as Arc), &[object_meta.clone()], + None, ) .await; diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index 132dae14c684b..e3997ffb7d8aa 100644 --- a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -50,6 +50,7 @@ impl FileFormat for AvroFormat { _state: &SessionState, store: &Arc, objects: &[ObjectMeta], + _columns: Option>, ) -> Result { let mut schemas = vec![]; for object in objects { diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 645f98cd3fb04..c5162edd99449 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -203,6 +203,7 @@ impl FileFormat for CsvFormat { _state: &SessionState, store: &Arc, objects: &[ObjectMeta], + _columns: Option>, ) -> Result { let mut schemas = vec![]; @@ -675,6 +676,7 @@ mod tests { &state, &(variable_object_store.clone() as Arc), &[object_meta], + None, ) .await?; diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index efc0aa4328d85..e3a8ff1bde750 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -100,6 +100,7 @@ impl FileFormat for JsonFormat { _state: &SessionState, store: &Arc, objects: &[ObjectMeta], + _columns: Option>, ) -> Result { let mut schemas = Vec::new(); let mut records_to_read = self.options.schema_infer_max_rec; @@ -425,7 +426,7 @@ mod tests { let format = JsonFormat::default().with_schema_infer_max_rec(3); let file_schema = format - .infer_schema(&ctx, &store, &[local_unpartitioned_file(filename)]) + .infer_schema(&ctx, &store, &[local_unpartitioned_file(filename)], None) .await .expect("Schema inference"); diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 5ee0f71867031..56f0e849b8c96 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -67,6 +67,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug { state: &SessionState, store: &Arc, objects: &[ObjectMeta], + column_hints: Option>, ) -> Result; /// Infer the statistics for the provided object. The cost and accuracy of the @@ -140,7 +141,7 @@ pub(crate) mod test_util { let store = Arc::new(LocalFileSystem::new()) as _; let meta = local_unpartitioned_file(format!("{store_root}/{file_name}")); - let file_schema = format.infer_schema(state, &store, &[meta.clone()]).await?; + let file_schema = format.infer_schema(state, &store, &[meta.clone()], None).await?; let statistics = format .infer_stats(state, &store, file_schema.clone(), &meta) @@ -165,6 +166,7 @@ pub(crate) mod test_util { limit, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }, None, ) diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index f5bd72495d662..c8399d780dfcb 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -202,6 +202,8 @@ pub struct ParquetReadOptions<'a> { pub schema: Option<&'a Schema>, /// Indicates how the file is sorted pub file_sort_order: Vec>, + /// some specific implementations use this to load only specified columns + pub column_hints: Option>, } impl<'a> Default for ParquetReadOptions<'a> { @@ -213,6 +215,7 @@ impl<'a> Default for ParquetReadOptions<'a> { skip_metadata: None, schema: None, file_sort_order: vec![], + column_hints: None, } } } @@ -462,7 +465,7 @@ pub trait ReadOptions<'a> { } self.to_listing_options(config, state.default_table_options()) - .infer_schema(&state, &table_path) + .infer_schema(&state, &table_path, None) .await } } @@ -523,6 +526,7 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) + .with_column_hints(self.column_hints.clone()) } async fn get_resolved_schema( diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 66f506f9aa2e0..5a4b07c6583f8 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -61,9 +61,7 @@ use parquet::arrow::arrow_writer::{ compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, }; -use parquet::arrow::{ - arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter, -}; +use parquet::arrow::{arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter, parquet_to_arrow_schema_by_columns, ProjectionMask}; use parquet::file::footer::{decode_footer, decode_metadata}; use parquet::file::metadata::ParquetMetaData; use parquet::file::properties::WriterProperties; @@ -78,6 +76,7 @@ use futures::{StreamExt, TryStreamExt}; use hashbrown::HashMap; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; +use parquet::schema::types::SchemaDescriptor; /// Initial writing buffer size. Note this is just a size hint for efficiency. It /// will grow beyond the set value if needed. @@ -176,9 +175,10 @@ async fn fetch_schema_with_location( store: &dyn ObjectStore, file: &ObjectMeta, metadata_size_hint: Option, + columns: Option> ) -> Result<(Path, Schema)> { let loc_path = file.location.clone(); - let schema = fetch_schema(store, file, metadata_size_hint).await?; + let schema = fetch_schema(store, file, metadata_size_hint, columns).await?; Ok((loc_path, schema)) } @@ -193,13 +193,16 @@ impl FileFormat for ParquetFormat { state: &SessionState, store: &Arc, objects: &[ObjectMeta], + columns: Option> ) -> Result { + let columns = columns.clone(); let mut schemas: Vec<_> = futures::stream::iter(objects) .map(|object| { fetch_schema_with_location( store.as_ref(), object, self.metadata_size_hint(), + columns.clone(), ) }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 @@ -436,18 +439,55 @@ pub async fn fetch_parquet_metadata( } } +/// Returns the index from the schema descriptor for a certain column path +pub fn find_leaf_id(schema_desc_ptr: &SchemaDescriptor, name: &str) -> Result { + let pos = schema_desc_ptr + .columns() + .iter() + .position(|col| col.path().string() == name); + + pos.ok_or(DataFusionError::Internal(format!("{name} not found"))) +} + +/// Returns vector of indices from the schema descriptor for some column paths +pub fn find_leaf_ids( + schema_desc_ptr: &SchemaDescriptor, + names: Vec, +) -> Result> { + names + .iter() + // .map(|s| s.as_str()) + .map(|name| find_leaf_id(schema_desc_ptr, name.as_str())) + .collect() +} + /// Read and parse the schema of the Parquet file at location `path` async fn fetch_schema( store: &dyn ObjectStore, file: &ObjectMeta, metadata_size_hint: Option, + column_hints: Option>, ) -> Result { let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?; let file_metadata = metadata.file_metadata(); - let schema = parquet_to_arrow_schema( - file_metadata.schema_descr(), - file_metadata.key_value_metadata(), - )?; + let schema = match column_hints { + Some(cols) => { + let ids = find_leaf_ids( + metadata.file_metadata().schema_descr(), + cols.clone() + )?; + let mask = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), ids); + parquet_to_arrow_schema_by_columns( + file_metadata.schema_descr(), + mask, + file_metadata.key_value_metadata() + )? + }, + _ => parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + )? + }; Ok(schema) } @@ -1149,7 +1189,7 @@ mod tests { let session = SessionContext::new(); let ctx = session.state(); let format = ParquetFormat::default(); - let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap(); + let schema = format.infer_schema(&ctx, &store, &meta, None).await.unwrap(); let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None).await?; @@ -1198,7 +1238,7 @@ mod tests { let session = SessionContext::new(); let ctx = session.state(); let format = ParquetFormat::default(); - let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap(); + let schema = format.infer_schema(&ctx, &store, &meta, None).await.unwrap(); let order: Vec<_> = ["a", "b", "c", "d"] .into_iter() @@ -1343,7 +1383,7 @@ mod tests { let ctx = session.state(); let format = ParquetFormat::default().with_metadata_size_hint(Some(9)); let schema = format - .infer_schema(&ctx, &store.upcast(), &meta) + .infer_schema(&ctx, &store.upcast(), &meta, None) .await .unwrap(); @@ -1373,7 +1413,7 @@ mod tests { let format = ParquetFormat::default().with_metadata_size_hint(Some(size_hint)); let schema = format - .infer_schema(&ctx, &store.upcast(), &meta) + .infer_schema(&ctx, &store.upcast(), &meta, None) .await .unwrap(); let stats = fetch_statistics( diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 6ee19828f1d48..3d9cace490f46 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -197,7 +197,7 @@ impl ListingTableConfig { match self.options { Some(options) => { let schema = if let Some(url) = self.table_paths.first() { - options.infer_schema(state, url).await? + options.infer_schema(state, url, None).await? } else { Arc::new(Schema::empty()) }; @@ -251,6 +251,8 @@ pub struct ListingOptions { /// multiple equivalent orderings, the outer `Vec` will have a /// single element. pub file_sort_order: Vec>, + /// used to pass column load hints to underlying implementations + pub column_hints: Option> } impl ListingOptions { @@ -268,6 +270,7 @@ impl ListingOptions { collect_stat: true, target_partitions: 1, file_sort_order: vec![], + column_hints: None, } } @@ -418,6 +421,13 @@ impl ListingOptions { self } + /// + /// Set column_hints on [`ListingOptions`] and returns self + pub fn with_column_hints(mut self, column_hints: Option>) -> Self { + self.column_hints = column_hints; + self + } + /// Infer the schema of the files at the given path on the provided object store. /// The inferred schema does not include the partitioning columns. /// @@ -428,6 +438,7 @@ impl ListingOptions { &'a self, state: &SessionState, table_path: &'a ListingTableUrl, + column_hints: Option> ) -> Result { let store = state.runtime_env().object_store(table_path)?; @@ -437,7 +448,7 @@ impl ListingOptions { .try_collect() .await?; - self.format.infer_schema(state, &store, &files).await + self.format.infer_schema(state, &store, &files, column_hints).await } /// Infers the partition columns stored in `LOCATION` and compares @@ -783,6 +794,7 @@ impl TableProvider for ListingTable { file_groups: partitioned_file_lists, statistics, projection: projection.cloned(), + column_hints: self.options.column_hints.clone(), limit, output_ordering: self.try_create_output_ordering()?, table_partition_cols, @@ -1058,7 +1070,7 @@ mod tests { let state = ctx.state(); let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); - let schema = opt.infer_schema(&state, &table_path).await?; + let schema = opt.infer_schema(&state, &table_path, None).await?; let config = ListingTableConfig::new(table_path) .with_listing_options(opt) .with_schema(schema); @@ -1083,7 +1095,7 @@ mod tests { let opt = ListingOptions::new(Arc::new(ParquetFormat::default())) .with_collect_stat(false); - let schema = opt.infer_schema(&state, &table_path).await?; + let schema = opt.infer_schema(&state, &table_path, None).await?; let config = ListingTableConfig::new(table_path) .with_listing_options(opt) .with_schema(schema); @@ -1106,7 +1118,7 @@ mod tests { let ctx = SessionContext::new(); let state = ctx.state(); let options = ListingOptions::new(Arc::new(ParquetFormat::default())); - let schema = options.infer_schema(&state, &table_path).await.unwrap(); + let schema = options.infer_schema(&state, &table_path, None).await.unwrap(); use crate::physical_plan::expressions::col as physical_col; use std::ops::Add; diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 1a0eb34d1234a..ff7eb09be8f6e 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -140,7 +140,7 @@ impl TableProviderFactory for ListingTableFactory { options.validate_partitions(state, &table_path).await?; let resolved_schema = match provided_schema { - None => options.infer_schema(state, &table_path).await?, + None => options.infer_schema(state, &table_path, None).await?, Some(s) => s, }; let config = ListingTableConfig::new(table_path) diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index 4e5140e82d3ff..763b60e409980 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -270,7 +270,7 @@ mod tests { let meta = local_unpartitioned_file(filename); let file_schema = AvroFormat {} - .infer_schema(&state, &store, &[meta.clone()]) + .infer_schema(&state, &store, &[meta.clone()], None) .await?; let avro_exec = AvroExec::new(FileScanConfig { @@ -282,6 +282,7 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }); assert_eq!( avro_exec @@ -340,7 +341,7 @@ mod tests { let object_store_url = ObjectStoreUrl::local_filesystem(); let meta = local_unpartitioned_file(filename); let actual_schema = AvroFormat {} - .infer_schema(&state, &object_store, &[meta.clone()]) + .infer_schema(&state, &object_store, &[meta.clone()], None) .await?; let mut builder = SchemaBuilder::from(actual_schema.fields()); @@ -359,6 +360,7 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }); assert_eq!( avro_exec @@ -418,7 +420,7 @@ mod tests { let object_store_url = ObjectStoreUrl::local_filesystem(); let meta = local_unpartitioned_file(filename); let file_schema = AvroFormat {} - .infer_schema(&state, &object_store, &[meta.clone()]) + .infer_schema(&state, &object_store, &[meta.clone()], None) .await?; let mut partitioned_file = PartitionedFile::from(meta); @@ -435,6 +437,7 @@ mod tests { limit: None, table_partition_cols: vec![Field::new("date", DataType::Utf8, false)], output_ordering: vec![], + column_hints: None, }); assert_eq!( avro_exec diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 1ea411cb6f59f..4fe6906d96296 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -89,6 +89,8 @@ pub struct FileScanConfig { /// Columns on which to project the data. Indexes that are higher than the /// number of columns of `file_schema` refer to `table_partition_cols`. pub projection: Option>, + /// used to pass column load hints to underlying implementation + pub column_hints: Option>, /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. pub limit: Option, @@ -786,6 +788,7 @@ mod tests { statistics, table_partition_cols, output_ordering: vec![], + column_hints: None, } } diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 619bcb29e2cc2..0c6430c8bea0c 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -655,6 +655,7 @@ mod tests { limit: self.limit, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }; let metrics_set = ExecutionPlanMetricsSet::new(); let file_stream = FileStream::new(&config, 0, self.opener, &metrics_set) diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 2ec1b91d08ea8..31ae71269d36a 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -437,7 +437,7 @@ mod tests { .object_meta; let schema = JsonFormat::default() .with_file_compression_type(file_compression_type.to_owned()) - .infer_schema(state, &store, &[meta.clone()]) + .infer_schema(state, &store, &[meta.clone()], None) .await .unwrap(); @@ -534,6 +534,8 @@ mod tests { limit: Some(3), table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, + }, file_compression_type.to_owned(), ); @@ -612,6 +614,8 @@ mod tests { limit: Some(3), table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, + }, file_compression_type.to_owned(), ); @@ -659,6 +663,8 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, + }, file_compression_type.to_owned(), ); @@ -711,6 +717,8 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, + }, file_compression_type.to_owned(), ); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 73fb82980fc4c..3505bba18dee6 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -45,6 +45,7 @@ use crate::{ use arrow::datatypes::{DataType, SchemaRef}; use arrow::error::ArrowError; +use arrow_schema::Schema; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr}; use bytes::Bytes; @@ -70,6 +71,7 @@ mod row_groups; mod statistics; pub use metrics::ParquetFileMetrics; +use crate::datasource::file_format::parquet::find_leaf_ids; /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] @@ -392,9 +394,11 @@ impl ExecutionPlan for ParquetExec { }) })?; + // let columns = self.base_config().columns.clone() let opener = ParquetOpener { partition_index, projection: Arc::from(projection), + column_hints: self.base_config.column_hints.clone().map(Arc::new), batch_size: ctx.session_config().batch_size(), limit: self.base_config.limit, predicate: self.predicate.clone(), @@ -429,6 +433,7 @@ impl ExecutionPlan for ParquetExec { struct ParquetOpener { partition_index: usize, projection: Arc<[usize]>, + column_hints: Option>>, batch_size: usize, limit: Option, predicate: Option>, @@ -464,6 +469,7 @@ impl FileOpener for ParquetOpener { let batch_size = self.batch_size; let projection = self.projection.clone(); + let column_hints = self.column_hints.clone(); let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); let schema_adapter = SchemaAdapter::new(projected_schema); let predicate = self.predicate.clone(); @@ -484,17 +490,42 @@ impl FileOpener for ParquetOpener { let mut builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options) .await?; + let mut field_mask: Vec = vec![]; + if column_hints.is_some() { + let cols = column_hints.unwrap(); + if cols.len() > 0 { + // let orig_file_schema = builder.schema().clone(); + field_mask = find_leaf_ids( + builder.parquet_schema(), + cols.as_ref().to_vec() + )?; + debug!("Using column hints to load file: {:?}", field_mask.clone()); + } + } - let file_schema = builder.schema().clone(); + let file_schema: Arc = if field_mask.len() > 0 { + let orig_file_schema = builder.schema().clone(); + let filtered = orig_file_schema.fields().filter_leaves(|idx, _| field_mask.contains(&idx)); + Arc::new(Schema::new(filtered)) + } else { + builder.schema().clone() + }; - let (schema_mapping, adapted_projections) = - schema_adapter.map_schema(&file_schema)?; - // let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?; + // println!("file_schema: {:?}", file_schema); + let (schema_mapping, adapted_projections) = schema_adapter.map_schema(&file_schema)?; - let mask = ProjectionMask::roots( - builder.parquet_schema(), - adapted_projections.iter().cloned(), - ); + let mask: ProjectionMask = if field_mask.len() > 0 { + // let orig_file_schema = builder.schema().clone(); + ProjectionMask::leaves( + builder.parquet_schema(), + field_mask, + ) + } else { + ProjectionMask::roots( + builder.parquet_schema(), + adapted_projections.iter().cloned(), + ) + }; // Filter pushdown: evaluate predicates during scan if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { @@ -901,6 +932,7 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }, predicate, None, @@ -1558,6 +1590,7 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }, None, None, @@ -1592,7 +1625,7 @@ mod tests { let store = Arc::new(LocalFileSystem::new()) as _; let file_schema = ParquetFormat::default() - .infer_schema(&state, &store, &[meta.clone()]) + .infer_schema(&state, &store, &[meta.clone()], None) .await?; let group_empty = vec![vec![file_range(&meta, 0, 2)]]; @@ -1624,7 +1657,7 @@ mod tests { let meta = local_unpartitioned_file(filename); let schema = ParquetFormat::default() - .infer_schema(&state, &store, &[meta.clone()]) + .infer_schema(&state, &store, &[meta.clone()], None) .await .unwrap(); @@ -1679,6 +1712,7 @@ mod tests { ), ], output_ordering: vec![], + column_hints: None, }, None, None, @@ -1746,6 +1780,7 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }, None, None, diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index d83644597e784..6de4c97c4f583 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1053,7 +1053,7 @@ impl SessionContext { let table_path = ListingTableUrl::parse(table_path)?; let resolved_schema = match provided_schema { Some(s) => s, - None => options.infer_schema(&self.state(), &table_path).await?, + None => options.infer_schema(&self.state(), &table_path, options.column_hints.clone()).await?, }; let config = ListingTableConfig::new(table_path) .with_listing_options(options) diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index 92787df461d3f..e68b74f1a5380 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -255,6 +255,7 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }, None, None, diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 14232f4933f87..1bb4d850e638a 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1437,6 +1437,7 @@ pub(crate) mod tests { limit: None, table_partition_cols: vec![], output_ordering, + column_hints: None, }, None, None, @@ -1465,6 +1466,7 @@ pub(crate) mod tests { limit: None, table_partition_cols: vec![], output_ordering, + column_hints: None, }, None, None, @@ -1487,6 +1489,7 @@ pub(crate) mod tests { limit: None, table_partition_cols: vec![], output_ordering, + column_hints: None, }, false, b',', @@ -1517,6 +1520,7 @@ pub(crate) mod tests { limit: None, table_partition_cols: vec![], output_ordering, + column_hints: None, }, false, b',', @@ -3722,6 +3726,7 @@ pub(crate) mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }, false, b',', diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 359916de0f1ea..a809bce2ac2e5 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -1701,6 +1701,7 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![vec![]], + column_hints: None, }, false, 0, @@ -1727,6 +1728,7 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![vec![]], + column_hints: None, }, false, 0, diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 9b6e2076912cf..a0b126ba5d3c5 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -1502,6 +1502,7 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![sort_exprs], + column_hints: None, }, true, 0, diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 7bc1eeb7c4a5a..fdb3e46fd8012 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -284,6 +284,7 @@ pub fn parquet_exec(schema: &SchemaRef) -> Arc { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }, None, None, @@ -308,6 +309,7 @@ pub fn parquet_exec_sorted( limit: None, table_partition_cols: vec![], output_ordering: vec![sort_exprs], + column_hints: None, }, None, None, diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 0042554f6c734..55c227f2a53e4 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -206,6 +206,7 @@ pub fn partitioned_csv_config( limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }) } @@ -292,6 +293,7 @@ pub fn csv_exec_sorted( limit: None, table_partition_cols: vec![], output_ordering: vec![sort_exprs], + column_hints: None, }, false, 0, @@ -353,6 +355,7 @@ pub fn csv_exec_ordered( limit: None, table_partition_cols: vec![], output_ordering: vec![sort_exprs], + column_hints: None, }, true, 0, diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 8113d799a184d..6abf99bdd884d 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -158,6 +158,7 @@ impl TestParquetFile { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }; let df_schema = self.schema.clone().to_dfschema_ref()?; diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 4bacc80579ede..60fbc29687a2f 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -85,6 +85,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }, None, None, diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index 9f94a59a3e598..33ab3d2311900 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -172,6 +172,7 @@ async fn get_listing_table( Arc::new(RuntimeEnv::default()), ), table_path, + None, ) .await .unwrap(); diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index ccaa65b7ee5fc..1c6a5c37e2a63 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -54,7 +54,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec { }; let schema = ParquetFormat::default() - .infer_schema(state, &store, &[meta.clone()]) + .infer_schema(state, &store, &[meta.clone()], None) .await .unwrap(); @@ -80,6 +80,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }, Some(predicate), None, diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs index 88f795d2a4fe1..e342498d41da2 100644 --- a/datafusion/core/tests/parquet/schema_coercion.rs +++ b/datafusion/core/tests/parquet/schema_coercion.rs @@ -69,6 +69,7 @@ async fn multi_parquet_coercion() { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }, None, None, @@ -133,6 +134,7 @@ async fn multi_parquet_coercion_projection() { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }, None, None, diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index dd8eb52f67c71..b5c9342c03f0d 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -576,7 +576,7 @@ async fn register_partitioned_alltypes_parquet( ListingTableUrl::parse(format!("mirror:///{}", store_paths[0])).unwrap(); let file_schema = options - .infer_schema(&ctx.state(), &store_path) + .infer_schema(&ctx.state(), &store_path, None) .await .expect("Parquet schema inference failed"); diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 6184332ea581d..d2ea72f81f039 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -516,6 +516,7 @@ pub fn parse_protobuf_file_scan_config( limit: proto.limit.as_ref().map(|sl| sl.limit as usize), table_partition_cols, output_ordering, + column_hints: None, }) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 642860d6397bf..3d698c273d252 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -549,6 +549,7 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }; let predicate = Arc::new(BinaryExpr::new( @@ -585,6 +586,7 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> { false, )], output_ordering: vec![], + column_hints: None, }; roundtrip_test(Arc::new(ParquetExec::new( diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs index 11ddb91ad3913..7d547c24d6c14 100644 --- a/datafusion/substrait/src/physical_plan/consumer.rs +++ b/datafusion/substrait/src/physical_plan/consumer.rs @@ -112,6 +112,7 @@ pub async fn from_substrait_rel( limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None }; if let Some(MaskExpression { select, .. }) = &read.projection { diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs index 70887e3934919..12954c42b8adc 100644 --- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs @@ -49,6 +49,7 @@ async fn parquet_exec() -> Result<()> { limit: None, table_partition_cols: vec![], output_ordering: vec![], + column_hints: None, }; let parquet_exec: Arc = Arc::new(ParquetExec::new( scan_config,