From 2c19116ce71cf4ad42eb38b280fa49412791f753 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Wed, 20 Dec 2023 22:04:04 +0300 Subject: [PATCH] maintaining fifo hashmap in hash join --- .../physical-plan/src/joins/hash_join.rs | 138 ++++++++---------- .../src/joins/symmetric_hash_join.rs | 2 + datafusion/physical-plan/src/joins/utils.rs | 79 +++++++++- 3 files changed, 141 insertions(+), 78 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 13ac06ee301c0..efce4a1c47bd2 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -29,7 +29,6 @@ use crate::joins::utils::{ need_produce_result_in_final, JoinHashMap, JoinHashMapType, }; use crate::{ - coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec, expressions::Column, expressions::PhysicalSortExpr, @@ -52,10 +51,10 @@ use super::{ use arrow::array::{ Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, UInt32Array, - UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder, + UInt64Array, }; use arrow::compute::kernels::cmp::{eq, not_distinct}; -use arrow::compute::{and, take, FilterBuilder}; +use arrow::compute::{and, concat_batches, take, FilterBuilder}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; @@ -715,7 +714,10 @@ async fn collect_left_input( let mut hashmap = JoinHashMap::with_capacity(num_rows); let mut hashes_buffer = Vec::new(); let mut offset = 0; - for batch in batches.iter() { + + // Reverse iteration over build-side input batches allows to create FIFO hashmap + let batches_iter = batches.iter().rev(); + for batch in batches_iter.clone() { hashes_buffer.clear(); hashes_buffer.resize(batch.num_rows(), 0); update_hash( @@ -726,19 +728,25 @@ async fn collect_left_input( &random_state, &mut hashes_buffer, 0, + true, )?; offset += batch.num_rows(); } // Merge all batches into a single batch, so we // can directly index into the arrays - let single_batch = concat_batches(&schema, &batches, num_rows)?; + let single_batch = concat_batches(&schema, batches_iter)?; let data = JoinLeftData::new(hashmap, single_batch, reservation); Ok(data) } -/// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`, -/// assuming that the [RecordBatch] corresponds to the `index`th +/// Updates `hash_map` with new entries from `batch` evaluated against the expressions `on` +/// using `offset` as a start value for `batch` row indices. +/// +/// `fifo_hashmap` sets the order of iteration over `batch` rows while updating hashmap, +/// which allows to keep either first (if set to true) or last (if set to false) row index +/// as a chain head for matching hashes. +#[allow(clippy::too_many_arguments)] pub fn update_hash( on: &[Column], batch: &RecordBatch, @@ -747,6 +755,7 @@ pub fn update_hash( random_state: &RandomState, hashes_buffer: &mut Vec, deleted_offset: usize, + fifo_hashmap: bool, ) -> Result<()> where T: JoinHashMapType, @@ -763,28 +772,18 @@ where // For usual JoinHashmap, the implementation is void. hash_map.extend_zero(batch.num_rows()); - // insert hashes to key of the hashmap - let (mut_map, mut_list) = hash_map.get_mut(); - for (row, hash_value) in hash_values.iter().enumerate() { - let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash); - if let Some((_, index)) = item { - // Already exists: add index to next array - let prev_index = *index; - // Store new value inside hashmap - *index = (row + offset + 1) as u64; - // Update chained Vec at row + offset with previous value - mut_list[row + offset - deleted_offset] = prev_index; - } else { - mut_map.insert( - *hash_value, - // store the value + 1 as 0 value reserved for end of list - (*hash_value, (row + offset + 1) as u64), - |(hash, _)| *hash, - ); - // chained list at (row + offset) is already initialized with 0 - // meaning end of list - } + // Updating JoinHashMap from hash values iterator + let hash_values_iter = hash_values + .iter() + .enumerate() + .map(|(i, val)| (i + offset, val)); + + if fifo_hashmap { + hash_map.update_from_iter(hash_values_iter.rev(), deleted_offset); + } else { + hash_map.update_from_iter(hash_values_iter, deleted_offset); } + Ok(()) } @@ -987,6 +986,7 @@ pub fn build_equal_condition_join_indices( filter: Option<&JoinFilter>, build_side: JoinSide, deleted_offset: Option, + fifo_hashmap: bool, ) -> Result<(UInt64Array, UInt32Array)> { let keys_values = probe_on .iter() @@ -1002,10 +1002,9 @@ pub fn build_equal_condition_join_indices( hashes_buffer.clear(); hashes_buffer.resize(probe_batch.num_rows(), 0); let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; - // Using a buffer builder to avoid slower normal builder - let mut build_indices = UInt64BufferBuilder::new(0); - let mut probe_indices = UInt32BufferBuilder::new(0); - // The chained list algorithm generates build indices for each probe row in a reversed sequence as such: + + // In case build-side input has not been inverted while JoinHashMap creation, the chained list algorithm + // will return build indices for each probe row in a reverse order: // Build Indices: [5, 4, 3] // Probe Indices: [1, 1, 1] // @@ -1034,44 +1033,17 @@ pub fn build_equal_condition_join_indices( // (5,1) // // With this approach, the lexicographic order on both the probe side and the build side is preserved. - let hash_map = build_hashmap.get_map(); - let next_chain = build_hashmap.get_list(); - for (row, hash_value) in hash_values.iter().enumerate().rev() { - // Get the hash and find it in the build index - - // For every item on the build and probe we check if it matches - // This possibly contains rows with hash collisions, - // So we have to check here whether rows are equal or not - if let Some((_, index)) = - hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash) - { - let mut i = *index - 1; - loop { - let build_row_value = if let Some(offset) = deleted_offset { - // This arguments means that we prune the next index way before here. - if i < offset as u64 { - // End of the list due to pruning - break; - } - i - offset as u64 - } else { - i - }; - build_indices.append(build_row_value); - probe_indices.append(row as u32); - // Follow the chain to get the next index value - let next = next_chain[build_row_value as usize]; - if next == 0 { - // end of list - break; - } - i = next - 1; - } - } - } - // Reversing both sets of indices - build_indices.as_slice_mut().reverse(); - probe_indices.as_slice_mut().reverse(); + let (mut build_indices, mut probe_indices) = if fifo_hashmap { + build_hashmap.get_matched_indices(hash_values.iter().enumerate(), deleted_offset) + } else { + let (mut matched_build, mut matched_probe) = build_hashmap + .get_matched_indices(hash_values.iter().enumerate().rev(), deleted_offset); + + matched_build.as_slice_mut().reverse(); + matched_probe.as_slice_mut().reverse(); + + (matched_build, matched_probe) + }; let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None); let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None); @@ -1279,6 +1251,7 @@ impl HashJoinStream { self.filter.as_ref(), JoinSide::Left, None, + true, ); let result = match left_right_indices { @@ -1393,7 +1366,9 @@ mod tests { use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder}; use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_common::{assert_batches_sorted_eq, assert_contains, ScalarValue}; + use datafusion_common::{ + assert_batches_eq, assert_batches_sorted_eq, assert_contains, ScalarValue, + }; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_expr::Operator; @@ -1558,7 +1533,9 @@ mod tests { "| 3 | 5 | 9 | 20 | 5 | 80 |", "+----+----+----+----+----+----+", ]; - assert_batches_sorted_eq!(expected, &batches); + + // Inner join output is expected to preserve both inputs order + assert_batches_eq!(expected, &batches); Ok(()) } @@ -1640,7 +1617,8 @@ mod tests { "+----+----+----+----+----+----+", ]; - assert_batches_sorted_eq!(expected, &batches); + // Inner join output is expected to preserve both inputs order + assert_batches_eq!(expected, &batches); Ok(()) } @@ -1686,7 +1664,8 @@ mod tests { "+----+----+----+----+----+----+", ]; - assert_batches_sorted_eq!(expected, &batches); + // Inner join output is expected to preserve both inputs order + assert_batches_eq!(expected, &batches); Ok(()) } @@ -1740,7 +1719,8 @@ mod tests { "+----+----+----+----+----+----+", ]; - assert_batches_sorted_eq!(expected, &batches); + // Inner join output is expected to preserve both inputs order + assert_batches_eq!(expected, &batches); Ok(()) } @@ -1789,7 +1769,9 @@ mod tests { "| 1 | 4 | 7 | 10 | 4 | 70 |", "+----+----+----+----+----+----+", ]; - assert_batches_sorted_eq!(expected, &batches); + + // Inner join output is expected to preserve both inputs order + assert_batches_eq!(expected, &batches); // second part let stream = join.execute(1, task_ctx.clone())?; @@ -1804,7 +1786,8 @@ mod tests { "+----+----+----+----+----+----+", ]; - assert_batches_sorted_eq!(expected, &batches); + // Inner join output is expected to preserve both inputs order + assert_batches_eq!(expected, &batches); Ok(()) } @@ -2734,6 +2717,7 @@ mod tests { None, JoinSide::Left, None, + false, )?; let mut left_ids = UInt64Builder::with_capacity(0); diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index b9101b57c3e55..d7473c0a37964 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -770,6 +770,7 @@ pub(crate) fn join_with_probe_batch( filter, build_hash_joiner.build_side, Some(build_hash_joiner.deleted_offset), + false, )?; if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) { record_visited_indices( @@ -882,6 +883,7 @@ impl OneSideHashJoiner { random_state, &mut self.hashes_buffer, self.deleted_offset, + false, )?; Ok(()) } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index eae65ce9c26be..872680dcbdbd4 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -31,7 +31,7 @@ use crate::{ColumnStatistics, ExecutionPlan, Partitioning, Statistics}; use arrow::array::{ downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array, - UInt32Builder, UInt64Array, + UInt32BufferBuilder, UInt32Builder, UInt64Array, UInt64BufferBuilder, }; use arrow::compute; use arrow::datatypes::{Field, Schema, SchemaBuilder}; @@ -151,6 +151,83 @@ pub trait JoinHashMapType { fn get_map(&self) -> &RawTable<(u64, u64)>; /// Returns a reference to the next. fn get_list(&self) -> &Self::NextType; + + /// Updates hashmap from iterator of row indices & row hashes pairs. + fn update_from_iter<'a>( + &mut self, + iter: impl Iterator, + deleted_offset: usize, + ) { + let (mut_map, mut_list) = self.get_mut(); + for (row, hash_value) in iter { + let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash); + if let Some((_, index)) = item { + // Already exists: add index to next array + let prev_index = *index; + // Store new value inside hashmap + *index = (row + 1) as u64; + // Update chained Vec at row + offset with previous value + mut_list[row - deleted_offset] = prev_index; + } else { + mut_map.insert( + *hash_value, + // store the value + 1 as 0 value reserved for end of list + (*hash_value, (row + 1) as u64), + |(hash, _)| *hash, + ); + // chained list at (row + offset) is already initialized with 0 + // meaning end of list + } + } + } + + /// Returns all pairs of row indices matched by hash + fn get_matched_indices<'a>( + &self, + iter: impl Iterator, + deleted_offset: Option, + ) -> (UInt64BufferBuilder, UInt32BufferBuilder) { + let mut input_indices = UInt32BufferBuilder::new(0); + let mut matched_indices = UInt64BufferBuilder::new(0); + + let hash_map = self.get_map(); + let next_chain = self.get_list(); + for (row, hash_value) in iter { + // Get the hash and find it in the build index + + // For every item on the build and probe we check if it matches + // This possibly contains rows with hash collisions, + // So we have to check here whether rows are equal or not + if let Some((_, index)) = + hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash) + { + let mut i = *index - 1; + loop { + let build_row_value = if let Some(offset) = deleted_offset { + // This arguments means that we prune the next index way before here. + if i < offset as u64 { + // End of the list due to pruning + break; + } + i - offset as u64 + } else { + i + }; + matched_indices.append(build_row_value); + input_indices.append(row as u32); + // Follow the chain to get the next index value + let next = next_chain[build_row_value as usize]; + if next == 0 { + // end of list + break; + } + i = next - 1; + } + } + } + + (matched_indices, input_indices) + } } /// Implementation of `JoinHashMapType` for `JoinHashMap`.