Skip to content

Commit bc37570

Browse files
committed
chore: Fix leak in FlatMapPrefix operator.
1 parent d34927b commit bc37570

File tree

1 file changed

+14
-11
lines changed

1 file changed

+14
-11
lines changed

Diff for: stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala

+14-11
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,10 @@ import pekko.util.OptionVal
4343
.mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy]
4444
.propagateToNestedMaterialization
4545
val matPromise = Promise[M]()
46-
val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
47-
val accumulated = collection.mutable.Buffer.empty[In]
46+
object logic extends GraphStageLogic(shape) with InHandler with OutHandler {
47+
private var left = n
48+
private var builder = Vector.newBuilder[In]
49+
builder.sizeHint(left)
4850

4951
private var subSource = OptionVal.none[SubSourceOutlet[In]]
5052
private var subSink = OptionVal.none[SubSinkInlet[Out]]
@@ -65,11 +67,12 @@ import pekko.util.OptionVal
6567
subSource match {
6668
case OptionVal.Some(s) => s.push(grab(in))
6769
case _ =>
68-
accumulated.append(grab(in))
69-
if (accumulated.size == n) {
70+
builder += grab(in)
71+
left -= 1
72+
if (left == 0) {
7073
materializeFlow()
7174
} else {
72-
// gi'me some more!
75+
// give me some more!
7376
pull(in)
7477
}
7578
}
@@ -98,12 +101,12 @@ import pekko.util.OptionVal
98101
// delegate to subSink
99102
s.pull()
100103
case _ =>
101-
if (accumulated.size < n) pull(in)
102-
else if (accumulated.size == n) {
104+
if (left > 0) pull(in)
105+
else if (left == 0) {
103106
// corner case for n = 0, can be handled in FlowOps
104107
materializeFlow()
105108
} else {
106-
throw new IllegalStateException(s"Unexpected accumulated size: ${accumulated.size} (n: $n)")
109+
throw new IllegalStateException(s"Unexpected accumulated size, left : $left (n: $n)")
107110
}
108111
}
109112
}
@@ -114,7 +117,7 @@ import pekko.util.OptionVal
114117
case _ =>
115118
if (propagateToNestedMaterialization) {
116119
downstreamCause = OptionVal.Some(cause)
117-
if (accumulated.size == n) {
120+
if (left == 0) {
118121
// corner case for n = 0, can be handled in FlowOps
119122
materializeFlow()
120123
} else if (!hasBeenPulled(in)) { // if in was already closed, nested flow would have already been materialized
@@ -128,8 +131,8 @@ import pekko.util.OptionVal
128131

129132
def materializeFlow(): Unit =
130133
try {
131-
val prefix = accumulated.toVector
132-
accumulated.clear()
134+
val prefix = builder.result()
135+
builder = null // free for GC
133136
subSource = OptionVal.Some(new SubSourceOutlet[In]("FlatMapPrefix.subSource"))
134137
val theSubSource = subSource.get
135138
subSink = OptionVal.Some(new SubSinkInlet[Out]("FlatMapPrefix.subSink"))

0 commit comments

Comments
 (0)