diff --git a/docs/src/main/paradox/stream/operators/Sink/exists.md b/docs/src/main/paradox/stream/operators/Sink/exists.md index c76d2e65bcc..c1115391378 100644 --- a/docs/src/main/paradox/stream/operators/Sink/exists.md +++ b/docs/src/main/paradox/stream/operators/Sink/exists.md @@ -1,24 +1,47 @@ -# exists +# Sink.exists + +A `Sink` that will test the given predicate `p` for every received element and completes with the result. -Completes the stream with true as soon as there's an element satisfy the *predicate*, or emits false when the stream -completes and no element satisfied the *predicate*. @ref[Sink operators](../index.md#sink-operators) ## Signature -@apidoc[Flow.exists](Flow$) { scala="#exists[I](p:Out%20=%3E%20Boolean)" java="#exists[I](p:org.apache.pekko.japi.function.Predicate[I])" } - -@apidoc[Sink.exists](Sink$) { scala="#exists[I](p:Out%20=%3E%20Boolean)" java="#exists[I](p:org.apache.pekko.japi.function.Predicate[I])" } +@apidoc[Sink.exists](Sink$) { scala="#exists[T](p:T=%3EBoolean):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#exists(org.apache.pekko.japi.function.Predicate)" } ## Description +`exists` applies a predicate function to assert each element received, it returns true if any elements satisfy the assertion, otherwise it returns false. + +It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) that completes with the last state when the stream has finished. + +Notes that if source is empty, it will return false + +A `Sink` that will test the given predicate `p` for every received element and + +- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the predicate is true for any element; +- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the stream is empty (i.e. completes before signalling any elements); +- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the predicate is false for all elements. + +The materialized value @scala[`Future`] @java[`CompletionStage`] will be completed with the value `true` or `false` +when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream. -Completes the stream with true as soon as there's an element satisfy the *predicate*, or emits false when the stream -completes and no element satisfied the *predicate*. +## Example -## Examples +This example tests any element in the stream is `>` 3. Scala -: @@snip [Exists.scala](/docs/src/test/scala/docs/stream/operators/sink/Exists.scala) { #imports #exists } +: @@snip [exists.scala](/docs/src/test/scala/docs/stream/operators/sink/Exists.scala) { #exists } Java -: @@snip [Exists.java](/docs/src/test/java/jdocs/stream/operators/sink/Exists.java) { #imports #exists } \ No newline at end of file +: @@snip [exists.java](/docs/src/test/java/jdocs/stream/operators/sink/Exists.java) { #exists } + +## Reactive Streams Semantics + +@@@div { .callout } + +***Completes*** when upstream completes or the predicate `p` returns `true` + +**cancels** when predicate `p` returns `true` + +**backpressures** when the invocation of predicate `p` has not yet completed + +@@@ diff --git a/docs/src/main/paradox/stream/operators/Sink/forall.md b/docs/src/main/paradox/stream/operators/Sink/forall.md index 387f5b2fb6b..a1a6cc27323 100644 --- a/docs/src/main/paradox/stream/operators/Sink/forall.md +++ b/docs/src/main/paradox/stream/operators/Sink/forall.md @@ -44,4 +44,4 @@ Java **backpressures** when the invocation of predicate `p` has not yet completed -@@@ \ No newline at end of file +@@@ diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 9fabc33f5fd..908320c26d6 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -60,7 +60,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl |Sink|@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].| |Sink|@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy| |Sink|@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. | -|Sink|@ref[exists](Sink/exists.md)|Completes the stream with true as soon as there's an element satisfy the *predicate*, or emits false when the stream| +|Sink|@ref[exists](Sink/exists.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.| |Sink|@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.| |Sink|@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.| |Sink|@ref[forall](Sink/forall.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.| @@ -156,7 +156,6 @@ depending on being backpressured by downstream or not. |Flow|@ref[dimap](Flow/dimap.md)|Transform this Flow by applying a function `f` to each *incoming* upstream element before it is passed to the Flow, and a function `g` to each *outgoing* downstream element.| |Source/Flow|@ref[drop](Source-or-Flow/drop.md)|Drop `n` elements and then pass any subsequent element downstream.| |Source/Flow|@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element| -|Source/Flow|@ref[exists](Source-or-Flow/exists.md)|Emits true and completes the stream as soon as there's an element satisfy the *predicate*, or emits false when upstream| |Source/Flow|@ref[filter](Source-or-Flow/filter.md)|Filter the incoming elements using a predicate.| |Source/Flow|@ref[filterNot](Source-or-Flow/filterNot.md)|Filter the incoming elements using a predicate.| |Flow|@ref[flattenOptional](Flow/flattenOptional.md)|Collect the value of `Optional` from all the elements passing through this flow , empty `Optional` is filtered out.| @@ -447,7 +446,6 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [dropWhile](Source-or-Flow/dropWhile.md) * [dropWithin](Source-or-Flow/dropWithin.md) * [empty](Source/empty.md) -* [exists](Source-or-Flow/exists.md) * [exists](Sink/exists.md) * [expand](Source-or-Flow/expand.md) * [extrapolate](Source-or-Flow/extrapolate.md) diff --git a/docs/src/test/java/jdocs/stream/operators/sink/Exists.java b/docs/src/test/java/jdocs/stream/operators/sink/Exists.java index 158a3f05e2f..95b75ef6770 100644 --- a/docs/src/test/java/jdocs/stream/operators/sink/Exists.java +++ b/docs/src/test/java/jdocs/stream/operators/sink/Exists.java @@ -19,40 +19,26 @@ // #imports -import org.apache.pekko.NotUsed; import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.japi.function.Predicate; -import org.apache.pekko.japi.function.Procedure; -import org.apache.pekko.stream.javadsl.Flow; import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.Source; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; // #imports public class Exists { private static final ActorSystem system = null; - private void detectAnomaly() { + private void detectAnomaly() throws Exception { // #exists - final Source source = - Source.from(Arrays.asList("Sun is shining", "Unidentified Object", "River is flowing")); - - List anomalies = Collections.singletonList("Unidentified Object"); - Predicate isAnomaly = - new Predicate() { - @Override - public boolean test(String phenomenon) { - return anomalies.contains(phenomenon); - } - }; - - CompletionStage result = source.runWith(Sink.exists(isAnomaly), system); - - result.toCompletableFuture().complete(true); + final boolean anyMatch = + Source.range(1, 4) + .runWith(Sink.exists(elem -> elem > 3), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + System.out.println(anyMatch); + // Expected prints: + // true // #exists } } diff --git a/docs/src/test/scala/docs/stream/operators/sink/Exists.scala b/docs/src/test/scala/docs/stream/operators/sink/Exists.scala index 89d4c180e3f..f796085d920 100644 --- a/docs/src/test/scala/docs/stream/operators/sink/Exists.scala +++ b/docs/src/test/scala/docs/stream/operators/sink/Exists.scala @@ -21,7 +21,8 @@ package docs.stream.operators.sink import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.scaladsl._ -import scala.concurrent.ExecutionContext +import scala.concurrent.duration.DurationInt +import scala.concurrent.{ Await, ExecutionContext } //#imports object Exists { @@ -31,14 +32,11 @@ object Exists { def detectAnomaly(): Unit = { // #exists - val source = Source(Seq("Sun is shining", "Unidentified Object", "River is flowing")) - - val anomalies = Seq("Unidentified Object") - def isAnomaly(phenomenon: String): Boolean = anomalies.contains(phenomenon) - - val result = source.runWith(Sink.exists(isAnomaly)) - result.map(println) - // expected print: + val result = Source(1 to 4) + .runWith(Sink.exists(_ > 3)) + val anyMatch = Await.result(result, 3.seconds) + println(anyMatch) + // Expect prints: // true // #exists } diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java index ed4d2838674..652754ce8ae 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java @@ -245,4 +245,13 @@ public void sinkMustBeAbleToUseForall() boolean allMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS); assertTrue(allMatch); } + + @Test + public void sinkMustBeAbleToUseForExists() + throws InterruptedException, ExecutionException, TimeoutException { + CompletionStage cs = + Source.from(Arrays.asList(1, 2, 3, 4)).runWith(Sink.exists(param -> param > 3), system); + boolean anyMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS); + assertTrue(anyMatch); + } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala index 2ab8601998e..8368d7a96c5 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala @@ -381,6 +381,47 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } + "The exists sink" must { + + "completes with `false` when none element match" in { + Source(1 to 4) + .runWith(Sink.exists[Int](_ > 5)) + .futureValue shouldBe false + } + + "completes with `true` when any element match" in { + Source(1 to 4) + .runWith(Sink.exists(_ > 2)) + .futureValue shouldBe true + } + + "completes with `false` if the stream is empty" in { + Source.empty[Int] + .runWith(Sink.exists(_ > 2)) + .futureValue shouldBe false + } + + "completes with `Failure` if the stream failed" in { + Source.failed[Int](new RuntimeException("Oops")) + .runWith(Sink.exists(_ > 2)) + .failed.futureValue shouldBe a[RuntimeException] + } + + "completes with `exists` with restart strategy" in { + val sink = Sink.exists[Int](elem => { + if (elem == 2) { + throw new RuntimeException("Oops") + } + elem > 1 + }).withAttributes(supervisionStrategy(Supervision.restartingDecider)) + + Source(1 to 2) + .runWith(sink) + .futureValue shouldBe false + } + + } + "Sink pre-materialization" must { "materialize the sink and wrap its exposed publisher in a Source" in { val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](false) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index 2f2d83e74f2..0805e5df74c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -99,6 +99,31 @@ object Sink { .mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava)) } + /** + * A `Sink` that will test the given predicate `p` for every received element and + * 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is true for any element; + * 2. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the stream is empty (i.e. completes before signalling any elements); + * 3. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the predicate is false for all elements. + * + * The materialized value [[java.util.concurrent.CompletionStage]] will be completed with the value `true` or `false` + * when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Completes when''' upstream completes or the predicate `p` returns `true` + * + * '''Backpressures when''' the invocation of predicate `p` has not yet completed + * + * '''Cancels when''' predicate `p` returns `true` + * + * @since 1.1.0 + */ + def exists[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = { + import pekko.util.FutureConverters._ + new Sink(scaladsl.Sink.exists[In](p.test) + .mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava)) + } + /** * Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java ``Collector`` * transformation and reduction operations. This allows usage of Java streams transformations for reactive streams. @@ -124,22 +149,6 @@ object Sink { def reduce[In](f: function.Function2[In, In, In]): Sink[In, CompletionStage[In]] = new Sink(scaladsl.Sink.reduce[In](f.apply).toCompletionStage()) - /** - * A `Sink` that will invoke the given predicate for every received element. - * - * The returned [[java.util.concurrent.CompletionStage]] will be completed with true as soon as - * predicate returned true, or be completed with false if there's no element satisfy - * predicate and the stream was completed. - * - * If the stream is empty (i.e. completes before signalling any elements), - * the returned [[java.util.concurrent.CompletionStage]] will be completed immediately with true. - * - * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. - */ - def exists[In](p: function.Predicate[In]): Sink[In, CompletionStage[java.lang.Boolean]] = - new Sink(scaladsl.Sink.exists(p.test).mapMaterializedValue( - _.map(Boolean.box)(ExecutionContexts.parasitic)).toCompletionStage()) - /** * Helper to create [[Sink]] from `Subscriber`. */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index 6d086ad293f..841685e7951 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -464,6 +464,30 @@ object Sink { .toMat(Sink.head)(Keep.right) .named("forallSink") + /** + * A `Sink` that will test the given predicate `p` for every received element and + * 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is true for any element; + * 2. completes and returns [[scala.concurrent.Future]] of `false` if the stream is empty (i.e. completes before signalling any elements); + * 3. completes and returns [[scala.concurrent.Future]] of `false` if the predicate is false for all elements. + * + * The materialized value [[scala.concurrent.Future]] will be completed with the value `true` or `false` + * when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Completes when''' upstream completes or the predicate `p` returns `true` + * + * '''Backpressures when''' the invocation of predicate `p` has not yet completed + * + * '''Cancels when''' predicate `p` returns `true` + * + * @since 1.1.0 + */ + def exists[T](p: T => Boolean): Sink[T, Future[Boolean]] = + Flow[T].foldWhile(false)(!_)(_ || p(_)) + .toMat(Sink.head)(Keep.right) + .named("existsSink") + /** * A `Sink` that will invoke the given function for every received element, giving it its previous * output (from the second element) and the element as input. @@ -481,22 +505,6 @@ object Sink { def reduce[T](f: (T, T) => T): Sink[T, Future[T]] = Flow[T].reduce(f).toMat(Sink.head)(Keep.right).named("reduceSink") - /** - * A `Sink` that will invoke the given predicate for every received element. - * - * The returned [[scala.concurrent.Future]] will be completed with true as soon as - * predicate returned true, or be completed with false if there's no element satisfy - * predicate and the stream was completed. - * - * If the stream is empty (i.e. completes before signalling any elements), - * the returned [[scala.concurrent.Future]] will be completed immediately with true. - * - * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. - * @since 1.1.0 - */ - def exists[T](p: T => Boolean): Sink[T, Future[Boolean]] = - Flow[T].exists(p).toMat(Sink.head)(Keep.right).named("existsSink") - /** * A `Sink` that when the flow is completed, either through a failure or normal * completion, apply the provided function with [[scala.util.Success]]