Skip to content

Commit 0a78839

Browse files
committed
RendezvousChannel is now an open class with a afterClose and onEnqueuedReceive/onCancelledReceive extension points
More efficient cancellable suspending functions for Channel.send/receive and Mutex.lock
1 parent 33ecdca commit 0a78839

File tree

5 files changed

+91
-59
lines changed

5 files changed

+91
-59
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
1920
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
2021
import kotlin.coroutines.experimental.Continuation
2122
import kotlin.coroutines.experimental.ContinuationInterceptor
@@ -118,8 +119,27 @@ public inline suspend fun <T> suspendCancellableCoroutine(
118119
safe.getResult()
119120
}
120121

122+
123+
/**
124+
* Removes a given node on cancellation.
125+
* @suppress **This is unstable API and it is subject to change.**
126+
*/
127+
public fun CancellableContinuation<*>.removeOnCancel(node: LockFreeLinkedListNode): Job.Registration =
128+
onCompletion(RemoveOnCancel(this, node))
129+
121130
// --------------- implementation details ---------------
122131

132+
private class RemoveOnCancel(
133+
cont: CancellableContinuation<*>,
134+
val node: LockFreeLinkedListNode
135+
) : JobNode<CancellableContinuation<*>>(cont) {
136+
override fun invoke(reason: Throwable?) {
137+
if (job.isCancelled)
138+
node.remove()
139+
}
140+
override fun toString() = "RemoveOnCancel[$node]"
141+
}
142+
123143
@PublishedApi
124144
internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
125145
val job = cont.context[Job]

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 16 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -185,13 +185,6 @@ public fun Job.unregisterOnCompletion(registration: Job.Registration): Job.Regis
185185
public fun Job.cancelFutureOnCompletion(future: Future<*>): Job.Registration =
186186
onCompletion(CancelFutureOnCompletion(this, future))
187187

188-
/**
189-
* Removes a given node on completion.
190-
* @suppress **This is unstable API and it is subject to change.**
191-
*/
192-
public fun Job.removeOnCompletion(node: LockFreeLinkedListNode): Job.Registration =
193-
onCompletion(RemoveOnCompletion(this, node))
194-
195188
/**
196189
* @suppress **Deprecated**: `join` is now a member function of `Job`.
197190
*/
@@ -326,13 +319,13 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
326319
var completionException: Throwable? = null
327320
when (expect) {
328321
// SINGLE/SINGLE+ state -- one completion handler (common case)
329-
is JobNode -> try {
322+
is JobNode<*> -> try {
330323
expect.invoke(cause)
331324
} catch (ex: Throwable) {
332325
completionException = ex
333326
}
334327
// LIST state -- a list of completion handlers
335-
is NodeList -> expect.forEach<JobNode> { node ->
328+
is NodeList -> expect.forEach<JobNode<*>> { node ->
336329
try {
337330
node.invoke(cause)
338331
} catch (ex: Throwable) {
@@ -401,7 +394,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
401394
}
402395

403396
final override fun onCompletion(handler: CompletionHandler): Job.Registration {
404-
var nodeCache: JobNode? = null
397+
var nodeCache: JobNode<*>? = null
405398
while (true) { // lock-free loop on state
406399
val state = this.state
407400
when {
@@ -417,7 +410,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
417410
STATE.compareAndSet(this, state, NodeList(active = 0))
418411
}
419412
// SINGLE/SINGLE+ state -- one completion handler
420-
state is JobNode -> {
413+
state is JobNode<*> -> {
421414
// try to promote it to list (SINGLE+ state)
422415
state.addFirstIfEmpty(NodeList(active = 1))
423416
// it must be in SINGLE+ state or state has changed (node could have need removed from state)
@@ -451,13 +444,13 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
451444
cont.unregisterOnCompletion(onCompletion(ResumeOnCompletion(this, cont)))
452445
}
453446

454-
internal fun removeNode(node: JobNode) {
447+
internal fun removeNode(node: JobNode<*>) {
455448
// remove logic depends on the state of the job
456449
while (true) { // lock-free loop on job state
457450
val state = this.state
458451
when (state) {
459452
// SINGE/SINGLE+ state -- one completion handler
460-
is JobNode -> {
453+
is JobNode<*> -> {
461454
if (state !== this) return // a different job node --> we were already removed
462455
// try remove and revert back to empty state
463456
if (STATE.compareAndSet(this, state, EmptyActive)) return
@@ -493,8 +486,8 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
493486
*/
494487
protected open fun afterCompletion(state: Any?) {}
495488

496-
private fun makeNode(handler: CompletionHandler): JobNode =
497-
(handler as? JobNode)?.also { require(it.job === this) }
489+
private fun makeNode(handler: CompletionHandler): JobNode<*> =
490+
(handler as? JobNode<*>)?.also { require(it.job === this) }
498491
?: InvokeOnCompletion(this, handler)
499492

500493
// for nicer debugging
@@ -524,7 +517,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
524517
append(if (isActive) "{Active}" else "{New}")
525518
append("[")
526519
var first = true
527-
this@NodeList.forEach<JobNode> { node ->
520+
this@NodeList.forEach<JobNode<*>> { node ->
528521
if (first) first = false else append(", ")
529522
append(node)
530523
}
@@ -565,8 +558,8 @@ private class Empty(override val isActive: Boolean) : JobSupport.Incomplete {
565558
override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
566559
}
567560

568-
internal abstract class JobNode(
569-
val job: Job
561+
internal abstract class JobNode<out J : Job>(
562+
val job: J
570563
) : LockFreeLinkedListNode(), Job.Registration, CompletionHandler, JobSupport.Incomplete {
571564
final override val isActive: Boolean get() = true
572565
// if unregister is called on this instance, then Job was an instance of JobSupport that added this node it itself
@@ -578,39 +571,39 @@ internal abstract class JobNode(
578571
private class InvokeOnCompletion(
579572
job: Job,
580573
val handler: CompletionHandler
581-
) : JobNode(job) {
574+
) : JobNode<Job>(job) {
582575
override fun invoke(reason: Throwable?) = handler.invoke(reason)
583576
override fun toString() = "InvokeOnCompletion[${handler::class.java.name}@${Integer.toHexString(System.identityHashCode(handler))}]"
584577
}
585578

586579
private class ResumeOnCompletion(
587580
job: Job,
588581
val continuation: Continuation<Unit>
589-
) : JobNode(job) {
582+
) : JobNode<Job>(job) {
590583
override fun invoke(reason: Throwable?) = continuation.resume(Unit)
591584
override fun toString() = "ResumeOnCompletion[$continuation]"
592585
}
593586

594587
private class UnregisterOnCompletion(
595588
job: Job,
596589
val registration: Job.Registration
597-
) : JobNode(job) {
590+
) : JobNode<Job>(job) {
598591
override fun invoke(reason: Throwable?) = registration.unregister()
599592
override fun toString(): String = "UnregisterOnCompletion[$registration]"
600593
}
601594

602595
private class CancelOnCompletion(
603596
parentJob: Job,
604597
val subordinateJob: Job
605-
) : JobNode(parentJob) {
598+
) : JobNode<Job>(parentJob) {
606599
override fun invoke(reason: Throwable?) { subordinateJob.cancel(reason) }
607600
override fun toString(): String = "CancelOnCompletion[$subordinateJob]"
608601
}
609602

610603
private class CancelFutureOnCompletion(
611604
job: Job,
612605
val future: Future<*>
613-
) : JobNode(job) {
606+
) : JobNode<Job>(job) {
614607
override fun invoke(reason: Throwable?) {
615608
// Don't interrupt when cancelling future on completion, because no one is going to reset this
616609
// interruption flag and it will cause spurious failures elsewhere
@@ -619,14 +612,6 @@ private class CancelFutureOnCompletion(
619612
override fun toString() = "CancelFutureOnCompletion[$future]"
620613
}
621614

622-
private class RemoveOnCompletion(
623-
job: Job,
624-
val node: LockFreeLinkedListNode
625-
) : JobNode(job) {
626-
override fun invoke(reason: Throwable?) { node.remove() }
627-
override fun toString() = "RemoveOnCompletion[$node]"
628-
}
629-
630615
private class JobImpl(parent: Job? = null) : JobSupport(true) {
631616
init { initParentJob(parent) }
632617
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Mutex.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public class Mutex(locked: Boolean = false) {
118118
if (state.addLastIf(waiter, { this.state === state })) {
119119
// added to waiter list!
120120
cont.initCancellability()
121-
cont.removeOnCompletion(waiter)
121+
cont.removeOnCancel(waiter)
122122
return@sc
123123
}
124124
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

Lines changed: 48 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package kotlinx.coroutines.experimental.channels
1919
import kotlinx.coroutines.experimental.CancellableContinuation
2020
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
2121
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
22-
import kotlinx.coroutines.experimental.removeOnCompletion
22+
import kotlinx.coroutines.experimental.removeOnCancel
2323
import kotlinx.coroutines.experimental.suspendCancellableCoroutine
2424

2525
/**
@@ -72,17 +72,17 @@ public abstract class AbstractChannel<E> : Channel<E> {
7272

7373
// ------ SendChannel ------
7474

75-
override val isClosedForSend: Boolean get() = closedForSend != null
76-
override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*> && isBufferFull
75+
public final override val isClosedForSend: Boolean get() = closedForSend != null
76+
public final override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*> && isBufferFull
7777

78-
suspend override fun send(element: E) {
78+
public final override suspend fun send(element: E) {
7979
// fast path -- try offer non-blocking
8080
if (offer(element)) return
8181
// slow-path does suspend
8282
return sendSuspend(element)
8383
}
8484

85-
override fun offer(element: E): Boolean {
85+
public final override fun offer(element: E): Boolean {
8686
val result = offerInternal(element)
8787
return when {
8888
result === OFFER_SUCCESS -> true
@@ -96,7 +96,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
9696
loop@ while (true) {
9797
if (enqueueSend(send)) {
9898
cont.initCancellability() // make it properly cancellable
99-
cont.removeOnCompletion(send)
99+
cont.removeOnCancel(send)
100100
return@sc
101101
}
102102
// hm... something is not right. try to offer
@@ -120,13 +120,16 @@ public abstract class AbstractChannel<E> : Channel<E> {
120120
else
121121
queue.addLastIfPrev(send, { it !is ReceiveOrClosed<*> })
122122

123-
override fun close(cause: Throwable?): Boolean {
123+
public final override fun close(cause: Throwable?): Boolean {
124124
val closed = Closed<E>(cause)
125125
while (true) {
126126
val receive = takeFirstReceiveOrPeekClosed()
127127
if (receive == null) {
128128
// queue empty or has only senders -- try add last "Closed" item to the queue
129-
if (queue.addLastIfPrev(closed, { it !is ReceiveOrClosed<*> })) return true
129+
if (queue.addLastIfPrev(closed, { it !is ReceiveOrClosed<*> })) {
130+
afterClose(cause)
131+
return true
132+
}
130133
continue // retry on failure
131134
}
132135
if (receive is Closed<*>) return false // already marked as closed -- nothing to do
@@ -135,6 +138,11 @@ public abstract class AbstractChannel<E> : Channel<E> {
135138
}
136139
}
137140

141+
/**
142+
* Invoked after successful [close].
143+
*/
144+
protected open fun afterClose(cause: Throwable?) {}
145+
138146
/**
139147
* Retrieves first receiving waiter from the queue or returns closed token.
140148
*/
@@ -143,11 +151,11 @@ public abstract class AbstractChannel<E> : Channel<E> {
143151

144152
// ------ ReceiveChannel ------
145153

146-
override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
147-
override val isEmpty: Boolean get() = queue.next() !is Send && isBufferEmpty
154+
public final override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
155+
public final override val isEmpty: Boolean get() = queue.next() !is Send && isBufferEmpty
148156

149157
@Suppress("UNCHECKED_CAST")
150-
suspend override fun receive(): E {
158+
public final override suspend fun receive(): E {
151159
// fast path -- try poll non-blocking
152160
val result = pollInternal()
153161
if (result !== POLL_EMPTY) return receiveResult(result)
@@ -167,7 +175,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
167175
while (true) {
168176
if (enqueueReceive(receive)) {
169177
cont.initCancellability() // make it properly cancellable
170-
cont.removeOnCompletion(receive)
178+
removeReceiveOnCancel(cont, receive)
171179
return@sc
172180
}
173181
// hm... something is not right. try to poll
@@ -183,14 +191,16 @@ public abstract class AbstractChannel<E> : Channel<E> {
183191
}
184192
}
185193

186-
private fun enqueueReceive(receive: Receive<E>) =
187-
if (hasBuffer)
188-
queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
189-
else
194+
private fun enqueueReceive(receive: Receive<E>): Boolean {
195+
val result = if (hasBuffer)
196+
queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty }) else
190197
queue.addLastIfPrev(receive, { it !is Send })
198+
if (result) onEnqueuedReceive()
199+
return result
200+
}
191201

192202
@Suppress("UNCHECKED_CAST")
193-
suspend override fun receiveOrNull(): E? {
203+
public final override suspend fun receiveOrNull(): E? {
194204
// fast path -- try poll non-blocking
195205
val result = pollInternal()
196206
if (result !== POLL_EMPTY) return receiveOrNullResult(result)
@@ -213,7 +223,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
213223
while (true) {
214224
if (enqueueReceive(receive)) {
215225
cont.initCancellability() // make it properly cancellable
216-
cont.removeOnCompletion(receive)
226+
removeReceiveOnCancel(cont, receive)
217227
return@sc
218228
}
219229
// hm... something is not right. try to poll
@@ -233,12 +243,12 @@ public abstract class AbstractChannel<E> : Channel<E> {
233243
}
234244

235245
@Suppress("UNCHECKED_CAST")
236-
override fun poll(): E? {
246+
public final override fun poll(): E? {
237247
val result = pollInternal()
238248
return if (result === POLL_EMPTY) null else receiveOrNullResult(result)
239249
}
240250

241-
override fun iterator(): ChannelIterator<E> = Iterator(this)
251+
public final override fun iterator(): ChannelIterator<E> = Iterator(this)
242252

243253
/**
244254
* Retrieves first sending waiter from the queue or returns closed token.
@@ -262,6 +272,23 @@ public abstract class AbstractChannel<E> : Channel<E> {
262272
override fun toString(): String = string
263273
}
264274

275+
private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) {
276+
cont.onCompletion {
277+
if (cont.isCancelled && receive.remove())
278+
onCancelledReceive()
279+
}
280+
}
281+
282+
/**
283+
* Invoked when receiver is successfully enqueued to the queue of waiting receivers.
284+
*/
285+
protected open fun onEnqueuedReceive() {}
286+
287+
/**
288+
* Invoked when enqueued receiver was successfully cancelled.
289+
*/
290+
protected open fun onCancelledReceive() {}
291+
265292
private class Iterator<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
266293
var result: Any? = POLL_EMPTY // E | POLL_CLOSED | POLL_EMPTY
267294

@@ -288,7 +315,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
288315
while (true) {
289316
if (channel.enqueueReceive(receive)) {
290317
cont.initCancellability() // make it properly cancellable
291-
cont.removeOnCompletion(receive)
318+
channel.removeReceiveOnCancel(cont, receive)
292319
return@sc
293320
}
294321
// hm... something is not right. try to poll

0 commit comments

Comments
 (0)