Skip to content

Commit 86349be

Browse files
committed
Introduced consumeEach for channels and reactive streams, deprecated iteration on reactive streams
1 parent 9ade16a commit 86349be

36 files changed

+377
-386
lines changed

coroutines-guide.md

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,7 +1190,8 @@ This is a part of _producer-consumer_ pattern that is often found in concurrent
11901190
You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary
11911191
to common sense that results must be returned from functions.
11921192

1193-
There is a convenience coroutine builder named [produce] that makes it easy to do it right:
1193+
There is a convenience coroutine builder named [produce] that makes it easy to do it right on producer side,
1194+
and an extension function [consumeEach], that can replace a `for` loop on the consumer side:
11941195

11951196
```kotlin
11961197
fun produceSquares() = produce<Int>(CommonPool) {
@@ -1199,7 +1200,7 @@ fun produceSquares() = produce<Int>(CommonPool) {
11991200

12001201
fun main(args: Array<String>) = runBlocking<Unit> {
12011202
val squares = produceSquares()
1202-
for (y in squares) println(y)
1203+
squares.consumeEach { println(it) }
12031204
println("Done!")
12041205
}
12051206
```
@@ -1363,8 +1364,8 @@ received number:
13631364

13641365
```kotlin
13651366
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
1366-
for (x in channel) {
1367-
println("Processor #$id received $x")
1367+
channel.consumeEach {
1368+
println("Processor #$id received $it")
13681369
}
13691370
}
13701371
```
@@ -1988,10 +1989,10 @@ Consumer is going to be quite slow, taking 250 ms to process each number:
19881989
fun main(args: Array<String>) = runBlocking<Unit> {
19891990
val side = Channel<Int>() // allocate side channel
19901991
launch(context) { // this is a very fast consumer for the side channel
1991-
for (num in side) println("Side channel has $num")
1992+
side.consumeEach { println("Side channel has $it") }
19921993
}
1993-
for (num in produceNumbers(side)) {
1994-
println("Consuming $num")
1994+
produceNumbers(side).consumeEach {
1995+
println("Consuming $it")
19951996
delay(250) // let us digest the consumed number properly, do not hurry
19961997
}
19971998
println("Done consuming")
@@ -2193,6 +2194,7 @@ Channel was closed
21932194
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
21942195
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/close.html
21952196
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
2197+
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
21962198
[Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/invoke.html
21972199
[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
21982200
<!--- INDEX kotlinx.coroutines.experimental.selects -->
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
/**
20+
* Performs the given [action] on each received element.
21+
*/
22+
// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
23+
public suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit) {
24+
for (element in this) action(element)
25+
}

kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@
1717
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
1818
package guide.channel.example03
1919

20-
import kotlinx.coroutines.experimental.*
21-
import kotlinx.coroutines.experimental.channels.*
20+
import kotlinx.coroutines.experimental.CommonPool
21+
import kotlinx.coroutines.experimental.channels.consumeEach
22+
import kotlinx.coroutines.experimental.channels.produce
23+
import kotlinx.coroutines.experimental.runBlocking
2224

2325
fun produceSquares() = produce<Int>(CommonPool) {
2426
for (x in 1..5) send(x * x)
2527
}
2628

2729
fun main(args: Array<String>) = runBlocking<Unit> {
2830
val squares = produceSquares()
29-
for (y in squares) println(y)
31+
squares.consumeEach { println(it) }
3032
println("Done!")
3133
}

kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,13 @@
1717
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
1818
package guide.channel.example06
1919

20-
import kotlinx.coroutines.experimental.*
21-
import kotlinx.coroutines.experimental.channels.*
20+
import kotlinx.coroutines.experimental.CommonPool
21+
import kotlinx.coroutines.experimental.channels.ReceiveChannel
22+
import kotlinx.coroutines.experimental.channels.consumeEach
23+
import kotlinx.coroutines.experimental.channels.produce
24+
import kotlinx.coroutines.experimental.delay
25+
import kotlinx.coroutines.experimental.launch
26+
import kotlinx.coroutines.experimental.runBlocking
2227

2328
fun produceNumbers() = produce<Int>(CommonPool) {
2429
var x = 1 // start from 1
@@ -29,8 +34,8 @@ fun produceNumbers() = produce<Int>(CommonPool) {
2934
}
3035

3136
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
32-
for (x in channel) {
33-
println("Processor #$id received $x")
37+
channel.consumeEach {
38+
println("Processor #$id received $it")
3439
}
3540
}
3641

kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,15 @@
1717
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
1818
package guide.select.example03
1919

20-
import kotlinx.coroutines.experimental.*
21-
import kotlinx.coroutines.experimental.channels.*
22-
import kotlinx.coroutines.experimental.selects.*
20+
import kotlinx.coroutines.experimental.CommonPool
21+
import kotlinx.coroutines.experimental.channels.Channel
22+
import kotlinx.coroutines.experimental.channels.SendChannel
23+
import kotlinx.coroutines.experimental.channels.consumeEach
24+
import kotlinx.coroutines.experimental.channels.produce
25+
import kotlinx.coroutines.experimental.delay
26+
import kotlinx.coroutines.experimental.launch
27+
import kotlinx.coroutines.experimental.runBlocking
28+
import kotlinx.coroutines.experimental.selects.select
2329

2430
fun produceNumbers(side: SendChannel<Int>) = produce<Int>(CommonPool) {
2531
for (num in 1..10) { // produce 10 numbers from 1 to 10
@@ -34,10 +40,10 @@ fun produceNumbers(side: SendChannel<Int>) = produce<Int>(CommonPool) {
3440
fun main(args: Array<String>) = runBlocking<Unit> {
3541
val side = Channel<Int>() // allocate side channel
3642
launch(context) { // this is a very fast consumer for the side channel
37-
for (num in side) println("Side channel has $num")
43+
side.consumeEach { println("Side channel has $it") }
3844
}
39-
for (num in produceNumbers(side)) {
40-
println("Consuming $num")
45+
produceNumbers(side).consumeEach {
46+
println("Consuming $it")
4147
delay(250) // let us digest the consumed number properly, do not hurry
4248
}
4349
println("Done consuming")

0 commit comments

Comments
 (0)