Skip to content

Commit 8d2ee01

Browse files
committed
Added subscribeBy(DisposableContainer, [...]) extensions.
1 parent 8396845 commit 8d2ee01

File tree

8 files changed

+844
-39
lines changed

8 files changed

+844
-39
lines changed

build.gradle.kts

+1-3
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ dependencies {
4141
api("io.reactivex.rxjava3:rxjava:3.1.0")
4242
implementation(kotlin("stdlib"))
4343

44-
testImplementation("org.funktionale:funktionale-partials:1.0.0-final")
4544
testImplementation("junit:junit:4.12")
4645
testImplementation("org.mockito:mockito-core:1.10.19")
4746

@@ -60,7 +59,6 @@ val sourcesJar by tasks.creating(Jar::class) {
6059
val dokka by tasks.getting(DokkaTask::class) {
6160
outputFormat = "html"
6261
outputDirectory = "$buildDir/javadoc"
63-
6462
}
6563

6664
//documentation
@@ -145,7 +143,7 @@ bintray {
145143

146144
setPublications(if (isRelease) release else snapshot)
147145

148-
// dryRun = true
146+
// dryRun = true
149147

150148
with(pkg) {
151149
userOrg = "reactivex"

src/main/kotlin/io/reactivex/rxjava3/kotlin/subscribers.kt

+89-29
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ import io.reactivex.rxjava3.annotations.CheckReturnValue
66
import io.reactivex.rxjava3.annotations.SchedulerSupport
77
import io.reactivex.rxjava3.core.*
88
import io.reactivex.rxjava3.disposables.Disposable
9+
import io.reactivex.rxjava3.disposables.DisposableContainer
910
import io.reactivex.rxjava3.functions.Action
1011
import io.reactivex.rxjava3.functions.Consumer
1112
import io.reactivex.rxjava3.internal.functions.Functions
1213

13-
1414
private val onNextStub: (Any) -> Unit = {}
1515
private val onErrorStub: (Throwable) -> Unit = {}
1616
private val onCompleteStub: () -> Unit = {}
@@ -33,9 +33,9 @@ private fun (() -> Unit).asOnCompleteAction(): Action {
3333
@CheckReturnValue
3434
@SchedulerSupport(SchedulerSupport.NONE)
3535
fun <T : Any> Observable<T>.subscribeBy(
36-
onError: (Throwable) -> Unit = onErrorStub,
37-
onComplete: () -> Unit = onCompleteStub,
38-
onNext: (T) -> Unit = onNextStub
36+
onError: (Throwable) -> Unit = onErrorStub,
37+
onComplete: () -> Unit = onCompleteStub,
38+
onNext: (T) -> Unit = onNextStub
3939
): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
4040

4141
/**
@@ -45,9 +45,9 @@ fun <T : Any> Observable<T>.subscribeBy(
4545
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
4646
@SchedulerSupport(SchedulerSupport.NONE)
4747
fun <T : Any> Flowable<T>.subscribeBy(
48-
onError: (Throwable) -> Unit = onErrorStub,
49-
onComplete: () -> Unit = onCompleteStub,
50-
onNext: (T) -> Unit = onNextStub
48+
onError: (Throwable) -> Unit = onErrorStub,
49+
onComplete: () -> Unit = onCompleteStub,
50+
onNext: (T) -> Unit = onNextStub
5151
): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
5252

5353
/**
@@ -56,8 +56,8 @@ fun <T : Any> Flowable<T>.subscribeBy(
5656
@CheckReturnValue
5757
@SchedulerSupport(SchedulerSupport.NONE)
5858
fun <T : Any> Single<T>.subscribeBy(
59-
onError: (Throwable) -> Unit = onErrorStub,
60-
onSuccess: (T) -> Unit = onNextStub
59+
onError: (Throwable) -> Unit = onErrorStub,
60+
onSuccess: (T) -> Unit = onNextStub
6161
): Disposable = subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer())
6262

6363
/**
@@ -66,9 +66,9 @@ fun <T : Any> Single<T>.subscribeBy(
6666
@CheckReturnValue
6767
@SchedulerSupport(SchedulerSupport.NONE)
6868
fun <T : Any> Maybe<T>.subscribeBy(
69-
onError: (Throwable) -> Unit = onErrorStub,
70-
onComplete: () -> Unit = onCompleteStub,
71-
onSuccess: (T) -> Unit = onNextStub
69+
onError: (Throwable) -> Unit = onErrorStub,
70+
onComplete: () -> Unit = onCompleteStub,
71+
onSuccess: (T) -> Unit = onNextStub
7272
): Disposable = subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
7373

7474
/**
@@ -77,8 +77,8 @@ fun <T : Any> Maybe<T>.subscribeBy(
7777
@CheckReturnValue
7878
@SchedulerSupport(SchedulerSupport.NONE)
7979
fun Completable.subscribeBy(
80-
onError: (Throwable) -> Unit = onErrorStub,
81-
onComplete: () -> Unit = onCompleteStub
80+
onError: (Throwable) -> Unit = onErrorStub,
81+
onComplete: () -> Unit = onCompleteStub
8282
): Disposable = when {
8383
// There are optimized versions of the completable Consumers, so we need to use the subscribe overloads
8484
// here.
@@ -87,14 +87,74 @@ fun Completable.subscribeBy(
8787
else -> subscribe(onComplete.asOnCompleteAction(), Consumer(onError))
8888
}
8989

90+
/**
91+
* Overloaded subscribe function that allows passing named parameters
92+
*/
93+
@CheckReturnValue
94+
@SchedulerSupport(SchedulerSupport.NONE)
95+
fun <T : Any> Observable<T>.subscribeBy(
96+
container: DisposableContainer,
97+
onError: (Throwable) -> Unit = onErrorStub,
98+
onComplete: () -> Unit = onCompleteStub,
99+
onNext: (T) -> Unit = onNextStub
100+
): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction(), container)
101+
102+
/**
103+
* Overloaded subscribe function that allows passing named parameters
104+
*/
105+
@CheckReturnValue
106+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
107+
@SchedulerSupport(SchedulerSupport.NONE)
108+
fun <T : Any> Flowable<T>.subscribeBy(
109+
container: DisposableContainer,
110+
onError: (Throwable) -> Unit = onErrorStub,
111+
onComplete: () -> Unit = onCompleteStub,
112+
onNext: (T) -> Unit = onNextStub
113+
): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction(), container)
114+
115+
/**
116+
* Overloaded subscribe function that allows passing named parameters
117+
*/
118+
@CheckReturnValue
119+
@SchedulerSupport(SchedulerSupport.NONE)
120+
fun <T : Any> Single<T>.subscribeBy(
121+
container: DisposableContainer,
122+
onError: (Throwable) -> Unit = onErrorStub,
123+
onSuccess: (T) -> Unit = onNextStub
124+
): Disposable = subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), container)
125+
126+
/**
127+
* Overloaded subscribe function that allows passing named parameters
128+
*/
129+
@CheckReturnValue
130+
@SchedulerSupport(SchedulerSupport.NONE)
131+
fun <T : Any> Maybe<T>.subscribeBy(
132+
container: DisposableContainer,
133+
onError: (Throwable) -> Unit = onErrorStub,
134+
onComplete: () -> Unit = onCompleteStub,
135+
onSuccess: (T) -> Unit = onNextStub
136+
): Disposable =
137+
subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction(), container)
138+
139+
/**
140+
* Overloaded subscribe function that allows passing named parameters
141+
*/
142+
@CheckReturnValue
143+
@SchedulerSupport(SchedulerSupport.NONE)
144+
fun Completable.subscribeBy(
145+
container: DisposableContainer,
146+
onError: (Throwable) -> Unit = onErrorStub,
147+
onComplete: () -> Unit = onCompleteStub
148+
): Disposable = subscribe(onComplete.asOnCompleteAction(), onError.asOnErrorConsumer(), container)
149+
90150
/**
91151
* Overloaded blockingSubscribe function that allows passing named parameters
92152
*/
93153
@SchedulerSupport(SchedulerSupport.NONE)
94154
fun <T : Any> Observable<T>.blockingSubscribeBy(
95-
onError: (Throwable) -> Unit = onErrorStub,
96-
onComplete: () -> Unit = onCompleteStub,
97-
onNext: (T) -> Unit = onNextStub
155+
onError: (Throwable) -> Unit = onErrorStub,
156+
onComplete: () -> Unit = onCompleteStub,
157+
onNext: (T) -> Unit = onNextStub
98158
): Unit = blockingSubscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
99159

100160
/**
@@ -103,35 +163,35 @@ fun <T : Any> Observable<T>.blockingSubscribeBy(
103163
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
104164
@SchedulerSupport(SchedulerSupport.NONE)
105165
fun <T : Any> Flowable<T>.blockingSubscribeBy(
106-
onError: (Throwable) -> Unit = onErrorStub,
107-
onComplete: () -> Unit = onCompleteStub,
108-
onNext: (T) -> Unit = onNextStub
166+
onError: (Throwable) -> Unit = onErrorStub,
167+
onComplete: () -> Unit = onCompleteStub,
168+
onNext: (T) -> Unit = onNextStub
109169
): Unit = blockingSubscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
110170

111171
/**
112172
* Overloaded blockingSubscribe function that allows passing named parameters
113173
*/
114174
@SchedulerSupport(SchedulerSupport.NONE)
115175
fun <T : Any> Maybe<T>.blockingSubscribeBy(
116-
onError: (Throwable) -> Unit = onErrorStub,
117-
onComplete: () -> Unit = onCompleteStub,
118-
onSuccess: (T) -> Unit = onNextStub
119-
) : Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
176+
onError: (Throwable) -> Unit = onErrorStub,
177+
onComplete: () -> Unit = onCompleteStub,
178+
onSuccess: (T) -> Unit = onNextStub
179+
): Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
120180

121181
/**
122182
* Overloaded blockingSubscribe function that allows passing named parameters
123183
*/
124184
@SchedulerSupport(SchedulerSupport.NONE)
125185
fun <T : Any> Single<T>.blockingSubscribeBy(
126-
onError: (Throwable) -> Unit = onErrorStub,
127-
onSuccess: (T) -> Unit = onNextStub
128-
) : Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer())
186+
onError: (Throwable) -> Unit = onErrorStub,
187+
onSuccess: (T) -> Unit = onNextStub
188+
): Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer())
129189

130190
/**
131191
* Overloaded blockingSubscribe function that allows passing named parameters
132192
*/
133193
@SchedulerSupport(SchedulerSupport.NONE)
134194
fun Completable.blockingSubscribeBy(
135-
onError: (Throwable) -> Unit = onErrorStub,
136-
onComplete: () -> Unit = onCompleteStub
195+
onError: (Throwable) -> Unit = onErrorStub,
196+
onComplete: () -> Unit = onCompleteStub
137197
): Unit = blockingSubscribe(onComplete.asOnCompleteAction(), onError.asOnErrorConsumer())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright (c) 2021-present, RxKotlin Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.reactivex.rxjava3.kotlin
18+
19+
import io.reactivex.rxjava3.disposables.CompositeDisposable
20+
import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection
21+
import io.reactivex.rxjava3.subjects.CompletableSubject
22+
import org.junit.Assert.*
23+
import org.junit.Test
24+
import java.io.IOException
25+
26+
class CompletableConsumersTest {
27+
28+
private fun CompositeDisposable.isEmpty(): Boolean = size() == 0
29+
private fun CompositeDisposable.isNotEmpty(): Boolean = size() > 0
30+
31+
private val disposables = CompositeDisposable()
32+
private val subject = CompletableSubject.create()
33+
private val events = mutableListOf<Any>()
34+
35+
@Test
36+
fun errorIntrospectionNormal() {
37+
val disposable = subject.subscribeBy(disposables) as LambdaConsumerIntrospection
38+
assertFalse(disposable.hasCustomOnError())
39+
}
40+
41+
@Test
42+
fun errorIntrospectionCustom() {
43+
val disposable = subject.subscribeBy(disposables, onError = {}) as LambdaConsumerIntrospection
44+
assertTrue(disposable.hasCustomOnError())
45+
}
46+
47+
@Test
48+
fun onErrorNormal() {
49+
subject.subscribeBy(
50+
disposables,
51+
onError = events::add
52+
)
53+
54+
assertTrue(disposables.isNotEmpty())
55+
assertTrue(events.isEmpty())
56+
57+
subject.onComplete()
58+
59+
assertTrue(disposables.isEmpty())
60+
assertTrue(events.isEmpty())
61+
}
62+
63+
@Test
64+
fun onErrorError() {
65+
subject.subscribeBy(
66+
disposables,
67+
onError = events::add
68+
)
69+
70+
assertTrue(disposables.isNotEmpty())
71+
assertTrue(events.isEmpty())
72+
73+
subject.onError(IOException())
74+
75+
assertTrue(disposables.isEmpty())
76+
assertEquals(1, events.size)
77+
assertTrue(events[0] is IOException)
78+
}
79+
80+
@Test
81+
fun onCompleteNormal() {
82+
subject.subscribeBy(
83+
disposables,
84+
onError = events::add,
85+
onComplete = { events.add("completed") }
86+
)
87+
88+
assertTrue(disposables.isNotEmpty())
89+
assertTrue(events.isEmpty())
90+
91+
subject.onComplete()
92+
93+
assertTrue(disposables.isEmpty())
94+
assertEquals(listOf("completed"), events)
95+
}
96+
97+
@Test
98+
fun onCompleteError() {
99+
subject.subscribeBy(
100+
disposables,
101+
onError = events::add,
102+
onComplete = { events.add("completed") }
103+
)
104+
105+
assertTrue(disposables.isNotEmpty())
106+
assertTrue(events.isEmpty())
107+
108+
subject.onError(IOException())
109+
110+
assertTrue(disposables.isEmpty())
111+
assertEquals(1, events.size)
112+
assertTrue(events[0] is IOException)
113+
}
114+
115+
@Test
116+
fun onCompleteDispose() {
117+
val disposable = subject.subscribeBy(
118+
disposables,
119+
onError = events::add,
120+
onComplete = { events.add("completed") }
121+
)
122+
123+
assertFalse(disposable.isDisposed)
124+
assertTrue(disposables.isNotEmpty())
125+
assertTrue(events.isEmpty())
126+
127+
disposable.dispose()
128+
129+
assertTrue(disposable.isDisposed)
130+
assertTrue(disposables.isEmpty())
131+
assertTrue(events.isEmpty())
132+
}
133+
}

src/test/kotlin/io/reactivex/rxjava3/kotlin/ExtensionTests.kt

+5-7
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import io.reactivex.rxjava3.core.Observable
2121
import io.reactivex.rxjava3.core.ObservableEmitter
2222
import io.reactivex.rxjava3.schedulers.TestScheduler
2323
import io.reactivex.rxjava3.functions.*
24-
import org.funktionale.partials.invoke
2524
import org.junit.Assert.assertEquals
2625
import org.junit.Assert.fail
2726
import org.junit.Test
@@ -248,11 +247,6 @@ class ExtensionTests : KotlinTests() {
248247
inOrder.verifyNoMoreInteractions()
249248
}
250249

251-
val funOnSubscribe: (Int, ObservableEmitter<in String>) -> Unit = { counter, subscriber ->
252-
subscriber.onNext("hello_$counter")
253-
subscriber.onComplete()
254-
}
255-
256250
val asyncObservable: (ObservableEmitter<in Int>) -> Unit = { subscriber ->
257251
thread {
258252
Thread.sleep(50)
@@ -270,7 +264,11 @@ class ExtensionTests : KotlinTests() {
270264
get() = listOf(1, 3, 2, 5, 4).toObservable()
271265

272266
val onSubscribe: (ObservableEmitter<in String>) -> Unit
273-
get() = funOnSubscribe(p1 = counter++) // partial applied function
267+
get() = {
268+
it.onNext("hello_$counter")
269+
it.onComplete()
270+
counter ++
271+
}
274272

275273
val observable: Observable<String>
276274
get() = Observable.create(onSubscribe)

0 commit comments

Comments
 (0)