Skip to content

Commit 53d2e25

Browse files
committed
LockFreeTaskQueue minor improvements & additional tests for MC case
1 parent 9512ab2 commit 53d2e25

File tree

3 files changed

+48
-34
lines changed

3 files changed

+48
-34
lines changed

core/kotlinx-coroutines-core/src/internal/LockFreeTaskQueue.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ internal class LockFreeTaskQueueCore<E : Any>(
105105
_state.loop { state ->
106106
if (state and (FROZEN_MASK or CLOSED_MASK) != 0L) return state.addFailReason() // cannot add
107107
state.withState { head, tail ->
108-
// If queue is Single-Consumer than there could be one element beyond head that we cannot overwrite,
108+
val mask = this.mask // manually move instance field to local for performance
109+
// If queue is Single-Consumer then there could be one element beyond head that we cannot overwrite,
109110
// so we check for full queue with an extra margin of one element
110111
if ((tail + 2) and mask == head and mask) return ADD_FROZEN // overfull, so do freeze & copy
111112
// If queue is Multi-Consumer then the consumer could still have not cleared element

core/kotlinx-coroutines-core/test/internal/LockFreeTaskQueueLinearizabilityTest.kt

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,15 @@ import kotlin.test.*
1414

1515
@OpGroupConfigs(OpGroupConfig(name = "consumer", nonParallel = true))
1616
@Param(name = "value", gen = IntGen::class, conf = "1:3")
17-
class LockFreeTaskQueueLinearizabilityTest : TestBase() {
18-
private var singleConsumer = false
17+
class LockFreeTaskQueueLinearizabilityTestSC : LockFreeTaskQueueLinearizabilityTestBase(singleConsumer = true)
18+
19+
@OpGroupConfigs(OpGroupConfig(name = "consumer", nonParallel = true))
20+
@Param(name = "value", gen = IntGen::class, conf = "1:3")
21+
class LockFreeTaskQueueLinearizabilityTestMC : LockFreeTaskQueueLinearizabilityTestBase(singleConsumer = false)
22+
23+
open class LockFreeTaskQueueLinearizabilityTestBase(
24+
private val singleConsumer: Boolean
25+
) : TestBase() {
1926
private lateinit var q: LockFreeTaskQueue<Int>
2027

2128
@Reset
@@ -37,24 +44,13 @@ class LockFreeTaskQueueLinearizabilityTest : TestBase() {
3744
// fun removeFirstOrNull() = q.removeFirstOrNull()
3845

3946
@Test
40-
fun testLinearizabilitySC() {
41-
singleConsumer = true
42-
linTest()
43-
}
44-
45-
@Test
46-
fun testLinearizabilityMC() {
47-
singleConsumer = false
48-
linTest()
49-
}
50-
51-
private fun linTest() {
47+
fun linTest() {
5248
val options = StressOptions()
5349
.iterations(100 * stressTestMultiplierSqrt)
5450
.invocationsPerIteration(1000 * stressTestMultiplierSqrt)
5551
.addThread(1, 3)
5652
.addThread(1, 3)
5753
.addThread(1, 3)
58-
LinChecker.check(LockFreeTaskQueueLinearizabilityTest::class.java, options)
54+
LinChecker.check(this::class.java, options)
5955
}
6056
}

core/kotlinx-coroutines-core/test/internal/LockFreeTaskQueueStressTest.kt

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,25 @@ package kotlinx.coroutines.internal
66

77
import kotlinx.atomicfu.*
88
import kotlinx.coroutines.*
9+
import org.junit.runner.*
10+
import org.junit.runners.*
911
import java.util.concurrent.*
1012
import kotlin.concurrent.*
1113
import kotlin.test.*
1214

1315
// Tests many short queues to stress copy/resize
14-
class LockFreeTaskQueueStressTest : TestBase() {
16+
@RunWith(Parameterized::class)
17+
class LockFreeTaskQueueStressTest(
18+
private val nConsumers: Int
19+
) : TestBase() {
20+
companion object {
21+
@Parameterized.Parameters(name = "nConsumers={0}")
22+
@JvmStatic
23+
fun params(): Collection<Int> = listOf(1, 3)
24+
}
25+
26+
private val singleConsumer = nConsumers == 1
27+
1528
private val nSeconds = 3 * stressTestMultiplier
1629
private val nProducers = 4
1730
private val batchSize = 100
@@ -25,7 +38,7 @@ class LockFreeTaskQueueStressTest : TestBase() {
2538
private val done = atomic(0)
2639
private val doneProducers = atomic(0)
2740

28-
private val barrier = CyclicBarrier(nProducers + 2)
41+
private val barrier = CyclicBarrier(nProducers + nConsumers + 1)
2942

3043
private class Item(val producer: Int, val index: Long)
3144

@@ -34,7 +47,7 @@ class LockFreeTaskQueueStressTest : TestBase() {
3447
val threads = mutableListOf<Thread>()
3548
threads += thread(name = "Pacer", start = false) {
3649
while (done.value == 0) {
37-
queue.value = LockFreeTaskQueue(false)
50+
queue.value = LockFreeTaskQueue(singleConsumer)
3851
batch.value = 0
3952
doneProducers.value = 0
4053
barrier.await() // start consumers & producers
@@ -44,25 +57,30 @@ class LockFreeTaskQueueStressTest : TestBase() {
4457
println("Pacer done")
4558
barrier.await() // wakeup the rest
4659
}
47-
threads += thread(name = "Consumer", start = false) {
48-
while (true) {
49-
barrier.await()
50-
val queue = queue.value ?: break
60+
threads += List(nConsumers) { consumer ->
61+
thread(name = "Consumer-$consumer", start = false) {
5162
while (true) {
52-
val item = queue.removeFirstOrNull()
53-
if (item == null) {
54-
if (doneProducers.value == nProducers && queue.isEmpty) break // that's it
55-
continue // spin to retry
63+
barrier.await()
64+
val queue = queue.value ?: break
65+
while (true) {
66+
val item = queue.removeFirstOrNull()
67+
if (item == null) {
68+
if (doneProducers.value == nProducers && queue.isEmpty) break // that's it
69+
continue // spin to retry
70+
}
71+
consumed.incrementAndGet()
72+
if (singleConsumer) {
73+
// This check only properly works in single-consumer case
74+
val eItem = expected[item.producer]++
75+
if (eItem != item.index) error("Expected $eItem but got ${item.index} from Producer-${item.producer}")
76+
}
5677
}
57-
consumed.incrementAndGet()
58-
val eItem = expected[item.producer]++
59-
if (eItem != item.index) error("Expected $eItem but got ${item.index} from Producer-${item.producer}")
78+
barrier.await()
6079
}
61-
barrier.await()
80+
println("Consumer-$consumer done")
6281
}
63-
println("Consumer done")
6482
}
65-
val producers = List(nProducers) { producer ->
83+
threads += List(nProducers) { producer ->
6684
thread(name = "Producer-$producer", start = false) {
6785
var index = 0L
6886
while (true) {
@@ -79,7 +97,6 @@ class LockFreeTaskQueueStressTest : TestBase() {
7997
println("Producer-$producer done")
8098
}
8199
}
82-
threads += producers
83100
threads.forEach {
84101
it.setUncaughtExceptionHandler { t, e ->
85102
System.err.println("Thread $t failed: $e")

0 commit comments

Comments
 (0)