Skip to content

Commit faa4774

Browse files
committed
Do not step into thread-local event-loop on CancellableContinuation#dispatch fast-path
1 parent 09b9d6c commit faa4774

File tree

3 files changed

+66
-14
lines changed

3 files changed

+66
-14
lines changed

common/kotlinx-coroutines-core-common/src/Dispatched.kt

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ internal object UndispatchedEventLoop {
3333
runLoop(state, block)
3434
}
3535

36-
inline fun execute(task: DispatchedTask<*>, block: () -> Unit) {
36+
fun resumeUndispatched(task: DispatchedTask<*>) {
3737
val state = state.get()
3838
if (state.isActive) {
3939
state.threadLocalQueue.addLast(task)
4040
return
4141
}
4242

43-
runLoop(state, block)
43+
runLoop(state, { task.resume(task.delegate, MODE_UNDISPATCHED) })
4444
}
4545

4646
inline fun runLoop(state: State, block: () -> Unit) {
@@ -226,28 +226,28 @@ internal interface DispatchedTask<in T> : Runnable {
226226
}
227227

228228
internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
229-
var useMode = mode
230229
val delegate = this.delegate
231230
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
232231
// dispatch directly using this instance's Runnable implementation
233232
val dispatcher = delegate.dispatcher
234233
val context = delegate.context
235234
if (dispatcher.isDispatchNeeded(context)) {
236235
dispatcher.dispatch(context, this)
237-
return // and that's it -- dispatched via fast-path
238236
} else {
239-
useMode = MODE_UNDISPATCHED
237+
UndispatchedEventLoop.resumeUndispatched(this)
240238
}
239+
} else {
240+
resume(delegate, mode)
241241
}
242+
}
242243

243-
UndispatchedEventLoop.execute(this) {
244-
// slow-path - use delegate
245-
val state = takeState()
246-
val exception = getExceptionalResult(state)
247-
if (exception != null) {
248-
delegate.resumeWithExceptionMode(exception, useMode)
249-
} else {
250-
delegate.resumeMode(getSuccessfulResult(state), useMode)
251-
}
244+
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: Int) {
245+
// slow-path - use delegate
246+
val state = takeState()
247+
val exception = getExceptionalResult(state)
248+
if (exception != null) {
249+
delegate.resumeWithExceptionMode(exception, useMode)
250+
} else {
251+
delegate.resumeMode(getSuccessfulResult(state), useMode)
252252
}
253253
}

core/kotlinx-coroutines-core/test/RunBlockingTest.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,18 @@ class RunBlockingTest : TestBase() {
103103
finish(4)
104104
}
105105
}
106+
107+
@Test
108+
fun testNestedRunBlocking() = runBlocking {
109+
delay(100)
110+
val value = runBlocking {
111+
delay(100)
112+
runBlocking {
113+
delay(100)
114+
1
115+
}
116+
}
117+
118+
assertEquals(1, value)
119+
}
106120
}

core/kotlinx-coroutines-core/test/test/TestCoroutineContextTest.kt

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,44 @@ class TestCoroutineContextTest {
137137
}.await()
138138
}
139139

140+
@Test
141+
fun testBlockingFunctionWithRunBlocking() = withTestContext(injectedContext) {
142+
val delay = 1000L
143+
val expectedValue = 16
144+
val result = runBlocking {
145+
suspendedBlockingFunction(delay) {
146+
expectedValue
147+
}
148+
}
149+
assertEquals(expectedValue, result)
150+
assertEquals(delay, now())
151+
}
152+
153+
@Test
154+
fun testBlockingFunctionWithAsync() = withTestContext(injectedContext) {
155+
val delay = 1000L
156+
val expectedValue = 16
157+
var now = 0L
158+
val deferred = async {
159+
suspendedBlockingFunction(delay) {
160+
expectedValue
161+
}
162+
}
163+
now += advanceTimeBy((delay / 4) - 1)
164+
assertEquals((delay / 4) - 1, now)
165+
assertEquals(now, now())
166+
try {
167+
deferred.getCompleted()
168+
fail("The Job should not have been completed yet.")
169+
} catch (e: Exception) {
170+
// Success.
171+
}
172+
now += advanceTimeBy(1)
173+
assertEquals(delay, now())
174+
assertEquals(now, now())
175+
assertEquals(expectedValue, deferred.getCompleted())
176+
}
177+
140178
private suspend fun <T> TestCoroutineContext.suspendedBlockingFunction(delay: Long, function: () -> T): T {
141179
delay(delay / 4)
142180
return runBlocking {

0 commit comments

Comments
 (0)