From fa59f2f6a5ed934a45b174d17cb2d3f22dc68e06 Mon Sep 17 00:00:00 2001
From: coastalwhite
Date: Mon, 17 Feb 2025 16:32:55 +0100
Subject: [PATCH] parquet: reduce source node to a single ScanSource with
 required metadata

---
 .../src/nodes/io_sources/parquet/init.rs      |  15 +-
 .../io_sources/parquet/metadata_fetch.rs      |  82 ++-----
 .../io_sources/parquet/metadata_utils.rs      |  29 +--
 .../src/nodes/io_sources/parquet/mod.rs       |  28 +--
 .../parquet/row_group_data_fetch.rs           |  19 +-
 .../io_sources/parquet/row_group_decode.rs    | 212 ++----------------
 .../src/physical_plan/to_graph.rs             |   4 +-
 7 files changed, 53 insertions(+), 336 deletions(-)

diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs
index 1fe6d5e232bd..cb6b7ce8e4e0 100644
--- a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs
+++ b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs
@@ -72,7 +72,6 @@ impl ParquetSourceNode {
             current_row_group_idx: 0,
             current_max_row_group_height: 0,
             current_row_offset: 0,
-            current_shared_file_state: Default::default(),
         };
 
         let row_group_decoder = self.init_row_group_decoder();
@@ -193,13 +192,6 @@ impl ParquetSourceNode {
     /// * `self.projected_arrow_schema`
     /// * `self.physical_predicate`
     pub(super) fn init_row_group_decoder(&self) -> RowGroupDecoder {
-        let scan_sources = self.scan_sources.clone();
-        let hive_partitions = self.hive_parts.clone();
-        let hive_partitions_width = hive_partitions
-            .as_deref()
-            .map(|x| x[0].get_statistics().column_stats().len())
-            .unwrap_or(0);
-        let include_file_paths = self.file_options.include_file_paths.clone();
         let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap();
         let row_index = self.row_index.clone();
         let min_values_per_thread = self.config.min_values_per_thread;
@@ -258,10 +250,6 @@ impl ParquetSourceNode {
         }
 
         RowGroupDecoder {
-            scan_sources,
-            hive_partitions,
-            hive_partitions_width,
-            include_file_paths,
             reader_schema: self.schema.clone().unwrap(),
             projected_arrow_schema,
             row_index,
@@ -294,12 +282,11 @@ impl ParquetSourceNode {
 
         if self.verbose {
             eprintln!(
-                "[ParquetSource]: {} / {} parquet columns to be projected from {} files",
+                "[ParquetSource]: {} / {} parquet columns to be projected",
                 self.projected_arrow_schema
                     .as_ref()
                     .map_or(reader_schema.len(), |x| x.len()),
                 reader_schema.len(),
-                self.scan_sources.len(),
             );
         }
     }
diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/metadata_fetch.rs b/crates/polars-stream/src/nodes/io_sources/parquet/metadata_fetch.rs
index e5237cbccc8f..27d7b08d9c08 100644
--- a/crates/polars-stream/src/nodes/io_sources/parquet/metadata_fetch.rs
+++ b/crates/polars-stream/src/nodes/io_sources/parquet/metadata_fetch.rs
@@ -1,15 +1,14 @@
 use std::sync::Arc;
 
 use futures::StreamExt;
-use polars_error::{polars_bail, PolarsResult};
+use polars_error::PolarsResult;
 use polars_io::prelude::FileMetadata;
-use polars_io::prelude::_internal::ensure_matching_dtypes_if_found;
 use polars_io::utils::byte_source::{DynByteSource, MemSliceByteSource};
 use polars_io::utils::slice::SplitSlicePosition;
 use polars_utils::mmap::MemSlice;
 use polars_utils::IdxSize;
 
-use super::metadata_utils::{ensure_schema_has_projected_fields, read_parquet_metadata_bytes};
+use super::metadata_utils::read_parquet_metadata_bytes;
 use super::ParquetSourceNode;
 use crate::async_executor;
 use crate::async_primitives::connector::connector;
@@ -28,15 +27,13 @@ impl ParquetSourceNode {
             usize,
             usize,
             Arc<DynByteSource>,
-            FileMetadata,
+            Arc<FileMetadata>,
         )>,
         task_handles_ext::AbortOnDropHandle<PolarsResult<()>>,
     ) {
         let verbose = self.verbose;
        let io_runtime = polars_io::pl_async::get_runtime();
 
-        let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap();
-
         let (normalized_slice_oneshot_tx, normalized_slice_oneshot_rx) =
             tokio::sync::oneshot::channel();
         let (mut metadata_tx, metadata_rx) = connector();
@@ -51,24 +48,22 @@ impl ParquetSourceNode {
         }
 
         let fetch_metadata_bytes_for_path_index = {
-            let scan_sources = &self.scan_sources;
+            let scan_source = &self.scan_source;
             let cloud_options = Arc::new(self.cloud_options.clone());
 
-            let scan_sources = scan_sources.clone();
+            let scan_source = scan_source.clone();
             let cloud_options = cloud_options.clone();
             let byte_source_builder = byte_source_builder.clone();
-            let have_first_metadata = self.first_metadata.is_some();
 
             move |path_idx: usize| {
-                let scan_sources = scan_sources.clone();
+                let scan_source = scan_source.clone();
                 let cloud_options = cloud_options.clone();
                 let byte_source_builder = byte_source_builder.clone();
 
                 let handle = io_runtime.spawn(async move {
                     let mut byte_source = Arc::new(
-                        scan_sources
-                            .get(path_idx)
-                            .unwrap()
+                        scan_source
+                            .as_scan_source_ref()
                             .to_dyn_byte_source(
                                 &byte_source_builder,
                                 cloud_options.as_ref().as_ref(),
                             )
                             .await?,
                     );
 
-                    if path_idx == 0 && have_first_metadata {
+                    if path_idx == 0 {
                         let metadata_bytes = MemSlice::EMPTY;
                         return Ok((0, byte_source, metadata_bytes));
                     }
@@ -107,49 +102,17 @@ impl ParquetSourceNode {
             }
         };
 
-        let first_metadata = self.first_metadata.clone();
-        let first_schema = self.schema.clone().unwrap();
-        let has_projection = self.file_options.with_columns.is_some();
-        let allow_missing_columns = self.file_options.allow_missing_columns;
+        let metadata = self.metadata.clone();
 
         let process_metadata_bytes = {
             move |handle: task_handles_ext::AbortOnDropHandle<
                 PolarsResult<(usize, Arc<DynByteSource>, MemSlice)>,
            >| {
-                let first_schema = first_schema.clone();
-                let projected_arrow_schema = projected_arrow_schema.clone();
-                let first_metadata = first_metadata.clone();
+                let metadata = metadata.clone();
                 // Run on CPU runtime - metadata deserialization is expensive, especially
                 // for very wide tables.
                let handle = async_executor::spawn(TaskPriority::Low, async move {
-                    let (path_index, byte_source, metadata_bytes) = handle.await.unwrap()?;
-
-                    let metadata = match first_metadata {
-                        Some(md) if path_index == 0 => Arc::unwrap_or_clone(md),
-                        _ => polars_parquet::parquet::read::deserialize_metadata(
-                            metadata_bytes.as_ref(),
-                            metadata_bytes.len() * 2 + 1024,
-                        )?,
-                    };
-
-                    let schema = polars_parquet::arrow::read::infer_schema(&metadata)?;
-
-                    if !has_projection && schema.len() > first_schema.len() {
-                        polars_bail!(
-                            SchemaMismatch:
-                            "parquet file contained extra columns and no selection was given"
-                        )
-                    }
-
-                    if allow_missing_columns {
-                        ensure_matching_dtypes_if_found(projected_arrow_schema.as_ref(), &schema)?;
-                    } else {
-                        ensure_schema_has_projected_fields(
-                            &schema,
-                            projected_arrow_schema.as_ref(),
-                        )?;
-                    }
-
+                    let (path_index, byte_source, _) = handle.await.unwrap()?;
                     PolarsResult::Ok((path_index, byte_source, metadata))
                 });
@@ -183,13 +146,13 @@ impl ParquetSourceNode {
             .slice
             .map(|(offset, len)| offset as usize..offset as usize + len);
 
-        let mut metadata_stream = futures::stream::iter(0..self.scan_sources.len())
+        let mut metadata_stream = futures::stream::iter(0..1)
             .map(fetch_metadata_bytes_for_path_index)
             .buffered(metadata_prefetch_size)
             .map(process_metadata_bytes)
             .buffered(metadata_decode_ahead_size);
 
-        let scan_sources = self.scan_sources.clone();
+        let scan_source = self.scan_source.clone();
 
         // We need to be able to both stop early as well as skip values, which is easier to do
         // using a custom task instead of futures::stream
@@ -218,10 +181,7 @@ impl ParquetSourceNode {
                         format!(
                             "error at path (index: {}, path: {}): {}",
                             current_path_index,
-                            scan_sources
-                                .get(current_path_index)
-                                .unwrap()
-                                .to_include_path_name(),
+                            scan_source.as_scan_source_ref().to_include_path_name(),
                             msg
                         )
                     })
@@ -264,13 +224,7 @@ impl ParquetSourceNode {
                 if let Some(slice_range) = slice_range.as_ref() {
                     if *current_row_offset_ref >= slice_range.end {
                         if verbose {
-                            eprintln!(
-                                "[ParquetSource]: Slice pushdown: \
-                                Stopped reading at file at index {} \
-                                (remaining {} files will not be read)",
-                                current_path_index,
-                                scan_sources.len() - current_path_index - 1,
-                            );
+                            eprintln!("[ParquetSource]: Slice pushdown: Stopped reading");
                         }
                         break;
                     }
@@ -284,7 +238,7 @@ impl ParquetSourceNode {
             let slice = self.file_options.slice.unwrap();
             let slice_start_as_n_from_end = -slice.0 as usize;
 
-            let mut metadata_stream = futures::stream::iter((0..self.scan_sources.len()).rev())
+            let mut metadata_stream = futures::stream::iter((0..1).rev())
                 .map(fetch_metadata_bytes_for_path_index)
                 .buffered(metadata_prefetch_size)
                 .map(process_metadata_bytes)
                 .buffered(metadata_decode_ahead_size);
@@ -348,7 +302,7 @@ impl ParquetSourceNode {
             PolarsResult::Ok((slice_range, processed_metadata_rev, cum_rows))
         };
 
-        let path_count = self.scan_sources.len();
+        let path_count = 1;
 
         io_runtime.spawn(async move {
             if start_rx.await.is_err() {
diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/metadata_utils.rs b/crates/polars-stream/src/nodes/io_sources/parquet/metadata_utils.rs
index e205db6a7229..4a114ad69abc 100644
--- a/crates/polars-stream/src/nodes/io_sources/parquet/metadata_utils.rs
+++ b/crates/polars-stream/src/nodes/io_sources/parquet/metadata_utils.rs
@@ -1,5 +1,4 @@
-use polars_core::prelude::{ArrowSchema, DataType};
-use polars_error::{polars_bail, PolarsResult};
+use polars_error::PolarsResult;
 use polars_io::utils::byte_source::{ByteSource, DynByteSource};
 use polars_utils::mmap::MemSlice;
@@ -118,29 +117,3 @@ pub(super) async fn read_parquet_metadata_bytes(
         }
     }
 }
-
-/// Ensures that a parquet file has all the necessary columns for a projection with the correct
-/// dtype. There are no ordering requirements and extra columns are permitted.
-pub(super) fn ensure_schema_has_projected_fields(
-    schema: &ArrowSchema,
-    projected_fields: &ArrowSchema,
-) -> PolarsResult<()> {
-    for field in projected_fields.iter_values() {
-        // Note: We convert to Polars-native dtypes for timezone normalization.
-        let expected_dtype = DataType::from_arrow_field(field);
-        let dtype = {
-            let Some(field) = schema.get(&field.name) else {
-                polars_bail!(ColumnNotFound: "error with column selection, consider enabling `allow_missing_columns`: did not find column in file: {}", field.name)
-            };
-            DataType::from_arrow_field(field)
-        };
-
-        if dtype != expected_dtype {
-            polars_bail!(SchemaMismatch: "data type mismatch for column {}: expected: {}, found: {}",
-                &field.name, expected_dtype, dtype
-            )
-        }
-    }
-
-    Ok(())
-}
diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs b/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs
index dc058fefa9b4..4ddb1ae2d50d 100644
--- a/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs
+++ b/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs
@@ -14,8 +14,7 @@ use polars_io::utils::byte_source::DynByteSourceBuilder;
 use polars_io::RowIndex;
 use polars_parquet::read::read_metadata;
 use polars_parquet::read::schema::infer_schema_with_options;
-use polars_plan::plans::hive::HivePartitions;
-use polars_plan::plans::{FileInfo, ScanSource, ScanSources};
+use polars_plan::plans::{FileInfo, ScanSource};
 use polars_plan::prelude::FileScanOptions;
 use polars_utils::index::AtomicIdxSize;
 use polars_utils::pl_str::PlSmallStr;
@@ -45,14 +44,13 @@ type AsyncTaskData = (
 
 #[allow(clippy::type_complexity)]
 pub struct ParquetSourceNode {
-    scan_sources: ScanSources,
+    scan_source: ScanSource,
     file_info: FileInfo,
-    hive_parts: Option<Arc<Vec<HivePartitions>>>,
     predicate: Option<ScanIOPredicate>,
     options: ParquetOptions,
     cloud_options: Option<CloudOptions>,
     file_options: FileScanOptions,
-    first_metadata: Option<Arc<FileMetadata>>,
+    metadata: Arc<FileMetadata>,
     // Run-time vars
     config: Config,
     verbose: bool,
@@ -86,17 +84,17 @@ struct Config {
 #[allow(clippy::too_many_arguments)]
 impl ParquetSourceNode {
     pub fn new(
-        scan_sources: ScanSources,
+        scan_source: ScanSource,
         file_info: FileInfo,
         predicate: Option<ScanIOPredicate>,
         options: ParquetOptions,
         cloud_options: Option<CloudOptions>,
         mut file_options: FileScanOptions,
-        first_metadata: Option<Arc<FileMetadata>>,
+        metadata: Arc<FileMetadata>,
     ) -> Self {
         let verbose = config::verbose();
 
-        let byte_source_builder = if scan_sources.is_cloud_url() || config::force_async() {
+        let byte_source_builder = if scan_source.run_async() {
             DynByteSourceBuilder::ObjectStore
         } else {
             DynByteSourceBuilder::Mmap
@@ -109,14 +107,13 @@ impl ParquetSourceNode {
             .map(|ri| Arc::new((ri.name, AtomicIdxSize::new(ri.offset))));
 
         Self {
-            scan_sources,
+            scan_source,
             file_info,
-            hive_parts: None,
             predicate,
             options,
             cloud_options,
             file_options,
-            first_metadata,
+            metadata,
 
             config: Config {
                 // Initialized later
@@ -184,7 +181,7 @@ impl SourceNode for ParquetSourceNode {
             eprintln!("[ParquetSource]: {:?}", &self.config);
         }
 
-        let num_rows = self.first_metadata.as_ref().unwrap().num_rows;
+        let num_rows = self.metadata.as_ref().num_rows;
 
         self.schema = Some(self.file_info.reader_schema.take().unwrap().unwrap_left());
 
         self.init_projected_arrow_schema();
@@ -277,8 +274,7 @@ impl MultiScanable for ParquetSourceNode {
         cloud_options: Option<&CloudOptions>,
         row_index: Option<RowIndex>,
     ) -> PolarsResult<Self> {
-        let source = source.into_sources();
-        let memslice = source.at(0).to_memslice()?;
+        let memslice = source.as_scan_source_ref().to_memslice()?;
 
         let file_metadata = read_metadata(&mut std::io::Cursor::new(memslice.as_ref()))?;
         let arrow_schema = infer_schema_with_options(&file_metadata, &None)?;
@@ -308,7 +304,7 @@ impl MultiScanable for ParquetSourceNode {
             options,
             cloud_options.cloned(),
             file_options,
-            Some(Arc::new(file_metadata)),
+            Arc::new(file_metadata),
         ))
     }
 
@@ -348,7 +344,7 @@ impl MultiScanable for ParquetSourceNode {
     }
 
     async fn unrestricted_row_count(&mut self) -> PolarsResult<IdxSize> {
-        let num_rows = self.first_metadata.as_ref().unwrap().num_rows;
+        let num_rows = self.metadata.as_ref().num_rows;
         IdxSize::try_from(num_rows)
             .map_err(|_| polars_err!(bigidx, ctx = "parquet file", size = num_rows))
     }
diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs
index c1dbd74c2c0a..b719f212658a 100644
--- a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs
+++ b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs
@@ -15,19 +15,15 @@ use polars_utils::pl_str::PlSmallStr;
 use polars_utils::IdxSize;
 
 use super::mem_prefetch_funcs;
-use super::row_group_decode::SharedFileState;
 use crate::utils::task_handles_ext;
 
 /// Represents byte-data that can be transformed into a DataFrame after some computation.
 pub(super) struct RowGroupData {
     pub(super) fetched_bytes: FetchedBytes,
-    pub(super) path_index: usize,
     pub(super) row_offset: usize,
     pub(super) slice: Option<(usize, usize)>,
-    pub(super) file_max_row_group_height: usize,
     pub(super) row_group_metadata: RowGroupMetadata,
     pub(super) sorting_map: PlHashMap<usize, IsSorted>,
-    pub(super) shared_file_state: Arc<tokio::sync::OnceCell<SharedFileState>>,
 }
 
 pub(super) struct RowGroupDataFetcher {
@@ -35,7 +31,7 @@ pub(super) struct RowGroupDataFetcher {
         usize,
         usize,
         Arc<DynByteSource>,
-        FileMetadata,
+        Arc<FileMetadata>,
     )>,
     pub(super) use_statistics: bool,
     pub(super) verbose: bool,
@@ -51,7 +47,6 @@ pub(super) struct RowGroupDataFetcher {
     pub(super) current_row_group_idx: usize,
     pub(super) current_max_row_group_height: usize,
     pub(super) current_row_offset: usize,
-    pub(super) current_shared_file_state: Arc<tokio::sync::OnceCell<SharedFileState>>,
 }
 
 impl RowGroupDataFetcher {
@@ -68,9 +63,9 @@ impl RowGroupDataFetcher {
         // during slice pushdown.
        self.current_row_offset = row_offset;
         self.current_row_group_idx = 0;
-        self.current_row_groups = metadata.row_groups.into_iter();
-        self.current_shared_file_state = Default::default();
-
+        // TODO: remove this clone of the row-group metadata.
+        self.current_row_groups =
+            metadata.row_groups.as_slice().to_vec().into_iter();
 
         true
     }
@@ -154,11 +149,8 @@ impl RowGroupDataFetcher {
 
             let current_byte_source = self.current_byte_source.clone();
             let projection = self.projection.clone();
-            let current_shared_file_state = self.current_shared_file_state.clone();
             let memory_prefetch_func = self.memory_prefetch_func;
             let io_runtime = polars_io::pl_async::get_runtime();
-            let current_path_index = self.current_path_index;
-            let current_max_row_group_height = self.current_max_row_group_height;
 
             let handle = io_runtime.spawn(async move {
                 let fetched_bytes = if let DynByteSource::MemSlice(mem_slice) =
@@ -228,13 +220,10 @@ impl RowGroupDataFetcher {
 
                 PolarsResult::Ok(RowGroupData {
                     fetched_bytes,
-                    path_index: current_path_index,
                     row_offset: current_row_offset,
                     slice,
-                    file_max_row_group_height: current_max_row_group_height,
                     row_group_metadata,
                     sorting_map,
-                    shared_file_state: current_shared_file_state.clone(),
                 })
             });
diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs
index 8d31d025b3a6..360969a70f7f 100644
--- a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs
+++ b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs
@@ -2,21 +2,16 @@ use std::sync::Arc;
 
 use polars_core::frame::DataFrame;
 use polars_core::prelude::{
-    AnyValue, ArrowField, ArrowSchema, BooleanChunked, ChunkFilter, Column, DataType, IdxCa,
-    IntoColumn,
+    ArrowField, ArrowSchema, BooleanChunked, ChunkFilter, Column, DataType, IdxCa, IntoColumn,
 };
-use polars_core::scalar::Scalar;
 use polars_core::series::{IsSorted, Series};
 use polars_core::utils::arrow::bitmap::{Bitmap, BitmapBuilder, MutableBitmap};
 use polars_error::{polars_bail, PolarsResult};
-use polars_io::hive;
 use polars_io::predicates::{ColumnPredicateExpr, ScanIOPredicate, SpecializedColumnPredicateExpr};
 use polars_io::prelude::_internal::calc_prefilter_cost;
 pub use polars_io::prelude::_internal::PrefilterMaskSetting;
 use polars_io::prelude::try_set_sorted_flag;
 use polars_parquet::read::{Filter, PredicateFilter};
-use polars_plan::plans::hive::HivePartitions;
-use polars_plan::plans::ScanSources;
 use polars_utils::index::AtomicIdxSize;
 use polars_utils::pl_str::PlSmallStr;
 use polars_utils::IdxSize;
@@ -27,10 +22,6 @@ use crate::nodes::TaskPriority;
 
 /// Turns row group data into DataFrames.
 pub(super) struct RowGroupDecoder {
-    pub(super) scan_sources: ScanSources,
-    pub(super) hive_partitions: Option<Arc<Vec<HivePartitions>>>,
-    pub(super) hive_partitions_width: usize,
-    pub(super) include_file_paths: Option<PlSmallStr>,
     pub(super) reader_schema: Arc<ArrowSchema>,
     pub(super) projected_arrow_schema: Arc<ArrowSchema>,
     pub(super) row_index: Option<Arc<(PlSmallStr, AtomicIdxSize)>>,
@@ -61,10 +52,7 @@ impl RowGroupDecoder {
     ) -> PolarsResult<DataFrame> {
         let row_group_data = Arc::new(row_group_data);
 
-        let out_width = self.row_index.is_some() as usize
-            + self.projected_arrow_schema.len()
-            + self.hive_partitions_width
-            + self.include_file_paths.is_some() as usize;
+        let out_width = self.row_index.is_some() as usize + self.projected_arrow_schema.len();
 
         let mut out_columns = Vec::with_capacity(out_width);
 
@@ -89,30 +77,6 @@ impl RowGroupDecoder {
 
         let projection_height = slice_range.len();
 
-        let shared_file_state = row_group_data
-            .shared_file_state
-            .get_or_init(|| self.shared_file_state_init_func(&row_group_data))
-            .await;
-
-        assert_eq!(shared_file_state.path_index, row_group_data.path_index);
-
-        let mut hive_cols_iter = shared_file_state.hive_series.iter().map(|s| {
-            debug_assert!(s.len() >= projection_height);
-            s.slice(0, projection_height)
-        });
-
-        hive::merge_sorted_to_schema_order(
-            &mut decoded_cols.into_iter(),
-            &mut hive_cols_iter,
-            &self.reader_schema,
-            &mut out_columns,
-        );
-
-        if let Some(file_path_series) = &shared_file_state.file_path_series {
-            debug_assert!(file_path_series.len() >= projection_height);
-            out_columns.push(file_path_series.slice(0, projection_height));
-        }
-
         let df = unsafe { DataFrame::new_no_checks(projection_height, out_columns) };
 
         let df = if let Some(predicate) = self.predicate.as_ref() {
@@ -138,46 +102,6 @@ impl RowGroupDecoder {
         Ok(df)
     }
 
-    async fn shared_file_state_init_func(&self, row_group_data: &RowGroupData) -> SharedFileState {
-        let path_index = row_group_data.path_index;
-
-        let hive_series = if let Some(hp) = self.hive_partitions.as_deref() {
-            let v = hp[path_index].materialize_partition_columns();
-            v.into_iter()
-                .map(|s| {
-                    s.into_column()
-                        .new_from_index(0, row_group_data.file_max_row_group_height)
-                })
-                .collect()
-        } else {
-            vec![]
-        };
-
-        // @scalar-opt
-        let file_path_series = self.include_file_paths.clone().map(|file_path_col| {
-            Column::new_scalar(
-                file_path_col,
-                Scalar::new(
-                    DataType::String,
-                    AnyValue::StringOwned(
-                        self.scan_sources
-                            .get(path_index)
-                            .unwrap()
-                            .to_include_path_name()
-                            .into(),
-                    ),
-                ),
-                row_group_data.file_max_row_group_height,
-            )
-        });
-
-        SharedFileState {
-            path_index,
-            hive_series,
-            file_path_series,
-        }
-    }
-
     fn materialize_row_index(
         &self,
         row_group_data: &RowGroupData,
@@ -457,13 +381,6 @@ fn calc_cols_per_thread(
     parallel.then_some((cols_per_thread, remainder))
 }
 
-/// State shared across row groups for a single file.
-pub(super) struct SharedFileState {
-    path_index: usize,
-    hive_series: Vec<Column>,
-    file_path_series: Option<Column>,
-}
-
 // Pre-filtered
 
 impl RowGroupDecoder {
@@ -478,24 +395,11 @@ impl RowGroupDecoder {
         let row_group_data = Arc::new(row_group_data);
         let projection_height = row_group_data.row_group_metadata.num_rows();
 
-        let shared_file_state = row_group_data
-            .shared_file_state
-            .get_or_init(|| self.shared_file_state_init_func(&row_group_data))
-            .await;
-
-        assert_eq!(shared_file_state.path_index, row_group_data.path_index);
-
         let mut live_columns = Vec::with_capacity(
-            self.row_index.is_some() as usize
-                + self.predicate_arrow_field_indices.len()
-                + self.hive_partitions_width
-                + self.include_file_paths.is_some() as usize,
+            self.row_index.is_some() as usize + self.predicate_arrow_field_indices.len(),
         );
         let mut masks = Vec::with_capacity(
-            self.row_index.is_some() as usize
-                + self.predicate_arrow_field_indices.len()
-                + self.hive_partitions_width
-                + self.include_file_paths.is_some() as usize,
+            self.row_index.is_some() as usize + self.predicate_arrow_field_indices.len(),
         );
 
         if let Some(s) = self.materialize_row_index(
@@ -515,9 +419,7 @@ impl RowGroupDecoder {
         let mut opt_decode_err = None;
 
         let use_column_predicates = scan_predicate.column_predicates.is_sumwise_complete
-            && self.include_file_paths.is_none()
             && self.row_index.is_none()
-            && self.hive_partitions.is_none()
             && self
                 .predicate_arrow_field_indices
                 .iter()
@@ -573,24 +475,14 @@ impl RowGroupDecoder {
                 },
             }
         });
-        let hive_cols_iter = shared_file_state.hive_series.iter().map(|s| {
-            debug_assert!(s.len() >= projection_height);
-            s.slice(0, projection_height)
-        });
 
         for (c, m) in decoded_live_cols_iter {
             live_columns.push(c);
            masks.push(m);
         }
-        live_columns.extend(hive_cols_iter);
 
         opt_decode_err.transpose()?;
 
-        if let Some(file_path_series) = &shared_file_state.file_path_series {
-            debug_assert!(file_path_series.len() >= projection_height);
-            live_columns.push(file_path_series.slice(0, projection_height));
-        }
-
-        let (mut live_df_filtered, mask) = if use_column_predicates {
+        let (live_df_filtered, mask) = if use_column_predicates {
             assert!(scan_predicate.column_predicates.is_sumwise_complete);
             if masks.len() == 1 {
                 (
@@ -649,36 +541,8 @@ impl RowGroupDecoder {
             )
         };
 
-        let filtered_height = live_df_filtered.height();
-
         if self.non_predicate_arrow_field_indices.is_empty() {
             // User or test may have explicitly requested prefiltering
-
-            hive::merge_sorted_to_schema_order(
-                unsafe {
-                    &mut live_df_filtered
-                        .get_columns_mut()
-                        .drain(..)
-                        .collect::<Vec<_>>()
-                        .into_iter()
-                },
-                &mut shared_file_state
-                    .hive_series
-                    .iter()
-                    .map(|s| s.slice(0, filtered_height)),
-                &self.reader_schema,
-                unsafe { live_df_filtered.get_columns_mut() },
-            );
-
-            unsafe {
-                live_df_filtered.get_columns_mut().extend(
-                    shared_file_state
-                        .file_path_series
-                        .as_ref()
-                        .map(|c| c.slice(0, filtered_height)),
-                )
-            }
-
             return Ok(live_df_filtered);
         }
 
@@ -700,14 +564,12 @@ impl RowGroupDecoder {
         let prefilter_cost = calc_prefilter_cost(&mask_bitmap);
         let expected_num_rows = mask_bitmap.set_bits();
 
-        let mut opt_decode_err = None;
-
-        let mut dead_cols_decode_iter = self
+        let dead_columns: Vec<Column> = self
            .non_predicate_arrow_field_indices
            .iter()
            .map(|&i| self.projected_arrow_schema.get_at_index(i).unwrap())
            .map(|(_, arrow_field)| {
-                match decode_column_prefiltered(
+                decode_column_prefiltered(
                     arrow_field,
                     &row_group_data,
                     prefilter_cost,
@@ -715,65 +577,21 @@ impl RowGroupDecoder {
                     &mask,
                     &mask_bitmap,
                     expected_num_rows,
-                ) {
-                    Ok(v) => v,
-                    e @ Err(_) => {
-                        opt_decode_err.replace(e);
-                        Column::default()
-                    },
-                }
-            });
+                )
+            })
+            .collect::<PolarsResult<Vec<Column>>>()?;
 
-        let live_columns = live_df_filtered.take_columns();
+        let mut merged = live_df_filtered.take_columns();
+        merged.extend(dead_columns);
 
         // dead_columns
         // [ ..arrow_fields ]
         // live_df_filtered
         // [ row_index?, ..arrow_fields ]
-        // We re-use `hive::merge_sorted_to_schema_order()` as it performs most of the merge operation we want.
-        // But we take out the `row_index` column as it isn't on the correct side.
+        // The `select` below puts the merged columns back into reader-schema order.
 
-        let mut merged = Vec::with_capacity(live_columns.len() + dead_cols_decode_iter.len());
-
-        if self.row_index.is_some() {
-            merged.push(live_columns[0].clone());
-        };
-
-        hive::merge_sorted_to_schema_order(
-            &mut dead_cols_decode_iter, // df_columns
-            &mut live_columns
-                .into_iter()
-                .skip(self.row_index.is_some() as usize), // hive_columns
-            &self.projected_arrow_schema,
-            &mut merged,
-        );
-
-        opt_decode_err.transpose()?;
-
-        let mut out = Vec::with_capacity(
-            merged.len()
-                + shared_file_state.hive_series.len()
-                + shared_file_state.file_path_series.is_some() as usize,
-        );
-
-        hive::merge_sorted_to_schema_order(
-            &mut merged.into_iter(),
-            &mut shared_file_state
-                .hive_series
-                .iter()
-                .map(|s| s.slice(0, filtered_height)),
-            &self.reader_schema,
-            &mut out,
-        );
-
-        out.extend(
-            shared_file_state
-                .file_path_series
-                .as_ref()
-                .map(|c| c.slice(0, filtered_height)),
-        );
-
-        let df = unsafe { DataFrame::new_no_checks(expected_num_rows, out) };
+        let df = unsafe { DataFrame::new_no_checks(expected_num_rows, merged) }
+            .select(self.reader_schema.iter_names().cloned())?;
 
         Ok(df)
     }
 }
diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs
index 9e0a85c9b085..6ce1218555c6 100644
--- a/crates/polars-stream/src/physical_plan/to_graph.rs
+++ b/crates/polars-stream/src/physical_plan/to_graph.rs
@@ -498,13 +498,13 @@ fn to_graph_rec<'a>(
         } => ctx.graph.add_node(
             nodes::io_sources::SourceComputeNode::new(
                 nodes::io_sources::parquet::ParquetSourceNode::new(
-                    scan_source.into_sources(),
+                    scan_source,
                     file_info,
                     predicate,
                     options,
                     cloud_options,
                     file_options,
-                    first_metadata,
+                    first_metadata.unwrap(),
                 ),
             ),
             [],
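
Not part of the diff: after this change the node is built from exactly one `ScanSource`
plus a required `Arc<FileMetadata>`. Below is a minimal sketch of that construction
path, mirroring the `MultiScanable::new` body above. `build_parquet_source` is a
hypothetical helper, and it assumes the same items are in scope as in `mod.rs`
(`ScanSource`, `FileInfo`, `ParquetOptions`, `FileScanOptions`, `read_metadata`):

    use std::sync::Arc;

    fn build_parquet_source(
        scan_source: ScanSource,
        file_info: FileInfo,
        options: ParquetOptions,
        file_options: FileScanOptions,
    ) -> PolarsResult<ParquetSourceNode> {
        // Deserialize the footer once, up front; `new` no longer accepts an
        // Option for the metadata.
        let memslice = scan_source.as_scan_source_ref().to_memslice()?;
        let metadata = read_metadata(&mut std::io::Cursor::new(memslice.as_ref()))?;

        Ok(ParquetSourceNode::new(
            scan_source,
            file_info,
            None, // predicate
            options,
            None, // cloud_options
            file_options,
            Arc::new(metadata),
        ))
    }

Per-file concerns (hive partition columns, `include_file_paths`, slicing across files)
are no longer handled here; a caller that scans multiple files is expected to construct
one such node per file and merge the results itself.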