Skip to content

Commit 74d112d

Browse files
authored
Merge pull request #971 from albertache1998/flows-concat
[KTLN-774] Sequentially Concatenate 2 Kotlin Flows
2 parents a5eec76 + 87093bb commit 74d112d

File tree

2 files changed

+86
-0
lines changed

2 files changed

+86
-0
lines changed

core-kotlin-modules/core-kotlin-concurrency-3/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,19 @@
2929
<artifactId>kotlinx-coroutines-rx3</artifactId>
3030
<version>${rxcoroutines.version}</version>
3131
</dependency>
32+
33+
<dependency>
34+
<groupId>org.jetbrains.kotlinx</groupId>
35+
<artifactId>kotlinx-coroutines-reactor</artifactId>
36+
<version>1.6.0</version>
37+
</dependency>
38+
39+
<dependency>
40+
<groupId>io.projectreactor</groupId>
41+
<artifactId>reactor-core</artifactId>
42+
<version>3.4.11</version>
43+
</dependency>
44+
3245
</dependencies>
3346

3447
<properties>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package com.baeldung.concatenateTwoFlows
2+
3+
import kotlinx.coroutines.flow.*
4+
import kotlinx.coroutines.reactive.asFlow
5+
import kotlinx.coroutines.reactor.asFlux
6+
import kotlinx.coroutines.runBlocking
7+
import org.junit.jupiter.api.Assertions.assertEquals
8+
import org.junit.jupiter.api.Test
9+
import reactor.core.publisher.Flux
10+
11+
class ConcatenateTwoFlowsUnitTest {
12+
13+
@Test
14+
fun `concatenate two flows using custom flow builder`() = runBlocking {
15+
val flow1 = flowOf(1, 2, 3)
16+
val flow2 = flowOf(4, 5, 6)
17+
val result = concatenateFlowsUsingCustomBuilder(flow1, flow2).toList()
18+
19+
assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
20+
}
21+
22+
@Test
23+
fun `concatenate two flows using flattenConcat method`() = runBlocking {
24+
val flow1 = flowOf(1, 2, 3)
25+
val flow2 = flowOf(4, 5, 6)
26+
val result = flowOf(flow1, flow2).flattenConcat().toList()
27+
28+
assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
29+
}
30+
31+
@Test
32+
fun `concatenate two flows using onCompletion method`() = runBlocking {
33+
val flow1 = flowOf(1, 2, 3)
34+
val flow2 = flowOf(4, 5, 6)
35+
val result = flow1.onCompletion { emitAll(flow2) }.toList()
36+
37+
assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
38+
}
39+
40+
@Test
41+
fun `concatenate two flows using collect and emitAll method`() = runBlocking {
42+
val flow1 = flowOf(1, 2, 3)
43+
val flow2 = flowOf(4, 5, 6)
44+
val result = concatenateFlowsUsingEmitAll(flow1, flow2).toList()
45+
46+
assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
47+
}
48+
49+
@Test
50+
fun `concatenate two flows using reactive`() = runBlocking {
51+
val flow1 = flowOf(1, 2, 3)
52+
val flow2 = flowOf(4, 5, 6)
53+
val result = concatenateFlowsUsingReactive(flow1, flow2).toList()
54+
55+
assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
56+
}
57+
}
58+
59+
fun concatenateFlowsUsingCustomBuilder(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> = flow {
60+
flow1.collect { emit(it) }
61+
flow2.collect { emit(it) }
62+
}
63+
64+
fun concatenateFlowsUsingEmitAll(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> = flow {
65+
flow1.collect { emit(it) }
66+
emitAll(flow2)
67+
}
68+
69+
fun concatenateFlowsUsingReactive(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> {
70+
val flux1 = flow1.asFlux()
71+
val flux2 = flow2.asFlux()
72+
return Flux.concat(flux1, flux2).asFlow()
73+
}

0 commit comments

Comments
 (0)