diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md
index d797f53942d..ae0d0c40284 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -61,7 +61,6 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|Sink|@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
|Sink|@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
|Sink|@ref[fold](Sink/fold.md)|Fold over emitted element with a function, where each invocation will get the new element and the result from the previous fold invocation.|
-|Sink|@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.|
|Sink|@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.|
|Sink|@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.|
|Sink|@ref[fromMaterializer](Sink/fromMaterializer.md)|Defer the creation of a `Sink` until materialization and access `Materializer` and `Attributes`|
@@ -446,7 +445,6 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [fold](Source-or-Flow/fold.md)
* [fold](Sink/fold.md)
* [foldAsync](Source-or-Flow/foldAsync.md)
-* [foreach](Sink/foreach.md)
* [foreachAsync](Sink/foreachAsync.md)
* [foreachParallel](Sink/foreachParallel.md)
* [from](Source/from.md)
diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala
index 1c4974ef318..31c9c309ce5 100644
--- a/project/StreamOperatorsIndexGenerator.scala
+++ b/project/StreamOperatorsIndexGenerator.scala
@@ -67,6 +67,10 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"actorPublisher",
"addAttributes",
"mapMaterializedValue",
+ // for comprehensions
+ "withFilter",
+ "flatMap",
+ "foreach",
// *Graph:
"concatGraph",
"prependGraph",
@@ -108,7 +112,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"foldAsync",
"newOnCompleteStage"))
- val ignore =
+ val ignore = {
Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++
Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++
Set(
@@ -123,6 +127,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"transformMaterializing") ++
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") ++
Set("++", "onPush", "onPull", "actorRefWithAck")
+ }
def isPending(element: String, opName: String) =
pendingTestCases.get(element).exists(_.contains(opName))
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala
index 557609e2886..d3188f9053b 100755
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala
@@ -91,12 +91,16 @@ class DslConsistencySpec extends AnyWordSpec with Matchers {
"orElseGraph",
"divertToGraph")
+ val forComprehensions = Set("withFilter", "flatMap", "foreach")
+
val allowMissing: Map[Class[_], Set[String]] = Map(
- jFlowClass -> graphHelpers,
- jSourceClass -> (graphHelpers ++ Set("watch", "ask")),
+ jFlowClass -> (graphHelpers ++ forComprehensions),
+ jSourceClass -> (graphHelpers ++ forComprehensions ++ Set("watch", "ask")),
// Java subflows can only be nested using .via and .to (due to type system restrictions)
- jSubFlowClass -> (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch", "ask")),
- jSubSourceClass -> (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch", "ask")),
+ jSubFlowClass -> (graphHelpers ++ forComprehensions ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch",
+ "ask")),
+ jSubSourceClass -> (graphHelpers ++ forComprehensions ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow",
+ "watch", "ask")),
sFlowClass -> Set("of"),
sSourceClass -> Set("adapt", "from", "watch"),
sSinkClass -> Set("adapt"),
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ForComprehensionsCompileSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ForComprehensionsCompileSpec.scala
new file mode 100644
index 00000000000..4106282c6b4
--- /dev/null
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ForComprehensionsCompileSpec.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.japi.Util
+import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.scaladsl.TestSink
+
+import java.util.concurrent.CopyOnWriteArrayList
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
+
+class ForComprehensionsCompileSpec extends StreamSpec {
+ "A Source" must {
+ "be able to be used in a for comprehension which yield" in {
+ val source = Source(1 to 5)
+ val evenSource = for {
+ i <- source if i % 2 == 0
+ } yield i.toString
+ evenSource.runWith(TestSink[String]())
+ .request(5)
+ .expectNextN(List("2", "4"))
+ .expectComplete()
+ }
+
+ "be able to be used in a for comprehension which flatMap" in {
+ val source = Source(1 to 5)
+ val evenSource = for {
+ i <- source if i % 2 == 0
+ j <- Source.lazySingle(() => i)
+ str = j.toString
+ } yield str
+ evenSource.runWith(TestSink[String]())
+ .request(5)
+ .expectNextN(List("2", "4"))
+ .expectComplete()
+ }
+
+ "be able to be used in a for comprehension which yield a runnable graph" in {
+ val source = Source(1 to 5)
+ val list = new CopyOnWriteArrayList[String]()
+ val future = (for (i <- source if i % 2 == 0) {
+ list.add(i.toString)
+ }).run()
+
+ Await.result(future, 3.seconds) shouldBe Done
+ Util.immutableSeq(list) shouldBe List("2", "4")
+ }
+
+ "be able to be used in a for comprehension which with Flow" in {
+ (for {
+ i <- Source(1 to 20) if i % 2 == 0
+ j <- Source.lazySingle(() => i)
+ str = j.toString
+ } yield str)
+ .via(for {
+ str <- Flow[String] if str.length > 1
+ doubleStr = str + str
+ number <- Source.lazySingle(() => doubleStr)
+ } yield number.toInt)
+ .runWith(TestSink[Int]())
+ .request(6)
+ .expectNextN(List(1010, 1212, 1414, 1616, 1818, 2020))
+ .expectComplete()
+ }
+ }
+
+ "A Flow" must {
+ "be able to be used in a for comprehension which yield" in {
+ Source(1 to 5).via(for (i <- Flow[Int] if i % 2 == 0) yield i.toString)
+ .runWith(TestSink[String]())
+ .request(5)
+ .expectNextN(List("2", "4"))
+ .expectComplete()
+ }
+
+ "be able to be used in a for comprehension which flatmap" in {
+ Source(1 to 5).via(for {
+ i <- Flow[Int] if i % 2 == 0
+ j <- Source.single(i)
+ str = j.toString
+ } yield str)
+ .runWith(TestSink[String]())
+ .request(5)
+ .expectNextN(List("2", "4"))
+ .expectComplete()
+ }
+
+ "be able to be used in a for comprehension which yield a sink" in {
+ val source = Source(1 to 5)
+ val list = new CopyOnWriteArrayList[String]()
+ val sink = for (i <- Flow[Int] if i % 2 == 0) {
+ list.add(i.toString)
+ }
+ val future = source.runWith(sink)
+ Await.result(future, 3.seconds) shouldBe Done
+ Util.immutableSeq(list) shouldBe List("2", "4")
+ }
+ }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 9452ffc972e..6bb973aff54 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1383,6 +1383,16 @@ trait FlowOps[+Out, +Mat] {
*/
def filter(p: Out => Boolean): Repr[Out] = via(Filter(p))
+ /**
+ * Alias for [[filter]], added to enable filtering in for comprehensions.
+ *
+ * NOTE: Support for `for` comprehensions is still experimental and it's possible that we might need to change
+ * the internal implementation.
+ * @since 1.1.0
+ */
+ @ApiMayChange
+ def withFilter(p: Out => Boolean): Repr[Out] = filter(p)
+
/**
* Only pass on those elements that NOT satisfy the given predicate.
*
@@ -2521,6 +2531,16 @@ trait FlowOps[+Out, +Mat] {
*/
def flatMapConcat[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T] = map(f).via(new FlattenMerge[T, M](1))
+ /**
+ * Alias for [[flatMapConcat]], added to enable for comprehensions.
+ *
+ * NOTE: Support for `for` comprehensions is still experimental and it's possible that we might need to change
+ * the internal implementation.
+ * @since 1.1.0
+ */
+ @ApiMayChange
+ def flatMap[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T] = flatMapConcat(f)
+
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by merging, where at most `breadth`
@@ -3732,6 +3752,17 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
*/
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) => Mat3): ClosedMat[Mat3]
+ /**
+ * Connect this [[Flow]] to a `foreach` [[Sink]], that will invoke the given procedure for each received element.
+ * Added to enable for comprehensions.
+ *
+ * NOTE: Support for `for` comprehensions is still experimental and it's possible that we might need to change
+ * the internal implementation.
+ * @since 1.1.0
+ */
+ @ApiMayChange
+ def foreach(f: Out => Unit): ClosedMat[Future[Done]] = toMat(Sink.foreach(f))(Keep.right)
+
/**
* mat version of [[#flatMapPrefix]], this method gives access to a future materialized value of the downstream flow.
* see [[#flatMapPrefix]] for details.