diff --git a/datafusion/physical-expr-common/src/binary_map.rs b/datafusion/physical-expr-common/src/binary_map.rs index ad184d6500d56..4e6b9236061f8 100644 --- a/datafusion/physical-expr-common/src/binary_map.rs +++ b/datafusion/physical-expr-common/src/binary_map.rs @@ -233,6 +233,9 @@ where /// NOTE null_index is the logical index in the final array, not the index /// in the buffer null: Option<(V, usize)>, + /// Number of distinct values already emitted via [`Self::emit`]. + /// Acts as a cursor into `offsets` and `buffer`. + emitted: usize, } /// The size, in number of entries, of the initial hash table @@ -253,6 +256,7 @@ where random_state: RandomState::default(), hashes_buffer: vec![], null: None, + emitted: 0, } } @@ -476,6 +480,90 @@ where } } + /// Emits the next `n` distinct values as an Arrow array. + /// Panics if `n` exceeds the number of not-yet-emitted values. + pub fn emit(&mut self, n: usize) -> ArrayRef + where + V: std::ops::Sub + PartialOrd + From, + { + let total = self.offsets.len() - 1; + let remaining = total - self.emitted; + debug_assert!(n <= remaining, "emit({n}): only {remaining} values remain"); + + let cursor = self.emitted; + let start = self.offsets[cursor].as_usize(); + let end = self.offsets[cursor + n].as_usize(); + + // Build emitted offsets before compacting self.offsets. + let base = self.offsets[cursor]; + let offsets: Vec = self.offsets[cursor..=cursor + n] + .iter() + .map(|&o| O::usize_as(o.as_usize() - base.as_usize())) + .collect(); + // SAFETY: monotonically increasing, derived from correctly constructed offsets. + let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) }; + + // Compact surviving offsets and reset cursor. + let surviving: Vec = self.offsets[cursor + n..] + .iter() + .map(|&o| O::usize_as(o.as_usize() - end)) + .collect(); + self.offsets = surviving; + self.emitted = 0; + + let frozen = self.buffer.finish(); + let values = frozen.slice(start); + + // Copy only the surviving bytes into the fresh builder. + self.buffer.append_slice(&frozen[end..]); + + // drop emitted entries, rebase buffer offsets and payloads. + let threshold = V::from(n); + self.map.retain(|entry| { + if entry.payload < threshold { + return false; + } + if end > 0 + && entry.len.as_usize() > SHORT_VALUE_LEN + && entry.offset_or_inline >= end + { + entry.offset_or_inline -= end; + } + entry.payload = entry.payload - threshold; + true + }); + + let nulls = self.null.and_then(|(_, null_idx)| { + if null_idx >= cursor && null_idx < cursor + n { + Some(single_null_buffer(n, null_idx - cursor)) + } else { + None + } + }); + self.null = self.null.and_then(|(payload, null_idx)| { + if null_idx >= cursor + n { + Some((payload, null_idx - (cursor + n))) + } else { + None + } + }); + // Rebase null payload. If self.null survived the and_then above, + // null_idx >= cursor + n, which by invariant means payload >= threshold. + if let Some((ref mut payload, _)) = self.null { + *payload = *payload - threshold; + } + + match self.output_type { + OutputType::Binary => Arc::new(unsafe { + GenericBinaryArray::new_unchecked(offsets, values, nulls) + }), + OutputType::Utf8 => Arc::new(unsafe { + GenericStringArray::new_unchecked(offsets, values, nulls) + }), + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + } + } + /// Converts this set into a `StringArray`, `LargeStringArray`, /// `BinaryArray`, or `LargeBinaryArray` containing each distinct value /// that was inserted. This is done without copying the values. @@ -492,6 +580,7 @@ where random_state: _, hashes_buffer: _, null, + emitted: _, } = self; // Only make a `NullBuffer` if there was a null value diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs index b881a51b25474..6658ba905b5a6 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs @@ -85,38 +85,21 @@ impl GroupValues for GroupValuesBytes { } fn emit(&mut self, emit_to: EmitTo) -> Result> { - // Reset the map to default, and convert it into a single array - let map_contents = self.map.take().into_state(); - let group_values = match emit_to { EmitTo::All => { - self.num_groups -= map_contents.len(); - map_contents + self.num_groups = 0; + self.map.take().into_state() } - EmitTo::First(n) if n == self.len() => { - self.num_groups -= map_contents.len(); - map_contents + EmitTo::First(n) if n >= self.num_groups => { + self.num_groups = 0; + self.map.take().into_state() } EmitTo::First(n) => { - // if we only wanted to take the first n, insert the rest back - // into the map we could potentially avoid this reallocation, at - // the expense of much more complex code. - // see https://github.com/apache/datafusion/issues/9195 - let emit_group_values = map_contents.slice(0, n); - let remaining_group_values = - map_contents.slice(n, map_contents.len() - n); - - self.num_groups = 0; - let mut group_indexes = vec![]; - self.intern(&[remaining_group_values], &mut group_indexes)?; - - // Verify that the group indexes were assigned in the correct order - assert_eq!(0, group_indexes[0]); - - emit_group_values + let group_values = self.map.emit(n); + self.num_groups -= n; + group_values } }; - Ok(vec![group_values]) }