From 18747bd84ca88165c5cf114a27f8469e8a2ca1fc Mon Sep 17 00:00:00 2001 From: injae-kim Date: Wed, 31 Jan 2024 03:04:02 +0900 Subject: [PATCH] Address comment --- .../scaladsl/FlowMapWithResourceSpec.scala | 36 +++---------------- 1 file changed, 4 insertions(+), 32 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala index a0fc9c7f9fa..56db0ed6254 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala @@ -547,35 +547,6 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) { closedCounter.get shouldBe 1 } - "continue with autoCloseable when Strategy is Resume and exception happened on map" in { - val closedCounter = new AtomicInteger(0) - val create = () => - new AutoCloseable { - override def close(): Unit = closedCounter.incrementAndGet() - } - val p = Source - .fromIterator(() => (0 to 50).iterator) - .mapWithResource(create, (_: AutoCloseable, elem) => elem) - .map(elem => { - if (elem == 10) throw TE("") else elem - }) - .withAttributes(supervisionStrategy(resumingDecider)) - .runWith(Sink.asPublisher(false)) - val c = TestSubscriber.manualProbe[Int]() - - p.subscribe(c) - val sub = c.expectSubscription() - - (0 to 48).foreach(i => { - sub.request(1) - c.expectNext() should ===(if (i < 10) i else i + 1) - }) - sub.request(1) - c.expectNext(50) - c.expectComplete() - closedCounter.get shouldBe 1 - } - "close and open stream with autocloseable again when Strategy is Restart" in { val closedCounter = new AtomicInteger(0) val create = () => @@ -586,7 +557,7 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) { .fromIterator(() => (0 to 50).iterator) .mapWithResource(create, (_: AutoCloseable, elem) => { - if (elem == 10) throw TE("") else elem + if (elem == 10 || elem == 20) throw TE("") else elem }) .withAttributes(supervisionStrategy(restartingDecider)) .runWith(Sink.asPublisher(false)) @@ -595,9 +566,10 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) { p.subscribe(c) val sub = c.expectSubscription() - (0 to 18).foreach(i => { + (0 to 30).filter(i => i != 10 && i != 20).foreach(i => { sub.request(1) - c.expectNext() should ===(if (i < 10) i else i + 1) + c.expectNext() shouldBe i + closedCounter.get should ===(if (i < 10) 0 else if (i < 20) 1 else 2) }) sub.cancel() }