Skip to content

Commit d84dbc2

Browse files
committed
Jon.onJoin select clause is implemented, lazy onAwait/onJoin & dispatch fixed
1 parent a84730b commit d84dbc2

File tree

10 files changed

+238
-37
lines changed

10 files changed

+238
-37
lines changed

kotlinx-coroutines-core/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Top-level suspending functions:
4141

4242
| **Receiver** | **Suspending function** | **Select clause** | **Non-suspending version**
4343
| ---------------- | --------------------------------------------- | ------------------------------------------------ | --------------------------
44+
| [Job] | [join][Job.join] | [onJoin][SelectBuilder.onJoin] | [isCompleted][Job.isCompleted]
4445
| [Deferred] | [await][Deferred.await] | [onAwait][kotlinx.coroutines.experimental.selects.SelectBuilder.onAwait] | [isCompleted][Job.isCompleted]
4546
| [SendChannel][kotlinx.coroutines.experimental.channels.SendChannel] | [send][kotlinx.coroutines.experimental.channels.SendChannel.send] | [onSend][kotlinx.coroutines.experimental.selects.SelectBuilder.onSend] | [offer][kotlinx.coroutines.experimental.channels.SendChannel.offer]
4647
| [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [receive][kotlinx.coroutines.experimental.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.experimental.selects.SelectBuilder.onReceive] | [poll][kotlinx.coroutines.experimental.channels.ReceiveChannel.poll]

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@ private class RemoveOnCancel(
149149
override fun toString() = "RemoveOnCancel[$node]"
150150
}
151151

152+
internal const val MODE_DISPATCHED = 0
153+
internal const val MODE_UNDISPATCHED = 1
154+
internal const val MODE_DIRECT = 2
155+
152156
@PublishedApi
153157
internal open class CancellableContinuationImpl<in T>(
154158
@JvmField
@@ -170,9 +174,6 @@ internal open class CancellableContinuationImpl<in T>(
170174
const val SUSPENDED = 1
171175
const val RESUMED = 2
172176

173-
const val MODE_UNDISPATCHED = 1
174-
const val MODE_DIRECT = 2
175-
176177
@Suppress("UNCHECKED_CAST")
177178
fun <T> getSuccessfulResult(state: Any?): T = if (state is CompletedIdempotentResult) state.result as T else state as T
178179
}
@@ -228,7 +229,7 @@ internal open class CancellableContinuationImpl<in T>(
228229
}
229230

230231
override fun completeResume(token: Any) {
231-
completeUpdateState(token, state, mode = 0)
232+
completeUpdateState(token, state, defaultResumeMode())
232233
}
233234

234235
override fun afterCompletion(state: Any?, mode: Int) {
@@ -238,7 +239,7 @@ internal open class CancellableContinuationImpl<in T>(
238239
if (state is CompletedExceptionally) {
239240
val exception = state.exception
240241
when (mode) {
241-
0 -> delegate.resumeWithException(exception)
242+
MODE_DISPATCHED -> delegate.resumeWithException(exception)
242243
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWithException(exception)
243244
MODE_DIRECT -> {
244245
if (delegate is DispatchedContinuation)
@@ -251,7 +252,7 @@ internal open class CancellableContinuationImpl<in T>(
251252
} else {
252253
val value = getSuccessfulResult<T>(state)
253254
when (mode) {
254-
0 -> delegate.resume(value)
255+
MODE_DISPATCHED -> delegate.resume(value)
255256
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatched(value)
256257
MODE_DIRECT -> {
257258
if (delegate is DispatchedContinuation)

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public abstract class AbstractCoroutine<in T>(
6767

6868
protected open fun createContext() = parentContext + this
6969

70-
protected open fun defaultResumeMode(): Int = 0
70+
protected open fun defaultResumeMode(): Int = MODE_DISPATCHED
7171

7272
final override fun resume(value: T) = resume(value, defaultResumeMode())
7373

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package kotlinx.coroutines.experimental
1919
import kotlinx.coroutines.experimental.intrinsics.startUndispatchedCoroutine
2020
import kotlinx.coroutines.experimental.selects.SelectBuilder
2121
import kotlinx.coroutines.experimental.selects.SelectInstance
22+
import kotlinx.coroutines.experimental.selects.select
2223
import kotlin.coroutines.experimental.CoroutineContext
2324
import kotlin.coroutines.experimental.startCoroutine
2425

@@ -66,6 +67,9 @@ public interface Deferred<out T> : Job {
6667
* This suspending function is cancellable.
6768
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
6869
* immediately resumes with [CancellationException].
70+
*
71+
* This function can be used in [select] invocation with [onAwait][SelectBuilder.onAwait] clause.
72+
* Use [isCompleted] to check for completion of this deferred value without waiting.
6973
*/
7074
public suspend fun await(): T
7175

@@ -161,22 +165,37 @@ private open class DeferredCoroutine<T>(
161165
})
162166
}
163167

168+
@Suppress("UNCHECKED_CAST")
164169
override fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R) {
165-
if (select.isSelected) return
166-
val state = this.state
167-
if (state is Incomplete) {
168-
select.unregisterOnCompletion(invokeOnCompletion(SelectOnCompletion(this, select, block)))
169-
} else
170-
selectCompletion(select, block, state)
170+
// fast-path -- check state and select/return if needed
171+
while (true) {
172+
if (select.isSelected) return
173+
val state = this.state
174+
if (state !is Incomplete) {
175+
// already complete -- select result
176+
if (select.trySelect(idempotent = null)) {
177+
if (state is CompletedExceptionally)
178+
select.resumeSelectWithException(state.exception, MODE_DIRECT)
179+
else
180+
block.startUndispatchedCoroutine(state as T, select.completion)
181+
}
182+
return
183+
}
184+
if (startInternal(state) == 0) {
185+
// slow-path -- register waiter for completion
186+
select.unregisterOnCompletion(invokeOnCompletion(SelectAwaitOnCompletion(this, select, block)))
187+
return
188+
}
189+
}
171190
}
172191

173192
@Suppress("UNCHECKED_CAST")
174-
internal fun <R> selectCompletion(select: SelectInstance<R>, block: suspend (T) -> R, state: Any? = this.state) {
193+
internal fun <R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R, state: Any? = this.state) {
175194
if (select.trySelect(idempotent = null)) {
176195
if (state is CompletedExceptionally)
177-
select.resumeSelectWithException(state.exception)
196+
select.resumeSelectWithException(state.exception, MODE_DISPATCHED)
178197
else
179-
block.startUndispatchedCoroutine(state as T, select.completion)
198+
block.startCoroutine(state as T, select.completion)
180199
}
181200
}
182201

@@ -189,13 +208,13 @@ private open class DeferredCoroutine<T>(
189208
}
190209
}
191210

192-
private class SelectOnCompletion<T, R>(
211+
private class SelectAwaitOnCompletion<T, R>(
193212
deferred: DeferredCoroutine<T>,
194213
private val select: SelectInstance<R>,
195214
private val block: suspend (T) -> R
196215
) : JobNode<DeferredCoroutine<T>>(deferred) {
197-
override fun invoke(reason: Throwable?) = job.selectCompletion(select, block)
198-
override fun toString(): String = "SelectOnCompletion[$select]"
216+
override fun invoke(reason: Throwable?) = job.selectAwaitCompletion(select, block)
217+
override fun toString(): String = "SelectAwaitOnCompletion[$select]"
199218
}
200219

201220
private class LazyDeferredCoroutine<T>(

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717
package kotlinx.coroutines.experimental
1818

1919
import kotlinx.coroutines.experimental.internal.*
20+
import kotlinx.coroutines.experimental.intrinsics.startUndispatchedCoroutine
21+
import kotlinx.coroutines.experimental.selects.SelectBuilder
22+
import kotlinx.coroutines.experimental.selects.SelectInstance
23+
import kotlinx.coroutines.experimental.selects.select
2024
import java.util.concurrent.Future
2125
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
2226
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
2327
import kotlin.coroutines.experimental.Continuation
2428
import kotlin.coroutines.experimental.CoroutineContext
29+
import kotlin.coroutines.experimental.startCoroutine
2530

2631
// --------------- core job interfaces ---------------
2732

@@ -117,9 +122,18 @@ public interface Job : CoroutineContext.Element {
117122
*
118123
* This suspending function is cancellable. If the [Job] of the invoking coroutine is completed while this
119124
* suspending function is suspended, this function immediately resumes with [CancellationException].
125+
*
126+
* This function can be used in [select] invocation with [onJoin][SelectBuilder.onJoin] clause.
127+
* Use [isCompleted] to check for completion of this job without waiting.
120128
*/
121129
public suspend fun join()
122130

131+
/**
132+
* Registers [onJoin][SelectBuilder.onJoin] select clause.
133+
* @suppress **This is unstable API and it is subject to change.**
134+
*/
135+
public fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R)
136+
123137
/**
124138
* Cancel this activity with an optional cancellation [cause]. The result is `true` if this job was
125139
* cancelled as a result of this invocation and `false` otherwise
@@ -549,6 +563,25 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
549563
cont.unregisterOnCompletion(invokeOnCompletion(ResumeOnCompletion(this, cont)))
550564
}
551565

566+
override fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R) {
567+
// fast-path -- check state and select/return if needed
568+
while (true) {
569+
if (select.isSelected) return
570+
val state = this.state
571+
if (state !is Incomplete) {
572+
// already complete -- select result
573+
if (select.trySelect(idempotent = null))
574+
block.startUndispatchedCoroutine(select.completion)
575+
return
576+
}
577+
if (startInternal(state) == 0) {
578+
// slow-path -- register waiter for completion
579+
select.unregisterOnCompletion(invokeOnCompletion(SelectJoinOnCompletion(this, select, block)))
580+
return
581+
}
582+
}
583+
}
584+
552585
internal fun removeNode(node: JobNode<*>) {
553586
// remove logic depends on the state of the job
554587
while (true) { // lock-free loop on job state
@@ -759,6 +792,20 @@ private class CancelFutureOnCompletion(
759792
override fun toString() = "CancelFutureOnCompletion[$future]"
760793
}
761794

795+
private class SelectJoinOnCompletion<R>(
796+
job: JobSupport,
797+
private val select: SelectInstance<R>,
798+
private val block: suspend () -> R
799+
) : JobNode<JobSupport>(job) {
800+
override fun invoke(reason: Throwable?) {
801+
if (select.trySelect(idempotent = null))
802+
block.startCoroutine(select.completion)
803+
}
804+
override fun toString(): String = "SelectJoinOnCompletion[$select]"
805+
}
806+
807+
808+
762809
private class JobImpl(parent: Job? = null) : JobSupport(true) {
763810
init { initParentJob(parent) }
764811
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package kotlinx.coroutines.experimental
1818

1919
import kotlinx.coroutines.experimental.NonCancellable.isActive
20+
import kotlinx.coroutines.experimental.selects.SelectInstance
2021
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
2122

2223
/**
@@ -41,7 +42,17 @@ object NonCancellable : AbstractCoroutineContextElement(Job), Job {
4142
override fun start(): Boolean = false
4243

4344
/** Always throws [UnsupportedOperationException]. */
44-
suspend override fun join() { throw UnsupportedOperationException("This job is always active") }
45+
suspend override fun join() {
46+
throw UnsupportedOperationException("This job is always active")
47+
}
48+
49+
/**
50+
* Always throws [UnsupportedOperationException].
51+
* @suppress **This is unstable API and it is subject to change.**
52+
*/
53+
override fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R) {
54+
throw UnsupportedOperationException("This job is always active")
55+
}
4556

4657
/** Always throws [IllegalStateException]. */
4758
override fun getCompletionException(): CancellationException = throw IllegalStateException("This job is always active")

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
724724
if (closed.closeCause == null && nullOnClose) {
725725
block.startCoroutine(null, select.completion)
726726
} else
727-
select.resumeSelectWithException(closed.receiveException)
727+
select.resumeSelectWithException(closed.receiveException, MODE_DISPATCHED)
728728
}
729729
}
730730

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
3030
* Scope for [select] invocation.
3131
*/
3232
public interface SelectBuilder<in R> : CoroutineScope {
33+
/**
34+
* Clause for [Job.join] suspending function that selects the given [block] when the job is complete.
35+
* This clause never fails, even if the job completes exceptionally.
36+
*/
37+
public fun Job.onJoin(block: suspend () -> R)
38+
3339
/**
3440
* Clause for [Deferred.await] suspending function that selects the given [block] with the deferred value is
3541
* resolved. The [select] invocation fails if the deferred value completes exceptionally (either fails or
@@ -89,9 +95,14 @@ public interface SelectInstance<in R> {
8995
*/
9096
public fun performAtomicIfNotSelected(desc: AtomicDesc): Any?
9197

98+
/**
99+
* Returns completion continuation of this select instance.
100+
* This select instance must be _selected_ first.
101+
* All resumption through this instance happen _directly_ (as if `mode` is [MODE_DIRECT]).
102+
*/
92103
public val completion: Continuation<R>
93104

94-
public fun resumeSelectWithException(exception: Throwable)
105+
public fun resumeSelectWithException(exception: Throwable, mode: Int)
95106

96107
public fun invokeOnCompletion(handler: CompletionHandler): Job.Registration
97108

@@ -113,7 +124,8 @@ public interface SelectInstance<in R> {
113124
*
114125
* | **Receiver** | **Suspending function** | **Select clause** | **Non-suspending version**
115126
* | ---------------- | --------------------------------------------- | ------------------------------------------------ | --------------------------
116-
* | [Deferred] | [await][Deferred.await] | [onAwait][SelectBuilder.onAwait] | [isCompleted][Deferred.isCompleted]
127+
* | [Job] | [join][Job.join] | [onJoin][SelectBuilder.onJoin] | [isCompleted][Job.isCompleted]
128+
* | [Deferred] | [await][Deferred.await] | [onAwait][SelectBuilder.onAwait] | [isCompleted][Job.isCompleted]
117129
* | [SendChannel] | [send][SendChannel.send] | [onSend][SelectBuilder.onSend] | [offer][SendChannel.offer]
118130
* | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][SelectBuilder.onReceive] | [poll][ReceiveChannel.poll]
119131
* | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][SelectBuilder.onReceiveOrNull] | [poll][ReceiveChannel.poll]
@@ -183,9 +195,13 @@ internal class SelectBuilderImpl<in R>(
183195
return this
184196
}
185197

186-
override fun resumeSelectWithException(exception: Throwable) {
198+
override fun resumeSelectWithException(exception: Throwable, mode: Int) {
187199
check(isSelected) { "Must be selected first" }
188-
resumeWithException(exception, mode = 0)
200+
resumeWithException(exception, mode)
201+
}
202+
203+
override fun Job.onJoin(block: suspend () -> R) {
204+
registerSelectJoin(this@SelectBuilderImpl, block)
189205
}
190206

191207
override fun <T> Deferred<T>.onAwait(block: suspend (T) -> R) {

0 commit comments

Comments
 (0)