Skip to content

Commit ee7c0eb

Browse files
committed
Multi-part atomic remove operation support for LockFreeLinkedList
1 parent e780347 commit ee7c0eb

File tree

13 files changed

+644
-238
lines changed

13 files changed

+644
-238
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ private class BlockingCoroutine<T>(
142142
while (eventLoop!!.processNextEvent()) { /* just spin */ }
143143
}
144144
// now return result
145-
val state = getState()
145+
val state = this.state
146146
(state as? CompletedExceptionally)?.let { throw it.exception }
147147
return state as T
148148
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,10 @@ public inline suspend fun <T> suspendCancellableCoroutine(
113113
crossinline block: (CancellableContinuation<T>) -> Unit
114114
): T =
115115
suspendCoroutineOrReturn { cont ->
116-
val safe = CancellableContinuationImpl(cont, getParentJobOrAbort(cont))
117-
if (!holdCancellability) safe.initCancellability()
118-
block(safe)
119-
safe.getResult()
116+
val cancellable = CancellableContinuationImpl(cont, getParentJobOrAbort(cont), active = true)
117+
if (!holdCancellability) cancellable.initCancellability()
118+
block(cancellable)
119+
cancellable.getResult()
120120
}
121121

122122

@@ -149,12 +149,11 @@ internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
149149
}
150150

151151
@PublishedApi
152-
internal class CancellableContinuationImpl<in T>(
153-
private val delegate: Continuation<T>,
154-
private val parentJob: Job?
155-
) : AbstractCoroutine<T>(delegate.context, active = true), CancellableContinuation<T> {
156-
// only updated from the thread that invoked suspendCancellableCoroutine
157-
152+
internal open class CancellableContinuationImpl<in T>(
153+
private val delegate: Continuation<T>,
154+
private val parentJob: Job?,
155+
active: Boolean
156+
) : AbstractCoroutine<T>(delegate.context, active), CancellableContinuation<T> {
158157
@Volatile
159158
private var decision = UNDECIDED
160159

@@ -173,24 +172,24 @@ internal class CancellableContinuationImpl<in T>(
173172
initParentJob(parentJob)
174173
}
175174

176-
fun getResult(): Any? {
175+
@PublishedApi
176+
internal fun getResult(): Any? {
177177
val decision = this.decision // volatile read
178178
when (decision) {
179179
UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
180180
YIELD -> return COROUTINE_SUSPENDED
181181
}
182182
// otherwise, afterCompletion was already invoked, and the result is in the state
183-
val state = getState()
183+
val state = this.state
184184
if (state is CompletedExceptionally) throw state.exception
185185
return state
186186
}
187187

188-
override val isCancelled: Boolean
189-
get() = getState() is Cancelled
188+
override val isCancelled: Boolean get() = state is Cancelled
190189

191190
override fun tryResume(value: T): Any? {
192191
while (true) { // lock-free loop on state
193-
val state = getState() // atomic read
192+
val state = this.state // atomic read
194193
when (state) {
195194
is Incomplete -> if (tryUpdateState(state, value)) return state
196195
else -> return null // cannot resume -- not active anymore
@@ -200,7 +199,7 @@ internal class CancellableContinuationImpl<in T>(
200199

201200
override fun tryResumeWithException(exception: Throwable): Any? {
202201
while (true) { // lock-free loop on state
203-
val state = getState() // atomic read
202+
val state = this.state // atomic read
204203
when (state) {
205204
is Incomplete -> if (tryUpdateState(state, CompletedExceptionally(exception))) return state
206205
else -> return null // cannot resume -- not active anymore
@@ -209,7 +208,7 @@ internal class CancellableContinuationImpl<in T>(
209208
}
210209

211210
override fun completeResume(token: Any) {
212-
completeUpdateState(token, getState())
211+
completeUpdateState(token, state)
213212
}
214213

215214
@Suppress("UNCHECKED_CAST")

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public abstract class AbstractCoroutine<in T>(
6262

6363
final override fun resume(value: T) {
6464
while (true) { // lock-free loop on state
65-
val state = getState() // atomic read
65+
val state = this.state // atomic read
6666
when (state) {
6767
is Incomplete -> if (updateState(state, value)) return
6868
is Cancelled -> return // ignore resumes on cancelled continuation
@@ -73,7 +73,7 @@ public abstract class AbstractCoroutine<in T>(
7373

7474
final override fun resumeWithException(exception: Throwable) {
7575
while (true) { // lock-free loop on state
76-
val state = getState() // atomic read
76+
val state = this.state // atomic read
7777
when (state) {
7878
is Incomplete -> if (updateState(state, CompletedExceptionally(exception))) return
7979
is Cancelled -> {
@@ -92,8 +92,8 @@ public abstract class AbstractCoroutine<in T>(
9292

9393
// for nicer debugging
9494
override fun toString(): String {
95-
val state = getState()
95+
val state = this.state
9696
val result = if (state is Incomplete) "" else "[$state]"
97-
return "${this::class.java.simpleName}{${describeState(state)}}$result@${Integer.toHexString(System.identityHashCode(this))}"
97+
return "${this::class.java.simpleName}{${stateToString(state)}}$result@${Integer.toHexString(System.identityHashCode(this))}"
9898
}
9999
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,14 @@ private open class DeferredCoroutine<T>(
121121
context: CoroutineContext,
122122
active: Boolean
123123
) : AbstractCoroutine<T>(context, active), Deferred<T> {
124-
override val isCompletedExceptionally: Boolean get() = getState() is CompletedExceptionally
125-
override val isCancelled: Boolean get() = getState() is Cancelled
124+
override val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
125+
override val isCancelled: Boolean get() = state is Cancelled
126126

127127
@Suppress("UNCHECKED_CAST")
128128
suspend override fun await(): T {
129129
// fast-path -- check state (avoid extra object creation)
130130
while(true) { // lock-free loop on state
131-
val state = this.getState()
131+
val state = this.state
132132
if (state !is Incomplete) {
133133
// already complete -- just return result
134134
if (state is CompletedExceptionally) throw state.exception
@@ -143,7 +143,7 @@ private open class DeferredCoroutine<T>(
143143
@Suppress("UNCHECKED_CAST")
144144
private suspend fun awaitSuspend(): T = suspendCancellableCoroutine { cont ->
145145
cont.unregisterOnCompletion(invokeOnCompletion {
146-
val state = getState()
146+
val state = this.state
147147
check(state !is Incomplete)
148148
if (state is CompletedExceptionally)
149149
cont.resumeWithException(state.exception)
@@ -154,7 +154,7 @@ private open class DeferredCoroutine<T>(
154154

155155
@Suppress("UNCHECKED_CAST")
156156
override fun getCompleted(): T {
157-
val state = getState()
157+
val state = this.state
158158
check(state !is Incomplete) { "This deferred value has not completed yet" }
159159
if (state is CompletedExceptionally) throw state.exception
160160
return state as T

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 87 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19-
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
20-
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
19+
import kotlinx.coroutines.experimental.internal.*
2120
import java.util.concurrent.Future
22-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
2321
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
2422
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
2523
import kotlin.coroutines.experimental.Continuation
@@ -260,20 +258,22 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
260258
261259
This state machine and its transition matrix are optimized for the common case when job is created in active
262260
state (EMPTY_A) and at most one completion listener is added to it during its life-time.
261+
262+
Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
263263
*/
264264

265265
@Volatile
266-
private var state: Any? = if (active) EmptyActive else EmptyNew // shared objects while we have no listeners
266+
private var _state: Any? = if (active) EmptyActive else EmptyNew // shared objects while we have no listeners
267267

268268
@Volatile
269269
private var registration: Job.Registration? = null
270270

271271
protected companion object {
272272
@JvmStatic
273273
private val STATE: AtomicReferenceFieldUpdater<JobSupport, Any?> =
274-
AtomicReferenceFieldUpdater.newUpdater(JobSupport::class.java, Any::class.java, "state")
274+
AtomicReferenceFieldUpdater.newUpdater(JobSupport::class.java, Any::class.java, "_state")
275275

276-
fun describeState(state: Any?): String =
276+
fun stateToString(state: Any?): String =
277277
if (state is Incomplete)
278278
if (state.isActive) "Active" else "New"
279279
else "Completed"
@@ -299,10 +299,16 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
299299
/**
300300
* Returns current state of this job.
301301
*/
302-
protected fun getState(): Any? = state
302+
protected val state: Any? get() {
303+
while (true) { // lock-free helping loop
304+
val state = _state
305+
if (state !is OpDescriptor) return state
306+
state.perform(this)
307+
}
308+
}
303309

304310
/**
305-
* Tries to update current [state][getState] of this job.
311+
* Tries to update current [state] of this job.
306312
*/
307313
internal fun updateState(expect: Any, update: Any?): Boolean {
308314
if (!tryUpdateState(expect, update)) return false
@@ -347,14 +353,20 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
347353
afterCompletion(update)
348354
}
349355

350-
final override val isActive: Boolean get() {
356+
public final override val isActive: Boolean get() {
351357
val state = this.state
352358
return state is Incomplete && state.isActive
353359
}
354360

355-
final override val isCompleted: Boolean get() = state !is Incomplete
361+
public final override val isCompleted: Boolean get() = state !is Incomplete
362+
363+
// this is for `select` operator. `isSelected` state means "not new" (== was started or already completed)
364+
public val isSelected: Boolean get() {
365+
val state = this.state
366+
return state !is Incomplete || state.isActive
367+
}
356368

357-
final override fun start(): Boolean {
369+
public final override fun start(): Boolean {
358370
while (true) { // lock-free loop on state
359371
when (startInternal(state)) {
360372
0 -> return false
@@ -375,7 +387,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
375387
// LIST -- a list of completion handlers (either new or active)
376388
state is NodeList -> {
377389
if (state.isActive) return 0
378-
if (!NodeList.ACTIVE.compareAndSet(state, 0, 1)) return -1
390+
if (!NodeList.ACTIVE.compareAndSet(state, null, NodeList.ACTIVE_STATE)) return -1
379391
onStart()
380392
return 1
381393
}
@@ -384,13 +396,53 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
384396
}
385397
}
386398

399+
internal fun describeStart(failureMarker: Any): AtomicDesc =
400+
object : AtomicDesc() {
401+
override fun prepare(op: AtomicOp): Any? {
402+
while (true) { // lock-free loop on state
403+
val state = this@JobSupport._state
404+
when {
405+
state === op -> return null // already in progress
406+
state is OpDescriptor -> state.perform(this@JobSupport) // help
407+
state === EmptyNew -> { // EMPTY_NEW state -- no completion handlers, new
408+
if (STATE.compareAndSet(this@JobSupport, state, op)) return null // success
409+
}
410+
state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
411+
if (state.isActive) return failureMarker
412+
if (NodeList.ACTIVE.compareAndSet(state, null, op)) return null // success
413+
}
414+
else -> return failureMarker // not a new state
415+
}
416+
}
417+
}
418+
419+
override fun complete(op: AtomicOp, failure: Any?) {
420+
val success = failure == null
421+
val state = this@JobSupport._state
422+
when {
423+
state === op -> {
424+
if (STATE.compareAndSet(this@JobSupport, op, if (success) EmptyActive else EmptyNew)) {
425+
if (success) onStart()
426+
}
427+
}
428+
state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
429+
if (state._active === op) {
430+
if (NodeList.ACTIVE.compareAndSet(state, op, if (success) NodeList.ACTIVE_STATE else null)) {
431+
if (success) onStart()
432+
}
433+
}
434+
}
435+
}
436+
}
437+
}
438+
387439
/**
388440
* Override to provide the actual [start] action.
389441
*/
390442
protected open fun onStart() {}
391443

392444
final override fun getCompletionException(): Throwable {
393-
val state = getState()
445+
val state = this.state
394446
return when (state) {
395447
is Incomplete -> throw IllegalStateException("Job has not completed yet")
396448
is CompletedExceptionally -> state.exception
@@ -414,14 +466,14 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
414466
// EMPTY_NEW state -- no completion handlers, new
415467
state === EmptyNew -> {
416468
// try to promote it to list in new state
417-
STATE.compareAndSet(this, state, NodeList(active = 0))
469+
STATE.compareAndSet(this, state, NodeList(active = false))
418470
}
419471
// SINGLE/SINGLE+ state -- one completion handler
420472
state is JobNode<*> -> {
421473
// try to promote it to list (SINGLE+ state)
422-
state.addFirstIfEmpty(NodeList(active = 1))
474+
state.addOneIfEmpty(NodeList(active = true))
423475
// it must be in SINGLE+ state or state has changed (node could have need removed from state)
424-
val list = state.next() // either NodeList or somebody else won the race, updated state
476+
val list = state.next // either NodeList or somebody else won the race, updated state
425477
// just attempt converting it to list if state is still the same, then continue lock-free loop
426478
STATE.compareAndSet(this, state, list)
427479
}
@@ -498,25 +550,36 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
498550
?: InvokeOnCompletion(this, handler)
499551

500552
// for nicer debugging
501-
override fun toString(): String = "${this::class.java.simpleName}{${describeState(state)}}@${Integer.toHexString(System.identityHashCode(this))}"
553+
override fun toString(): String = "${this::class.java.simpleName}{${stateToString(state)}}@${Integer.toHexString(System.identityHashCode(this))}"
502554

503555
/**
504-
* Interface for incomplete [state][getState] of a job.
556+
* Interface for incomplete [state] of a job.
505557
*/
506558
public interface Incomplete {
507559
val isActive: Boolean
508560
}
509561

510562
private class NodeList(
511-
@Volatile
512-
var active: Int
563+
active: Boolean
513564
) : LockFreeLinkedListHead(), Incomplete {
514-
override val isActive: Boolean get() = active != 0
565+
@Volatile
566+
var _active: Any? = if (active) ACTIVE_STATE else null
567+
568+
override val isActive: Boolean get() {
569+
while (true) { // helper loop for atomic ops
570+
val active = this._active
571+
if (active !is OpDescriptor) return active != null
572+
active.perform(this)
573+
}
574+
}
515575

516576
companion object {
517577
@JvmStatic
518-
val ACTIVE: AtomicIntegerFieldUpdater<NodeList> =
519-
AtomicIntegerFieldUpdater.newUpdater(NodeList::class.java, "active")
578+
val ACTIVE: AtomicReferenceFieldUpdater<NodeList, Any?> =
579+
AtomicReferenceFieldUpdater.newUpdater(NodeList::class.java, Any::class.java, "_active")
580+
581+
@JvmStatic
582+
val ACTIVE_STATE = Symbol("ACTIVE_STATE")
520583
}
521584

522585
override fun toString(): String = buildString {
@@ -533,7 +596,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
533596
}
534597

535598
/**
536-
* Class for a [state][getState] of a job that had completed exceptionally, including cancellation.
599+
* Class for a [state] of a job that had completed exceptionally, including cancellation.
537600
*
538601
* @param cause the exceptional completion cause. If `cause` is null, then a [CancellationException]
539602
* if created on first get from [exception] property.

0 commit comments

Comments
 (0)