Skip to content

Commit f5bc047

Browse files
committed
A section on "Shared mutable state and concurrency" in the guide
1 parent f8fc478 commit f5bc047

File tree

6 files changed

+428
-0
lines changed

6 files changed

+428
-0
lines changed

coroutines-guide.md

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ This is a short guide on core features of `kotlinx.coroutines` with a series of
6868
* [Fan-out](#fan-out)
6969
* [Fan-in](#fan-in)
7070
* [Buffered channels](#buffered-channels)
71+
* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
72+
* [The problem](#the-problem)
73+
* [Thread-safe data structures](#thread-safe-data-structures)
74+
* [Thread confinement](#thread-confinement)
75+
* [Mutual exclusion](#mutual-exclusion)
76+
* [Actors](#actors)
7177
* [Select expression](#select-expression)
7278
* [Selecting from channels](#selecting-from-channels)
7379
* [Selecting on close](#selecting-on-close)
@@ -1328,6 +1334,181 @@ Sending 4
13281334

13291335
The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
13301336

1337+
## Shared mutable state and concurrency
1338+
1339+
Coroutines can be executed concurrently using a multi-threaded dispatcher like [CommonPool]. It presents
1340+
all the usual concurrency problems. The main problem being synchronization of access to **shared mutable state**.
1341+
Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
1342+
but others are unique.
1343+
1344+
### The problem
1345+
1346+
Let us launch 100k coroutines all doing the same action. We'll also measure their completion time for
1347+
further comparisons:
1348+
1349+
<!--- INCLUDE .*/example-sync-([0-9]+).kt
1350+
import kotlin.system.measureTimeMillis
1351+
-->
1352+
1353+
<!--- INCLUDE .*/example-sync-02.kt
1354+
import java.util.concurrent.atomic.AtomicInteger
1355+
-->
1356+
1357+
<!--- INCLUDE .*/example-sync-04.kt
1358+
import kotlinx.coroutines.experimental.sync.Mutex
1359+
-->
1360+
1361+
<!--- INCLUDE .*/example-sync-05.kt
1362+
import kotlinx.coroutines.experimental.channels.*
1363+
-->
1364+
1365+
```kotlin
1366+
suspend fun massiveRun(action: suspend () -> Unit) {
1367+
val n = 100_000
1368+
val time = measureTimeMillis {
1369+
val jobs = List(n) {
1370+
launch(CommonPool) {
1371+
action()
1372+
}
1373+
}
1374+
jobs.forEach { it.join() }
1375+
}
1376+
println("Completed in $time ms")
1377+
}
1378+
```
1379+
1380+
<!--- INCLUDE .*/example-sync-([0-9]+).kt -->
1381+
1382+
We start with a very simple action, that increments a shared mutable variable.
1383+
1384+
```kotlin
1385+
var counter = 0
1386+
1387+
fun main(args: Array<String>) = runBlocking<Unit> {
1388+
massiveRun {
1389+
counter++
1390+
}
1391+
println("Counter = $counter")
1392+
}
1393+
```
1394+
1395+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01.kt)
1396+
1397+
What does it print at the end? It is highly unlikely to ever print "100000", because all the
1398+
100k coroutines increment the `counter` concurrently without any synchronization.
1399+
1400+
### Thread-safe data structures
1401+
1402+
The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
1403+
linearizable, or atomic) data structure that provides all the necessarily synchronization for the corresponding
1404+
operations that needs to be performed on a shared state.
1405+
In the case of a simple counter we can use `AtomicInteger` class:
1406+
1407+
```kotlin
1408+
var counter = AtomicInteger()
1409+
1410+
fun main(args: Array<String>) = runBlocking<Unit> {
1411+
massiveRun {
1412+
counter.incrementAndGet()
1413+
}
1414+
println("Counter = ${counter.get()}")
1415+
}
1416+
```
1417+
1418+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt)
1419+
1420+
This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
1421+
standard data structures and basic operations on them. However, it does not easily scale to complex
1422+
state or to complex operations that do not have ready-to-use thread-safe implementations.
1423+
1424+
### Thread confinement
1425+
1426+
Thread confinement is an approach to the problem of shared mutable state where all access to the particular shared
1427+
state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
1428+
the single event-dispatch/application thread. It is easy to apply with coroutines by using a
1429+
single-threaded context:
1430+
1431+
```kotlin
1432+
val counterContext = newSingleThreadContext("CounterContext")
1433+
var counter = 0
1434+
1435+
fun main(args: Array<String>) = runBlocking<Unit> {
1436+
massiveRun {
1437+
run(counterContext) {
1438+
counter++
1439+
}
1440+
}
1441+
println("Counter = $counter")
1442+
}
1443+
```
1444+
1445+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-03.kt)
1446+
1447+
### Mutual exclusion
1448+
1449+
Mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
1450+
that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
1451+
Coroutine's alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
1452+
delimit a critical section. The key difference is that `Mutex.lock` is a suspending function. It does not block a thread.
1453+
1454+
```kotlin
1455+
val mutex = Mutex()
1456+
var counter = 0
1457+
1458+
fun main(args: Array<String>) = runBlocking<Unit> {
1459+
massiveRun {
1460+
mutex.lock()
1461+
try { counter++ }
1462+
finally { mutex.unlock() }
1463+
}
1464+
println("Counter = $counter")
1465+
}
1466+
```
1467+
1468+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt)
1469+
1470+
### Actors
1471+
1472+
An actor is a combination of a coroutine, the state that is confined and is encapsulated into this coroutine,
1473+
and a channel to communicate with other coroutines. A simple actor can be written as a function,
1474+
but an actor with a complex state is better suited for a class.
1475+
1476+
```kotlin
1477+
// Message types for counterActor
1478+
sealed class CounterMsg
1479+
object IncCounter : CounterMsg() // one-way message to increment counter
1480+
class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with reply
1481+
1482+
// This function launches a new counter actor
1483+
fun counterActor(request: ReceiveChannel<CounterMsg>) = launch(CommonPool) {
1484+
var counter = 0 // actor state
1485+
while (true) { // main loop of the actor
1486+
val msg = request.receive()
1487+
when (msg) {
1488+
is IncCounter -> counter++
1489+
is GetCounter -> msg.response.send(counter)
1490+
}
1491+
}
1492+
}
1493+
1494+
fun main(args: Array<String>) = runBlocking<Unit> {
1495+
val request = Channel<CounterMsg>()
1496+
counterActor(request)
1497+
massiveRun {
1498+
request.send(IncCounter)
1499+
}
1500+
val response = Channel<Int>()
1501+
request.send(GetCounter(response))
1502+
println("Counter = ${response.receive()}")
1503+
}
1504+
```
1505+
1506+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt)
1507+
1508+
Notice, that it does not matter (for correctness) what context the actor itself is executed in. An actor is
1509+
a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
1510+
works as a solution to the problem of shared mutable state.
1511+
13311512
## Select expression
13321513

13331514
Select expression makes it possible to await multiple suspending function simultaneously and _select_
@@ -1684,6 +1865,10 @@ Channel was closed
16841865
[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
16851866
[Job.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/invoke.html
16861867
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/cancel.html
1868+
<!--- INDEX kotlinx.coroutines.experimental.sync -->
1869+
[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
1870+
[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/lock.html
1871+
[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/unlock.html
16871872
<!--- INDEX kotlinx.coroutines.experimental.channels -->
16881873
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
16891874
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/send.html
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
18+
package guide.sync.example01
19+
20+
import kotlinx.coroutines.experimental.*
21+
import kotlin.system.measureTimeMillis
22+
23+
suspend fun massiveRun(action: suspend () -> Unit) {
24+
val n = 100_000
25+
val time = measureTimeMillis {
26+
val jobs = List(n) {
27+
launch(CommonPool) {
28+
action()
29+
}
30+
}
31+
jobs.forEach { it.join() }
32+
}
33+
println("Completed in $time ms")
34+
}
35+
36+
var counter = 0
37+
38+
fun main(args: Array<String>) = runBlocking<Unit> {
39+
massiveRun {
40+
counter++
41+
}
42+
println("Counter = $counter")
43+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
18+
package guide.sync.example02
19+
20+
import kotlinx.coroutines.experimental.*
21+
import kotlin.system.measureTimeMillis
22+
import java.util.concurrent.atomic.AtomicInteger
23+
24+
suspend fun massiveRun(action: suspend () -> Unit) {
25+
val n = 100_000
26+
val time = measureTimeMillis {
27+
val jobs = List(n) {
28+
launch(CommonPool) {
29+
action()
30+
}
31+
}
32+
jobs.forEach { it.join() }
33+
}
34+
println("Completed in $time ms")
35+
}
36+
37+
var counter = AtomicInteger()
38+
39+
fun main(args: Array<String>) = runBlocking<Unit> {
40+
massiveRun {
41+
counter.incrementAndGet()
42+
}
43+
println("Counter = ${counter.get()}")
44+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
18+
package guide.sync.example03
19+
20+
import kotlinx.coroutines.experimental.*
21+
import kotlin.system.measureTimeMillis
22+
23+
suspend fun massiveRun(action: suspend () -> Unit) {
24+
val n = 100_000
25+
val time = measureTimeMillis {
26+
val jobs = List(n) {
27+
launch(CommonPool) {
28+
action()
29+
}
30+
}
31+
jobs.forEach { it.join() }
32+
}
33+
println("Completed in $time ms")
34+
}
35+
36+
val counterContext = newSingleThreadContext("CounterContext")
37+
var counter = 0
38+
39+
fun main(args: Array<String>) = runBlocking<Unit> {
40+
massiveRun {
41+
run(counterContext) {
42+
counter++
43+
}
44+
}
45+
println("Counter = $counter")
46+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
18+
package guide.sync.example04
19+
20+
import kotlinx.coroutines.experimental.*
21+
import kotlin.system.measureTimeMillis
22+
import kotlinx.coroutines.experimental.sync.Mutex
23+
24+
suspend fun massiveRun(action: suspend () -> Unit) {
25+
val n = 100_000
26+
val time = measureTimeMillis {
27+
val jobs = List(n) {
28+
launch(CommonPool) {
29+
action()
30+
}
31+
}
32+
jobs.forEach { it.join() }
33+
}
34+
println("Completed in $time ms")
35+
}
36+
37+
val mutex = Mutex()
38+
var counter = 0
39+
40+
fun main(args: Array<String>) = runBlocking<Unit> {
41+
massiveRun {
42+
mutex.lock()
43+
try { counter++ }
44+
finally { mutex.unlock() }
45+
}
46+
println("Counter = $counter")
47+
}

0 commit comments

Comments
 (0)