Skip to content

Commit 6042b8e

Browse files
authored
Merge pull request #741 from Kotlin/array-queue
Use array-based queue for Scheduler.GlobalQueue & optimize Task
2 parents 9193668 + f4e7b68 commit 6042b8e

File tree

18 files changed

+332
-250
lines changed

18 files changed

+332
-250
lines changed

common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ private const val RESUMED = 2
1818
*/
1919
internal abstract class AbstractContinuation<in T>(
2020
public final override val delegate: Continuation<T>,
21-
public final override val resumeMode: Int
22-
) : Continuation<T>, DispatchedTask<T> {
21+
resumeMode: Int
22+
) : DispatchedTask<T>(resumeMode), Continuation<T> {
2323

2424
/*
2525
* Implementation notes

common/kotlinx-coroutines-core-common/src/Dispatched.kt

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,10 @@ internal object UndispatchedEventLoop {
8181
internal class DispatchedContinuation<in T>(
8282
@JvmField val dispatcher: CoroutineDispatcher,
8383
@JvmField val continuation: Continuation<T>
84-
) : Continuation<T> by continuation, DispatchedTask<T> {
84+
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), Continuation<T> by continuation {
8585
@JvmField
8686
@Suppress("PropertyName")
8787
internal var _state: Any? = UNDEFINED
88-
public override var resumeMode: Int = 0
8988
@JvmField // pre-cached value to avoid ctx.fold on every resumption
9089
internal val countOrElement = threadContextElements(context)
9190

@@ -204,20 +203,22 @@ internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable)
204203
else -> resumeWithException(exception)
205204
}
206205

207-
internal interface DispatchedTask<in T> : Runnable {
208-
public val delegate: Continuation<T>
209-
public val resumeMode: Int get() = MODE_CANCELLABLE
206+
internal abstract class DispatchedTask<in T>(
207+
@JvmField var resumeMode: Int
208+
) : SchedulerTask() {
209+
public abstract val delegate: Continuation<T>
210210

211-
public fun takeState(): Any?
211+
public abstract fun takeState(): Any?
212212

213213
@Suppress("UNCHECKED_CAST")
214-
public fun <T> getSuccessfulResult(state: Any?): T =
214+
public open fun <T> getSuccessfulResult(state: Any?): T =
215215
state as T
216216

217217
public fun getExceptionalResult(state: Any?): Throwable? =
218218
(state as? CompletedExceptionally)?.cause
219219

220-
public override fun run() {
220+
public final override fun run() {
221+
val taskContext = this.taskContext
221222
try {
222223
val delegate = delegate as DispatchedContinuation<T>
223224
val continuation = delegate.continuation
@@ -237,6 +238,8 @@ internal interface DispatchedTask<in T> : Runnable {
237238
}
238239
} catch (e: Throwable) {
239240
throw DispatchException("Unexpected exception running $this", e)
241+
} finally {
242+
taskContext.afterTask()
240243
}
241244
}
242245
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
internal expect abstract class SchedulerTask() : Runnable
8+
9+
internal expect interface SchedulerTaskContext
10+
11+
internal expect val SchedulerTask.taskContext: SchedulerTaskContext
12+
13+
internal expect inline fun SchedulerTaskContext.afterTask()

core/kotlinx-coroutines-core/src/EventLoop.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ internal fun delayNanosToMillis(timeNanos: Long): Long =
5151
@Suppress("PrivatePropertyName")
5252
private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")
5353

54-
private typealias Queue<T> = LockFreeMPSCQueueCore<T>
54+
private typealias Queue<T> = LockFreeTaskQueueCore<T>
5555

5656
internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
5757
// null | CLOSED_EMPTY | task | Queue<Runnable>
@@ -150,7 +150,7 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
150150
queue === CLOSED_EMPTY -> return false
151151
else -> {
152152
// update to full-blown queue to add one more
153-
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY)
153+
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
154154
newQueue.addLast(queue as Runnable)
155155
newQueue.addLast(task)
156156
if (_queue.compareAndSet(queue, newQueue)) return true
@@ -191,7 +191,7 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
191191
queue === CLOSED_EMPTY -> return
192192
else -> {
193193
// update to full-blown queue to close
194-
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY)
194+
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
195195
newQueue.addLast(queue as Runnable)
196196
if (_queue.compareAndSet(queue, newQueue)) return
197197
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.scheduling.*
8+
9+
internal actual typealias SchedulerTask = Task
10+
11+
internal actual typealias SchedulerTaskContext = TaskContext
12+
13+
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
14+
internal actual val SchedulerTask.taskContext: SchedulerTaskContext get() = taskContext
15+
16+
@Suppress("NOTHING_TO_INLINE", "EXTENSION_SHADOWED_BY_MEMBER")
17+
internal actual inline fun SchedulerTaskContext.afterTask() =
18+
afterTask()

core/kotlinx-coroutines-core/src/internal/LockFreeMPMCQueue.kt

Lines changed: 0 additions & 99 deletions
This file was deleted.

0 commit comments

Comments
 (0)