Reactive Programming with Reactor 3
1. Introduction to Reactive Programming
Reactor 3 is a library built around the Reactive Streams specification, bringing the paradigm of Reactive Programming to the JVM.
In this course, you'll familiarize yourself with the Reactor API. Let's start with a quick introduction to the more general concepts of Reactive Streams and Reactive Programming.
Why
Reactive Programming is a new paradigm in which you use declarative code (in a manner that is similar to functional programming) in order to build asynchronous processing pipelines. It is an event-based model where data is pushed to the consumer, as it becomes available: we deal with asynchronous sequences of events.
This is important in order to use resources more efficiently and increase an application's capacity to serve a large number of clients, without the headache of writing low-level concurrent and/or parallelized code.
By being built around the core pillars of being fully asynchronous and non-blocking, Reactive Programming is an alternative to the more limited ways of writing asynchronous code in the JDK: namely callback-based APIs and Future.
It also facilitates composition, which in turn makes asynchronous code more readable and maintainable.
Reactive Streams
The Reactive Streams specification is an industry-driven effort to standardize Reactive Programming libraries on the JVM, and more importantly specify how they must behave so that they are interoperable. Implementors include Reactor 3 but also RxJava from version 2 and above, Akka Streams, Vert.x and Ratpack.
It contains 4 very simple interfaces as well as a TCK (Technology Compatibility Kit), which shouldn't be overlooked, since it is the rules of the specification that bring the most value to it.
From a user perspective, however, it is fairly low-level. Reactor 3 aims at offering a higher-level API that can be leveraged in a wide range of situations, building it on top of the Reactive Streams Publisher.
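To give a feel for what "low-level" means here, the following is a minimal sketch of a hand-written publisher. It is an illustration under assumptions, not Reactor code: it uses the JDK's java.util.concurrent.Flow (Java 9+), whose nested interfaces mirror the Reactive Streams ones, so that it runs without any dependency. The RangePublisher and RangePublisherDemo classes are hypothetical, single-threaded only, and have not been checked against the TCK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical, single-threaded publisher emitting the integers start..end.
class RangePublisher implements Flow.Publisher<Integer> {
    private final int start;
    private final int end;

    RangePublisher(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        // Each subscriber gets its own Subscription, hence its own cursor and demand.
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = start;
            private long demand;
            private boolean emitting;
            private boolean terminated;

            @Override
            public void request(long n) {
                if (terminated) {
                    return;
                }
                if (n <= 0) {
                    terminated = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) {
                    return; // re-entrant request from onNext: the running loop sees the new demand
                }
                emitting = true;
                while (demand > 0 && next <= end && !terminated) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next > end && !terminated) {
                    terminated = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                terminated = true;
            }
        });
    }
}

class RangePublisherDemo {
    // Subscribes, requests only initialDemand items, and records the signals received.
    static List<String> run(int initialDemand) {
        List<String> events = new ArrayList<>();
        new RangePublisher(1, 5).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(initialDemand); }
            @Override public void onNext(Integer item) { events.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(2));  // [onNext(1), onNext(2)]
        System.out.println(run(10)); // the five items, then onComplete
    }
}
```

Even this toy version has to track demand, re-entrancy and termination by hand. With Reactor, the same sequence is Flux.range(1, 5), and those concerns are handled by the library.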
Interactions
In reactive stream sequences, the source Publisher produces data. But by default, it does nothing until a Subscriber has registered (subscribed), at which point it will push data to it.
Reactor adds the concept of operators, which are chained together to describe what processing to apply at each stage to the data. Applying an operator returns a new intermediate Publisher (in fact it can be thought of as both a Subscriber to the operator upstream and a Publisher for downstream). The final form of the data ends up in the final Subscriber that defines what to do from a user perspective.
Quiz
Publisher<Integer> source = Flux.range(1, 10);
Flux<String> flux = Flux.just("A");
flux.map(s -> "foo" + s);
flux.subscribe(System.out::println);
2. Learn how to create Flux instances
Flux
Description
A Flux<T> is a Reactive Streams Publisher, augmented with a lot of operators that can be used to generate, transform, and orchestrate Flux sequences.
It can emit 0 to n <T> elements (onNext events) and then either completes or errors (onComplete and onError terminal events). If no terminal event is triggered, the Flux is infinite.
- Static factories on Flux let you create sources, or generate them from several types of callbacks.
- Instance methods, the operators, let you build an asynchronous processing pipeline that will produce an asynchronous sequence.
- Each Flux#subscribe() or multicasting operation such as Flux#publish and Flux#publishNext will materialize a dedicated instance of the pipeline and trigger the data flow inside it.
See the javadoc here.
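The last point, that each subscription materializes its own instance of the pipeline, can be observed with a small sketch. It assumes reactor-core is on the classpath; the PerSubscriptionDemo class and its mapCalls method are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class PerSubscriptionDemo {
    // Returns how many times the mapping function ran after the given number of subscriptions.
    static int mapCalls(int subscribers) {
        AtomicInteger calls = new AtomicInteger();
        Flux<Integer> doubled = Flux.just(1, 2, 3)
                .map(i -> {
                    calls.incrementAndGet();
                    return i * 2;
                });
        // Nothing has run yet: at this point the pipeline is only a description.
        for (int n = 0; n < subscribers; n++) {
            doubled.subscribe(v -> { });
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(mapCalls(0)); // 0
        System.out.println(mapCalls(1)); // 3
        System.out.println(mapCalls(2)); // 6
    }
}
```

Without a subscriber the counter stays at zero, and every additional subscriber replays the three elements through map again.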
Flux in action:
Flux.fromIterable(getSomeLongList())
.delayElements(Duration.ofMillis(100))
.doOnNext(serviceA::someObserver)
.map(d -> d * 2)
.take(3)
.onErrorResume(errorHandler::fallback)
.doAfterTerminate(serviceM::incrementTerminate)
.subscribe(System.out::println);
Practice
In this lesson we'll see different factory methods to create a Flux.
Let's try a very simple example: just return an empty flux.
static <T> Flux<T> empty()
// Create a Flux that completes without emitting any item.
One can also create a Flux out of readily available data:
static <T> Flux<T> just(T... data)
// Create a new Flux that emits the specified item(s) and then completes.
This time we will use the items of a list to publish on the Flux with fromIterable:
static <T> Flux<T> fromIterable(Iterable<? extends T> it)
// Create a Flux that emits the items contained in the provided Iterable.
In imperative synchronous code, it's easy to manage exceptions with familiar try-catch blocks, throw instructions...
But in an asynchronous context, we have to do things a bit differently. Reactive Streams defines the onError signal to deal with exceptions. Note that such an event is terminal: this is the last event the Flux will produce.
Flux#error produces a Flux that simply emits this signal, terminating immediately:
static <T> Flux<T> error(Throwable error)
// Create a Flux that terminates with the specified error.
To finish with Flux, let's try to create a Flux that produces ten elements at a regular pace. To publish at a regular pace, we can use interval. But it produces an infinite stream (like the ticks of a clock), and we want only 10 elements, so don't forget to limit it with take.
static Flux<Long> interval(Duration period)
// Create a new Flux that emits an ever incrementing long starting with 0 every period on the global timer.
Part01Flux.java
package io.pivotal.literx;
//generic imports to help with simpler IDEs (ie tech.io)
import java.util.*;
import java.util.function.*;
import java.time.*;
import reactor.core.publisher.Flux;
/**
* Learn how to create Flux instances.
*
* @author Sebastien Deleuze
* @see <a href="https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html">Flux Javadoc</a>
*/
public class Part01Flux {
//========================================================================================
// TODO Return an empty Flux
Flux<String> emptyFlux() {
return Flux.empty();
}
//========================================================================================
// TODO Return a Flux that contains 2 values "foo" and "bar" without using an array or a collection
Flux<String> fooBarFluxFromValues() {
return Flux.just("foo","bar");
}
//========================================================================================
// TODO Create a Flux from a List that contains 2 values "foo" and "bar"
Flux<String> fooBarFluxFromList() {
List<String> l = new ArrayList<>();
l.add("foo");
l.add("bar");
return Flux.fromIterable(l);
}
//========================================================================================
// TODO Create a Flux that emits an IllegalStateException
Flux<String> errorFlux() {
return Flux.error(new IllegalStateException());
}
//========================================================================================
// TODO Create a Flux that emits increasing values from 0 to 9 each 100ms
Flux<Long> counter() {
return Flux.interval(Duration.ofMillis(100)).take(10);
}
}
3. Learn how to create Mono instances
Mono
Description
A Mono<T> is a Reactive Streams Publisher, also augmented with a lot of operators that can be used to generate, transform, and orchestrate Mono sequences.
It is a specialization of Flux that can emit at most 1 <T> element: a Mono is either valued (complete with element), empty (complete without element) or failed (error).
A Mono<Void> can be used in cases where only the completion signal is interesting (the Reactive Streams equivalent of a Runnable task completing).
As with Flux, the operators can be used to define an asynchronous pipeline which will be materialized anew for each Subscription.
Note that some APIs that change the sequence's cardinality will return a Flux (and vice-versa, APIs that reduce the cardinality to 1 in a Flux return a Mono).
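As a rough illustration of how cardinality drives the return type, the sketch below shows a few operators in each direction. It assumes reactor-core is on the classpath; CardinalityDemo and its method names are hypothetical and only serve to make the types visible.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class CardinalityDemo {
    // Flux -> Mono: aggregating operators reduce the sequence to a single value.
    static Mono<Long> countOf(Flux<String> names) {
        return names.count();
    }

    static Mono<List<String>> listOf(Flux<String> names) {
        return names.collectList();
    }

    // Mono -> Flux: flatMapMany expands one value into a sequence.
    static Flux<String> lettersOf(Mono<String> word) {
        return word.flatMapMany(w -> Flux.fromArray(w.split("")));
    }

    public static void main(String[] args) {
        // block() is used here only to print results in a plain main method.
        System.out.println(countOf(Flux.just("foo", "bar")).block());          // 2
        System.out.println(listOf(Flux.just("foo", "bar")).block());           // [foo, bar]
        System.out.println(lettersOf(Mono.just("foo")).collectList().block()); // [f, o, o]
    }
}
```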
See the javadoc here.
Mono in action:
Mono.just(1)
.map(integer -> "foo" + integer)
.or(Mono.delay(Duration.ofMillis(100)))
.subscribe(System.out::println);
Practice
As with Flux, let's return an empty Mono using the static factory.
static <T> Mono<T> empty()
// Create a Mono that completes without emitting any item.
Now, we will try to create a Mono which never emits anything. Unlike empty(), it won't even emit an onComplete event.
Like Flux, you can create a Mono from an available (unique) value.
And exactly as we did for the Flux, we can propagate exceptions.
Part02Mono.java
package io.pivotal.literx;
import reactor.core.publisher.Mono;
/**
* Learn how to create Mono instances.
*
* @author Sebastien Deleuze
* @see <a href="https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html">Mono Javadoc</a>
*/
public class Part02Mono {
//========================================================================================
// TODO Return an empty Mono
Mono<String> emptyMono() {
return Mono.empty();
}
//========================================================================================
// TODO Return a Mono that never emits any signal
Mono<String> monoWithNoSignal() {
return Mono.never();
}
//========================================================================================
// TODO Return a Mono that contains a "foo" value
Mono<String> fooMono() {
return Mono.just("foo");
}
//========================================================================================
// TODO Create a Mono that emits an IllegalStateException
Mono<String> errorMono() {
return Mono.error( new IllegalStateException() );
}
}
4. StepVerifier and how to use it
StepVerifier
Description
Until now, your solution for each exercise was checked by passing the Publisher you defined to a test using a StepVerifier.
This class from the reactor-test artifact is capable of subscribing to any Publisher (e.g. a Flux or an Akka Stream...) and then asserting a set of user-defined expectations with regard to the sequence.
If any event is triggered that doesn't match the current expectation, the StepVerifier will produce an AssertionError.
You can obtain an instance of StepVerifier from the static factory create. It offers a DSL to set up expectations on the data part and finish with a single terminal expectation (completion, error, cancellation...).
Note that you must always call the verify() method or one of the shortcuts that combine the terminal expectation and verify, like .verifyErrorMessage(String). Otherwise the StepVerifier won't subscribe to your sequence and nothing will be asserted.
StepVerifier.create(Publisher<T>).{expectations...}.verify()
There are a lot of possible expectations, see the reference documentation and the javadoc.
Practice
In these exercises, the methods get a Flux or Mono as a parameter and you'll need to test its behavior. You should create a StepVerifier that uses said Flux/Mono, describes expectations about it and verifies it.
Let's verify that the sequence passed to the first test method emits two specific elements, "foo" and "bar", and that the Flux then completes successfully.
Now, let's do the same test but verifying that an exception is propagated at the end.
Let's try to create a StepVerifier with an expectation on a User's getUsername() getter. Some expectations can work by checking a Predicate on the next value, or even by consuming the next value by passing it to an assertion library like Assertions.assertThat(T) from AssertJ. Try these lambda-based versions (for instance StepVerifier#assertNext with a lambda using an AssertJ assertion like assertThat(...).isEqualTo(...)):
On this next test we will receive a Flux which takes some time to emit. As you can expect, the test will take some time to run.
The next one is even worse: it emits 1 element per second, completing only after having emitted 3600 of them!
Since we don't want our tests to run for hours, we need a way to speed that up while still being able to assert the data itself (eliminating the time factor).
Fortunately, StepVerifier comes with a virtual time option: by using StepVerifier.withVirtualTime(Supplier<Publisher>), the verifier will temporarily replace the default core Schedulers (the components that define the execution context in Reactor). All these default Schedulers are replaced by a single instance of a VirtualTimeScheduler, which has a virtual clock that can be manipulated.
In order for the operators to pick up that Scheduler, you should lazily build your operator chain inside the lambda passed to withVirtualTime.
You must then advance time as part of your test scenario, by calling either thenAwait(Duration) or expectNoEvent(Duration). The former simply advances the clock, while the latter additionally fails if any unexpected event triggers during the provided duration (note that almost all the time there will at least be a "subscription" event even though the clock hasn't advanced, so you should usually put an expectSubscription() after .withVirtualTime() if you're going to use expectNoEvent right after).
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofHours(3)))
.expectSubscription()
.expectNoEvent(Duration.ofHours(2))
.thenAwait(Duration.ofHours(1))
.expectNextCount(1)
.expectComplete()
.verify();
Let's try that by making a fast test of our hour-long publisher:
Part03StepVerifier.java
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.pivotal.literx;
import java.time.Duration;
import java.util.function.Supplier;
import io.pivotal.literx.domain.User;
import org.assertj.core.api.Assertions;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
/**
* Learn how to use StepVerifier to test Mono, Flux or any other kind of Reactive Streams Publisher.
*
* @author Sebastien Deleuze
* @see <a href="https://projectreactor.io/docs/test/release/api/reactor/test/StepVerifier.html">StepVerifier Javadoc</a>
*/
public class Part03StepVerifier {
//========================================================================================
// TODO Use StepVerifier to check that the flux parameter emits "foo" and "bar" elements then completes successfully.
void expectFooBarComplete(Flux<String> flux) {
StepVerifier.create(flux)
.expectNext("foo", "bar")
.verifyComplete();
}
//========================================================================================
// TODO Use StepVerifier to check that the flux parameter emits "foo" and "bar" elements then a RuntimeException error.
void expectFooBarError(Flux<String> flux) {
StepVerifier.create(flux)
.expectNext("foo", "bar")
.verifyError(RuntimeException.class);
}
//========================================================================================
// TODO Use StepVerifier to check that the flux parameter emits a User with "swhite" username
// and another one with "jpinkman" then completes successfully.
void expectSkylerJesseComplete(Flux<User> flux) {
StepVerifier.create(flux)
.assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("swhite"))
.assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("jpinkman"))
.verifyComplete();
}
//========================================================================================
// TODO Expect 10 elements then complete and notice how long the test takes.
void expect10Elements(Flux<Long> flux) {
StepVerifier.create(flux).expectNextCount(10).verifyComplete();
}
//========================================================================================
// TODO Expect 3600 elements at intervals of 1 second, and verify quicker than 3600s
// by manipulating virtual time thanks to StepVerifier#withVirtualTime, notice how long the test takes
void expect3600Elements(Supplier<Flux<Long>> supplier) {
// Advance the virtual clock by one hour, then expect all 3600 elements and completion.
StepVerifier.withVirtualTime(supplier)
.thenAwait(Duration.ofHours(1))
.expectNextCount(3600)
.verifyComplete();
}
private void fail() {
throw new AssertionError("workshop not implemented");
}
}
5. Learn to transform our asynchronous data
Transform
Description
Reactor ships with several operators that can be used to transform data.
Practice
First, we will capitalize a String. Since this is a simple 1-1 transformation with no expected latency, we can use the map operator with a lambda transforming a T into a U.
We can use exactly the same code on a Flux, applying the mapping to each element as it becomes available.
Now imagine that we have to call a webservice to capitalize our String. This new call can have latency so we cannot use the synchronous map anymore. Instead, we want to represent the asynchronous call as a Flux or Mono, and use a different operator: flatMap.
flatMap takes a transformation Function that returns a Publisher<U> instead of a U. This publisher represents the asynchronous transformation to apply to each element. If we were using it with map, we'd obtain a Flux<Publisher<U>>. Not very useful.
But flatMap on the other hand knows how to deal with these inner publishers: it will subscribe to them then merge all of them into a single global output, a much more useful Flux<U>. Note that if values from inner publishers arrive at different times, they can interleave in the resulting Flux.
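The difference in resulting types can be seen in a short sketch. It assumes reactor-core is on the classpath; MapVsFlatMapDemo and asyncCapitalize are hypothetical names, and the 10 ms delay merely stands in for the latency of a remote call.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class MapVsFlatMapDemo {
    // Hypothetical stand-in for a remote call: upper-cases the value after a short delay.
    static Mono<String> asyncCapitalize(String s) {
        return Mono.just(s.toUpperCase()).delayElement(Duration.ofMillis(10));
    }

    // map only wraps: each element becomes a Mono that nobody has subscribed to.
    static Flux<Mono<String>> withMap(Flux<String> source) {
        return source.map(MapVsFlatMapDemo::asyncCapitalize);
    }

    // flatMap subscribes to each inner Mono and merges the results into one Flux.
    static Flux<String> withFlatMap(Flux<String> source) {
        return source.flatMap(MapVsFlatMapDemo::asyncCapitalize);
    }

    public static void main(String[] args) {
        // block() is used here only to wait for the result in a plain main method.
        List<String> result = withFlatMap(Flux.just("foo", "bar")).collectList().block();
        System.out.println(result); // FOO and BAR, in whichever order they arrived
    }
}
```

Because the inner publishers complete on their own schedule, the output order is not guaranteed here. When the source order matters, concatMap or flatMapSequential are the usual alternatives.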
Part04Transform.java
package io.pivotal.literx;
import io.pivotal.literx.domain.User;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Learn how to transform values.
*
* @author Sebastien Deleuze
*/
public class Part04Transform {
//========================================================================================
// TODO Capitalize the user username, firstname and lastname
Mono<User> capitalizeOne(Mono<User> mono) {
return mono.map( t -> new User(
t.getUsername().toUpperCase(),
t.getFirstname().toUpperCase(),
t.getLastname().toUpperCase()));
}
//========================================================================================
// TODO Capitalize the users username, firstName and lastName
Flux<User> capitalizeMany(Flux<User> flux) {
return flux.map( t -> new User(
t.getUsername().toUpperCase(),
t.getFirstname().toUpperCase(),
t.getLastname().toUpperCase()));
}
//========================================================================================
// TODO Capitalize the users username, firstName and lastName using #asyncCapitalizeUser
Flux<User> asyncCapitalizeMany(Flux<User> flux) {
return flux.flatMap( this::asyncCapitalizeUser);
}
Mono<User> asyncCapitalizeUser(User u) {
return Mono.just(new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase()));
}
}
6. Merge
Merging sequences is the operation consisting of listening for values from several Publishers and emitting them in a single Flux.
In this first exercise we will begin by merging elements of two Flux as soon as they arrive. The caveat here is that values from flux1 arrive with a delay, so in the resulting Flux we start seeing values from flux2 first.
But if we want to keep the order of sources, we can use the concat operator. Concat will wait for flux1 to complete before it can subscribe to flux2, ensuring that all the values from flux1 have been emitted, thus preserving an order corresponding to the source.
You can use concat with several Publishers. For example, you can get two Mono and turn them into a same-order Flux:
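The ordering difference between merging and concatenating can be reproduced with a small sketch. It assumes reactor-core is on the classpath; MergeVsConcatDemo and its slow and fast sources are hypothetical stand-ins for flux1 and flux2.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class MergeVsConcatDemo {
    // Plays the role of flux1: every element is delayed by 50 ms.
    static Flux<String> slow() {
        return Flux.just("a", "b").delayElements(Duration.ofMillis(50));
    }

    // Plays the role of flux2: elements are available immediately.
    static Flux<String> fast() {
        return Flux.just("c", "d");
    }

    // Both sources are subscribed eagerly, so the fast one gets through first.
    static List<String> merged() {
        return slow().mergeWith(fast()).collectList().block();
    }

    // The second source is only subscribed once the first has completed.
    static List<String> concatenated() {
        return slow().concatWith(fast()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(merged());       // [c, d, a, b]
        System.out.println(concatenated()); // [a, b, c, d]
    }
}
```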
Part05Merge.java
package io.pivotal.literx;
import io.pivotal.literx.domain.User;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Learn how to merge flux.
*
* @author Sebastien Deleuze
*/
public class Part05Merge {
//========================================================================================
// TODO Merge flux1 and flux2 values with interleave
Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) {
return flux1.mergeWith(flux2);
}
//========================================================================================
// TODO Merge flux1 and flux2 values with no interleave (flux1 values and then flux2 values)
Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
return flux1.concatWith(flux2);
}
//========================================================================================
// TODO Create a Flux containing the value of mono1 then the value of mono2
Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
return Flux.concat(mono1, mono2);
}
}
7. Request
Description
Remember this diagram?
There's one aspect to it that we didn't cover: the volume control. In Reactive Streams terms this is called backpressure. It is a feedback mechanism that allows a Subscriber to signal to its Publisher how much data it is prepared to process, limiting the rate at which the Publisher produces data.
This control of the demand is done at the Subscription level: a Subscription is created for each subscribe() call and it can be manipulated to either cancel() the flow of data or tune demand with request(long).
Making a request(Long.MAX_VALUE) means an unbounded demand, so the Publisher will emit data at its fastest pace.
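Outside of tests, one way to tune demand by hand is to subscribe with Reactor's BaseSubscriber. The sketch below assumes reactor-core is on the classpath; OneByOneDemo and takeOneByOne are hypothetical names, and limit is expected to be at least 1.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class OneByOneDemo {
    // Requests elements one at a time and cancels once limit elements have arrived.
    static List<Integer> takeOneByOne(Flux<Integer> source, int limit) {
        List<Integer> received = new ArrayList<>();
        source.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1); // initial demand: a single element instead of Long.MAX_VALUE
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
                if (received.size() >= limit) {
                    cancel();   // stop the flow of data
                } else {
                    request(1); // ask for exactly one more element
                }
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(takeOneByOne(Flux.range(1, 10), 3)); // [1, 2, 3]
    }
}
```

Flux.range emits synchronously in this example, so the list is already filled when subscribe returns. With an asynchronous source the caller would have to wait for the signals instead.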
Practice
The demand can be tuned in the StepVerifier as well, by using the relevant parameter to create and withVirtualTime for the initial request, then chaining in thenRequest(long) in your expectations for further requests.
In this first example, create a StepVerifier that produces an initial unbounded demand and verifies 4 values to be received, before completion. This is equivalent to the way you've been using StepVerifier so far.
Next we will request values one by one: for that you need an initial request, but also a second single request after you've received and asserted the first element.
Without further requests, the source will never complete unless you cancel it. This can be done instead of the terminal expectations by using .thenCancel().
A note on debugging
How can we check that the previous sequence was requested one by one, and that a cancellation happened?
It's important to be able to debug reactive APIs, so in the next example we will make use of the log operator to know exactly what happens in terms of signals and events.
Use the repository to get a Flux of all users, then apply a log to it. Observe in the console below how the underlying test requests it, and the other events like subscribe, onNext...
If you want to perform custom actions without really modifying the elements in the sequence, you can use the "side effect" methods that start with doOn.
For example, if you want to print "Starting:" upon subscription, use doOnSubscribe.
Each doOn method takes a relevant callback representing the custom action for the corresponding event.
Note that you should not block or invoke operations with latency in these callbacks (which is also true of other operator callbacks like map): it's more for quick operations.
Go ahead and modify the first two methods in this exercise in order to get some insight into their sequences using log and doOnXXX.
Part06Request.java
package io.pivotal.literx;
import io.pivotal.literx.domain.User;
import io.pivotal.literx.repository.ReactiveRepository;
import io.pivotal.literx.repository.ReactiveUserRepository;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
/**
* Learn how to control the demand.
*
* @author Sebastien Deleuze
*/
public class Part06Request {
ReactiveRepository<User> repository = new ReactiveUserRepository();
//========================================================================================
// TODO Create a StepVerifier that initially requests all values and expect 4 values to be received
StepVerifier requestAllExpectFour(Flux<User> flux) {
// create(flux) already makes an unbounded initial request, so no thenRequest is needed.
return StepVerifier
.create(flux)
.expectNextCount(4)
.expectComplete();
}
//========================================================================================
// TODO Create a StepVerifier that initially requests 1 value and expects User.SKYLER then requests another value and expects User.JESSE then stops verifying by cancelling the source
StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
// The second argument of create sets the initial request to 1 instead of unbounded.
return StepVerifier
.create(flux, 1)
.expectNext(User.SKYLER)
.thenRequest(1)
.expectNext(User.JESSE)
.thenCancel();
}
//========================================================================================
// TODO Return a Flux with all users stored in the repository that prints automatically logs for all Reactive Streams signals
Flux<User> fluxWithLog() {
return repository.findAll().log();
}
//========================================================================================
// TODO Return a Flux with all users stored in the repository that prints "Starring:" on subscribe, "firstname lastname" for all values and "The end!" on complete
Flux<User> fluxWithDoOnPrintln() {
return repository
.findAll()
.doOnSubscribe(s -> System.out.println("Starring:"))
.doOnNext(u -> System.out.println(u.getFirstname() + " " + u.getLastname()))
.doOnComplete(() -> System.out.println("The end!"));
}
}
8. Error
Description
Reactor ships with several operators that can be used to deal with errors: propagate errors but also recover from them (e.g. by falling back to a different sequence or by retrying a new Subscription).
Practice
In the first example, we will return a Mono containing the default user Saul when an error occurs in the original Mono, using the method onErrorReturn. If you want, you can even limit that fallback to the IllegalStateException class. Use the User#SAUL constant.
Let's try the same thing with Flux. In this case, we don't just want a single fallback value, but a totally separate sequence (think getting stale data from a cache). This can be achieved with onErrorResume, which falls back to a Publisher<T>.
Emit both User#SAUL and User#JESSE whenever there is an error in the original Flux.
Dealing with checked exceptions is a bit more complicated. Whenever some code that throws checked exceptions is used in an operator (e.g. the transformation function of a map), you will need to deal with it. The most straightforward way is to make a more complex lambda with a try-catch block that will transform the checked exception into a RuntimeException that can be signalled downstream.
There is an Exceptions#propagate utility that will wrap a checked exception into a special runtime exception that can be automatically unwrapped by Reactor subscribers and StepVerifier: this avoids seeing an irrelevant RuntimeException in the stacktrace.
Try to use that on the capitalizeMany method within a map: you'll need to catch a GetOutOfHereException, which is checked, but the corresponding test still expects the GetOutOfHereException directly.
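The wrap-and-unwrap behaviour described above can be sketched independently of the exercise. The example assumes reactor-core is on the classpath; CheckedExceptionDemo, shout and describe are hypothetical names, and IOException merely stands in for any checked exception.

```java
import java.io.IOException;
import java.util.List;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class CheckedExceptionDemo {
    // Hypothetical helper that throws a checked exception on empty input.
    static String shout(String s) throws IOException {
        if (s.isEmpty()) {
            throw new IOException("empty input");
        }
        return s.toUpperCase();
    }

    // The lambda passed to map cannot throw checked exceptions, so they are wrapped.
    static Flux<String> shoutAll(Flux<String> source) {
        return source.map(s -> {
            try {
                return shout(s);
            } catch (IOException e) {
                throw Exceptions.propagate(e);
            }
        });
    }

    // Replaces the error signal with a description of the exception type seen downstream.
    static List<String> describe(Flux<String> source) {
        return shoutAll(source)
                .onErrorResume(e -> Mono.just("failed with " + e.getClass().getSimpleName()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(describe(Flux.just("a", "", "c"))); // [A, failed with IOException]
    }
}
```

The fallback sees the original IOException rather than the wrapper, and "c" is never processed because the error is terminal.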
Part07Errors.java
/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.pivotal.literx;
import io.pivotal.literx.domain.User;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Learn how to deal with errors.
*
* @author Sebastien Deleuze
* @see Exceptions#propagate(Throwable)
*/
public class Part07Errors {
//========================================================================================
// TODO Return a Mono<User> containing User.SAUL when an error occurs in the input Mono, else do not change the input Mono.
Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
return mono.onErrorReturn(User.SAUL);
}
//========================================================================================
// TODO Return a Flux<User> containing User.SAUL and User.JESSE when an error occurs in the input Flux, else do not change the input Flux.
Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
return flux.onErrorResume(e -> Flux.just(User.SAUL, User.JESSE));
}
//========================================================================================
// TODO Implement a method that capitalizes each user of the incoming flux using the
// #capitalizeUser method and emits an error containing a GetOutOfHereException error
Flux<User> capitalizeMany(Flux<User> flux) {
return flux.map(
u -> {
try {
return capitalizeUser(u);
} catch (GetOutOfHereException e) {
throw Exceptions.propagate(e);
}
}
);
}
User capitalizeUser(User user) throws GetOutOfHereException {
if (user.equals(User.SAUL)) {
throw new GetOutOfHereException();
}
return new User(user.getUsername().toUpperCase(), user.getFirstname().toUpperCase(), user.getLastname().toUpperCase());
}
protected final class GetOutOfHereException extends Exception {
private static final long serialVersionUID = 0L;
}
}
9. Adapt
You can make RxJava 3 and Reactor 3 types interact without a single external library.
In the first two examples we will adapt from Flux to Flowable, which implements Publisher, and vice-versa.
This is straightforward as both libraries provide a factory method to do that conversion from any Publisher. The checker below runs the two opposite conversions in one go.
The next two examples are a little trickier: we need to adapt between Flux and Observable, but the latter doesn't implement Publisher.
In the first case, you can transform any publisher to Observable. In the second case, you have to first transform the Observable into a Flowable, which forces you to define a strategy to deal with backpressure (RxJava 3 Observable doesn't support backpressure).
Next, let's try to transform a Mono to a RxJava Single, and vice-versa. You can simply call the firstOrError method from Observable. For the other way around, you'll once again need to transform the Single into a Flowable first.
Finally, you can easily transform a Mono to a Java 8 CompletableFuture and vice-versa. Notice how these conversion methods all begin with from (when converting an external type to a Reactor one) and to (when converting a Reactor type to an external one).
Part09Adapt.java
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.pivotal.literx;
import java.util.concurrent.CompletableFuture;
import io.pivotal.literx.domain.User;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Learn how to adapt from/to RxJava 3 Observable/Single/Flowable and Java 8+ CompletableFuture.
*
* Mono and Flux already implements Reactive Streams interfaces so they are natively
* Reactive Streams compliant + there are {@link Mono#from(Publisher)} and {@link Flux#from(Publisher)}
* factory methods.
*
* For RxJava 3, you should not use Reactor Adapter but only RxJava 3 and Reactor Core.
*
* @author Sebastien Deleuze
*/
public class Part09Adapt {

    //========================================================================================

    // TODO Adapt Flux to RxJava Flowable
    Flowable<User> fromFluxToFlowable(Flux<User> flux) {
        return Flowable.fromPublisher(flux);
    }

    // TODO Adapt RxJava Flowable to Flux
    Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
        return Flux.from(flowable);
    }

    //========================================================================================

    // TODO Adapt Flux to RxJava Observable
    Observable<User> fromFluxToObservable(Flux<User> flux) {
        return Observable.fromPublisher(flux);
    }

    // TODO Adapt RxJava Observable to Flux
    Flux<User> fromObservableToFlux(Observable<User> observable) {
        return Flux.from(observable.toFlowable(BackpressureStrategy.BUFFER));
    }

    //========================================================================================

    // TODO Adapt Mono to RxJava Single
    Single<User> fromMonoToSingle(Mono<User> mono) {
        return Single.fromPublisher(mono);
    }

    // TODO Adapt RxJava Single to Mono
    Mono<User> fromSingleToMono(Single<User> single) {
        return Mono.from(single.toFlowable());
    }

    //========================================================================================

    // TODO Adapt Mono to Java 8+ CompletableFuture
    CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
        return mono.toFuture();
    }

    // TODO Adapt Java 8+ CompletableFuture to Mono
    Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
        return Mono.fromFuture(future);
    }
}
10. Other Operations
Description
In this section, we'll have a look at a few more useful operators that don't fall into the broad categories we explored earlier. Reactor 3 contains a lot of operators, so don't hesitate to have a look at the Flux and Mono javadocs as well as the reference guide to learn about more of them.
Practice
In the first exercise we'll receive 3 Flux<String>. Their elements could arrive with latency, yet each time the three sequences have all emitted an element, we want to combine these 3 elements and create a new User. This combine-and-transform operation is called zip.
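As a minimal sketch of how zip pairs elements by position, the example below zips three sequences that emit at different speeds. It uses plain String values instead of the course's User class so that it stays self-contained; the class name ZipSketch, the sample data, and the delays are illustrative assumptions.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the course exercises.
class ZipSketch {

    // The n-th output waits until every source has emitted its n-th element.
    static List<String> zipNames() {
        Flux<String> usernames = Flux.just("swhite", "jpinkman")
                .delayElements(Duration.ofMillis(30));
        Flux<String> firstnames = Flux.just("Skyler", "Jesse");
        // This source has one extra element; zip stops when the shortest source completes.
        Flux<String> lastnames = Flux.just("White", "Pinkman", "ignored")
                .delayElements(Duration.ofMillis(10));

        return Flux.zip(usernames, firstnames, lastnames)
                .map(t -> t.getT1() + ":" + t.getT2() + " " + t.getT3())
                .collectList()
                .block(); // blocking only because this is a demo entry point
    }

    public static void main(String[] args) {
        System.out.println(zipNames());
    }
}
```

Whatever the latency of each source, the combined elements keep their positional pairing, and the unmatched third last name is dropped.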
If you have 3 possible Mono sources and you only want to keep the fastest one, you can use the first static method (Reactor 3.4 deprecates it in favor of firstWithSignal).
For Flux, a similar result can be achieved using the first static method. In this case, the Flux that signals first, typically by emitting its initial element, is selected. The sequences are never mixed.
Sometimes you're not interested in the elements of a Flux<T>. If you still want to keep the element type T, you can use ignoreElements(), which in Reactor 3 returns a Mono<T> that only propagates the termination signal. But if you really just want the completion, represented as a Mono<Void>, you can use then() instead.
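The difference between the two operators is mostly one of type, which the following sketch tries to make visible. The class name CompletionSketch and the log messages are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the course exercises.
class CompletionSketch {

    static List<String> observe() {
        List<String> log = new ArrayList<>();
        Flux<Integer> source = Flux.just(1, 2, 3);

        // ignoreElements() keeps the element type but never emits a value.
        Mono<Integer> ignored = source.ignoreElements();
        ignored.subscribe(
                value -> log.add("ignoreElements emitted " + value),
                error -> log.add("ignoreElements failed"),
                () -> log.add("ignoreElements completed"));

        // then() makes the intent explicit: only the completion matters.
        Mono<Void> completion = source.then();
        completion.subscribe(
                value -> log.add("then emitted a value"),
                error -> log.add("then failed"),
                () -> log.add("then completed"));

        return log;
    }

    public static void main(String[] args) {
        observe().forEach(System.out::println);
    }
}
```

Because Flux.just emits synchronously, both subscriptions finish before observe() returns, and neither value callback is ever invoked.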
Reactive Streams does not allow null values in onNext. There's an operator that lets you emit just one value, unless it is null, in which case it will revert to an empty Mono. Can you find it?
Similarly, if you want to prevent the empty Mono case by falling back to a different one, you can find an operator that does this switch.
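One possible combination, sketched here under the assumption that the null-aware factory is Mono.justOrEmpty and the fallback operator is switchIfEmpty (both exist in Reactor Core; the solution file further down uses defaultIfEmpty, which falls back to a plain value instead of another Mono). The class name and the "fallback" value are illustrative.

```java
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the course exercises.
class EmptyFallbackSketch {

    // Wraps a possibly-null value and switches to another Mono when it is absent.
    static String resolve(String maybeNull) {
        return Mono.justOrEmpty(maybeNull)            // empty Mono when the input is null
                .switchIfEmpty(Mono.just("fallback")) // subscribed to only if the source is empty
                .block();                             // blocking only because this is a demo
    }

    public static void main(String[] args) {
        System.out.println(resolve("skyler"));
        System.out.println(resolve(null));
    }
}
```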
Sometimes you want to capture all the values emitted by a Flux into a single List. In this case you can use the collectList operator, which returns a Mono containing that List.
There are more operators belonging to the collect family. You can check them out in the Flux documentation.
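Since the solution file below has no collectList exercise, here is a small sketch of it, together with collectSortedList as one other member of the collect family. The class name CollectSketch and the sample names are assumptions.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the course exercises.
class CollectSketch {

    // Gathers every element into one List, emitted once the Flux completes.
    static List<String> inEmissionOrder() {
        Mono<List<String>> names = Flux.just("Skyler", "Jesse", "Walter").collectList();
        return names.block(); // blocking only because this is a demo
    }

    // Same idea, but the List is sorted by natural order before being emitted.
    static List<String> sorted() {
        return Flux.just("Skyler", "Jesse", "Walter").collectSortedList().block();
    }

    public static void main(String[] args) {
        System.out.println(inEmissionOrder());
        System.out.println(sorted());
    }
}
```

Note that both operators only emit when the source completes, so they are not suitable for infinite sequences.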
Part08OtherOperations.java
package io.pivotal.literx;
import io.pivotal.literx.domain.User;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Learn how to use various other operators.
*
* @author Sebastien Deleuze
*/
public class Part08OtherOperations {

    //========================================================================================

    // TODO Create a Flux of user from Flux of username, firstname and lastname.
    Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
        return Flux
                .zip(usernameFlux, firstnameFlux, lastnameFlux)
                .map(t -> new User(t.getT1(), t.getT2(), t.getT3()));
    }

    //========================================================================================

    // TODO Return the mono which returns its value faster
    Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
        return Mono.first(mono1, mono2);
    }

    //========================================================================================

    // TODO Return the flux which returns the first value faster
    Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
        return Flux.first(flux1, flux2);
    }

    //========================================================================================

    // TODO Convert the input Flux<User> to a Mono<Void> that represents the complete signal of the flux
    Mono<Void> fluxCompletion(Flux<User> flux) {
        return flux.then();
    }

    //========================================================================================

    // TODO Return a valid Mono of user for null input and non null input user (hint: Reactive Streams does not accept null values)
    Mono<User> nullAwareUserToMono(User user) {
        return Mono.justOrEmpty(user);
    }

    //========================================================================================

    // TODO Return the same mono passed as input parameter, except that it will emit User.SKYLER when empty
    Mono<User> emptyToSkyler(Mono<User> mono) {
        return mono.defaultIfEmpty(User.SKYLER);
    }
}
11. Reactive to Blocking
Sometimes you can only migrate part of your code to be reactive, and you need to reuse reactive sequences in more imperative code.
Thus if you need to block until the value from a Mono is available, use the Mono#block() method. It will throw an Exception if the onError event is triggered.
Note that you should avoid this by favoring reactive code end-to-end as much as possible. You MUST avoid this at all costs in the middle of other reactive code, as this has the potential to lock your whole reactive pipeline.
Similarly, you can block for the first or last value in a Flux with blockFirst()/blockLast(). You can also transform a Flux to an Iterable with toIterable. The same restrictions as above still apply.
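The blocking bridges described above can be sketched as follows, called from ordinary imperative code (here a plain main thread, never from inside a reactive operator). The class name BlockingSketch and the sample values are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the course exercises.
class BlockingSketch {

    static String first() {
        return Flux.just("a", "b", "c").blockFirst();
    }

    static String last() {
        return Flux.just("a", "b", "c").blockLast();
    }

    // toIterable() lets imperative code consume the sequence with a for-each loop.
    static List<String> viaIterable() {
        List<String> values = new ArrayList<>();
        for (String value : Flux.just("a", "b", "c").toIterable()) {
            values.add(value);
        }
        return values;
    }

    // An onError signal surfaces as an exception thrown by block().
    static String errorMessage() {
        try {
            Mono.<String>error(new IllegalStateException("boom")).block();
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(first() + " " + last() + " " + viaIterable() + " " + errorMessage());
    }
}
```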
Part10ReactiveToBlocking.java
package io.pivotal.literx;
//generic imports to help with simpler IDEs (ie tech.io)
import java.util.*;
import java.util.function.*;
import java.time.*;
import io.pivotal.literx.domain.User;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class Part10ReactiveToBlocking {

    //========================================================================================

    // TODO Return the user contained in that Mono
    User monoToValue(Mono<User> mono) {
        return mono.block();
    }

    //========================================================================================

    // TODO Return the users contained in that Flux
    Iterable<User> fluxToValues(Flux<User> flux) {
        return flux.toIterable();
    }
}
12. Blocking to Reactive
Description
The big question is "How to deal with legacy, non reactive code?".
Say you have blocking code (e.g. a JDBC connection to a database), and you want to integrate that into your reactive pipelines while avoiding too much of an impact on performance.
The best course of action is to isolate such intrinsically blocking parts of your code into their own execution context via a Scheduler, keeping the efficiency of the rest of the pipeline high and only creating extra threads when absolutely needed.
In the JDBC example you could use the fromIterable factory method. But how do you prevent the call from blocking the rest of the pipeline?
Practice
The subscribeOn method allows you to isolate a sequence from the start on a provided Scheduler. For example, Schedulers.elastic() will create a pool of threads that grows on demand, automatically releasing threads that haven't been used in a while (Reactor 3.4 deprecates it in favor of Schedulers.boundedElastic(), which caps the number of threads).
Use that trick to read all users from the slow, blocking repository in the first exercise. Note that you will need to wrap the call to the repository inside a Flux.defer lambda, so that the blocking call only happens at subscription time, on the Scheduler's thread.
For slow subscribers (e.g. saving to a database), you can isolate a smaller section of the sequence with the publishOn operator. Unlike subscribeOn, it only affects the part of the chain below it, switching it to a new Scheduler.
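To see which part of a chain each operator affects, the sketch below records the thread on which each step runs. It deliberately uses two named single-threaded schedulers (Schedulers.newSingle) rather than an elastic one, purely so that the thread names are recognizable; the class name, the scheduler names "reader" and "writer", and the trace messages are assumptions.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of the course exercises.
class SchedulerSketch {

    static List<String> trace() {
        Scheduler reader = Schedulers.newSingle("reader");
        Scheduler writer = Schedulers.newSingle("writer");
        List<String> trace = new CopyOnWriteArrayList<>();
        try {
            Flux.defer(() -> {
                        // Stands in for a blocking read; runs at subscription time.
                        trace.add("source on " + role());
                        return Flux.just(1, 2);
                    })
                    .subscribeOn(reader)  // affects the chain from the source downwards
                    .doOnNext(i -> trace.add("before publishOn on " + role()))
                    .publishOn(writer)    // affects only the operators below it
                    .doOnNext(i -> trace.add("after publishOn on " + role()))
                    .blockLast();         // blocking only because this is a demo
        } finally {
            reader.dispose();
            writer.dispose();
        }
        return trace;
    }

    // Maps the current thread name to a short label for the trace.
    static String role() {
        String name = Thread.currentThread().getName();
        if (name.startsWith("reader")) return "reader";
        if (name.startsWith("writer")) return "writer";
        return "caller";
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The relative order of the "before" and "after" entries can vary between runs, but no step should be recorded on the calling thread.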
As an example, you can use doOnNext to perform a save on the repository, but first use the trick above to isolate that save into its own execution context. You can make it more explicit that you're only interested in knowing if the save succeeded or failed by chaining the then() operator at the end, which returns a Mono<Void>.
Part11BlockingToReactive.java
package io.pivotal.literx;
//generic imports to help with simpler IDEs (ie tech.io)
import java.util.*;
import java.util.function.*;
import java.time.*;
import io.pivotal.literx.domain.User;
import io.pivotal.literx.repository.BlockingRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
* Learn how to call blocking code from reactive code, with a concurrency strategy adapted to
* blocking code that produces or receives data.
*
* For those who know RxJava:
* - RxJava subscribeOn = Reactor subscribeOn
* - RxJava observeOn = Reactor publishOn
* - RxJava Schedulers.io <==> Reactor Schedulers.elastic
*
* @author Sebastien Deleuze
* @see Flux#subscribeOn(Scheduler)
* @see Flux#publishOn(Scheduler)
* @see Schedulers
*/
public class Part11BlockingToReactive {

    //========================================================================================

    // TODO Create a Flux for reading all users from the blocking repository deferred until the flux is subscribed, and run it with an elastic scheduler
    Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
        // subscribeOn is applied to the deferred Flux itself, so that the blocking
        // findAll() call inside the lambda runs on the elastic scheduler's thread
        return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
                .subscribeOn(Schedulers.elastic());
    }

    //========================================================================================

    // TODO Insert users contained in the Flux parameter in the blocking repository using an elastic scheduler and return a Mono<Void> that signals the end of the operation
    Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
        return flux
                .publishOn(Schedulers.elastic())
                .doOnNext(repository::save)
                .then();
    }
}