Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Sink.exists operator #990

Merged
merged 3 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions docs/src/main/paradox/stream/operators/Sink/exists.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Sink.exists

A `Sink` that will test the given predicate `p` for every received element and completes with the result.

@ref[Sink operators](../index.md#sink-operators)

## Signature

@apidoc[Sink.exists](Sink$) { scala="#exists[T](p:T=%3EBoolean):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#exists(org.apache.pekko.japi.function.Predicate)" }

## Description
`exists` applies a predicate function to assert each element received, it returns true if any elements satisfy the assertion, otherwise it returns false.

It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) that completes with the last state when the stream has finished.

Notes that if source is empty, it will return false

A `Sink` that will test the given predicate `p` for every received element and

- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the predicate is true for any element;
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the stream is empty (i.e. completes before signalling any elements);
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the predicate is false for all elements.

The materialized value @scala[`Future`] @java[`CompletionStage`] will be completed with the value `true` or `false`
when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.

## Example

This example tests any element in the stream is `>` 3.

Scala
: @@snip [exists.scala](/docs/src/test/scala/docs/stream/operators/sink/Exists.scala) { #exists }

Java
: @@snip [exists.java](/docs/src/test/java/jdocs/stream/operators/sink/Exists.java) { #exists }

## Reactive Streams Semantics

@@@div { .callout }

***Completes*** when upstream completes or the predicate `p` returns `true`

**cancels** when predicate `p` returns `true`

**backpressures** when the invocation of predicate `p` has not yet completed

@@@
2 changes: 1 addition & 1 deletion docs/src/main/paradox/stream/operators/Sink/forall.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ Java

**backpressures** when the invocation of predicate `p` has not yet completed

@@@
@@@
2 changes: 2 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|Sink|<a name="collection"></a>@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].|
|Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
|Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
|Sink|<a name="exists"></a>@ref[exists](Sink/exists.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|Sink|<a name="foldwhile"></a>@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|Sink|<a name="forall"></a>@ref[forall](Sink/forall.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
Expand Down Expand Up @@ -445,6 +446,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [dropWhile](Source-or-Flow/dropWhile.md)
* [dropWithin](Source-or-Flow/dropWithin.md)
* [empty](Source/empty.md)
* [exists](Sink/exists.md)
* [expand](Source-or-Flow/expand.md)
* [extrapolate](Source-or-Flow/extrapolate.md)
* [failed](Source/failed.md)
Expand Down
44 changes: 44 additions & 0 deletions docs/src/test/java/jdocs/stream/operators/sink/Exists.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package jdocs.stream.operators.sink;

// #imports

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

import java.util.concurrent.TimeUnit;
// #imports

public class Exists {
private static final ActorSystem system = null;

private void existsExample() throws Exception {
// #exists
final boolean anyMatch =
Source.range(1, 4)
.runWith(Sink.exists(elem -> elem > 3), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);
System.out.println(anyMatch);
// Expected prints:
// true
// #exists
}
}
44 changes: 44 additions & 0 deletions docs/src/test/scala/docs/stream/operators/sink/Exists.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package docs.stream.operators.sink

//#imports
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl._

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ Await, ExecutionContext }
//#imports

object Exists {

implicit val system: ActorSystem = null
implicit val ec: ExecutionContext = system.dispatcher

def existsExample(): Unit = {
// #exists
val result = Source(1 to 4)
.runWith(Sink.exists(_ > 3))
val anyMatch = Await.result(result, 3.seconds)
println(anyMatch)
// Expect prints:
// true
// #exists
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,13 @@ public void sinkMustBeAbleToUseForall()
boolean allMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
assertTrue(allMatch);
}

@Test
public void sinkMustBeAbleToUseForExists()
throws InterruptedException, ExecutionException, TimeoutException {
CompletionStage<Boolean> cs =
Source.from(Arrays.asList(1, 2, 3, 4)).runWith(Sink.exists(param -> param > 3), system);
boolean anyMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
assertTrue(anyMatch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,47 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {

}

"The exists sink" must {

"completes with `false` when none element match" in {
Source(1 to 4)
.runWith(Sink.exists[Int](_ > 5))
.futureValue shouldBe false
}

"completes with `true` when any element match" in {
Source(1 to 4)
.runWith(Sink.exists(_ > 2))
.futureValue shouldBe true
}

"completes with `false` if the stream is empty" in {
Source.empty[Int]
.runWith(Sink.exists(_ > 2))
.futureValue shouldBe false
}

"completes with `Failure` if the stream failed" in {
Source.failed[Int](new RuntimeException("Oops"))
.runWith(Sink.exists(_ > 2))
.failed.futureValue shouldBe a[RuntimeException]
}

"completes with `exists` with restart strategy" in {
val sink = Sink.exists[Int](elem => {
if (elem == 2) {
throw new RuntimeException("Oops")
}
elem > 1
}).withAttributes(supervisionStrategy(Supervision.restartingDecider))

Source(1 to 2)
.runWith(sink)
.futureValue shouldBe false
}

}

"Sink pre-materialization" must {
"materialize the sink and wrap its exposed publisher in a Source" in {
val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](false)
Expand Down
25 changes: 25 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,31 @@ object Sink {
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
}

/**
* A `Sink` that will test the given predicate `p` for every received element and
* 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is true for any element;
* 2. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the stream is empty (i.e. completes before signalling any elements);
* 3. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the predicate is false for all elements.
*
* The materialized value [[java.util.concurrent.CompletionStage]] will be completed with the value `true` or `false`
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Completes when''' upstream completes or the predicate `p` returns `true`
*
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
*
* '''Cancels when''' predicate `p` returns `true`
*
* @since 1.1.0
*/
def exists[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
import pekko.util.FutureConverters._
new Sink(scaladsl.Sink.exists[In](p.test)
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
}

/**
* Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java ``Collector``
* transformation and reduction operations. This allows usage of Java streams transformations for reactive streams.
Expand Down
24 changes: 24 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,30 @@ object Sink {
.toMat(Sink.head)(Keep.right)
.named("forallSink")

/**
* A `Sink` that will test the given predicate `p` for every received element and
* 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is true for any element;
* 2. completes and returns [[scala.concurrent.Future]] of `false` if the stream is empty (i.e. completes before signalling any elements);
* 3. completes and returns [[scala.concurrent.Future]] of `false` if the predicate is false for all elements.
*
* The materialized value [[scala.concurrent.Future]] will be completed with the value `true` or `false`
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Completes when''' upstream completes or the predicate `p` returns `true`
*
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
*
* '''Cancels when''' predicate `p` returns `true`
*
* @since 1.1.0
*/
def exists[T](p: T => Boolean): Sink[T, Future[Boolean]] =
Flow[T].foldWhile(false)(!_)(_ || p(_))
.toMat(Sink.head)(Keep.right)
.named("existsSink")

/**
* A `Sink` that will invoke the given function for every received element, giving it its previous
* output (from the second element) and the element as input.
Expand Down
Loading