Skip to content

Commit e3aa8ff

Browse files
committed
ArrayBroadcastChannel
1 parent 8046fe1 commit e3aa8ff

File tree

10 files changed

+731
-213
lines changed

10 files changed

+731
-213
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

+196-182
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,346 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import kotlinx.coroutines.experimental.ALREADY_SELECTED
20+
import kotlinx.coroutines.experimental.selects.SelectInstance
21+
import java.util.concurrent.CopyOnWriteArrayList
22+
import java.util.concurrent.locks.ReentrantLock
23+
import kotlin.concurrent.withLock
24+
25+
/**
26+
* Broadcast channel with array buffer of a fixed [capacity].
27+
* Sender suspends only when buffer is fully due to one of the receives not being late and
28+
* receiver suspends only when buffer is empty.
29+
*
30+
* Note, that elements that are sent to the broadcast channel while there are no [open] subscribers are immediately
31+
* lost.
32+
*
33+
* This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
34+
* The lock at each subscription is also used to manage concurrent attempts to receive from the same subscriber.
35+
* The lists of suspended senders or receivers are lock-free.
36+
*/
37+
class ArrayBroadcastChannel<E>(
38+
/**
39+
* Buffer capacity.
40+
*/
41+
val capacity: Int
42+
) : AbstractSendChannel<E>(), BroadcastChannel<E> {
43+
init {
44+
check(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
45+
}
46+
47+
private val bufferLock = ReentrantLock()
48+
private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity) // guarded by lock
49+
50+
// head & tail are Long (64 bits) and we assume that they never wrap around
51+
// head, tail, and size are guarded by bufferLock
52+
@Volatile
53+
private var head: Long = 0 // do modulo on use of head
54+
@Volatile
55+
private var tail: Long = 0 // do modulo on use of tail
56+
@Volatile
57+
private var size: Int = 0
58+
59+
private val subs = CopyOnWriteArrayList<Subscriber<E>>()
60+
61+
override val isBufferAlwaysFull: Boolean get() = false
62+
override val isBufferFull: Boolean get() = size >= capacity
63+
64+
override fun open(): SubscriptionReceiveChannel<E> {
65+
val sub = Subscriber(this, head)
66+
subs.add(sub)
67+
// between creating and adding of subscription into the list the buffer head could have been bumped past it,
68+
// so here we check if it did happen and update the head in subscription in this case
69+
// we did not leak newly created subscription yet, so its subHead cannot update
70+
val head = this.head // volatile read after sub was added to subs
71+
if (head != sub.subHead) {
72+
// needs update
73+
sub.subHead = head
74+
updateHead() // and also must recompute head of the buffer
75+
}
76+
return sub
77+
}
78+
79+
override fun close(cause: Throwable?): Boolean {
80+
if (!super.close(cause)) return false
81+
checkSubOffers()
82+
return true
83+
}
84+
85+
// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
86+
override fun offerInternal(element: E): Any {
87+
bufferLock.withLock {
88+
// check if closed for send (under lock, so size cannot change)
89+
closedForSend?.let { return it }
90+
val size = this.size
91+
if (size >= capacity) return OFFER_FAILED
92+
val tail = this.tail
93+
buffer[(tail % capacity).toInt()] = element
94+
this.size = size + 1
95+
this.tail = tail + 1
96+
}
97+
// if offered successfully, then check subs outside of lock
98+
checkSubOffers()
99+
return OFFER_SUCCESS
100+
}
101+
102+
// result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
103+
override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
104+
bufferLock.withLock {
105+
// check if closed for send (under lock, so size cannot change)
106+
closedForSend?.let { return it }
107+
val size = this.size
108+
if (size >= capacity) return OFFER_FAILED
109+
// let's try to select sending this element to buffer
110+
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
111+
return ALREADY_SELECTED
112+
}
113+
val tail = this.tail
114+
buffer[(tail % capacity).toInt()] = element
115+
this.size = size + 1
116+
this.tail = tail + 1
117+
}
118+
// if offered successfully, then check subs outside of lock
119+
checkSubOffers()
120+
return OFFER_SUCCESS
121+
}
122+
123+
private fun closeSubscriber(sub: Subscriber<E>) {
124+
subs.remove(sub)
125+
if (head == sub.subHead)
126+
updateHead()
127+
}
128+
129+
private fun checkSubOffers() {
130+
var updated = false
131+
@Suppress("LoopToCallChain") // must invoke `checkOffer` on every sub
132+
for (sub in subs) {
133+
if (sub.checkOffer()) updated = true
134+
}
135+
if (updated)
136+
updateHead()
137+
}
138+
139+
private fun updateHead() {
140+
// compute minHead w/o lock (it will be eventually consistent)
141+
val minHead = computeMinHead()
142+
// update head in a loop
143+
while (true) {
144+
var send: Send? = null
145+
var token: Any? = null
146+
bufferLock.withLock {
147+
val tail = this.tail
148+
var head = this.head
149+
val targetHead = minHead.coerceAtMost(tail)
150+
if (targetHead <= head) return // nothing to do -- head was already moved
151+
var size = this.size
152+
// clean up removed (on not need if we don't have any subscribers anymore)
153+
while (head < targetHead) {
154+
buffer[(head % capacity).toInt()] = null
155+
val wasFull = size >= capacity
156+
// update the size before checking queue (no more senders can queue up)
157+
this.head = ++head
158+
this.size = --size
159+
if (wasFull) {
160+
while (true) {
161+
send = takeFirstSendOrPeekClosed() ?: break // when when no sender
162+
if (send is Closed<*>) break // break when closed for send
163+
token = send!!.tryResumeSend(idempotent = null)
164+
if (token != null) {
165+
// put sent element to the buffer
166+
buffer[(tail % capacity).toInt()] = (send as Send).pollResult
167+
this.size = size + 1
168+
this.tail = tail + 1
169+
return@withLock // go out of lock to wakeup this sender
170+
}
171+
}
172+
}
173+
}
174+
return // done updating here -> return
175+
}
176+
// we only get out of the lock normally when there is a sender to resume
177+
send!!.completeResumeSend(token!!)
178+
// since we've just sent an element, we might need to resume some receivers
179+
checkSubOffers()
180+
}
181+
}
182+
183+
private fun computeMinHead(): Long {
184+
var minHead = Long.MAX_VALUE
185+
for (sub in subs)
186+
minHead = minHead.coerceAtMost(sub.subHead) // volatile (atomic) reads of subHead
187+
return minHead
188+
}
189+
190+
@Suppress("UNCHECKED_CAST")
191+
private fun elementAt(index: Long): E = buffer[(index % capacity).toInt()] as E
192+
193+
private class Subscriber<E>(
194+
private val broadcastChannel: ArrayBroadcastChannel<E>,
195+
@Volatile @JvmField var subHead: Long // guarded by lock
196+
) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
197+
private val lock = ReentrantLock()
198+
199+
override val isBufferAlwaysEmpty: Boolean get() = false
200+
override val isBufferEmpty: Boolean get() = subHead >= broadcastChannel.tail
201+
override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
202+
override val isBufferFull: Boolean get() = error("Should not be used")
203+
204+
override fun close() {
205+
if (close(cause = null))
206+
broadcastChannel.closeSubscriber(this)
207+
}
208+
209+
// returns true if subHead was updated and broadcast channel's head must be checked
210+
// this method is lock-free (it never waits on lock)
211+
@Suppress("UNCHECKED_CAST")
212+
fun checkOffer(): Boolean {
213+
var updated = false
214+
var closed: Closed<*>? = null
215+
loop@
216+
while (needsToCheckOfferWithoutLock()) {
217+
// just use `tryLock` here and break when some other thread is checking under lock
218+
// it means that `checkOffer` must be retried after every `unlock`
219+
if (!lock.tryLock()) break
220+
val receive: ReceiveOrClosed<E>?
221+
val token: Any?
222+
try {
223+
val result = peekUnderLock()
224+
when {
225+
result === POLL_FAILED -> continue@loop // must retest `needsToCheckOfferWithoutLock` outside of the lock
226+
result is Closed<*> -> {
227+
closed = result
228+
break@loop // was closed
229+
}
230+
}
231+
// find a receiver for an element
232+
receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving
233+
if (receive is Closed<*>) break // noting more to do if this sub already closed
234+
token = receive.tryResumeReceive(result as E, idempotent = null)
235+
if (token == null) continue // bail out here to next iteration (see for next receiver)
236+
val subHead = this.subHead
237+
this.subHead = subHead + 1 // retrieved element for this subscriber
238+
updated = true
239+
} finally {
240+
lock.unlock()
241+
}
242+
receive!!.completeResumeReceive(token!!)
243+
}
244+
// do close outside of lock if needed
245+
closed?.also { close(cause = it.closeCause) }
246+
return updated
247+
}
248+
249+
// result is `E | POLL_FAILED | Closed`
250+
override fun pollInternal(): Any? {
251+
var updated = false
252+
val result: Any?
253+
lock.lock()
254+
try {
255+
result = peekUnderLock()
256+
when {
257+
result is Closed<*> -> { /* just bail out of lock */ }
258+
result === POLL_FAILED -> { /* just bail out of lock */ }
259+
else -> {
260+
// update subHead after retrieiving element from buffer
261+
val subHead = this.subHead
262+
this.subHead = subHead + 1
263+
updated = true
264+
}
265+
}
266+
} finally {
267+
lock.unlock()
268+
}
269+
// do close outside of lock
270+
(result as? Closed<*>)?.also { close(cause = it.closeCause) }
271+
// there could have been checkOffer attempt while we were holding lock
272+
// now outside the lock recheck if anything else to offer
273+
if (checkOffer())
274+
updated = true
275+
// and finally update broadcast's channel head if needed
276+
if (updated)
277+
broadcastChannel.updateHead()
278+
return result
279+
}
280+
281+
// result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
282+
override fun pollSelectInternal(select: SelectInstance<*>): Any? {
283+
var updated = false
284+
var result: Any?
285+
lock.lock()
286+
try {
287+
result = peekUnderLock()
288+
when {
289+
result is Closed<*> -> { /* just bail out of lock */ }
290+
result === POLL_FAILED -> { /* just bail out of lock */ }
291+
else -> {
292+
// let's try to select receiving this element from buffer
293+
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
294+
result = ALREADY_SELECTED
295+
} else {
296+
// update subHead after retrieiving element from buffer
297+
val subHead = this.subHead
298+
this.subHead = subHead + 1
299+
updated = true
300+
}
301+
}
302+
}
303+
} finally {
304+
lock.unlock()
305+
}
306+
// do close outside of lock
307+
(result as? Closed<*>)?.also { close(cause = it.closeCause) }
308+
// there could have been checkOffer attempt while we were holding lock
309+
// now outside the lock recheck if anything else to offer
310+
if (checkOffer())
311+
updated = true
312+
// and finally update broadcast's channel head if needed
313+
if (updated)
314+
broadcastChannel.updateHead()
315+
return result
316+
}
317+
318+
// Must invoke this check this after lock, because offer's invocation of `checkOffer` might have failed
319+
// to `tryLock` just before the lock was about to unlocked, thus loosing notification to this
320+
// subscription about an element that was just offered
321+
private fun needsToCheckOfferWithoutLock(): Boolean {
322+
if (closedForReceive != null)
323+
return false // already closed -> nothing to do
324+
if (isBufferEmpty && broadcastChannel.closedForReceive == null)
325+
return false // no data for us && broadcast channel was not closed yet -> nothing to do
326+
return true // check otherwise
327+
}
328+
329+
// guarded by lock, returns:
330+
// E - the element from the buffer at subHead
331+
// Closed<*> when closed;
332+
// POLL_FAILED when there seems to be no data, but must retest `needsToCheckOfferWithoutLock` outside of lock
333+
private fun peekUnderLock(): Any? {
334+
val subHead = this.subHead // guarded read (can be non-volatile read)
335+
// note: from the broadcastChannel we must read closed token first, then read its tail
336+
// because it is Ok if tail moves in between the reads (we make decision based on tail first)
337+
val closed = broadcastChannel.closedForReceive // unguarded volatile read
338+
val tail = broadcastChannel.tail // unguarded volatile read
339+
if (subHead >= tail) {
340+
// no elements to poll from the queue -- check if closed
341+
return closed ?: POLL_FAILED // must retest `needsToCheckOfferWithoutLock` outside of the lock
342+
}
343+
return broadcastChannel.elementAt(subHead)
344+
}
345+
}
346+
}

0 commit comments

Comments
 (0)