Skip to content

Commit

Permalink
DRY
Browse files Browse the repository at this point in the history
  • Loading branch information
evgenii-kuznetcov committed Nov 20, 2024
1 parent 02f8098 commit 2a6658b
Showing 1 changed file with 26 additions and 62 deletions.
88 changes: 26 additions & 62 deletions src/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,79 +103,43 @@ fn tdigest_fields() -> Vec<Field> {
// chunks
// }

// TODO: support other numerical types
#[polars_expr(output_type_func=tdigest_output)]
fn tdigest(inputs: &[Series]) -> PolarsResult<Series> {
let series = &inputs[0];
// TODO: pooling is not feasible on small datasets
let chunks = match series.dtype() {
DataType::Float64 => {
let values = series.f64()?;
let chunks: Vec<TDigest> = POOL.install(|| {
values
.downcast_iter()
.par_bridge()
.map(|chunk| {
let t = TDigest::new_with_size(100);
let array = chunk.as_any().downcast_ref::<Float64Array>().unwrap();
let val_vec: Vec<f64> = array.non_null_values_iter().collect();
t.merge_unsorted(val_vec.to_owned())
})
.collect::<Vec<TDigest>>()
});
chunks
}
DataType::Float32 => {
let values = series.f32()?;
let chunks: Vec<TDigest> = POOL.install(|| {
values
.downcast_iter()
.par_bridge()
.map(|chunk| {
let t = TDigest::new_with_size(100);
let array = chunk.as_any().downcast_ref::<Float32Array>().unwrap();
let val_vec: Vec<f64> =
array.non_null_values_iter().map(|v| (v as f64)).collect();
t.merge_unsorted(val_vec.to_owned())
})
.collect::<Vec<TDigest>>()
});
chunks
}
DataType::Int64 => {
let values = series.i64()?;
macro_rules! gen {
($func:ident, $a_f64:ident, $a_Float64Array: ident) => {
fn $func(series: &Series) -> PolarsResult<Vec<TDigest>> {
let values = series.$a_f64()?;
let chunks: Vec<TDigest> = POOL.install(|| {
values
.downcast_iter()
.par_bridge()
.map(|chunk| {
let t = TDigest::new_with_size(100);
let array = chunk.as_any().downcast_ref::<Int64Array>().unwrap();
let val_vec: Vec<f64> =
array.non_null_values_iter().map(|v| (v as f64)).collect();
let array = chunk.as_any().downcast_ref::<$a_Float64Array>().unwrap();
let val_vec: Vec<f64> = array.non_null_values_iter().map(|v| (v as f64)).collect();
t.merge_unsorted(val_vec.to_owned())
})
.collect::<Vec<TDigest>>()
});
chunks
}
DataType::Int32 => {
let values = series.i32()?;
let chunks: Vec<TDigest> = POOL.install(|| {
values
.downcast_iter()
.par_bridge()
.map(|chunk| {
let t = TDigest::new_with_size(100);
let array = chunk.as_any().downcast_ref::<Int32Array>().unwrap();
let val_vec: Vec<f64> =
array.non_null_values_iter().map(|v| (v as f64)).collect();
t.merge_unsorted(val_vec.to_owned())
})
.collect::<Vec<TDigest>>()
});
chunks
Ok(chunks)
}
};
}

// Instantiate the `gen!` macro once per supported input dtype. Each invocation
// defines a function named by the first argument that takes a `&Series` and
// returns `PolarsResult<Vec<TDigest>>`. The second argument is the `Series`
// accessor method called on the input, and the third is the concrete array
// type each chunk is downcast to.
gen!(gen_f64, f64, Float64Array);
gen!(gen_f32, f32, Float32Array);
gen!(gen_i64, i64, Int64Array);
gen!(gen_i32, i32, Int32Array);


// TODO: support other numerical types
#[polars_expr(output_type_func=tdigest_output)]
fn tdigest(inputs: &[Series]) -> PolarsResult<Series> {
let series = &inputs[0];
// TODO: pooling is not feasible on small datasets
let chunks = match series.dtype() {
DataType::Float64 => gen_f64(series)?,
DataType::Float32 => gen_f32(series)?,
DataType::Int64 => gen_i64(series)?,
DataType::Int32 => gen_i32(series)?,
_ => polars_bail!(InvalidOperation: "only supported for numerical types"),
};

Expand Down

0 comments on commit 2a6658b

Please sign in to comment.