↑ Back to Project Reactor Notes
← Home
A hands on intro course developed by Project Reactor
🔗 Link to course
Introduction to Reactive Programming
Flux
Mono
StepVerifier
Transform
Merge
Request
Error
Adapt
Other Operations
Reactive to Blocking
Blocking to Reactive
Each topic has a set of exercises, if utilizing the course they will appear on each topics page. If you are utilizing just the notebook here are links to the GitHub repo conatining the exercises and solutions.
🔗 Intro to Reactor Topics Exercises
🔗 Intro to Reactor Topics Solutions
Reactor 3 is a library built around the Reactive Streams specification
The goal behind Reactive Programming is to be fully asynchronous and non-blocking in a more readable and maintainable manner than Callback based APIs adn Future types
- Publisher (the source) produces data
- Does nothing until the Subscriber has subscribed (registered)
- Subscriber consumes the data
- Data is pushed to the Subscriber via the Publisher
Reactor adds operators to the Reactive Stream Sequence. Operators are chained together to describe what processing to apply at each stage to the data.
Apply an operator returns a new intermediate Publisher
- Subscriber to the operator upstream
- Publisher to the operator downstream
The final form of the data ends up in the final Subscriber that defines what to do from a user perspective.
Needed for creating a Reactive Stream Publisher or Subscriber
- Code must comply with the spec and pass TCK
- Favor using an existing library like Reactor 3, RxJava, Akka Streams, Vert.x or Ratpack
Flux is a Reactive Streams Publisher that can be used to generate, transform and orchestrate Flux sequences via operators
Flux can emit 0 to n elements via onNext
events
- Completes via
onComplete
terminal events - Errors via
onError
terminal events
Note: if no terminal event is triggered, the Flux is infinite
Flux.fromIterable(getSomeLongList())
.delayElements(Duration.ofMillis(100))
.doOnNext(serviceA::someObserver)
.map(d -> d * 2)
.take(3)
.onErrorResumeWith(errorHandler::fallback)
.doAfterTerminate(serviceM::incrementTerminate)
.subscribe(System.out::println);
Mono is a Reactive Streams Publisher that can be used to generate, transform and orchestrate Mono sequences via operators
It is a specialization of Flux that can emit at most 1 element
- Valued (complete with element)
- Empty (complete without element)
- Failed (error)
Mono<Void>
is something that can be utilized when only the completion signal is interesting
Mono.firstWithValue(
Mono.just(1).map(integer -> "foo" + integer),
Mono.delay(Duration.ofMillis(100)).thenReturn("bar")
)
.subscribe(System.out::println);
A StepVerifier
comes from the reactor-test
artifact and is capable of subscribing to any Publisher and assert expectations.
This is something that would be utilized with unit testing.
An instance of StepVerifier
can be created via .create()
, configured via a DSL
for setting expectations, and finish with a single terminal expectation (completion, error, cancellation...)
When utilizing StepVerifier
some form of verify()
method needs to be used. If it is not used, the StepVerifier
won't
subscribe to the sequence and nothing will be asserted.
StepVerifier.create(T<Publisher>).{expectations...}.verify()
Reactor has several operators that can be used to transform data. This means that when the subscriber receives data, it then can take that data and transform it into something else.
Merging sequences is utilized when listening for values from multiple Publishers, merging the data retrieved and returning a single Flux.
Note: Examples of ways to merge data from multiple publishers can be found using the link above
and looking for the bullet point stating "I want to combine publishers..."
Backpressure a feedback mechanism that allows a Subscriber to signal to its Publisher how much data it is prepared to process, limiting the rate at which the Publisher produces data.
AKA: A way that the Subscriber can inform the Publisher how much data it can consume
Backpressure is configured at the Subscription level.
subscribe()
- creates a Subscription
cancel()
- cancels the flow of data
request(long)
- tunes demand of data
Request Example: request(Long.MAX_VALUE)
- Publisher will emit data at its fastest pace due to request demand is essentially unbound.
🔗 Backpressure Overview
🔗 Subscribe Method Examples
🔗 Peeking into a Sequence
Reactor ships with several tools that can be used to handle, recover from and even retry a new Subscription. The main goal with handling errors still stands, catch them and handle them gracefully.
🔗 Error Handling Operators Overview
🔗 Handling Errors Overview
Reactor 3 has the ability to interact with RxJava3 without having to utilize a library inbetween to translate. This can help with projects that are utilizing RxJava3 to leverage Reactor 3 with less complexity and re-work.
// Adapt Flux to RxJava Flowable
Flowable<User> fromFluxToFlowable(Flux<User> flux) {
return Flowable.fromPublisher(flux);
}
// Adapt RxJava Flowable to Flux
Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
return Flux.from(flowable);
}
🔗 Flux.from()
🔗 Flowable.fromPublisher()
// Adapt Flux to RxJava Observable
Observable<User> fromFluxToObservable(Flux<User> flux) {
return Flowable.fromPublisher(flux).toObservable();
}
// Adapt RxJava Observable to Flux
Flux<User> fromObservableToFlux(Observable<User> observable) {
return Flux.from(observable.toFlowable(BackpressureStrategy.BUFFER));
}
🔗 Observable
🔗 Flowable.toObservable()
🔗 Observable.toFlowable()
🔗 BackpressureStrategy
// Adapt Mono to RxJava Single
Single<User> fromMonoToSingle(Mono<User> mono) {
return Single.fromPublisher(mono);
}
// Adapt RxJava Single to Mono
Mono<User> fromSingleToMono(Single<User> single) {
return Mono.from(Flowable.fromSingle(single));
}
🔗 Single
🔗 Mono.from()
🔗 Flowable.fromSingle()
🔗 Single.fromPublisher()
// Adapt Mono to Java 8+ CompletableFuture
CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
return mono.toFuture();
}
// Adapt Java 8+ CompletableFuture to Mono
Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
return Mono.fromFuture(future);
}
🔗 Mono.toFuture
🔗 Mono.fromFuture
Reactor 3 has a wide variety of other operations under it's toolbelt outside of what was already covered.
In this example you can see that the Flux.zip()
operator takes in the 3 Flux<Strings>
that are passed into the function.
Following the Flux.zip()
operator a .flatMap()
is utilized to create a new Flux<User>
to be returned. When creating the User there are arguements using .getT1()
and then 'T' followed by a different number. This is how to retreve information from a Tuple which is what is returned by the .zip()
operator.
// Create a Flux of user from Flux of username, firstname and lastname.
Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
return Flux.zip(
usernameFlux,
firstnameFlux,
lastnameFlux)
.flatMap(
info ->
Flux.just(
new User(
info.getT1(),
info.getT2(),
info.getT3())));
}
In this example the goal is to retrieve a Mono
that returns it's value first or faster. Utilizing the Mono.firstWithValue()
operator it will return whichever provided Mono
returns its value fastest.
// Return the mono which returns its value faster
Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
return Mono.firstWithValue(mono1, mono2);
}
Similar to the example above - the goal is to retrieve the first or fastest element returned by a Flux
. Utilizing the Flux.firstWithValue()
operator it will select which Flux
is the fastest or first to emit a value.
// Return the flux which returns the first value faster
Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
return Flux.firstWithValue(flux1, flux2);
}
🔗Flux.ignoreElements()
🔗Flux.then()
In this example there is a scenario where you're not interested in the elements of a Flux
, but want the completion of the sequence represented as a Mono<Void>
. This is accomplished by utilizing the Flux.then()
operator.
// Convert the input Flux<User> to a Mono<Void> that represents the complete signal of the flux
Mono<Void> fluxCompletion(Flux<User> flux) {
return flux.then();
}
If there was a need to maintain the Flux
type the operator Flux.ignoreElements()
could be utilized.
Reactor 3 is not a fan of null
values, I mean no one really is. In this example we have a situation where the arguement being passed into the function is nullable. Which means there is a chance it may be null
.
The goal is to return a Mono<User>
however still return a valid Mono
if the argument is null
. Utilizing Mono.justOrEmpty()
is able to accomplish that goal.
// Return a valid Mono of user for null input and non null input user (hint: Reactive Streams do not accept null values)
Mono<User> nullAwareUserToMono(User user) {
return Mono.justOrEmpty(user);
}
🔗Mono.defaultIfEmpty()
🔗Mono.switchIfEmpty()
In this example there is chance that the Mono
argument could be empty. If it is empty, the goal is to have it return a default value instead of an empty Mono
. This is accomplished by utilizing the Mono.defaultIfEmpty()
operator.
// Return the same mono passed as input parameter, expect that it will emit User.SKYLER when empty
Mono<User> emptyToSkyler(Mono<User> mono) {
return mono.defaultIfEmpty(User.SKYLER);
}
There is a similar operator that can be utilized if there was a need for another sequence instead of another value. That operator is Mono.switchIfEmpty()
.
In this example the goal is to collect all values provided by a Flux
into a Mono<List>
containing all the values of the Flux
. To accomplish this the Flux.collectList()
operator is utilized.
// Convert the input Flux<User> to a Mono<List<User>> containing list of collected flux values
Mono<List<User>> fluxCollection(Flux<User> flux) {
return flux.collectList();
}
At times there may be cases where portions of a project will be non-reactive, but could utilize a portion of code that is reactive.
This is something that should be avoided; however, there is a solution available if it's absolutely needed.
In this example a Mono
is being passed into a function that returns a non-reactive type. This is to represent utilizing reactive components with non-reactive components. This is accomplished by the Mono.block()
operator.
// Return the user contained in that Mono
User monoToValue(Mono<User> mono) {
return mono.block();
}
Similar to the example above a Flux
is being passed into a function returning a non-reactive type. This is acccomplished by the Flux.toIterable()
operator.
// Return the users contained in that Flux
Iterable<User> fluxToValues(Flux<User> flux) {
return flux.toIterable();
}
This covers another scenario of non-reactive code interacting with reactive code. The non-reactive code in this case is a BlockingRepository
(example - JDBC connection to a database).
The best approach is to isolate blocking parts of your code into their own execution contex via a Scheduler
. This allows to keep efficiency of the rest of the pipeline high and only creating extra threads when needed.
🔗 Flux.defer()
🔗 Flux.subscribeOn()
🔗 Schedulers.boundedElastic()
In this example a Flux
is created out of a Blocking Repository
. What takes place is that the Flux
is defered until it is suscribed to, which is when the repository completes its operations. This allows a pool of threads that grows on demand.
// Create a Flux for reading all users from the blocking repository deferred until the flux is subscribed, and run it with a bounded elastic scheduler
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
.subscribeOn(Schedulers.boundedElastic());
}
In this example a non-reactive repository save is the what's creating a slow Subscriber
. The save is isolated into its own excicution, and the Mono.then()
operator is utilized for knowing if succuss or failure.
// Insert users contained in the Flux parameter in the blocking repository using a bounded elastic scheduler and return a Mono<Void> that signal the end of the operation
Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
return flux
.publishOn(Schedulers.boundedElastic())
.doOnNext(repository::save)
.then();
}