diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala index 9acee47c76..3fb0252dfe 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala @@ -194,17 +194,19 @@ class FlowWithContextSpec extends StreamSpec { .asFlowWithContext[Option[String], Int, Int](collapseContext = Tuple2.apply)(extractContext = _._2) .map(_._1) - SourceWithContext - .fromTuples(Source(data)).via( - FlowWithContext.unsafeOptionalDataVia( - flow, - Flow.fromFunction { (string: String) => string.toInt } - )(Keep.none) - ) - .runWith(TestSink.probe[(Option[Int], Int)]) - .request(4) - .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) - .expectComplete() + for (_ <- 0 until 64) { + SourceWithContext + .fromTuples(Source(data)).via( + FlowWithContext.unsafeOptionalDataVia( + flow, + Flow.fromFunction { (string: String) => string.toInt } + )(Keep.none) + ) + .runWith(TestSink.probe[(Option[Int], Int)]) + .request(4) + .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) + .expectComplete() + } } } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala index 06650a7b77..3d05bd81ba 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala @@ -242,14 +242,16 @@ class SourceWithContextSpec extends StreamSpec { val source = SourceWithContext.fromTuples(Source(data)) - SourceWithContext.unsafeOptionalDataVia( - source, - Flow.fromFunction { (string: String) => string.toInt } - )(Keep.none) - .runWith(TestSink.probe[(Option[Int], Int)]) - .request(4) - .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) - .expectComplete() + for (_ <- 0 until 64) { + SourceWithContext.unsafeOptionalDataVia( + source, + Flow.fromFunction { (string: String) => string.toInt } + )(Keep.none) + .runWith(TestSink.probe[(Option[Int], Int)]) + .request(4) + .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) + .expectComplete() + } } } }