Skip to content

Commit a71d296

Browse files
committed
Add resume, restart, stop strategy test
1 parent 63d191f commit a71d296

File tree

1 file changed

+113
-1
lines changed

1 file changed

+113
-1
lines changed

stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import org.apache.pekko
3535
import pekko.Done
3636
import pekko.stream.{ AbruptTerminationException, ActorAttributes, ActorMaterializer, SystemMaterializer }
3737
import pekko.stream.ActorAttributes.supervisionStrategy
38-
import pekko.stream.Supervision.{ restartingDecider, resumingDecider }
38+
import pekko.stream.Supervision.{ restartingDecider, resumingDecider, stoppingDecider }
3939
import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
4040
import pekko.stream.impl.StreamSupervisor.Children
4141
import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
@@ -518,6 +518,118 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
518518
closedCounter.get shouldBe 1
519519
}
520520

521+
"continue with autoCloseable when Strategy is Resume and exception happened" in {
522+
val closedCounter = new AtomicInteger(0)
523+
val create = () =>
524+
new AutoCloseable {
525+
override def close(): Unit = closedCounter.incrementAndGet()
526+
}
527+
val p = Source
528+
.fromIterator(() => (0 to 50).iterator)
529+
.mapWithResource(create,
530+
(_: AutoCloseable, elem) => {
531+
if (elem == 10) throw TE("") else elem
532+
})
533+
.withAttributes(supervisionStrategy(resumingDecider))
534+
.runWith(Sink.asPublisher(false))
535+
val c = TestSubscriber.manualProbe[Int]()
536+
537+
p.subscribe(c)
538+
val sub = c.expectSubscription()
539+
540+
(0 to 48).foreach(i => {
541+
sub.request(1)
542+
c.expectNext() should ===(if (i < 10) i else i + 1)
543+
})
544+
sub.request(1)
545+
c.expectNext(50)
546+
c.expectComplete()
547+
closedCounter.get shouldBe 1
548+
}
549+
550+
"continue with autoCloseable when Strategy is Resume and exception happened on map" in {
551+
val closedCounter = new AtomicInteger(0)
552+
val create = () =>
553+
new AutoCloseable {
554+
override def close(): Unit = closedCounter.incrementAndGet()
555+
}
556+
val p = Source
557+
.fromIterator(() => (0 to 50).iterator)
558+
.mapWithResource(create, (_: AutoCloseable, elem) => elem)
559+
.map(elem => {
560+
if (elem == 10) throw TE("") else elem
561+
})
562+
.withAttributes(supervisionStrategy(resumingDecider))
563+
.runWith(Sink.asPublisher(false))
564+
val c = TestSubscriber.manualProbe[Int]()
565+
566+
p.subscribe(c)
567+
val sub = c.expectSubscription()
568+
569+
(0 to 48).foreach(i => {
570+
sub.request(1)
571+
c.expectNext() should ===(if (i < 10) i else i + 1)
572+
})
573+
sub.request(1)
574+
c.expectNext(50)
575+
c.expectComplete()
576+
closedCounter.get shouldBe 1
577+
}
578+
579+
"close and open stream with autocloseable again when Strategy is Restart" in {
580+
val closedCounter = new AtomicInteger(0)
581+
val create = () =>
582+
new AutoCloseable {
583+
override def close(): Unit = closedCounter.incrementAndGet()
584+
}
585+
val p = Source
586+
.fromIterator(() => (0 to 50).iterator)
587+
.mapWithResource(create,
588+
(_: AutoCloseable, elem) => {
589+
if (elem == 10) throw TE("") else elem
590+
})
591+
.withAttributes(supervisionStrategy(restartingDecider))
592+
.runWith(Sink.asPublisher(false))
593+
val c = TestSubscriber.manualProbe[Int]()
594+
595+
p.subscribe(c)
596+
val sub = c.expectSubscription()
597+
598+
(0 to 18).foreach(i => {
599+
sub.request(1)
600+
c.expectNext() should ===(if (i < 10) i else i + 1)
601+
})
602+
sub.cancel()
603+
}
604+
605+
"stop stream with autoCloseable when Strategy is Stop and exception happened" in {
606+
val closedCounter = new AtomicInteger(0)
607+
val create = () =>
608+
new AutoCloseable {
609+
override def close(): Unit = closedCounter.incrementAndGet()
610+
}
611+
val p = Source
612+
.fromIterator(() => (0 to 50).iterator)
613+
.mapWithResource(create,
614+
(_: AutoCloseable, elem) => {
615+
if (elem == 10) throw TE("") else elem
616+
})
617+
.withAttributes(supervisionStrategy(stoppingDecider))
618+
.runWith(Sink.asPublisher(false))
619+
val c = TestSubscriber.manualProbe[Int]()
620+
621+
p.subscribe(c)
622+
val sub = c.expectSubscription()
623+
624+
(0 to 9).foreach(i => {
625+
sub.request(1)
626+
c.expectNext() shouldBe i
627+
})
628+
sub.request(1)
629+
c.expectError()
630+
closedCounter.get shouldBe 1
631+
}
632+
521633
}
522634
override def afterTermination(): Unit = {
523635
fs.close()

0 commit comments

Comments
 (0)