Skip to content

Commit 97eb4e7

Browse files
committed
chore: Make use of foldWhile and format.
1 parent 8a47af1 commit 97eb4e7

File tree

5 files changed

+18
-23
lines changed

5 files changed

+18
-23
lines changed

Diff for: docs/src/main/paradox/stream/operators/index.md

+7-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
6060
|Sink|<a name="collection"></a>@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].|
6161
|Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
6262
|Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
63-
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted element with a function, where each invocation will get the new element and the result from the previous fold invocation.|
63+
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
64+
|Sink|<a name="foldwhile"></a>@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
6465
|Sink|<a name="forall"></a>@ref[forall](Sink/forall.md)|Apply a predicate function to assert each element received, it returns true if all elements satisfy the assertion, otherwise it returns false.|
6566
|Sink|<a name="foreach"></a>@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.|
6667
|Sink|<a name="foreachasync"></a>@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.|
@@ -145,6 +146,7 @@ depending on being backpressured by downstream or not.
145146
|--|--|--|
146147
|Flow|<a name="asflowwithcontext"></a>@ref[asFlowWithContext](Flow/asFlowWithContext.md)|Extracts context data from the elements of a `Flow` so that it can be turned into a `FlowWithContext` which can propagate that context per element along a stream.|
147148
|Source/Flow|<a name="collect"></a>@ref[collect](Source-or-Flow/collect.md)|Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.|
149+
|Source/Flow|<a name="collectfirst"></a>@ref[collectFirst](Source-or-Flow/collectFirst.md)|Transform this stream by applying the given partial function to the first element on which the function is defined as it pass through this processing step, and cancel the upstream publisher after the first element is emitted.|
148150
|Source/Flow|<a name="collecttype"></a>@ref[collectType](Source-or-Flow/collectType.md)|Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.|
149151
|Source/Flow|<a name="collectwhile"></a>@ref[collectWhile](Source-or-Flow/collectWhile.md)|Transform this stream by applying the given partial function to each of the elements on which the function is defined as they pass through this processing step, and cancel the upstream publisher after the partial function is not applied.|
150152
|Flow|<a name="completionstageflow"></a>@ref[completionStageFlow](Flow/completionStageFlow.md)|Streams the elements through the given future flow once it successfully completes.|
@@ -158,6 +160,7 @@ depending on being backpressured by downstream or not.
158160
|Flow|<a name="flattenoptional"></a>@ref[flattenOptional](Flow/flattenOptional.md)|Collect the value of `Optional` from all the elements passing through this flow , empty `Optional` is filtered out.|
159161
|Source/Flow|<a name="fold"></a>@ref[fold](Source-or-Flow/fold.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream.|
160162
|Source/Flow|<a name="foldasync"></a>@ref[foldAsync](Source-or-Flow/foldAsync.md)|Just like `fold` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.|
163+
|Source/Flow|<a name="foldwhile"></a>@ref[foldWhile](Source-or-Flow/foldWhile.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes or the predicate `p` returns `false`, the current value is emitted downstream.|
161164
|Source/Flow|<a name="frommaterializer"></a>@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`|
162165
|Flow|<a name="futureflow"></a>@ref[futureFlow](Flow/futureFlow.md)|Streams the elements through the given future flow once it successfully completes.|
163166
|Source/Flow|<a name="grouped"></a>@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.|
@@ -413,6 +416,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
413416
* [cancelled](Sink/cancelled.md)
414417
* [collect](Source-or-Flow/collect.md)
415418
* [collect](Sink/collect.md)
419+
* [collectFirst](Source-or-Flow/collectFirst.md)
416420
* [collection](Sink/collection.md)
417421
* [collectType](Source-or-Flow/collectType.md)
418422
* [collectWhile](Source-or-Flow/collectWhile.md)
@@ -452,6 +456,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
452456
* [fold](Source-or-Flow/fold.md)
453457
* [fold](Sink/fold.md)
454458
* [foldAsync](Source-or-Flow/foldAsync.md)
459+
* [foldWhile](Source-or-Flow/foldWhile.md)
460+
* [foldWhile](Sink/foldWhile.md)
455461
* [forall](Sink/forall.md)
456462
* [foreach](Sink/foreach.md)
457463
* [foreachAsync](Sink/foreachAsync.md)

Diff for: stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
import org.apache.pekko.NotUsed;
1818
import org.apache.pekko.japi.Pair;
1919
import org.apache.pekko.japi.function.Function;
20-
import org.apache.pekko.japi.function.Predicate;
21-
import org.apache.pekko.stream.*;
2220
import org.apache.pekko.stream.Attributes;
2321
import org.apache.pekko.stream.Graph;
2422
import org.apache.pekko.stream.StreamTest;

Diff for: stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package org.apache.pekko.stream.impl.fusing
1515

1616
import java.util.concurrent.TimeUnit.NANOSECONDS
17+
1718
import scala.annotation.nowarn
1819
import scala.annotation.tailrec
1920
import scala.collection.immutable

Diff for: stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,10 @@ object Sink {
8888
* the final predicate evaluation when the input stream ends, or completed with `Failure` if
8989
* there is a failure signaled in the stream.
9090
*/
91-
def forall[T](predicate: function.Predicate[T]): javadsl.Sink[T, CompletionStage[java.lang.Boolean]] = {
91+
def forall[T](predicate: function.Predicate[T]): javadsl.Sink[T, CompletionStage[java.lang.Boolean]] =
9292
scaladsl.Sink.forall(predicate.test)
9393
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic))
9494
.toCompletionStage().asJava
95-
}
9695

9796
/**
9897
* Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java ``Collector``

Diff for: stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala

+9-18
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,22 @@ package org.apache.pekko.stream.scaladsl
1616
import scala.annotation.{ nowarn, tailrec }
1717
import scala.annotation.unchecked.uncheckedVariance
1818
import scala.collection.immutable
19-
import scala.concurrent.ExecutionContext
20-
import scala.concurrent.Future
21-
import scala.Option
22-
import scala.util.Failure
23-
import scala.util.Success
24-
import scala.util.Try
19+
import scala.concurrent.{ ExecutionContext, Future }
20+
import scala.util.{ Failure, Success, Try }
2521

2622
import org.apache.pekko
27-
import pekko.Done
28-
import pekko.NotUsed
29-
import pekko.actor.ActorRef
30-
import pekko.actor.Status
23+
import pekko.{ util, Done, NotUsed }
24+
import pekko.actor.{ ActorRef, Status }
3125
import pekko.annotation.InternalApi
3226
import pekko.dispatch.ExecutionContexts
3327
import pekko.stream._
3428
import pekko.stream.impl._
3529
import pekko.stream.impl.Stages.DefaultAttributes
3630
import pekko.stream.impl.fusing.GraphStages
37-
import pekko.stream.javadsl
3831
import pekko.stream.stage._
3932
import pekko.util.ccompat._
4033

41-
import org.reactivestreams.Publisher
42-
import org.reactivestreams.Subscriber
34+
import org.reactivestreams.{ Publisher, Subscriber }
4335

4436
/**
4537
* A `Sink` is a set of stream processing steps that has one open input.
@@ -456,11 +448,10 @@ object Sink {
456448
* the final predicate evaluation when the input stream ends, or completed with `Failure` if
457449
* there is a failure signaled in the stream.
458450
*/
459-
def forall[T](predicate: T => Boolean): Sink[T, Future[Boolean]] = {
460-
Flow[T].statefulMap(() => true)((state, ele) => (predicate(ele), state), bool => Some(bool))
461-
.takeWhile(s => s, inclusive = true)
462-
.toMat(Sink.last)(Keep.right)
463-
}
451+
def forall[T](predicate: T => Boolean): Sink[T, Future[Boolean]] =
452+
Flow[T].foldWhile(true)(util.ConstantFun.scalaIdentityFunction)(_ && predicate(_))
453+
.toMat(Sink.head)(Keep.right)
454+
.named("forallSink")
464455

465456
/**
466457
* A `Sink` that will invoke the given function for every received element, giving it its previous

0 commit comments

Comments
 (0)