Skip to content

Commit 9faa039

Browse files
author
Sergey Mashkov
committed
IO: fix joining and continuous writing byte array interference
1 parent e4e064c commit 9faa039

File tree

2 files changed

+37
-2
lines changed

2 files changed

+37
-2
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

+10-2
Original file line numberDiff line numberDiff line change
@@ -1428,7 +1428,10 @@ internal class ByteBufferChannel(
14281428
private tailrec suspend fun writeFullySuspend(src: ByteArray, offset: Int, length: Int) {
14291429
if (length == 0) return
14301430
writeSuspendUnit(src, offset, length)
1431-
val copied = writeSuspendResult
1431+
val copied = writeSuspendResult.takeIf { it != -1 } ?:
1432+
joining?.let { resolveDelegation(this, it) }?.writeSuspendResult?.takeIf { it != -1 } ?:
1433+
error("-1 should be only set in case of joining")
1434+
14321435
return writeFullySuspend(src, offset + copied, length - copied)
14331436
}
14341437

@@ -1446,7 +1449,12 @@ internal class ByteBufferChannel(
14461449
while (true) {
14471450
tryWriteSuspend(1)
14481451

1449-
joining?.let { resolveDelegation(this, it)?.let { return it.writeSuspendUnit(src, offset, length) } }
1452+
joining?.let {
1453+
resolveDelegation(this, it)?.let {
1454+
writeSuspendResult = -1;
1455+
return it.writeSuspendUnit(src, offset, length)
1456+
}
1457+
}
14501458

14511459
val size = writeAsMuchAsPossible(src, offset, length)
14521460
if (size > 0) {

core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt

+27
Original file line numberDiff line numberDiff line change
@@ -1618,6 +1618,33 @@ class ByteBufferChannelTest : TestBase() {
16181618
assertEquals(0x22, next.readByte())
16191619
}
16201620

1621+
@Test
1622+
fun testJoiningDuringWriteFully() = runTest {
1623+
val bb = ByteArray(65536)
1624+
Random().nextBytes(bb)
1625+
val dest = ByteChannel()
1626+
1627+
launch(coroutineContext) {
1628+
expect(1)
1629+
ch.writeFully(bb)
1630+
ch.flush()
1631+
}
1632+
yield()
1633+
expect(2)
1634+
1635+
launch {
1636+
expect(3)
1637+
ch.joinTo(dest, false)
1638+
}
1639+
yield()
1640+
1641+
val result = ByteArray(bb.size)
1642+
dest.readFully(result)
1643+
1644+
Assert.assertArrayEquals(bb, result)
1645+
finish(4)
1646+
}
1647+
16211648
private inline fun buildPacket(block: ByteWritePacket.() -> Unit): ByteReadPacket {
16221649
val builder = BytePacketBuilder(0, pktPool)
16231650
try {

0 commit comments

Comments
 (0)