Skip to content

Commit

Permalink
Optimize closeState flag (#872)
Browse files Browse the repository at this point in the history
Co-authored-by: jxnu-liguobin <[email protected]>
  • Loading branch information
jxnu-liguobin and jxnu-liguobin authored Dec 25, 2023
1 parent 16e587d commit 417196f
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2251,8 +2251,8 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) =
override def onUpstreamFailure(ex: Throwable): Unit = closeStateAndFail(ex)

override def onDownstreamFinish(cause: Throwable): Unit = {
onComplete(state)
needInvokeOnCompleteCallback = false
onComplete(state)
super.onDownstreamFinish(cause)
}

Expand All @@ -2265,19 +2265,19 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) =
}

private def closeStateAndComplete(): Unit = {
needInvokeOnCompleteCallback = false
onComplete(state) match {
case Some(elem) => emit(out, elem, () => completeStage())
case None => completeStage()
}
needInvokeOnCompleteCallback = false
}

private def closeStateAndFail(ex: Throwable): Unit = {
needInvokeOnCompleteCallback = false
onComplete(state) match {
case Some(elem) => emit(out, elem, () => failStage(ex))
case None => failStage(ex)
}
needInvokeOnCompleteCallback = false
}

override def onPull(): Unit = pull(in)
Expand Down

0 comments on commit 417196f

Please sign in to comment.