diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala index dc84291b2e..c674e78a31 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala @@ -211,7 +211,6 @@ import pekko.util.ccompat.JavaConverters._ tailSource.setHandler(subHandler) setKeepGoing(true) scheduleOnce(SubscriptionTimer, timeout) - builder = null Source.fromGraph(tailSource.source) } @@ -222,13 +221,16 @@ import pekko.util.ccompat.JavaConverters._ builder += grab(in) left -= 1 if (left == 0) { - push(out, (builder.result(), openSubstream())) + val prefix = builder.result() + builder = null // free for GC + push(out, (prefix, openSubstream())) complete(out) } else pull(in) } } override def onPull(): Unit = { if (left == 0) { + builder = null // free for GC push(out, (Nil, openSubstream())) complete(out) } else pull(in) @@ -237,7 +239,9 @@ import pekko.util.ccompat.JavaConverters._ override def onUpstreamFinish(): Unit = { if (!prefixComplete) { // This handles the unpulled out case as well - emit(out, (builder.result(), Source.empty), () => completeStage()) + val prefix = builder.result(); + builder = null // free for GC + emit(out, (prefix, Source.empty), () => completeStage()) } else { if (!tailSource.isClosed) tailSource.complete() completeStage()