Java

Reactive’s Looming Doom. Part III: Advanced Reactive

1. Introduction

The last post presented the fundamental concepts of the reactive approach, including a specification and its implementation. Now it is time to show where the reactive approach shines, including those possibilities that are often overlooked.

 

2. Reactor docs

One of the great features of reactive libraries is the graphical descriptions of the operators. They are called marble diagrams and, in most cases, represent the source flux, the action the operator performs, and the resulting flux. For example, here is the .map() marble diagram. Such graphic representations allow developers to quickly grasp the basic principle of an operator, with a few exceptions.

3. Flux/Mono states

Before going further, it is necessary to clarify the basic functionality omitted in the previous post. As the reader can remember, there are three methods in the Subscriber interface:

public static interface Subscriber<T> {

    public void onSubscribe(Subscription subscription);

    public void onNext(T item);

    public void onError(Throwable throwable);

    public void onComplete();
}    

They represent callbacks for signals that a publisher may send downstream. Apart from the regular element requested by a subscriber, the publisher can signalize about an error that occurred while providing the elements, as well as about the completion (assuming that the publisher is finite). The Project Reactor offers several operators based on those signals. They vary from side effect ones, like doOnError, to behavioral, like switchIfEmpty and onErrorResume. This part of the API provides the ability to handle exceptional situations analogous to the try/catch block in the imperative programming model.

4. Operator .publishOn()

In order to perform a thread switching via .subscribeOn() a Publisher should be created to be subscribed. But what if we want to switch a thread for the operator that doesn’t create a nested publisher, like .map() or .buffer()? That’s where the .publishOn() comes into the picture: it allows to switch a thread for all operators downstream:

Flux.range(1, 10)
    .map(i -> {
        System.out.printf("[%s] Mapping %s\n", Thread.currentThread().getName(), i);
        return i * 2;
    })
    .publishOn(Schedulers.parallel())
    .filter(i -> {
        System.out.printf("[%s] Filtering %s\n", Thread.currentThread().getName(), i);
        return i >= 10;
    })
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe();

// Output:
// ---
// [boundedElastic-1] Mapping 1
// [boundedElastic-1] Mapping 2
// [parallel-1] Filtering 2
// [parallel-1] Filtering 4
// [boundedElastic-1] Mapping 3
// [boundedElastic-1] Mapping 4
// [parallel-1] Filtering 6
// [boundedElastic-1] Mapping 5
// [parallel-1] Filtering 8
// [boundedElastic-1] Mapping 6
// [parallel-1] Filtering 10
// [parallel-1] Filtering 12
// [boundedElastic-1] Mapping 7
// [boundedElastic-1] Mapping 8
// [parallel-1] Filtering 14
// [boundedElastic-1] Mapping 9
// [boundedElastic-1] Mapping 10
// [parallel-1] Filtering 16
// [parallel-1] Filtering 18
// [parallel-1] Filtering 20

As can be seen, .map() is executed on the scheduler configured for the whole chain via .subscribeOn(), while for .filter() the scheduler was switched. It is very important to note that .subscribeOn() affects the entire chain, so its placement doesn’t matter while .publishOn() affects only the downstream.

5. Best use cases

With the most important aspects covered, we are ready to dive into practice. The Project Reactor API is designed to process a flux of data in a functional style asynchronously. Thus, naturally, it is best suited for tasks that involve a flux (or stream) of data and require asynchronous processing, such as described further.

5.1 Streaming IO

IO is the most common goal when the reactive approach is considered: the basic idea is to offload the blocking IO to another thread pool while keeping “worker threads” always busy with actual processing. This approach works especially well when IO is performed on a stream of data, like a file or result of a database, and with some sort of reactive driver plugged in. A perfect example would be the r2dbc driver combined with ORM that supports reactive, like jOOQ.

Let’s demonstrate it on the familiar problem of parallel names counting from previous posts. To do this, let’s move the names into a file and offload reading to the dedicated thread pool:

public class IoReactiveExampleMain {

    private static final Scheduler IO = Schedulers.newParallel("IO");

    public static void main(String[] args) throws Exception {

        int batchSize = 1000;
        var finalCounts = Flux.fromStream(Files.lines(Paths.get("src/main/resources/names.txt")))
            // Split to batches
            .buffer(batchSize)
            .subscribeOn(IO)
            .doOnNext(__ -> System.out.printf("[%s] batch is provided\n", Thread.currentThread().getName()))
            // Aggregate intermediate counts asynchronously
            .flatMap(IoReactiveExampleMain::processBatch)
            .reduce(new HashMap<>(), IoReactiveExampleMain::mergeIntermediateCount)
            .flatMapIterable(HashMap::entrySet);
        String mostFrequentName = MathFlux.max(finalCounts, Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .block();


        System.out.printf("The most frequent name is %s\n", mostFrequentName);
    }

    private static HashMap<String, Long> mergeIntermediateCount(HashMap<String, Long> acc,
                                                                Map<String, Long> intermediateResult) {
        intermediateResult.forEach((name, intermediateCount) -> acc.merge(name, intermediateCount,
            Long::sum));
        return acc;
    }

    private static Mono<Map<String, Long>> processBatch(List<String> batch) {

        return Flux.fromIterable(batch)
            .groupBy(Function.identity())
            .flatMap(group -> group.count().map(count -> Tuples.of(group.key(), count)))
            .collectMap(Tuple2::getT1, Tuple2::getT2)
            .doOnSubscribe(__ -> System.out.printf("[%s] Processing batch... \n", Thread.currentThread().getName()))
            .subscribeOn(Schedulers.boundedElastic());
    }
}    

Notice how seamlessly it is integrated with the  Stream API: a publisher created via Flux#fromStream() provides new names from the file lazily, only when requested via the .flatMap(). This guarantees that memory won’t be overflowed if batch processing is slower than file reading. Also, the Flux#fromStream() closes the underlying stream when it is finished, which is necessary when using the Files#lines().

Now, let’s implement the same via the Loom API:

public class IoLoomExampleMain {

    public static void main(String[] args) throws Exception {

        final var thread = Thread.ofVirtual().start(IoLoomExampleMain::start);
        thread.join();
    }

    private static void start() {
        try (var batchScope = new BatchScope()) {
            int batchSize = 1000;
            try (var lines = Files.lines(Paths.get("src/main/resources/names.txt"))) {
                final var iterator = lines.iterator();
                try (var fileReadingScope = new StructuredTaskScope.ShutdownOnFailure()) {
                    while (iterator.hasNext()) {
                        final var batchFuture = fileReadingScope.fork(() -> prepareBatch(batchSize, iterator));
                        fileReadingScope.join();
                        batchScope.fork(prepareBatchProcessing(batchFuture.resultNow()));
                    }
                }

            }
            batchScope.join();
            System.out.println("The most frequent name is " + batchScope.mostFrequentName());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static ArrayList<String> prepareBatch(int batchSize, Iterator<String> iterator) {
        System.out.printf("[%s][virtual=%s] Preparing batch... \n", LocalDateTime.now(), Thread.currentThread().isVirtual());
        ArrayList<String> batch = new ArrayList<>(batchSize);
        while (iterator.hasNext() && batch.size() < batchSize) {
            batch.add(iterator.next());
        }
        return batch;
    }

    private static Callable<Map<String, Long>> prepareBatchProcessing(List<String> batch) {
        return () -> {
            Map<String, Long> localCounts = new ConcurrentHashMap<>();
            System.out.printf("[%s][virtual=%s] Processing batch... \n", LocalDateTime.now(), Thread.currentThread().isVirtual());
            for (String name : batch) {
                localCounts.compute(name, (n, c) -> c == null ? 1L : c + 1);
            }
            return localCounts;
        };
    }

    private static class BatchScope extends StructuredTaskScope<Map<String, Long>> {

        private final ConcurrentHashMap<String, Long> result = new ConcurrentHashMap<>();

        @Override
        protected void handleComplete(Future<Map<String, Long>> future) {
            final var intermediateResult = future.resultNow();
            for (var stringLongEntry : intermediateResult.entrySet()) {
                result.compute(stringLongEntry.getKey(), (n, c) -> updateCount(stringLongEntry.getValue(), c));
            }
        }

        private long updateCount(Long newCount, Long existingCount) {
            return existingCount == null ? newCount : existingCount + newCount;
        }

        public String mostFrequentName() {
            return result.entrySet()
                .stream()
                .max(Map.Entry.comparingByValue())
                .get()
                .getKey();
        }
    }
}

While the Structured Concurrency API is in the incubating stage and best practices are not yet formed, that is, conceptually, the intended way to do concurrency in future versions of java.

While it may seem more familiar and straightforward, it requires developers to perform a lot of boilerplate imperative actions (like batch preparation, for this task). Also, since there is no way to signal the backpressure in the Structured Concurrency API, the provided solution will overflow memory when batch processing is slow. Developers would have to implement the signaling system themselves to avoid that.

5.2 Events

The subscription model of the reactive approach makes it very appropriate for tasks related to events, like various listeners and event buses. To achieve this, the Project Reactor provides a few ways to create a Flux programmatically via a Sink abstraction. Here is an example of creating a Flux that wraps the theoretical listener interface:

interface MyEventListener<T> {
    void onDataChunk(List<T> chunk);

    void processComplete();
}

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register(
        new MyEventListener<String>() {

            public void onDataChunk(List<String> chunk) {
                for (String s : chunk) {
                    sink.next(s);
                }
            }

            public void processComplete() {
                sink.complete();
            }
        });
});

This is described in more detail in the documentation. It should be noted that with this wrapper, it is now possible to subscribe to the bridge and use all available Reactor API to buffer/filter/transform/perform any necessary logic with incoming events.

The previous example described a wrapper for the external event source that intended to have a single subscription. With Reactor, it is extremely easy to elevate this to full-blown bus functionality with broadcasting support. This is done via the Sinks API based on the Processor interface from the Reactive Streams Specification. With the Sinks API implementation of a bus can be done as follows:

public class ReactorEventBusImpl<T> {
    private final Sinks.Many<T> sink = Sinks.many().replay().limit(5);

    private final Flux<T> eventFlux = sink.asFlux()
        .publishOn(Schedulers.newParallel("bus"));

    public void publish(T event) {
        sink.emitNext(event, Sinks.EmitFailureHandler.busyLooping(Duration.ofMinutes(1)));
    }

    public Flux<T> receive() {
        return eventFlux;
    }
}    

In this particular example, the bus allows many subscribers and replays up to 5 elements for new ones:

ReactorEventBusImpl<Integer> bus = new ReactorEventBusImpl<>();


final var firstSubscription = bus.receive()
    .subscribeOn(Schedulers.boundedElastic())
    .doOnNext(i1 -> System.out.printf("[%s][%s] got %s\n", 0, Thread.currentThread().getName(), i1))
    .subscribe();

    Thread.sleep(Duration.ofSeconds(1));

    System.out.println("Sending 1..10");
    IntStream.range(1, 11)
    .forEach(bus::publish);

    Thread.sleep(Duration.ofSeconds(1));

final var secondSubscription = bus.receive()
    .subscribeOn(Schedulers.boundedElastic())
    .doOnNext(i -> System.out.printf("[%s][%s] got %s\n", 1, Thread.currentThread().getName(), i))
    .subscribe();

    Thread.sleep(Duration.ofSeconds(1));

    System.out.println("Sending 11..20");
    IntStream.range(11, 21)
    .forEach(bus::publish);

// Output:
// ---
// Sending 1..10
// [0][bus-1] got 1
// [0][bus-1] got 2
// [0][bus-1] got 3
// [0][bus-1] got 4
// [0][bus-1] got 5
// [0][bus-1] got 6
// [0][bus-1] got 7
// [0][bus-1] got 8
// [0][bus-1] got 9
// [0][bus-1] got 10
// [1][bus-2] got 6
// [1][bus-2] got 7
// [1][bus-2] got 8
// [1][bus-2] got 9
// [1][bus-2] got 10
// Sending 11..20
// [0][bus-1] got 11
// [0][bus-1] got 12
// [0][bus-1] got 13
// [0][bus-1] got 14
// [0][bus-1] got 15
// [0][bus-1] got 16
// [0][bus-1] got 17
// [0][bus-1] got 18
// [0][bus-1] got 19
// [0][bus-1] got 20
// [1][bus-2] got 11
// [1][bus-2] got 12
// [1][bus-2] got 13
// [1][bus-2] got 14
// [1][bus-2] got 15
// [1][bus-2] got 16
// [1][bus-2] got 17
// [1][bus-2] got 18
// [1][bus-2] got 19
// [1][bus-2] got 20

The provided implementation can be easily reconfigured to a different behavior via the Sinks API.

The Project Loom may help with this use case either, but since it was not designed specifically for it, developers would have to implement the bus based on a concurrent Queue from scratch, including the management of multiple subscribers, etc. Because of that, it doesn’t make much sense to compare the Projects Loom and Reactor for this particular use case.

5.3 Resiliency

One (probably, unexpected) outcome of the subscription model is how easy it is to retry execution on an error: it is enough to simply resubscribe to a publisher! Because of this trait, the Project Reactor provides a rich retry facility out-of-box. The simplest form is a .retry() operator that simply resubscribes to the publisher in case of error indefinitely or configured number of times:

Flux.just(1, 2, 3, 4, 5, 6)
    .doOnNext(i -> {
        if (i != 3) {
            System.out.printf("[%s] Got %s\n", Thread.currentThread().getName(), i);
        } else {
            System.out.printf("[%s] Got illegal %s\n", Thread.currentThread().getName(), i);
            throw new IllegalStateException("Got illegal 3");
        }
    })
    .retry(2)
    .subscribeOn(Schedulers.boundedElastic())
    .blockLast();
    
// Output:
// ---
//  [boundedElastic-1] Got 1
//  [boundedElastic-1] Got 2
//  [boundedElastic-1] Got illegal 3
//  [boundedElastic-1] Got 1
//  [boundedElastic-1] Got 2
//  [boundedElastic-1] Got illegal 3
//  [boundedElastic-1] Got 1
//  [boundedElastic-1] Got 2
//  [boundedElastic-1] Got illegal 3
//  Exception in thread "main" java.lang.IllegalStateException: Got illegal 3
//  	at javacodegeeks.reactor.examples.main/org.javacodegeeks.examples.PresentTimeExampleMain.lambda$main$0(PresentTimeExampleMain.java:24)
//        ...    

For more advanced use cases, the Project Reactor provides the .retryWhen() operator and Retry class that works in conjunction. The Retry API allows fine configuration of retries. For example, with exponential backoff, with callbacks invoked before and after retry, should the error counter be reset on a successful retry attempt or not, and a lot more:

public class RetryWhenMain {

    public static final RetryBackoffSpec RETRY_SPEC =
        Retry.backoff(3, Duration.ofSeconds(2))
            .doBeforeRetry(retrySignal -> System.out.printf("[%s][%s] Error, before retry\n", now(), Thread.currentThread().getName()))
            .doAfterRetry(retrySignal -> System.out.printf("[%s][%s] Error, after retry\n", now(), Thread.currentThread().getName()))
            .scheduler(Schedulers.boundedElastic())
            .transientErrors(false);

    public static void main(String[] args) {
        Flux.just(1, 2, 3, 4, 5, 6)
            .doOnNext(i -> {
                if (i != 3) {
                    System.out.printf("[%s][%s] Got %s\n", now(), Thread.currentThread().getName(), i);
                } else {
                    System.out.printf("[%s][%s] Got illegal %s\n", now(), Thread.currentThread().getName(), i);
                    throw new IllegalStateException("Got illegal 3");
                }
            })
            .retryWhen(RETRY_SPEC)
            .subscribeOn(Schedulers.parallel())
            .blockLast();
    }
}

// Output:
// ---
// [2022-11-02T19:04:00.316492874][parallel-1] Got 1
// [2022-11-02T19:04:00.317204359][parallel-1] Got 2
// [2022-11-02T19:04:00.317347538][parallel-1] Got illegal 3
// [2022-11-02T19:04:00.322003149][parallel-1] Error, before retry
// [2022-11-02T19:04:03.176562622][boundedElastic-1] Error, after retry
// [2022-11-02T19:04:03.177192567][boundedElastic-1] Got 1
// [2022-11-02T19:04:03.177669363][boundedElastic-1] Got 2
// [2022-11-02T19:04:03.178068239][boundedElastic-1] Got illegal 3
// [2022-11-02T19:04:03.178973002][boundedElastic-1] Error, before retry
// [2022-11-02T19:04:07.523104743][boundedElastic-2] Error, after retry
// [2022-11-02T19:04:07.523776491][boundedElastic-2] Got 1
// [2022-11-02T19:04:07.524500027][boundedElastic-2] Got 2
// [2022-11-02T19:04:07.524941269][boundedElastic-2] Got illegal 3
// [2022-11-02T19:04:07.525736592][boundedElastic-2] Error, before retry
// [2022-11-02T19:04:15.969594038][boundedElastic-1] Error, after retry
// [2022-11-02T19:04:15.970121536][boundedElastic-1] Got 1
// [2022-11-02T19:04:15.970579856][boundedElastic-1] Got 2
// [2022-11-02T19:04:15.971036717][boundedElastic-1] Got illegal 3
// Exception in thread "main" reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
// ...
// 	Suppressed: java.lang.Exception: #block terminated with an error
// 		at reactor.core@3.4.23/reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
// 		at reactor.core@3.4.23/reactor.core.publisher.Flux.blockLast(Flux.java:2645)
// 		at javacodegeeks.reactor.examples.main/org.javacodegeeks.examples.v3.retry.RetryWhenMain.main(RetryWhenMain.java:33)
// Caused by: java.lang.IllegalStateException: Got illegal 3
// 	at javacodegeeks.reactor.examples.main/org.javacodegeeks.examples.v3.retry.RetryWhenMain.lambda$main$2(RetryWhenMain.java:28)
// ...    

Other use cases related to resiliency, like rate limiting, are provided by the resilience4j library, which is integrated with the Reactor very well.

5.4 Worth mentioning

Provided examples are just the tip of the iceberg which is the Project Reactor API. During its evolution, it accumulated operators for almost every imaginable task. To highlight a few often overlooked but extremely handy examples:

5.4.1 Operators .using*()

Intended to be used for closable resources. Analogous to the try-with-resources functionality.

5.4.2 Operators .swtich*()

Allow switching to another publisher on a defined condition: when the upstream is empty, or on the first element. This way an alternative flow may be easily provided.

5.4.3 Operators .expand*()

Operators that allow the recursive provision of elements. This can be a lifesaver for integration with paginated APIs, for example.

5.4.5 Augmenting operators

This set of operators allows to assign an index for elements of flux or to provide elapsed time from the moment of subscription for each element.

5.4.6 Observability operators

Those operators allow to log signals in a reactive chain or collect metrics using Micrometer.

5.4.7 Reactive context

In Java applications it is a common pattern to use ThreadLocals to store a state, but, since reactive code is executed across multiple asynchronous boundaries, they are not fit for this purpose. Instead, Reactor provides the Context API that allows assigning a state tied to the Flux, not thread.

5.4.8 Operators .transform*()

In case no operators provide the necessary functionality, it is possible to implement a custom operator and apply it to a Flux via .transformDeffered() operator. The resilience4j library is implemented using this approach.

5.4.9 Operators guide

In order to describe the provided functionality in a comprehensible way, the docs have a “Which operator do I need” section, describing almost every use case covered by API. Here developers can get a grasp of what is offered and whether is it suitable for the task at hand.

6. Conclusion

This blog post demonstrated use cases where the reactive approach suits better than the Loom and Structured Concurrency, at the moment of writing. It also highlighted some often overlooked functionality of the API. The only major part of API which was not mentioned is hot vs cold publishers, which readers are suggested to explore on their own, as well as the rest of the available functionality.

The next post will demonstrate the opposite: where it doesn’t make sense to employ the reactive approach in light of the Project Loom, as well as the downsides and caveats of it in general, and on the JVM platform, in particular.

7. Download the Source Code

Download
You can download the full source code of this example here: Reactive’s Looming Doom. Part III: Advanced Reactive

Ivan Vyazmitinov

Tech Lead at Vizor Games, primarily focused on data-warehousing and JVM technologies, excited about the Open-Source community, and willing to share knowledge and experience.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button