diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 2f1d652c8b97..5f52fa370bf5 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -625,7 +625,7 @@ impl ExecutionPlan for HashJoinExec { null_equals_null: self.null_equals_null, reservation, state: HashJoinStreamState::WaitBuildSide, - build_side_state: BuildSideState::Initial(BuildSideInitialState { left_fut }), + build_side: BuildSide::Initial(BuildSideInitialState { left_fut }), })) } @@ -788,51 +788,58 @@ where Ok(()) } -/// Represents state of HashJoin build-side. -enum BuildSideState { +/// Represents build-side of hash join. +enum BuildSide { /// Indicates that build-side not collected yet Initial(BuildSideInitialState), /// Indicates that build-side data has been collected Ready(BuildSideReadyState), } +/// Container for BuildSide::Initial related data struct BuildSideInitialState { + /// Future for building hash table from build-side input left_fut: OnceFut, } +/// Container for BuildSide::Ready related data struct BuildSideReadyState { + /// Collected build-side data left_data: Arc, + /// Which build-side rows have been matched while creating output. + /// For some OUTER joins, we need to know which rows have not been matched + /// to produce the correct output. visited_left_side: BooleanBufferBuilder, } -impl BuildSideState { - /// Tries to extract BuildSideInitialState from BuildSideState enum. +impl BuildSide { + /// Tries to extract BuildSideInitialState from BuildSide enum. /// Returns an error if state is not Initial. fn try_into_initial_mut(&mut self) -> Result<&mut BuildSideInitialState> { match self { - BuildSideState::Initial(state) => Ok(state), + BuildSide::Initial(state) => Ok(state), _ => Err(DataFusionError::Internal( "Expected build side in initial state".to_string(), )), } } - /// Tries to extract BuildSideReadyState from BuildSideState enum. + /// Tries to extract BuildSideReadyState from BuildSide enum. /// Returns an error if state is not Ready. fn try_into_ready(&self) -> Result<&BuildSideReadyState> { match self { - BuildSideState::Ready(state) => Ok(state), + BuildSide::Ready(state) => Ok(state), _ => Err(DataFusionError::Internal( "Expected build side in ready state".to_string(), )), } } - /// Tries to extract BuildSideReadyState from BuildSideState enum. + /// Tries to extract BuildSideReadyState from BuildSide enum. /// Returns an error if state is not Ready. fn try_into_ready_mut(&mut self) -> Result<&mut BuildSideReadyState> { match self { - BuildSideState::Ready(state) => Ok(state), + BuildSide::Ready(state) => Ok(state), _ => Err(DataFusionError::Internal( "Expected build side in ready state".to_string(), )), @@ -854,11 +861,15 @@ enum HashJoinStreamState { Completed, } +/// Container for HashJoinStreamState::ProcessProbeBatch related data struct ProcessProbeBatchState { + /// Current probe-side batch batch: RecordBatch, } impl HashJoinStreamState { + /// Tries to extract ProcessProbeBatchState from HashJoinStreamState enum. + /// Returns an error if state is not ProcessProbeBatchState. fn try_as_process_probe_batch(&self) -> Result<&ProcessProbeBatchState> { match self { HashJoinStreamState::ProcessProbeBatch(state) => Ok(state), @@ -900,10 +911,10 @@ struct HashJoinStream { null_equals_null: bool, /// Memory reservation reservation: MemoryReservation, - /// Stream state + /// State of the stream state: HashJoinStreamState, - /// Build side state - build_side_state: BuildSideState, + /// Build side + build_side: BuildSide, } impl RecordBatchStream for HashJoinStream { @@ -1179,9 +1190,9 @@ impl HashJoinStream { } } - /// Collects build-side data by polling `OnceFut` future from initial build-side state + /// Collects build-side data by polling `OnceFut` future from initialized build-side /// - /// Updates build-side state to `Ready`, and state to `FetchProbeSide` + /// Updates build-side to `Ready`, and state to `FetchProbeSide` fn collect_build_side( &mut self, cx: &mut std::task::Context<'_>, @@ -1189,7 +1200,7 @@ impl HashJoinStream { let build_timer = self.join_metrics.build_time.timer(); // build hash table from left (build) side, if not yet done let left_data = ready!(self - .build_side_state + .build_side .try_into_initial_mut()? .left_fut .get_shared(cx))?; @@ -1220,7 +1231,7 @@ impl HashJoinStream { }; self.state = HashJoinStreamState::FetchProbeBatch; - self.build_side_state = BuildSideState::Ready(BuildSideReadyState { + self.build_side = BuildSide::Ready(BuildSideReadyState { left_data, visited_left_side, }); @@ -1259,7 +1270,7 @@ impl HashJoinStream { &mut self, ) -> Result>> { let state = self.state.try_as_process_probe_batch()?; - let build_side = self.build_side_state.try_into_ready_mut()?; + let build_side = self.build_side.try_into_ready_mut()?; self.join_metrics.input_batches.add(1); self.join_metrics.input_rows.add(state.batch.num_rows()); @@ -1337,7 +1348,7 @@ impl HashJoinStream { return Ok(StatefulStreamResult::Continue); } - let build_side = self.build_side_state.try_into_ready()?; + let build_side = self.build_side.try_into_ready()?; // use the global left bitmap to produce the left indices and right indices let (left_side, right_side) = diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 26fb26d283ee..eae65ce9c26b 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -850,19 +850,18 @@ impl OnceFut { } } - /// Get the result of the computation as a shared reference if it is ready, without consuming it + /// Get shared reference to the result of the computation if it is ready, without consuming it pub(crate) fn get_shared(&mut self, cx: &mut Context<'_>) -> Poll>> { if let OnceFutState::Pending(fut) = &mut self.state { let r = ready!(fut.poll_unpin(cx)); self.state = OnceFutState::Ready(r); } - // Cannot use loop as this would trip up the borrow checker match &self.state { OnceFutState::Pending(_) => unreachable!(), OnceFutState::Ready(r) => Poll::Ready( r.clone() - .map_err(|e| DataFusionError::External(Box::new(e.clone()))), + .map_err(|e| DataFusionError::External(Box::new(e))), ), } }