From f35e8f97d04385bd9e4413beea9dba0dca11e4da Mon Sep 17 00:00:00 2001 From: Evgenii Kuznetcov Date: Thu, 17 Oct 2024 16:55:55 +0200 Subject: [PATCH 1/2] Fix handling nulls --- Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 6 +++++- src/expressions.rs | 21 ++++++++++++--------- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 874871f..ae96916 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -947,7 +947,7 @@ dependencies = [ [[package]] name = "polars-tdigest" -version = "0.1.4" +version = "0.1.5" dependencies = [ "jemallocator", "ordered-float 4.2.0", diff --git a/Cargo.toml b/Cargo.toml index cbf83c0..a43f43d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "polars-tdigest" -version = "0.1.4" +version = "0.1.5" edition = "2021" [lib] diff --git a/README.md b/README.md index d9cfe02..7077a9a 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,11 @@ For an example see the [Yellow Taxi Notebook](./tdigest_yellow_taxi.ipynb). Note ## Dev setup -Setup your virtual environment with a python version `>=3.8`, e.g. use `python -m venv .env`. +Setup your virtual environment with a python version `>=3.8`, e.g. use +```bash +python -m venv .env +source .env/bin/activate +``` . Install the python dependencies used for development: ```bash python -m pip install -r requirements.txt diff --git a/src/expressions.rs b/src/expressions.rs index a405f9c..533f603 100644 --- a/src/expressions.rs +++ b/src/expressions.rs @@ -63,7 +63,11 @@ fn estimate_median(inputs: &[Series]) -> PolarsResult { } fn tdigest_output(_: &[Field]) -> PolarsResult { - let fields = vec![ + Ok(Field::new("tdigest", DataType::Struct(tdigest_fields()))) +} + +fn tdigest_fields() -> Vec { + vec![ Field::new( "centroids", DataType::List(Box::new(DataType::Struct(vec![ @@ -76,9 +80,7 @@ fn tdigest_output(_: &[Field]) -> PolarsResult { Field::new("max", DataType::Int64), Field::new("count", DataType::Int64), Field::new("max_size", DataType::Int64), - ]; - - Ok(Field::new("tdigest", DataType::Struct(fields))) + ] } // fn tidgest_compute(values: &ChunkedArray) -> Vec { @@ -105,6 +107,7 @@ fn tdigest_output(_: &[Field]) -> PolarsResult { #[polars_expr(output_type_func=tdigest_output)] fn tdigest(inputs: &[Series]) -> PolarsResult { let series = &inputs[0]; + // TODO: pooling is not feasible on small datasets let chunks = match series.dtype() { DataType::Float64 => { let values = series.f64()?; @@ -115,7 +118,7 @@ fn tdigest(inputs: &[Series]) -> PolarsResult { .map(|chunk| { let t = TDigest::new_with_size(100); let array = chunk.as_any().downcast_ref::().unwrap(); - let val_vec: Vec = array.values().iter().copied().collect(); + let val_vec: Vec = array.non_null_values_iter().collect(); t.merge_unsorted(val_vec.to_owned()) }) .collect::>() @@ -132,7 +135,7 @@ fn tdigest(inputs: &[Series]) -> PolarsResult { let t = TDigest::new_with_size(100); let array = chunk.as_any().downcast_ref::().unwrap(); let val_vec: Vec = - array.values().iter().map(|v| (*v as f64)).collect(); + array.non_null_values_iter().map(|v| (v as f64)).collect(); t.merge_unsorted(val_vec.to_owned()) }) .collect::>() @@ -149,7 +152,7 @@ fn tdigest(inputs: &[Series]) -> PolarsResult { let t = TDigest::new_with_size(100); let array = chunk.as_any().downcast_ref::().unwrap(); let val_vec: Vec = - array.values().iter().map(|v| (*v as f64)).collect(); + array.non_null_values_iter().map(|v| (v as f64)).collect(); t.merge_unsorted(val_vec.to_owned()) }) .collect::>() @@ -166,7 +169,7 @@ fn tdigest(inputs: &[Series]) -> PolarsResult { let t = TDigest::new_with_size(100); let array = chunk.as_any().downcast_ref::().unwrap(); let val_vec: Vec = - array.values().iter().map(|v| (*v as f64)).collect(); + array.non_null_values_iter().map(|v| (v as f64)).collect(); t.merge_unsorted(val_vec.to_owned()) }) .collect::>() @@ -187,7 +190,7 @@ fn tdigest(inputs: &[Series]) -> PolarsResult { let file = Cursor::new(&td_json); let df = JsonReader::new(file) .with_json_format(JsonFormat::JsonLines) - .infer_schema_len(Some(3)) + .with_schema(Arc::new(Schema::from_iter(tdigest_fields()))) .with_batch_size(NonZeroUsize::new(3).unwrap()) .finish() .unwrap(); From cee0d133cdf883db4e20edac269d035b308eb762 Mon Sep 17 00:00:00 2001 From: Evgenii Kuznetcov Date: Thu, 17 Oct 2024 17:03:38 +0200 Subject: [PATCH 2/2] fix typo --- src/expressions.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/expressions.rs b/src/expressions.rs index 533f603..166e578 100644 --- a/src/expressions.rs +++ b/src/expressions.rs @@ -263,7 +263,7 @@ fn estimate_quantile(inputs: &[Series], kwargs: MergeTDKwargs) -> PolarsResult = - serde_json::from_str(&json_str).expect("Failed to parse the tigest JSON string"); + serde_json::from_str(&json_str).expect("Failed to parse the tdigest JSON string"); let tdigests: Vec = tdigest_json.into_iter().map(|td| td.tdigest).collect(); let tdigest = TDigest::merge_digests(tdigests);