From 735a60683024fbbdf74771376d7e9e65e1346be7 Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Mon, 30 Dec 2024 13:05:01 +0800 Subject: [PATCH] perf: Reduce loops in when clean queue in BroadcastHub (#1628) (cherry picked from commit 9596ea4c1827701ab7f540897bba9f6209ceef15) --- .../apache/pekko/stream/scaladsl/Hub.scala | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala index cf3a9710e86..94c44005c15 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala @@ -605,10 +605,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I else if (head != finalOffset) { // If our final consumer goes away, we roll forward the buffer so a subsequent consumer does not // see the already consumed elements. This feature is quite handy. - while (head != finalOffset) { - queue(head & Mask) = null - head += 1 - } + cleanQueueInRange(head, finalOffset) head = finalOffset tryPull() } @@ -617,6 +614,20 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I } } + private def cleanQueueInRange(headOffset: Int, upToOffset: Int): Unit = { + // We need to clean the queue from headOffset to upToOffset + if (headOffset != upToOffset) { + val startIdx = headOffset & Mask + val endIdx = upToOffset & Mask + if (startIdx <= endIdx) { + java.util.Arrays.fill(queue, startIdx, endIdx, null) + } else { + java.util.Arrays.fill(queue, startIdx, queue.length, null) + java.util.Arrays.fill(queue, 0, endIdx, null) + } + } + } + // Producer API // We are full if the distance between the slowest (known) consumer and the fastest (known) consumer is // the buffer size. We must wait until the slowest either advances, or cancels.