Skip to content

Commit 337af59

Browse files
committed
emit-all path
1 parent f77b22b commit 337af59

2 files changed

Lines changed: 24 additions & 6 deletions

File tree

  • datafusion

datafusion/physical-expr-common/src/binary_map.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,18 @@ where
523523
// Copy only the surviving bytes into the fresh builder.
524524
self.buffer.append_slice(&frozen[end..]);
525525

526+
// Rebase non-inline offsets: buffer now starts at what was `end`.
527+
// Only surviving entries (offset >= end) need rebasing; emitted entries
528+
// (offset < end) are removed by drain_emitted and must not be touched.
529+
if end > 0 {
530+
for entry in self.map.iter_mut() {
531+
if entry.len.as_usize() > SHORT_VALUE_LEN && entry.offset_or_inline >= end
532+
{
533+
entry.offset_or_inline -= end;
534+
}
535+
}
536+
}
537+
526538
let nulls = self.null.and_then(|(_, null_idx)| {
527539
if null_idx >= cursor && null_idx < cursor + n {
528540
Some(single_null_buffer(n, null_idx - cursor))

datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,19 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesBytes<O> {
8585
}
8686

8787
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
88-
let n = match emit_to {
89-
EmitTo::All => self.num_groups,
90-
EmitTo::First(n) => n.min(self.num_groups),
88+
let group_values = match emit_to {
89+
EmitTo::All => {
90+
self.num_groups = 0;
91+
self.map.take().into_state()
92+
}
93+
EmitTo::First(n) => {
94+
let n = n.min(self.num_groups);
95+
let group_values = self.map.emit(n);
96+
self.map.drain_emitted(n);
97+
self.num_groups -= n;
98+
group_values
99+
}
91100
};
92-
let group_values = self.map.emit(n);
93-
self.map.drain_emitted(n);
94-
self.num_groups -= n;
95101
Ok(vec![group_values])
96102
}
97103

0 commit comments

Comments
 (0)