Skip to content

Commit 8046fe1

Browse files
committed
ValueBroadcastChannel -> ConflatedBroadcastChannel
1 parent ca9d5be commit 8046fe1

File tree

4 files changed

+18
-17
lines changed

4 files changed

+18
-17
lines changed
Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
2424
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
2525

2626
/**
27-
* Broadcasts the most recently sent value (aka [value]) to all [open] subscribers.
27+
* Broadcasts the most recently sent element (aka [value]) to all [open] subscribers.
2828
*
2929
* Back-to-send sent elements are _conflated_ -- only the the most recently sent value is received,
3030
* while previously sent elements **are lost**.
31+
* Every subscriber immediately receives the most recently sent element.
3132
* Sender to this broadcast channel never suspends and [offer] always returns `true`.
3233
*
3334
* A secondary constructor can be used to create an instance of this class that already holds a value.
@@ -36,12 +37,12 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
3637
* [opening][open] and [closing][SubscriptionReceiveChannel.close] subscription takes O(N) time, where N is the
3738
* number of subscribers.
3839
*/
39-
public class ValueBroadcastChannel<E>() : BroadcastChannel<E> {
40+
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
4041
/**
4142
* Creates an instance of this class that already holds a value.
4243
*
4344
* It is as a shortcut to creating an instance with a default constructor and
44-
* immediately sending a value: `ValueBroadcastChannel().apply { offer(value) }`.
45+
* immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`.
4546
*/
4647
constructor(value: E) : this() {
4748
state = State<E>(value, null)
@@ -56,12 +57,12 @@ public class ValueBroadcastChannel<E>() : BroadcastChannel<E> {
5657

5758
private companion object {
5859
@JvmField
59-
val STATE: AtomicReferenceFieldUpdater<ValueBroadcastChannel<*>, Any> = AtomicReferenceFieldUpdater.
60-
newUpdater(ValueBroadcastChannel::class.java, Any::class.java, "state")
60+
val STATE: AtomicReferenceFieldUpdater<ConflatedBroadcastChannel<*>, Any> = AtomicReferenceFieldUpdater.
61+
newUpdater(ConflatedBroadcastChannel::class.java, Any::class.java, "state")
6162

6263
@JvmField
63-
val UPDATING: AtomicIntegerFieldUpdater<ValueBroadcastChannel<*>> = AtomicIntegerFieldUpdater.
64-
newUpdater(ValueBroadcastChannel::class.java, "updating")
64+
val UPDATING: AtomicIntegerFieldUpdater<ConflatedBroadcastChannel<*>> = AtomicIntegerFieldUpdater.
65+
newUpdater(ConflatedBroadcastChannel::class.java, "updating")
6566

6667
@JvmField
6768
val CLOSED = Closed(null)
@@ -257,7 +258,7 @@ public class ValueBroadcastChannel<E>() : BroadcastChannel<E> {
257258
}
258259

259260
private class Subscriber<E>(
260-
private val broadcastChannel: ValueBroadcastChannel<E>
261+
private val broadcastChannel: ConflatedBroadcastChannel<E>
261262
) : ConflatedChannel<E>(), SubscriptionReceiveChannel<E> {
262263
override fun close() {
263264
if (close(cause = null))
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ import org.hamcrest.core.IsNull
2323
import org.junit.Assert.*
2424
import org.junit.Test
2525

26-
class ValueBroadcastChannelTest : TestBase() {
26+
class ConflatedBroadcastChannelTest : TestBase() {
2727
@Test
2828
fun testBasicScenario() = runBlocking {
2929
expect(1)
30-
val broadcast = ValueBroadcastChannel<String>()
30+
val broadcast = ConflatedBroadcastChannel<String>()
3131
assertThat(exceptionFrom { broadcast.value }, IsInstanceOf(IllegalStateException::class.java))
3232
assertThat(broadcast.valueOrNull, IsNull())
3333
launch(context, CoroutineStart.UNDISPATCHED) {
@@ -88,7 +88,7 @@ class ValueBroadcastChannelTest : TestBase() {
8888
@Test
8989
fun testInitialValueAndReceiveClosed() = runBlocking {
9090
expect(1)
91-
val broadcast = ValueBroadcastChannel<Int>(1)
91+
val broadcast = ConflatedBroadcastChannel<Int>(1)
9292
assertThat(broadcast.value, IsEqual(1))
9393
assertThat(broadcast.valueOrNull, IsEqual(1))
9494
launch(context, CoroutineStart.UNDISPATCHED) {

reactive/coroutines-guide-reactive.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -493,11 +493,11 @@ This is quite the desired behavior for any kind of state-holding variable that n
493493
other linked state, for example. There is no reason to react to back-to-back updates of the state.
494494
Only the most recent state is relevant.
495495

496-
The corresponding behavior in coroutines world is implemented by [ValueBroadcastChannel] that provides the same logic
496+
The corresponding behavior in coroutines world is implemented by [ConflatedBroadcastChannel] that provides the same logic
497497
on top of coroutine channels directly, without going through the bridge to the reactive streams:
498498

499499
<!--- INCLUDE
500-
import kotlinx.coroutines.experimental.channels.ValueBroadcastChannel
500+
import kotlinx.coroutines.experimental.channels.ConflatedBroadcastChannel
501501
import kotlinx.coroutines.experimental.channels.consumeEach
502502
import kotlinx.coroutines.experimental.launch
503503
import kotlinx.coroutines.experimental.runBlocking
@@ -506,7 +506,7 @@ import kotlinx.coroutines.experimental.yield
506506

507507
```kotlin
508508
fun main(args: Array<String>) = runBlocking<Unit> {
509-
val broadcast = ValueBroadcastChannel<String>()
509+
val broadcast = ConflatedBroadcastChannel<String>()
510510
broadcast.offer("one")
511511
broadcast.offer("two")
512512
// now launch a coroutine to print everything
@@ -1055,7 +1055,7 @@ coroutines for complex pipelines with fan-in and fan-out between multiple worker
10551055
[SubscriptionReceiveChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-subscription-receive-channel/close.html
10561056
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
10571057
[BroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-broadcast-channel/index.html
1058-
[ValueBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-value-broadcast-channel/index.html
1058+
[ConflatedBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-conflated-broadcast-channel/index.html
10591059
<!--- INDEX kotlinx.coroutines.experimental.selects -->
10601060
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
10611061
[whileSelect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/while-select.html

reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-09.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
1818
package guide.reactive.basic.example09
1919

20-
import kotlinx.coroutines.experimental.channels.ValueBroadcastChannel
20+
import kotlinx.coroutines.experimental.channels.ConflatedBroadcastChannel
2121
import kotlinx.coroutines.experimental.channels.consumeEach
2222
import kotlinx.coroutines.experimental.launch
2323
import kotlinx.coroutines.experimental.runBlocking
2424
import kotlinx.coroutines.experimental.yield
2525

2626
fun main(args: Array<String>) = runBlocking<Unit> {
27-
val broadcast = ValueBroadcastChannel<String>()
27+
val broadcast = ConflatedBroadcastChannel<String>()
2828
broadcast.offer("one")
2929
broadcast.offer("two")
3030
// now launch a coroutine to print everything

0 commit comments

Comments
 (0)