diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md index 015d149e99b..d18bb0b593f 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md @@ -6,11 +6,15 @@ Map elements with the help of a resource that can be opened, transform each elem ## Signature -@apidoc[Flow.mapWithResource](Flow) { scala="#mapWithResource%5BS%2C%20T%5D%28create%3A%20%28%29%20%3D%3E%20S%29%28f%3A%20%28S%2C%20Out%29%20%3D%3E%20T%2C%20close%3A%20S%20%3D%3E%20Option%5BT%5D%29%3A%20Repr%5BT%5D" java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function)" } +@apidoc[Flow.mapWithResource](Flow) { scala="#mapWithResource[S,T](create:()=%3ES)(f:(S,Out)=%3ET,close:S=%3EOption[T]):Repr[T]" java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function)" } 1. `create`: Open or Create the resource. 2. `f`: Transform each element inputs with the help of resource. 3. `close`: Close the resource, invoked on end of stream or if the stream fails, optionally outputting a last element. +@apidoc[Flow.mapWithResource](Flow) { scala="#mapWithResource[S%3C:AutoCloseable,T](create:()=%3ES,f:(S,Out)=%3ET):Repr[T]" java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2)" } +1. `create`: Open or Create the autocloseable resource. +2. `f`: Transform each element inputs with the help of resource. + ## Description Transform each stream element with the help of a resource. diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index bdfcf3a6d5a..4110cf61ee8 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -37,6 +37,7 @@ import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -237,6 +238,21 @@ public void mustBeAbleToUseMapWithResource() { Assert.assertFalse(gate.get()); } + @Test + public void mustBeAbleToUseMapWithAutoCloseableResource() { + final TestKit probe = new TestKit(system); + final AtomicInteger closed = new AtomicInteger(); + Source.from(Arrays.asList("1", "2", "3")) + .via( + Flow.of(String.class) + .mapWithResource( + () -> (AutoCloseable) closed::incrementAndGet, (resource, elem) -> elem)) + .runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), system); + + probe.expectMsgAllOf("1", "2", "3"); + Assert.assertEquals(closed.get(), 1); + } + @Test public void mustBeAbleToUseFoldWhile() throws Exception { final int result = diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index b1ddb49111d..997d401a830 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -47,6 +47,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -815,6 +816,18 @@ public void mustBeAbleToUseMapWithResource() { Assert.assertFalse(gate.get()); } + @Test + public void mustBeAbleToUseMapWithAutoCloseableResource() { + final TestKit probe = new TestKit(system); + final AtomicInteger closed = new AtomicInteger(); + Source.from(Arrays.asList("1", "2", "3")) + .mapWithResource(() -> (AutoCloseable) closed::incrementAndGet, (resource, elem) -> elem) + .runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), system); + + probe.expectMsgAllOf("1", "2", "3"); + Assert.assertEquals(closed.get(), 1); + } + @Test public void mustBeAbleToUseFoldWhile() throws Exception { final int result = diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala index 32eae23712c..56db0ed6254 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala @@ -35,7 +35,7 @@ import org.apache.pekko import pekko.Done import pekko.stream.{ AbruptTerminationException, ActorAttributes, ActorMaterializer, SystemMaterializer } import pekko.stream.ActorAttributes.supervisionStrategy -import pekko.stream.Supervision.{ restartingDecider, resumingDecider } +import pekko.stream.Supervision.{ restartingDecider, resumingDecider, stoppingDecider } import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import pekko.stream.impl.StreamSupervisor.Children import pekko.stream.testkit.{ StreamSpec, TestSubscriber } @@ -410,6 +410,198 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) { Await.result(promise.future, 3.seconds) shouldBe Done } + "will close the autocloseable resource when upstream complete" in { + val closedCounter = new AtomicInteger(0) + val create = () => + new AutoCloseable { + override def close(): Unit = closedCounter.incrementAndGet() + } + val (pub, sub) = TestSource + .probe[Int] + .mapWithResource(create, (_: AutoCloseable, count) => count) + .toMat(TestSink.probe)(Keep.both) + .run() + sub.expectSubscription().request(2) + closedCounter.get shouldBe 0 + pub.sendNext(1) + sub.expectNext(1) + closedCounter.get shouldBe 0 + pub.sendComplete() + sub.expectComplete() + closedCounter.get shouldBe 1 + } + + "will close the autocloseable resource when upstream fail" in { + val closedCounter = new AtomicInteger(0) + val create = () => + new AutoCloseable { + override def close(): Unit = closedCounter.incrementAndGet() + } + val (pub, sub) = TestSource + .probe[Int] + .mapWithResource(create, (_: AutoCloseable, count) => count) + .toMat(TestSink.probe)(Keep.both) + .run() + sub.expectSubscription().request(2) + closedCounter.get shouldBe 0 + pub.sendNext(1) + sub.expectNext(1) + closedCounter.get shouldBe 0 + pub.sendError(ex) + sub.expectError(ex) + closedCounter.get shouldBe 1 + } + + "will close the autocloseable resource when downstream cancel" in { + val closedCounter = new AtomicInteger(0) + val create = () => + new AutoCloseable { + override def close(): Unit = closedCounter.incrementAndGet() + } + val (pub, sub) = TestSource + .probe[Int] + .mapWithResource(create, (_: AutoCloseable, count) => count) + .toMat(TestSink.probe)(Keep.both) + .run() + val subscription = sub.expectSubscription() + subscription.request(2) + closedCounter.get shouldBe 0 + pub.sendNext(1) + sub.expectNext(1) + closedCounter.get shouldBe 0 + subscription.cancel() + pub.expectCancellation() + closedCounter.get shouldBe 1 + } + + "will close the autocloseable resource when downstream fail" in { + val closedCounter = new AtomicInteger(0) + val create = () => + new AutoCloseable { + override def close(): Unit = closedCounter.incrementAndGet() + } + val (pub, sub) = TestSource + .probe[Int] + .mapWithResource(create, (_: AutoCloseable, count) => count) + .toMat(TestSink.probe)(Keep.both) + .run() + sub.request(2) + closedCounter.get shouldBe 0 + pub.sendNext(1) + sub.expectNext(1) + closedCounter.get shouldBe 0 + sub.cancel(ex) + pub.expectCancellationWithCause(ex) + closedCounter.get shouldBe 1 + } + + "will close the autocloseable resource on abrupt materializer termination" in { + val closedCounter = new AtomicInteger(0) + @nowarn("msg=deprecated") + val mat = ActorMaterializer() + val promise = Promise[Done]() + val create = () => + new AutoCloseable { + override def close(): Unit = { + closedCounter.incrementAndGet() + promise.complete(Success(Done)) + } + } + val matVal = Source + .single(1) + .mapWithResource(create, (_: AutoCloseable, count) => count) + .runWith(Sink.never)(mat) + closedCounter.get shouldBe 0 + mat.shutdown() + matVal.failed.futureValue shouldBe an[AbruptTerminationException] + Await.result(promise.future, 3.seconds) shouldBe Done + closedCounter.get shouldBe 1 + } + + "continue with autoCloseable when Strategy is Resume and exception happened" in { + val closedCounter = new AtomicInteger(0) + val create = () => + new AutoCloseable { + override def close(): Unit = closedCounter.incrementAndGet() + } + val p = Source + .fromIterator(() => (0 to 50).iterator) + .mapWithResource(create, + (_: AutoCloseable, elem) => { + if (elem == 10) throw TE("") else elem + }) + .withAttributes(supervisionStrategy(resumingDecider)) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[Int]() + + p.subscribe(c) + val sub = c.expectSubscription() + + (0 to 48).foreach(i => { + sub.request(1) + c.expectNext() should ===(if (i < 10) i else i + 1) + }) + sub.request(1) + c.expectNext(50) + c.expectComplete() + closedCounter.get shouldBe 1 + } + + "close and open stream with autocloseable again when Strategy is Restart" in { + val closedCounter = new AtomicInteger(0) + val create = () => + new AutoCloseable { + override def close(): Unit = closedCounter.incrementAndGet() + } + val p = Source + .fromIterator(() => (0 to 50).iterator) + .mapWithResource(create, + (_: AutoCloseable, elem) => { + if (elem == 10 || elem == 20) throw TE("") else elem + }) + .withAttributes(supervisionStrategy(restartingDecider)) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[Int]() + + p.subscribe(c) + val sub = c.expectSubscription() + + (0 to 30).filter(i => i != 10 && i != 20).foreach(i => { + sub.request(1) + c.expectNext() shouldBe i + closedCounter.get should ===(if (i < 10) 0 else if (i < 20) 1 else 2) + }) + sub.cancel() + } + + "stop stream with autoCloseable when Strategy is Stop and exception happened" in { + val closedCounter = new AtomicInteger(0) + val create = () => + new AutoCloseable { + override def close(): Unit = closedCounter.incrementAndGet() + } + val p = Source + .fromIterator(() => (0 to 50).iterator) + .mapWithResource(create, + (_: AutoCloseable, elem) => { + if (elem == 10) throw TE("") else elem + }) + .withAttributes(supervisionStrategy(stoppingDecider)) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[Int]() + + p.subscribe(c) + val sub = c.expectSubscription() + + (0 to 9).foreach(i => { + sub.request(1) + c.expectNext() shouldBe i + }) + sub.request(1) + c.expectError() + closedCounter.get shouldBe 1 + } + } override def afterTermination(): Unit = { fs.close() diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index c48f246983b..3ed9bdf1505 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -828,6 +828,45 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr (resource, out) => f(resource, out), resource => close.apply(resource).toScala)) + /** + * Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the stream finishes or fails. + * + * The resource creation function is invoked once when the stream is materialized and the returned resource is passed to + * the mapping function for mapping the first element. The mapping function returns a mapped element to emit + * downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification. + * + * The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails. + * + * Early completion can be done with combination of the [[takeWhile]] operator. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * '''Emits when''' the mapping function returns an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @tparam R the type of the resource + * @tparam T the type of the output elements + * @param create function that creates the resource + * @param f function that transforms the upstream element and the resource to output element + * @since 1.1.0 + */ + def mapWithResource[R <: AutoCloseable, T]( + create: function.Creator[R], + f: function.Function2[R, Out, T]): javadsl.Flow[In, T, Mat] = + mapWithResource(create, f, + (resource: AutoCloseable) => { + resource.close() + Optional.empty() + }) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 43125ade66e..b8d521a4209 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2541,6 +2541,45 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ (resource, out) => f(resource, out), resource => close.apply(resource).toScala)) + /** + * Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the stream finishes or fails. + * + * The resource creation function is invoked once when the stream is materialized and the returned resource is passed to + * the mapping function for mapping the first element. The mapping function returns a mapped element to emit + * downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification. + * + * The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails. + * + * Early completion can be done with combination of the [[takeWhile]] operator. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * '''Emits when''' the mapping function returns an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @tparam R the type of the resource + * @tparam T the type of the output elements + * @param create function that creates the resource + * @param f function that transforms the upstream element and the resource to output element + * @since 1.1.0 + */ + def mapWithResource[R <: AutoCloseable, T]( + create: function.Creator[R], + f: function.Function2[R, Out, T]): javadsl.Source[T, Mat] = + mapWithResource(create, f, + (resource: AutoCloseable) => { + resource.close() + Optional.empty() + }) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 8fa5662ab87..fc258e512ba 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -285,6 +285,45 @@ class SubFlow[In, Out, Mat]( (resource, out) => f(resource, out), resource => close.apply(resource).toScala)) + /** + * Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the stream finishes or fails. + * + * The resource creation function is invoked once when the stream is materialized and the returned resource is passed to + * the mapping function for mapping the first element. The mapping function returns a mapped element to emit + * downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification. + * + * The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails. + * + * Early completion can be done with combination of the [[takeWhile]] operator. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * '''Emits when''' the mapping function returns an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @tparam R the type of the resource + * @tparam T the type of the output elements + * @param create function that creates the resource + * @param f function that transforms the upstream element and the resource to output element + * @since 1.1.0 + */ + def mapWithResource[R <: AutoCloseable, T]( + create: function.Creator[R], + f: function.Function2[R, Out, T]): javadsl.SubFlow[In, T, Mat] = + mapWithResource(create, f, + (resource: AutoCloseable) => { + resource.close() + Optional.empty() + }) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index f64f003a9c5..340ea3063cb 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -276,6 +276,45 @@ class SubSource[Out, Mat]( (resource, out) => f(resource, out), resource => close.apply(resource).toScala)) + /** + * Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the stream finishes or fails. + * + * The resource creation function is invoked once when the stream is materialized and the returned resource is passed to + * the mapping function for mapping the first element. The mapping function returns a mapped element to emit + * downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification. + * + * The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails. + * + * Early completion can be done with combination of the [[takeWhile]] operator. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * '''Emits when''' the mapping function returns an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @tparam R the type of the resource + * @tparam T the type of the output elements + * @param create function that creates the resource + * @param f function that transforms the upstream element and the resource to output element + * @since 1.1.0 + */ + def mapWithResource[R <: AutoCloseable, T]( + create: function.Creator[R], + f: function.Function2[R, Out, T]): javadsl.SubSource[T, Mat] = + mapWithResource(create, f, + (resource: AutoCloseable) => { + resource.close() + Optional.empty() + }) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index d328a8ebb2c..0b73b0b90a0 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1142,6 +1142,43 @@ trait FlowOps[+Out, +Mat] { resource => close(resource)) .withAttributes(DefaultAttributes.mapWithResource)) + /** + * Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the stream finishes or fails. + * + * The resource creation function is invoked once when the stream is materialized and the returned resource is passed to + * the mapping function for mapping the first element. The mapping function returns a mapped element to emit + * downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification. + * + * The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails. + * + * Early completion can be done with combination of the [[takeWhile]] operator. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * '''Emits when''' the mapping function returns an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @tparam R the type of the resource + * @tparam T the type of the output elements + * @param create function that creates the resource + * @param f function that transforms the upstream element and the resource to output element + * @since 1.1.0 + */ + def mapWithResource[R <: AutoCloseable, T](create: () => R, f: (R, Out) => T): Repr[T] = + mapWithResource(create)(f, + (resource: AutoCloseable) => { + resource.close() + None + }) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful,