Skip to content

Commit 666ea6e

Browse files
authored
Merge pull request #791 from Kotlin/undispatched-cancellation
Use final Job state in undispatched coroutine completion
2 parents 6042b8e + 510ccfd commit 666ea6e

File tree

7 files changed

+188
-30
lines changed

7 files changed

+188
-30
lines changed

common/kotlinx-coroutines-core-common/src/Timeout.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ private fun <U, T: U> setupTimeout(
7474
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine))
7575
// restart block using new coroutine with new job,
7676
// however start it as undispatched coroutine, because we are already in the proper context
77-
return coroutine.startUndispatchedOrReturn(coroutine, block)
77+
return coroutine.startUndispatchedOrReturnIgnoreTimeout(coroutine, block)
7878
}
7979

8080
private open class TimeoutCoroutine<U, in T: U>(

common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt

+46-5
Original file line numberDiff line numberDiff line change
@@ -70,29 +70,70 @@ private inline fun <T> startDirect(completion: Continuation<T>, block: () -> Any
7070
}
7171

7272
/**
73-
* Starts this coroutine with the given code [block] in the same context and returns result when it
73+
* Starts this coroutine with the given code [block] in the same context and returns coroutine result when it
7474
* completes without suspension.
7575
* This function shall be invoked at most once on this coroutine.
76+
* This function checks cancellation of the outer [Job] on fast-path.
7677
*
7778
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
7879
* during construction. Second, it starts the coroutine using [startCoroutineUninterceptedOrReturn].
7980
*/
8081
internal fun <T, R> AbstractCoroutine<T>.startUndispatchedOrReturn(receiver: R, block: suspend R.() -> T): Any? {
8182
initParentJob()
82-
return undispatchedResult { block.startCoroutineUninterceptedOrReturn(receiver, this) }
83+
return undispatchedResult({ true }) {
84+
block.startCoroutineUninterceptedOrReturn(receiver, this)
85+
}
8386
}
8487

85-
private inline fun <T> AbstractCoroutine<T>.undispatchedResult(startBlock: () -> Any?): Any? {
88+
/**
89+
* Same as [startUndispatchedOrReturn], but ignores [TimeoutCancellationException] on fast-path.
90+
*/
91+
internal fun <T, R> AbstractCoroutine<T>.startUndispatchedOrReturnIgnoreTimeout(
92+
receiver: R, block: suspend R.() -> T): Any? {
93+
initParentJob()
94+
return undispatchedResult({ e -> !(e is TimeoutCancellationException && e.coroutine === this) }) {
95+
block.startCoroutineUninterceptedOrReturn(receiver, this)
96+
}
97+
}
98+
99+
private inline fun <T> AbstractCoroutine<T>.undispatchedResult(
100+
shouldThrow: (Throwable) -> Boolean,
101+
startBlock: () -> Any?
102+
): Any? {
86103
val result = try {
87104
startBlock()
88105
} catch (e: Throwable) {
89106
CompletedExceptionally(e)
90107
}
108+
109+
/*
110+
* We're trying to complete our undispatched block here and have three code-paths:
111+
* 1) Suspended.
112+
*
113+
* Or we are completing our block (and its job).
114+
* 2) If we can't complete it, we suspend, probably waiting for children (2)
115+
* 3) If we have successfully completed the whole coroutine here in an undispatched manner,
116+
* we should decide which result to return. We have two options: either return proposed update or actual final state.
117+
* But if fact returning proposed value is not an option, otherwise we will ignore possible cancellation or child failure.
118+
*
119+
* shouldThrow parameter is a special code path for timeout coroutine:
120+
* If timeout is exceeded, but withTimeout() block was not suspended, we would like to return block value,
121+
* not a timeout exception.
122+
*/
91123
return when {
92124
result === COROUTINE_SUSPENDED -> COROUTINE_SUSPENDED
93125
makeCompletingOnce(result, MODE_IGNORE) -> {
94-
if (result is CompletedExceptionally) throw result.cause else result
126+
val state = state
127+
if (state is CompletedExceptionally) {
128+
when {
129+
shouldThrow(state.cause) -> throw state.cause
130+
result is CompletedExceptionally -> throw result.cause
131+
else -> result
132+
}
133+
} else {
134+
state
135+
}
95136
}
96137
else -> COROUTINE_SUSPENDED
97138
}
98-
}
139+
}

common/kotlinx-coroutines-core-common/test/TestBase.common.kt

+12
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package kotlinx.coroutines
66

7+
import kotlin.coroutines.*
8+
79
public expect open class TestBase constructor() {
810
public val isStressTest: Boolean
911
public val stressTestMultiplier: Int
@@ -26,3 +28,13 @@ public class TestException1(message: String? = null) : Throwable(message)
2628
public class TestException2(message: String? = null) : Throwable(message)
2729
public class TestException3(message: String? = null) : Throwable(message)
2830
public class TestRuntimeException(message: String? = null) : RuntimeException(message)
31+
32+
// Wrap context to avoid fast-paths on dispatcher comparison
33+
public fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {
34+
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
35+
return object : CoroutineDispatcher() {
36+
override fun dispatch(context: CoroutineContext, block: Runnable) {
37+
dispatcher.dispatch(context, block)
38+
}
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlin.coroutines.*
8+
import kotlin.test.*
9+
10+
class UndispatchedResultTest : TestBase() {
11+
12+
@Test
13+
fun testWithContext() = runTest {
14+
invokeTest { block -> withContext(wrapperDispatcher(coroutineContext), block) }
15+
}
16+
17+
@Test
18+
fun testWithContextFastPath() = runTest {
19+
invokeTest { block -> withContext(coroutineContext, block) }
20+
}
21+
22+
@Test
23+
fun testWithTimeout() = runTest {
24+
invokeTest { block -> withTimeout(Long.MAX_VALUE, block) }
25+
}
26+
27+
@Test
28+
fun testAsync() = runTest {
29+
invokeTest { block -> async(NonCancellable, block = block).await() }
30+
}
31+
32+
@Test
33+
fun testCoroutineScope() = runTest {
34+
invokeTest { block -> coroutineScope(block) }
35+
}
36+
37+
private suspend fun invokeTest(scopeProvider: suspend (suspend CoroutineScope.() -> Unit) -> Unit) {
38+
invokeTest(EmptyCoroutineContext, scopeProvider)
39+
invokeTest(Unconfined, scopeProvider)
40+
}
41+
42+
private suspend fun invokeTest(
43+
context: CoroutineContext,
44+
scopeProvider: suspend (suspend CoroutineScope.() -> Unit) -> Unit
45+
) {
46+
try {
47+
scopeProvider { block(context) }
48+
} catch (e: TestException) {
49+
finish(5)
50+
reset()
51+
}
52+
}
53+
54+
private suspend fun CoroutineScope.block(context: CoroutineContext) {
55+
try {
56+
expect(1)
57+
// Will cancel its parent
58+
async(context) {
59+
expect(2)
60+
throw TestException()
61+
}.await()
62+
} catch (e: TestException) {
63+
expect(3)
64+
}
65+
expect(4)
66+
}
67+
}

common/kotlinx-coroutines-core-common/test/WithContextTest.kt

+17-22
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package kotlinx.coroutines
99

10-
import kotlin.coroutines.*
1110
import kotlin.test.*
1211

1312
class WithContextTest : TestBase() {
@@ -85,20 +84,25 @@ class WithContextTest : TestBase() {
8584
}
8685
expect(2)
8786
val job = Job()
88-
val result = withContext(coroutineContext + job) { // same context + new job
89-
expect(3) // still here
90-
job.cancel() // cancel out job!
91-
try {
92-
yield() // shall throw CancellationException
93-
expectUnreached()
94-
} catch (e: CancellationException) {
95-
expect(4)
87+
try {
88+
withContext(coroutineContext + job) {
89+
// same context + new job
90+
expect(3) // still here
91+
job.cancel() // cancel out job!
92+
try {
93+
yield() // shall throw CancellationException
94+
expectUnreached()
95+
} catch (e: CancellationException) {
96+
expect(4)
97+
}
98+
"OK"
9699
}
97-
"OK"
100+
101+
expectUnreached()
102+
} catch (e: CancellationException) {
103+
expect(5)
104+
// will wait for the first coroutine
98105
}
99-
assertEquals("OK", result)
100-
expect(5)
101-
// will wait for the first coroutine
102106
}
103107

104108
@Test
@@ -300,13 +304,4 @@ class WithContextTest : TestBase() {
300304
}
301305
finish(5)
302306
}
303-
304-
private fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {
305-
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
306-
return object : CoroutineDispatcher() {
307-
override fun dispatch(context: CoroutineContext, block: Runnable) {
308-
dispatcher.dispatch(context, block)
309-
}
310-
}
311-
}
312307
}

common/kotlinx-coroutines-core-common/test/WithTimeoutOrNullTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -238,4 +238,4 @@ class WithTimeoutOrNullTest : TestBase() {
238238
finish(4)
239239
}
240240
}
241-
}
241+
}

core/kotlinx-coroutines-core/test/WithTimeoutOrNullJvmTest.kt

+44-1
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,47 @@ class WithTimeoutOrNullJvmTest : TestBase() {
2121
// outer timeout results in null
2222
assertEquals(null, result)
2323
}
24-
}
24+
25+
@Test
26+
fun testIgnoredTimeout() = runTest {
27+
val value = withTimeout(1) {
28+
Thread.sleep(10)
29+
42
30+
}
31+
32+
assertEquals(42, value)
33+
}
34+
35+
@Test
36+
fun testIgnoredTimeoutOnNull() = runTest {
37+
val value = withTimeoutOrNull(1) {
38+
Thread.sleep(10)
39+
42
40+
}
41+
42+
assertEquals(42, value)
43+
}
44+
45+
@Test
46+
fun testIgnoredTimeoutOnNullThrowsCancellation() = runTest {
47+
try {
48+
withTimeoutOrNull(1) {
49+
expect(1)
50+
Thread.sleep(10)
51+
throw CancellationException()
52+
}
53+
expectUnreached()
54+
} catch (e: CancellationException) {
55+
finish(2)
56+
}
57+
}
58+
59+
@Test
60+
fun testIgnoredTimeoutOnNullThrowsOnYield() = runTest {
61+
val value = withTimeoutOrNull(1) {
62+
Thread.sleep(10)
63+
yield()
64+
}
65+
assertNull(value)
66+
}
67+
}

0 commit comments

Comments
 (0)