
Reactive’s Looming Doom. Part II: Fundamentals of Reactive

1. Introduction

The previous post compared the multithreading APIs that have appeared throughout Java’s lifespan. In this post, we will dive into the Reactive philosophy to see how it differs from the CompletableFuture API.

2. Specs

According to the Reactive Streams Specification:

The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary—think passing elements on to another thread or thread-pool—while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, back pressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded.

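From the API standpoint, the specification defines 4 interfaces, which are available both as a dedicated library (package org.reactivestreams) and, starting from Java 9, in the JDK as nested interfaces of java.util.concurrent.Flow: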

  • Publisher represents the source of data in the stream. Its primary purpose is to provide the ability to subscribe to it:

    @FunctionalInterface
    public static interface Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
    }
    
  • Subscriber represents the receiving side of the stream. Its main purpose is to request data from a Publisher via a Subscription and to provide callbacks for asynchronous processing of the requested data:
    public static interface Subscriber<T> {

        public void onSubscribe(Subscription subscription);

        public void onNext(T item);

        public void onError(Throwable throwable);

        public void onComplete();
    }
  • Subscription represents the communication interface between a Publisher and its Subscriber. It is passed to the Subscriber on subscription via the onSubscribe callback, and the Subscriber uses it to asynchronously request a finite amount of data from the Publisher or to cancel the subscription:
    public static interface Subscription {

        public void request(long n);

        public void cancel();
    }
  • Processor is simply a Publisher and a Subscriber combined: it consumes items from its upstream and publishes items to its downstream

Subscription#request(long n) does not return the requested items. Instead, the Publisher invokes the Subscriber#onNext(T item) callback for each of the requested items. Once the requested items have been processed, the Subscriber may request more via Subscription#request(long n). This is how the specification addresses back-pressure: a Subscriber requests additional data only when it is ready to process it.
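To make the request/onNext handshake concrete, here is a minimal sketch built only on the JDK’s java.util.concurrent.Flow interfaces. RangePublisher and OneByOneSubscriber are hypothetical names made up for this illustration, and the Publisher is deliberately simplified (single-threaded, not a complete implementation of every rule in the specification). The Subscriber requests one item at a time, so the Publisher never emits more than has been asked for:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class BackpressureDemo {

    // Hypothetical synchronous Publisher of the integers [from, to) that never emits more than requested
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int from;
        private final int to;

        RangePublisher(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = from;
                private long demand;        // requested but not yet delivered items
                private boolean emitting;   // true while the emission loop below is running
                private boolean terminated; // set after onComplete/onError or cancel()

                @Override
                public void request(long n) {
                    if (terminated) {
                        return;
                    }
                    if (n <= 0) { // rule 3.9 of the specification
                        terminated = true;
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    demand = demand + n < 0 ? Long.MAX_VALUE : demand + n; // cap on overflow
                    if (emitting) {
                        return; // re-entrant call from onNext: the running loop picks up the new demand
                    }
                    emitting = true;
                    while (demand > 0 && next < to && !terminated) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next >= to && !terminated) {
                        terminated = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    terminated = true;
                }
            });
        }
    }

    // Requests exactly one item at a time and logs every request it sends and signal it receives
    static final class OneByOneSubscriber implements Flow.Subscriber<Integer> {
        final List<String> events = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            events.add("request(1)");
            subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            events.add("onNext(" + item + ")");
            // Only after this item has been processed is the next one requested
            events.add("request(1)");
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("onError");
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
        }
    }

    public static void main(String[] args) {
        var subscriber = new OneByOneSubscriber();
        new RangePublisher(1, 4).subscribe(subscriber);
        System.out.println(subscriber.events);
    }
}
```

Running main prints [request(1), onNext(1), request(1), onNext(2), request(1), onNext(3), request(1), onComplete]: every item is preceded by a request for it. The emitting flag prevents a request(1) issued from inside onNext from recursing into another onNext call, since the specification requires such synchronous recursion to be bounded (rule 3.3).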

It should be noted that the interfaces above describe only an abstract protocol for asynchronous interaction, while real implementations may well be single-threaded. Accordingly, the example implementation that accompanies the specification demonstrates two types of Subscriber: SyncSubscriber, whose onNext method is executed on the Publisher’s thread, and AsyncSubscriber, whose onNext is scheduled on a dedicated Executor. This is why Reactive code can be considered concurrency-agnostic: instead of enforcing a specific concurrency model, it leaves the choice to the developer.
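The AsyncSubscriber idea can be approximated with the JDK alone: java.util.concurrent.SubmissionPublisher accepts the Executor on which it delivers signals, so the same Subscriber code runs on whatever threads the developer provides. The following is a sketch under that assumption; the class name and the thread name delivery-thread are arbitrary:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class ExecutorDeliveryDemo {

    // Publishes the items from the calling thread and returns the names of the threads that ran onNext
    static List<String> onNextThreads(List<Integer> items) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "delivery-thread"));
        List<String> threads = new CopyOnWriteArrayList<>();
        CountDownLatch terminated = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, Flow.defaultBufferSize())) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1);
                }

                @Override
                public void onNext(Integer item) {
                    threads.add(Thread.currentThread().getName());
                    subscription.request(1);
                }

                @Override
                public void onError(Throwable throwable) {
                    terminated.countDown();
                }

                @Override
                public void onComplete() {
                    terminated.countDown();
                }
            });
            items.forEach(publisher::submit); // submit() is called on the current thread
        } // close() signals onComplete after the buffered items have been delivered
        try {
            terminated.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("submitting on: " + Thread.currentThread().getName());
        System.out.println("onNext ran on: " + onNextThreads(List.of(1, 2, 3)));
    }
}
```

Swapping the Executor changes where onNext runs without touching the Subscriber, which is the concurrency-agnostic property described above.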

3. Implementation

Every method of Flux and Mono, the main API of Project Reactor, is based on the fundamental principles described in the specification. Now let’s see how exactly those principles are applied.

3.1 Basics

In Project Reactor, both Flux and Mono implement Publisher, and their APIs are very similar. Therefore, unless stated otherwise, anything said about a Flux method (or operator, in reactive terms) applies to Mono as well.

Since Flux and Mono are Publishers, each subsequent operator under the hood subscribes to the upstream Publisher, processes the elements according to its implementation, and passes the results downstream. For example, in the following chain .map() subscribes to Flux#just and forwards the demand of its own subscriber upstream. The lambda-based .subscribe() requests an unbounded number of elements, so Flux#just emits all of them, and .map() applies the passed lambda to each element and passes the result down to .subscribe():

Flux.just(1, 2, 3)
  .map(integer -> integer + 1)
  .subscribe(incrementedInteger -> log.info(String.valueOf(incrementedInteger)));
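The "operator subscribes to its upstream" idea can be sketched with plain java.util.concurrent.Flow. The just, map and collect helpers below are hypothetical and heavily simplified (no error handling in the mapper, synchronous sources only); they are not how Reactor actually implements these operators, but they follow the same subscribe-upstream, forward-downstream shape:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

public class MapOperatorSketch {

    // Hypothetical synchronous source in the spirit of Flux.just: emits items as they are requested.
    // Simplification: assumes the subscriber does not call request() re-entrantly from onNext.
    static <T> Flow.Publisher<T> just(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index;
            private boolean done;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && index < items.size() && !done; i++) {
                    subscriber.onNext(items.get(index++));
                }
                if (index == items.size() && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Hypothetical map operator: subscribing to it subscribes to the upstream with an intermediate
    // Subscriber that transforms each item and forwards it downstream
    static <T, R> Flow.Publisher<R> map(Flow.Publisher<T> upstream, Function<? super T, ? extends R> mapper) {
        return downstream -> upstream.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                downstream.onSubscribe(subscription); // downstream demand goes upstream unchanged
            }

            @Override
            public void onNext(T item) {
                downstream.onNext(mapper.apply(item)); // mapper exceptions are not handled here
            }

            @Override
            public void onError(Throwable throwable) {
                downstream.onError(throwable);
            }

            @Override
            public void onComplete() {
                downstream.onComplete();
            }
        });
    }

    // Subscribes with unbounded demand and collects the items (synchronous sources only; errors ignored)
    static <T> List<T> collect(Flow.Publisher<T> publisher) {
        List<T> result = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { result.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return result;
    }

    public static void main(String[] args) {
        // Roughly: Flux.just(1, 2, 3).map(integer -> integer + 1)
        List<Integer> incremented = collect(map(just(List.of(1, 2, 3)), integer -> integer + 1));
        System.out.println(incremented); // [2, 3, 4]
    }
}
```

Because map passes the downstream Subscription through unchanged, the unbounded demand expressed by collect (similar to Reactor’s lambda-based subscribe()) reaches the source directly.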

In the solution to the original problem:

  • The .flatMap() initially requests up to 256 elements (its default concurrency; 32 is the default prefetch it uses for each inner Publisher) from the .buffer()
  • The .buffer() requests 256 * batchSize elements from Flux.fromIterable(namesList), collects them into lists of batchSize elements and passes them down to the .flatMap()
  • The .flatMap() invokes Main::processBatch for each list, subscribes to the returned Mono and passes its result downstream
var finalCounts = Flux.fromIterable(namesList)
    // Split to batches
    .buffer(batchSize)
    // Aggregate intermediate counts asynchronously
    .flatMap(Main::processBatch)
    // ...

Neither .map() nor .buffer() switches threads: both are executed on the thread that runs the upstream Publisher’s code. A developer can change this behavior with dedicated operators, such as .subscribeOn().

3.2 Operators .subscribe() and .subscribeOn()

As with the Java 8 Stream API, any chain of operators in Reactor must end with a terminal operation. In Reactor, such an operation is generally .subscribe(). This operator accepts a Subscriber and has a number of overloads, including ones that accept lambdas: .subscribe(elem -> log.info(elem)).

Conceptually, this method is non-blocking: the thread that invokes .subscribe() should not be blocked by it, but in reality it is a bit more tricky. The execution context of a Reactive chain is governed by the Flux#subscribeOn operator, which allows a developer to set the thread pool on which the operators of the corresponding chain are executed. In Project Reactor, thread pools are represented by the Scheduler interface, and a set of standard general-purpose implementations is supplied by the Schedulers class, for example Schedulers.boundedElastic().

By default, the chain is executed by the thread where the .subscribe() was invoked:

Flux.just(1, 2, 3)
      .map(integer -> {
          System.out.printf("Incrementing on thread: %s \n", Thread.currentThread().getName());
          return integer + 1;
      })
      .subscribe(integer -> {
          System.out.printf("Got %s int on thread: %s \n", integer, Thread.currentThread().getName());
      });
System.out.printf("I am after the Flux! on thread: %s \n", Thread.currentThread().getName());
      
// Output:
// ---
// Incrementing on thread: main 
// Got 2 int on thread: main 
// Incrementing on thread: main 
// Got 3 int on thread: main 
// Incrementing on thread: main 
// Got 4 int on thread: main 
// I am after the Flux! on thread: main

As you can see from the output, the .subscribe() call behaves like a blocking one: the whole chain is executed by the main thread before the last line is printed. However, if you add .subscribeOn(Schedulers.boundedElastic()) to the chain:

Flux.just(1, 2, 3)
    .map(integer -> {
        System.out.printf("Incrementing on thread: %s \n", Thread.currentThread().getName());
        return integer + 1;
    })
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe(integer -> {
        System.out.printf("Got %s int on thread: %s \n", integer, Thread.currentThread().getName());
    });
System.out.printf("I am after the Flux! on thread: %s \n", Thread.currentThread().getName());
Thread.sleep(Long.MAX_VALUE);

// Output:
// ---
// I am after the Flux! on thread: main 
// Incrementing on thread: boundedElastic-1 
// Got 2 int on thread: boundedElastic-1 
// Incrementing on thread: boundedElastic-1 
// Got 3 int on thread: boundedElastic-1 
// Incrementing on thread: boundedElastic-1 
// Got 4 int on thread: boundedElastic-1

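The output shows that the chain is now executed on a boundedElastic thread, while the main thread continues immediately. Moreover, Thread.sleep() had to be added at the end: the threads of Reactor’s default Schedulers are daemon threads, so without it the JVM could exit before the chain finishes.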
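Conceptually, subscribeOn() moves the act of subscribing onto another thread; with a synchronous source, the emission then happens there as well. Here is a minimal JDK-only sketch of that idea, with a hypothetical subscribeOn helper that takes a plain java.util.concurrent.Executor in place of a Reactor Scheduler (the range source is also a simplified, hypothetical helper):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

public class SubscribeOnSketch {

    // Hypothetical synchronous source: emits [from, to) on whichever thread calls request().
    // Simplification: assumes the subscriber does not call request() re-entrantly from onNext.
    static Flow.Publisher<Integer> range(int from, int to) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = from;
            private boolean done;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && next < to && !done; i++) {
                    subscriber.onNext(next++);
                }
                if (next >= to && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Hypothetical analogue of subscribeOn(): the upstream subscription (and therefore the emission
    // of a synchronous source) happens on the given Executor, and the caller returns immediately
    static <T> Flow.Publisher<T> subscribeOn(Flow.Publisher<T> upstream, Executor executor) {
        return subscriber -> executor.execute(() -> upstream.subscribe(subscriber));
    }

    // Returns the names of the threads on which onNext was invoked
    static List<String> onNextThreads(Executor executor) {
        List<String> threads = new CopyOnWriteArrayList<>();
        CountDownLatch terminated = new CountDownLatch(1);
        subscribeOn(range(1, 4), executor).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { threads.add(Thread.currentThread().getName()); }
            @Override public void onError(Throwable t) { terminated.countDown(); }
            @Override public void onComplete() { terminated.countDown(); }
        });
        try {
            terminated.await(5, TimeUnit.SECONDS); // plays the role of Thread.sleep() in the Reactor example
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threads;
    }

    public static void main(String[] args) {
        // A direct executor keeps everything on the calling thread, like a chain without subscribeOn()
        System.out.println(onNextThreads(Runnable::run));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            System.out.println(onNextThreads(worker));
        } finally {
            worker.shutdown();
        }
    }
}
```

With the direct executor (Runnable::run) every onNext runs on the calling thread, matching the first Reactor example; with the single-thread executor all of them run on worker, while the subscriber code stays the same.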

Also, similarly to the Stream API, the operators of a chain are not executed until .subscribe() is called, unlike CompletableFuture#supplyAsync, which starts executing the passed code immediately.
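The difference between lazy and eager execution can be observed with the JDK alone. In this sketch, a Stream mapper is not invoked until a terminal operation runs, whereas CompletableFuture.supplyAsync starts its task even though nobody ever calls join() or get(); the class and method names are made up for the illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyVsEagerDemo {

    // Returns how many times the mapper ran {before, after} the terminal operation collect()
    static int[] streamMapperCalls() {
        AtomicInteger calls = new AtomicInteger();
        Stream<Integer> pipeline = Stream.of(1, 2, 3).map(i -> {
            calls.incrementAndGet();
            return i + 1;
        });
        int before = calls.get();                // the pipeline is only declared at this point
        pipeline.collect(Collectors.toList());   // the terminal operation triggers execution
        return new int[] {before, calls.get()};
    }

    // supplyAsync() starts running the supplier right away, although nobody calls join() or get()
    static boolean supplyAsyncStartsEagerly() {
        CountDownLatch started = new CountDownLatch(1);
        CompletableFuture.supplyAsync(() -> {
            started.countDown();
            return 42;
        });
        try {
            return started.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        int[] calls = streamMapperCalls();
        System.out.printf("mapper calls before collect(): %d, after: %d%n", calls[0], calls[1]);
        System.out.println("supplyAsync ran without join(): " + supplyAsyncStartsEagerly());
    }
}
```

main reports 0 mapper calls before collect() and 3 after it, followed by true for supplyAsync.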

An attentive reader might wonder what the .block() used in the solution to the original problem instead of .subscribe() is. It’s simple: Mono#block() subscribes to the Mono with a blocking Subscriber, blocks the calling thread until the Mono completes, and returns the element it produced (or null if the Mono completed empty). There is a similar method for Flux: Flux#blockLast(). These methods serve as a bridge between blocking and non-blocking APIs, and their overuse is discouraged.
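To illustrate what such a blocking bridge does, here is a JDK-only sketch of a Subscriber in the spirit of Flux#blockLast(). BlockingLastSubscriber is a hypothetical class, not Reactor’s actual implementation: it requests all items, remembers the last one, and parks the calling thread on a CountDownLatch until a terminal signal arrives. A SubmissionPublisher plays the role of the asynchronous source:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BlockingBridgeSketch {

    // Hypothetical Subscriber that lets a blocking caller wait for the end of an asynchronous stream
    static final class BlockingLastSubscriber<T> implements Flow.Subscriber<T> {
        private final CountDownLatch terminated = new CountDownLatch(1);
        private volatile T last;
        private volatile Throwable error;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(T item) {
            last = item;
        }

        @Override
        public void onError(Throwable throwable) {
            error = throwable;
            terminated.countDown();
        }

        @Override
        public void onComplete() {
            terminated.countDown();
        }

        // Parks the calling thread until onComplete/onError, then returns the last item (null if none)
        T blockLast() {
            try {
                terminated.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while blocking", e);
            }
            if (error != null) {
                throw new IllegalStateException(error);
            }
            return last;
        }
    }

    static Integer lastOf(List<Integer> items) {
        BlockingLastSubscriber<Integer> subscriber = new BlockingLastSubscriber<>();
        // SubmissionPublisher delivers items asynchronously, on a pool thread by default
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        }
        return subscriber.blockLast(); // the non-blocking world ends here
    }

    public static void main(String[] args) {
        System.out.println(lastOf(List.of(1, 2, 3))); // 3
    }
}
```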

3.3 Operator .flatMap()

This operator, essential for every monad, has a special meaning in Project Reactor. It accepts a function that returns a Publisher for each element of the stream, subscribes to each of these inner Publishers, and passes the items they produce downstream. Unlike the .map() operator, which simply transforms an accepted element, .flatMap() gives a developer full control over creating the inner Publisher, including its execution context via subscribeOn()!
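The monadic flatMap pattern is not specific to Reactor; the JDK has close analogues, sketched below with hypothetical helper names. Stream#flatMap flattens inner Streams, and CompletableFuture#thenCompose flattens inner futures, where the function that returns the inner future also chooses the Executor it runs on, much like an inner Publisher with subscribeOn():

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapAnalogues {

    // Stream#flatMap: each element becomes an inner Stream whose elements are flattened into the result
    static List<Integer> streamFlatMap() {
        return Stream.of(1, 2, 3)
            .flatMap(i -> Stream.of(i, i * 10))
            .collect(Collectors.toList());
    }

    // CompletableFuture#thenCompose: the function returns an inner future and decides where it runs
    static String composeOn(Executor executor) {
        return CompletableFuture.completedFuture(21)
            .thenCompose(i -> CompletableFuture.supplyAsync(
                () -> (i * 2) + " computed on " + Thread.currentThread().getName(), executor))
            .join();
    }

    public static void main(String[] args) {
        System.out.println(streamFlatMap()); // [1, 10, 2, 20, 3, 30]
        ExecutorService inner = Executors.newSingleThreadExecutor(r -> new Thread(r, "inner-worker"));
        try {
            System.out.println(composeOn(inner)); // 42 computed on inner-worker
        } finally {
            inner.shutdown();
        }
    }
}
```

Reactor’s .flatMap() additionally subscribes to many inner Publishers concurrently and merges their items as they arrive; these analogues only show the flattening and the choice of execution context.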

To demonstrate this on the existing solution, let’s increase the number of names to 100,000,000, reduce the number of batches to 10, and add some debug output:

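import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.math.MathFlux; // from the reactor-extra add-on
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class Main {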

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Generate names
        Random r = new Random();
        var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
        var namesList = IntStream.range(0, 100000000)
            .mapToObj(__ -> names.get(r.nextInt(names.size())))
            .collect(Collectors.toList());


        int batchSize = namesList.size() / 10;
        var finalCounts = Flux.fromIterable(namesList)
            // Split to batches
            .buffer(batchSize)
            // Aggregate intermediate counts asynchronously
            .flatMap(Main::processBatch)
            .reduce(new HashMap<>(), Main::mergeIntermediateCount)
            .flatMapIterable(HashMap::entrySet);
        String mostFrequentName = MathFlux.max(finalCounts, Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .block();


        System.out.printf("The most frequent name is %s%n", mostFrequentName);
    }

    private static HashMap<String, Long> mergeIntermediateCount(HashMap<String, Long> acc,
                                                                Map<String, Long> intermediateResult) {
        intermediateResult.forEach((name, intermediateCount) -> acc.merge(name, intermediateCount,
            Long::sum));
        return acc;
    }

    private static Mono<Map<String, Long>> processBatch(List<String> batch) {
        System.out.printf("[%s][%s] Subscribing to batch processing \n", LocalDateTime.now(),
            Thread.currentThread().getName());
        return Flux.fromIterable(batch)
            .groupBy(Function.identity())
            .flatMap(group -> group.count().map(count -> Tuples.of(group.key(), count)))
            .collectMap(Tuple2::getT1, Tuple2::getT2)
            .doOnSubscribe(__ -> System.out.printf("[%s][%s] Processing batch... \n", LocalDateTime.now(), Thread.currentThread().getName()))
            .subscribeOn(Schedulers.boundedElastic());
    }
}

// Output:
// ---
// [2022-09-29T16:17:07.199396810][main] Subscribing to batch processing 
// [2022-09-29T16:17:07.292379575][boundedElastic-1] Processing batch... 
// [2022-09-29T16:17:08.047585945][main] Subscribing to batch processing 
// [2022-09-29T16:17:08.061301797][boundedElastic-2] Processing batch... 
// [2022-09-29T16:17:09.287728886][main] Subscribing to batch processing 
// [2022-09-29T16:17:09.305248432][boundedElastic-3] Processing batch... 
// [2022-09-29T16:17:10.202591054][main] Subscribing to batch processing 
// [2022-09-29T16:17:10.203799927][boundedElastic-1] Processing batch... 
// [2022-09-29T16:17:11.066984735][main] Subscribing to batch processing 
// [2022-09-29T16:17:11.067669551][boundedElastic-2] Processing batch... 
// [2022-09-29T16:17:11.385716328][main] Subscribing to batch processing 
// [2022-09-29T16:17:11.386001934][boundedElastic-3] Processing batch... 
// [2022-09-29T16:17:11.678548510][main] Subscribing to batch processing 
// [2022-09-29T16:17:11.678978961][boundedElastic-1] Processing batch... 
// [2022-09-29T16:17:11.962963502][main] Subscribing to batch processing 
// [2022-09-29T16:17:11.963282064][boundedElastic-3] Processing batch... 
// [2022-09-29T16:17:12.263382545][main] Subscribing to batch processing 
// [2022-09-29T16:17:12.263699716][boundedElastic-1] Processing batch... 
// [2022-09-29T16:17:13.662411349][main] Subscribing to batch processing 
// [2022-09-29T16:17:13.662926817][boundedElastic-3] Processing batch... 
// The most frequent name is Joe

As expected, the outer Flux chain is executed by the main thread, since .subscribeOn() was not called on it. As a result, the lambda passed to .flatMap() and the subsequent subscription to the Flux in processBatch are executed by the main thread. At the same time, subscribeOn(Schedulers.boundedElastic()) was called on the inner Flux, so its operators are executed on the threads provided by Schedulers.boundedElastic(). Essentially, in this code the main thread prepares the batches and offloads their processing to other threads, which can be confirmed by the thread names and timestamps.
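For comparison, the same "the calling thread prepares batches, workers process them" shape can be sketched with the JDK’s CompletableFuture, which is a different technique from the Reactor code above. countNames and countBatch are hypothetical helpers, and the executor plays the role that Schedulers.boundedElastic() plays in processBatch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BatchOffloadingSketch {

    // Counts the names of one batch; runs on a worker thread
    static Map<String, Long> countBatch(List<String> batch) {
        return batch.stream().collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    // The calling thread only slices the list into batches; counting is offloaded to the executor
    static Map<String, Long> countNames(List<String> names, int batchSize, ExecutorService executor) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<CompletableFuture<Map<String, Long>>> futures = new ArrayList<>();
        for (int from = 0; from < names.size(); from += batchSize) {
            List<String> batch = names.subList(from, Math.min(from + batchSize, names.size()));
            futures.add(CompletableFuture.supplyAsync(() -> countBatch(batch), executor));
        }
        // Merge the intermediate counts, like mergeIntermediateCount in the Reactor version
        Map<String, Long> total = new HashMap<>();
        for (CompletableFuture<Map<String, Long>> future : futures) {
            future.join().forEach((name, count) -> total.merge(name, count, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            System.out.println(countNames(List.of("Joe", "Monica", "Joe", "Ross", "Joe", "Monica"), 2, executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

Unlike the Reactor version, each batch starts processing as soon as supplyAsync is called, and the pool has to be shut down explicitly.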

To improve the performance of the Reactor solution, a developer may tweak the batchSize parameter to increase the number of batches, as well as add an additional subscribeOn() inside processBatch():

.flatMap(group -> group.count().map(count -> Tuples.of(group.key(), count)).subscribeOn(Schedulers.boundedElastic()))

Without the .subscribeOn() in processBatch(), all the code is executed on the main thread, as expected:

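import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.math.MathFlux; // from the reactor-extra add-on
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class Main {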

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Generate names
        Random r = new Random();
        var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
        var namesList = IntStream.range(0, 100000000)
            .mapToObj(__ -> names.get(r.nextInt(names.size())))
            .collect(Collectors.toList());


        int batchSize = namesList.size() / 10;
        var finalCounts = Flux.fromIterable(namesList)
            // Split to batches
            .buffer(batchSize)
            // Aggregate intermediate counts asynchronously
            .flatMap(Main::processBatch)
            .reduce(new HashMap<>(), Main::mergeIntermediateCount)
            .flatMapIterable(HashMap::entrySet);
        String mostFrequentName = MathFlux.max(finalCounts, Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .block();


        System.out.printf("The most frequent name is %s%n", mostFrequentName);
    }

    private static HashMap<String, Long> mergeIntermediateCount(HashMap<String, Long> acc,
                                                                Map<String, Long> intermediateResult) {
        intermediateResult.forEach((name, intermediateCount) -> acc.merge(name, intermediateCount,
            Long::sum));
        return acc;
    }

    private static Mono<Map<String, Long>> processBatch(List<String> batch) {
        System.out.printf("[%s][%s] Subscribing to batch processing \n", LocalDateTime.now(),
            Thread.currentThread().getName());
        return Flux.fromIterable(batch)
            .groupBy(Function.identity())
            .flatMap(group -> group.count().map(count -> Tuples.of(group.key(), count)))
            .collectMap(Tuple2::getT1, Tuple2::getT2)
            .doOnSubscribe(__ -> System.out.printf("[%s][%s] Processing batch... \n", LocalDateTime.now(), Thread.currentThread().getName()));
    }
}

// Output:
// ---
// [2022-09-29T16:20:26.834341489][main] Subscribing to batch processing 
// [2022-09-29T16:20:26.858483034][main] Processing batch... 
// [2022-09-29T16:20:28.732505692][main] Subscribing to batch processing 
// [2022-09-29T16:20:28.733251231][main] Processing batch... 
// [2022-09-29T16:20:30.245662536][main] Subscribing to batch processing 
// [2022-09-29T16:20:30.246063404][main] Processing batch... 
// [2022-09-29T16:20:30.791314849][main] Subscribing to batch processing 
// [2022-09-29T16:20:30.791522434][main] Processing batch... 
// [2022-09-29T16:20:31.367503970][main] Subscribing to batch processing 
// [2022-09-29T16:20:31.367729165][main] Processing batch... 
// [2022-09-29T16:20:31.998805328][main] Subscribing to batch processing 
// [2022-09-29T16:20:31.999009391][main] Processing batch... 
// [2022-09-29T16:20:32.593334820][main] Subscribing to batch processing 
// [2022-09-29T16:20:32.593585871][main] Processing batch... 
// [2022-09-29T16:20:33.186949718][main] Subscribing to batch processing 
// [2022-09-29T16:20:33.187191706][main] Processing batch... 
// [2022-09-29T16:20:36.217136910][main] Subscribing to batch processing 
// [2022-09-29T16:20:36.217389675][main] Processing batch... 
// [2022-09-29T16:20:36.833071426][main] Subscribing to batch processing 
// [2022-09-29T16:20:36.833353321][main] Processing batch... 
// The most frequent name is Monica

4. Conclusion

As demonstrated above, the Reactive API has the following traits that differentiate it from the CompletableFuture API:

  • Execution of Reactive streams is deferred until subscription (more on that later)
  • Reactive streams are built with back-pressure in mind
  • The execution context can be switched without modifying the processing logic, i.e. the code is concurrency-agnostic

Specific to Project Reactor, a large set of reactive operators greatly simplifies the implementation of multithreading problems. This vast and mature API is the topic of the next post.

Ivan Vyazmitinov

Tech Lead at Vizor Games, primarily focused on data-warehousing and JVM technologies, excited about the Open-Source community, and willing to share knowledge and experience.