diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala index 9acee47c76..920c82e04d 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala @@ -178,13 +178,15 @@ class FlowWithContextSpec extends StreamSpec { .map(_._1) .unsafeDataVia(Flow.fromFunction[String, Int] { _.toInt }) - SourceWithContext - .fromTuples(Source(data)) - .via(baseFlow) - .runWith(TestSink.probe[(Int, Int)]) - .request(4) - .expectNext((1, 1), (2, 2), (3, 3), (4, 4)) - .expectComplete() + for (_ <- 0 until 64) { + SourceWithContext + .fromTuples(Source(data)) + .via(baseFlow) + .runWith(TestSink.probe[(Int, Int)]) + .request(4) + .expectNext((1, 1), (2, 2), (3, 3), (4, 4)) + .expectComplete() + } } "Apply a viaFlow with optional elements using unsafeOptionalVia" in { @@ -194,17 +196,19 @@ class FlowWithContextSpec extends StreamSpec { .asFlowWithContext[Option[String], Int, Int](collapseContext = Tuple2.apply)(extractContext = _._2) .map(_._1) - SourceWithContext - .fromTuples(Source(data)).via( - FlowWithContext.unsafeOptionalDataVia( - flow, - Flow.fromFunction { (string: String) => string.toInt } - )(Keep.none) - ) - .runWith(TestSink.probe[(Option[Int], Int)]) - .request(4) - .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) - .expectComplete() + for (_ <- 0 until 64) { + SourceWithContext + .fromTuples(Source(data)).via( + FlowWithContext.unsafeOptionalDataVia( + flow, + Flow.fromFunction { (string: String) => string.toInt } + )(Keep.none) + ) + .runWith(TestSink.probe[(Option[Int], Int)]) + .request(4) + .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) + .expectComplete() + } } } }