diff --git a/docs/src/main/paradox/stream/operators/Sink/forall.md b/docs/src/main/paradox/stream/operators/Sink/forall.md index 978270743fe..ed79a8b2e4e 100644 --- a/docs/src/main/paradox/stream/operators/Sink/forall.md +++ b/docs/src/main/paradox/stream/operators/Sink/forall.md @@ -1,12 +1,12 @@ # Sink.forall -Apply a predicate function to assert each element received, it returns true if all elements satisfy the assertion, otherwise it returns false. +A `Sink` that will test the given predicate `p` for every received element and completes with the result. @ref[Sink operators](../index.md#sink-operators) ## Signature -@apidoc[Sink.forall](Sink$) { scala="#forall(predicate:T=>U):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#forall(org.apache.pekko.japi.function.Predicate)" } +@apidoc[Sink.forall](Sink$) { scala="#forall%5BT%5D(p%3A%20T%20%3D%3E%20Boolean):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#forall(org.apache.pekko.japi.function.Predicate)" } ## Description forall applies a predicate function to assert each element received, it returns true if all elements satisfy the assertion, otherwise it returns false. @@ -15,22 +15,33 @@ It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) that Notes that if source is empty, it will return true +A `Sink` that will test the given predicate `p` for every received element and + + - completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the predicate is true for all elements; + - completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the stream is empty (i.e. completes before signalling any elements); + - completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the predicate is false for any element. + +The materialized value @scala[`Future`] @java[`CompletionStage`] will be completed with the value `true` or `false` +when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream. + ## Example -This example reads a stream of positive integers, asserts that all numbers are greater than 0, and finally prints the assertion result. +This example tests all elements in the stream is `<=` 100. Scala -: @@snip [snip](/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala) { #forall } +: @@snip [ForAll.scala](/docs/src/test/scala/docs/stream/operators/sink/ForAll.scala) { #forall } Java -: @@snip [snip](/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java) { #forall } +: @@snip [ForAll.java](/docs/src/test/java/jdocs/stream/operators/sink/ForAll.java) { #forall } ## Reactive Streams Semantics @@@div { .callout } -**cancels** never +***Completes*** when upstream completes or the predicate `p` returns `false` + +**cancels** when predicate `p` returns `false` -**backpressures** when the predicate function invocation has not yet completed +**backpressures** when the invocation of predicate `p` has not yet completed @@@ \ No newline at end of file diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 27811ba87d9..d43e2556b3d 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -62,7 +62,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl |Sink|@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. | |Sink|@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.| |Sink|@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.| -|Sink|@ref[forall](Sink/forall.md)|Apply a predicate function to assert each element received, it returns true if all elements satisfy the assertion, otherwise it returns false.| +|Sink|@ref[forall](Sink/forall.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.| |Sink|@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.| |Sink|@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.| |Sink|@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.| diff --git a/docs/src/test/java/jdocs/stream/operators/sink/ForAll.java b/docs/src/test/java/jdocs/stream/operators/sink/ForAll.java new file mode 100644 index 00000000000..b9d2c65662f --- /dev/null +++ b/docs/src/test/java/jdocs/stream/operators/sink/ForAll.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package jdocs.stream.operators.sink; + +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; + +import java.util.concurrent.TimeUnit; + +public class ForAll { + private ActorSystem system = null; + + public void forAllUsage() throws Exception { + // #forall + final boolean allMatch = + Source.range(1, 100) + .runWith(Sink.forall(elem -> elem <= 100), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + System.out.println(allMatch); + // Expect prints: + // true + // #forall + } +} diff --git a/docs/src/test/scala/docs/stream/operators/sink/ForAll.scala b/docs/src/test/scala/docs/stream/operators/sink/ForAll.scala new file mode 100644 index 00000000000..bef79330ea5 --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/sink/ForAll.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package docs.stream.operators.sink + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.scaladsl.{ Sink, Source } + +import scala.concurrent.duration.DurationInt +import scala.concurrent.{ Await, ExecutionContextExecutor, Future } + +object ForAll { + implicit val system: ActorSystem = ??? + implicit val ec: ExecutionContextExecutor = system.dispatcher + def foldExample: Unit = { + // #forall + val result: Future[Boolean] = + Source(1 to 100) + .runWith(Sink.forall(_ <= 100)) + val allMatch = Await.result(result, 3.seconds) + println(allMatch) + // Expect prints: + // true + // #forall + } +} diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java index b130b615883..ed4d2838674 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java @@ -238,16 +238,11 @@ public void sinkForeachMustBeDocumented() } @Test - public void sinkForallMustBeDocumented() + public void sinkMustBeAbleToUseForall() throws InterruptedException, ExecutionException, TimeoutException { - // #forall - Sink> forallSink = Sink.forall(param -> param > 0); CompletionStage cs = - Source.from(Arrays.asList(1, 2, 3, 4)).runWith(forallSink, system); - Boolean predicate = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS); - // will print - // true - // #forall - assertEquals(predicate, true); + Source.from(Arrays.asList(1, 2, 3, 4)).runWith(Sink.forall(param -> param > 0), system); + boolean allMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS); + assertTrue(allMatch); } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala index 217253373de..a7060282d27 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala @@ -16,13 +16,16 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.nowarn import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ + import org.apache.pekko import pekko.Done import pekko.stream._ import pekko.stream.testkit._ import pekko.stream.testkit.scaladsl.{ TestSink, TestSource } import pekko.testkit.DefaultTimeout + import org.reactivestreams.Publisher + import org.scalatest.concurrent.ScalaFutures class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { @@ -338,37 +341,30 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { "The forall sink" must { - "always true forall = true" in { - // #forall - val forallSink: Sink[Int, Future[Boolean]] = Sink.forall[Int](_ > 0) - val f = Source(1 to 4).runWith(forallSink) - val result = Await.result(f, 100.millis) - // will print - // true - // #forall - result shouldBe true + "completes with `ture` when all elements match" in { + Source(1 to 4) + .runWith(Sink.forall(_ > 0)) + .futureValue shouldBe true } - "has false forall = false" in { - val forallSink: Sink[Int, Future[Boolean]] = Sink.forall[Int](_ > 2) - val f = Source(1 to 4).runWith(forallSink) - val result = Await.result(f, 100.millis) - result shouldBe false + "completes with `false` when any element match" in { + Source(1 to 4) + .runWith(Sink.forall(_ > 2)) + .futureValue shouldBe false } - "always false forall = false" in { - val forallSink: Sink[Int, Future[Boolean]] = Sink.forall[Int](_ < 0) - val f = Source(1 to 4).runWith(forallSink) - val result = Await.result(f, 100.millis) - result shouldBe false + "completes with `true` if the stream is empty" in { + Source.empty[Int] + .runWith(Sink.forall(_ > 2)) + .futureValue shouldBe true } - "empty forall = true" in { - val forallSink: Sink[Int, Future[Boolean]] = Sink.forall[Int](_ < 0) - val f = Source.empty.runWith(forallSink) - val result = Await.result(f, 100.millis) - result shouldBe true + "completes with `Failure` if the stream failed" in { + Source.failed[Int](new RuntimeException("Oops")) + .runWith(Sink.forall(_ > 2)) + .failed.futureValue shouldBe a[RuntimeException] } + } "Sink pre-materialization" must { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index 40db3ab2b0a..56c2eba7aac 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -14,8 +14,7 @@ package org.apache.pekko.stream.javadsl import java.util.Optional -import java.util.concurrent.CompletableFuture -import java.util.concurrent.CompletionStage +import java.util.concurrent.{ CompletableFuture, CompletionStage } import java.util.function.BiFunction import java.util.stream.Collector @@ -27,22 +26,17 @@ import scala.util.Try import org.apache.pekko import pekko._ -import pekko.actor.ActorRef -import pekko.actor.ClassicActorSystemProvider -import pekko.actor.Status +import pekko.actor.{ ActorRef, ClassicActorSystemProvider, Status } import pekko.dispatch.ExecutionContexts import pekko.japi.{ function, Util } import pekko.japi.function.Creator import pekko.stream._ import pekko.stream.impl.LinearTraversalBuilder -import pekko.stream.javadsl -import pekko.stream.scaladsl import pekko.stream.scaladsl.SinkToCompletionStage import pekko.util.FutureConverters._ import pekko.util.OptionConverters._ -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber +import org.reactivestreams.{ Publisher, Subscriber } /** Java API */ object Sink { @@ -81,17 +75,27 @@ object Sink { new Sink(scaladsl.Sink.foldAsync[U, In](zero)(f(_, _).asScala).toCompletionStage()) /** - * A `Sink` that will invoke the given predicate for every received element, giving it its previous - * output (or the given `false`) and the element as input. + * A `Sink` that will test the given predicate `p` for every received element and + * 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is true for all elements; + * 2. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the stream is empty (i.e. completes before signalling any elements); + * 3. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the predicate is false for any element. * - * The returned [[java.util.concurrent.CompletionStage]] will be completed with the predicate is false, or the - * the final predicate evaluation when the input stream ends, or completed with `Failure` if - * there is a failure signaled in the stream. + * The materialized value [[java.util.concurrent.CompletionStage]] will be completed with the value `true` or `false` + * when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Completes when''' upstream completes or the predicate `p` returns `false` + * + * '''Backpressures when''' the invocation of predicate `p` has not yet completed + * + * '''Cancels when''' predicate `p` returns `false` */ - def forall[T](predicate: function.Predicate[T]): javadsl.Sink[T, CompletionStage[java.lang.Boolean]] = - scaladsl.Sink.forall(predicate.test) - .mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic)) - .toCompletionStage().asJava + def forall[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = { + import pekko.util.FutureConverters._ + new Sink(scaladsl.Sink.forall[In](p.test) + .mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava)) + } /** * Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java ``Collector`` diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index 81046e5ccc5..6f608b153f9 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -441,15 +441,24 @@ object Sink { Flow[T].foldAsync(zero)(f).toMat(Sink.head)(Keep.right).named("foldAsyncSink") /** - * A `Sink` that will invoke the given predicate for every received element, giving it its previous - * output (or the given `false`) and the element as input. + * A `Sink` that will test the given predicate `p` for every received element and + * 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is true for all elements; + * 2. completes and returns [[scala.concurrent.Future]] of `true` if the stream is empty (i.e. completes before signalling any elements); + * 3. completes and returns [[scala.concurrent.Future]] of `false` if the predicate is false for any element. * - * The returned [[scala.concurrent.Future]] will be completed with the predicate is false, or the - * the final predicate evaluation when the input stream ends, or completed with `Failure` if - * there is a failure signaled in the stream. + * The materialized value [[scala.concurrent.Future]] will be completed with the value `true` or `false` + * when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Completes when''' upstream completes or the predicate `p` returns `false` + * + * '''Backpressures when''' the invocation of predicate `p` has not yet completed + * + * '''Cancels when''' predicate `p` returns `false` */ - def forall[T](predicate: T => Boolean): Sink[T, Future[Boolean]] = - Flow[T].foldWhile(true)(util.ConstantFun.scalaIdentityFunction)(_ && predicate(_)) + def forall[T](p: T => Boolean): Sink[T, Future[Boolean]] = + Flow[T].foldWhile(true)(util.ConstantFun.scalaIdentityFunction)(_ && p(_)) .toMat(Sink.head)(Keep.right) .named("forallSink")