Reactive Programming with Reactor 3

1. Introduction to Reactive Programming

Reactor 3 is a library built around the Reactive Streams specification, bringing the paradigm of Reactive Programming on the JVM.

In this course, you'll familiarize with the Reactor API. So let's make a quick introduction to the more general concepts in Reactive Streams and Reactive Programming.

Why

Reactive Programming is a new paradigm in which you use declarative code (in a manner that is similar to functional programming) in order to build asynchronous processing pipelines. It is an event-based model where data is pushed to the consumer, as it becomes available: we deal with asynchronous sequences of events.

This is important in order to be more efficient with resources and increase an application's capacity to serve large number of clients, without the headache of writing low-level concurrent or and/or parallelized code.

By being built around the core pillars of being fully asynchronous and non-blocking, Reactive Programming is an alternative to the more limited ways of doing asynchronous code in the JDK: namely Callback based APIs and Future.

It also facilitates composition, which in turn makes asynchronous code more readable and maintainable.

Reactive Streams

The Reactive Streams specification is an industry-driven effort to standardize Reactive Programming libraries on the JVM, and more importantly specify how they must behave so that they are interoperable. Implementors include Reactor 3 but also RxJava from version 2 and above, Akka Streams, Vert.x and Ratpack.

It contains 4 very simple interfaces as well as a TCK, which shouldn't be overlooked since it is the rules of the specification that bring the most value to it.

From a user perspective however, it is fairly low-level. Reactor 3 aims at offering an higher level API that can be leverage in a large breadth of situations, building it on top of Reactive Streams Publisher.

Interactions

In reactive stream sequences, the source Publisher produces data. But by default, it does nothing until a Subscriber has registered (subscribed), at which point it will push data to it.

Reactive Programming with Reactor 3-LMLPHP

Reactor adds the concept of operators, which are chained together to describe what processing to apply at each stage to the data. Applying an operator returns a new intermediate Publisher (in fact it can be thought of as both a Subscriber to the operator upstream and a Publisher for downstream). The final form of the data ends up in the final Subscriber that defines what to do from a user perspective.

Quizz

Publisher<Integer> source = Flux.range(1, 10);
Flux<String> flux = Flux.just("A");
flux.map(s -> "foo" + s);
flux.subscribe(System.out::println);

2. Learn how to create Flux instances

Flux

Description

A Flux<T> is a Reactive Streams Publisher, augmented with a lot of operators that can be used to generate, transform, orchestrate Flux sequences.

It can emit 0 to n <T> elements (onNext event) then either completes or errors (onComplete and onError terminal events). If no terminal event is triggered, the Flux is infinite.

  • Static factories on Flux allow to create sources, or generate them from several callbacks types.
  • Instance methods, the operators, let you build an asynchronous processing pipeline that will produce an asynchronous sequence.
  • Each Flux#subscribe() or multicasting operation such as Flux#publish and Flux#publishNext will materialize a dedicated instance of the pipeline and trigger the data flow inside it.

See the javadoc here.

Reactive Programming with Reactor 3-LMLPHP

Flux in action :

Flux.fromIterable(getSomeLongList())
    .delayElements(Duration.ofMillis(100))
    .doOnNext(serviceA::someObserver)
    .map(d -> d * 2)
    .take(3)
    .onErrorResumeWith(errorHandler::fallback)
    .doAfterTerminate(serviceM::incrementTerminate)
    .subscribe(System.out::println);

Practice

In this lesson we'll see different factory methods to create a Flux.

Let's try a very simple example: just return an empty flux.

static <T> Flux<T> empty()
// Create a Flux that completes without emitting any item.

One can also create a Flux out of readily available data:

static <T> Flux<T> just(T... data)
// Create a new Flux that emits the specified item(s) and then complete.

This time we will use items of a list to publish on the flux with fromIterable:

static <T> Flux<T> fromIterable(Iterable<? extends T> it)
// Create a Flux that emits the items contained in the provided Iterable.

In imperative synchronous code, it's easy to manage exceptions with familiar try-catch blocks, throw instructions...

But in an asynchronous context, we have to do things a bit differently. Reactive Streams defines the onError signal to deal with exceptions. Note that such an event is terminal: this is the last event the Flux will produce.

Flux#error produces a Flux that simply emits this signal, terminating immediately:

static <T> Flux<T> error(Throwable error)
// Create a Flux that completes with the specified error.

To finish with Flux, let's try to create a Flux that produces ten elements, at a regular pace. In order to do that regular publishing, we can use interval. But it produces an infinite stream (like ticks of a clock), and we want to take only 10 elements, so don't forget to precise it.

static Flux<Long> interval(Duration period)
// Create a new Flux that emits an ever incrementing long starting with 0 every period on the global timer.

Part01Flux.java

package io.pivotal.literx;

//generic imports to help with simpler IDEs (ie tech.io)
import java.util.*;
import java.util.function.*;
import java.time.*;

import reactor.core.publisher.Flux;

/**
 * Learn how to create Flux instances.
 *
 * @author Sebastien Deleuze
 * @see <a href="https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html">Flux Javadoc</a>
 */
public class Part01Flux {

//========================================================================================

	// TODO Return an empty Flux
	Flux<String> emptyFlux() {
		return Flux.empty();
	}

//========================================================================================

	// TODO Return a Flux that contains 2 values "foo" and "bar" without using an array or a collection
	Flux<String> fooBarFluxFromValues() {
		return Flux.just("foo","bar");
	}

//========================================================================================

	// TODO Create a Flux from a List that contains 2 values "foo" and "bar"
	Flux<String> fooBarFluxFromList() {
		List l = new ArrayList<>();
        l.add("foo");
        l.add("bar");
        return Flux.fromIterable(l);
	}

//========================================================================================

	// TODO Create a Flux that emits an IllegalStateException
	Flux<String> errorFlux() {
		return Flux.error(new IllegalStateException());
	}

//========================================================================================

		// TODO Create a Flux that emits increasing values from 0 to 9 each 100ms
	Flux<Long> counter() {
		return Flux.interval(Duration.ofMillis(100)).take(10);
	}

}

3. Learn how to create Mono instances

Mono

Description

A Mono<T> is a Reactive Streams Publisher, also augmented with a lot of operators that can be used to generate, transform, orchestrate Mono sequences.

It is a specialization of Flux that can emit at most 1 <T> element: a Mono is either valued (complete with element), empty (complete without element) or failed (error).

A Mono<Void> can be used in cases where only the completion signal is interesting (the Reactive Streams equivalent of a Runnable task completing).

Like for Flux, the operators can be used to define an asynchronous pipeline which will be materialized anew for each Subscription.

Note that some API that change the sequence's cardinality will return a Flux (and vice-versa, APIs that reduce the cardinality to 1 in a Flux return a Mono).

See the javadoc here.

Reactive Programming with Reactor 3-LMLPHP

Mono in action:

Mono.just(1)
    .map(integer -> "foo" + integer)
    .or(Mono.delay(Duration.ofMillis(100)))
    .subscribe(System.out::println);

Practice

As for the Flux let's return a empty Mono using the static factory.

static <T> Mono<T> empty()
// Create a Mono that completes without emitting any item.

Now, we will try to create a Mono which never emits anything. Unlike empty(), it won't even emit an onComplete event.

Like Flux, you can create a Mono from an available (unique) value.

And exactly as we did for the flux, we can propagate exceptions.

Part02Mono.java

package io.pivotal.literx;

import reactor.core.publisher.Mono;

/**
 * Learn how to create Mono instances.
 *
 * @author Sebastien Deleuze
 * @see <a href="https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html">Mono Javadoc</a>
 */
public class Part02Mono {

//========================================================================================

	// TODO Return an empty Mono
	Mono<String> emptyMono() {
		return Mono.empty();
	}

//========================================================================================

	// TODO Return a Mono that never emits any signal
	Mono<String> monoWithNoSignal() {
		return Mono.never();
	}

//========================================================================================

	// TODO Return a Mono that contains a "foo" value
	Mono<String> fooMono() {
		return Mono.just("foo");
	}

//========================================================================================

	// TODO Create a Mono that emits an IllegalStateException
	Mono<String> errorMono() {
		return Mono.error( new IllegalStateException() );
	}

}

4. StepVerifier and how to use it

StepVerifier

Description

Until now, your solution for each exercise was checked by passing the Publisher you defined to a test using a StepVerifier.

This class from the reactor-test artifact is capable of subscribing to any Publisher (eg. a Flux or an Akka Stream...) and then assert a set of user-defined expectations with regard to the sequence.

If any event is triggered that doesn't match the current expectation, the StepVerifier will produce an AssertionError.

You can obtain an instance of StepVerifier from the static factory create. It offers a DSL to set up expectations on the data part and finish with a single terminal expectation (completion, error, cancellation...).

Note that you must always call the verify() method or one of the shortcuts that combine the terminal expectation and verify, like .verifyErrorMessage(String). Otherwise the StepVerifier won't subscribe to your sequence and nothing will be asserted.

StepVerifier.create(T<Publisher>).{expectations...}.verify()

There are a lot of possible expectations, see the reference documentation and the javadoc.

Practice

In these exercises, the methods get a Flux or Mono as a parameter and you'll need to test its behavior. You should create a StepVerifier that uses said Flux/Mono, describes expectations about it and verifies it.

Let's verify the sequence passed to the first test method emits two specific elements, "foo" and "bar", and that the Flux then completes successfully.

Now, let's do the same test but verifying that an exception is propagated at the end.

Let's try to create a StepVerifier with an expectation on a User's getUsername() getter. Some expectations can work by checking a Predicate on the next value, or even by consuming the next value by passing it to an assertion library like Assertions.assertThat(T) from AssertJ. Try these lambda-based versions (for instance StepVerifier#assertNext with a lambda using an AssertJ assertion like assertThat(...).isEqualTo(...)):

On this next test we will receive a Flux which takes some time to emit. As you can expect, the test will take some time to run.

The next one is even worse: it emits 1 element per second, completing only after having emitted 3600 of them!

Since we don't want our tests to run for hours, we need a way to speed that up while still being able to assert the data itself (eliminating the time factor).

Fortunately, StepVerifier comes with a virtual time option: by using StepVerifier.withVirtualTime(Supplier<Publisher>), the verifier will temporarily replace default core Schedulers (the component that define the execution context in Reactor). All these default Scheduler are replaced by a single instance of a VirtualTimeScheduler, which has a virtual clock that can be manipulated.

In order for the operators to pick up that Scheduler, you should lazily build your operator chain inside the lambda passed to withVirtualTime.

You must then advance time as part of your test scenario, by calling either thenAwait(Duration) or expectNoEvent(Duration). The former simply advances the clock, while the later additionally fails if any unexpected event triggers during the provided duration (note that almost all the time there will at least be a "subscription" event even though the clock hasn't advanced, so you should usually put a expectSubscription() after .withVirtualTime() if you're going to use expectNoEvent right after).

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofHours(3)))
            .expectSubscription()
            .expectNoEvent(Duration.ofHours(2))
            .thenAwait(Duration.ofHours(1))
            .expectNextCount(1)
            .expectComplete()
            .verify();

Let's try that by making a fast test of our hour-long publisher:

Part03StepVerifier.java

/*
 * Copyright 2002-2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.pivotal.literx;

import java.time.Duration;
import java.util.function.Supplier;

import io.pivotal.literx.domain.User;
import org.assertj.core.api.Assertions;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

/**
 * Learn how to use StepVerifier to test Mono, Flux or any other kind of Reactive Streams Publisher.
 *
 * @author Sebastien Deleuze
 * @see <a href="https://projectreactor.io/docs/test/release/api/reactor/test/StepVerifier.html">StepVerifier Javadoc</a>
 */
public class Part03StepVerifier {

//========================================================================================

	// TODO Use StepVerifier to check that the flux parameter emits "foo" and "bar" elements then completes successfully.
	void expectFooBarComplete(Flux<String> flux) {
		StepVerifier.create(flux)
				.expectNext("foo", "bar")
				.verifyComplete();
	}

//========================================================================================

	// TODO Use StepVerifier to check that the flux parameter emits "foo" and "bar" elements then a RuntimeException error.
	void expectFooBarError(Flux<String> flux) {
		StepVerifier.create(flux)
				.expectNext("foo", "bar")
				.verifyError(RuntimeException.class);
	}

//========================================================================================

	// TODO Use StepVerifier to check that the flux parameter emits a User with "swhite"username
	// and another one with "jpinkman" then completes successfully.
	void expectSkylerJesseComplete(Flux<User> flux) {
		StepVerifier.create(flux)
				.assertNext(t -> Assertions.assertThat(t.getUsername()).isEqualTo("swhite"));
	}

//========================================================================================

	// TODO Expect 10 elements then complete and notice how long the test takes.
	void expect10Elements(Flux<Long> flux) {
		StepVerifier.create(flux).expectNextCount(10).verifyComplete();
	}

//========================================================================================

	// TODO Expect 3600 elements at intervals of 1 second, and verify quicker than 3600s
	// by manipulating virtual time thanks to StepVerifier#withVirtualTime, notice how long the test takes
	void expect3600Elements(Supplier<Flux<Long>> supplier) {
		StepVerifier.withVirtualTime(supplier).expectSubscription()
				.expectNoEvent(Duration.ofSeconds(1))
				.thenAwait(Duration.ofHours(1))
				.expectNextCount(10)
				.expectComplete()
				.verify();
	}

	private void fail() {
		throw new AssertionError("workshop not implemented");
	}

}

5. Learn to transform our asynchronous data

Transform

Description

Reactor ships with several operators that can be used to transform data.

Practice

In the first place, we will capitalize a String. Since this is a simple 1-1 transformation with no expected latency, we can use the map operator with a lambda transforming a T into a U.

We can use exactly the same code on a Flux, applying the mapping to each element as it becomes available.

Now imagine that we have to call a webservice to capitalize our String. This new call can have latency so we cannot use the synchronous map anymore. Instead, we want to represent the asynchronous call as a Flux or Mono, and use a different operator: flatMap.

flatMap takes a transformation Function that returns a Publisher<U> instead of a U. This publisher represents the asynchronous transformation to apply to each element. If we were using it with map, we'd obtain a stream of Flux<Publisher<U>>. Not very useful.

But flatMap on the other hand knows how to deal with these inner publishers: it will subscribe to them then merge all of them into a single global output, a much more useful Flux<U>. Note that if values from inner publishers arrive at different times, they can interleave in the resulting Flux.

Reactive Programming with Reactor 3-LMLPHP

Part04Transform.java

package io.pivotal.literx;

import io.pivotal.literx.domain.User;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Learn how to transform values.
 *
 * @author Sebastien Deleuze
 */
public class Part04Transform {

//========================================================================================

	// TODO Capitalize the user username, firstname and lastname
	Mono<User> capitalizeOne(Mono<User> mono) {
		return mono.map( t -> new User(
				t.getUsername().toUpperCase(),
				t.getFirstname().toUpperCase(),
				t.getLastname().toUpperCase()));
	}

//========================================================================================

	// TODO Capitalize the users username, firstName and lastName
	Flux<User> capitalizeMany(Flux<User> flux) {
		return flux.map( t -> new User(
				t.getUsername().toUpperCase(),
				t.getFirstname().toUpperCase(),
				t.getLastname().toUpperCase()));
	}


//========================================================================================

	// TODO Capitalize the users username, firstName and lastName using #asyncCapitalizeUser
	Flux<User> asyncCapitalizeMany(Flux<User> flux) {
		return flux.flatMap( this::asyncCapitalizeUser);
	}

	Mono<User> asyncCapitalizeUser(User u) {
		return Mono.just(new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase()));
	}

}

6. Merge

Merging sequences is the operation consisting of listening for values from several Publishers and emitting them in a single Flux.

On this first exercise we will begin by merging elements of two Flux as soon as they arrive. The caveat here is that values from flux1 arrive with a delay, so in the resulting Flux we start seeing values from flux2 first.

But if we want to keep the order of sources, we can use the concat operator. Concat will wait for flux1 to complete before it can subscribe to flux2, ensuring that all the values from flux1 have been emitted, thus preserving an order corresponding to the source.

You can use concat with several Publisher. For example, you can get two Mono and turn them into a same-order Flux:

Part05Merge.java

package io.pivotal.literx;

import io.pivotal.literx.domain.User;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Learn how to merge flux.
 *
 * @author Sebastien Deleuze
 */
public class Part05Merge {

//========================================================================================

	// TODO Merge flux1 and flux2 values with interleave
	Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) {
		return flux1.mergeWith(flux2);
	}

//========================================================================================

	// TODO Merge flux1 and flux2 values with no interleave (flux1 values and then flux2 values)
	Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
		return flux1.concatWith(flux2);
	}

//========================================================================================

	// TODO Create a Flux containing the value of mono1 then the value of mono2
	Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
		return Flux.concat(mono1, mono2);
	}

}

7. Request

Description

Remember this diagram?

Reactive Programming with Reactor 3-LMLPHP

There's one aspect to it that we didn't cover: the volume control. In Reactive Streams terms this is called backpressure. It is a feedback mechanism that allows a Subscriber to signal to its Publisher how much data it is prepared to process, limiting the rate at which the Publisher produces data.

This control of the demand is done at the Subscription level: a Subscription is created for each subscribe() call and it can be manipulated to either cancel() the flow of data or tune demand with request(long).

Making a request(Long.MAX_VALUE) means an unbounded demand, so the Publisher will emit data at its fastest pace.

Practice

The demand can be tuned in the StepVerifier as well, by using the relevant parameter to create and withVirtualTime for the initial request, then chaining in thenRequest(long) in your expectations for further requests.

In this first example, create a StepVerifier that produces an initial unbounded demand and verifies 4 values to be received, before completion. This is equivalent to the way you've been using StepVerifier so far.

Next we will request values one by one: for that you need an initial request, but also a second single request after you've received and asserted the first element.

Without more request, the source will never complete unless you cancel it. This can be done instead of the terminal expectations by using .thenCancel().

A note on debugging

How to check that the previous sequence was requested one by one, and that a cancellation happened?

It's important to be able to debug reactive APIs, so in the next example we will make use of the log operator to know exactly what happens in term of signals and events.

Use the repository to get a Flux of all users, then apply a log to it. Observe in the console below how the underlying test requests it, and the other events like subscribe, onNext...

If you want to perform custom actions without really modifying the elements in the sequence, you can use the "side effect" methods that start with doOn.

For example, if you want to print "Starting:" upon subscription, use doOnSubscribe.

Each doOn method takes a relevant callback representing the custom action for the corresponding event.

Note that you should not block or invoke operations with latency in these callbacks (which is also true of other operator callbacks like map): it's more for quick operations.

Go ahead an modify the first two methods in this exercise in order to get some insight into their sequences using log and doOnXXX.

Part06Request.java

package io.pivotal.literx;

import io.pivotal.literx.domain.User;
import io.pivotal.literx.repository.ReactiveRepository;
import io.pivotal.literx.repository.ReactiveUserRepository;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

/**
 * Learn how to control the demand.
 *
 * @author Sebastien Deleuze
 */
public class Part06Request {

	ReactiveRepository<User> repository = new ReactiveUserRepository();

//========================================================================================

	// TODO Create a StepVerifier that initially requests all values and expect 4 values to be received
	StepVerifier requestAllExpectFour(Flux<User> flux) {
		return StepVerifier
				.create(flux)
				.expectSubscription()
				.thenRequest(Long.MAX_VALUE)
				.expectNextCount(4)
				.expectComplete();
	}

//========================================================================================

	// TODO Create a StepVerifier that initially requests 1 value and expects User.SKYLER then requests another value and expects User.JESSE then stops verifying by cancelling the source
	StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
		return StepVerifier
				.create(flux)
				.thenRequest(1).expectNext(User.SKYLER)
				.thenRequest(1).expectNext(User.JESSE)
				.thenCancel();
	}

//========================================================================================

	// TODO Return a Flux with all users stored in the repository that prints automatically logs for all Reactive Streams signals
	Flux<User> fluxWithLog() {
		return repository.findAll().log();
	}

//========================================================================================

	// TODO Return a Flux with all users stored in the repository that prints "Starring:" on subscribe, "firstname lastname" for all values and "The end!" on complete
	Flux<User> fluxWithDoOnPrintln() {
		return repository
				.findAll()
				.doOnSubscribe(s -> System.out.println("Starring"))
				.doOnNext(u -> System.out.println(u.getFirstname() + " " + u.getLastname()))
				.doOnComplete(() -> System.out.println("The end!"));
	}

}

8. Error

Description

Reactor ships with several operators that can be used to deal with errors: propagate errors but also recover from it (eg. by falling back to a different sequence or by retrying a new Subscription).

Practice

In the first example, we will return a Mono containing default user Saul when an error occurs in the original Mono, using the method onErrorReturn. If you want, you can even limit that fallback to the IllegalStateException class. Use the User#SAUL constant.

Let's try the same thing with Flux. In this case, we don't just want a single fallback value, but a totally separate sequence (think getting stale data from a cache). This can be achieved with onErrorResume, which falls back to a Publisher<T>.

Emit bothUser#SAUL and User#JESSE whenever there is an error in the original FLux.

Dealing with checked exceptions is a bit more complicated. Whenever some code that throws checked exceptions is used in an operator (eg. the transformation function of a map), you will need to deal with it. The most straightforward way is to make a more complex lambda with a try-catch block that will transform the checked exception into a RuntimeException that can be signalled downstream.

There is a Exceptions#propagate utility that will wrap a checked exception into a special runtime exception that can be automatically unwrapped by Reactor subscribers and StepVerifier: this avoid seeing an irrelevant RuntimeException in the stacktrace.

Try to use that on the capitalizeMany method within a map: you'll need to catch a GetOutOfHereException, which is checked, but the corresponding test still expects the GetOutOfHereException directly.

Part07Errors.java

/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.pivotal.literx;

import io.pivotal.literx.domain.User;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Learn how to deal with errors.
 *
 * @author Sebastien Deleuze
 * @see Exceptions#propagate(Throwable)
 */
public class Part07Errors {

//========================================================================================

	// TODO Return a Mono<User> containing User.SAUL when an error occurs in the input Mono, else do not change the input Mono.
	Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
		return mono.onErrorReturn(User.SAUL);
	}

//========================================================================================

	// TODO Return a Flux<User> containing User.SAUL and User.JESSE when an error occurs in the input Flux, else do not change the input Flux.
	Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
		return flux.onErrorResume(e -> Flux.just(User.SAUL, User.JESSE));
	}

//========================================================================================

	// TODO Implement a method that capitalizes each user of the incoming flux using the
	// #capitalizeUser method and emits an error containing a GetOutOfHereException error
	Flux<User> capitalizeMany(Flux<User> flux) {
		return flux.map(
				u -> {
					try {
						return capitalizeUser(u);
					} catch (GetOutOfHereException e) {
						throw Exceptions.propagate(e);
					}
				}
		);
	}

	User capitalizeUser(User user) throws GetOutOfHereException {
		if (user.equals(User.SAUL)) {
			throw new GetOutOfHereException();
		}
		return new User(user.getUsername(), user.getFirstname(), user.getLastname());
	}

	protected final class GetOutOfHereException extends Exception {
	    private static final long serialVersionUID = 0L;
	}

}

9. Adapt

You can make RxJava3 and Reactor 3 types interact without a single external library.

In the first two examples we will adapt from Flux to Flowable, which implements Publisher, and vice-versa.

This is straightforward as both libraries provide a factory method to do that conversion from any Publisher. The checker below runs the two opposite conversions in one go.

The next two examples are a little trickier: we need to adapt between Flux and Observable, but the later doesn't implement Publisher.

In the first case, you can transform any publisher to Observable. In the second case, you have to first transform the Observable into a Flowable, which forces you to define a strategy to deal with backpressure (RxJava 3 Observable doesn't support backpressure).

Next, let's try to transform a Mono to a RxJava Single, and vice-versa. You can simply call the firstOrError method from Observable. For the other way around, you'll once again need to transform the Single into a Flowable first.

Finally, you can easily transform a Mono to a Java 8 CompletableFuture and vice-versa. Notice how these conversion methods all begin with from (when converting an external type to a Reactor one) and to (when converting a Reactor type to an external one).

Part09Adapt.java

/*
 * Copyright 2002-2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.pivotal.literx;

import java.util.concurrent.CompletableFuture;

import io.pivotal.literx.domain.User;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Learn how to adapt from/to RxJava 3 Observable/Single/Flowable and Java 8+ CompletableFuture.
 *
 * Mono and Flux already implements Reactive Streams interfaces so they are natively
 * Reactive Streams compliant + there are {@link Mono#from(Publisher)} and {@link Flux#from(Publisher)}
 * factory methods.
 *
 * For RxJava 3, you should not use Reactor Adapter but only RxJava 3 and Reactor Core.
 *
 * @author Sebastien Deleuze
 */
public class Part09Adapt {

//========================================================================================

	// TODO Adapt Flux to RxJava Flowable
	Flowable<User> fromFluxToFlowable(Flux<User> flux) {
		return Flowable.fromPublisher(flux);
	}

	// TODO Adapt RxJava Flowable to Flux
	Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
		return Flux.from(flowable);
	}

//========================================================================================

	// TODO Adapt Flux to RxJava Observable
	Observable<User> fromFluxToObservable(Flux<User> flux) {
		return Observable.fromPublisher(flux);
	}

	// TODO Adapt RxJava Observable to Flux
	Flux<User> fromObservableToFlux(Observable<User> observable) {
		return Flux.from(observable.toFlowable(BackpressureStrategy.BUFFER));
	}

//========================================================================================

	// TODO Adapt Mono to RxJava Single
	Single<User> fromMonoToSingle(Mono<User> mono) {
		return Single.fromPublisher(mono);
	}

	// TODO Adapt RxJava Single to Mono
	Mono<User> fromSingleToMono(Single<User> single) {
		return Mono.from(single.toFlowable());
	}

//========================================================================================

	// TODO Adapt Mono to Java 8+ CompletableFuture
	CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
		return mono.toFuture();
	}

	// TODO Adapt Java 8+ CompletableFuture to Mono
	Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
		return Mono.fromFuture(future);
	}

}

10. Other Operations

Description

In this section, we'll have a look at a few more useful operators that don't fall into the broad categories we explored earlier. Reactor 3 contains a lot of operators, so don't hesitate to have a look at the Flux and Mono javadocs as well as the reference guide to learn about more of them.

Practice

In the first exercise we'll receive 3 Flux<String>. Their elements could arrive with latency, yet each time the three sequences have all emitted an element, we want to combine these 3 elements and create a new User. This concatenate-and-transform operation is called zip.

If you have 3 possible Mono sources and you only want to keep the fastest one, you can use the first static method.

For Flux, a similar result can be achieved using the first static method. In this case it's the flux which emits an initial element first which is selected. Flux aren't mixed.

Sometimes you're not interested in elements of a Flux<T>. If you want to still keep a Flux<T> type, you can use ignoreElements(). But if you really just want the completion, represented as a Mono<Void>, you can use then() instead.

Reactive Streams does not allow null values in onNext. There's an operator that allow to just emit one value, unless it is null in which case it will revert to an empty Mono. Can you find it?

Similarly, if you want to prevent the empty Mono case by falling back to a different one, you can find an operator that does this switch.

Sometimes you want to capture all values emitted by Flux into separate List. In this case you can use collectList operator that would return Mono containing that List.

There are more operators belonging to the collect family. You can check them out in Flux documentation.

Part08OtherOperations.java

package io.pivotal.literx;

import io.pivotal.literx.domain.User;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Learn how to use various other operators.
 *
 * @author Sebastien Deleuze
 */
public class Part08OtherOperations {

//========================================================================================

	// TODO Create a Flux of user from Flux of username, firstname and lastname.
	Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
		return Flux
				.zip(usernameFlux, firstnameFlux, lastnameFlux)
				.map(t -> new User(t.getT1(), t.getT2(), t.getT3()));
	}

//========================================================================================

	// TODO Return the mono which returns its value faster
	Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
		return Mono.first(mono1, mono2);
	}

//========================================================================================

	// TODO Return the flux which returns the first value faster
	Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
		return Flux.first(flux1, flux2);
	}

//========================================================================================

	// TODO Convert the input Flux<User> to a Mono<Void> that represents the complete signal of the flux
	Mono<Void> fluxCompletion(Flux<User> flux) {
		return flux.then();
	}

//========================================================================================

	// TODO Return a valid Mono of user for null input and non null input user (hint: Reactive Streams do not accept null values)
	Mono<User> nullAwareUserToMono(User user) {
		return Mono.justOrEmpty(user);
	}

//========================================================================================

	// TODO Return the same mono passed as input parameter, expect that it will emit User.SKYLER when empty
	Mono<User> emptyToSkyler(Mono<User> mono) {
		return mono.defaultIfEmpty(User.SKYLER);
	}

}

11. Reactive to Blocking

Sometimes you can only migrate part of your code to be reactive, and you need to reuse reactive sequences in more imperative code.

Thus if you need to block until the value from a Mono is available, use Mono#block() method. It will throw an Exception if the onError event is triggered.

Note that you should avoid this by favoring having reactive code end-to-end, as much as possible. You MUST avoid this at all cost in the middle of other reactive code, as this has the potential to lock your whole reactive pipeline.

Similarly, you can block for the first or last value in a Flux with blockFirst()/blockLast(). You can also transform a Flux to an Iterable with toIterable. Same restrictions as above still apply.

Part10ReactiveToBlocking.java

package io.pivotal.literx;

//generic imports to help with simpler IDEs (ie tech.io)
import java.util.*;
import java.util.function.*;
import java.time.*;

import io.pivotal.literx.domain.User;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Part10ReactiveToBlocking {

//========================================================================================

	// TODO Return the user contained in that Mono
	User monoToValue(Mono<User> mono) {
		return mono.block();
	}

//========================================================================================

	// TODO Return the users contained in that Flux
	Iterable<User> fluxToValues(Flux<User> flux) {
		return flux.toIterable();
	}

}

12. Blocking to Reactive

Description

The big question is "How to deal with legacy, non reactive code?".

Say you have blocking code (eg. a JDBC connection to a database), and you want to integrate that into your reactive pipelines while avoiding too much of an impact on performance.

The best course of action is to isolate such intrinsically blocking parts of your code into their own execution context via a Scheduler, keeping the efficiency of the rest of the pipeline high and only creating extra threads when absolutely needed.

In the JDBC example you could use the fromIterable factory method. But how do you prevent the call to block the rest of the pipeline?

Practice

The subscribeOn method allow to isolate a sequence from the start on a provided Scheduler. For example, the Schedulers.elastic() will create a pool of threads that grows on demand, releasing threads that haven't been used in a while automatically.

Use that trick to slowly read all users from the blocking repository in the first exercise. Note that you will need to wrap the call to the repository inside a Flux.defer lambda.

For slow subscribers (eg. saving to a database), you can isolate a smaller section of the sequence with the publishOn operator. Unlike subscribeOn, it only affects the part of the chain below it, switching it to a new Scheduler.

As an example, you can use doOnNext to perform a save on the repository, but first use the trick above to isolate that save into its own execution context. You can make it more explicit that you're only interested in knowing if the save succeeded or failed by chaining the then() operator at the end, which returns a Mono<Void>.

Part11BlockingToReactive.java

package io.pivotal.literx;

//generic imports to help with simpler IDEs (ie tech.io)
import java.util.*;
import java.util.function.*;
import java.time.*;

import io.pivotal.literx.domain.User;
import io.pivotal.literx.repository.BlockingRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Learn how to call blocking code from Reactive one with adapted concurrency strategy for
 * blocking code that produces or receives data.
 *
 * For those who know RxJava:
 *  - RxJava subscribeOn = Reactor subscribeOn
 *  - RxJava observeOn = Reactor publishOn
 *  - RxJava Schedulers.io <==> Reactor Schedulers.elastic
 *
 * @author Sebastien Deleuze
 * @see Flux#subscribeOn(Scheduler)
 * @see Flux#publishOn(Scheduler)
 * @see Schedulers
 */
public class Part11BlockingToReactive {

//========================================================================================

	// TODO Create a Flux for reading all users from the blocking repository deferred until the flux is subscribed, and run it with an elastic scheduler
	Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
		return Flux.defer(() ->
			Flux.fromIterable(repository.findAll())
					.subscribeOn(Schedulers.elastic())
		);
	}

//========================================================================================

	// TODO Insert users contained in the Flux parameter in the blocking repository using an elastic scheduler and return a Mono<Void> that signal the end of the operation
	Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
		return flux.publishOn(Schedulers.elastic()).doOnNext(repository::save).then();
	}

}
03-08 17:56