diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala index cf3a9710e8..94c44005c1 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala @@ -605,10 +605,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I else if (head != finalOffset) { // If our final consumer goes away, we roll forward the buffer so a subsequent consumer does not // see the already consumed elements. This feature is quite handy. - while (head != finalOffset) { - queue(head & Mask) = null - head += 1 - } + cleanQueueInRange(head, finalOffset) head = finalOffset tryPull() } @@ -617,6 +614,20 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I } } + private def cleanQueueInRange(headOffset: Int, upToOffset: Int): Unit = { + // We need to clean the queue from headOffset to upToOffset + if (headOffset != upToOffset) { + val startIdx = headOffset & Mask + val endIdx = upToOffset & Mask + if (startIdx <= endIdx) { + java.util.Arrays.fill(queue, startIdx, endIdx, null) + } else { + java.util.Arrays.fill(queue, startIdx, queue.length, null) + java.util.Arrays.fill(queue, 0, endIdx, null) + } + } + } + // Producer API // We are full if the distance between the slowest (known) consumer and the fastest (known) consumer is // the buffer size. We must wait until the slowest either advances, or cancels.