Commit

parquet
coastalwhite committed Feb 17, 2025
1 parent 0ad99df commit fa59f2f
Showing 7 changed files with 53 additions and 336 deletions.
15 changes: 1 addition & 14 deletions crates/polars-stream/src/nodes/io_sources/parquet/init.rs
@@ -72,7 +72,6 @@ impl ParquetSourceNode {
current_row_group_idx: 0,
current_max_row_group_height: 0,
current_row_offset: 0,
current_shared_file_state: Default::default(),
};

let row_group_decoder = self.init_row_group_decoder();
@@ -193,13 +192,6 @@ impl ParquetSourceNode {
/// * `self.projected_arrow_schema`
/// * `self.physical_predicate`
pub(super) fn init_row_group_decoder(&self) -> RowGroupDecoder {
let scan_sources = self.scan_sources.clone();
let hive_partitions = self.hive_parts.clone();
let hive_partitions_width = hive_partitions
.as_deref()
.map(|x| x[0].get_statistics().column_stats().len())
.unwrap_or(0);
let include_file_paths = self.file_options.include_file_paths.clone();
let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap();
let row_index = self.row_index.clone();
let min_values_per_thread = self.config.min_values_per_thread;
@@ -258,10 +250,6 @@ impl ParquetSourceNode {
}

RowGroupDecoder {
scan_sources,
hive_partitions,
hive_partitions_width,
include_file_paths,
reader_schema: self.schema.clone().unwrap(),
projected_arrow_schema,
row_index,
@@ -294,12 +282,11 @@ impl ParquetSourceNode {

if self.verbose {
eprintln!(
"[ParquetSource]: {} / {} parquet columns to be projected from {} files",
"[ParquetSource]: {} / {} parquet columns to be projected",
self.projected_arrow_schema
.as_ref()
.map_or(reader_schema.len(), |x| x.len()),
reader_schema.len(),
self.scan_sources.len(),
);
}
}
82 changes: 18 additions & 64 deletions crates/polars-stream/src/nodes/io_sources/parquet/metadata_fetch.rs
@@ -1,15 +1,14 @@
use std::sync::Arc;

use futures::StreamExt;
use polars_error::{polars_bail, PolarsResult};
use polars_error::PolarsResult;
use polars_io::prelude::FileMetadata;
use polars_io::prelude::_internal::ensure_matching_dtypes_if_found;
use polars_io::utils::byte_source::{DynByteSource, MemSliceByteSource};
use polars_io::utils::slice::SplitSlicePosition;
use polars_utils::mmap::MemSlice;
use polars_utils::IdxSize;

use super::metadata_utils::{ensure_schema_has_projected_fields, read_parquet_metadata_bytes};
use super::metadata_utils::read_parquet_metadata_bytes;
use super::ParquetSourceNode;
use crate::async_executor;
use crate::async_primitives::connector::connector;
@@ -28,15 +27,13 @@ impl ParquetSourceNode {
usize,
usize,
Arc<DynByteSource>,
FileMetadata,
Arc<FileMetadata>,
)>,
task_handles_ext::AbortOnDropHandle<PolarsResult<()>>,
) {
let verbose = self.verbose;
let io_runtime = polars_io::pl_async::get_runtime();

let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap();

let (normalized_slice_oneshot_tx, normalized_slice_oneshot_rx) =
tokio::sync::oneshot::channel();
let (mut metadata_tx, metadata_rx) = connector();
@@ -51,32 +48,30 @@ impl ParquetSourceNode {
}

let fetch_metadata_bytes_for_path_index = {
let scan_sources = &self.scan_sources;
let scan_source = &self.scan_source;
let cloud_options = Arc::new(self.cloud_options.clone());

let scan_sources = scan_sources.clone();
let scan_source = scan_source.clone();
let cloud_options = cloud_options.clone();
let byte_source_builder = byte_source_builder.clone();
let have_first_metadata = self.first_metadata.is_some();

move |path_idx: usize| {
let scan_sources = scan_sources.clone();
let scan_source = scan_source.clone();
let cloud_options = cloud_options.clone();
let byte_source_builder = byte_source_builder.clone();

let handle = io_runtime.spawn(async move {
let mut byte_source = Arc::new(
scan_sources
.get(path_idx)
.unwrap()
scan_source
.as_scan_source_ref()
.to_dyn_byte_source(
&byte_source_builder,
cloud_options.as_ref().as_ref(),
)
.await?,
);

if path_idx == 0 && have_first_metadata {
if path_idx == 0 {
let metadata_bytes = MemSlice::EMPTY;
return Ok((0, byte_source, metadata_bytes));
}
@@ -107,49 +102,17 @@ impl ParquetSourceNode {
}
};

let first_metadata = self.first_metadata.clone();
let first_schema = self.schema.clone().unwrap();
let has_projection = self.file_options.with_columns.is_some();
let allow_missing_columns = self.file_options.allow_missing_columns;
let metadata = self.metadata.clone();

let process_metadata_bytes = {
move |handle: task_handles_ext::AbortOnDropHandle<
PolarsResult<(usize, Arc<DynByteSource>, MemSlice)>,
>| {
let first_schema = first_schema.clone();
let projected_arrow_schema = projected_arrow_schema.clone();
let first_metadata = first_metadata.clone();
let metadata = metadata.clone();
// Run on CPU runtime - metadata deserialization is expensive, especially
// for very wide tables.
let handle = async_executor::spawn(TaskPriority::Low, async move {
let (path_index, byte_source, metadata_bytes) = handle.await.unwrap()?;

let metadata = match first_metadata {
Some(md) if path_index == 0 => Arc::unwrap_or_clone(md),
_ => polars_parquet::parquet::read::deserialize_metadata(
metadata_bytes.as_ref(),
metadata_bytes.len() * 2 + 1024,
)?,
};

let schema = polars_parquet::arrow::read::infer_schema(&metadata)?;

if !has_projection && schema.len() > first_schema.len() {
polars_bail!(
SchemaMismatch:
"parquet file contained extra columns and no selection was given"
)
}

if allow_missing_columns {
ensure_matching_dtypes_if_found(projected_arrow_schema.as_ref(), &schema)?;
} else {
ensure_schema_has_projected_fields(
&schema,
projected_arrow_schema.as_ref(),
)?;
}

let (path_index, byte_source, _) = handle.await.unwrap()?;
PolarsResult::Ok((path_index, byte_source, metadata))
});

@@ -183,13 +146,13 @@ impl ParquetSourceNode {
.slice
.map(|(offset, len)| offset as usize..offset as usize + len);

let mut metadata_stream = futures::stream::iter(0..self.scan_sources.len())
let mut metadata_stream = futures::stream::iter(0..1)
.map(fetch_metadata_bytes_for_path_index)
.buffered(metadata_prefetch_size)
.map(process_metadata_bytes)
.buffered(metadata_decode_ahead_size);

let scan_sources = self.scan_sources.clone();
let scan_source = self.scan_source.clone();

// We need to be able to both stop early as well as skip values, which is easier to do
// using a custom task instead of futures::stream
@@ -218,10 +181,7 @@ impl ParquetSourceNode {
format!(
"error at path (index: {}, path: {}): {}",
current_path_index,
scan_sources
.get(current_path_index)
.unwrap()
.to_include_path_name(),
scan_source.as_scan_source_ref().to_include_path_name(),
msg
)
})
@@ -264,13 +224,7 @@ impl ParquetSourceNode {
if let Some(slice_range) = slice_range.as_ref() {
if *current_row_offset_ref >= slice_range.end {
if verbose {
eprintln!(
"[ParquetSource]: Slice pushdown: \
Stopped reading at file at index {} \
(remaining {} files will not be read)",
current_path_index,
scan_sources.len() - current_path_index - 1,
);
eprintln!("[ParquetSource]: Slice pushdown: Stopped reading",);
}
break;
}
@@ -284,7 +238,7 @@ impl ParquetSourceNode {
let slice = self.file_options.slice.unwrap();
let slice_start_as_n_from_end = -slice.0 as usize;

let mut metadata_stream = futures::stream::iter((0..self.scan_sources.len()).rev())
let mut metadata_stream = futures::stream::iter((0..1).rev())
.map(fetch_metadata_bytes_for_path_index)
.buffered(metadata_prefetch_size)
.map(process_metadata_bytes)
@@ -348,7 +302,7 @@ impl ParquetSourceNode {
PolarsResult::Ok((slice_range, processed_metadata_rev, cum_rows))
};

let path_count = self.scan_sources.len();
let path_count = 1;

io_runtime.spawn(async move {
if start_rx.await.is_err() {
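The comments in the hunks above explain the design: metadata decoding is kept off the IO runtime, fetch futures are buffered for read-ahead, and a custom consumer task (rather than a plain futures::stream loop) allows stopping early under slice pushdown. A simplified, self-contained sketch of that prefetch-and-stop-early pattern, using only the futures crate and stand-in values (none of the names below are polars APIs):

// Simplified stand-in for the prefetch pipeline above (not polars code): fetch
// futures are buffered for read-ahead, and a consumer loop applies slice
// pushdown by stopping early once enough rows are covered.
use futures::StreamExt;

fn main() {
    futures::executor::block_on(async {
        let metadata_prefetch_size = 4;

        let mut metadata_stream = futures::stream::iter(0..1) // a single source after this change
            .map(|path_index: usize| async move {
                // stand-in for fetching/decoding metadata; pretend each source has 25 rows
                (path_index, 25usize)
            })
            .buffered(metadata_prefetch_size);

        let slice_end = 10usize; // pretend the query only needs the first 10 rows
        let mut current_row_offset = 0usize;

        while let Some((path_index, num_rows)) = metadata_stream.next().await {
            println!("source {path_index}: {num_rows} rows");
            current_row_offset += num_rows;
            if current_row_offset >= slice_end {
                break; // slice pushdown: stop reading further sources
            }
        }
    });
}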
crates/polars-stream/src/nodes/io_sources/parquet/metadata_utils.rs
@@ -1,5 +1,4 @@
use polars_core::prelude::{ArrowSchema, DataType};
use polars_error::{polars_bail, PolarsResult};
use polars_error::PolarsResult;
use polars_io::utils::byte_source::{ByteSource, DynByteSource};
use polars_utils::mmap::MemSlice;

@@ -118,29 +117,3 @@ pub(super) async fn read_parquet_metadata_bytes(
}
}
}

/// Ensures that a parquet file has all the necessary columns for a projection with the correct
/// dtype. There are no ordering requirements and extra columns are permitted.
pub(super) fn ensure_schema_has_projected_fields(
schema: &ArrowSchema,
projected_fields: &ArrowSchema,
) -> PolarsResult<()> {
for field in projected_fields.iter_values() {
// Note: We convert to Polars-native dtypes for timezone normalization.
let expected_dtype = DataType::from_arrow_field(field);
let dtype = {
let Some(field) = schema.get(&field.name) else {
polars_bail!(ColumnNotFound: "error with column selection, consider enabling `allow_missing_columns`: did not find column in file: {}", field.name)
};
DataType::from_arrow_field(field)
};

if dtype != expected_dtype {
polars_bail!(SchemaMismatch: "data type mismatch for column {}: expected: {}, found: {}",
&field.name, expected_dtype, dtype
)
}
}

Ok(())
}
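The function removed above enforced that every projected column exists in the file with a matching dtype; ordering does not matter and extra file columns are permitted. A self-contained sketch of that rule, using plain std types as stand-ins for ArrowSchema/DataType (a hypothetical simplification, not the polars API):

use std::collections::HashMap;

/// Every projected column must exist in the file schema with a matching dtype;
/// extra file columns and different ordering are allowed.
fn ensure_schema_has_projected_fields(
    file_schema: &HashMap<String, String>,      // column name -> dtype name
    projected_fields: &HashMap<String, String>, // column name -> expected dtype name
) -> Result<(), String> {
    for (name, expected_dtype) in projected_fields {
        match file_schema.get(name) {
            None => return Err(format!("did not find column in file: {name}")),
            Some(dtype) if dtype != expected_dtype => {
                return Err(format!(
                    "data type mismatch for column {name}: expected: {expected_dtype}, found: {dtype}"
                ));
            },
            Some(_) => {},
        }
    }
    Ok(())
}

fn main() {
    let file_schema = HashMap::from([
        ("a".to_string(), "i64".to_string()),
        ("b".to_string(), "str".to_string()),
    ]);
    let projection = HashMap::from([("a".to_string(), "i64".to_string())]);
    assert!(ensure_schema_has_projected_fields(&file_schema, &projection).is_ok());
}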
28 changes: 12 additions & 16 deletions crates/polars-stream/src/nodes/io_sources/parquet/mod.rs
@@ -14,8 +14,7 @@ use polars_io::utils::byte_source::DynByteSourceBuilder;
use polars_io::RowIndex;
use polars_parquet::read::read_metadata;
use polars_parquet::read::schema::infer_schema_with_options;
use polars_plan::plans::hive::HivePartitions;
use polars_plan::plans::{FileInfo, ScanSource, ScanSources};
use polars_plan::plans::{FileInfo, ScanSource};
use polars_plan::prelude::FileScanOptions;
use polars_utils::index::AtomicIdxSize;
use polars_utils::pl_str::PlSmallStr;
@@ -45,14 +44,13 @@ type AsyncTaskData = (

#[allow(clippy::type_complexity)]
pub struct ParquetSourceNode {
scan_sources: ScanSources,
scan_source: ScanSource,
file_info: FileInfo,
hive_parts: Option<Arc<Vec<HivePartitions>>>,
predicate: Option<ScanIOPredicate>,
options: ParquetOptions,
cloud_options: Option<CloudOptions>,
file_options: FileScanOptions,
first_metadata: Option<Arc<FileMetadata>>,
metadata: Arc<FileMetadata>,
// Run-time vars
config: Config,
verbose: bool,
@@ -86,17 +84,17 @@ struct Config {
#[allow(clippy::too_many_arguments)]
impl ParquetSourceNode {
pub fn new(
scan_sources: ScanSources,
scan_source: ScanSource,
file_info: FileInfo,
predicate: Option<ScanIOPredicate>,
options: ParquetOptions,
cloud_options: Option<CloudOptions>,
mut file_options: FileScanOptions,
first_metadata: Option<Arc<FileMetadata>>,
metadata: Arc<FileMetadata>,
) -> Self {
let verbose = config::verbose();

let byte_source_builder = if scan_sources.is_cloud_url() || config::force_async() {
let byte_source_builder = if scan_source.run_async() {
DynByteSourceBuilder::ObjectStore
} else {
DynByteSourceBuilder::Mmap
@@ -109,14 +107,13 @@ impl ParquetSourceNode {
.map(|ri| Arc::new((ri.name, AtomicIdxSize::new(ri.offset))));

Self {
scan_sources,
scan_source,
file_info,
hive_parts: None,
predicate,
options,
cloud_options,
file_options,
first_metadata,
metadata,

config: Config {
// Initialized later
@@ -184,7 +181,7 @@ impl SourceNode for ParquetSourceNode {
eprintln!("[ParquetSource]: {:?}", &self.config);
}

let num_rows = self.first_metadata.as_ref().unwrap().num_rows;
let num_rows = self.metadata.as_ref().num_rows;
self.schema = Some(self.file_info.reader_schema.take().unwrap().unwrap_left());
self.init_projected_arrow_schema();

@@ -277,8 +274,7 @@ impl MultiScanable for ParquetSourceNode {
cloud_options: Option<&CloudOptions>,
row_index: Option<PlSmallStr>,
) -> PolarsResult<Self> {
let source = source.into_sources();
let memslice = source.at(0).to_memslice()?;
let memslice = source.as_scan_source_ref().to_memslice()?;
let file_metadata = read_metadata(&mut std::io::Cursor::new(memslice.as_ref()))?;

let arrow_schema = infer_schema_with_options(&file_metadata, &None)?;
@@ -308,7 +304,7 @@ impl MultiScanable for ParquetSourceNode {
options,
cloud_options.cloned(),
file_options,
Some(Arc::new(file_metadata)),
Arc::new(file_metadata),
))
}

@@ -348,7 +344,7 @@ impl MultiScanable for ParquetSourceNode {
}

async fn unrestricted_row_count(&mut self) -> PolarsResult<IdxSize> {
let num_rows = self.first_metadata.as_ref().unwrap().num_rows;
let num_rows = self.metadata.as_ref().num_rows;
IdxSize::try_from(num_rows)
.map_err(|_| polars_err!(bigidx, ctx = "parquet file", size = num_rows))
}
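Throughout this file the metadata handle changes from first_metadata: Option<Arc<FileMetadata>>, unwrapped at each use site, to a required metadata: Arc<FileMetadata>. A minimal, self-contained illustration of that shape change, with simplified stand-in types rather than the real polars definitions:

use std::sync::Arc;

// Simplified stand-ins for the real polars types; only the fields needed for
// this illustration are included.
struct FileMetadata {
    num_rows: usize,
}

struct ParquetSourceNode {
    metadata: Arc<FileMetadata>, // previously: first_metadata: Option<Arc<FileMetadata>>
}

impl ParquetSourceNode {
    fn new(metadata: Arc<FileMetadata>) -> Self {
        Self { metadata }
    }

    fn unrestricted_row_count(&self) -> usize {
        // previously: self.first_metadata.as_ref().unwrap().num_rows
        self.metadata.num_rows
    }
}

fn main() {
    let node = ParquetSourceNode::new(Arc::new(FileMetadata { num_rows: 42 }));
    assert_eq!(node.unrestricted_row_count(), 42);
}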