Skip to content

Commit b2b5c06

Browse files
committed
Fixed hanged receive from a closed subscription of BroadcastChannel
Fixes #226
1 parent ab85ba8 commit b2b5c06

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,9 @@ class ArrayBroadcastChannel<E>(
339339
val closedBroadcast = broadcastChannel.closedForReceive // unguarded volatile read
340340
val tail = broadcastChannel.tail // unguarded volatile read
341341
if (subHead >= tail) {
342-
// no elements to poll from the queue -- check if closed
343-
return closedBroadcast ?: POLL_FAILED // must retest `needsToCheckOfferWithoutLock` outside of the lock
342+
// no elements to poll from the queue -- check if closed broads & closed this sub
343+
// must retest `needsToCheckOfferWithoutLock` outside of the lock
344+
return closedBroadcast ?: this.closedForReceive ?: POLL_FAILED
344345
}
345346
// Get tentative result. This result may be wrong (completely invalid value, including null),
346347
// because this subscription might get closed, moving channel's head past this subscription's head.

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package kotlinx.coroutines.experimental.channels
1919
import kotlinx.coroutines.experimental.*
2020
import org.hamcrest.core.IsEqual
2121
import org.hamcrest.core.IsNull
22-
import org.junit.Assert.assertThat
22+
import org.junit.Assert.*
2323
import org.junit.Test
2424

2525
class ArrayBroadcastChannelTest : TestBase() {
@@ -166,4 +166,16 @@ class ArrayBroadcastChannelTest : TestBase() {
166166
}
167167
check(expected == 2)
168168
}
169+
170+
@Test
171+
fun testReceiveFromClosedSub() = runTest(
172+
expected = { it is ClosedReceiveChannelException }
173+
) {
174+
val channel = BroadcastChannel<Int>(1)
175+
val sub = channel.openSubscription()
176+
assertFalse(sub.isClosedForReceive)
177+
sub.close()
178+
assertTrue(sub.isClosedForReceive)
179+
sub.receive()
180+
}
169181
}

0 commit comments

Comments
 (0)