Skip to content

Commit c953f50

Browse files
authored
chore: Avoid forwarding method on ArrayDequeue in stream module. (#1687)
1 parent 7782cf5 commit c953f50

File tree

3 files changed

+6
-6
lines changed

3 files changed

+6
-6
lines changed

Diff for: stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefBackpressureSinkStage.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ import pekko.stream.stage._
4545
private val maxBuffer = inheritedAttributes.get[InputBuffer](InputBuffer(16, 16)).max
4646
require(maxBuffer > 0, "Buffer size must be greater than 0")
4747

48-
private val buffer: util.Deque[In] = new util.ArrayDeque[In]()
48+
private val buffer: util.ArrayDeque[In] = new util.ArrayDeque[In]()
4949
private var acknowledgementReceived = false
5050
private var completeReceived = false
5151
private var completionSignalled = false
@@ -75,7 +75,7 @@ import pekko.stream.stage._
7575
}
7676

7777
private def dequeueAndSend(): Unit = {
78-
ref ! messageAdapter(self)(buffer.poll())
78+
ref ! messageAdapter(self)(buffer.pollFirst())
7979
}
8080

8181
private def finish(): Unit = {
@@ -85,7 +85,7 @@ import pekko.stream.stage._
8585
}
8686

8787
def onPush(): Unit = {
88-
buffer.offer(grab(in))
88+
buffer.offerLast(grab(in))
8989
if (acknowledgementReceived) {
9090
dequeueAndSend()
9191
acknowledgementReceived = false

Diff for: stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ private final case class SavedIslandData(
524524
if (Debug) println(s"PUSH: $matValue => $matValueStack")
525525

526526
case Concat(first, next) =>
527-
if (next ne EmptyTraversal) traversalStack.add(next)
527+
if (next ne EmptyTraversal) traversalStack.addLast(next)
528528
nextStep = first
529529
case Pop =>
530530
val popped = matValueStack.removeLast()

Diff for: stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -793,7 +793,7 @@ import org.reactivestreams.Subscription
793793
else if (currentLimit == 0) {
794794
self ! Resume
795795
} else {
796-
shortCircuitBuffer.poll() match {
796+
shortCircuitBuffer.pollFirst() match {
797797
case b: BoundaryEvent => processEvent(b)
798798
case Resume => finishShellRegistration()
799799
case unexpected =>
@@ -842,7 +842,7 @@ import org.reactivestreams.Subscription
842842
override def postStop(): Unit = {
843843
if (shortCircuitBuffer ne null) {
844844
while (!shortCircuitBuffer.isEmpty) {
845-
shortCircuitBuffer.poll() match {
845+
shortCircuitBuffer.pollFirst() match {
846846
case b: BoundaryEvent =>
847847
// signal to telemetry that this event won't be processed
848848
b.cancel()

0 commit comments

Comments
 (0)