Skip to content

Commit 154926b

Browse files
committed
fix: add build-side join keys to memory accounting
1 parent 0283077 commit 154926b

File tree

1 file changed

+48
-10
lines changed

1 file changed

+48
-10
lines changed

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,12 @@ use arrow::array::{
5656
Array, ArrayRef, BooleanArray, BooleanBufferBuilder, UInt32Array, UInt64Array,
5757
};
5858
use arrow::compute::kernels::cmp::{eq, not_distinct};
59-
use arrow::compute::{and, concat_batches, take, FilterBuilder};
59+
use arrow::compute::{and, concat, concat_batches, take, FilterBuilder};
6060
use arrow::datatypes::{Schema, SchemaRef};
6161
use arrow::record_batch::RecordBatch;
6262
use arrow::util::bit_util;
6363
use arrow_array::cast::downcast_array;
64+
use arrow_array::new_empty_array;
6465
use arrow_schema::ArrowError;
6566
use datafusion_common::utils::memory::estimate_memory_size;
6667
use datafusion_common::{
@@ -914,6 +915,52 @@ async fn collect_left_input(
914915
})
915916
.await?;
916917

918+
let batches_iter = batches.iter().rev();
919+
920+
let left_values = if batches.is_empty() {
921+
on_left
922+
.iter()
923+
.map(|expr| Ok(new_empty_array(&expr.data_type(&schema)?)))
924+
.collect::<Result<Vec<_>>>()?
925+
} else {
926+
on_left
927+
.iter()
928+
.map(|expr| {
929+
let mut key_reservation = 0;
930+
let join_key_arrays = batches_iter
931+
.clone()
932+
.map(|batch| {
933+
let array: Arc<dyn Array> =
934+
expr.evaluate(batch).unwrap().into_array(batch.num_rows())?;
935+
let array_size = array.get_array_memory_size();
936+
reservation.try_grow(array_size)?;
937+
key_reservation += array_size;
938+
Ok(array)
939+
})
940+
.collect::<Result<Vec<_>>>()?;
941+
942+
// `concat` does not consume its inputs, so reserve memory for a second copy
943+
// of the collected join key arrays (worst case: inputs and output coexist)
944+
reservation.try_grow(key_reservation)?;
945+
let concatenated = concat(
946+
join_key_arrays
947+
.iter()
948+
.map(AsRef::as_ref)
949+
.collect::<Vec<_>>()
950+
.as_ref(),
951+
)?;
952+
953+
// Resize the reservation to its original size plus the size of the concatenated array
954+
let build_size_mem = reservation.size() - key_reservation * 2
955+
+ concatenated.get_array_memory_size();
956+
reservation.resize(build_size_mem);
957+
metrics.build_mem_used.set(build_size_mem);
958+
959+
Ok(concatenated)
960+
})
961+
.collect::<Result<Vec<_>>>()?
962+
};
963+
917964
// Estimation of memory size, required for hashtable, prior to allocation.
918965
// Final result can be verified using `RawTable.allocation_info()`
919966
let fixed_size = size_of::<JoinHashMap>();
@@ -928,7 +975,6 @@ async fn collect_left_input(
928975
let mut offset = 0;
929976

930977
// Updating hashmap starting from the last batch
931-
let batches_iter = batches.iter().rev();
932978
for batch in batches_iter.clone() {
933979
hashes_buffer.clear();
934980
hashes_buffer.resize(batch.num_rows(), 0);
@@ -960,14 +1006,6 @@ async fn collect_left_input(
9601006
BooleanBufferBuilder::new(0)
9611007
};
9621008

963-
let left_values = on_left
964-
.iter()
965-
.map(|c| {
966-
c.evaluate(&single_batch)?
967-
.into_array(single_batch.num_rows())
968-
})
969-
.collect::<Result<Vec<_>>>()?;
970-
9711009
let data = JoinLeftData::new(
9721010
hashmap,
9731011
single_batch,

0 commit comments

Comments
 (0)