Java

Reactive’s Looming Doom. Part IV: The End Of Reactive?

1. Introduction

The last post discussed advanced concepts of the reactive approach and highlighted use cases where it shines. This post will talk about the price that comes with the reactive approach and, finally, will the Project Loom make the reactive approach obsolete on the JVM platform?

2. Cons

While providing a developer with great possibilities for processing an asynchronous stream of data, the reactive approach has its caveats, and, most importantly, it is inherently unnatural for the Java language. Let’s discuss those claims in more detail.

2.1 Caveats

While Reactor hides blunt imperative code, with queues and semaphores, behind a nice declarative facade, the code itself doesn’t go away and continues to affect an application. So, in real practice, every Reactor user faces a need to gain at least a minimal understanding of the implementation details for the operators in use.

A great example of such a caveat can be demonstrated using the code from examples shown in previous posts. As an attentive reader may recall, the asynchronous processing of batches was achieved by a .buffer() and .flatMap() operators:

var finalCounts = Flux.fromIterable(namesList)
    // Split to batches
    .buffer(batchSize)
    // Aggregate intermediate counts asynchronously
    .flatMap(Main::processBatch)

Following the Reactive Spec, .flatMap() requests some elements from the .buffer(). But how much is “some” exactly? Well, the default value is whooping 256 elements, which will result in 256 batches requested, quickly overflowing memory with data. Consider the following example:

public class BufferCaveatExampleMain {
    public static void main(String[] args) {
        Flux.range(1, 100)
            .buffer(10)
            .log()
            .flatMap(BufferCaveatExampleMain::processBatch)
            .blockLast();
    }

    private static Mono<Integer> processBatch(List<Integer> integers) {
        return MathFlux.max(Flux.fromIterable(integers))
            .subscribeOn(Schedulers.boundedElastic())
            .delaySubscription(Duration.ofMinutes(1));
    }
}

// Output
// > Task :BufferCaveatExampleMain.main()
//     [ INFO] (main) onSubscribe(FluxBuffer.BufferExactSubscriber)
//     [ INFO] (main) request(256)
//     [ INFO] (main) onNext([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
//     [ INFO] (main) onNext([11, 12, 13, 14, 15, 16, 17, 18, 19, 20])
//     [ INFO] (main) onNext([21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
//     [ INFO] (main) onNext([31, 32, 33, 34, 35, 36, 37, 38, 39, 40])
//     [ INFO] (main) onNext([41, 42, 43, 44, 45, 46, 47, 48, 49, 50])
//     [ INFO] (main) onNext([51, 52, 53, 54, 55, 56, 57, 58, 59, 60])
//     [ INFO] (main) onNext([61, 62, 63, 64, 65, 66, 67, 68, 69, 70])
//     [ INFO] (main) onNext([71, 72, 73, 74, 75, 76, 77, 78, 79, 80])
//     [ INFO] (main) onNext([81, 82, 83, 84, 85, 86, 87, 88, 89, 90])
//     [ INFO] (main) onNext([91, 92, 93, 94, 95, 96, 97, 98, 99, 100])
//     [ INFO] (main) onComplete()

Here the processing of a batch is delayed for a minute, and, as can be noted, the number of requested batches is 256: [ INFO] (main) request(256), and they are provided immediately.

Why does it work this way? The answer lies in the full signature of the .flatMap():

<V> Flux<V>	flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)

Parameters:
mapper - the Function to transform input sequence into N sequences Publisher
concurrency - the maximum number of in-flight inner sequences
prefetch - the maximum in-flight elements from each inner Publisher sequence

While the intended usage of the concurrency parameter is to control how many inner publishers the .flatMap() is subscribed to, it also controls the size of the queue for elements from the upstream maintained by the .flatMap(), and, consequently, the amount of the element requested. When not defined explicitly, it is equal to reactor.util.concurrent.Queues#SMALL_BUFFER_SIZE that defined as follows:

public static final int SMALL_BUFFER_SIZE = Math.max(16, Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));

Hence the request for 256 elements from the upstream. It should be noted that this is a sane default for most cases when dealing with a Flux of small objects, but when requested objects are large, the .flatMap() doesn’t have the capacity to process a lot of them concurrently. Thus, following the reactive philosophy, the .flatMap() should reduce its demand.

Another issue, similar to the one described, manifests itself with the .publishOn() operator: in order to switch a scheduler for the downstream, it queues elements from the upstream internally. Similar to the .flatMap(), by default, it requests Queues#SMALL_BUFFER_SIZE elements and can be configured via an overloaded version of the operator:

final var prefetch = 2;
Flux.range(1, 100_000)
    .publishOn(Schedulers.boundedElastic(), prefetch)
    .subscribe();

The next nuance hides at the plain side and can become quite an unpleasant surprise for those unaware: operator .groupBy(), also used in the examples, is prone to hanging. According to the docs:

Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

In order to reproduce it, let’s generate 1000 synthetic groups and try to process them via .flatMap() with concurrency = 2:

Flux.range(1, 1000)
    .groupBy(Function.identity())
    .flatMap(integerIntegerGroupedFlux -> {
        System.out.println("Got " + integerIntegerGroupedFlux.key());
        return integerIntegerGroupedFlux.subscribeOn(Schedulers.boundedElastic()) ;
    }, 2)
    .subscribeOn(Schedulers.boundedElastic())
    .blockLast();
    
// Output
// > Task :GroupByHangExample.main()
// Got 1
// Got 2

The code hangs indefinitely after processing the first two groups. Cranking the concurrency up to 800 fixes the issue.

This is only a glimpse of the issues a developer could face when using the Project Reactor, so the purpose of this section is not to provide a guide for all caveats but to demonstrate that Project Reactor is not a silver bullet and should be handled with care. But, while this can be said about any library or framework, not every technology changes the way to write code as drastically as reactive libraries.

2.2 Fitting in

2.2.1 Paradigm change

The Java language is an imperative one and always has been, even after the adoption of some elements of functional programming. That’s why the monadic style of code is accepted reluctantly by most Java developers, but that is only a part of the problem in case of the reactive approach. While the Streams API, for example, serves an auxiliary role, the reactive API requires to completely shift a mindset of a developer. An example of such shifts: to control a flow of code in reactive code, a developer is forced to employ monadic states of a Flux and branch the logic via switchIfEmpty operators instead of a familiar imperative if statement. Since the monad handles the code flow, it has no other choice but to reinvent the control statements as part of its API.

2.2.2 Color functions

But problems don’t end there: reactive code is susceptible to a problem which is called a color functions. Essentially, the issue is that there is no convenient way to switch between reactive and non-reactive code. When facing a need to bridge them, a developer has to either employ blocking .block*() operators in order to wait for the reactive flow completion or convert the code completely into reactive. It is most notable in legacy projects, where, for some reason, parts of a system are being (re)written in reactive style: usually, the reactive code spreads throughout the system similar to cancer, being bridged with existing modules via some sort of blocking facade.

2.2.3 Debugging

Another notorious issue of reactive code on JVM is its poor debugging experience. With some adjustments that will be described later, reactive code is not convenient to debug using a standard IDE debugger with the “step” functionality since all the code is wrapped in lambdas, but, most importantly, the stack traces of exceptions are de facto lost. Consider the following example:

public class LostStacktraceExampleMain {
    public static void main(String[] args) throws InterruptedException {
        doLogic();
    }

    private static void doLogic() throws InterruptedException {
        Mono.just(1)
            .flatMap(LostStacktraceExampleMain::preProcessReactive)
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe();

        TimeUnit.SECONDS.sleep(5);
    }

    private static int process(int i) {
        return i / 0;
    }

    private static Mono<Integer> preProcessReactive(int i) {
        return Mono.fromCallable(() -> process(i));
    }

}

// Output
// [ERROR] (boundedElastic-1) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
//     reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
//     Caused by: java.lang.ArithmeticException: / by zero
//     at javacodegeeks.reactor.examples.main/org.javacodegeeks.examples.v4.LostStacktraceExampleMain.process(LostStacktraceExampleMain.java:25)
//     at javacodegeeks.reactor.examples.main/org.javacodegeeks.examples.v4.LostStacktraceExampleMain.lambda$preProcessReactive$0(LostStacktraceExampleMain.java:29)
//     at reactor.core@3.5.2/reactor.core.publisher.MonoCallable.call(MonoCallable.java:72)
//     at reactor.core@3.5.2/reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:174)
//     at reactor.core@3.5.2/reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
//     at reactor.core@3.5.2/reactor.core.publisher.Mono.subscribe(Mono.java:4429)
//     at reactor.core@3.5.2/reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)
//     at reactor.core@3.5.2/reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
//     at reactor.core@3.5.2/reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
//     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
//     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
//     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
//     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
//     at java.base/java.lang.Thread.run(Thread.java:1589)

There is a lot going on, but what is most important to note is the lack of LostStacktraceExampleMain#doLogic() call in the stack trace. This happens because code in the doLogic schedules execution on a different thread. And since the new thread doesn’t know anything about his “parent”, the stack trace of the exception starts from Thread#run of the boundedElastic-1 thread. As a reader can imagine, in a system with deep stack traces preserving only the last call in case of exception makes debugging drastically more difficult.

2.2.4 Pain relief

While nothing can be done with the paradigm shift of the reactive approach, the Project Reactor provides a solution to preserve stack traces via a special debug mode, for development, or a dedicated java agent, for production. This changes a stack trace of the last example in the following way:

[ERROR] (boundedElastic-1) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero
	at javacodegeeks.reactor.examples.main/org.javacodegeeks.examples.v4.LostStacktraceExampleMain.process(LostStacktraceExampleMain.java:25)
	Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: 
Assembly trace from producer [reactor.core.publisher.MonoFlatMap] :
	reactor.core@3.5.2/reactor.core.publisher.Mono.flatMap(Mono.java:3080)
	javacodegeeks.reactor.examples.main/org.javacodegeeks.examples.v4.LostStacktraceExampleMain.doLogic(LostStacktraceExampleMain.java:17)
Error has been observed at the following site(s):
	*__ ⇢ reactor.core@3.5.2/reactor.core.publisher.Mono.flatMap(Mono.java:3080)
	|_  ⇢ reactor.core@3.5.2/reactor.core.publisher.Mono.subscribeOn(Mono.java:4497)
Original Stack Trace:
		at javacodegeeks.reactor.examples.main/org.javacodegeeks.examples.v4.LostStacktraceExampleMain.process(LostStacktraceExampleMain.java:25)
		at javacodegeeks.reactor.examples.main/org.javacodegeeks.examples.v4.LostStacktraceExampleMain.lambda$preProcessReactive$0(LostStacktraceExampleMain.java:29)
		at reactor.core@3.5.2/reactor.core.publisher.MonoCallable.call(MonoCallable.java:72)
		at reactor.core@3.5.2/reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:174)
		at reactor.core@3.5.2/reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
		at reactor.core@3.5.2/reactor.core.publisher.Mono.subscribe(Mono.java:4429)
		at reactor.core@3.5.2/reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)
		at reactor.core@3.5.2/reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
		at reactor.core@3.5.2/reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
		at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
		at java.base/java.lang.Thread.run(Thread.java:1589)    

While it is much harder to read than a traditional stack trace (here is how), it provides the necessary stack trace information necessary for troubleshooting. Moreover, some IDEs even capable of using this information for debugging to show async stack trace on UI.

3. The Looming Doom

3.1 Definite doom

With the pros and cons of the reactive approach discussed, it should be clear now why Java architects decided to improve the existing multithreading API instead of integrating the reactive approach into the platform: imperative code is much more familiar for the majority of developers and natural for the JVM platform itself. But what exactly does Loom do in regard to the reactive approach? Well, it simply eliminates the need to offload IO to a dedicated thread pool: virtual threads are cheap to spawn and block, so there is no need to pool them and make a distinction between IO threads and “worker” threads. As a concrete example, here is the reactive version with offloading to the theoretical IO scheduler:

doBusinessLogic()
.flatMap(dto -> perist(dto).subscribeOn(IO))
.doOnNext(result -> processResult(result))
.subscribeOn(Schedulers.boundedElastic());

And here is the equivalent in terms of CPU utilization:

var dto = doBusinessLogic()
vat result = persist(dto)
procesResult(result)

As a reader can see, the last one is plain old java code in a form that has been on the platform since Java 1.0.

Such a drastic improvement is possible because, under the hood, Loom performs switching between threads, similar to the offloading: every virtual thread has a carrier platform thread, and when a virtual thread is parked for IO, the carrier thread simply switches to executing another virtual thread. When a virtual thread is unparked, it is then picked up by another platform thread. Unfortunately, at the moment, there is an issue of pinning that causes blocking operation to block a carrier thread as well, but work is in progress to eliminate this limitation.

3.2 Debatable doom

Essentially, the Project Loom made .flatMap() operator obsolete. But, as was demonstrated in previous series posts, the reactive approach and, specifically, Project Reactor is much more than that. So, the Loom itself is not a threat to the reactive approach, and can they can happily coexist.

However, what can be really intimidating for Reactor is the incubating Structured Concurrency API that undeservedly received much less attention. While Loom handles the “undercover” part, Structured Concurrency introduces a way of orchestrating parallel/asynchronous computations. This is precisely what Reactor does, but the Structured Concurrency allows to stay in an imperative paradigm without a need to wrap code into a monad. Knowing now how hard it is to adopt the reactive approach on JVM, it should be no surprise that the evolution of Java multithreading made such a turn, but there is a lot to be done for the Structured Concurrency to make it a worthy opponent of the Project Reactor. At the moment of writing,  the Structured Concurrency introduced only basic capabilities of asynchronous orchestration, like split/join functionality and error propagation. All other common functionality for multithreading tasks like back-pressure, batching, parallelism control, etc., fall on a developer’s shoulders, which was demonstrated in the first post of the series. A theoretical example of the API improvement may look like this:

public class IoLoomExampleMain {

    public static void main(String[] args) throws Exception {

        final var thread = Thread.ofVirtual().start(IoLoomExampleMain::start);
        thread.join();
    }

    private static void start() {
        var parallelization = 6;
        try (var resultScope = new CollectingScope(parallelization,
            ConcurrentHashMap<String, Long>::new,
            IoLoomExampleMain::handleComplete)) {
            int batchSize = 1000;
            try (final var lines = Files.lines(Paths.get("src/main/resources/names.txt"));
                 var executorService = Executors.newVirtualThreadPerTaskExecutor()) {
                Iterator<List<String>> batches = executorService.submit(new BatchProvider(batchSize, lines.iterator()));
                batches.forEachRemaining(batch -> resultScope.fork(prepareBatchProcessing(batch)));
            }
            resultScope.join();
            System.out.println("The most frequent name is " + resultScope.mostFrequentName());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static Callable<Map<String, Long>> prepareBatchProcessing(List<String> batch) {
        return () -> {
            Map<String, Long> localCounts = new ConcurrentHashMap<>();
            System.out.printf("[%s][virtual=%s] Processing batch... \n", LocalDateTime.now(), Thread.currentThread().isVirtual());
            for (String name : batch) {
                localCounts.compute(name, (n, c) -> c == null ? 1L : c + 1);
            }
            return localCounts;
        };
    }

    protected void handleComplete(Map<String, Long> result, Map<String, Long> intermediateResult) {
        for (var stringLongEntry : intermediateResult.entrySet()) {
            result.compute(stringLongEntry.getKey(), (n, c) -> {
                Long newCount = stringLongEntry.getValue();
                return c == null ? newCount : (Long) c + newCount;
            });
        }
        return result;
    } 
}

In this code:

  1. The number of parallel tasks is controlled by the StructuredTaskScope. The StructuredTaskScope#fork method will block if the scope is at max capacity
  2. Theoretically, there are predefined scopes implementations provided out of the box, including the CollectingScope, which acts similarly to Stream#collect
  3. The same goes for tasks that go to the ExecutorService, the BatchProvider in the example.
  4. In order to support backpressure, ExecutorService provides an Iterator instead of the Future. The code then will block at the StructuredTaskScope#fork if the parallelization limit is hit, requesting the next batch only when a free slot becomes available

This is just a theoretical example of how the challenges of multithreading programming may be addressed by the new API. The main point of this section is to demonstrate that for the design of the Structured Concurrent API the best practices and solutions from the reactive approach should be taken into account and carefully considered. This will guarantee the optimal developer experience and ease the transition for existing reactive users. Until that, the Project Reactor API remains superior to the Structured Concurrency API.

4. Personal experience

This little section represents the author’s personal experience with reactive libraries.

We are developing an OLAP application, so since our main business logic is the asynchronous processing of streams of data, Reactor became the most natural and obvious choice for ETL. Apart from that, we use it for tasks that require delays, event buses, etc.

But when there is no need for multithreading orchestration or IO offload, we use blocking code, despite it being less effective, because it is easier to write and maintain. So, as a general recommendation: don’t use the reactive approach unless you absolutely need it, especially when the Project Loom goes GA.

5. Conclusion

This series of posts demonstrated what is the reactive approach, its pros and cons, and how the Project Loom affects it on the JVM platform. While the Project Loom is an important milestone for JVM, there is still a lot of room for improvement and development, and until then, the reactive approach on JVM will continue to exist and provide its excellent API for multithreading.

6. Download the Source Code

Download
You can download the full source code of this example here: Reactive’s Looming Doom. Part IV: The End Of Reactive?

Ivan Vyazmitinov

Tech Lead at Vizor Games, primarily focused on data-warehousing and JVM technologies, excited about the Open-Source community, and willing to share knowledge and experience.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

2 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Gökhan
Gökhan
6 months ago

Is it possible to implement the reactive approach using Project Loom? In other words, can a reactive solution leverage virtual threads in the background to harness their benefits? 

Back to top button