Skip to content

Commit

Permalink
chore: Update some doc and code.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 29, 2024
1 parent 380f5eb commit f4c5030
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 80 deletions.
45 changes: 34 additions & 11 deletions docs/src/main/paradox/stream/operators/Sink/exists.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,47 @@
# exists
# Sink.exists

A `Sink` that will test the given predicate `p` for every received element and completes with the result.

Completes the stream with true as soon as there's an element satisfy the *predicate*, or emits false when the stream
completes and no element satisfied the *predicate*.
@ref[Sink operators](../index.md#sink-operators)

## Signature

@apidoc[Flow.exists](Flow$) { scala="#exists[I](p:Out%20=%3E%20Boolean)" java="#exists[I](p:org.apache.pekko.japi.function.Predicate[I])" }

@apidoc[Sink.exists](Sink$) { scala="#exists[I](p:Out%20=%3E%20Boolean)" java="#exists[I](p:org.apache.pekko.japi.function.Predicate[I])" }
@apidoc[Sink.exists](Sink$) { scala="#exists[T](p:T=%3EBoolean):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#exists(org.apache.pekko.japi.function.Predicate)" }

## Description
`exists` applies a predicate function to assert each element received, it returns true if any elements satisfy the assertion, otherwise it returns false.

It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) that completes with the last state when the stream has finished.

Notes that if source is empty, it will return false

A `Sink` that will test the given predicate `p` for every received element and

- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the predicate is true for any element;
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the stream is empty (i.e. completes before signalling any elements);
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the predicate is false for all elements.

The materialized value @scala[`Future`] @java[`CompletionStage`] will be completed with the value `true` or `false`
when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.

Completes the stream with true as soon as there's an element satisfy the *predicate*, or emits false when the stream
completes and no element satisfied the *predicate*.
## Example

## Examples
This example tests any element in the stream is `>` 3.

Scala
: @@snip [Exists.scala](/docs/src/test/scala/docs/stream/operators/sink/Exists.scala) { #imports #exists }
: @@snip [exists.scala](/docs/src/test/scala/docs/stream/operators/sink/Exists.scala) { #exists }

Java
: @@snip [Exists.java](/docs/src/test/java/jdocs/stream/operators/sink/Exists.java) { #imports #exists }
: @@snip [exists.java](/docs/src/test/java/jdocs/stream/operators/sink/Exists.java) { #exists }

## Reactive Streams Semantics

@@@div { .callout }

***Completes*** when upstream completes or the predicate `p` returns `true`

**cancels** when predicate `p` returns `true`

**backpressures** when the invocation of predicate `p` has not yet completed

@@@
2 changes: 1 addition & 1 deletion docs/src/main/paradox/stream/operators/Sink/forall.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ Java

**backpressures** when the invocation of predicate `p` has not yet completed

@@@
@@@
4 changes: 1 addition & 3 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|Sink|<a name="collection"></a>@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].|
|Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
|Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
|Sink|<a name="exists"></a>@ref[exists](Sink/exists.md)|Completes the stream with true as soon as there's an element satisfy the *predicate*, or emits false when the stream|
|Sink|<a name="exists"></a>@ref[exists](Sink/exists.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|Sink|<a name="foldwhile"></a>@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|Sink|<a name="forall"></a>@ref[forall](Sink/forall.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
Expand Down Expand Up @@ -156,7 +156,6 @@ depending on being backpressured by downstream or not.
|Flow|<a name="dimap"></a>@ref[dimap](Flow/dimap.md)|Transform this Flow by applying a function `f` to each *incoming* upstream element before it is passed to the Flow, and a function `g` to each *outgoing* downstream element.|
|Source/Flow|<a name="drop"></a>@ref[drop](Source-or-Flow/drop.md)|Drop `n` elements and then pass any subsequent element downstream.|
|Source/Flow|<a name="dropwhile"></a>@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element|
|Source/Flow|<a name="exists"></a>@ref[exists](Source-or-Flow/exists.md)|Emits true and completes the stream as soon as there's an element satisfy the *predicate*, or emits false when upstream|
|Source/Flow|<a name="filter"></a>@ref[filter](Source-or-Flow/filter.md)|Filter the incoming elements using a predicate.|
|Source/Flow|<a name="filternot"></a>@ref[filterNot](Source-or-Flow/filterNot.md)|Filter the incoming elements using a predicate.|
|Flow|<a name="flattenoptional"></a>@ref[flattenOptional](Flow/flattenOptional.md)|Collect the value of `Optional` from all the elements passing through this flow , empty `Optional` is filtered out.|
Expand Down Expand Up @@ -447,7 +446,6 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [dropWhile](Source-or-Flow/dropWhile.md)
* [dropWithin](Source-or-Flow/dropWithin.md)
* [empty](Source/empty.md)
* [exists](Source-or-Flow/exists.md)
* [exists](Sink/exists.md)
* [expand](Source-or-Flow/expand.md)
* [extrapolate](Source-or-Flow/extrapolate.md)
Expand Down
34 changes: 10 additions & 24 deletions docs/src/test/java/jdocs/stream/operators/sink/Exists.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,26 @@

// #imports

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.function.Predicate;
import org.apache.pekko.japi.function.Procedure;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
// #imports

public class Exists {
private static final ActorSystem system = null;

private void detectAnomaly() {
private void detectAnomaly() throws Exception {
// #exists
final Source<String, NotUsed> source =
Source.from(Arrays.asList("Sun is shining", "Unidentified Object", "River is flowing"));

List<String> anomalies = Collections.singletonList("Unidentified Object");
Predicate<String> isAnomaly =
new Predicate<String>() {
@Override
public boolean test(String phenomenon) {
return anomalies.contains(phenomenon);
}
};

CompletionStage<Boolean> result = source.runWith(Sink.exists(isAnomaly), system);

result.toCompletableFuture().complete(true);
final boolean anyMatch =
Source.range(1, 4)
.runWith(Sink.exists(elem -> elem > 3), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);
System.out.println(anyMatch);
// Expected prints:
// true
// #exists
}
}
16 changes: 7 additions & 9 deletions docs/src/test/scala/docs/stream/operators/sink/Exists.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ package docs.stream.operators.sink
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ Await, ExecutionContext }
//#imports

object Exists {
Expand All @@ -31,14 +32,11 @@ object Exists {

def detectAnomaly(): Unit = {
// #exists
val source = Source(Seq("Sun is shining", "Unidentified Object", "River is flowing"))

val anomalies = Seq("Unidentified Object")
def isAnomaly(phenomenon: String): Boolean = anomalies.contains(phenomenon)

val result = source.runWith(Sink.exists(isAnomaly))
result.map(println)
// expected print:
val result = Source(1 to 4)
.runWith(Sink.exists(_ > 3))
val anyMatch = Await.result(result, 3.seconds)
println(anyMatch)
// Expect prints:
// true
// #exists
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,13 @@ public void sinkMustBeAbleToUseForall()
boolean allMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
assertTrue(allMatch);
}

@Test
public void sinkMustBeAbleToUseForExists()
throws InterruptedException, ExecutionException, TimeoutException {
CompletionStage<Boolean> cs =
Source.from(Arrays.asList(1, 2, 3, 4)).runWith(Sink.exists(param -> param > 3), system);
boolean anyMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
assertTrue(anyMatch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,47 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {

}

"The exists sink" must {

"completes with `false` when none element match" in {
Source(1 to 4)
.runWith(Sink.exists[Int](_ > 5))
.futureValue shouldBe false
}

"completes with `true` when any element match" in {
Source(1 to 4)
.runWith(Sink.exists(_ > 2))
.futureValue shouldBe true
}

"completes with `false` if the stream is empty" in {
Source.empty[Int]
.runWith(Sink.exists(_ > 2))
.futureValue shouldBe false
}

"completes with `Failure` if the stream failed" in {
Source.failed[Int](new RuntimeException("Oops"))
.runWith(Sink.exists(_ > 2))
.failed.futureValue shouldBe a[RuntimeException]
}

"completes with `exists` with restart strategy" in {
val sink = Sink.exists[Int](elem => {
if (elem == 2) {
throw new RuntimeException("Oops")
}
elem > 1
}).withAttributes(supervisionStrategy(Supervision.restartingDecider))

Source(1 to 2)
.runWith(sink)
.futureValue shouldBe false
}

}

"Sink pre-materialization" must {
"materialize the sink and wrap its exposed publisher in a Source" in {
val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](false)
Expand Down
41 changes: 25 additions & 16 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,31 @@ object Sink {
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
}

/**
* A `Sink` that will test the given predicate `p` for every received element and
* 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is true for any element;
* 2. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the stream is empty (i.e. completes before signalling any elements);
* 3. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the predicate is false for all elements.
*
* The materialized value [[java.util.concurrent.CompletionStage]] will be completed with the value `true` or `false`
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Completes when''' upstream completes or the predicate `p` returns `true`
*
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
*
* '''Cancels when''' predicate `p` returns `true`
*
* @since 1.1.0
*/
def exists[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
import pekko.util.FutureConverters._
new Sink(scaladsl.Sink.exists[In](p.test)
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
}

/**
* Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java ``Collector``
* transformation and reduction operations. This allows usage of Java streams transformations for reactive streams.
Expand All @@ -124,22 +149,6 @@ object Sink {
def reduce[In](f: function.Function2[In, In, In]): Sink[In, CompletionStage[In]] =
new Sink(scaladsl.Sink.reduce[In](f.apply).toCompletionStage())

/**
* A `Sink` that will invoke the given predicate for every received element.
*
* The returned [[java.util.concurrent.CompletionStage]] will be completed with true as soon as
* predicate returned true, or be completed with false if there's no element satisfy
* predicate and the stream was completed.
*
* If the stream is empty (i.e. completes before signalling any elements),
* the returned [[java.util.concurrent.CompletionStage]] will be completed immediately with true.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*/
def exists[In](p: function.Predicate[In]): Sink[In, CompletionStage[java.lang.Boolean]] =
new Sink(scaladsl.Sink.exists(p.test).mapMaterializedValue(
_.map(Boolean.box)(ExecutionContexts.parasitic)).toCompletionStage())

/**
* Helper to create [[Sink]] from `Subscriber`.
*/
Expand Down
40 changes: 24 additions & 16 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,30 @@ object Sink {
.toMat(Sink.head)(Keep.right)
.named("forallSink")

/**
* A `Sink` that will test the given predicate `p` for every received element and
* 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is true for any element;
* 2. completes and returns [[scala.concurrent.Future]] of `false` if the stream is empty (i.e. completes before signalling any elements);
* 3. completes and returns [[scala.concurrent.Future]] of `false` if the predicate is false for all elements.
*
* The materialized value [[scala.concurrent.Future]] will be completed with the value `true` or `false`
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Completes when''' upstream completes or the predicate `p` returns `true`
*
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
*
* '''Cancels when''' predicate `p` returns `true`
*
* @since 1.1.0
*/
def exists[T](p: T => Boolean): Sink[T, Future[Boolean]] =
Flow[T].foldWhile(false)(!_)(_ || p(_))
.toMat(Sink.head)(Keep.right)
.named("existsSink")

/**
* A `Sink` that will invoke the given function for every received element, giving it its previous
* output (from the second element) and the element as input.
Expand All @@ -481,22 +505,6 @@ object Sink {
def reduce[T](f: (T, T) => T): Sink[T, Future[T]] =
Flow[T].reduce(f).toMat(Sink.head)(Keep.right).named("reduceSink")

/**
* A `Sink` that will invoke the given predicate for every received element.
*
* The returned [[scala.concurrent.Future]] will be completed with true as soon as
* predicate returned true, or be completed with false if there's no element satisfy
* predicate and the stream was completed.
*
* If the stream is empty (i.e. completes before signalling any elements),
* the returned [[scala.concurrent.Future]] will be completed immediately with true.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
* @since 1.1.0
*/
def exists[T](p: T => Boolean): Sink[T, Future[Boolean]] =
Flow[T].exists(p).toMat(Sink.head)(Keep.right).named("existsSink")

/**
* A `Sink` that when the flow is completed, either through a failure or normal
* completion, apply the provided function with [[scala.util.Success]]
Expand Down

0 comments on commit f4c5030

Please sign in to comment.