32
32
import reactor .blockhound .integration .BlockHoundIntegration ;
33
33
import reactor .core .publisher .Flux ;
34
34
import reactor .core .publisher .Mono ;
35
- import rx .RxReactiveStreams ;
36
35
37
36
import org .springframework .lang .Nullable ;
38
37
import org .springframework .util .ClassUtils ;
44
43
* {@code Observable}, and others.
45
44
*
46
45
* <p>By default, depending on classpath availability, adapters are registered
47
- * for Reactor, RxJava 2/3, or RxJava 1 (+ RxJava Reactive Streams bridge),
48
- * {@link CompletableFuture}, Java 9+ {@code Flow.Publisher}, and Kotlin
49
- * Coroutines' {@code Deferred} and {@code Flow}.
50
- *
51
- * <p><strong>Note:</strong> As of Spring Framework 5.3, support for RxJava 1.x
52
- * is deprecated in favor of RxJava 2 and 3.
46
+ * for Reactor, RxJava 2/3, {@link CompletableFuture}, {@code Flow.Publisher},
47
+ * and Kotlin Coroutines' {@code Deferred} and {@code Flow}.
53
48
*
54
49
* @author Rossen Stoyanchev
55
50
* @author Sebastien Deleuze
51
+ * @author Juergen Hoeller
56
52
* @since 5.0
57
53
*/
58
54
public class ReactiveAdapterRegistry {
@@ -62,26 +58,19 @@ public class ReactiveAdapterRegistry {
62
58
63
59
private static final boolean reactorPresent ;
64
60
65
- private static final boolean rxjava1Present ;
66
-
67
61
private static final boolean rxjava2Present ;
68
62
69
63
private static final boolean rxjava3Present ;
70
64
71
- private static final boolean flowPublisherPresent ;
72
-
73
65
private static final boolean kotlinCoroutinesPresent ;
74
66
75
67
private static final boolean mutinyPresent ;
76
68
77
69
static {
78
70
ClassLoader classLoader = ReactiveAdapterRegistry .class .getClassLoader ();
79
71
reactorPresent = ClassUtils .isPresent ("reactor.core.publisher.Flux" , classLoader );
80
- rxjava1Present = ClassUtils .isPresent ("rx.Observable" , classLoader ) &&
81
- ClassUtils .isPresent ("rx.RxReactiveStreams" , classLoader );
82
72
rxjava2Present = ClassUtils .isPresent ("io.reactivex.Flowable" , classLoader );
83
73
rxjava3Present = ClassUtils .isPresent ("io.reactivex.rxjava3.core.Flowable" , classLoader );
84
- flowPublisherPresent = ClassUtils .isPresent ("java.util.concurrent.Flow.Publisher" , classLoader );
85
74
kotlinCoroutinesPresent = ClassUtils .isPresent ("kotlinx.coroutines.reactor.MonoKt" , classLoader );
86
75
mutinyPresent = ClassUtils .isPresent ("io.smallrye.mutiny.Multi" , classLoader );
87
76
}
@@ -97,29 +86,17 @@ public ReactiveAdapterRegistry() {
97
86
// Reactor
98
87
if (reactorPresent ) {
99
88
new ReactorRegistrar ().registerAdapters (this );
89
+ new ReactorJdkFlowAdapterRegistrar ().registerAdapter (this );
100
90
}
101
91
102
- // RxJava1 (deprecated)
103
- if (rxjava1Present ) {
104
- new RxJava1Registrar ().registerAdapters (this );
105
- }
106
-
107
- // RxJava2
92
+ // RxJava
108
93
if (rxjava2Present ) {
109
94
new RxJava2Registrar ().registerAdapters (this );
110
95
}
111
- // RxJava3
112
96
if (rxjava3Present ) {
113
97
new RxJava3Registrar ().registerAdapters (this );
114
98
}
115
99
116
- // Java 9+ Flow.Publisher
117
- if (flowPublisherPresent ) {
118
- new ReactorJdkFlowAdapterRegistrar ().registerAdapter (this );
119
- }
120
- // If not present, do nothing for the time being...
121
- // We can fall back on "reactive-streams-flow-bridge" (once released)
122
-
123
100
// Kotlin Coroutines
124
101
if (reactorPresent && kotlinCoroutinesPresent ) {
125
102
new CoroutinesRegistrar ().registerAdapters (this );
@@ -253,23 +230,14 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
253
230
}
254
231
255
232
256
- private static class RxJava1Registrar {
233
+ private static class ReactorJdkFlowAdapterRegistrar {
257
234
258
- void registerAdapters (ReactiveAdapterRegistry registry ) {
259
- registry .registerReactiveType (
260
- ReactiveTypeDescriptor .multiValue (rx .Observable .class , rx .Observable ::empty ),
261
- source -> RxReactiveStreams .toPublisher ((rx .Observable <?>) source ),
262
- RxReactiveStreams ::toObservable
263
- );
264
- registry .registerReactiveType (
265
- ReactiveTypeDescriptor .singleRequiredValue (rx .Single .class ),
266
- source -> RxReactiveStreams .toPublisher ((rx .Single <?>) source ),
267
- RxReactiveStreams ::toSingle
268
- );
235
+ void registerAdapter (ReactiveAdapterRegistry registry ) {
236
+ Flow .Publisher <?> emptyFlow = JdkFlowAdapter .publisherToFlowPublisher (Flux .empty ());
269
237
registry .registerReactiveType (
270
- ReactiveTypeDescriptor .noValue ( rx . Completable .class , rx . Completable :: complete ),
271
- source -> RxReactiveStreams . toPublisher (( rx . Completable ) source ),
272
- RxReactiveStreams :: toCompletable
238
+ ReactiveTypeDescriptor .multiValue ( Flow . Publisher .class , () -> emptyFlow ),
239
+ source -> JdkFlowAdapter . flowPublisherToFlux (( Flow . Publisher <?> ) source ),
240
+ JdkFlowAdapter :: publisherToFlowPublisher
273
241
);
274
242
}
275
243
}
@@ -347,18 +315,6 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
347
315
}
348
316
}
349
317
350
- private static class ReactorJdkFlowAdapterRegistrar {
351
-
352
- void registerAdapter (ReactiveAdapterRegistry registry ) {
353
- Flow .Publisher <?> emptyFlow = JdkFlowAdapter .publisherToFlowPublisher (Flux .empty ());
354
- registry .registerReactiveType (
355
- ReactiveTypeDescriptor .multiValue (Flow .Publisher .class , () -> emptyFlow ),
356
- source -> JdkFlowAdapter .flowPublisherToFlux ((Flow .Publisher <?>) source ),
357
- JdkFlowAdapter ::publisherToFlowPublisher
358
- );
359
- }
360
- }
361
-
362
318
363
319
/**
364
320
* ReactiveAdapter variant that wraps adapted Publishers as {@link Flux} or
0 commit comments