diff --git a/build.gradle.kts b/build.gradle.kts index 02f63c0..c470248 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,24 +1,23 @@ @file:Suppress("UNUSED_VARIABLE", "HasPlatformType") -import org.gradle.api.publish.maven.MavenPom import org.jetbrains.dokka.gradle.DokkaTask buildscript { repositories { - jcenter() + mavenCentral() } } plugins { id("java-library") - kotlin("jvm") version "1.4.10" + kotlin("jvm") version "1.5.21" id("org.jetbrains.dokka") version "0.9.18" id("maven-publish") id("com.jfrog.bintray") version "1.8.4" } repositories { - jcenter() + mavenCentral() } group = "io.reactivex.rxjava3" @@ -39,10 +38,9 @@ val examplesImplementation by configurations.getting { } dependencies { - api("io.reactivex.rxjava3:rxjava:3.0.6") + api("io.reactivex.rxjava3:rxjava:3.1.0") implementation(kotlin("stdlib")) - testImplementation("org.funktionale:funktionale-partials:1.0.0-final") testImplementation("junit:junit:4.12") testImplementation("org.mockito:mockito-core:1.10.19") @@ -61,7 +59,6 @@ val sourcesJar by tasks.creating(Jar::class) { val dokka by tasks.getting(DokkaTask::class) { outputFormat = "html" outputDirectory = "$buildDir/javadoc" - } //documentation @@ -146,7 +143,7 @@ bintray { setPublications(if (isRelease) release else snapshot) -// dryRun = true + // dryRun = true with(pkg) { userOrg = "reactivex" @@ -162,7 +159,7 @@ bintray { name = project.version.toString() vcsTag = project.version.toString() - with(gpg){ + with(gpg) { sign = true } diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 5c2d1cf..7454180 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ee69dd6..05679dc 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.1.1-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index b0d6d0a..744e882 100755 --- a/gradlew +++ b/gradlew @@ -7,7 +7,7 @@ # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -72,7 +72,7 @@ case "`uname`" in Darwin* ) darwin=true ;; - MINGW* ) + MSYS* | MINGW* ) msys=true ;; NONSTOP* ) @@ -82,6 +82,7 @@ esac CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + # Determine the Java command to use to start the JVM. if [ -n "$JAVA_HOME" ] ; then if [ -x "$JAVA_HOME/jre/sh/java" ] ; then @@ -125,10 +126,11 @@ if $darwin; then GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" fi -# For Cygwin, switch paths to Windows format before running java -if $cygwin ; then +# For Cygwin or MSYS, switch paths to Windows format before running java +if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then APP_HOME=`cygpath --path --mixed "$APP_HOME"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` # We build the pattern for arguments to be converted via cygpath @@ -154,19 +156,19 @@ if $cygwin ; then else eval `echo args$i`="\"$arg\"" fi - i=$((i+1)) + i=`expr $i + 1` done case $i in - (0) set -- ;; - (1) set -- "$args0" ;; - (2) set -- "$args0" "$args1" ;; - (3) set -- "$args0" "$args1" "$args2" ;; - (4) set -- "$args0" "$args1" "$args2" "$args3" ;; - (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; - (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; - (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; - (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; - (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + 0) set -- ;; + 1) set -- "$args0" ;; + 2) set -- "$args0" "$args1" ;; + 3) set -- "$args0" "$args1" "$args2" ;; + 4) set -- "$args0" "$args1" "$args2" "$args3" ;; + 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; esac fi @@ -175,14 +177,9 @@ save () { for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done echo " " } -APP_ARGS=$(save "$@") +APP_ARGS=`save "$@"` # Collect all arguments for the java command, following the shell quoting and substitution rules eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" -# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong -if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then - cd "$(dirname "$0")" -fi - exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat index 9991c50..107acd3 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -5,7 +5,7 @@ @rem you may not use this file except in compliance with the License. @rem You may obtain a copy of the License at @rem -@rem http://www.apache.org/licenses/LICENSE-2.0 +@rem https://www.apache.org/licenses/LICENSE-2.0 @rem @rem Unless required by applicable law or agreed to in writing, software @rem distributed under the License is distributed on an "AS IS" BASIS, @@ -29,6 +29,9 @@ if "%DIRNAME%" == "" set DIRNAME=. set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + @rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" @@ -37,7 +40,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome set JAVA_EXE=java.exe %JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto init +if "%ERRORLEVEL%" == "0" goto execute echo. echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. @@ -51,7 +54,7 @@ goto fail set JAVA_HOME=%JAVA_HOME:"=% set JAVA_EXE=%JAVA_HOME%/bin/java.exe -if exist "%JAVA_EXE%" goto init +if exist "%JAVA_EXE%" goto execute echo. echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% @@ -61,28 +64,14 @@ echo location of your Java installation. goto fail -:init -@rem Get command-line arguments, handling Windows variants - -if not "%OS%" == "Windows_NT" goto win9xME_args - -:win9xME_args -@rem Slurp the command line arguments. -set CMD_LINE_ARGS= -set _SKIP=2 - -:win9xME_args_slurp -if "x%~1" == "x" goto execute - -set CMD_LINE_ARGS=%* - :execute @rem Setup the command line set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + @rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* :end @rem End local scope for the variables with windows NT shell diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/subscribers.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/subscribers.kt index c333f2a..c39a4a9 100755 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/subscribers.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/subscribers.kt @@ -6,11 +6,11 @@ import io.reactivex.rxjava3.annotations.CheckReturnValue import io.reactivex.rxjava3.annotations.SchedulerSupport import io.reactivex.rxjava3.core.* import io.reactivex.rxjava3.disposables.Disposable +import io.reactivex.rxjava3.disposables.DisposableContainer import io.reactivex.rxjava3.functions.Action import io.reactivex.rxjava3.functions.Consumer import io.reactivex.rxjava3.internal.functions.Functions - private val onNextStub: (Any) -> Unit = {} private val onErrorStub: (Throwable) -> Unit = {} private val onCompleteStub: () -> Unit = {} @@ -33,9 +33,9 @@ private fun (() -> Unit).asOnCompleteAction(): Action { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) fun Observable.subscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub, - onNext: (T) -> Unit = onNextStub + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onNext: (T) -> Unit = onNextStub ): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction()) /** @@ -45,9 +45,9 @@ fun Observable.subscribeBy( @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) fun Flowable.subscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub, - onNext: (T) -> Unit = onNextStub + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onNext: (T) -> Unit = onNextStub ): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction()) /** @@ -56,8 +56,8 @@ fun Flowable.subscribeBy( @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) fun Single.subscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onSuccess: (T) -> Unit = onNextStub + onError: (Throwable) -> Unit = onErrorStub, + onSuccess: (T) -> Unit = onNextStub ): Disposable = subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer()) /** @@ -66,9 +66,9 @@ fun Single.subscribeBy( @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) fun Maybe.subscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub, - onSuccess: (T) -> Unit = onNextStub + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onSuccess: (T) -> Unit = onNextStub ): Disposable = subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction()) /** @@ -77,8 +77,8 @@ fun Maybe.subscribeBy( @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) fun Completable.subscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub ): Disposable = when { // There are optimized versions of the completable Consumers, so we need to use the subscribe overloads // here. @@ -87,14 +87,69 @@ fun Completable.subscribeBy( else -> subscribe(onComplete.asOnCompleteAction(), Consumer(onError)) } +/** + * Overloaded subscribe function that allows passing named parameters + */ +@SchedulerSupport(SchedulerSupport.NONE) +fun Observable.subscribeBy( + container: DisposableContainer, + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onNext: (T) -> Unit = onNextStub +): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction(), container) + +/** + * Overloaded subscribe function that allows passing named parameters + */ +@BackpressureSupport(BackpressureKind.UNBOUNDED_IN) +@SchedulerSupport(SchedulerSupport.NONE) +fun Flowable.subscribeBy( + container: DisposableContainer, + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onNext: (T) -> Unit = onNextStub +): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction(), container) + +/** + * Overloaded subscribe function that allows passing named parameters + */ +@SchedulerSupport(SchedulerSupport.NONE) +fun Single.subscribeBy( + container: DisposableContainer, + onError: (Throwable) -> Unit = onErrorStub, + onSuccess: (T) -> Unit = onNextStub +): Disposable = subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), container) + +/** + * Overloaded subscribe function that allows passing named parameters + */ +@SchedulerSupport(SchedulerSupport.NONE) +fun Maybe.subscribeBy( + container: DisposableContainer, + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onSuccess: (T) -> Unit = onNextStub +): Disposable = + subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction(), container) + +/** + * Overloaded subscribe function that allows passing named parameters + */ +@SchedulerSupport(SchedulerSupport.NONE) +fun Completable.subscribeBy( + container: DisposableContainer, + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub +): Disposable = subscribe(onComplete.asOnCompleteAction(), onError.asOnErrorConsumer(), container) + /** * Overloaded blockingSubscribe function that allows passing named parameters */ @SchedulerSupport(SchedulerSupport.NONE) fun Observable.blockingSubscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub, - onNext: (T) -> Unit = onNextStub + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onNext: (T) -> Unit = onNextStub ): Unit = blockingSubscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction()) /** @@ -103,9 +158,9 @@ fun Observable.blockingSubscribeBy( @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) fun Flowable.blockingSubscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub, - onNext: (T) -> Unit = onNextStub + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onNext: (T) -> Unit = onNextStub ): Unit = blockingSubscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction()) /** @@ -113,25 +168,25 @@ fun Flowable.blockingSubscribeBy( */ @SchedulerSupport(SchedulerSupport.NONE) fun Maybe.blockingSubscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub, - onSuccess: (T) -> Unit = onNextStub -) : Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction()) + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onSuccess: (T) -> Unit = onNextStub +): Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction()) /** * Overloaded blockingSubscribe function that allows passing named parameters */ @SchedulerSupport(SchedulerSupport.NONE) fun Single.blockingSubscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onSuccess: (T) -> Unit = onNextStub -) : Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer()) + onError: (Throwable) -> Unit = onErrorStub, + onSuccess: (T) -> Unit = onNextStub +): Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer()) /** * Overloaded blockingSubscribe function that allows passing named parameters */ @SchedulerSupport(SchedulerSupport.NONE) fun Completable.blockingSubscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub ): Unit = blockingSubscribe(onComplete.asOnCompleteAction(), onError.asOnErrorConsumer()) diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/CompletableConsumersTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/CompletableConsumersTest.kt new file mode 100644 index 0000000..2a6467f --- /dev/null +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/CompletableConsumersTest.kt @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2021-present, RxKotlin Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.rxjava3.kotlin + +import io.reactivex.rxjava3.disposables.CompositeDisposable +import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection +import io.reactivex.rxjava3.subjects.CompletableSubject +import org.junit.Assert.* +import org.junit.Test +import java.io.IOException + +class CompletableConsumersTest { + + private fun CompositeDisposable.isEmpty(): Boolean = size() == 0 + private fun CompositeDisposable.isNotEmpty(): Boolean = size() > 0 + + private val disposables = CompositeDisposable() + private val subject = CompletableSubject.create() + private val events = mutableListOf() + + @Test + fun errorIntrospectionNormal() { + val disposable = subject.subscribeBy(disposables) as LambdaConsumerIntrospection + assertFalse(disposable.hasCustomOnError()) + } + + @Test + fun errorIntrospectionCustom() { + val disposable = subject.subscribeBy(disposables, onError = {}) as LambdaConsumerIntrospection + assertTrue(disposable.hasCustomOnError()) + } + + @Test + fun onErrorNormal() { + subject.subscribeBy( + disposables, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onComplete() + + assertTrue(disposables.isEmpty()) + assertTrue(events.isEmpty()) + } + + @Test + fun onErrorError() { + subject.subscribeBy( + disposables, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(1, events.size) + assertTrue(events[0] is IOException) + } + + @Test + fun onCompleteNormal() { + subject.subscribeBy( + disposables, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf("completed"), events) + } + + @Test + fun onCompleteError() { + subject.subscribeBy( + disposables, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(1, events.size) + assertTrue(events[0] is IOException) + } + + @Test + fun onCompleteDispose() { + val disposable = subject.subscribeBy( + disposables, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertFalse(disposable.isDisposed) + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + disposable.dispose() + + assertTrue(disposable.isDisposed) + assertTrue(disposables.isEmpty()) + assertTrue(events.isEmpty()) + } +} diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/ExtensionTests.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ExtensionTests.kt index be0aa2b..70cb384 100644 --- a/src/test/kotlin/io/reactivex/rxjava3/kotlin/ExtensionTests.kt +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ExtensionTests.kt @@ -21,7 +21,6 @@ import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.core.ObservableEmitter import io.reactivex.rxjava3.schedulers.TestScheduler import io.reactivex.rxjava3.functions.* -import org.funktionale.partials.invoke import org.junit.Assert.assertEquals import org.junit.Assert.fail import org.junit.Test @@ -248,11 +247,6 @@ class ExtensionTests : KotlinTests() { inOrder.verifyNoMoreInteractions() } - val funOnSubscribe: (Int, ObservableEmitter) -> Unit = { counter, subscriber -> - subscriber.onNext("hello_$counter") - subscriber.onComplete() - } - val asyncObservable: (ObservableEmitter) -> Unit = { subscriber -> thread { Thread.sleep(50) @@ -270,7 +264,11 @@ class ExtensionTests : KotlinTests() { get() = listOf(1, 3, 2, 5, 4).toObservable() val onSubscribe: (ObservableEmitter) -> Unit - get() = funOnSubscribe(p1 = counter++) // partial applied function + get() = { + it.onNext("hello_$counter") + it.onComplete() + counter ++ + } val observable: Observable get() = Observable.create(onSubscribe) diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableConsumersTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableConsumersTest.kt new file mode 100644 index 0000000..46ef417 --- /dev/null +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableConsumersTest.kt @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2021-present, RxKotlin Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.rxjava3.kotlin + +import io.reactivex.rxjava3.disposables.CompositeDisposable +import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection +import io.reactivex.rxjava3.processors.PublishProcessor +import org.junit.Assert.* +import org.junit.Test +import java.io.IOException + +class FlowableConsumersTest { + + private fun CompositeDisposable.isEmpty(): Boolean = size() == 0 + private fun CompositeDisposable.isNotEmpty(): Boolean = size() > 0 + + private val disposables = CompositeDisposable() + private val processor = PublishProcessor.create() + private val events = mutableListOf() + + @Test + fun errorIntrospectionNormal() { + val disposable = processor.subscribeBy(disposables) as LambdaConsumerIntrospection + assertFalse(disposable.hasCustomOnError()) + } + + @Test + fun errorIntrospectionCustom() { + val disposable = processor.subscribeBy(disposables, onError = {}) as LambdaConsumerIntrospection + assertTrue(disposable.hasCustomOnError()) + } + + @Test + fun onNextNormal() { + processor.subscribeBy( + disposables, + onNext = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + processor.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + processor.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1), events) + } + + @Test + fun onErrorNormal() { + processor.subscribeBy( + disposables, + onNext = events::add, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + processor.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + processor.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1), events) + } + + @Test + fun onErrorError() { + processor.subscribeBy( + disposables, + onNext = events::add, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + processor.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + processor.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(2, events.size) + assertEquals(1, events[0]) + assertTrue(events[1] is IOException) + } + + @Test + fun onCompleteNormal() { + processor.subscribeBy( + disposables, + onNext = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + processor.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + processor.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1, "completed"), events) + } + + @Test + fun onCompleteError() { + processor.subscribeBy( + disposables, + onNext = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + processor.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + processor.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(2, events.size) + assertEquals(1, events[0]) + assertTrue(events[1] is IOException) + } + + @Test + fun onCompleteDispose() { + val disposable = processor.subscribeBy( + disposables, + onNext = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertFalse(disposable.isDisposed) + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + disposable.dispose() + + assertTrue(disposable.isDisposed) + assertTrue(disposables.isEmpty()) + assertTrue(events.isEmpty()) + } +} diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/MaybeConsumersTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/MaybeConsumersTest.kt new file mode 100644 index 0000000..f6adbce --- /dev/null +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/MaybeConsumersTest.kt @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2021-present, RxKotlin Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.rxjava3.kotlin + +import io.reactivex.rxjava3.disposables.CompositeDisposable +import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection +import io.reactivex.rxjava3.subjects.MaybeSubject +import org.junit.Assert.* +import org.junit.Test +import java.io.IOException + +class MaybeConsumersTest { + + private fun CompositeDisposable.isEmpty(): Boolean = size() == 0 + private fun CompositeDisposable.isNotEmpty(): Boolean = size() > 0 + + private val disposables = CompositeDisposable() + private val subject = MaybeSubject.create() + private val events = mutableListOf() + + @Test + fun errorIntrospectionNormal() { + val disposable = subject.subscribeBy(disposables) as LambdaConsumerIntrospection + assertFalse(disposable.hasCustomOnError()) + } + + @Test + fun errorIntrospectionCustom() { + val disposable = subject.subscribeBy(disposables, onError = {}) as LambdaConsumerIntrospection + assertTrue(disposable.hasCustomOnError()) + } + + @Test + fun onSuccessNormal() { + subject.subscribeBy( + disposables, + onSuccess = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onSuccess(1) + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1), events) + } + + @Test + fun onErrorNormal() { + subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onSuccess(1) + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1), events) + } + + @Test + fun onErrorError() { + subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(1, events.size) + assertTrue(events[0] is IOException) + } + + @Test + fun onCompleteNormal() { + subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf("completed"), events) + } + + @Test + fun onCompleteError() { + subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(1, events.size) + assertTrue(events[0] is IOException) + } + + @Test + fun onCompleteDispose() { + val disposable = subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertFalse(disposable.isDisposed) + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + disposable.dispose() + + assertTrue(disposable.isDisposed) + assertTrue(disposables.isEmpty()) + assertTrue(events.isEmpty()) + } +} diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableConsumersTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableConsumersTest.kt new file mode 100644 index 0000000..cc5fac3 --- /dev/null +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableConsumersTest.kt @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2021-present, RxKotlin Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.rxjava3.kotlin + +import io.reactivex.rxjava3.disposables.CompositeDisposable +import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection +import io.reactivex.rxjava3.subjects.PublishSubject +import org.junit.Assert.* +import org.junit.Test +import java.io.IOException + +class ObservableConsumersTest { + + private fun CompositeDisposable.isEmpty(): Boolean = size() == 0 + private fun CompositeDisposable.isNotEmpty(): Boolean = size() > 0 + + private val disposables = CompositeDisposable() + private val subject = PublishSubject.create() + private val events = mutableListOf() + + @Test + fun errorIntrospectionNormal() { + val disposable = subject.subscribeBy(disposables) as LambdaConsumerIntrospection + assertFalse(disposable.hasCustomOnError()) + } + + @Test + fun errorIntrospectionCustom() { + val disposable = subject.subscribeBy(disposables, onError = {}) as LambdaConsumerIntrospection + assertTrue(disposable.hasCustomOnError()) + } + + @Test + fun onNextNormal() { + subject.subscribeBy( + disposables, + onNext = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + subject.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1), events) + } + + @Test + fun onErrorNormal() { + subject.subscribeBy( + disposables, + onNext = events::add, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + subject.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1), events) + } + + @Test + fun onErrorError() { + subject.subscribeBy( + disposables, + onNext = events::add, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + subject.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(2, events.size) + assertEquals(1, events[0]) + assertTrue(events[1] is IOException) + } + + @Test + fun onCompleteNormal() { + subject.subscribeBy( + disposables, + onNext = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + subject.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1, "completed"), events) + } + + @Test + fun onCompleteError() { + subject.subscribeBy( + disposables, + onNext = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + subject.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(2, events.size) + assertEquals(1, events[0]) + assertTrue(events[1] is IOException) + } + + @Test + fun onCompleteDispose() { + val disposable = subject.subscribeBy( + disposables, + onNext = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertFalse(disposable.isDisposed) + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + disposable.dispose() + + assertTrue(disposable.isDisposed) + assertTrue(disposables.isEmpty()) + assertTrue(events.isEmpty()) + } +} diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/SingleConsumersTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/SingleConsumersTest.kt new file mode 100644 index 0000000..cf02fe5 --- /dev/null +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/SingleConsumersTest.kt @@ -0,0 +1,100 @@ +package io.reactivex.rxjava3.kotlin + +import io.reactivex.rxjava3.disposables.CompositeDisposable +import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection +import io.reactivex.rxjava3.subjects.SingleSubject +import org.junit.Assert.* +import org.junit.Test +import java.io.IOException + +class SingleConsumersTest { + + private fun CompositeDisposable.isEmpty(): Boolean = size() == 0 + private fun CompositeDisposable.isNotEmpty(): Boolean = size() > 0 + + private val disposables = CompositeDisposable() + private val subject = SingleSubject.create() + private val events = mutableListOf() + + @Test + fun errorIntrospectionNormal() { + val disposable = subject.subscribeBy(disposables) as LambdaConsumerIntrospection + assertFalse(disposable.hasCustomOnError()) + } + + @Test + fun errorIntrospectionCustom() { + val disposable = subject.subscribeBy(disposables, onError = {}) as LambdaConsumerIntrospection + assertTrue(disposable.hasCustomOnError()) + } + + @Test + fun onSuccessNormal() { + subject.subscribeBy( + disposables, + onSuccess = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onSuccess(1) + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1), events) + } + + @Test + fun onErrorNormal() { + subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onSuccess(1) + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1), events) + } + + @Test + fun onErrorError() { + subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(1, events.size) + assertTrue(events[0] is IOException) + } + + @Test + fun onCompleteDispose() { + val disposable = subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add + ) + + assertFalse(disposable.isDisposed) + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + disposable.dispose() + + assertTrue(disposable.isDisposed) + assertTrue(disposables.isEmpty()) + assertTrue(events.isEmpty()) + } +}