From f3ca3d062e6b3f2af9e4c2b3f77a15080858ab69 Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Mon, 6 Jan 2025 00:06:49 +0800 Subject: [PATCH] chore: Test more rounds for unsafeDataVia keeping order. (#1680) (cherry picked from commit 2ee37454cb4a9058f4ad57b818f5d8cb783ba820) --- .../stream/scaladsl/FlowWithContextSpec.scala | 24 ++++++++++--------- .../scaladsl/SourceWithContextSpec.scala | 18 +++++++------- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala index 9acee47c76d..3fb0252dfe6 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala @@ -194,17 +194,19 @@ class FlowWithContextSpec extends StreamSpec { .asFlowWithContext[Option[String], Int, Int](collapseContext = Tuple2.apply)(extractContext = _._2) .map(_._1) - SourceWithContext - .fromTuples(Source(data)).via( - FlowWithContext.unsafeOptionalDataVia( - flow, - Flow.fromFunction { (string: String) => string.toInt } - )(Keep.none) - ) - .runWith(TestSink.probe[(Option[Int], Int)]) - .request(4) - .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) - .expectComplete() + for (_ <- 0 until 64) { + SourceWithContext + .fromTuples(Source(data)).via( + FlowWithContext.unsafeOptionalDataVia( + flow, + Flow.fromFunction { (string: String) => string.toInt } + )(Keep.none) + ) + .runWith(TestSink.probe[(Option[Int], Int)]) + .request(4) + .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) + .expectComplete() + } } } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala index 06650a7b774..3d05bd81ba3 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala @@ -242,14 +242,16 @@ class SourceWithContextSpec extends StreamSpec { val source = SourceWithContext.fromTuples(Source(data)) - SourceWithContext.unsafeOptionalDataVia( - source, - Flow.fromFunction { (string: String) => string.toInt } - )(Keep.none) - .runWith(TestSink.probe[(Option[Int], Int)]) - .request(4) - .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) - .expectComplete() + for (_ <- 0 until 64) { + SourceWithContext.unsafeOptionalDataVia( + source, + Flow.fromFunction { (string: String) => string.toInt } + )(Keep.none) + .runWith(TestSink.probe[(Option[Int], Int)]) + .request(4) + .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) + .expectComplete() + } } } }