fix: Fix unequal DataFrame column heights from parquet hive scan with filter #21340

Merged
3 commits merged on Feb 20, 2025
13 changes: 11 additions & 2 deletions .github/workflows/benchmark.yml
@@ -72,7 +72,16 @@ jobs:
run: maturin build --profile nodebug-release -- -C codegen-units=8 -C lto=thin -C target-cpu=native

- name: Install Polars release build
run: pip install target/wheels/polars*.whl
run: |
pip install target/wheels/polars*.whl

# This workflow builds and installs a wheel, meaning there is no polars.abi3.so under
# py-polars/. This causes a binary not found error if a test tries to import polars in
# a Python executed using `subprocess.check_output([sys.executable])`. Fix this by
# symlinking the binary.
ln -sv \
$(python -c "import importlib; print(importlib.util.find_spec('polars').submodule_search_locations[0] + '/polars.abi3.so')") \
py-polars/polars/polars.abi3.so
@nameexhaustion (Collaborator, Author) commented:

I need this for the new test case I added below

CI logs for the failure: https://github.com/pola-rs/polars/actions/runs/13417351173/job/37481333425


- name: Set wheel size
run: |
@@ -119,7 +128,7 @@ jobs:
body: commentBody,
});
}
continue-on-error: true

- name: Run benchmark tests
uses: CodSpeedHQ/action@v3
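For context on the workflow comment above, the short sketch below (not part of the diff; the import target is only an illustration) shows the pattern that breaks without the symlink: a test spawns a fresh interpreter via subprocess.check_output([sys.executable, ...]) and imports polars, which fails with a binary-not-found error when py-polars/polars/polars.abi3.so is missing from the source checkout.

import subprocess
import sys

# Spawn a fresh Python process and import polars in it, mirroring the pattern
# used by the new test in this PR. In the benchmark CI job this only works once
# the wheel's compiled extension is symlinked to py-polars/polars/polars.abi3.so.
out = subprocess.check_output(
    [sys.executable, "-c", "import polars as pl; print(pl.__version__)"]
)
print(out.decode())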
3 changes: 2 additions & 1 deletion crates/polars-io/src/hive.rs
@@ -18,8 +18,9 @@ pub(crate) fn materialize_hive_partitions<D>(
df: &mut DataFrame,
reader_schema: &polars_schema::Schema<D>,
hive_partition_columns: Option<&[Series]>,
num_rows: usize,
) {
let num_rows = df.height();

if let Some(hive_columns) = hive_partition_columns {
// Insert these hive columns in the order they are stored in the file.
if hive_columns.is_empty() {
22 changes: 7 additions & 15 deletions crates/polars-io/src/ipc/ipc_file.rs
@@ -252,24 +252,22 @@ impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {

// In case only hive columns are projected, the df would be empty, but we need the row count
// of the file in order to project the correct number of rows for the hive columns.
let (mut df, row_count) = (|| {
let mut df = (|| {
if self.projection.as_ref().is_some_and(|x| x.is_empty()) {
let row_count = if let Some(v) = self.n_rows {
v
} else {
get_row_count(&mut self.reader)? as usize
};
let mut df = DataFrame::empty();
unsafe { df.set_height(row_count) };
let df = DataFrame::empty_with_height(row_count);

return PolarsResult::Ok((df, row_count));
return PolarsResult::Ok(df);
}

if self.memory_map.is_some() && self.reader.to_file().is_some() {
match self.finish_memmapped(None) {
Ok(df) => {
let n = df.height();
return Ok((df, n));
return Ok(df);
},
Err(err) => check_mmap_err(err)?,
}
@@ -293,17 +291,11 @@ impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
let ipc_reader =
read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
let df = finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)?;
let n = df.height();
Ok((df, n))
Ok(df)
})()?;

if let Some(hive_cols) = hive_partition_columns {
materialize_hive_partitions(
&mut df,
reader_schema,
Some(hive_cols.as_slice()),
row_count,
);
materialize_hive_partitions(&mut df, reader_schema, Some(hive_cols.as_slice()));
};

if let Some((col, value)) = include_file_path {
@@ -314,7 +306,7 @@ impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
DataType::String,
AnyValue::StringOwned(value.as_ref().into()),
),
row_count,
df.height(),
))
};
}
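The comment in ipc_file.rs above describes the case where only hive columns are projected. A minimal sketch of that case from the Python side (an illustration, assuming a Polars version where scan_ipc supports hive_partitioning; the temporary layout is hypothetical):

from pathlib import Path
import tempfile

import polars as pl

# One hive partition containing a two-row IPC file.
part = Path(tempfile.mkdtemp()) / "date=2025-01-01"
part.mkdir(parents=True)
pl.DataFrame({"value": ["1", "2"]}).write_ipc(part / "0.ipc")

# Projecting only the hive column leaves the file projection empty, but the
# file's row count (2) must still set the height of the materialized "date" column.
df = pl.scan_ipc(part / "0.ipc", hive_partitioning=True).select("date").collect()
assert df.height == 2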
30 changes: 5 additions & 25 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -440,12 +440,7 @@ fn rg_to_dfs_prefiltered(
} else {
df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns.clone()) };

materialize_hive_partitions(
&mut df,
schema.as_ref(),
hive_partition_columns,
md.num_rows(),
);
materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
let s = predicate.predicate.evaluate_io(&df)?;
let mask = s.bool().expect("filter predicates was not of type boolean");

@@ -489,12 +484,7 @@

// We don't need to do any further work if there are no dead columns
if dead_idx_to_col_idx.is_empty() {
materialize_hive_partitions(
&mut df,
schema.as_ref(),
hive_partition_columns,
md.num_rows(),
@nameexhaustion (Collaborator, Author) commented on Feb 19, 2025:
The cause was materializing hive partition columns with height md.num_rows() on a filtered DataFrame that may have fewer rows.

Fixed by removing the extra num_rows argument. It was needed in the past because empty DataFrames did not track a height; now that DataFrame has a height property, we only need to ensure it is set to the correct height for empty DataFrames.

);
materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

return Ok(Some(df));
}
@@ -606,12 +596,7 @@ fn rg_to_dfs_prefiltered(
// and the length is given by the parquet file which should always be the same.
let mut df = unsafe { DataFrame::new_no_checks(height, merged) };

materialize_hive_partitions(
&mut df,
schema.as_ref(),
hive_partition_columns,
md.num_rows(),
);
materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

PolarsResult::Ok(Some(df))
})
@@ -713,7 +698,7 @@ fn rg_to_dfs_optionally_par_over_columns(
);
}

materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns, rg_slice.1);
materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
apply_predicate(
&mut df,
predicate.as_ref().map(|p| p.predicate.as_ref()),
@@ -850,12 +835,7 @@ fn rg_to_dfs_par_over_rg(
);
}

materialize_hive_partitions(
&mut df,
schema.as_ref(),
hive_partition_columns,
slice.1,
);
materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
apply_predicate(
&mut df,
predicate.as_ref().map(|p| p.predicate.as_ref()),
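To make the review comment above concrete, here is a simplified, single-process sketch of the user-facing failure pattern (the PR's actual test, shown further down, instead runs in a subprocess so the thread pool size can be pinned; the temporary paths here are illustrative):

from datetime import date
from pathlib import Path
import tempfile

import polars as pl

# One hive partition: .../date=2025-01-01/00000000.parquet with two rows.
file_path = Path(tempfile.mkdtemp()) / "date=2025-01-01" / "00000000.parquet"
file_path.parent.mkdir(parents=True)
pl.DataFrame({"value": ["1", "2"]}).write_parquet(file_path)

# Hive scan with a filter. On affected versions the "date" hive column could be
# materialized at the unfiltered row-group height (2) while "value" was already
# filtered to height 1, leaving unequal column heights in the result.
df = (
    pl.scan_parquet(file_path, hive_partitioning=True)
    .filter(pl.col("value") == "1")
    .collect()
)
str(df)  # formatting the frame surfaces the invalid state on affected versions
assert df.height == 1
assert df["date"][0] == date(2025, 1, 1)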
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/utils.rs
@@ -26,7 +26,7 @@ pub fn materialize_empty_df(
.unwrap();
}

materialize_hive_partitions(&mut df, reader_schema, hive_partition_columns, 0);
materialize_hive_partitions(&mut df, reader_schema, hive_partition_columns);

df
}
64 changes: 63 additions & 1 deletion py-polars/tests/unit/io/test_hive.py
@@ -4,7 +4,7 @@
import urllib.parse
import warnings
from collections import OrderedDict
from datetime import datetime
from datetime import date, datetime
from functools import partial
from pathlib import Path
from typing import Any, Callable
@@ -888,3 +888,65 @@ def test_hive_auto_enables_when_unspecified_and_hive_schema_passed(
pl.Series("a", [1], dtype=pl.UInt8),
),
)


@pytest.mark.write_disk
@pytest.mark.parametrize("force_single_thread", [True, False])
def test_hive_parquet_prefiltered_20894_21327(
tmp_path: Path, force_single_thread: bool
) -> None:
n_threads = 1 if force_single_thread else pl.thread_pool_size()

file_path = tmp_path / "date=2025-01-01/00000000.parquet"
file_path.parent.mkdir(exist_ok=True, parents=True)

data = pl.DataFrame(
{
"date": [date(2025, 1, 1), date(2025, 1, 1)],
"value": ["1", "2"],
}
)

data.write_parquet(file_path)

import base64
import subprocess

# For security, and for Windows backslashes.
scan_path_b64 = base64.b64encode(str(file_path.absolute()).encode()).decode()

# This is the easiest way to control the thread pool size so that it is stable.
out = subprocess.check_output(
[
sys.executable,
"-c",
f"""\
import os
os.environ["POLARS_MAX_THREADS"] = "{n_threads}"

import polars as pl
import datetime
import base64

from polars.testing import assert_frame_equal

assert pl.thread_pool_size() == {n_threads}

tmp_path = base64.b64decode("{scan_path_b64}").decode()
df = pl.scan_parquet(tmp_path, hive_partitioning=True).filter(pl.col("value") == "1").collect()
# We need the str() to trigger a panic on invalid state
str(df)

assert_frame_equal(df, pl.DataFrame(
[
pl.Series('date', [datetime.date(2025, 1, 1)], dtype=pl.Date),
pl.Series('value', ['1'], dtype=pl.String),
]
))

print("OK", end="")
""",
],
)

assert out == b"OK"