Skip to content

Commit f0c1d99

Browse files
author
Sergey Mashkov
committed
IO: fix error propagation in blocking adapter
1 parent 8fbdff4 commit f0c1d99

File tree

2 files changed

+103
-7
lines changed
  • core/kotlinx-coroutines-io/src
    • main/kotlin/kotlinx/coroutines/experimental/io/jvm/javaio
    • test/kotlin/kotlinx/coroutines/experimental/io

2 files changed

+103
-7
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/jvm/javaio/Blocking.kt

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,24 @@ private class OutputAdapter(parent: Job?, private val channel: ByteWriteChannel)
6767
try {
6868
while (true) {
6969
val task = rendezvous(0)
70-
if (task === CloseToken) break
71-
if (task === FlushToken) channel.flush()
70+
if (task === CloseToken) {
71+
break
72+
}
73+
else if (task === FlushToken) {
74+
channel.flush()
75+
channel.closedCause?.let { throw it }
76+
}
7277
else if (task is ByteArray) channel.writeFully(task, offset, length)
7378
}
7479
} catch (t: Throwable) {
7580
if (t !is CancellationException) {
7681
channel.close(t)
7782
}
83+
throw t
7884
} finally {
79-
channel.close()
85+
if (!channel.close()) {
86+
channel.closedCause?.let { throw it }
87+
}
8088
}
8189
}
8290
}
@@ -102,8 +110,12 @@ private class OutputAdapter(parent: Job?, private val channel: ByteWriteChannel)
102110

103111
@Synchronized
104112
override fun close() {
105-
loop.submitAndAwait(CloseToken)
106-
loop.shutdown()
113+
try {
114+
loop.submitAndAwait(CloseToken)
115+
loop.shutdown()
116+
} catch (t: Throwable) {
117+
throw IOException(t)
118+
}
107119
}
108120
}
109121

@@ -224,6 +236,12 @@ private abstract class BlockingAdapter(val parent: Job? = null) {
224236
LockSupport.park()
225237
}
226238

239+
state.value.let { state ->
240+
if (state is Throwable) {
241+
throw state
242+
}
243+
}
244+
227245
return result.value
228246
}
229247

core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/JavaIOTest.kt

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ class JavaIOTest : TestBase() {
149149
@Test
150150
fun testPipedALot() = runBlocking {
151151
val exec = newFixedThreadPoolContext(2, "blocking-io")
152-
val numberOfLines = 10000
152+
val numberOfLines = 1000 * stressTestMultiplier
153153
val pipe = Pipe.open()
154154

155155
val channel1 = ByteChannel()
@@ -235,7 +235,7 @@ class JavaIOTest : TestBase() {
235235
val data = ByteArray(4096)
236236
Random().nextBytes(data)
237237

238-
repeat(10000) {
238+
repeat(100 * stressTestMultiplier * stressTestMultiplierSqrt) {
239239
val channel = ByteChannel(false)
240240
launch(exec) {
241241
for (i in 1..count) {
@@ -276,4 +276,82 @@ class JavaIOTest : TestBase() {
276276
}
277277
}
278278
}
279+
280+
@Test
281+
fun testOutputAdapterExceptionFromWrite() = runTest {
282+
val channel = ByteChannel(true)
283+
284+
val output = channel.toOutputStream()
285+
286+
launch(coroutineContext) {
287+
channel.cancel()
288+
}
289+
290+
yield()
291+
292+
try {
293+
output.write(1)
294+
fail("write() should fail")
295+
} catch (expected: CancellationException) {
296+
}
297+
}
298+
299+
@Test
300+
fun testOutputAdapterExceptionFromClose() = runTest {
301+
val channel = ByteChannel(true)
302+
303+
val output = channel.toOutputStream()
304+
305+
launch(coroutineContext) {
306+
channel.cancel()
307+
}
308+
309+
yield()
310+
311+
try {
312+
output.close()
313+
fail("close() should fail")
314+
} catch (expected: IOException) {
315+
}
316+
}
317+
318+
@Test
319+
fun testOutputAdapterExceptionFromFlush() = runTest {
320+
val channel = ByteChannel(true)
321+
322+
val output = channel.toOutputStream()
323+
324+
launch(coroutineContext) {
325+
channel.cancel()
326+
}
327+
328+
yield()
329+
330+
try {
331+
output.flush()
332+
fail("flush() should fail")
333+
} catch (expected: CancellationException) {
334+
}
335+
}
336+
337+
@Test
338+
fun testOutputAdapterExceptionFromUse() = runTest {
339+
val channel = ByteChannel(true)
340+
341+
val output = channel.toOutputStream()
342+
343+
launch(coroutineContext) {
344+
channel.cancel()
345+
}
346+
347+
yield()
348+
349+
try {
350+
output.use {
351+
output.write(1)
352+
}
353+
fail("write() should fail")
354+
} catch (expected: CancellationException) {
355+
}
356+
}
279357
}

0 commit comments

Comments
 (0)