[HSTACK] Push projection_deep down the physical nodes path
adragomir committed Aug 23, 2024
1 parent d262b5f commit 0a278ae
Showing 12 changed files with 342 additions and 17 deletions.
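
For orientation, here is a hedged sketch of the two projection forms this commit threads from `scan_deep` down to `ParquetOpener`. Only the `Option<Vec<usize>>` and `Option<HashMap<usize, Vec<String>>>` shapes are taken from the diff; the column names and the dotted path syntax inside the `Vec<String>` are illustrative assumptions:

```rust
use std::collections::HashMap;

fn main() {
    // Top-level projection: keep columns 0 and 2 of the file schema.
    let projection: Vec<usize> = vec![0, 2];

    // Deep projection: for column 2 (assumed here to be a struct column),
    // keep only the nested fields listed. The dotted-path syntax is an
    // assumption; the diff only fixes the HashMap<usize, Vec<String>> shape.
    let mut projection_deep: HashMap<usize, Vec<String>> = HashMap::new();
    projection_deep.insert(2, vec!["user.id".to_string(), "user.name".to_string()]);

    println!("{projection:?} {projection_deep:?}");
}
```
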
94 changes: 94 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -54,6 +54,7 @@ use datafusion_catalog::Session;
use futures::{future, stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::ObjectStore;
use datafusion_common::deep::try_rewrite_schema_opt;

/// Configuration for creating a [`ListingTable`]
#[derive(Debug, Clone)]
@@ -822,6 +823,99 @@ impl TableProvider for ListingTable {
            .await
    }

    async fn scan_deep(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        projection_deep: Option<&HashMap<usize, Vec<String>>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
        let (mut partitioned_file_lists, statistics) =
            self.list_files_for_scan(session_state, filters, limit).await?;

        // let projected_schema = project_schema(&self.schema(), projection)?;
        let projected_schema =
            try_rewrite_schema_opt(self.schema(), projection, projection_deep)?;

        // if no files need to be read, return an `EmptyExec`
        if partitioned_file_lists.is_empty() {
            return Ok(Arc::new(EmptyExec::new(projected_schema)));
        }

        let output_ordering = self.try_create_output_ordering()?;
        match state
            .config_options()
            .execution
            .split_file_groups_by_statistics
            .then(|| {
                output_ordering.first().map(|output_ordering| {
                    FileScanConfig::split_groups_by_statistics(
                        &self.table_schema,
                        &partitioned_file_lists,
                        output_ordering,
                    )
                })
            })
            .flatten()
        {
            Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
            Some(Ok(new_groups)) => {
                if new_groups.len() <= self.options.target_partitions {
                    partitioned_file_lists = new_groups;
                } else {
                    log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
                }
            }
            None => {} // no ordering required
        };

        // extract types of partition columns
        let table_partition_cols = self
            .options
            .table_partition_cols
            .iter()
            .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
            .collect::<Result<Vec<_>>>()?;

        let filters = if let Some(expr) = conjunction(filters.to_vec()) {
            // NOTE: Use the table schema (NOT file schema) here because `expr` may
            // contain references to partition columns.
            let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
            let filters =
                create_physical_expr(&expr, &table_df_schema, state.execution_props())?;
            Some(filters)
        } else {
            None
        };

        let object_store_url = if let Some(url) = self.table_paths.first() {
            url.object_store()
        } else {
            return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
        };

        // create the execution plan
        self.options
            .format
            .create_physical_plan(
                session_state,
                FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema))
                    .with_file_groups(partitioned_file_lists)
                    .with_statistics(statistics)
                    .with_projection(projection.cloned())
                    .with_projection_deep(projection_deep.cloned())
                    .with_limit(limit)
                    .with_output_ordering(output_ordering)
                    .with_table_partition_cols(table_partition_cols),
                filters.as_ref(),
            )
            .await
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
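
`try_rewrite_schema_opt` and the rest of `datafusion_common::deep` come from this fork and are not shown in the diff. As a rough, hypothetical illustration of the idea (not the fork's implementation), pruning a struct column down to a named set of children could look like this; single-level child names are assumed:

```rust
use arrow::datatypes::{DataType, Field, Fields};

// Hypothetical sketch only: keep the named children of a struct field,
// leaving non-struct fields untouched.
fn prune_struct_field(field: &Field, keep: &[String]) -> Field {
    match field.data_type() {
        DataType::Struct(children) => {
            let kept: Fields = children
                .iter()
                .filter(|child| keep.iter().any(|k| k == child.name()))
                .cloned()
                .collect();
            Field::new(field.name(), DataType::Struct(kept), field.is_nullable())
        }
        // Non-struct columns pass through unchanged.
        _ => field.clone(),
    }
}
```
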
31 changes: 28 additions & 3 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -37,7 +37,8 @@ use datafusion_common::stats::Precision;
use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};

use log::{trace, warn};
use datafusion_common::deep::rewrite_field_projection;

/// Convert type to a type suitable for use as a [`ListingTable`]
/// partition column. Returns `Dictionary(UInt16, val_type)`, which is
@@ -121,6 +122,9 @@ pub struct FileScanConfig {
    /// Columns on which to project the data. Indexes that are higher than the
    /// number of columns of `file_schema` refer to `table_partition_cols`.
    pub projection: Option<Vec<usize>>,
    /// Deep (nested) projection: maps a top-level column index (same convention
    /// as `projection`) to the nested field paths to retain within that column.
    pub projection_deep: Option<HashMap<usize, Vec<String>>>,
    /// The maximum number of records to read from this plan. If `None`,
    /// all records after filtering are returned.
    pub limit: Option<usize>,
@@ -149,6 +153,7 @@ impl FileScanConfig {
            file_groups: vec![],
            statistics,
            projection: None,
            projection_deep: None,
            limit: None,
            table_partition_cols: vec![],
            output_ordering: vec![],
@@ -167,6 +172,13 @@
        self
    }

    /// Set the deep projection of the files: for each projected top-level
    /// column index, the nested field paths to retain within that column
    pub fn with_projection_deep(
        mut self,
        projection_deep: Option<HashMap<usize, Vec<String>>>,
    ) -> Self {
        self.projection_deep = projection_deep;
        self
    }

    /// Set the limit of the files
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.limit = limit;
@@ -232,8 +244,21 @@ impl FileScanConfig {
        let mut table_cols_stats = vec![];
        for idx in proj_iter {
            if idx < self.file_schema.fields().len() {
                let output_field = match &self.projection_deep {
                    None => self.file_schema.field(idx).clone(),
                    Some(projection_deep) => {
                        trace!("FileScanConfig::project DEEP PROJECT");
                        let rewritten_field_arc = rewrite_field_projection(
                            self.file_schema.clone(),
                            idx,
                            projection_deep,
                        );
                        trace!(
                            "FileScanConfig::project DEEP PROJECT {:#?}",
                            rewritten_field_arc
                        );
                        rewritten_field_arc.as_ref().clone()
                    }
                };

                table_fields.push(output_field);
                table_cols_stats.push(self.statistics.column_statistics[idx].clone())
            } else {
                let partition_idx = idx - self.file_schema.fields().len();
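
A hedged usage sketch of the new builder method next to the existing ones; the schema, URL, and paths are placeholder inputs, and only `with_projection_deep` itself is taken from this commit:

```rust
use std::collections::HashMap;

use arrow::datatypes::SchemaRef;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion_execution::object_store::ObjectStoreUrl;

// Sketch: keep top-level columns 0 and 2, and inside column 0 only the
// nested path "user.id" (path syntax assumed, as above).
fn example_config(url: ObjectStoreUrl, file_schema: SchemaRef) -> FileScanConfig {
    let mut deep: HashMap<usize, Vec<String>> = HashMap::new();
    deep.insert(0, vec!["user.id".to_string()]);

    FileScanConfig::new(url, file_schema)
        .with_projection(Some(vec![0, 2]))
        .with_projection_deep(Some(deep)) // added by this commit
        .with_limit(Some(10))
}
```
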
25 changes: 22 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -18,6 +18,7 @@
//! [`ParquetExec`] Execution plan for reading Parquet files
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

@@ -43,7 +44,7 @@ use arrow::datatypes::SchemaRef;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};

use itertools::Itertools;
use log::{debug, info, trace};

mod access_plan;
mod metrics;
@@ -374,8 +375,8 @@ impl ParquetExecBuilder {
        } = self;

        let base_config = file_scan_config;
        info!("Creating ParquetExec, files: {:?}, projection {:?}, projection deep {:?}, predicate: {:?}, limit: {:?}",
            base_config.file_groups, base_config.projection, base_config.projection_deep, predicate, base_config.limit);

        let metrics = ExecutionPlanMetricsSet::new();
        let predicate_creation_errors =
@@ -689,6 +690,23 @@ impl ExecutionPlan for ParquetExec {
            Some(proj) => proj,
            None => (0..self.base_config.file_schema.fields().len()).collect(),
        };
        let projection_deep = match &self.base_config.projection_deep {
            None => HashMap::new(),
            Some(pd) => {
                let mut out: HashMap<usize, Vec<String>> = HashMap::new();
                for npi in &projection {
                    if let Some(v) = pd.get(npi) {
                        out.insert(*npi, v.clone());
                    }
                }
                out
            }
        };
        trace!(
            "ParquetExec::execute projection={:#?}, projection_deep={:#?}",
            &projection,
            &projection_deep
        );

        let parquet_file_reader_factory = self
            .parquet_file_reader_factory
@@ -711,6 +729,7 @@
        let opener = ParquetOpener {
            partition_index,
            projection: Arc::from(projection),
            projection_deep: Arc::new(projection_deep),
            batch_size: ctx.session_config().batch_size(),
            limit: self.base_config.limit,
            predicate: self.predicate.clone(),
(diffs for the remaining 9 changed files not shown)
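
The map-narrowing step in `ParquetExec::execute` above (keeping only the deep-projection entries for columns that are actually projected) reads naturally as a standalone function; a minimal sketch mirroring the shown loop:

```rust
use std::collections::HashMap;

// Mirror of the loop in ParquetExec::execute: retain only the deep-projection
// entries whose top-level column index appears in `projection`.
fn restrict_deep_projection(
    projection: &[usize],
    deep: Option<&HashMap<usize, Vec<String>>>,
) -> HashMap<usize, Vec<String>> {
    match deep {
        None => HashMap::new(),
        Some(pd) => projection
            .iter()
            .filter_map(|idx| pd.get(idx).map(|paths| (*idx, paths.clone())))
            .collect(),
    }
}
```
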
