Skip to content

Commit

Permalink
Address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
injae-kim committed Jan 30, 2024
1 parent a71d296 commit 18747bd
Showing 1 changed file with 4 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,35 +547,6 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
closedCounter.get shouldBe 1
}

"continue with autoCloseable when Strategy is Resume and exception happened on map" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val p = Source
.fromIterator(() => (0 to 50).iterator)
.mapWithResource(create, (_: AutoCloseable, elem) => elem)
.map(elem => {
if (elem == 10) throw TE("") else elem
})
.withAttributes(supervisionStrategy(resumingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[Int]()

p.subscribe(c)
val sub = c.expectSubscription()

(0 to 48).foreach(i => {
sub.request(1)
c.expectNext() should ===(if (i < 10) i else i + 1)
})
sub.request(1)
c.expectNext(50)
c.expectComplete()
closedCounter.get shouldBe 1
}

"close and open stream with autocloseable again when Strategy is Restart" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
Expand All @@ -586,7 +557,7 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
.fromIterator(() => (0 to 50).iterator)
.mapWithResource(create,
(_: AutoCloseable, elem) => {
if (elem == 10) throw TE("") else elem
if (elem == 10 || elem == 20) throw TE("") else elem
})
.withAttributes(supervisionStrategy(restartingDecider))
.runWith(Sink.asPublisher(false))
Expand All @@ -595,9 +566,10 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
p.subscribe(c)
val sub = c.expectSubscription()

(0 to 18).foreach(i => {
(0 to 30).filter(i => i != 10 && i != 20).foreach(i => {
sub.request(1)
c.expectNext() should ===(if (i < 10) i else i + 1)
c.expectNext() shouldBe i
closedCounter.get should ===(if (i < 10) 0 else if (i < 20) 1 else 2)
})
sub.cancel()
}
Expand Down

0 comments on commit 18747bd

Please sign in to comment.