From 7cbf29b96453726ee7acae6af867d62db57841ff Mon Sep 17 00:00:00 2001 From: Vaclav Haisman Date: Sat, 12 Apr 2025 10:11:33 +0200 Subject: [PATCH] Replace RxJava 2 with RxJava 3. --- build.gradle.kts | 2 +- gradle.properties | 2 +- .../mqtt3/reactor/Mqtt3ReactorClientView.java | 2 +- .../mqtt/reactor/MqttReactorClient.java | 2 +- .../client/internal/mqtt/MqttAsyncClient.java | 4 +-- .../internal/mqtt/MqttBlockingClient.java | 6 ++--- .../mqtt/MqttClientExecutorConfigImpl.java | 2 +- .../MqttClientExecutorConfigImplBuilder.java | 2 +- .../client/internal/mqtt/MqttRxClient.java | 26 ++++--------------- .../mqtt3/Mqtt3ExceptionFactory.java | 2 +- .../handler/auth/MqttReAuthCompletable.java | 6 ++--- .../mqtt/handler/connect/MqttConnAckFlow.java | 4 +-- .../handler/connect/MqttConnAckSingle.java | 6 ++--- .../disconnect/MqttDisconnectCompletable.java | 6 ++--- .../MqttGlobalIncomingPublishFlowable.java | 2 +- .../incoming/MqttIncomingPublishFlow.java | 6 ++--- .../publish/outgoing/MqttAckFlowable.java | 4 +-- .../publish/outgoing/MqttAckFlowableFlow.java | 4 +-- .../publish/outgoing/MqttAckSingle.java | 10 +++---- .../outgoing/MqttAckSingleFlowable.java | 4 +-- .../outgoing/MqttOutgoingQosHandler.java | 4 +-- .../outgoing/MqttPublishFlowableAckLink.java | 6 ++--- .../outgoing/MqttPublishFlowables.java | 4 +-- .../handler/subscribe/MqttSubAckSingle.java | 4 +-- .../subscribe/MqttSubOrUnsubAckFlow.java | 4 +-- .../handler/subscribe/MqttUnsubAckSingle.java | 4 +-- .../connack/mqtt3/Mqtt3ConnAckView.java | 2 +- .../publish/mqtt3/Mqtt3PublishResultView.java | 2 +- .../publish/mqtt3/Mqtt3PublishView.java | 2 +- .../suback/mqtt3/Mqtt3SubAckView.java | 2 +- .../mqtt/mqtt3/Mqtt3RxClientView.java | 8 +++--- .../client/internal/rx/CompletableFlow.java | 4 +-- .../client/internal/rx/RxFutureConverter.java | 9 +++++-- .../rx/WithSingleConditionalSubscriber.java | 2 +- .../rx/WithSingleStrictSubscriber.java | 4 +-- .../operators/FlowableWithSingleCombine.java | 6 ++--- .../rx/operators/FlowableWithSingleMap.java | 6 ++--- .../operators/FlowableWithSingleMapError.java | 10 +++---- .../FlowableWithSingleObserveOn.java | 2 +- .../client/mqtt/MqttClientExecutorConfig.java | 4 +-- .../MqttClientExecutorConfigBuilderBase.java | 2 +- .../client/mqtt/mqtt3/Mqtt3RxClient.java | 6 ++--- .../client/mqtt/mqtt5/Mqtt5RxClient.java | 6 ++--- .../hivemq/client/rx/FlowableWithSingle.java | 16 ++++++------ .../rx/FlowableWithSingleSubscriber.java | 2 +- .../client/example/Mqtt3ClientExample.java | 6 ++--- .../Mqtt3RxClientViewExceptionsTest.java | 6 ++--- .../internal/rx/RxFutureConverterTest.java | 11 +++++--- .../client/rx/FlowableWithSingleItem.java | 4 +-- .../client/rx/FlowableWithSingleSplit.java | 2 +- .../client/rx/FlowableWithSingleTest.java | 10 +++---- 51 files changed, 128 insertions(+), 134 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 5fe82547c..9ba4bec43 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -66,7 +66,7 @@ allprojects { /* ******************** dependencies ******************** */ dependencies { - api("io.reactivex.rxjava2:rxjava:${property("rxjava.version")}") + api("io.reactivex.rxjava3:rxjava:${property("rxjava.version")}") api("org.reactivestreams:reactive-streams:${property("reactive-streams.version")}") implementation("io.netty:netty-buffer:${property("netty.version")}") diff --git a/gradle.properties b/gradle.properties index 5f19da623..7afeb4eb7 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,7 +3,7 @@ prevVersion=1.3.4 # # main dependencies # -rxjava.version=2.2.21 +rxjava.version=3.1.10 reactive-streams.version=1.0.4 netty.version=4.1.119.Final jctools.version=2.1.2 diff --git a/reactor/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/reactor/Mqtt3ReactorClientView.java b/reactor/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/reactor/Mqtt3ReactorClientView.java index 5cfb253bd..6e73713ab 100644 --- a/reactor/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/reactor/Mqtt3ReactorClientView.java +++ b/reactor/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/reactor/Mqtt3ReactorClientView.java @@ -34,7 +34,7 @@ import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3Unsubscribe; import com.hivemq.client.mqtt.mqtt3.reactor.Mqtt3ReactorClient; import com.hivemq.client.rx.reactor.FluxWithSingle; -import io.reactivex.Flowable; +import io.reactivex.rxjava3.core.Flowable; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Publisher; import reactor.adapter.rxjava.RxJava2Adapter; diff --git a/reactor/src/main/java/com/hivemq/client/internal/mqtt/reactor/MqttReactorClient.java b/reactor/src/main/java/com/hivemq/client/internal/mqtt/reactor/MqttReactorClient.java index 351ed3ebe..b6453e653 100644 --- a/reactor/src/main/java/com/hivemq/client/internal/mqtt/reactor/MqttReactorClient.java +++ b/reactor/src/main/java/com/hivemq/client/internal/mqtt/reactor/MqttReactorClient.java @@ -38,7 +38,7 @@ import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; import com.hivemq.client.mqtt.mqtt5.reactor.Mqtt5ReactorClient; import com.hivemq.client.rx.reactor.FluxWithSingle; -import io.reactivex.Flowable; +import io.reactivex.rxjava3.core.Flowable; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Publisher; import reactor.adapter.rxjava.RxJava2Adapter; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java index e9c768431..8992ae9f2 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java @@ -40,8 +40,8 @@ import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; -import io.reactivex.FlowableSubscriber; -import io.reactivex.schedulers.Schedulers; +import io.reactivex.rxjava3.core.FlowableSubscriber; +import io.reactivex.rxjava3.schedulers.Schedulers; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscription; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java index a5c86b83e..98f3e47ce 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java @@ -45,9 +45,9 @@ import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; -import io.reactivex.Flowable; -import io.reactivex.FlowableSubscriber; -import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.FlowableSubscriber; +import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscription; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImpl.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImpl.java index 4f1b290f9..b899f68f4 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImpl.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImpl.java @@ -17,7 +17,7 @@ package com.hivemq.client.internal.mqtt; import com.hivemq.client.mqtt.MqttClientExecutorConfig; -import io.reactivex.Scheduler; +import io.reactivex.rxjava3.core.Scheduler; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImplBuilder.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImplBuilder.java index 6c68726af..4bf1fb8c2 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImplBuilder.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImplBuilder.java @@ -18,7 +18,7 @@ import com.hivemq.client.internal.util.Checks; import com.hivemq.client.mqtt.MqttClientExecutorConfigBuilder; -import io.reactivex.Scheduler; +import io.reactivex.rxjava3.core.Scheduler; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java index c9fa963ea..2a2f06930 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java @@ -23,7 +23,6 @@ import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttSubscribedPublishFlowable; import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckFlowable; import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingle; -import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingleFlowable; import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubAckSingle; import com.hivemq.client.internal.mqtt.handler.subscribe.MqttUnsubAckSingle; import com.hivemq.client.internal.mqtt.message.connect.MqttConnect; @@ -49,12 +48,11 @@ import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; import com.hivemq.client.rx.FlowableWithSingle; -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Scheduler; -import io.reactivex.Single; -import io.reactivex.functions.Function; -import io.reactivex.internal.fuseable.ScalarCallable; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -214,20 +212,6 @@ public MqttRxClient(final @NotNull MqttClientConfig clientConfig) { final @NotNull Flowable

publishFlowable, final @NotNull Function publishMapper) { final Scheduler applicationScheduler = clientConfig.getExecutorConfig().getApplicationScheduler(); - if (publishFlowable instanceof ScalarCallable) { - //noinspection unchecked - final P publish = ((ScalarCallable

) publishFlowable).call(); - if (publish == null) { - return Flowable.empty(); - } - final MqttPublish mqttPublish; - try { - mqttPublish = publishMapper.apply(publish); - } catch (final Throwable t) { - return Flowable.error(t); - } - return new MqttAckSingleFlowable(clientConfig, mqttPublish).observeOn(applicationScheduler, true); - } return new MqttAckFlowable( clientConfig, publishFlowable.subscribeOn(applicationScheduler).map(publishMapper)).observeOn( applicationScheduler, true); diff --git a/src/main/java/com/hivemq/client/internal/mqtt/exceptions/mqtt3/Mqtt3ExceptionFactory.java b/src/main/java/com/hivemq/client/internal/mqtt/exceptions/mqtt3/Mqtt3ExceptionFactory.java index 6c3083380..63e0a31f8 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/exceptions/mqtt3/Mqtt3ExceptionFactory.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/exceptions/mqtt3/Mqtt3ExceptionFactory.java @@ -24,7 +24,7 @@ import com.hivemq.client.mqtt.mqtt3.exceptions.*; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5MessageException; import com.hivemq.client.mqtt.mqtt5.message.Mqtt5Message; -import io.reactivex.functions.Function; +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/auth/MqttReAuthCompletable.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/auth/MqttReAuthCompletable.java index f8335c314..5d2ac3008 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/auth/MqttReAuthCompletable.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/auth/MqttReAuthCompletable.java @@ -22,9 +22,9 @@ import com.hivemq.client.internal.rx.CompletableFlow; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; -import io.reactivex.Completable; -import io.reactivex.CompletableObserver; -import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.internal.disposables.EmptyDisposable; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckFlow.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckFlow.java index b6d2215ca..d48c267aa 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckFlow.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckFlow.java @@ -17,8 +17,8 @@ package com.hivemq.client.internal.mqtt.handler.connect; import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; -import io.reactivex.SingleObserver; -import io.reactivex.disposables.Disposable; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.disposables.Disposable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckSingle.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckSingle.java index 31e8f13c0..9b8ef6627 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckSingle.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckSingle.java @@ -31,9 +31,9 @@ import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; import io.netty.bootstrap.Bootstrap; import io.netty.channel.EventLoop; -import io.reactivex.Single; -import io.reactivex.SingleObserver; -import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.internal.disposables.EmptyDisposable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectCompletable.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectCompletable.java index 6b70bac2e..5dbdd048d 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectCompletable.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectCompletable.java @@ -22,9 +22,9 @@ import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnect; import com.hivemq.client.internal.rx.CompletableFlow; import io.netty.channel.Channel; -import io.reactivex.Completable; -import io.reactivex.CompletableObserver; -import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.internal.disposables.EmptyDisposable; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttGlobalIncomingPublishFlowable.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttGlobalIncomingPublishFlowable.java index 05a71dc17..e57ceaa61 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttGlobalIncomingPublishFlowable.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttGlobalIncomingPublishFlowable.java @@ -21,7 +21,7 @@ import com.hivemq.client.internal.mqtt.ioc.ClientComponent; import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; -import io.reactivex.Flowable; +import io.reactivex.rxjava3.core.Flowable; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java index a1ed75d3a..a181ee32f 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java @@ -20,9 +20,9 @@ import com.hivemq.client.internal.mqtt.MqttClientConfig; import com.hivemq.client.internal.mqtt.handler.util.FlowWithEventLoop; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; -import io.reactivex.Emitter; -import io.reactivex.internal.util.BackpressureHelper; -import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.core.Emitter; +import io.reactivex.rxjava3.internal.util.BackpressureHelper; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java index 6c0f4a274..fda8cfea2 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java @@ -21,8 +21,8 @@ import com.hivemq.client.internal.mqtt.ioc.ClientComponent; import com.hivemq.client.internal.mqtt.message.publish.MqttPublish; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; -import io.reactivex.Flowable; -import io.reactivex.internal.subscriptions.EmptySubscription; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.internal.subscriptions.EmptySubscription; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowableFlow.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowableFlow.java index 44d3ef4fe..40c843fa5 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowableFlow.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowableFlow.java @@ -21,8 +21,8 @@ import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowableAckLink.LinkedFlow; import com.hivemq.client.internal.mqtt.message.publish.MqttPublishResult; import com.hivemq.client.internal.util.collections.ChunkedArrayQueue; -import io.reactivex.internal.util.BackpressureHelper; -import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.internal.util.BackpressureHelper; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingle.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingle.java index 160d34afa..98c7db0a4 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingle.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingle.java @@ -23,11 +23,11 @@ import com.hivemq.client.internal.mqtt.message.publish.MqttPublish; import com.hivemq.client.internal.mqtt.message.publish.MqttPublishResult; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; -import io.reactivex.Flowable; -import io.reactivex.Single; -import io.reactivex.SingleObserver; -import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.internal.disposables.EmptyDisposable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingleFlowable.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingleFlowable.java index 0592e148e..7e3432a84 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingleFlowable.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingleFlowable.java @@ -23,8 +23,8 @@ import com.hivemq.client.internal.mqtt.message.publish.MqttPublish; import com.hivemq.client.internal.mqtt.message.publish.MqttPublishResult; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; -import io.reactivex.Flowable; -import io.reactivex.internal.subscriptions.EmptySubscription; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.internal.subscriptions.EmptySubscription; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttOutgoingQosHandler.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttOutgoingQosHandler.java index 7eb3e534e..614c9e9fe 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttOutgoingQosHandler.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttOutgoingQosHandler.java @@ -58,8 +58,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoop; -import io.reactivex.Flowable; -import io.reactivex.FlowableSubscriber; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.FlowableSubscriber; import org.jctools.queues.SpscUnboundedArrayQueue; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowableAckLink.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowableAckLink.java index b72298d2c..4eccc672e 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowableAckLink.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowableAckLink.java @@ -19,9 +19,9 @@ import com.hivemq.client.internal.logging.InternalLogger; import com.hivemq.client.internal.logging.InternalLoggerFactory; import com.hivemq.client.internal.mqtt.message.publish.MqttPublish; -import io.reactivex.Flowable; -import io.reactivex.FlowableSubscriber; -import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.FlowableSubscriber; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowables.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowables.java index f7586fdc3..da0d1e11e 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowables.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowables.java @@ -19,8 +19,8 @@ import com.hivemq.client.internal.logging.InternalLogger; import com.hivemq.client.internal.logging.InternalLoggerFactory; import com.hivemq.client.internal.mqtt.ioc.ClientScope; -import io.reactivex.Flowable; -import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.internal.util.BackpressureHelper; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubAckSingle.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubAckSingle.java index ab914a897..344426eae 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubAckSingle.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubAckSingle.java @@ -21,8 +21,8 @@ import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribe; import com.hivemq.client.internal.mqtt.message.subscribe.suback.MqttSubAck; import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; -import io.reactivex.Single; -import io.reactivex.SingleObserver; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubOrUnsubAckFlow.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubOrUnsubAckFlow.java index 0dad079a3..61e716b2b 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubOrUnsubAckFlow.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubOrUnsubAckFlow.java @@ -18,8 +18,8 @@ import com.hivemq.client.internal.mqtt.MqttClientConfig; import com.hivemq.client.internal.mqtt.handler.util.FlowWithEventLoop; -import io.reactivex.SingleObserver; -import io.reactivex.disposables.Disposable; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.disposables.Disposable; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttUnsubAckSingle.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttUnsubAckSingle.java index 197a87716..47836cdd5 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttUnsubAckSingle.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttUnsubAckSingle.java @@ -21,8 +21,8 @@ import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribe; import com.hivemq.client.internal.mqtt.message.unsubscribe.unsuback.MqttUnsubAck; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; -import io.reactivex.Single; -import io.reactivex.SingleObserver; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/connect/connack/mqtt3/Mqtt3ConnAckView.java b/src/main/java/com/hivemq/client/internal/mqtt/message/connect/connack/mqtt3/Mqtt3ConnAckView.java index 161315ec9..fd11ab487 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/connect/connack/mqtt3/Mqtt3ConnAckView.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/connect/connack/mqtt3/Mqtt3ConnAckView.java @@ -24,7 +24,7 @@ import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAckReturnCode; import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode; -import io.reactivex.functions.Function; +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishResultView.java b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishResultView.java index 938ccaa22..e17ffe627 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishResultView.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishResultView.java @@ -21,7 +21,7 @@ import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishResult; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; -import io.reactivex.functions.Function; +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java index e5861a92a..c68d54f98 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java @@ -27,7 +27,7 @@ import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5WillPublish; -import io.reactivex.functions.Function; +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/subscribe/suback/mqtt3/Mqtt3SubAckView.java b/src/main/java/com/hivemq/client/internal/mqtt/message/subscribe/suback/mqtt3/Mqtt3SubAckView.java index 940507d6c..2c98752ed 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/subscribe/suback/mqtt3/Mqtt3SubAckView.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/subscribe/suback/mqtt3/Mqtt3SubAckView.java @@ -24,7 +24,7 @@ import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAckReturnCode; import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode; -import io.reactivex.functions.Function; +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientView.java b/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientView.java index b85701b6f..bafadb9ee 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientView.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientView.java @@ -47,10 +47,10 @@ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; import com.hivemq.client.rx.FlowableWithSingle; -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Single; -import io.reactivex.functions.Function; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/rx/CompletableFlow.java b/src/main/java/com/hivemq/client/internal/rx/CompletableFlow.java index c6acc42ee..d9d3b1a34 100644 --- a/src/main/java/com/hivemq/client/internal/rx/CompletableFlow.java +++ b/src/main/java/com/hivemq/client/internal/rx/CompletableFlow.java @@ -16,8 +16,8 @@ package com.hivemq.client.internal.rx; -import io.reactivex.CompletableObserver; -import io.reactivex.disposables.Disposable; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.disposables.Disposable; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/internal/rx/RxFutureConverter.java b/src/main/java/com/hivemq/client/internal/rx/RxFutureConverter.java index fa15aff6d..2ed3a3957 100644 --- a/src/main/java/com/hivemq/client/internal/rx/RxFutureConverter.java +++ b/src/main/java/com/hivemq/client/internal/rx/RxFutureConverter.java @@ -16,8 +16,13 @@ package com.hivemq.client.internal.rx; -import io.reactivex.*; -import io.reactivex.disposables.Disposable; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.MaybeObserver; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.disposables.Disposable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/rx/WithSingleConditionalSubscriber.java b/src/main/java/com/hivemq/client/internal/rx/WithSingleConditionalSubscriber.java index a874ad7c2..07c6b997e 100644 --- a/src/main/java/com/hivemq/client/internal/rx/WithSingleConditionalSubscriber.java +++ b/src/main/java/com/hivemq/client/internal/rx/WithSingleConditionalSubscriber.java @@ -17,7 +17,7 @@ package com.hivemq.client.internal.rx; import com.hivemq.client.rx.FlowableWithSingleSubscriber; -import io.reactivex.internal.fuseable.ConditionalSubscriber; +import io.reactivex.rxjava3.operators.ConditionalSubscriber; /** * @author Silvio Giebl diff --git a/src/main/java/com/hivemq/client/internal/rx/WithSingleStrictSubscriber.java b/src/main/java/com/hivemq/client/internal/rx/WithSingleStrictSubscriber.java index 587b7a840..31aa2b257 100644 --- a/src/main/java/com/hivemq/client/internal/rx/WithSingleStrictSubscriber.java +++ b/src/main/java/com/hivemq/client/internal/rx/WithSingleStrictSubscriber.java @@ -18,8 +18,8 @@ import com.hivemq.client.rx.FlowableWithSingleSubscriber; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; +import io.reactivex.rxjava3.internal.util.BackpressureHelper; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscription; diff --git a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleCombine.java b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleCombine.java index 2a6485a4f..d1c7a896e 100644 --- a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleCombine.java +++ b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleCombine.java @@ -20,9 +20,9 @@ import com.hivemq.client.rx.FlowableWithSingle; import com.hivemq.client.rx.FlowableWithSingleSubscriber; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.Flowable; -import io.reactivex.internal.fuseable.ConditionalSubscriber; -import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.operators.ConditionalSubscriber; +import io.reactivex.rxjava3.internal.util.BackpressureHelper; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMap.java b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMap.java index 1c26f0646..fa84faeff 100644 --- a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMap.java +++ b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMap.java @@ -21,9 +21,9 @@ import com.hivemq.client.rx.FlowableWithSingle; import com.hivemq.client.rx.FlowableWithSingleSubscriber; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.exceptions.Exceptions; -import io.reactivex.functions.Function; -import io.reactivex.internal.fuseable.ConditionalSubscriber; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.operators.ConditionalSubscriber; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMapError.java b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMapError.java index f27ba8cbb..1e6e9ca80 100644 --- a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMapError.java +++ b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMapError.java @@ -21,11 +21,11 @@ import com.hivemq.client.rx.FlowableWithSingle; import com.hivemq.client.rx.FlowableWithSingleSubscriber; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.FlowableSubscriber; -import io.reactivex.exceptions.CompositeException; -import io.reactivex.exceptions.Exceptions; -import io.reactivex.functions.Function; -import io.reactivex.internal.fuseable.ConditionalSubscriber; +import io.reactivex.rxjava3.core.FlowableSubscriber; +import io.reactivex.rxjava3.exceptions.CompositeException; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.operators.ConditionalSubscriber; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleObserveOn.java b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleObserveOn.java index 0394f62da..3fcec3107 100644 --- a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleObserveOn.java +++ b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleObserveOn.java @@ -18,7 +18,7 @@ import com.hivemq.client.rx.FlowableWithSingle; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.Scheduler; +import io.reactivex.rxjava3.core.Scheduler; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfig.java b/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfig.java index 5b98e764f..560b725dd 100644 --- a/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfig.java +++ b/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfig.java @@ -18,8 +18,8 @@ import com.hivemq.client.annotations.DoNotImplement; import com.hivemq.client.internal.mqtt.MqttClientExecutorConfigImplBuilder; -import io.reactivex.Scheduler; -import io.reactivex.schedulers.Schedulers; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.schedulers.Schedulers ; import org.jetbrains.annotations.NotNull; import java.util.Optional; diff --git a/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfigBuilderBase.java b/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfigBuilderBase.java index 1ac5696e0..e28ac8a32 100644 --- a/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfigBuilderBase.java +++ b/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfigBuilderBase.java @@ -18,7 +18,7 @@ import com.hivemq.client.annotations.CheckReturnValue; import com.hivemq.client.annotations.DoNotImplement; -import io.reactivex.Scheduler; +import io.reactivex.rxjava3.core.Scheduler; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3RxClient.java b/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3RxClient.java index 380bbd564..b80b5842b 100644 --- a/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3RxClient.java +++ b/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3RxClient.java @@ -30,9 +30,9 @@ import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3Unsubscribe; import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3UnsubscribeBuilder; import com.hivemq.client.rx.FlowableWithSingle; -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Single; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5RxClient.java b/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5RxClient.java index 9fae6f419..dd30a0a41 100644 --- a/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5RxClient.java +++ b/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5RxClient.java @@ -33,9 +33,9 @@ import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5UnsubscribeBuilder; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; import com.hivemq.client.rx.FlowableWithSingle; -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Single; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/rx/FlowableWithSingle.java b/src/main/java/com/hivemq/client/rx/FlowableWithSingle.java index 5eb5de2be..03f94a6ef 100644 --- a/src/main/java/com/hivemq/client/rx/FlowableWithSingle.java +++ b/src/main/java/com/hivemq/client/rx/FlowableWithSingle.java @@ -24,14 +24,14 @@ import com.hivemq.client.internal.util.Checks; import com.hivemq.client.rx.reactivestreams.PublisherWithSingle; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.Flowable; -import io.reactivex.Scheduler; -import io.reactivex.annotations.BackpressureKind; -import io.reactivex.annotations.BackpressureSupport; -import io.reactivex.annotations.SchedulerSupport; -import io.reactivex.functions.Action; -import io.reactivex.functions.Consumer; -import io.reactivex.functions.Function; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.annotations.BackpressureKind; +import io.reactivex.rxjava3.annotations.BackpressureSupport; +import io.reactivex.rxjava3.annotations.SchedulerSupport; +import io.reactivex.rxjava3.functions.Action; +import io.reactivex.rxjava3.functions.Consumer; +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/rx/FlowableWithSingleSubscriber.java b/src/main/java/com/hivemq/client/rx/FlowableWithSingleSubscriber.java index 2f54963b6..42a7e4e96 100644 --- a/src/main/java/com/hivemq/client/rx/FlowableWithSingleSubscriber.java +++ b/src/main/java/com/hivemq/client/rx/FlowableWithSingleSubscriber.java @@ -17,7 +17,7 @@ package com.hivemq.client.rx; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.FlowableSubscriber; +import io.reactivex.rxjava3.core.FlowableSubscriber; /** * Represents a Reactive-Streams inspired {@link WithSingleSubscriber} that is RxJava 2 only and weakens rules for diff --git a/src/test/java/com/hivemq/client/example/Mqtt3ClientExample.java b/src/test/java/com/hivemq/client/example/Mqtt3ClientExample.java index 1517ab22d..54b77ccb4 100644 --- a/src/test/java/com/hivemq/client/example/Mqtt3ClientExample.java +++ b/src/test/java/com/hivemq/client/example/Mqtt3ClientExample.java @@ -29,9 +29,9 @@ import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishResult; import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscribe; import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscription; -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Single; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/test/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientViewExceptionsTest.java b/src/test/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientViewExceptionsTest.java index 1b2a9fb41..09a7d08cf 100644 --- a/src/test/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientViewExceptionsTest.java +++ b/src/test/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientViewExceptionsTest.java @@ -32,9 +32,9 @@ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; import com.hivemq.client.rx.FlowableWithSingleSplit; -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Single; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/src/test/java/com/hivemq/client/internal/rx/RxFutureConverterTest.java b/src/test/java/com/hivemq/client/internal/rx/RxFutureConverterTest.java index 7c9b0df71..f61ffb8d8 100644 --- a/src/test/java/com/hivemq/client/internal/rx/RxFutureConverterTest.java +++ b/src/test/java/com/hivemq/client/internal/rx/RxFutureConverterTest.java @@ -16,9 +16,14 @@ package com.hivemq.client.internal.rx; -import io.reactivex.*; -import io.reactivex.disposables.Disposable; -import io.reactivex.schedulers.Schedulers; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.MaybeObserver; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.schedulers.Schedulers; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.Test; diff --git a/src/test/java/com/hivemq/client/rx/FlowableWithSingleItem.java b/src/test/java/com/hivemq/client/rx/FlowableWithSingleItem.java index 2018e9d7e..ca84cd597 100644 --- a/src/test/java/com/hivemq/client/rx/FlowableWithSingleItem.java +++ b/src/test/java/com/hivemq/client/rx/FlowableWithSingleItem.java @@ -17,8 +17,8 @@ package com.hivemq.client.rx; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.Flowable; -import io.reactivex.FlowableSubscriber; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.FlowableSubscriber; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/test/java/com/hivemq/client/rx/FlowableWithSingleSplit.java b/src/test/java/com/hivemq/client/rx/FlowableWithSingleSplit.java index f13683c92..013ead311 100644 --- a/src/test/java/com/hivemq/client/rx/FlowableWithSingleSplit.java +++ b/src/test/java/com/hivemq/client/rx/FlowableWithSingleSplit.java @@ -17,7 +17,7 @@ package com.hivemq.client.rx; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.Flowable; +import io.reactivex.rxjava3.core.Flowable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/test/java/com/hivemq/client/rx/FlowableWithSingleTest.java b/src/test/java/com/hivemq/client/rx/FlowableWithSingleTest.java index 1ea70e94d..38aa4a43e 100644 --- a/src/test/java/com/hivemq/client/rx/FlowableWithSingleTest.java +++ b/src/test/java/com/hivemq/client/rx/FlowableWithSingleTest.java @@ -18,9 +18,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.BackpressureStrategy; -import io.reactivex.Flowable; -import io.reactivex.schedulers.Schedulers; +import io.reactivex.rxjava3.core.BackpressureStrategy; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.Schedulers; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.ThrowingSupplier; @@ -157,7 +157,7 @@ void observeOnBoth_delayError() throws InterruptedException { .assertValueAt(2, "next2") .await() .assertError(IllegalArgumentException.class) - .assertErrorMessage("test"); + .assertError(ex -> ex.getMessage().equals("test")); executorService.shutdown(); } @@ -196,7 +196,7 @@ void observeOnBoth_delayError_bufferSize() throws InterruptedException { .assertValueCount(1024) .await() .assertError(IllegalArgumentException.class) - .assertErrorMessage("test"); + .assertError(ex -> ex.getMessage().equals("test")); executorService.shutdown(); }