Skip to content

Commit f6da401

Browse files
authored
feat: Add Sink#forall operator (#989)
Co-authored-by: Jiafu Tang <[email protected]>
1 parent aa10e9b commit f6da401

File tree

9 files changed

+240
-22
lines changed

9 files changed

+240
-22
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# Sink.forall
2+
3+
A `Sink` that will test the given predicate `p` for every received element and completes with the result.
4+
5+
@ref[Sink operators](../index.md#sink-operators)
6+
7+
## Signature
8+
9+
@apidoc[Sink.forall](Sink$) { scala="#forall[T](p:T=%3EBoolean):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#forall(org.apache.pekko.japi.function.Predicate)" }
10+
11+
## Description
12+
forall applies a predicate function to assert each element received, it returns true if all elements satisfy the assertion, otherwise it returns false.
13+
14+
It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) that completes with the last state when the stream has finished.
15+
16+
Notes that if source is empty, it will return true
17+
18+
A `Sink` that will test the given predicate `p` for every received element and
19+
20+
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the predicate is true for all elements;
21+
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the stream is empty (i.e. completes before signalling any elements);
22+
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the predicate is false for any element.
23+
24+
The materialized value @scala[`Future`] @java[`CompletionStage`] will be completed with the value `true` or `false`
25+
when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
26+
27+
## Example
28+
29+
This example tests all elements in the stream is `<=` 100.
30+
31+
Scala
32+
: @@snip [ForAll.scala](/docs/src/test/scala/docs/stream/operators/sink/ForAll.scala) { #forall }
33+
34+
Java
35+
: @@snip [ForAll.java](/docs/src/test/java/jdocs/stream/operators/sink/ForAll.java) { #forall }
36+
37+
## Reactive Streams Semantics
38+
39+
@@@div { .callout }
40+
41+
***Completes*** when upstream completes or the predicate `p` returns `false`
42+
43+
**cancels** when predicate `p` returns `false`
44+
45+
**backpressures** when the invocation of predicate `p` has not yet completed
46+
47+
@@@

docs/src/main/paradox/stream/operators/index.md

+2
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
6262
|Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
6363
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
6464
|Sink|<a name="foldwhile"></a>@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
65+
|Sink|<a name="forall"></a>@ref[forall](Sink/forall.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
6566
|Sink|<a name="foreach"></a>@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.|
6667
|Sink|<a name="foreachasync"></a>@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.|
6768
|Sink|<a name="foreachparallel"></a>@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.|
@@ -459,6 +460,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
459460
* [foldAsync](Source-or-Flow/foldAsync.md)
460461
* [foldWhile](Source-or-Flow/foldWhile.md)
461462
* [foldWhile](Sink/foldWhile.md)
463+
* [forall](Sink/forall.md)
462464
* [foreach](Sink/foreach.md)
463465
* [foreachAsync](Sink/foreachAsync.md)
464466
* [foreachParallel](Sink/foreachParallel.md)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package jdocs.stream.operators.sink;
19+
20+
import org.apache.pekko.actor.ActorSystem;
21+
import org.apache.pekko.stream.javadsl.Sink;
22+
import org.apache.pekko.stream.javadsl.Source;
23+
24+
import java.util.concurrent.TimeUnit;
25+
26+
public class ForAll {
27+
private ActorSystem system = null;
28+
29+
public void forAllUsage() throws Exception {
30+
// #forall
31+
final boolean allMatch =
32+
Source.range(1, 100)
33+
.runWith(Sink.forall(elem -> elem <= 100), system)
34+
.toCompletableFuture()
35+
.get(3, TimeUnit.SECONDS);
36+
System.out.println(allMatch);
37+
// Expect prints:
38+
// true
39+
// #forall
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package docs.stream.operators.sink
19+
20+
import org.apache.pekko.actor.ActorSystem
21+
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
22+
23+
import scala.concurrent.duration.DurationInt
24+
import scala.concurrent.{ Await, ExecutionContextExecutor, Future }
25+
26+
object ForAll {
27+
implicit val system: ActorSystem = ???
28+
implicit val ec: ExecutionContextExecutor = system.dispatcher
29+
def foldExample: Unit = {
30+
// #forall
31+
val result: Future[Boolean] =
32+
Source(1 to 100)
33+
.runWith(Sink.forall(_ <= 100))
34+
val allMatch = Await.result(result, 3.seconds)
35+
println(allMatch)
36+
// Expect prints:
37+
// true
38+
// #forall
39+
}
40+
}

stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java

+9
Original file line numberDiff line numberDiff line change
@@ -236,4 +236,13 @@ public void sinkForeachMustBeDocumented()
236236
// #foreach
237237
assertEquals(Done.done(), done);
238238
}
239+
240+
@Test
241+
public void sinkMustBeAbleToUseForall()
242+
throws InterruptedException, ExecutionException, TimeoutException {
243+
CompletionStage<Boolean> cs =
244+
Source.from(Arrays.asList(1, 2, 3, 4)).runWith(Sink.forall(param -> param > 0), system);
245+
boolean allMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
246+
assertTrue(allMatch);
247+
}
239248
}

stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala

+42
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import scala.concurrent.duration._
2020
import org.apache.pekko
2121
import pekko.Done
2222
import pekko.stream._
23+
import pekko.stream.ActorAttributes.supervisionStrategy
2324
import pekko.stream.testkit._
2425
import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
2526
import pekko.testkit.DefaultTimeout
@@ -339,6 +340,47 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
339340
}
340341
}
341342

343+
"The forall sink" must {
344+
345+
"completes with `ture` when all elements match" in {
346+
Source(1 to 4)
347+
.runWith(Sink.forall(_ > 0))
348+
.futureValue shouldBe true
349+
}
350+
351+
"completes with `false` when any element match" in {
352+
Source(1 to 4)
353+
.runWith(Sink.forall(_ > 2))
354+
.futureValue shouldBe false
355+
}
356+
357+
"completes with `true` if the stream is empty" in {
358+
Source.empty[Int]
359+
.runWith(Sink.forall(_ > 2))
360+
.futureValue shouldBe true
361+
}
362+
363+
"completes with `Failure` if the stream failed" in {
364+
Source.failed[Int](new RuntimeException("Oops"))
365+
.runWith(Sink.forall(_ > 2))
366+
.failed.futureValue shouldBe a[RuntimeException]
367+
}
368+
369+
"completes with `true` with restart strategy" in {
370+
val sink = Sink.forall[Int](elem => {
371+
if (elem == 2) {
372+
throw new RuntimeException("Oops")
373+
}
374+
elem > 0
375+
}).withAttributes(supervisionStrategy(Supervision.restartingDecider))
376+
377+
Source(1 to 2)
378+
.runWith(sink)
379+
.futureValue shouldBe true
380+
}
381+
382+
}
383+
342384
"Sink pre-materialization" must {
343385
"materialize the sink and wrap its exposed publisher in a Source" in {
344386
val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](false)

stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ package org.apache.pekko.stream.impl.fusing
1515

1616
import java.util.concurrent.TimeUnit.NANOSECONDS
1717

18-
import scala.annotation.{ nowarn, tailrec }
18+
import scala.annotation.nowarn
19+
import scala.annotation.tailrec
1920
import scala.collection.immutable
2021
import scala.collection.immutable.VectorBuilder
2122
import scala.concurrent.Future

stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala

+28-9
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@
1414
package org.apache.pekko.stream.javadsl
1515

1616
import java.util.Optional
17-
import java.util.concurrent.CompletableFuture
18-
import java.util.concurrent.CompletionStage
17+
import java.util.concurrent.{ CompletableFuture, CompletionStage }
1918
import java.util.function.BiFunction
2019
import java.util.stream.Collector
2120

@@ -27,22 +26,17 @@ import scala.util.Try
2726

2827
import org.apache.pekko
2928
import pekko._
30-
import pekko.actor.ActorRef
31-
import pekko.actor.ClassicActorSystemProvider
32-
import pekko.actor.Status
29+
import pekko.actor.{ ActorRef, ClassicActorSystemProvider, Status }
3330
import pekko.dispatch.ExecutionContexts
3431
import pekko.japi.{ function, Util }
3532
import pekko.japi.function.Creator
3633
import pekko.stream._
3734
import pekko.stream.impl.LinearTraversalBuilder
38-
import pekko.stream.javadsl
39-
import pekko.stream.scaladsl
4035
import pekko.stream.scaladsl.SinkToCompletionStage
4136
import pekko.util.FutureConverters._
4237
import pekko.util.OptionConverters._
4338

44-
import org.reactivestreams.Publisher
45-
import org.reactivestreams.Subscriber
39+
import org.reactivestreams.{ Publisher, Subscriber }
4640

4741
/** Java API */
4842
object Sink {
@@ -80,6 +74,31 @@ object Sink {
8074
f: function.Function2[U, In, CompletionStage[U]]): javadsl.Sink[In, CompletionStage[U]] =
8175
new Sink(scaladsl.Sink.foldAsync[U, In](zero)(f(_, _).asScala).toCompletionStage())
8276

77+
/**
78+
* A `Sink` that will test the given predicate `p` for every received element and
79+
* 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is true for all elements;
80+
* 2. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the stream is empty (i.e. completes before signalling any elements);
81+
* 3. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the predicate is false for any element.
82+
*
83+
* The materialized value [[java.util.concurrent.CompletionStage]] will be completed with the value `true` or `false`
84+
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
85+
*
86+
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
87+
*
88+
* '''Completes when''' upstream completes or the predicate `p` returns `false`
89+
*
90+
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
91+
*
92+
* '''Cancels when''' predicate `p` returns `false`
93+
*
94+
* @since 1.1.0
95+
*/
96+
def forall[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
97+
import pekko.util.FutureConverters._
98+
new Sink(scaladsl.Sink.forall[In](p.test)
99+
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
100+
}
101+
83102
/**
84103
* Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java ``Collector``
85104
* transformation and reduction operations. This allows usage of Java streams transformations for reactive streams.

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala

+29-12
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,22 @@ package org.apache.pekko.stream.scaladsl
1616
import scala.annotation.{ nowarn, tailrec }
1717
import scala.annotation.unchecked.uncheckedVariance
1818
import scala.collection.immutable
19-
import scala.concurrent.ExecutionContext
20-
import scala.concurrent.Future
21-
import scala.util.Failure
22-
import scala.util.Success
23-
import scala.util.Try
19+
import scala.concurrent.{ ExecutionContext, Future }
20+
import scala.util.{ Failure, Success, Try }
2421

2522
import org.apache.pekko
26-
import pekko.Done
27-
import pekko.NotUsed
28-
import pekko.actor.ActorRef
29-
import pekko.actor.Status
23+
import pekko.{ util, Done, NotUsed }
24+
import pekko.actor.{ ActorRef, Status }
3025
import pekko.annotation.InternalApi
3126
import pekko.dispatch.ExecutionContexts
3227
import pekko.stream._
3328
import pekko.stream.impl._
3429
import pekko.stream.impl.Stages.DefaultAttributes
3530
import pekko.stream.impl.fusing.GraphStages
36-
import pekko.stream.javadsl
3731
import pekko.stream.stage._
3832
import pekko.util.ccompat._
3933

40-
import org.reactivestreams.Publisher
41-
import org.reactivestreams.Subscriber
34+
import org.reactivestreams.{ Publisher, Subscriber }
4235

4336
/**
4437
* A `Sink` is a set of stream processing steps that has one open input.
@@ -447,6 +440,30 @@ object Sink {
447440
def foldAsync[U, T](zero: U)(f: (U, T) => Future[U]): Sink[T, Future[U]] =
448441
Flow[T].foldAsync(zero)(f).toMat(Sink.head)(Keep.right).named("foldAsyncSink")
449442

443+
/**
444+
* A `Sink` that will test the given predicate `p` for every received element and
445+
* 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is true for all elements;
446+
* 2. completes and returns [[scala.concurrent.Future]] of `true` if the stream is empty (i.e. completes before signalling any elements);
447+
* 3. completes and returns [[scala.concurrent.Future]] of `false` if the predicate is false for any element.
448+
*
449+
* The materialized value [[scala.concurrent.Future]] will be completed with the value `true` or `false`
450+
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
451+
*
452+
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
453+
*
454+
* '''Completes when''' upstream completes or the predicate `p` returns `false`
455+
*
456+
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
457+
*
458+
* '''Cancels when''' predicate `p` returns `false`
459+
*
460+
* @since 1.1.0
461+
*/
462+
def forall[T](p: T => Boolean): Sink[T, Future[Boolean]] =
463+
Flow[T].foldWhile(true)(util.ConstantFun.scalaIdentityFunction)(_ && p(_))
464+
.toMat(Sink.head)(Keep.right)
465+
.named("forallSink")
466+
450467
/**
451468
* A `Sink` that will invoke the given function for every received element, giving it its previous
452469
* output (from the second element) and the element as input.

0 commit comments

Comments
 (0)