Skip to content

Commit 7e0738a

Browse files
authored
Remove CoalescePartitions insertion from HashJoinExec (#15476)
* refactor: Catch PartitionMode:Auto in execute explicitly * refactor(hash_join): Move coalesce logic into function * refactor(hash_join): Move coalesce logic out of collect_left_input * refactor(hash_join): Execute build side earlier * chore(hash_join): Drop unnecessary clippy hint \o/ * Back out "refactor: Catch PartitionMode:Auto in execute explicitly" This backs out commit 4346cb8. * Remove CoalescePartitions from CollectLeft-HashJoins * Fix imports * Fix tests * Check CollectLeft-HJ distribution in execute
1 parent 9071503 commit 7e0738a

File tree

1 file changed

+34
-36
lines changed

1 file changed

+34
-36
lines changed

Diff for: datafusion/physical-plan/src/joins/hash_join.rs

+34-36
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ use crate::projection::{
4040
use crate::spill::get_record_batch_memory_size;
4141
use crate::ExecutionPlanProperties;
4242
use crate::{
43-
coalesce_partitions::CoalescePartitionsExec,
4443
common::can_project,
4544
handle_state,
4645
hash_utils::create_hashes,
@@ -792,34 +791,44 @@ impl ExecutionPlan for HashJoinExec {
792791
);
793792
}
794793

794+
if self.mode == PartitionMode::CollectLeft && left_partitions != 1 {
795+
return internal_err!(
796+
"Invalid HashJoinExec,the output partition count of the left child must be 1 in CollectLeft mode,\
797+
consider using CoalescePartitionsExec"
798+
);
799+
}
800+
795801
let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
796802
let left_fut = match self.mode {
797-
PartitionMode::CollectLeft => self.left_fut.once(|| {
798-
let reservation =
799-
MemoryConsumer::new("HashJoinInput").register(context.memory_pool());
800-
collect_left_input(
801-
None,
802-
self.random_state.clone(),
803-
Arc::clone(&self.left),
804-
on_left.clone(),
805-
Arc::clone(&context),
806-
join_metrics.clone(),
807-
reservation,
808-
need_produce_result_in_final(self.join_type),
809-
self.right().output_partitioning().partition_count(),
810-
)
811-
}),
803+
PartitionMode::CollectLeft => {
804+
let left_stream = self.left.execute(0, Arc::clone(&context))?;
805+
806+
self.left_fut.once(|| {
807+
let reservation = MemoryConsumer::new("HashJoinInput")
808+
.register(context.memory_pool());
809+
810+
collect_left_input(
811+
self.random_state.clone(),
812+
left_stream,
813+
on_left.clone(),
814+
join_metrics.clone(),
815+
reservation,
816+
need_produce_result_in_final(self.join_type),
817+
self.right().output_partitioning().partition_count(),
818+
)
819+
})
820+
}
812821
PartitionMode::Partitioned => {
822+
let left_stream = self.left.execute(partition, Arc::clone(&context))?;
823+
813824
let reservation =
814825
MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
815826
.register(context.memory_pool());
816827

817828
OnceFut::new(collect_left_input(
818-
Some(partition),
819829
self.random_state.clone(),
820-
Arc::clone(&self.left),
830+
left_stream,
821831
on_left.clone(),
822-
Arc::clone(&context),
823832
join_metrics.clone(),
824833
reservation,
825834
need_produce_result_in_final(self.join_type),
@@ -930,36 +939,22 @@ impl ExecutionPlan for HashJoinExec {
930939

931940
/// Reads the left (build) side of the input, buffering it in memory, to build a
932941
/// hash table (`LeftJoinData`)
933-
#[allow(clippy::too_many_arguments)]
934942
async fn collect_left_input(
935-
partition: Option<usize>,
936943
random_state: RandomState,
937-
left: Arc<dyn ExecutionPlan>,
944+
left_stream: SendableRecordBatchStream,
938945
on_left: Vec<PhysicalExprRef>,
939-
context: Arc<TaskContext>,
940946
metrics: BuildProbeJoinMetrics,
941947
reservation: MemoryReservation,
942948
with_visited_indices_bitmap: bool,
943949
probe_threads_count: usize,
944950
) -> Result<JoinLeftData> {
945-
let schema = left.schema();
946-
947-
let (left_input, left_input_partition) = if let Some(partition) = partition {
948-
(left, partition)
949-
} else if left.output_partitioning().partition_count() != 1 {
950-
(Arc::new(CoalescePartitionsExec::new(left)) as _, 0)
951-
} else {
952-
(left, 0)
953-
};
954-
955-
// Depending on partition argument load single partition or whole left side in memory
956-
let stream = left_input.execute(left_input_partition, Arc::clone(&context))?;
951+
let schema = left_stream.schema();
957952

958953
// This operation performs 2 steps at once:
959954
// 1. creates a [JoinHashMap] of all batches from the stream
960955
// 2. stores the batches in a vector.
961956
let initial = (Vec::new(), 0, metrics, reservation);
962-
let (batches, num_rows, metrics, mut reservation) = stream
957+
let (batches, num_rows, metrics, mut reservation) = left_stream
963958
.try_fold(initial, |mut acc, batch| async {
964959
let batch_size = get_record_batch_memory_size(&batch);
965960
// Reserve memory for incoming batch
@@ -1655,6 +1650,7 @@ impl EmbeddedProjection for HashJoinExec {
16551650
#[cfg(test)]
16561651
mod tests {
16571652
use super::*;
1653+
use crate::coalesce_partitions::CoalescePartitionsExec;
16581654
use crate::test::TestMemoryExec;
16591655
use crate::{
16601656
common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
@@ -2105,6 +2101,7 @@ mod tests {
21052101
let left =
21062102
TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
21072103
.unwrap();
2104+
let left = Arc::new(CoalescePartitionsExec::new(left));
21082105

21092106
let right = build_table(
21102107
("a1", &vec![1, 2, 3]),
@@ -2177,6 +2174,7 @@ mod tests {
21772174
let left =
21782175
TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
21792176
.unwrap();
2177+
let left = Arc::new(CoalescePartitionsExec::new(left));
21802178
let right = build_table(
21812179
("a2", &vec![20, 30, 10]),
21822180
("b2", &vec![5, 6, 4]),

0 commit comments

Comments
 (0)