Skip to content

Commit 6e84d1a

Browse files
committed
Merge branch 'develop'
2 parents b019b10 + 517a259 commit 6e84d1a

File tree

19 files changed

+415
-59
lines changed

19 files changed

+415
-59
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,7 @@ public abstract interface class kotlinx/coroutines/experimental/channels/ActorSc
542542

543543
public final class kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel : kotlinx/coroutines/experimental/channels/AbstractSendChannel, kotlinx/coroutines/experimental/channels/BroadcastChannel {
544544
public fun <init> (I)V
545+
public fun cancel (Ljava/lang/Throwable;)Z
545546
public fun close (Ljava/lang/Throwable;)Z
546547
public final fun getCapacity ()I
547548
public fun open ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
@@ -566,12 +567,14 @@ public class kotlinx/coroutines/experimental/channels/ArrayChannel : kotlinx/cor
566567

567568
public abstract interface class kotlinx/coroutines/experimental/channels/BroadcastChannel : kotlinx/coroutines/experimental/channels/SendChannel {
568569
public static final field Factory Lkotlinx/coroutines/experimental/channels/BroadcastChannel$Factory;
570+
public abstract fun cancel (Ljava/lang/Throwable;)Z
569571
public abstract fun open ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
570572
public abstract fun openSubscription ()Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
571573
public abstract synthetic fun openSubscription ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
572574
}
573575

574576
public final class kotlinx/coroutines/experimental/channels/BroadcastChannel$DefaultImpls {
577+
public static synthetic fun cancel$default (Lkotlinx/coroutines/experimental/channels/BroadcastChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
575578
public static fun open (Lkotlinx/coroutines/experimental/channels/BroadcastChannel;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
576579
public static synthetic fun openSubscription (Lkotlinx/coroutines/experimental/channels/BroadcastChannel;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
577580
}
@@ -585,6 +588,13 @@ public final class kotlinx/coroutines/experimental/channels/BroadcastChannelKt {
585588
public static final synthetic fun use (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;)V
586589
}
587590

591+
public final class kotlinx/coroutines/experimental/channels/BroadcastKt {
592+
public static final fun broadcast (Lkotlin/coroutines/experimental/CoroutineContext;ILkotlinx/coroutines/experimental/CoroutineStart;Lkotlinx/coroutines/experimental/Job;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
593+
public static final fun broadcast (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;ILkotlinx/coroutines/experimental/CoroutineStart;)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
594+
public static synthetic fun broadcast$default (Lkotlin/coroutines/experimental/CoroutineContext;ILkotlinx/coroutines/experimental/CoroutineStart;Lkotlinx/coroutines/experimental/Job;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
595+
public static synthetic fun broadcast$default (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;ILkotlinx/coroutines/experimental/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
596+
}
597+
588598
public abstract interface class kotlinx/coroutines/experimental/channels/Channel : kotlinx/coroutines/experimental/channels/ReceiveChannel, kotlinx/coroutines/experimental/channels/SendChannel {
589599
public static final field CONFLATED I
590600
public static final field Factory Lkotlinx/coroutines/experimental/channels/Channel$Factory;
@@ -715,6 +725,7 @@ public final class kotlinx/coroutines/experimental/channels/ConflatedBroadcastCh
715725
public static final field UNDEFINED Lkotlinx/coroutines/experimental/internal/Symbol;
716726
public fun <init> ()V
717727
public fun <init> (Ljava/lang/Object;)V
728+
public fun cancel (Ljava/lang/Throwable;)Z
718729
public fun close (Ljava/lang/Throwable;)Z
719730
public fun getOnSend ()Lkotlinx/coroutines/experimental/selects/SelectClause2;
720731
public final fun getValue ()Ljava/lang/Object;

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineStart.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import kotlinx.coroutines.experimental.intrinsics.*
2121
import kotlin.coroutines.experimental.*
2222

2323
/**
24-
* Defines start option for coroutines builders.
24+
* Defines start options for coroutines builders.
2525
* It is used in `start` parameter of [launch], [async], and other coroutine builder functions.
2626
*
2727
* The summary of coroutine start options is:
@@ -42,7 +42,7 @@ public enum class CoroutineStart {
4242
* function, so starting coroutine with [Unconfined] dispatcher by [DEFAULT] is the same as using [UNDISPATCHED].
4343
*
4444
* If coroutine [Job] is cancelled before it even had a chance to start executing, then it will not start its
45-
* execution at all, but complete with an exception.
45+
* execution at all, but will complete with an exception.
4646
*
4747
* Cancellability of coroutine at suspension points depends on the particular implementation details of
4848
* suspending functions. Use [suspendCancellableCoroutine] to implement cancellable suspending functions.
@@ -56,12 +56,12 @@ public enum class CoroutineStart {
5656
* (like [launch] and [async]).
5757
*
5858
* If coroutine [Job] is cancelled before it even had a chance to start executing, then it will not start its
59-
* execution at all, but complete with an exception.
59+
* execution at all, but will complete with an exception.
6060
*/
6161
LAZY,
6262

6363
/**
64-
* Atomically (in non-cancellable way) schedules coroutine for execution according to its context.
64+
* Atomically (i.e., in a non-cancellable way) schedules coroutine for execution according to its context.
6565
* This is similar to [DEFAULT], but the coroutine cannot be cancelled before it starts executing.
6666
*
6767
* Cancellability of coroutine at suspension points depends on the particular implementation details of
@@ -70,12 +70,12 @@ public enum class CoroutineStart {
7070
ATOMIC,
7171

7272
/**
73-
* Immediately executes coroutine until its first suspension point _in the current thread_ as if it the
73+
* Immediately executes coroutine until its first suspension point _in the current thread_ as if the
7474
* coroutine was started using [Unconfined] dispatcher. However, when coroutine is resumed from suspension
7575
* it is dispatched according to the [CoroutineDispatcher] in its context.
7676
*
7777
* This is similar to [ATOMIC] in the sense that coroutine starts executing even if it was already cancelled,
78-
* but the difference is that it start executing in the same thread.
78+
* but the difference is that it starts executing in the same thread.
7979
*
8080
* Cancellability of coroutine at suspension points depends on the particular implementation details of
8181
* suspending functions as in [DEFAULT].

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ import kotlinx.coroutines.experimental.selects.*
2525
* Sender suspends only when buffer is full due to one of the receives being slow to consume and
2626
* receiver suspends only when buffer is empty.
2727
*
28-
* Note, that elements that are sent to the broadcast channel while there are no [openSubscription] subscribers are immediately
29-
* lost.
28+
* **Note**, that elements that are sent to this channel while there are no
29+
* [openSubscription] subscribers are immediately lost.
3030
*
3131
* This channel is created by `BroadcastChannel(capacity)` factory function invocation.
3232
*
@@ -68,17 +68,22 @@ class ArrayBroadcastChannel<E>(
6868
override val isBufferAlwaysFull: Boolean get() = false
6969
override val isBufferFull: Boolean get() = size >= capacity
7070

71-
override fun openSubscription(): ReceiveChannel<E> =
71+
public override fun openSubscription(): ReceiveChannel<E> =
7272
Subscriber(this).also {
7373
updateHead(addSub = it)
7474
}
7575

76-
override fun close(cause: Throwable?): Boolean {
76+
public override fun close(cause: Throwable?): Boolean {
7777
if (!super.close(cause)) return false
7878
checkSubOffers()
7979
return true
8080
}
8181

82+
public override fun cancel(cause: Throwable?): Boolean =
83+
close(cause).also {
84+
for (sub in subs) sub.cancel(cause)
85+
}
86+
8287
// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
8388
override fun offerInternal(element: E): Any {
8489
bufferLock.withLock {
@@ -210,8 +215,15 @@ class ArrayBroadcastChannel<E>(
210215
override fun cancel(cause: Throwable?): Boolean =
211216
close(cause).also { closed ->
212217
if (closed) broadcastChannel.updateHead(removeSub = this)
218+
clearBuffer()
213219
}
214220

221+
private fun clearBuffer() {
222+
subLock.withLock {
223+
subHead = broadcastChannel.tail
224+
}
225+
}
226+
215227
// returns true if subHead was updated and broadcast channel's head must be checked
216228
// this method is lock-free (it never waits on lock)
217229
@Suppress("UNCHECKED_CAST")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import kotlinx.coroutines.experimental.*
20+
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
21+
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
22+
import kotlinx.coroutines.experimental.intrinsics.*
23+
import kotlin.coroutines.experimental.*
24+
25+
/**
26+
* Broadcasts all elements of the channel.
27+
*
28+
* @param capacity capacity of the channel's buffer (1 by default).
29+
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
30+
*/
31+
fun <E> ReceiveChannel<E>.broadcast(
32+
capacity: Int = 1,
33+
start: CoroutineStart = CoroutineStart.LAZY
34+
) : BroadcastChannel<E> =
35+
broadcast(Unconfined, capacity = capacity, start = start, onCompletion = consumes()) {
36+
for (e in this@broadcast) {
37+
send(e)
38+
}
39+
}
40+
41+
/**
42+
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
43+
* and returns a reference to the coroutine as a [BroadcastChannel]. The resulting
44+
* object can be used to [subscribe][BroadcastChannel.openSubscription] to elements produced by this coroutine.
45+
*
46+
* The scope of the coroutine contains [ProducerScope] interface, which implements
47+
* both [CoroutineScope] and [SendChannel], so that coroutine can invoke
48+
* [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
49+
* when the coroutine completes.
50+
*
51+
* The [context] for the new coroutine can be explicitly specified.
52+
* See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
53+
* The [coroutineContext] of the parent coroutine may be used,
54+
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
55+
* The parent job may be also explicitly specified using [parent] parameter.
56+
*
57+
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
58+
*
59+
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
60+
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
61+
*
62+
* The kind of the resulting channel depends on the specified [capacity] parameter:
63+
* * when `capacity` positive (1 by default), but less than [UNLIMITED] -- uses [ArrayBroadcastChannel]
64+
* * when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
65+
* * otherwise -- throws [IllegalArgumentException].
66+
*
67+
* **Note:** By default, the coroutine does not start until the first subscriber appears via [BroadcastChannel.openSubscription]
68+
* as [start] parameter has a value of [CoroutineStart.LAZY] by default.
69+
* This ensures that the first subscriber does not miss any sent elements.
70+
* However, later subscribers may miss elements.
71+
*
72+
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
73+
*
74+
* @param context context of the coroutine. The default value is [DefaultDispatcher].
75+
* @param capacity capacity of the channel's buffer (1 by default).
76+
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
77+
* @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
78+
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
79+
* @param block the coroutine code.
80+
*/
81+
public fun <E> broadcast(
82+
context: CoroutineContext = DefaultDispatcher,
83+
capacity: Int = 1,
84+
start: CoroutineStart = CoroutineStart.LAZY,
85+
parent: Job? = null,
86+
onCompletion: CompletionHandler? = null,
87+
block: suspend ProducerScope<E>.() -> Unit
88+
): BroadcastChannel<E> {
89+
val channel = BroadcastChannel<E>(capacity)
90+
val newContext = newCoroutineContext(context, parent)
91+
val coroutine = if (start.isLazy)
92+
LazyBroadcastCoroutine(newContext, channel, block) else
93+
BroadcastCoroutine(newContext, channel, active = true)
94+
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
95+
coroutine.start(start, coroutine, block)
96+
return coroutine
97+
}
98+
99+
private open class BroadcastCoroutine<E>(
100+
parentContext: CoroutineContext,
101+
protected val _channel: BroadcastChannel<E>,
102+
active: Boolean
103+
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
104+
override val channel: SendChannel<E>
105+
get() = this
106+
107+
public override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
108+
109+
override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
110+
val cause = exceptionally?.cause
111+
val processed = when (exceptionally) {
112+
is Cancelled -> _channel.cancel(cause) // producer coroutine was cancelled -- cancel channel
113+
else -> _channel.close(cause) // producer coroutine has completed -- close channel
114+
}
115+
if (!processed && cause != null)
116+
handleCoroutineException(context, cause)
117+
}
118+
119+
// Workaround for KT-23094
120+
override suspend fun send(element: E) = _channel.send(element)
121+
}
122+
123+
private class LazyBroadcastCoroutine<E>(
124+
parentContext: CoroutineContext,
125+
channel: BroadcastChannel<E>,
126+
private val block: suspend ProducerScope<E>.() -> Unit
127+
) : BroadcastCoroutine<E>(parentContext, channel, active = false) {
128+
override fun openSubscription(): ReceiveChannel<E> {
129+
// open subscription _first_
130+
val subscription = _channel.openSubscription()
131+
// then start coroutine
132+
start()
133+
return subscription
134+
}
135+
136+
override fun onStart() {
137+
block.startCoroutineCancellable(this, this)
138+
}
139+
}
140+

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,25 @@ public interface BroadcastChannel<E> : SendChannel<E> {
6565
@Deprecated(message = "Renamed to `openSubscription`",
6666
replaceWith = ReplaceWith("openSubscription()"))
6767
public fun open(): SubscriptionReceiveChannel<E> = openSubscription() as SubscriptionReceiveChannel<E>
68+
69+
/**
70+
* Cancels reception of remaining elements from this channel. This function closes the channel with
71+
* the specified cause (unless it was already closed), removes all buffered sent elements from it,
72+
* and [cancels][ReceiveChannel.cancel] all open subscriptions.
73+
* This function returns `true` if the channel was not closed previously, or `false` otherwise.
74+
*
75+
* A channel that was cancelled with non-null [cause] is called a _failed_ channel. Attempts to send or
76+
* receive on a failed channel throw the specified [cause] exception.
77+
*/
78+
public fun cancel(cause: Throwable? = null): Boolean
6879
}
6980

7081
/**
7182
* Creates a broadcast channel with the specified buffer capacity.
7283
*
7384
* The resulting channel type depends on the specified [capacity] parameter:
74-
* * when `capacity` positive, but less than [UNLIMITED] -- creates [ArrayBroadcastChannel];
85+
* * when `capacity` positive, but less than [UNLIMITED] -- creates [ArrayBroadcastChannel]
86+
* **Note:** this channel looses all items that are send to it until the first subscriber appears;
7587
* * when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
7688
* * otherwise -- throws [IllegalArgumentException].
7789
*/

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,10 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.coroutines.experimental.CancellationException
20-
import kotlinx.coroutines.experimental.CoroutineScope
21-
import kotlinx.coroutines.experimental.Job
19+
import kotlinx.coroutines.experimental.*
2220
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
2321
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
24-
import kotlinx.coroutines.experimental.selects.SelectClause1
25-
import kotlinx.coroutines.experimental.selects.SelectClause2
26-
import kotlinx.coroutines.experimental.selects.select
27-
import kotlinx.coroutines.experimental.yield
22+
import kotlinx.coroutines.experimental.selects.*
2823

2924
/**
3025
* Sender's interface to [Channel].

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.common.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ public suspend fun <E> BroadcastChannel<E>.consumeEach(action: suspend (E) -> Un
8282
* with the corresponding cause. See also [ReceiveChannel.consume].
8383
*
8484
* **WARNING**: It is planned that in the future a second invocation of this method
85-
* on an channel that is already being consumed is going to fail fast, that is
86-
* immediately throw an [IllegalStateException].
85+
* on an channel that is already being consumed is going to fail fast, that it
86+
* immediately throws an [IllegalStateException].
8787
* See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
8888
* for details.
8989
*/

0 commit comments

Comments
 (0)