Skip to content

Commit b8ed47c

Browse files
konrad-kaminskielizarov
authored andcommitted
Implementation of awaitFirstOrDefault and awaitFirstOrNull.
1 parent b2b5c06 commit b8ed47c

File tree

12 files changed

+229
-0
lines changed

12 files changed

+229
-0
lines changed

reactive/kotlinx-coroutines-reactive/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ Suspending extension functions and suspending iteration:
1414
| -------- | ---------------
1515
| [Publisher.awaitFirst][org.reactivestreams.Publisher.awaitFirst] | Returns the first value from the given publisher
1616
| [Publisher.awaitFirstOrDefault][org.reactivestreams.Publisher.awaitFirstOrDefault] | Returns the first value from the given publisher or default
17+
| [Publisher.awaitFirstOrElse][org.reactivestreams.Publisher.awaitFirstOrElse] | Returns the first value from the given publisher or default from a function
18+
| [Publisher.awaitFirstOrNull][org.reactivestreams.Publisher.awaitFirstOrNull] | Returns the first value from the given publisher or null
1719
| [Publisher.awaitLast][org.reactivestreams.Publisher.awaitFirst] | Returns the last value from the given publisher
1820
| [Publisher.awaitSingle][org.reactivestreams.Publisher.awaitSingle] | Returns the single value from the given publisher
1921
| [Publisher.openSubscription][org.reactivestreams.Publisher.openSubscription] | Subscribes to publisher and returns [ReceiveChannel]
@@ -36,6 +38,8 @@ Conversion functions:
3638
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/publish.html
3739
[org.reactivestreams.Publisher.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-first.html
3840
[org.reactivestreams.Publisher.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-first-or-default.html
41+
[org.reactivestreams.Publisher.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-first-or-else.html
42+
[org.reactivestreams.Publisher.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-first-or-null.html
3943
[org.reactivestreams.Publisher.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-single.html
4044
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/open-subscription.html
4145
[org.reactivestreams.Publisher.iterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/iterator.html

reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Await.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,26 @@ public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
4545
*/
4646
public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
4747

48+
/**
49+
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
50+
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
51+
*
52+
* This suspending function is cancellable.
53+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
54+
* immediately resumes with [CancellationException].
55+
*/
56+
public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
57+
58+
/**
59+
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
60+
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
61+
*
62+
* This suspending function is cancellable.
63+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
64+
* immediately resumes with [CancellationException].
65+
*/
66+
public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
67+
4868
/**
4969
* Awaits for the last value from the given publisher without blocking a thread and
5070
* returns the resulting value or throws the corresponding exception if this publisher had produced error.

reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/IntegrationTest.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import kotlinx.coroutines.experimental.*
2020
import org.hamcrest.MatcherAssert.assertThat
2121
import org.hamcrest.core.IsEqual
2222
import org.hamcrest.core.IsInstanceOf
23+
import org.hamcrest.core.IsNull
2324
import org.junit.Test
2425
import org.junit.runner.RunWith
2526
import org.junit.runners.Parameterized
@@ -58,6 +59,8 @@ class IntegrationTest(
5859
}
5960
assertNSE { pub.awaitFirst() }
6061
assertThat(pub.awaitFirstOrDefault("OK"), IsEqual("OK"))
62+
assertThat(pub.awaitFirstOrNull(), IsNull())
63+
assertThat(pub.awaitFirstOrElse { "ELSE" }, IsEqual("ELSE"))
6164
assertNSE { pub.awaitLast() }
6265
assertNSE { pub.awaitSingle() }
6366
var cnt = 0
@@ -73,6 +76,8 @@ class IntegrationTest(
7376
}
7477
assertThat(pub.awaitFirst(), IsEqual("OK"))
7578
assertThat(pub.awaitFirstOrDefault("!"), IsEqual("OK"))
79+
assertThat(pub.awaitFirstOrNull(), IsEqual("OK"))
80+
assertThat(pub.awaitFirstOrElse { "ELSE" }, IsEqual("OK"))
7681
assertThat(pub.awaitLast(), IsEqual("OK"))
7782
assertThat(pub.awaitSingle(), IsEqual("OK"))
7883
var cnt = 0
@@ -95,6 +100,8 @@ class IntegrationTest(
95100
assertThat(pub.awaitFirst(), IsEqual(1))
96101
assertThat(pub.awaitFirstOrDefault(0), IsEqual(1))
97102
assertThat(pub.awaitLast(), IsEqual(n))
103+
assertThat(pub.awaitFirstOrNull(), IsEqual(1))
104+
assertThat(pub.awaitFirstOrElse { 0 }, IsEqual(1))
98105
assertIAE { pub.awaitSingle() }
99106
checkNumbers(n, pub)
100107
val channel = pub.openSubscription()

reactive/kotlinx-coroutines-reactor/src/test/kotlin/kotlinx/coroutines/experimental/reactor/FluxSingleTest.kt

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,50 @@ class FluxSingleTest {
111111
}
112112
}
113113

114+
@Test
115+
fun testAwaitFirstOrNull() {
116+
val flux = flux<String>(CommonPool) {
117+
send(Flux.empty<String>().awaitFirstOrNull() ?: "OK")
118+
}
119+
120+
checkSingleValue(flux) {
121+
assertEquals("OK", it)
122+
}
123+
}
124+
125+
@Test
126+
fun testAwaitFirstOrNullWithValues() {
127+
val flux = flux(CommonPool) {
128+
send((Flux.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
129+
}
130+
131+
checkSingleValue(flux) {
132+
assertEquals("OK", it)
133+
}
134+
}
135+
136+
@Test
137+
fun testAwaitFirstOrElse() {
138+
val flux = flux(CommonPool) {
139+
send(Flux.empty<String>().awaitFirstOrElse { "O" } + "K")
140+
}
141+
142+
checkSingleValue(flux) {
143+
assertEquals("OK", it)
144+
}
145+
}
146+
147+
@Test
148+
fun testAwaitFirstOrElseWithValues() {
149+
val flux = flux(CommonPool) {
150+
send(Flux.just("O", "#").awaitFirstOrElse { "!" } + "K")
151+
}
152+
153+
checkSingleValue(flux) {
154+
assertEquals("OK", it)
155+
}
156+
}
157+
114158
@Test
115159
fun testAwaitLast() {
116160
val flux = flux(CommonPool) {

reactive/kotlinx-coroutines-rx1/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ Suspending extension functions and suspending iteration:
1818
| [Single.await][rx.Single.await] | Awaits for completion of the single value and returns it
1919
| [Observable.awaitFirst][rx.Observable.awaitFirst] | Returns the first value from the given observable
2020
| [Observable.awaitFirstOrDefault][rx.Observable.awaitFirstOrDefault] | Returns the first value from the given observable or default
21+
| [Observable.awaitFirstOrElse][rx.Observable.awaitFirstOrElse] | Returns the first value from the given observable or default from a function
22+
| [Observable.awaitFirstOrNull][rx.Observable.awaitFirstOrNull] | Returns the first value from the given observable or null
2123
| [Observable.awaitLast][rx.Observable.awaitFirst] | Returns the last value from the given observable
2224
| [Observable.awaitSingle][rx.Observable.awaitSingle] | Returns the single value from the given observable
2325
| [Observable.openSubscription][rx.Observable.openSubscription] | Subscribes to observable and returns [ReceiveChannel]
@@ -47,6 +49,8 @@ Conversion functions:
4749
[rx.Single.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-single/await.html
4850
[rx.Observable.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-first.html
4951
[rx.Observable.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-first-or-default.html
52+
[rx.Observable.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-first-or-else.html
53+
[rx.Observable.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-first-or-null.html
5054
[rx.Observable.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-single.html
5155
[rx.Observable.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/open-subscription.html
5256
[rx.Observable.iterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/iterator.html

reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxAwait.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,26 @@ public suspend fun <T> Observable<T>.awaitFirst(): T = first().awaitOne()
8080
*/
8181
public suspend fun <T> Observable<T>.awaitFirstOrDefault(default: T): T = firstOrDefault(default).awaitOne()
8282

83+
/**
84+
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
85+
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
86+
*
87+
* This suspending function is cancellable.
88+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
89+
* immediately resumes with [CancellationException].
90+
*/
91+
public suspend fun <T> Observable<T>.awaitFirstOrNull(): T? = firstOrDefault(null).awaitOne()
92+
93+
/**
94+
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
95+
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
96+
*
97+
* This suspending function is cancellable.
98+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
99+
* immediately resumes with [CancellationException].
100+
*/
101+
public suspend fun <T> Observable<T>.awaitFirstOrElse(defaultValue: () -> T): T = switchIfEmpty(Observable.fromCallable(defaultValue)).first().awaitOne()
102+
83103
/**
84104
* Awaits for the last value from the given observable without blocking a thread and
85105
* returns the resulting value or throws the corresponding exception if this observable had produced error.

reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/IntegrationTest.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import kotlinx.coroutines.experimental.*
2020
import org.hamcrest.MatcherAssert.assertThat
2121
import org.hamcrest.core.IsEqual
2222
import org.hamcrest.core.IsInstanceOf
23+
import org.hamcrest.core.IsNull
2324
import org.junit.Test
2425
import org.junit.runner.RunWith
2526
import org.junit.runners.Parameterized
@@ -58,6 +59,8 @@ class IntegrationTest(
5859
}
5960
assertNSE { observable.awaitFirst() }
6061
assertThat(observable.awaitFirstOrDefault("OK"), IsEqual("OK"))
62+
assertThat(observable.awaitFirstOrNull(), IsNull())
63+
assertThat(observable.awaitFirstOrElse {"ELSE" }, IsEqual("ELSE"))
6164
assertNSE { observable.awaitLast() }
6265
assertNSE { observable.awaitSingle() }
6366
var cnt = 0
@@ -74,6 +77,9 @@ class IntegrationTest(
7477
send("OK")
7578
}
7679
assertThat(observable.awaitFirst(), IsEqual("OK"))
80+
assertThat(observable.awaitFirstOrDefault("OK"), IsEqual("OK"))
81+
assertThat(observable.awaitFirstOrNull(), IsEqual("OK"))
82+
assertThat(observable.awaitFirstOrElse {"OK" }, IsEqual("OK"))
7783
assertThat(observable.awaitLast(), IsEqual("OK"))
7884
assertThat(observable.awaitSingle(), IsEqual("OK"))
7985
var cnt = 0
@@ -94,6 +100,9 @@ class IntegrationTest(
94100
}
95101
}
96102
assertThat(observable.awaitFirst(), IsEqual(1))
103+
assertThat(observable.awaitFirstOrDefault(0), IsEqual(1))
104+
assertThat(observable.awaitFirstOrNull(), IsEqual(1))
105+
assertThat(observable.awaitFirstOrElse { 0 }, IsEqual(1))
97106
assertThat(observable.awaitLast(), IsEqual(n))
98107
assertIAE { observable.awaitSingle() }
99108
checkNumbers(n, observable)

reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableSingleTest.kt

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,50 @@ class ObservableSingleTest {
121121
}
122122
}
123123

124+
@Test
125+
fun testAwaitFirstOrNull() {
126+
val observable = rxObservable<String>(CommonPool) {
127+
send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
128+
}
129+
130+
checkSingleValue(observable) {
131+
assertEquals("OK", it)
132+
}
133+
}
134+
135+
@Test
136+
fun testAwaitFirstOrNullWithValues() {
137+
val observable = rxObservable(CommonPool) {
138+
send((Observable.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
139+
}
140+
141+
checkSingleValue(observable) {
142+
assertEquals("OK", it)
143+
}
144+
}
145+
146+
@Test
147+
fun testAwaitFirstOrElse() {
148+
val observable = rxObservable(CommonPool) {
149+
send(Observable.empty<String>().awaitFirstOrElse { "O" } + "K")
150+
}
151+
152+
checkSingleValue(observable) {
153+
assertEquals("OK", it)
154+
}
155+
}
156+
157+
@Test
158+
fun testAwaitFirstOrElseWithValues() {
159+
val observable = rxObservable(CommonPool) {
160+
send(Observable.just("O", "#").awaitFirstOrElse { "!" } + "K")
161+
}
162+
163+
checkSingleValue(observable) {
164+
assertEquals("OK", it)
165+
}
166+
}
167+
124168
@Test
125169
fun testAwaitLast() {
126170
val observable = rxObservable(CommonPool) {

reactive/kotlinx-coroutines-rx2/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ Suspending extension functions and suspending iteration:
2323
| [SingleSource.await][io.reactivex.SingleSource.await] | Awaits for completion of the single value and returns it
2424
| [ObservableSource.awaitFirst][io.reactivex.ObservableSource.awaitFirst] | Awaits for the first value from the given observable
2525
| [ObservableSource.awaitFirstOrDefault][io.reactivex.ObservableSource.awaitFirstOrDefault] | Awaits for the first value from the given observable or default
26+
| [ObservableSource.awaitFirstOrElse][io.reactivex.ObservableSource.awaitFirstOrElse] | Awaits for the first value from the given observable or default from a function
27+
| [ObservableSource.awaitFirstOrNull][io.reactivex.ObservableSource.awaitFirstOrNull] | Awaits for the first value from the given observable or null
2628
| [ObservableSource.awaitLast][io.reactivex.ObservableSource.awaitFirst] | Awaits for the last value from the given observable
2729
| [ObservableSource.awaitSingle][io.reactivex.ObservableSource.awaitSingle] | Awaits for the single value from the given observable
2830
| [ObservableSource.openSubscription][io.reactivex.ObservableSource.openSubscription] | Subscribes to observable and returns [ReceiveChannel]
@@ -63,6 +65,8 @@ Conversion functions:
6365
[io.reactivex.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-single-source/await.html
6466
[io.reactivex.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-first.html
6567
[io.reactivex.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-first-or-default.html
68+
[io.reactivex.ObservableSource.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-first-or-else.html
69+
[io.reactivex.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-first-or-null.html
6670
[io.reactivex.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-single.html
6771
[io.reactivex.ObservableSource.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/open-subscription.html
6872
[io.reactivex.ObservableSource.iterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/iterator.html

reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,26 @@ public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST
114114
*/
115115
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
116116

117+
/**
118+
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
119+
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
120+
*
121+
* This suspending function is cancellable.
122+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
123+
* immediately resumes with [CancellationException].
124+
*/
125+
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
126+
127+
/**
128+
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
129+
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
130+
*
131+
* This suspending function is cancellable.
132+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
133+
* immediately resumes with [CancellationException].
134+
*/
135+
public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
136+
117137
/**
118138
* Awaits for the last value from the given observable without blocking a thread.
119139
* Returns the resulting value or throws the corresponding exception if this observable had produced error.

reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/IntegrationTest.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import kotlinx.coroutines.experimental.*
2121
import org.hamcrest.MatcherAssert.assertThat
2222
import org.hamcrest.core.IsEqual
2323
import org.hamcrest.core.IsInstanceOf
24+
import org.hamcrest.core.IsNull
2425
import org.junit.Test
2526
import org.junit.runner.RunWith
2627
import org.junit.runners.Parameterized
@@ -58,6 +59,8 @@ class IntegrationTest(
5859
}
5960
assertNSE { observable.awaitFirst() }
6061
assertThat(observable.awaitFirstOrDefault("OK"), IsEqual("OK"))
62+
assertThat(observable.awaitFirstOrNull(), IsNull())
63+
assertThat(observable.awaitFirstOrElse { "ELSE" }, IsEqual("ELSE"))
6164
assertNSE { observable.awaitLast() }
6265
assertNSE { observable.awaitSingle() }
6366
var cnt = 0
@@ -74,6 +77,9 @@ class IntegrationTest(
7477
send("OK")
7578
}
7679
assertThat(observable.awaitFirst(), IsEqual("OK"))
80+
assertThat(observable.awaitFirstOrDefault("OK"), IsEqual("OK"))
81+
assertThat(observable.awaitFirstOrNull(), IsEqual("OK"))
82+
assertThat(observable.awaitFirstOrElse { "ELSE" }, IsEqual("OK"))
7783
assertThat(observable.awaitLast(), IsEqual("OK"))
7884
assertThat(observable.awaitSingle(), IsEqual("OK"))
7985
var cnt = 0
@@ -94,6 +100,9 @@ class IntegrationTest(
94100
}
95101
}
96102
assertThat(observable.awaitFirst(), IsEqual(1))
103+
assertThat(observable.awaitFirstOrDefault(0), IsEqual(1))
104+
assertThat(observable.awaitFirstOrNull(), IsEqual(1))
105+
assertThat(observable.awaitFirstOrElse { 0 }, IsEqual(1))
97106
assertThat(observable.awaitLast(), IsEqual(n))
98107
assertIAE { observable.awaitSingle() }
99108
checkNumbers(n, observable)

0 commit comments

Comments
 (0)