Skip to content

Commit

Permalink
chore: Polish the api and implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 27, 2024
1 parent 97eb4e7 commit d837002
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 66 deletions.
25 changes: 18 additions & 7 deletions docs/src/main/paradox/stream/operators/Sink/forall.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Sink.forall

Apply a predicate function to assert each element received, it returns true if all elements satisfy the assertion, otherwise it returns false.
A `Sink` that will test the given predicate `p` for every received element and completes with the result.

@ref[Sink operators](../index.md#sink-operators)

## Signature

@apidoc[Sink.forall](Sink$) { scala="#forall(predicate:T=>U):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#forall(org.apache.pekko.japi.function.Predicate)" }
@apidoc[Sink.forall](Sink$) { scala="#forall%5BT%5D(p%3A%20T%20%3D%3E%20Boolean):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#forall(org.apache.pekko.japi.function.Predicate)" }

## Description
forall applies a predicate function to assert each element received, it returns true if all elements satisfy the assertion, otherwise it returns false.
Expand All @@ -15,22 +15,33 @@ It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) that

Notes that if source is empty, it will return true

A `Sink` that will test the given predicate `p` for every received element and

- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the predicate is true for all elements;
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the stream is empty (i.e. completes before signalling any elements);
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the predicate is false for any element.

The materialized value @scala[`Future`] @java[`CompletionStage`] will be completed with the value `true` or `false`
when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.

## Example

This example reads a stream of positive integers, asserts that all numbers are greater than 0, and finally prints the assertion result.
This example tests all elements in the stream is `<=` 100.

Scala
: @@snip [snip](/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala) { #forall }
: @@snip [ForAll.scala](/docs/src/test/scala/docs/stream/operators/sink/ForAll.scala) { #forall }

Java
: @@snip [snip](/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java) { #forall }
: @@snip [ForAll.java](/docs/src/test/java/jdocs/stream/operators/sink/ForAll.java) { #forall }

## Reactive Streams Semantics

@@@div { .callout }

**cancels** never
***Completes*** when upstream completes or the predicate `p` returns `false`

**cancels** when predicate `p` returns `false`

**backpressures** when the predicate function invocation has not yet completed
**backpressures** when the invocation of predicate `p` has not yet completed

@@@
2 changes: 1 addition & 1 deletion docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|Sink|<a name="foldwhile"></a>@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|Sink|<a name="forall"></a>@ref[forall](Sink/forall.md)|Apply a predicate function to assert each element received, it returns true if all elements satisfy the assertion, otherwise it returns false.|
|Sink|<a name="forall"></a>@ref[forall](Sink/forall.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
|Sink|<a name="foreach"></a>@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.|
|Sink|<a name="foreachasync"></a>@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.|
|Sink|<a name="foreachparallel"></a>@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.|
Expand Down
41 changes: 41 additions & 0 deletions docs/src/test/java/jdocs/stream/operators/sink/ForAll.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package jdocs.stream.operators.sink;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

import java.util.concurrent.TimeUnit;

public class ForAll {
private ActorSystem system = null;

public void forAllUsage() throws Exception {
// #forall
final boolean allMatch =
Source.range(1, 100)
.runWith(Sink.forall(elem -> elem <= 100), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);
System.out.println(allMatch);
// Expect prints:
// true
// #forall
}
}
40 changes: 40 additions & 0 deletions docs/src/test/scala/docs/stream/operators/sink/ForAll.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package docs.stream.operators.sink

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ Await, ExecutionContextExecutor, Future }

object ForAll {
implicit val system: ActorSystem = ???
implicit val ec: ExecutionContextExecutor = system.dispatcher
def foldExample: Unit = {
// #forall
val result: Future[Boolean] =
Source(1 to 100)
.runWith(Sink.forall(_ <= 100))
val allMatch = Await.result(result, 3.seconds)
println(allMatch)
// Expect prints:
// true
// #forall
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,16 +238,11 @@ public void sinkForeachMustBeDocumented()
}

@Test
public void sinkForallMustBeDocumented()
public void sinkMustBeAbleToUseForall()
throws InterruptedException, ExecutionException, TimeoutException {
// #forall
Sink<Integer, CompletionStage<Boolean>> forallSink = Sink.forall(param -> param > 0);
CompletionStage<Boolean> cs =
Source.from(Arrays.asList(1, 2, 3, 4)).runWith(forallSink, system);
Boolean predicate = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
// will print
// true
// #forall
assertEquals(predicate, true);
Source.from(Arrays.asList(1, 2, 3, 4)).runWith(Sink.forall(param -> param > 0), system);
boolean allMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
assertTrue(allMatch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ package org.apache.pekko.stream.scaladsl
import scala.annotation.nowarn
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

import org.apache.pekko
import pekko.Done
import pekko.stream._
import pekko.stream.testkit._
import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
import pekko.testkit.DefaultTimeout

import org.reactivestreams.Publisher

import org.scalatest.concurrent.ScalaFutures

class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
Expand Down Expand Up @@ -338,37 +341,30 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {

"The forall sink" must {

"always true forall = true" in {
// #forall
val forallSink: Sink[Int, Future[Boolean]] = Sink.forall[Int](_ > 0)
val f = Source(1 to 4).runWith(forallSink)
val result = Await.result(f, 100.millis)
// will print
// true
// #forall
result shouldBe true
"completes with `ture` when all elements match" in {
Source(1 to 4)
.runWith(Sink.forall(_ > 0))
.futureValue shouldBe true
}

"has false forall = false" in {
val forallSink: Sink[Int, Future[Boolean]] = Sink.forall[Int](_ > 2)
val f = Source(1 to 4).runWith(forallSink)
val result = Await.result(f, 100.millis)
result shouldBe false
"completes with `false` when any element match" in {
Source(1 to 4)
.runWith(Sink.forall(_ > 2))
.futureValue shouldBe false
}

"always false forall = false" in {
val forallSink: Sink[Int, Future[Boolean]] = Sink.forall[Int](_ < 0)
val f = Source(1 to 4).runWith(forallSink)
val result = Await.result(f, 100.millis)
result shouldBe false
"completes with `true` if the stream is empty" in {
Source.empty[Int]
.runWith(Sink.forall(_ > 2))
.futureValue shouldBe true
}

"empty forall = true" in {
val forallSink: Sink[Int, Future[Boolean]] = Sink.forall[Int](_ < 0)
val f = Source.empty.runWith(forallSink)
val result = Await.result(f, 100.millis)
result shouldBe true
"completes with `Failure` if the stream failed" in {
Source.failed[Int](new RuntimeException("Oops"))
.runWith(Sink.forall(_ > 2))
.failed.futureValue shouldBe a[RuntimeException]
}

}

"Sink pre-materialization" must {
Expand Down
40 changes: 22 additions & 18 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
package org.apache.pekko.stream.javadsl

import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.{ CompletableFuture, CompletionStage }
import java.util.function.BiFunction
import java.util.stream.Collector

Expand All @@ -27,22 +26,17 @@ import scala.util.Try

import org.apache.pekko
import pekko._
import pekko.actor.ActorRef
import pekko.actor.ClassicActorSystemProvider
import pekko.actor.Status
import pekko.actor.{ ActorRef, ClassicActorSystemProvider, Status }
import pekko.dispatch.ExecutionContexts
import pekko.japi.{ function, Util }
import pekko.japi.function.Creator
import pekko.stream._
import pekko.stream.impl.LinearTraversalBuilder
import pekko.stream.javadsl
import pekko.stream.scaladsl
import pekko.stream.scaladsl.SinkToCompletionStage
import pekko.util.FutureConverters._
import pekko.util.OptionConverters._

import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.{ Publisher, Subscriber }

/** Java API */
object Sink {
Expand Down Expand Up @@ -81,17 +75,27 @@ object Sink {
new Sink(scaladsl.Sink.foldAsync[U, In](zero)(f(_, _).asScala).toCompletionStage())

/**
* A `Sink` that will invoke the given predicate for every received element, giving it its previous
* output (or the given `false`) and the element as input.
* A `Sink` that will test the given predicate `p` for every received element and
* 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is true for all elements;
* 2. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the stream is empty (i.e. completes before signalling any elements);
* 3. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the predicate is false for any element.
*
* The returned [[java.util.concurrent.CompletionStage]] will be completed with the predicate is false, or the
* the final predicate evaluation when the input stream ends, or completed with `Failure` if
* there is a failure signaled in the stream.
* The materialized value [[java.util.concurrent.CompletionStage]] will be completed with the value `true` or `false`
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Completes when''' upstream completes or the predicate `p` returns `false`
*
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
*
* '''Cancels when''' predicate `p` returns `false`
*/
def forall[T](predicate: function.Predicate[T]): javadsl.Sink[T, CompletionStage[java.lang.Boolean]] =
scaladsl.Sink.forall(predicate.test)
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic))
.toCompletionStage().asJava
def forall[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
import pekko.util.FutureConverters._
new Sink(scaladsl.Sink.forall[In](p.test)
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
}

/**
* Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java ``Collector``
Expand Down
23 changes: 16 additions & 7 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -441,15 +441,24 @@ object Sink {
Flow[T].foldAsync(zero)(f).toMat(Sink.head)(Keep.right).named("foldAsyncSink")

/**
* A `Sink` that will invoke the given predicate for every received element, giving it its previous
* output (or the given `false`) and the element as input.
* A `Sink` that will test the given predicate `p` for every received element and
* 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is true for all elements;
* 2. completes and returns [[scala.concurrent.Future]] of `true` if the stream is empty (i.e. completes before signalling any elements);
* 3. completes and returns [[scala.concurrent.Future]] of `false` if the predicate is false for any element.
*
* The returned [[scala.concurrent.Future]] will be completed with the predicate is false, or the
* the final predicate evaluation when the input stream ends, or completed with `Failure` if
* there is a failure signaled in the stream.
* The materialized value [[scala.concurrent.Future]] will be completed with the value `true` or `false`
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Completes when''' upstream completes or the predicate `p` returns `false`
*
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
*
* '''Cancels when''' predicate `p` returns `false`
*/
def forall[T](predicate: T => Boolean): Sink[T, Future[Boolean]] =
Flow[T].foldWhile(true)(util.ConstantFun.scalaIdentityFunction)(_ && predicate(_))
def forall[T](p: T => Boolean): Sink[T, Future[Boolean]] =
Flow[T].foldWhile(true)(util.ConstantFun.scalaIdentityFunction)(_ && p(_))
.toMat(Sink.head)(Keep.right)
.named("forallSink")

Expand Down

0 comments on commit d837002

Please sign in to comment.