Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions datafusion/physical-expr-common/src/binary_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ where
/// NOTE null_index is the logical index in the final array, not the index
/// in the buffer
null: Option<(V, usize)>,
/// Number of distinct values already emitted via [`Self::emit`].
/// Acts as a cursor into `offsets` and `buffer`.
emitted: usize,
}

/// The size, in number of entries, of the initial hash table
Expand All @@ -253,6 +256,7 @@ where
random_state: RandomState::default(),
hashes_buffer: vec![],
null: None,
emitted: 0,
}
}

Expand Down Expand Up @@ -476,6 +480,90 @@ where
}
}

/// Emits the next `n` distinct values as an Arrow array.
/// Panics if `n` exceeds the number of not-yet-emitted values.
pub fn emit(&mut self, n: usize) -> ArrayRef
where
V: std::ops::Sub<Output = V> + PartialOrd + From<usize>,
{
let total = self.offsets.len() - 1;
let remaining = total - self.emitted;
debug_assert!(n <= remaining, "emit({n}): only {remaining} values remain");

let cursor = self.emitted;
let start = self.offsets[cursor].as_usize();
let end = self.offsets[cursor + n].as_usize();

// Build emitted offsets before compacting self.offsets.
let base = self.offsets[cursor];
let offsets: Vec<O> = self.offsets[cursor..=cursor + n]
.iter()
.map(|&o| O::usize_as(o.as_usize() - base.as_usize()))
.collect();
// SAFETY: monotonically increasing, derived from correctly constructed offsets.
let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) };

// Compact surviving offsets and reset cursor.
let surviving: Vec<O> = self.offsets[cursor + n..]
.iter()
.map(|&o| O::usize_as(o.as_usize() - end))
.collect();
self.offsets = surviving;
self.emitted = 0;

let frozen = self.buffer.finish();
let values = frozen.slice(start);

// Copy only the surviving bytes into the fresh builder.
self.buffer.append_slice(&frozen[end..]);

// drop emitted entries, rebase buffer offsets and payloads.
let threshold = V::from(n);
self.map.retain(|entry| {
if entry.payload < threshold {
return false;
}
if end > 0
&& entry.len.as_usize() > SHORT_VALUE_LEN
&& entry.offset_or_inline >= end
{
entry.offset_or_inline -= end;
}
entry.payload = entry.payload - threshold;
true
});

let nulls = self.null.and_then(|(_, null_idx)| {
if null_idx >= cursor && null_idx < cursor + n {
Some(single_null_buffer(n, null_idx - cursor))
} else {
None
}
});
self.null = self.null.and_then(|(payload, null_idx)| {
if null_idx >= cursor + n {
Some((payload, null_idx - (cursor + n)))
} else {
None
}
});
// Rebase null payload. If self.null survived the and_then above,
// null_idx >= cursor + n, which by invariant means payload >= threshold.
if let Some((ref mut payload, _)) = self.null {
*payload = *payload - threshold;
}

match self.output_type {
OutputType::Binary => Arc::new(unsafe {
GenericBinaryArray::new_unchecked(offsets, values, nulls)
}),
OutputType::Utf8 => Arc::new(unsafe {
GenericStringArray::new_unchecked(offsets, values, nulls)
}),
_ => unreachable!("View types should use `ArrowBytesViewMap`"),
}
}

/// Converts this set into a `StringArray`, `LargeStringArray`,
/// `BinaryArray`, or `LargeBinaryArray` containing each distinct value
/// that was inserted. This is done without copying the values.
Expand All @@ -492,6 +580,7 @@ where
random_state: _,
hashes_buffer: _,
null,
emitted: _,
} = self;

// Only make a `NullBuffer` if there was a null value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,38 +85,21 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesBytes<O> {
}

fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
// Reset the map to default, and convert it into a single array
let map_contents = self.map.take().into_state();

let group_values = match emit_to {
EmitTo::All => {
self.num_groups -= map_contents.len();
map_contents
self.num_groups = 0;
self.map.take().into_state()
}
EmitTo::First(n) if n == self.len() => {
self.num_groups -= map_contents.len();
map_contents
EmitTo::First(n) if n >= self.num_groups => {
self.num_groups = 0;
self.map.take().into_state()
}
EmitTo::First(n) => {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be worth adding a match if branch here to see if n == group_size. in that case also just use into_state()

// if we only wanted to take the first n, insert the rest back
// into the map we could potentially avoid this reallocation, at
// the expense of much more complex code.
// see https://github.com/apache/datafusion/issues/9195
let emit_group_values = map_contents.slice(0, n);
let remaining_group_values =
map_contents.slice(n, map_contents.len() - n);

self.num_groups = 0;
let mut group_indexes = vec![];
self.intern(&[remaining_group_values], &mut group_indexes)?;

// Verify that the group indexes were assigned in the correct order
assert_eq!(0, group_indexes[0]);

emit_group_values
let group_values = self.map.emit(n);
self.num_groups -= n;
group_values
}
};

Ok(vec![group_values])
}

Expand Down
Loading