Skip to content

Commit abc18a5

Browse files
authored
feat: Add emitMulti with Spliterator support (#1776)
1 parent f3a8075 commit abc18a5

File tree

2 files changed

+55
-1
lines changed

2 files changed

+55
-1
lines changed

stream-tests/src/test/scala/org/apache/pekko/stream/impl/GraphStageLogicSpec.scala

+20
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,20 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S
121121
override def toString = "GraphStageLogicSpec.emitEmptyIterable"
122122
}
123123

124+
object EmitSplitIterator extends GraphStage[SourceShape[Int]] {
125+
val out = Outlet[Int]("out")
126+
override val shape = SourceShape(out)
127+
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
128+
setHandler(out,
129+
new OutHandler {
130+
override def onPull(): Unit = emitMultiple(
131+
out,
132+
java.util.stream.Stream.of(1, 2, 3).spliterator(), () => emit(out, 42, () => completeStage()))
133+
})
134+
}
135+
override def toString = "GraphStageLogicSpec.emitEmptyIterable"
136+
}
137+
124138
private case class ReadNEmitN(n: Int) extends GraphStage[FlowShape[Int, Int]] {
125139
override val shape = FlowShape(Inlet[Int]("readN.in"), Outlet[Int]("readN.out"))
126140

@@ -196,6 +210,12 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S
196210

197211
}
198212

213+
"emit properly when using split iterator" in {
214+
215+
Source.fromGraph(EmitSplitIterator).runWith(Sink.seq).futureValue should ===(List(1, 2, 3, 42))
216+
217+
}
218+
199219
"invoke lifecycle hooks in the right order" in {
200220
val g = new GraphStage[FlowShape[Int, Int]] {
201221
val in = Inlet[Int]("in")

stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala

+35-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import scala.annotation.tailrec
2121
import scala.collection.{ immutable, mutable }
2222
import scala.concurrent.{ Future, Promise }
2323
import scala.concurrent.duration.FiniteDuration
24-
2524
import org.apache.pekko
2625
import pekko.{ Done, NotUsed }
2726
import pekko.actor._
@@ -37,6 +36,8 @@ import pekko.stream.stage.ConcurrentAsyncCallbackState.{ NoPendingEvents, State
3736
import pekko.util.OptionVal
3837
import pekko.util.unused
3938

39+
import java.util.Spliterator
40+
4041
/**
4142
* Scala API: A GraphStage represents a reusable graph stream processing operator.
4243
*
@@ -979,6 +980,26 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
979980
}
980981
} else andThen()
981982

983+
/**
984+
* Emit a sequence of elements through the given outlet and continue with the given thunk
985+
* afterwards, suspending execution if necessary.
986+
* This action replaces the [[OutHandler]] for the given outlet if suspension
987+
* is needed and reinstalls the current handler upon receiving an `onPull()`
988+
* signal (before invoking the `andThen` function).
989+
*/
990+
final protected def emitMultiple[T](out: Outlet[T], elems: Spliterator[T], andThen: () => Unit): Unit = {
991+
val iter = new EmittingSpliterator[T](out, elems, getNonEmittingHandler(out), andThen)
992+
if (isAvailable(out)) {
993+
if (!iter.tryPush()) {
994+
andThen()
995+
} else {
996+
setOrAddEmitting(out, iter)
997+
}
998+
} else {
999+
setOrAddEmitting(out, iter)
1000+
}
1001+
}
1002+
9821003
/**
9831004
* Emit a sequence of elements through the given outlet, suspending execution if necessary.
9841005
* This action replaces the [[OutHandler]] for the given outlet if suspension
@@ -1118,6 +1139,19 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
11181139
}
11191140
}
11201141

1142+
private final class EmittingSpliterator[T](_out: Outlet[T], elems: Spliterator[T], _previous: OutHandler,
1143+
_andThen: () => Unit)
1144+
extends Emitting[T](_out, _previous, _andThen) with java.util.function.Consumer[T] {
1145+
1146+
override def onPull(): Unit = if (!elems.tryAdvance(this)) {
1147+
followUp()
1148+
}
1149+
1150+
def tryPush(): Boolean = elems.tryAdvance(this)
1151+
1152+
override def accept(elem: T): Unit = push(out, elem)
1153+
}
1154+
11211155
private class EmittingCompletion[T](_out: Outlet[T], _previous: OutHandler)
11221156
extends Emitting[T](_out, _previous, DoNothing) {
11231157
override def onPull(): Unit = complete(out)

0 commit comments

Comments
 (0)