From c6b441180def8580b6324cb3b6ce78413b5b3343 Mon Sep 17 00:00:00 2001 From: Richard Date: Sun, 21 Jun 2026 21:10:55 -0400 Subject: [PATCH 1/5] add benchmarks for single column group-values traits --- datafusion/physical-plan/Cargo.toml | 4 + .../benches/group_values_bytes.rs | 310 ++++++++++++++++++ 2 files changed, 314 insertions(+) create mode 100644 datafusion/physical-plan/benches/group_values_bytes.rs diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index b4fc8f9d01176..f7694b4768a27 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -132,6 +132,10 @@ name = "compute_statistics" harness = false name = "dictionary_group_values" +[[bench]] +harness = false +name = "group_values_bytes" + [[bench]] harness = false name = "hash_join_semi_anti" diff --git a/datafusion/physical-plan/benches/group_values_bytes.rs b/datafusion/physical-plan/benches/group_values_bytes.rs new file mode 100644 index 0000000000000..786c4778578f2 --- /dev/null +++ b/datafusion/physical-plan/benches/group_values_bytes.rs @@ -0,0 +1,310 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for single-column `GroupValues` backed by bytes/boolean types: +//! `GroupValuesBytes` (Utf8/Binary), `GroupValuesBytes` +//! (LargeUtf8/LargeBinary), `GroupValuesBytesView` (Utf8View/BinaryView), and +//! `GroupValuesBoolean`. +//! +//! Each benchmark covers two patterns: +//! - `intern_emit`: one `intern` call followed by `emit(EmitTo::All)`. +//! - `repeated_intern_emit`: N `intern` calls followed by `emit(EmitTo::All)`. + +use arrow::array::{ + ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, LargeBinaryArray, + LargeStringArray, StringArray, StringViewArray, +}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use criterion::{ + BatchSize, BenchmarkId, Criterion, Throughput, criterion_group, criterion_main, +}; +use datafusion_expr::EmitTo; +use datafusion_physical_plan::aggregates::group_values::new_group_values; +use datafusion_physical_plan::aggregates::order::GroupOrdering; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use std::hint::black_box; +use std::sync::Arc; + +const SIZES: [usize; 1] = [8 * 1024]; +const CARDINALITIES: [usize; 3] = [20, 300, 1000]; +const N_BATCHES: usize = 4; +const SEED: u64 = 0xB175; + +fn schema(data_type: DataType) -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("g", data_type, true)])) +} + +fn make_strings(size: usize, cardinality: usize, seed: u64) -> Vec { + let values: Vec = (0..cardinality).map(|i| format!("val_{i:08}")).collect(); + let mut rng = StdRng::seed_from_u64(seed); + (0..size) + .map(|_| values[rng.random_range(0..cardinality)].clone()) + .collect() +} + +fn make_utf8(size: usize, cardinality: usize, seed: u64) -> ArrayRef { + let strings = make_strings(size, cardinality, seed); + Arc::new(StringArray::from( + strings.iter().map(String::as_str).collect::>(), + )) +} + +fn make_large_utf8(size: usize, cardinality: usize, seed: u64) -> ArrayRef { + let strings = make_strings(size, cardinality, seed); + Arc::new(LargeStringArray::from( + strings.iter().map(String::as_str).collect::>(), + )) +} + +fn make_utf8view(size: usize, cardinality: usize, seed: u64) -> ArrayRef { + let strings = make_strings(size, cardinality, seed); + Arc::new(StringViewArray::from( + strings.iter().map(String::as_str).collect::>(), + )) +} + +fn make_binary(size: usize, cardinality: usize, seed: u64) -> ArrayRef { + let strings = make_strings(size, cardinality, seed); + Arc::new(BinaryArray::from( + strings.iter().map(|s| s.as_bytes()).collect::>(), + )) +} + +fn make_large_binary(size: usize, cardinality: usize, seed: u64) -> ArrayRef { + let strings = make_strings(size, cardinality, seed); + Arc::new(LargeBinaryArray::from( + strings.iter().map(|s| s.as_bytes()).collect::>(), + )) +} + +fn make_binary_view(size: usize, cardinality: usize, seed: u64) -> ArrayRef { + let strings = make_strings(size, cardinality, seed); + Arc::new(BinaryViewArray::from( + strings.iter().map(|s| s.as_bytes()).collect::>(), + )) +} + +fn make_boolean(size: usize, seed: u64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(seed); + Arc::new(BooleanArray::from( + (0..size).map(|_| rng.random_bool(0.5)).collect::>(), + )) +} + +struct Case { + name: &'static str, + data_type: DataType, +} + +fn byte_cases() -> Vec { + vec![ + Case { + name: "utf8", + data_type: DataType::Utf8, + }, + Case { + name: "large_utf8", + data_type: DataType::LargeUtf8, + }, + Case { + name: "utf8view", + data_type: DataType::Utf8View, + }, + Case { + name: "binary", + data_type: DataType::Binary, + }, + Case { + name: "large_binary", + data_type: DataType::LargeBinary, + }, + Case { + name: "binary_view", + data_type: DataType::BinaryView, + }, + ] +} + +fn make_array( + data_type: &DataType, + size: usize, + cardinality: usize, + seed: u64, +) -> ArrayRef { + match data_type { + DataType::Utf8 => make_utf8(size, cardinality, seed), + DataType::LargeUtf8 => make_large_utf8(size, cardinality, seed), + DataType::Utf8View => make_utf8view(size, cardinality, seed), + DataType::Binary => make_binary(size, cardinality, seed), + DataType::LargeBinary => make_large_binary(size, cardinality, seed), + DataType::BinaryView => make_binary_view(size, cardinality, seed), + _ => unreachable!(), + } +} + +fn bench_intern_emit(c: &mut Criterion) { + let mut group = c.benchmark_group("group_values_bytes_intern_emit"); + + for case in byte_cases() { + let s = schema(case.data_type.clone()); + for &size in &SIZES { + for &card in &CARDINALITIES { + let array = make_array(&case.data_type, size, card, SEED); + group.throughput(Throughput::Elements(size as u64)); + group.bench_function( + BenchmarkId::new(case.name, format!("size_{size}_card_{card}")), + |b| { + b.iter_batched_ref( + || { + ( + new_group_values(s.clone(), &GroupOrdering::None) + .unwrap(), + Vec::::with_capacity(size), + ) + }, + |(gv, groups)| { + gv.intern(std::slice::from_ref(&array), groups).unwrap(); + black_box(&*groups); + black_box(gv.emit(EmitTo::All).unwrap()); + }, + BatchSize::SmallInput, + ); + }, + ); + } + } + } + + group.finish(); +} + +fn bench_repeated_intern_emit(c: &mut Criterion) { + let mut group = c.benchmark_group("group_values_bytes_repeated_intern_emit"); + + for case in byte_cases() { + let s = schema(case.data_type.clone()); + for &size in &SIZES { + for &card in &CARDINALITIES { + let batches: Vec = (0..N_BATCHES) + .map(|i| { + make_array( + &case.data_type, + size, + card, + SEED.wrapping_add(i as u64), + ) + }) + .collect(); + group.throughput(Throughput::Elements((size * N_BATCHES) as u64)); + group.bench_function( + BenchmarkId::new(case.name, format!("size_{size}_card_{card}")), + |b| { + b.iter_batched_ref( + || { + ( + new_group_values(s.clone(), &GroupOrdering::None) + .unwrap(), + Vec::::with_capacity(size), + ) + }, + |(gv, groups)| { + for arr in &batches { + gv.intern(std::slice::from_ref(arr), groups).unwrap(); + black_box(&*groups); + } + black_box(gv.emit(EmitTo::All).unwrap()); + }, + BatchSize::SmallInput, + ); + }, + ); + } + } + } + + group.finish(); +} + +fn bench_boolean_intern_emit(c: &mut Criterion) { + let mut group = c.benchmark_group("group_values_boolean_intern_emit"); + let s = schema(DataType::Boolean); + + for &size in &SIZES { + let array = make_boolean(size, SEED); + group.throughput(Throughput::Elements(size as u64)); + group.bench_function(BenchmarkId::new("boolean", format!("size_{size}")), |b| { + b.iter_batched_ref( + || { + ( + new_group_values(s.clone(), &GroupOrdering::None).unwrap(), + Vec::::with_capacity(size), + ) + }, + |(gv, groups)| { + gv.intern(std::slice::from_ref(&array), groups).unwrap(); + black_box(&*groups); + black_box(gv.emit(EmitTo::All).unwrap()); + }, + BatchSize::SmallInput, + ); + }); + } + + group.finish(); +} + +fn bench_boolean_repeated_intern_emit(c: &mut Criterion) { + let mut group = c.benchmark_group("group_values_boolean_repeated_intern_emit"); + let s = schema(DataType::Boolean); + + for &size in &SIZES { + let batches: Vec = (0..N_BATCHES) + .map(|i| make_boolean(size, SEED.wrapping_add(i as u64))) + .collect(); + group.throughput(Throughput::Elements((size * N_BATCHES) as u64)); + group.bench_function(BenchmarkId::new("boolean", format!("size_{size}")), |b| { + b.iter_batched_ref( + || { + ( + new_group_values(s.clone(), &GroupOrdering::None).unwrap(), + Vec::::with_capacity(size), + ) + }, + |(gv, groups)| { + for arr in &batches { + gv.intern(std::slice::from_ref(arr), groups).unwrap(); + black_box(&*groups); + } + black_box(gv.emit(EmitTo::All).unwrap()); + }, + BatchSize::SmallInput, + ); + }); + } + + group.finish(); +} + +criterion_group!( + benches, + bench_intern_emit, + bench_repeated_intern_emit, + bench_boolean_intern_emit, + bench_boolean_repeated_intern_emit, +); +criterion_main!(benches); From f77b22b7695002f46b9691aa5801ab734397d6cb Mon Sep 17 00:00:00 2001 From: Richard Date: Thu, 25 Jun 2026 13:20:18 -0400 Subject: [PATCH 2/5] avoid inter() call in emit. introduce arrowbytesmap.emit() perf: zero-copy emit and unified drain for GroupValuesBytes Replace the O(n) Buffer::from(slice) memcpy in ArrowBytesMap::emit() with a zero-copy path: freeze the MutableBuffer (finish()), slice the emitted window from the frozen Arc, and copy only the surviving bytes back into a fresh builder. Add drain_emitted(n) to evict hash table entries for the n emitted values and renumber survivors, enabling EmitTo::First(n) without a full rebuild. Unify GroupValuesBytes::emit() to a single code path for both EmitTo::All and EmitTo::First(n): compute n, call map.emit(n), then map.drain_emitted(n). remove local benchmarks --- .../physical-expr-common/src/binary_map.rs | 94 ++++++ datafusion/physical-plan/Cargo.toml | 4 - .../benches/group_values_bytes.rs | 310 ------------------ .../group_values/single_group_by/bytes.rs | 37 +-- 4 files changed, 100 insertions(+), 345 deletions(-) delete mode 100644 datafusion/physical-plan/benches/group_values_bytes.rs diff --git a/datafusion/physical-expr-common/src/binary_map.rs b/datafusion/physical-expr-common/src/binary_map.rs index ad184d6500d56..9e7c8e3123d16 100644 --- a/datafusion/physical-expr-common/src/binary_map.rs +++ b/datafusion/physical-expr-common/src/binary_map.rs @@ -233,6 +233,9 @@ where /// NOTE null_index is the logical index in the final array, not the index /// in the buffer null: Option<(V, usize)>, + /// Number of distinct values already emitted via [`Self::emit`]. + /// Acts as a cursor into `offsets` and `buffer`. + emitted: usize, } /// The size, in number of entries, of the initial hash table @@ -253,6 +256,7 @@ where random_state: RandomState::default(), hashes_buffer: vec![], null: None, + emitted: 0, } } @@ -476,6 +480,75 @@ where } } + /// Emits the next `n` distinct values as an Arrow array. + /// + /// The builder is frozen into an immutable `Buffer` (zero-copy) and the + /// emitted window is sliced from it directly (zero-copy). Only the + /// surviving bytes are copied back into a fresh builder, so the insertion + /// path is unchanged. + /// + /// Panics if `n` exceeds the number of not-yet-emitted values. + pub fn emit(&mut self, n: usize) -> ArrayRef { + let total = self.offsets.len() - 1; + let remaining = total - self.emitted; + assert!(n <= remaining, "emit({n}): only {remaining} values remain"); + + let cursor = self.emitted; + let start = self.offsets[cursor].as_usize(); + let end = self.offsets[cursor + n].as_usize(); + + // Build emitted offsets before compacting self.offsets. + let base = self.offsets[cursor]; + let offsets: Vec = self.offsets[cursor..=cursor + n] + .iter() + .map(|&o| O::usize_as(o.as_usize() - base.as_usize())) + .collect(); + // SAFETY: monotonically increasing, derived from correctly constructed offsets. + let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) }; + + // Compact surviving offsets and reset cursor. + let surviving: Vec = self.offsets[cursor + n..] + .iter() + .map(|&o| O::usize_as(o.as_usize() - end)) + .collect(); + self.offsets = surviving; + self.emitted = 0; + + // Freeze the builder (zero-copy: MutableBuffer → Arc via finish()). + // slice(start) is a zero-copy Arc clone; the array reads only [0, end-start) + // bytes because the offsets bound it, so the extra trailing bytes are harmless. + let frozen = self.buffer.finish(); + let values = frozen.slice(start); + + // Copy only the surviving bytes into the fresh builder. + self.buffer.append_slice(&frozen[end..]); + + let nulls = self.null.and_then(|(_, null_idx)| { + if null_idx >= cursor && null_idx < cursor + n { + Some(single_null_buffer(n, null_idx - cursor)) + } else { + None + } + }); + self.null = self.null.and_then(|(payload, null_idx)| { + if null_idx >= cursor + n { + Some((payload, null_idx - (cursor + n))) + } else { + None + } + }); + + match self.output_type { + OutputType::Binary => Arc::new(unsafe { + GenericBinaryArray::new_unchecked(offsets, values, nulls) + }), + OutputType::Utf8 => Arc::new(unsafe { + GenericStringArray::new_unchecked(offsets, values, nulls) + }), + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + } + } + /// Converts this set into a `StringArray`, `LargeStringArray`, /// `BinaryArray`, or `LargeBinaryArray` containing each distinct value /// that was inserted. This is done without copying the values. @@ -492,6 +565,7 @@ where random_state: _, hashes_buffer: _, null, + emitted: _, } = self; // Only make a `NullBuffer` if there was a null value @@ -526,6 +600,26 @@ where } } + /// Removes entries whose payload is less than `n` from the hash table and + /// subtracts `n` from every remaining payload. Emitted values are evicted + pub fn drain_emitted(&mut self, n: usize) + where + V: std::ops::Sub + PartialOrd + From, + { + let threshold = V::from(n); + self.map.retain(|entry| entry.payload >= threshold); + for entry in self.map.iter_mut() { + entry.payload = entry.payload - threshold; + } + match &mut self.null { + Some((payload, _)) if *payload >= threshold => { + *payload = *payload - threshold; + } + Some(_) => self.null = None, // null was in the emitted window + None => {} + } + } + /// Total number of entries (including null, if present) pub fn len(&self) -> usize { self.non_null_len() + self.null.map(|_| 1).unwrap_or(0) diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index f7694b4768a27..b4fc8f9d01176 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -132,10 +132,6 @@ name = "compute_statistics" harness = false name = "dictionary_group_values" -[[bench]] -harness = false -name = "group_values_bytes" - [[bench]] harness = false name = "hash_join_semi_anti" diff --git a/datafusion/physical-plan/benches/group_values_bytes.rs b/datafusion/physical-plan/benches/group_values_bytes.rs deleted file mode 100644 index 786c4778578f2..0000000000000 --- a/datafusion/physical-plan/benches/group_values_bytes.rs +++ /dev/null @@ -1,310 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Benchmarks for single-column `GroupValues` backed by bytes/boolean types: -//! `GroupValuesBytes` (Utf8/Binary), `GroupValuesBytes` -//! (LargeUtf8/LargeBinary), `GroupValuesBytesView` (Utf8View/BinaryView), and -//! `GroupValuesBoolean`. -//! -//! Each benchmark covers two patterns: -//! - `intern_emit`: one `intern` call followed by `emit(EmitTo::All)`. -//! - `repeated_intern_emit`: N `intern` calls followed by `emit(EmitTo::All)`. - -use arrow::array::{ - ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, LargeBinaryArray, - LargeStringArray, StringArray, StringViewArray, -}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use criterion::{ - BatchSize, BenchmarkId, Criterion, Throughput, criterion_group, criterion_main, -}; -use datafusion_expr::EmitTo; -use datafusion_physical_plan::aggregates::group_values::new_group_values; -use datafusion_physical_plan::aggregates::order::GroupOrdering; -use rand::rngs::StdRng; -use rand::{Rng, SeedableRng}; -use std::hint::black_box; -use std::sync::Arc; - -const SIZES: [usize; 1] = [8 * 1024]; -const CARDINALITIES: [usize; 3] = [20, 300, 1000]; -const N_BATCHES: usize = 4; -const SEED: u64 = 0xB175; - -fn schema(data_type: DataType) -> SchemaRef { - Arc::new(Schema::new(vec![Field::new("g", data_type, true)])) -} - -fn make_strings(size: usize, cardinality: usize, seed: u64) -> Vec { - let values: Vec = (0..cardinality).map(|i| format!("val_{i:08}")).collect(); - let mut rng = StdRng::seed_from_u64(seed); - (0..size) - .map(|_| values[rng.random_range(0..cardinality)].clone()) - .collect() -} - -fn make_utf8(size: usize, cardinality: usize, seed: u64) -> ArrayRef { - let strings = make_strings(size, cardinality, seed); - Arc::new(StringArray::from( - strings.iter().map(String::as_str).collect::>(), - )) -} - -fn make_large_utf8(size: usize, cardinality: usize, seed: u64) -> ArrayRef { - let strings = make_strings(size, cardinality, seed); - Arc::new(LargeStringArray::from( - strings.iter().map(String::as_str).collect::>(), - )) -} - -fn make_utf8view(size: usize, cardinality: usize, seed: u64) -> ArrayRef { - let strings = make_strings(size, cardinality, seed); - Arc::new(StringViewArray::from( - strings.iter().map(String::as_str).collect::>(), - )) -} - -fn make_binary(size: usize, cardinality: usize, seed: u64) -> ArrayRef { - let strings = make_strings(size, cardinality, seed); - Arc::new(BinaryArray::from( - strings.iter().map(|s| s.as_bytes()).collect::>(), - )) -} - -fn make_large_binary(size: usize, cardinality: usize, seed: u64) -> ArrayRef { - let strings = make_strings(size, cardinality, seed); - Arc::new(LargeBinaryArray::from( - strings.iter().map(|s| s.as_bytes()).collect::>(), - )) -} - -fn make_binary_view(size: usize, cardinality: usize, seed: u64) -> ArrayRef { - let strings = make_strings(size, cardinality, seed); - Arc::new(BinaryViewArray::from( - strings.iter().map(|s| s.as_bytes()).collect::>(), - )) -} - -fn make_boolean(size: usize, seed: u64) -> ArrayRef { - let mut rng = StdRng::seed_from_u64(seed); - Arc::new(BooleanArray::from( - (0..size).map(|_| rng.random_bool(0.5)).collect::>(), - )) -} - -struct Case { - name: &'static str, - data_type: DataType, -} - -fn byte_cases() -> Vec { - vec![ - Case { - name: "utf8", - data_type: DataType::Utf8, - }, - Case { - name: "large_utf8", - data_type: DataType::LargeUtf8, - }, - Case { - name: "utf8view", - data_type: DataType::Utf8View, - }, - Case { - name: "binary", - data_type: DataType::Binary, - }, - Case { - name: "large_binary", - data_type: DataType::LargeBinary, - }, - Case { - name: "binary_view", - data_type: DataType::BinaryView, - }, - ] -} - -fn make_array( - data_type: &DataType, - size: usize, - cardinality: usize, - seed: u64, -) -> ArrayRef { - match data_type { - DataType::Utf8 => make_utf8(size, cardinality, seed), - DataType::LargeUtf8 => make_large_utf8(size, cardinality, seed), - DataType::Utf8View => make_utf8view(size, cardinality, seed), - DataType::Binary => make_binary(size, cardinality, seed), - DataType::LargeBinary => make_large_binary(size, cardinality, seed), - DataType::BinaryView => make_binary_view(size, cardinality, seed), - _ => unreachable!(), - } -} - -fn bench_intern_emit(c: &mut Criterion) { - let mut group = c.benchmark_group("group_values_bytes_intern_emit"); - - for case in byte_cases() { - let s = schema(case.data_type.clone()); - for &size in &SIZES { - for &card in &CARDINALITIES { - let array = make_array(&case.data_type, size, card, SEED); - group.throughput(Throughput::Elements(size as u64)); - group.bench_function( - BenchmarkId::new(case.name, format!("size_{size}_card_{card}")), - |b| { - b.iter_batched_ref( - || { - ( - new_group_values(s.clone(), &GroupOrdering::None) - .unwrap(), - Vec::::with_capacity(size), - ) - }, - |(gv, groups)| { - gv.intern(std::slice::from_ref(&array), groups).unwrap(); - black_box(&*groups); - black_box(gv.emit(EmitTo::All).unwrap()); - }, - BatchSize::SmallInput, - ); - }, - ); - } - } - } - - group.finish(); -} - -fn bench_repeated_intern_emit(c: &mut Criterion) { - let mut group = c.benchmark_group("group_values_bytes_repeated_intern_emit"); - - for case in byte_cases() { - let s = schema(case.data_type.clone()); - for &size in &SIZES { - for &card in &CARDINALITIES { - let batches: Vec = (0..N_BATCHES) - .map(|i| { - make_array( - &case.data_type, - size, - card, - SEED.wrapping_add(i as u64), - ) - }) - .collect(); - group.throughput(Throughput::Elements((size * N_BATCHES) as u64)); - group.bench_function( - BenchmarkId::new(case.name, format!("size_{size}_card_{card}")), - |b| { - b.iter_batched_ref( - || { - ( - new_group_values(s.clone(), &GroupOrdering::None) - .unwrap(), - Vec::::with_capacity(size), - ) - }, - |(gv, groups)| { - for arr in &batches { - gv.intern(std::slice::from_ref(arr), groups).unwrap(); - black_box(&*groups); - } - black_box(gv.emit(EmitTo::All).unwrap()); - }, - BatchSize::SmallInput, - ); - }, - ); - } - } - } - - group.finish(); -} - -fn bench_boolean_intern_emit(c: &mut Criterion) { - let mut group = c.benchmark_group("group_values_boolean_intern_emit"); - let s = schema(DataType::Boolean); - - for &size in &SIZES { - let array = make_boolean(size, SEED); - group.throughput(Throughput::Elements(size as u64)); - group.bench_function(BenchmarkId::new("boolean", format!("size_{size}")), |b| { - b.iter_batched_ref( - || { - ( - new_group_values(s.clone(), &GroupOrdering::None).unwrap(), - Vec::::with_capacity(size), - ) - }, - |(gv, groups)| { - gv.intern(std::slice::from_ref(&array), groups).unwrap(); - black_box(&*groups); - black_box(gv.emit(EmitTo::All).unwrap()); - }, - BatchSize::SmallInput, - ); - }); - } - - group.finish(); -} - -fn bench_boolean_repeated_intern_emit(c: &mut Criterion) { - let mut group = c.benchmark_group("group_values_boolean_repeated_intern_emit"); - let s = schema(DataType::Boolean); - - for &size in &SIZES { - let batches: Vec = (0..N_BATCHES) - .map(|i| make_boolean(size, SEED.wrapping_add(i as u64))) - .collect(); - group.throughput(Throughput::Elements((size * N_BATCHES) as u64)); - group.bench_function(BenchmarkId::new("boolean", format!("size_{size}")), |b| { - b.iter_batched_ref( - || { - ( - new_group_values(s.clone(), &GroupOrdering::None).unwrap(), - Vec::::with_capacity(size), - ) - }, - |(gv, groups)| { - for arr in &batches { - gv.intern(std::slice::from_ref(arr), groups).unwrap(); - black_box(&*groups); - } - black_box(gv.emit(EmitTo::All).unwrap()); - }, - BatchSize::SmallInput, - ); - }); - } - - group.finish(); -} - -criterion_group!( - benches, - bench_intern_emit, - bench_repeated_intern_emit, - bench_boolean_intern_emit, - bench_boolean_repeated_intern_emit, -); -criterion_main!(benches); diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs index b881a51b25474..787e54a070b71 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs @@ -85,38 +85,13 @@ impl GroupValues for GroupValuesBytes { } fn emit(&mut self, emit_to: EmitTo) -> Result> { - // Reset the map to default, and convert it into a single array - let map_contents = self.map.take().into_state(); - - let group_values = match emit_to { - EmitTo::All => { - self.num_groups -= map_contents.len(); - map_contents - } - EmitTo::First(n) if n == self.len() => { - self.num_groups -= map_contents.len(); - map_contents - } - EmitTo::First(n) => { - // if we only wanted to take the first n, insert the rest back - // into the map we could potentially avoid this reallocation, at - // the expense of much more complex code. - // see https://github.com/apache/datafusion/issues/9195 - let emit_group_values = map_contents.slice(0, n); - let remaining_group_values = - map_contents.slice(n, map_contents.len() - n); - - self.num_groups = 0; - let mut group_indexes = vec![]; - self.intern(&[remaining_group_values], &mut group_indexes)?; - - // Verify that the group indexes were assigned in the correct order - assert_eq!(0, group_indexes[0]); - - emit_group_values - } + let n = match emit_to { + EmitTo::All => self.num_groups, + EmitTo::First(n) => n.min(self.num_groups), }; - + let group_values = self.map.emit(n); + self.map.drain_emitted(n); + self.num_groups -= n; Ok(vec![group_values]) } From 337af594aa6a1e7eb4fe9ae31ea790f030c67c99 Mon Sep 17 00:00:00 2001 From: Richard Date: Thu, 25 Jun 2026 23:12:25 -0400 Subject: [PATCH 3/5] emit-all path --- .../physical-expr-common/src/binary_map.rs | 12 ++++++++++++ .../group_values/single_group_by/bytes.rs | 18 ++++++++++++------ 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_map.rs b/datafusion/physical-expr-common/src/binary_map.rs index 9e7c8e3123d16..78c0e5d88f4c8 100644 --- a/datafusion/physical-expr-common/src/binary_map.rs +++ b/datafusion/physical-expr-common/src/binary_map.rs @@ -523,6 +523,18 @@ where // Copy only the surviving bytes into the fresh builder. self.buffer.append_slice(&frozen[end..]); + // Rebase non-inline offsets: buffer now starts at what was `end`. + // Only surviving entries (offset >= end) need rebasing; emitted entries + // (offset < end) are removed by drain_emitted and must not be touched. + if end > 0 { + for entry in self.map.iter_mut() { + if entry.len.as_usize() > SHORT_VALUE_LEN && entry.offset_or_inline >= end + { + entry.offset_or_inline -= end; + } + } + } + let nulls = self.null.and_then(|(_, null_idx)| { if null_idx >= cursor && null_idx < cursor + n { Some(single_null_buffer(n, null_idx - cursor)) diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs index 787e54a070b71..4267613733dd5 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs @@ -85,13 +85,19 @@ impl GroupValues for GroupValuesBytes { } fn emit(&mut self, emit_to: EmitTo) -> Result> { - let n = match emit_to { - EmitTo::All => self.num_groups, - EmitTo::First(n) => n.min(self.num_groups), + let group_values = match emit_to { + EmitTo::All => { + self.num_groups = 0; + self.map.take().into_state() + } + EmitTo::First(n) => { + let n = n.min(self.num_groups); + let group_values = self.map.emit(n); + self.map.drain_emitted(n); + self.num_groups -= n; + group_values + } }; - let group_values = self.map.emit(n); - self.map.drain_emitted(n); - self.num_groups -= n; Ok(vec![group_values]) } From 902effe4b1d7f018239107d87948e1e326bd4867 Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 26 Jun 2026 00:49:19 -0400 Subject: [PATCH 4/5] shrink pr --- .../physical-expr-common/src/binary_map.rs | 22 +++++++------------ .../group_values/single_group_by/bytes.rs | 1 - 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_map.rs b/datafusion/physical-expr-common/src/binary_map.rs index 78c0e5d88f4c8..150c86a947db1 100644 --- a/datafusion/physical-expr-common/src/binary_map.rs +++ b/datafusion/physical-expr-common/src/binary_map.rs @@ -481,14 +481,11 @@ where } /// Emits the next `n` distinct values as an Arrow array. - /// - /// The builder is frozen into an immutable `Buffer` (zero-copy) and the - /// emitted window is sliced from it directly (zero-copy). Only the - /// surviving bytes are copied back into a fresh builder, so the insertion - /// path is unchanged. - /// /// Panics if `n` exceeds the number of not-yet-emitted values. - pub fn emit(&mut self, n: usize) -> ArrayRef { + pub fn emit(&mut self, n: usize) -> ArrayRef + where + V: std::ops::Sub + PartialOrd + From, + { let total = self.offsets.len() - 1; let remaining = total - self.emitted; assert!(n <= remaining, "emit({n}): only {remaining} values remain"); @@ -514,16 +511,12 @@ where self.offsets = surviving; self.emitted = 0; - // Freeze the builder (zero-copy: MutableBuffer → Arc via finish()). - // slice(start) is a zero-copy Arc clone; the array reads only [0, end-start) - // bytes because the offsets bound it, so the extra trailing bytes are harmless. let frozen = self.buffer.finish(); let values = frozen.slice(start); // Copy only the surviving bytes into the fresh builder. self.buffer.append_slice(&frozen[end..]); - // Rebase non-inline offsets: buffer now starts at what was `end`. // Only surviving entries (offset >= end) need rebasing; emitted entries // (offset < end) are removed by drain_emitted and must not be touched. if end > 0 { @@ -550,6 +543,8 @@ where } }); + self.drain_emitted(n); + match self.output_type { OutputType::Binary => Arc::new(unsafe { GenericBinaryArray::new_unchecked(offsets, values, nulls) @@ -612,9 +607,8 @@ where } } - /// Removes entries whose payload is less than `n` from the hash table and - /// subtracts `n` from every remaining payload. Emitted values are evicted - pub fn drain_emitted(&mut self, n: usize) + #[inline] + fn drain_emitted(&mut self, n: usize) where V: std::ops::Sub + PartialOrd + From, { diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs index 4267613733dd5..48bfedbb1918f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs @@ -93,7 +93,6 @@ impl GroupValues for GroupValuesBytes { EmitTo::First(n) => { let n = n.min(self.num_groups); let group_values = self.map.emit(n); - self.map.drain_emitted(n); self.num_groups -= n; group_values } From 8e321b1ef8b3962c3377370c21681ce2690913bf Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 26 Jun 2026 15:44:55 -0400 Subject: [PATCH 5/5] remove two O(m) passes --- .../physical-expr-common/src/binary_map.rs | 51 ++++++++----------- .../group_values/single_group_by/bytes.rs | 5 +- 2 files changed, 24 insertions(+), 32 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_map.rs b/datafusion/physical-expr-common/src/binary_map.rs index 150c86a947db1..4e6b9236061f8 100644 --- a/datafusion/physical-expr-common/src/binary_map.rs +++ b/datafusion/physical-expr-common/src/binary_map.rs @@ -488,7 +488,7 @@ where { let total = self.offsets.len() - 1; let remaining = total - self.emitted; - assert!(n <= remaining, "emit({n}): only {remaining} values remain"); + debug_assert!(n <= remaining, "emit({n}): only {remaining} values remain"); let cursor = self.emitted; let start = self.offsets[cursor].as_usize(); @@ -517,16 +517,21 @@ where // Copy only the surviving bytes into the fresh builder. self.buffer.append_slice(&frozen[end..]); - // Only surviving entries (offset >= end) need rebasing; emitted entries - // (offset < end) are removed by drain_emitted and must not be touched. - if end > 0 { - for entry in self.map.iter_mut() { - if entry.len.as_usize() > SHORT_VALUE_LEN && entry.offset_or_inline >= end - { - entry.offset_or_inline -= end; - } + // drop emitted entries, rebase buffer offsets and payloads. + let threshold = V::from(n); + self.map.retain(|entry| { + if entry.payload < threshold { + return false; } - } + if end > 0 + && entry.len.as_usize() > SHORT_VALUE_LEN + && entry.offset_or_inline >= end + { + entry.offset_or_inline -= end; + } + entry.payload = entry.payload - threshold; + true + }); let nulls = self.null.and_then(|(_, null_idx)| { if null_idx >= cursor && null_idx < cursor + n { @@ -542,8 +547,11 @@ where None } }); - - self.drain_emitted(n); + // Rebase null payload. If self.null survived the and_then above, + // null_idx >= cursor + n, which by invariant means payload >= threshold. + if let Some((ref mut payload, _)) = self.null { + *payload = *payload - threshold; + } match self.output_type { OutputType::Binary => Arc::new(unsafe { @@ -607,25 +615,6 @@ where } } - #[inline] - fn drain_emitted(&mut self, n: usize) - where - V: std::ops::Sub + PartialOrd + From, - { - let threshold = V::from(n); - self.map.retain(|entry| entry.payload >= threshold); - for entry in self.map.iter_mut() { - entry.payload = entry.payload - threshold; - } - match &mut self.null { - Some((payload, _)) if *payload >= threshold => { - *payload = *payload - threshold; - } - Some(_) => self.null = None, // null was in the emitted window - None => {} - } - } - /// Total number of entries (including null, if present) pub fn len(&self) -> usize { self.non_null_len() + self.null.map(|_| 1).unwrap_or(0) diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs index 48bfedbb1918f..6658ba905b5a6 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs @@ -90,8 +90,11 @@ impl GroupValues for GroupValuesBytes { self.num_groups = 0; self.map.take().into_state() } + EmitTo::First(n) if n >= self.num_groups => { + self.num_groups = 0; + self.map.take().into_state() + } EmitTo::First(n) => { - let n = n.min(self.num_groups); let group_values = self.map.emit(n); self.num_groups -= n; group_values