Skip to content

Commit 898b293

Browse files
committed
feat: Add Sink.none operator
1 parent 2b91612 commit 898b293

File tree

8 files changed

+228
-0
lines changed

8 files changed

+228
-0
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# Sink.none
2+
3+
A `Sink` that will test the given predicate `p` for every received element and completes with the result.
4+
5+
@ref[Sink operators](../index.md#sink-operators)
6+
7+
## Signature
8+
9+
@apidoc[Sink.none](Sink$) { scala="#none[T](p:T=%3EBoolean):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#none(org.apache.pekko.japi.function.Predicate)" }
10+
11+
## Description
12+
none operator applies a predicate function to assert each element received, it returns false if any element satisfy the assertion, otherwise it returns true.
13+
14+
It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) that completes with the last state when the stream has finished.
15+
16+
Notes that if source is empty, it will return true
17+
18+
A `Sink` that will test the given predicate `p` for every received element and
19+
20+
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the predicate is false for all elements;
21+
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the stream is empty (i.e. completes before signalling any elements);
22+
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the predicate is true for any element.
23+
24+
The materialized value @scala[`Future`] @java[`CompletionStage`] will be completed with the value `true` or `false`
25+
when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
26+
27+
## Example
28+
29+
This example tests all elements in the stream is `<=` 100.
30+
31+
Scala
32+
: @@snip [ForAll.scala](/docs/src/test/scala/docs/stream/operators/sink/NoneMatch.scala) { #none }
33+
34+
Java
35+
: @@snip [ForAll.java](/docs/src/test/java/jdocs/stream/operators/sink/NoneMatch.java) { #none }
36+
37+
## Reactive Streams Semantics
38+
39+
@@@div { .callout }
40+
41+
***Completes*** when upstream completes or the predicate `p` returns `true`
42+
43+
**cancels** when predicate `p` returns `true`
44+
45+
**backpressures** when the invocation of predicate `p` has not yet completed
46+
47+
@@@

docs/src/main/paradox/stream/operators/index.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
8282
|Sink|<a name="lazyinitasync"></a>@ref[lazyInitAsync](Sink/lazyInitAsync.md)|Deprecated by @ref[`Sink.lazyFutureSink`](Sink/lazyFutureSink.md).|
8383
|Sink|<a name="lazysink"></a>@ref[lazySink](Sink/lazySink.md)|Defers creation and materialization of a `Sink` until there is a first element.|
8484
|Sink|<a name="never"></a>@ref[never](Sink/never.md)|Always backpressure never cancel and never consume any elements from the stream.|
85+
|Sink|<a name="none"></a>@ref[none](Sink/none.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
8586
|Sink|<a name="oncomplete"></a>@ref[onComplete](Sink/onComplete.md)|Invoke a callback when the stream has completed or failed.|
8687
|Sink|<a name="prematerialize"></a>@ref[preMaterialize](Sink/preMaterialize.md)|Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements 'into' the pre-materialized one.|
8788
|Sink|<a name="queue"></a>@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.|
@@ -555,6 +556,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
555556
* [monitor](Source-or-Flow/monitor.md)
556557
* [never](Source/never.md)
557558
* [never](Sink/never.md)
559+
* [none](Sink/none.md)
558560
* [onComplete](Sink/onComplete.md)
559561
* [onErrorComplete](Source-or-Flow/onErrorComplete.md)
560562
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package jdocs.stream.operators.sink;
19+
20+
import org.apache.pekko.actor.ActorSystem;
21+
import org.apache.pekko.stream.javadsl.Sink;
22+
import org.apache.pekko.stream.javadsl.Source;
23+
24+
import java.util.concurrent.TimeUnit;
25+
26+
public class NoneMatch {
27+
private ActorSystem system = null;
28+
29+
public void noneUsage() throws Exception {
30+
// #none
31+
final boolean noneMatch =
32+
Source.range(1, 100)
33+
.runWith(Sink.none(elem -> elem > 100), system)
34+
.toCompletableFuture()
35+
.get(3, TimeUnit.SECONDS);
36+
System.out.println(noneMatch);
37+
// Expect prints:
38+
// true
39+
// #none
40+
}
41+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package docs.stream.operators.sink
19+
20+
import org.apache.pekko.actor.ActorSystem
21+
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
22+
23+
import scala.concurrent.duration.DurationInt
24+
import scala.concurrent.{ Await, ExecutionContextExecutor, Future }
25+
26+
object NoneMatch {
27+
implicit val system: ActorSystem = ???
28+
implicit val ec: ExecutionContextExecutor = system.dispatcher
29+
def noneExample(): Unit = {
30+
// #none
31+
val result: Future[Boolean] =
32+
Source(1 to 100)
33+
.runWith(Sink.none(_ > 100))
34+
val noneMatch = Await.result(result, 3.seconds)
35+
println(noneMatch)
36+
// Expect prints:
37+
// true
38+
// #none
39+
}
40+
}

stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,15 @@ public void sinkMustBeAbleToUseForall()
246246
assertTrue(allMatch);
247247
}
248248

249+
@Test
250+
public void sinkMustBeAbleToUseNoneMatch()
251+
throws InterruptedException, ExecutionException, TimeoutException {
252+
CompletionStage<Boolean> cs =
253+
Source.from(Arrays.asList(1, 2, 3, 4)).runWith(Sink.none(param -> param < 0), system);
254+
boolean noneMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
255+
assertTrue(noneMatch);
256+
}
257+
249258
@Test
250259
public void sinkMustBeAbleToUseForExists()
251260
throws InterruptedException, ExecutionException, TimeoutException {

stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,46 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
381381

382382
}
383383

384+
"The none sink" must {
385+
386+
"completes with `ture` when all elements not match" in {
387+
Source(1 to 4)
388+
.runWith(Sink.none(_ < 0))
389+
.futureValue shouldBe true
390+
}
391+
392+
"completes with `false` when any element match" in {
393+
Source(1 to 4)
394+
.runWith(Sink.none(_ > 2))
395+
.futureValue shouldBe false
396+
}
397+
398+
"completes with `true` if the stream is empty" in {
399+
Source.empty[Int]
400+
.runWith(Sink.none(_ > 2))
401+
.futureValue shouldBe true
402+
}
403+
404+
"completes with `Failure` if the stream failed" in {
405+
Source.failed[Int](new RuntimeException("Oops"))
406+
.runWith(Sink.none(_ > 2))
407+
.failed.futureValue shouldBe a[RuntimeException]
408+
}
409+
410+
"completes with `false` with restart strategy" in {
411+
val sink = Sink.none[Int](elem => {
412+
if (elem == 2) {
413+
throw new RuntimeException("Oops")
414+
}
415+
elem > 1
416+
}).withAttributes(supervisionStrategy(Supervision.restartingDecider))
417+
418+
Source(1 to 3)
419+
.runWith(sink)
420+
.futureValue shouldBe false
421+
}
422+
}
423+
384424
"The exists sink" must {
385425

386426
"completes with `false` when none element match" in {

stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,31 @@ object Sink {
101101
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
102102
}
103103

104+
/**
105+
* A `Sink` that will test the given predicate `p` for every received element and
106+
* 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is false for all elements;
107+
* 2. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the stream is empty (i.e. completes before signalling any elements);
108+
* 3. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the predicate is true for any element.
109+
*
110+
* The materialized value [[java.util.concurrent.CompletionStage]] will be completed with the value `true` or `false`
111+
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
112+
*
113+
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
114+
*
115+
* '''Completes when''' upstream completes or the predicate `p` returns `true`
116+
*
117+
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
118+
*
119+
* '''Cancels when''' predicate `p` returns `true`
120+
*
121+
* @since 1.1.3
122+
*/
123+
def none[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
124+
import pekko.util.FutureConverters._
125+
new Sink(scaladsl.Sink.none[In](p.test)
126+
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
127+
}
128+
104129
/**
105130
* A `Sink` that will test the given predicate `p` for every received element and
106131
* 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is true for any element;

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,30 @@ object Sink {
470470
.toMat(Sink.head)(Keep.right)
471471
.named("forallSink")
472472

473+
/**
474+
* A `Sink` that will test the given predicate `p` for every received element and
475+
* 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is false for all elements;
476+
* 2. completes and returns [[scala.concurrent.Future]] of `true` if the stream is empty (i.e. completes before signalling any elements);
477+
* 3. completes and returns [[scala.concurrent.Future]] of `false` if the predicate is true for any element.
478+
*
479+
* The materialized value [[scala.concurrent.Future]] will be completed with the value `true` or `false`
480+
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
481+
*
482+
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
483+
*
484+
* '''Completes when''' upstream completes or the predicate `p` returns `true`
485+
*
486+
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
487+
*
488+
* '''Cancels when''' predicate `p` returns `true`
489+
*
490+
* @since 1.1.3
491+
*/
492+
def none[T](p: T => Boolean): Sink[T, Future[Boolean]] =
493+
Flow[T].foldWhile(true)(util.ConstantFun.scalaIdentityFunction)(_ && !p(_))
494+
.toMat(Sink.head)(Keep.right)
495+
.named("noneSink")
496+
473497
/**
474498
* A `Sink` that will test the given predicate `p` for every received element and
475499
* 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is true for any element;

0 commit comments

Comments
 (0)