Skip to content

Commit 31f0dc9

Browse files
committed
feat: Add AutoCloseable shortcut on mapWithResource
1 parent e597a70 commit 31f0dc9

File tree

8 files changed

+241
-0
lines changed

8 files changed

+241
-0
lines changed

Diff for: stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java

+16
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
import java.util.*;
3939
import java.util.concurrent.atomic.AtomicBoolean;
40+
import java.util.concurrent.atomic.AtomicInteger;
4041
import java.util.function.Supplier;
4142
import java.util.concurrent.CompletableFuture;
4243
import java.util.concurrent.CompletionStage;
@@ -237,6 +238,21 @@ public void mustBeAbleToUseMapWithResource() {
237238
Assert.assertFalse(gate.get());
238239
}
239240

241+
@Test
242+
public void mustBeAbleToUseMapWithAutoCloseableResource() {
243+
final AtomicInteger closed = new AtomicInteger();
244+
Source.from(Arrays.asList("1", "2", "3"))
245+
.via(
246+
Flow.of(String.class)
247+
.mapWithResource(
248+
() -> (AutoCloseable) closed::incrementAndGet, (resource, elem) -> elem))
249+
.runWith(TestSink.create(system), system)
250+
.request(4)
251+
.expectNext("1", "2", "3")
252+
.expectComplete();
253+
Assert.assertEquals(closed.get(), 3);
254+
}
255+
240256
@Test
241257
public void mustBeAbleToUseFoldWhile() throws Exception {
242258
final int result =

Diff for: stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java

+13
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.*;
4848
import java.util.concurrent.*;
4949
import java.util.concurrent.atomic.AtomicBoolean;
50+
import java.util.concurrent.atomic.AtomicInteger;
5051
import java.util.stream.Collectors;
5152
import java.util.stream.IntStream;
5253
import java.util.stream.Stream;
@@ -815,6 +816,18 @@ public void mustBeAbleToUseMapWithResource() {
815816
Assert.assertFalse(gate.get());
816817
}
817818

819+
@Test
820+
public void mustBeAbleToUseMapWithAutoCloseableResource() {
821+
final AtomicInteger closed = new AtomicInteger();
822+
Source.from(Arrays.asList("1", "2", "3"))
823+
.mapWithResource(() -> (AutoCloseable) closed::incrementAndGet, (resource, elem) -> elem)
824+
.runWith(TestSink.create(system), system)
825+
.request(4)
826+
.expectNext("1", "2", "3")
827+
.expectComplete();
828+
Assert.assertEquals(closed.get(), 3);
829+
}
830+
818831
@Test
819832
public void mustBeAbleToUseFoldWhile() throws Exception {
820833
final int result =

Diff for: stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala

+19
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,25 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
410410
Await.result(promise.future, 3.seconds) shouldBe Done
411411
}
412412

413+
"will close the autocloseable resource" in {
414+
val closedCounter = new AtomicInteger(0)
415+
val create = () =>
416+
new AutoCloseable {
417+
override def close(): Unit = closedCounter.incrementAndGet()
418+
}
419+
val (pub, sub) = TestSource
420+
.probe[Int]
421+
.mapWithResource(create, (_, count) => count)
422+
.toMat(TestSink.probe)(Keep.both)
423+
.run()
424+
sub.expectSubscription().request(2)
425+
pub.sendNext(1)
426+
sub.expectNext(1)
427+
pub.sendComplete()
428+
sub.expectComplete()
429+
closedCounter.get shouldBe 1
430+
}
431+
413432
}
414433
override def afterTermination(): Unit = {
415434
fs.close()

Diff for: stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala

+39
Original file line numberDiff line numberDiff line change
@@ -828,6 +828,45 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
828828
(resource, out) => f(resource, out),
829829
resource => close.apply(resource).toScala))
830830

831+
/**
832+
* Transform each stream element with the help of a [[AutoCloseable]] resource and close it.
833+
*
834+
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
835+
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
836+
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
837+
*
838+
* The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
839+
*
840+
* Early completion can be done with combination of the [[takeWhile]] operator.
841+
*
842+
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
843+
*
844+
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
845+
* set it for a given Source by using [[ActorAttributes]].
846+
*
847+
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
848+
*
849+
* '''Backpressures when''' downstream backpressures
850+
*
851+
* '''Completes when''' upstream completes
852+
*
853+
* '''Cancels when''' downstream cancels
854+
*
855+
* @tparam R the type of the resource
856+
* @tparam T the type of the output elements
857+
* @param create function that creates the resource
858+
* @param f function that transforms the upstream element and the resource to output element
859+
* @since 1.1.0
860+
*/
861+
def mapWithResource[R <: AutoCloseable, T](
862+
create: function.Creator[R],
863+
f: function.Function2[R, Out, T]): javadsl.Flow[In, T, Mat] =
864+
mapWithResource(create, f,
865+
(resource: AutoCloseable) => {
866+
resource.close()
867+
Optional.empty()
868+
})
869+
831870
/**
832871
* Transform each input element into an `Iterable` of output elements that is
833872
* then flattened into the output stream. The transformation is meant to be stateful,

Diff for: stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala

+39
Original file line numberDiff line numberDiff line change
@@ -2541,6 +2541,45 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
25412541
(resource, out) => f(resource, out),
25422542
resource => close.apply(resource).toScala))
25432543

2544+
/**
2545+
* Transform each stream element with the help of a [[AutoCloseable]] resource and close it.
2546+
*
2547+
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
2548+
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
2549+
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
2550+
*
2551+
* The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
2552+
*
2553+
* Early completion can be done with combination of the [[takeWhile]] operator.
2554+
*
2555+
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
2556+
*
2557+
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
2558+
* set it for a given Source by using [[ActorAttributes]].
2559+
*
2560+
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
2561+
*
2562+
* '''Backpressures when''' downstream backpressures
2563+
*
2564+
* '''Completes when''' upstream completes
2565+
*
2566+
* '''Cancels when''' downstream cancels
2567+
*
2568+
* @tparam R the type of the resource
2569+
* @tparam T the type of the output elements
2570+
* @param create function that creates the resource
2571+
* @param f function that transforms the upstream element and the resource to output element
2572+
* @since 1.1.0
2573+
*/
2574+
def mapWithResource[R <: AutoCloseable, T](
2575+
create: function.Creator[R],
2576+
f: function.Function2[R, Out, T]): javadsl.Source[T, Mat] =
2577+
mapWithResource(create, f,
2578+
(resource: AutoCloseable) => {
2579+
resource.close()
2580+
Optional.empty()
2581+
})
2582+
25442583
/**
25452584
* Transform each input element into an `Iterable` of output elements that is
25462585
* then flattened into the output stream. The transformation is meant to be stateful,

Diff for: stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala

+39
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,45 @@ class SubFlow[In, Out, Mat](
285285
(resource, out) => f(resource, out),
286286
resource => close.apply(resource).toScala))
287287

288+
/**
289+
* Transform each stream element with the help of a [[AutoCloseable]] resource and close it.
290+
*
291+
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
292+
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
293+
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
294+
*
295+
* The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
296+
*
297+
* Early completion can be done with combination of the [[takeWhile]] operator.
298+
*
299+
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
300+
*
301+
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
302+
* set it for a given Source by using [[ActorAttributes]].
303+
*
304+
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
305+
*
306+
* '''Backpressures when''' downstream backpressures
307+
*
308+
* '''Completes when''' upstream completes
309+
*
310+
* '''Cancels when''' downstream cancels
311+
*
312+
* @tparam R the type of the resource
313+
* @tparam T the type of the output elements
314+
* @param create function that creates the resource
315+
* @param f function that transforms the upstream element and the resource to output element
316+
* @since 1.1.0
317+
*/
318+
def mapWithResource[R <: AutoCloseable, T](
319+
create: function.Creator[R],
320+
f: function.Function2[R, Out, T]): javadsl.SubFlow[In, T, Mat] =
321+
mapWithResource(create, f,
322+
(resource: AutoCloseable) => {
323+
resource.close()
324+
Optional.empty()
325+
})
326+
288327
/**
289328
* Transform each input element into an `Iterable` of output elements that is
290329
* then flattened into the output stream. The transformation is meant to be stateful,

Diff for: stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala

+39
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,45 @@ class SubSource[Out, Mat](
276276
(resource, out) => f(resource, out),
277277
resource => close.apply(resource).toScala))
278278

279+
/**
280+
* Transform each stream element with the help of a [[AutoCloseable]] resource and close it.
281+
*
282+
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
283+
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
284+
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
285+
*
286+
* The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
287+
*
288+
* Early completion can be done with combination of the [[takeWhile]] operator.
289+
*
290+
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
291+
*
292+
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
293+
* set it for a given Source by using [[ActorAttributes]].
294+
*
295+
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
296+
*
297+
* '''Backpressures when''' downstream backpressures
298+
*
299+
* '''Completes when''' upstream completes
300+
*
301+
* '''Cancels when''' downstream cancels
302+
*
303+
* @tparam R the type of the resource
304+
* @tparam T the type of the output elements
305+
* @param create function that creates the resource
306+
* @param f function that transforms the upstream element and the resource to output element
307+
* @since 1.1.0
308+
*/
309+
def mapWithResource[R <: AutoCloseable, T](
310+
create: function.Creator[R],
311+
f: function.Function2[R, Out, T]): javadsl.SubSource[T, Mat] =
312+
mapWithResource(create, f,
313+
(resource: AutoCloseable) => {
314+
resource.close()
315+
Optional.empty()
316+
})
317+
279318
/**
280319
* Transform each input element into an `Iterable` of output elements that is
281320
* then flattened into the output stream. The transformation is meant to be stateful,

Diff for: stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala

+37
Original file line numberDiff line numberDiff line change
@@ -1142,6 +1142,43 @@ trait FlowOps[+Out, +Mat] {
11421142
resource => close(resource))
11431143
.withAttributes(DefaultAttributes.mapWithResource))
11441144

1145+
/**
1146+
* Transform each stream element with the help of a [[AutoCloseable]] resource and close it.
1147+
*
1148+
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
1149+
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
1150+
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
1151+
*
1152+
* The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
1153+
*
1154+
* Early completion can be done with combination of the [[takeWhile]] operator.
1155+
*
1156+
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
1157+
*
1158+
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
1159+
* set it for a given Source by using [[ActorAttributes]].
1160+
*
1161+
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
1162+
*
1163+
* '''Backpressures when''' downstream backpressures
1164+
*
1165+
* '''Completes when''' upstream completes
1166+
*
1167+
* '''Cancels when''' downstream cancels
1168+
*
1169+
* @tparam R the type of the resource
1170+
* @tparam T the type of the output elements
1171+
* @param create function that creates the resource
1172+
* @param f function that transforms the upstream element and the resource to output element
1173+
* @since 1.1.0
1174+
*/
1175+
def mapWithResource[R <: AutoCloseable, T](create: () => R, f: (R, Out) => T): Repr[T] =
1176+
mapWithResource(create)(f,
1177+
(resource: AutoCloseable) => {
1178+
resource.close()
1179+
None
1180+
})
1181+
11451182
/**
11461183
* Transform each input element into an `Iterable` of output elements that is
11471184
* then flattened into the output stream. The transformation is meant to be stateful,

0 commit comments

Comments
 (0)