Skip to content

Commit

Permalink
feat: Add AutoCloseable shortcut on mapWithResource (#1053)
Browse files Browse the repository at this point in the history
* feat: Add AutoCloseable shortcut on mapWithResource

* Enhance test to check resource is closed after stream is completed

* Enhance comment

* Update doc

* Address comment

* Add resume, restart, stop strategy test

* Address comment

* Fix doc

* Fix typo
  • Loading branch information
injae-kim authored Feb 1, 2024
1 parent 7dd5d73 commit 7de1fb2
Show file tree
Hide file tree
Showing 9 changed files with 420 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ Map elements with the help of a resource that can be opened, transform each elem

## Signature

@apidoc[Flow.mapWithResource](Flow) { scala="#mapWithResource%5BS%2C%20T%5D%28create%3A%20%28%29%20%3D%3E%20S%29%28f%3A%20%28S%2C%20Out%29%20%3D%3E%20T%2C%20close%3A%20S%20%3D%3E%20Option%5BT%5D%29%3A%20Repr%5BT%5D" java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function)" }
@apidoc[Flow.mapWithResource](Flow) { scala="#mapWithResource[S,T](create:()=%3ES)(f:(S,Out)=%3ET,close:S=%3EOption[T]):Repr[T]" java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function)" }
1. `create`: Open or Create the resource.
2. `f`: Transform each element inputs with the help of resource.
3. `close`: Close the resource, invoked on end of stream or if the stream fails, optionally outputting a last element.

@apidoc[Flow.mapWithResource](Flow) { scala="#mapWithResource[S%3C:AutoCloseable,T](create:()=%3ES,f:(S,Out)=%3ET):Repr[T]" java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2)" }
1. `create`: Open or Create the autocloseable resource.
2. `f`: Transform each element inputs with the help of resource.

## Description

Transform each stream element with the help of a resource.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -237,6 +238,21 @@ public void mustBeAbleToUseMapWithResource() {
Assert.assertFalse(gate.get());
}

@Test
public void mustBeAbleToUseMapWithAutoCloseableResource() {
final TestKit probe = new TestKit(system);
final AtomicInteger closed = new AtomicInteger();
Source.from(Arrays.asList("1", "2", "3"))
.via(
Flow.of(String.class)
.mapWithResource(
() -> (AutoCloseable) closed::incrementAndGet, (resource, elem) -> elem))
.runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), system);

probe.expectMsgAllOf("1", "2", "3");
Assert.assertEquals(closed.get(), 1);
}

@Test
public void mustBeAbleToUseFoldWhile() throws Exception {
final int result =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -815,6 +816,18 @@ public void mustBeAbleToUseMapWithResource() {
Assert.assertFalse(gate.get());
}

@Test
public void mustBeAbleToUseMapWithAutoCloseableResource() {
final TestKit probe = new TestKit(system);
final AtomicInteger closed = new AtomicInteger();
Source.from(Arrays.asList("1", "2", "3"))
.mapWithResource(() -> (AutoCloseable) closed::incrementAndGet, (resource, elem) -> elem)
.runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), system);

probe.expectMsgAllOf("1", "2", "3");
Assert.assertEquals(closed.get(), 1);
}

@Test
public void mustBeAbleToUseFoldWhile() throws Exception {
final int result =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.pekko
import pekko.Done
import pekko.stream.{ AbruptTerminationException, ActorAttributes, ActorMaterializer, SystemMaterializer }
import pekko.stream.ActorAttributes.supervisionStrategy
import pekko.stream.Supervision.{ restartingDecider, resumingDecider }
import pekko.stream.Supervision.{ restartingDecider, resumingDecider, stoppingDecider }
import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import pekko.stream.impl.StreamSupervisor.Children
import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
Expand Down Expand Up @@ -410,6 +410,198 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
Await.result(promise.future, 3.seconds) shouldBe Done
}

"will close the autocloseable resource when upstream complete" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val (pub, sub) = TestSource
.probe[Int]
.mapWithResource(create, (_: AutoCloseable, count) => count)
.toMat(TestSink.probe)(Keep.both)
.run()
sub.expectSubscription().request(2)
closedCounter.get shouldBe 0
pub.sendNext(1)
sub.expectNext(1)
closedCounter.get shouldBe 0
pub.sendComplete()
sub.expectComplete()
closedCounter.get shouldBe 1
}

"will close the autocloseable resource when upstream fail" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val (pub, sub) = TestSource
.probe[Int]
.mapWithResource(create, (_: AutoCloseable, count) => count)
.toMat(TestSink.probe)(Keep.both)
.run()
sub.expectSubscription().request(2)
closedCounter.get shouldBe 0
pub.sendNext(1)
sub.expectNext(1)
closedCounter.get shouldBe 0
pub.sendError(ex)
sub.expectError(ex)
closedCounter.get shouldBe 1
}

"will close the autocloseable resource when downstream cancel" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val (pub, sub) = TestSource
.probe[Int]
.mapWithResource(create, (_: AutoCloseable, count) => count)
.toMat(TestSink.probe)(Keep.both)
.run()
val subscription = sub.expectSubscription()
subscription.request(2)
closedCounter.get shouldBe 0
pub.sendNext(1)
sub.expectNext(1)
closedCounter.get shouldBe 0
subscription.cancel()
pub.expectCancellation()
closedCounter.get shouldBe 1
}

"will close the autocloseable resource when downstream fail" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val (pub, sub) = TestSource
.probe[Int]
.mapWithResource(create, (_: AutoCloseable, count) => count)
.toMat(TestSink.probe)(Keep.both)
.run()
sub.request(2)
closedCounter.get shouldBe 0
pub.sendNext(1)
sub.expectNext(1)
closedCounter.get shouldBe 0
sub.cancel(ex)
pub.expectCancellationWithCause(ex)
closedCounter.get shouldBe 1
}

"will close the autocloseable resource on abrupt materializer termination" in {
val closedCounter = new AtomicInteger(0)
@nowarn("msg=deprecated")
val mat = ActorMaterializer()
val promise = Promise[Done]()
val create = () =>
new AutoCloseable {
override def close(): Unit = {
closedCounter.incrementAndGet()
promise.complete(Success(Done))
}
}
val matVal = Source
.single(1)
.mapWithResource(create, (_: AutoCloseable, count) => count)
.runWith(Sink.never)(mat)
closedCounter.get shouldBe 0
mat.shutdown()
matVal.failed.futureValue shouldBe an[AbruptTerminationException]
Await.result(promise.future, 3.seconds) shouldBe Done
closedCounter.get shouldBe 1
}

"continue with autoCloseable when Strategy is Resume and exception happened" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val p = Source
.fromIterator(() => (0 to 50).iterator)
.mapWithResource(create,
(_: AutoCloseable, elem) => {
if (elem == 10) throw TE("") else elem
})
.withAttributes(supervisionStrategy(resumingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[Int]()

p.subscribe(c)
val sub = c.expectSubscription()

(0 to 48).foreach(i => {
sub.request(1)
c.expectNext() should ===(if (i < 10) i else i + 1)
})
sub.request(1)
c.expectNext(50)
c.expectComplete()
closedCounter.get shouldBe 1
}

"close and open stream with autocloseable again when Strategy is Restart" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val p = Source
.fromIterator(() => (0 to 50).iterator)
.mapWithResource(create,
(_: AutoCloseable, elem) => {
if (elem == 10 || elem == 20) throw TE("") else elem
})
.withAttributes(supervisionStrategy(restartingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[Int]()

p.subscribe(c)
val sub = c.expectSubscription()

(0 to 30).filter(i => i != 10 && i != 20).foreach(i => {
sub.request(1)
c.expectNext() shouldBe i
closedCounter.get should ===(if (i < 10) 0 else if (i < 20) 1 else 2)
})
sub.cancel()
}

"stop stream with autoCloseable when Strategy is Stop and exception happened" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val p = Source
.fromIterator(() => (0 to 50).iterator)
.mapWithResource(create,
(_: AutoCloseable, elem) => {
if (elem == 10) throw TE("") else elem
})
.withAttributes(supervisionStrategy(stoppingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[Int]()

p.subscribe(c)
val sub = c.expectSubscription()

(0 to 9).foreach(i => {
sub.request(1)
c.expectNext() shouldBe i
})
sub.request(1)
c.expectError()
closedCounter.get shouldBe 1
}

}
override def afterTermination(): Unit = {
fs.close()
Expand Down
39 changes: 39 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,45 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
(resource, out) => f(resource, out),
resource => close.apply(resource).toScala))

/**
* Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the stream finishes or fails.
*
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
*
* The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
*
* Early completion can be done with combination of the [[takeWhile]] operator.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @tparam R the type of the resource
* @tparam T the type of the output elements
* @param create function that creates the resource
* @param f function that transforms the upstream element and the resource to output element
* @since 1.1.0
*/
def mapWithResource[R <: AutoCloseable, T](
create: function.Creator[R],
f: function.Function2[R, Out, T]): javadsl.Flow[In, T, Mat] =
mapWithResource(create, f,
(resource: AutoCloseable) => {
resource.close()
Optional.empty()
})

/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
Expand Down
39 changes: 39 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2541,6 +2541,45 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
(resource, out) => f(resource, out),
resource => close.apply(resource).toScala))

/**
* Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the stream finishes or fails.
*
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
*
* The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
*
* Early completion can be done with combination of the [[takeWhile]] operator.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @tparam R the type of the resource
* @tparam T the type of the output elements
* @param create function that creates the resource
* @param f function that transforms the upstream element and the resource to output element
* @since 1.1.0
*/
def mapWithResource[R <: AutoCloseable, T](
create: function.Creator[R],
f: function.Function2[R, Out, T]): javadsl.Source[T, Mat] =
mapWithResource(create, f,
(resource: AutoCloseable) => {
resource.close()
Optional.empty()
})

/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
Expand Down
Loading

0 comments on commit 7de1fb2

Please sign in to comment.