Skip to content

Commit cd37d8e

Browse files
committed
Merge branch 'context-elements-opto' into release-candidate
# Conflicts: # common/kotlinx-coroutines-core-common/src/Dispatched.kt
2 parents aba0edc + ae85797 commit cd37d8e

File tree

13 files changed

+80
-26
lines changed

13 files changed

+80
-26
lines changed

benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,39 @@ import org.openjdk.jmh.annotations.*
1010
import java.util.concurrent.*
1111
import kotlin.coroutines.*
1212

13-
@Warmup(iterations = 10, time = 1)
14-
@Measurement(iterations = 10, time = 1)
13+
@Warmup(iterations = 5, time = 1)
14+
@Measurement(iterations = 5, time = 1)
1515
@BenchmarkMode(Mode.AverageTime)
1616
@OutputTimeUnit(TimeUnit.MILLISECONDS)
1717
@State(Scope.Benchmark)
18-
@Fork(2)
18+
@Fork(1)
1919
open class ChannelSinkBenchmark {
20+
private val tl = ThreadLocal.withInitial({ 42 })
21+
private val tl2 = ThreadLocal.withInitial({ 239 })
22+
23+
private val unconfined = Dispatchers.Unconfined
24+
private val unconfinedOneElement = Dispatchers.Unconfined + tl.asContextElement()
25+
private val unconfinedTwoElements = Dispatchers.Unconfined + tl.asContextElement() + tl2.asContextElement()
2026

2127
@Benchmark
2228
fun channelPipeline(): Int = runBlocking {
23-
Channel
24-
.range(1, 1_000_000, Dispatchers.Unconfined)
25-
.filter(Dispatchers.Unconfined) { it % 4 == 0 }
29+
run(unconfined)
30+
}
31+
32+
@Benchmark
33+
fun channelPipelineOneThreadLocal(): Int = runBlocking {
34+
run(unconfinedOneElement)
35+
}
36+
37+
@Benchmark
38+
fun channelPipelineTwoThreadLocals(): Int = runBlocking {
39+
run(unconfinedTwoElements)
40+
}
41+
42+
private suspend inline fun run(context: CoroutineContext): Int {
43+
return Channel
44+
.range(1, 1_000_000, context)
45+
.filter(context) { it % 4 == 0 }
2646
.fold(0) { a, b -> a + b }
2747
}
2848

common/kotlinx-coroutines-core-common/src/Builders.common.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public suspend fun <T> withContext(
138138
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
139139
val coroutine = UndispatchedCoroutine(newContext, uCont) // MODE_UNDISPATCHED
140140
// There are changes in the context, so this thread needs to be updated
141-
withCoroutineContext(newContext) {
141+
withCoroutineContext(newContext, null) {
142142
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
143143
}
144144
}

common/kotlinx-coroutines-core-common/src/CoroutineContext.common.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ internal expect fun createDefaultDispatcher(): CoroutineDispatcher
1717
@Suppress("PropertyName")
1818
internal expect val DefaultDelay: Delay
1919

20-
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T
20+
// countOrElement -- pre-cached value for ThreadContext.kt
21+
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T
2122
internal expect fun Continuation<*>.toDebugString(): String
2223
internal expect val CoroutineContext.coroutineName: String?

common/kotlinx-coroutines-core-common/src/Dispatched.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ internal class DispatchedContinuation<in T>(
7272
@Suppress("PropertyName")
7373
internal var _state: Any? = UNDEFINED
7474
public override var resumeMode: Int = 0
75+
@JvmField // pre-cached value to avoid ctx.fold on every resumption
76+
internal val countOrElement = threadContextElements(context)
7577

7678
override fun takeState(): Any? {
7779
val state = _state
@@ -92,7 +94,7 @@ internal class DispatchedContinuation<in T>(
9294
dispatcher.dispatch(context, this)
9395
} else {
9496
UndispatchedEventLoop.execute(this, state, MODE_ATOMIC_DEFAULT) {
95-
withCoroutineContext(this.context) {
97+
withCoroutineContext(this.context, countOrElement) {
9698
continuation.resumeWith(result)
9799
}
98100
}
@@ -144,14 +146,14 @@ internal class DispatchedContinuation<in T>(
144146

145147
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
146148
inline fun resumeUndispatched(value: T) {
147-
withCoroutineContext(context) {
149+
withCoroutineContext(context, countOrElement) {
148150
continuation.resume(value)
149151
}
150152
}
151153

152154
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
153155
inline fun resumeUndispatchedWithException(exception: Throwable) {
154-
withCoroutineContext(context) {
156+
withCoroutineContext(context, countOrElement) {
155157
continuation.resumeWithException(exception)
156158
}
157159
}
@@ -208,7 +210,7 @@ internal interface DispatchedTask<in T> : Runnable {
208210
val context = continuation.context
209211
val job = if (resumeMode.isCancellableMode) context[Job] else null
210212
val state = takeState() // NOTE: Must take state in any case, even if cancelled
211-
withCoroutineContext(context) {
213+
withCoroutineContext(context, delegate.countOrElement) {
212214
if (job != null && !job.isActive)
213215
continuation.resumeWithException(job.getCancellationException())
214216
else {

common/kotlinx-coroutines-core-common/src/ResumeMode.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ internal fun <T> Continuation<T>.resumeUninterceptedMode(value: T, mode: Int) {
4343
MODE_ATOMIC_DEFAULT -> intercepted().resume(value)
4444
MODE_CANCELLABLE -> intercepted().resumeCancellable(value)
4545
MODE_DIRECT -> resume(value)
46-
MODE_UNDISPATCHED -> withCoroutineContext(context) { resume(value) }
46+
MODE_UNDISPATCHED -> withCoroutineContext(context, null) { resume(value) }
4747
MODE_IGNORE -> {}
4848
else -> error("Invalid mode $mode")
4949
}
@@ -54,7 +54,7 @@ internal fun <T> Continuation<T>.resumeUninterceptedWithExceptionMode(exception:
5454
MODE_ATOMIC_DEFAULT -> intercepted().resumeWithException(exception)
5555
MODE_CANCELLABLE -> intercepted().resumeCancellableWithException(exception)
5656
MODE_DIRECT -> resumeWithException(exception)
57-
MODE_UNDISPATCHED -> withCoroutineContext(context) { resumeWithException(exception) }
57+
MODE_UNDISPATCHED -> withCoroutineContext(context, null) { resumeWithException(exception) }
5858
MODE_IGNORE -> {}
5959
else -> error("Invalid mode $mode")
6060
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlin.coroutines.*
8+
9+
internal expect fun threadContextElements(context: CoroutineContext): Any

common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ internal fun <R, T> (suspend (R) -> T).startCoroutineUnintercepted(receiver: R,
3737
*/
3838
internal fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
3939
startDirect(completion) {
40-
withCoroutineContext(completion.context) {
40+
withCoroutineContext(completion.context, null) {
4141
startCoroutineUninterceptedOrReturn(completion)
4242
}
4343
}
@@ -50,7 +50,7 @@ internal fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Contin
5050
*/
5151
internal fun <R, T> (suspend (R) -> T).startCoroutineUndispatched(receiver: R, completion: Continuation<T>) {
5252
startDirect(completion) {
53-
withCoroutineContext(completion.context) {
53+
withCoroutineContext(completion.context, null) {
5454
startCoroutineUninterceptedOrReturn(receiver, completion)
5555
}
5656
}

core/kotlinx-coroutines-core/src/CoroutineContext.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
6363
/**
6464
* Executes a block using a given coroutine context.
6565
*/
66-
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T {
67-
val oldValue = updateThreadContext(context)
66+
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T {
67+
val oldValue = updateThreadContext(context, countOrElement)
6868
try {
6969
return block()
7070
} finally {

core/kotlinx-coroutines-core/src/internal/ThreadContext.kt

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,20 +57,24 @@ private val restoreState =
5757
return state
5858
}
5959

60-
internal fun updateThreadContext(context: CoroutineContext): Any? {
61-
val count = context.fold(0, countAll)
60+
internal actual fun threadContextElements(context: CoroutineContext): Any = context.fold(0, countAll)!!
61+
62+
// countOrElement is pre-cached in dispatched continuation
63+
internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any?): Any? {
64+
@Suppress("NAME_SHADOWING")
65+
val countOrElement = countOrElement ?: threadContextElements(context)
6266
@Suppress("IMPLICIT_BOXING_IN_IDENTITY_EQUALS")
6367
return when {
64-
count === 0 -> ZERO // very fast path when there are no active ThreadContextElements
68+
countOrElement === 0 -> ZERO // very fast path when there are no active ThreadContextElements
6569
// ^^^ identity comparison for speed, we know zero always has the same identity
66-
count is Int -> {
70+
countOrElement is Int -> {
6771
// slow path for multiple active ThreadContextElements, allocates ThreadState for multiple old values
68-
context.fold(ThreadState(context, count), updateState)
72+
context.fold(ThreadState(context, countOrElement), updateState)
6973
}
7074
else -> {
7175
// fast path for one ThreadContextElement (no allocations, no additional context scan)
7276
@Suppress("UNCHECKED_CAST")
73-
val element = count as ThreadContextElement<Any?>
77+
val element = countOrElement as ThreadContextElement<Any?>
7478
element.updateThreadContext(context)
7579
}
7680
}

js/kotlinx-coroutines-core-js/src/CoroutineContext.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,6 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
3434
}
3535

3636
// No debugging facilities on JS
37-
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T = block()
37+
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block()
3838
internal actual fun Continuation<*>.toDebugString(): String = toString()
3939
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on JS
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlin.coroutines.*
8+
9+
internal actual fun threadContextElements(context: CoroutineContext): Any = 0

native/kotlinx-coroutines-core-native/src/CoroutineContext.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,6 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
4444
}
4545

4646
// No debugging facilities on native
47-
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T = block()
47+
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block()
4848
internal actual fun Continuation<*>.toDebugString(): String = toString()
4949
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on native
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlin.coroutines.*
8+
9+
internal actual fun threadContextElements(context: CoroutineContext): Any = 0

0 commit comments

Comments
 (0)