
About Emmanouil Gkatziouras

Emmanouil Gkatziouras
He is a versatile software engineer with experience in a wide variety of applications/services. He is enthusiastic about new projects, embracing new technologies, and getting to know people in the field of software.

Executing Blocking calls on a Reactor based Application

Project Reactor is a fully non-blocking foundation with back-pressure support included. Most libraries out there offer asynchronous methods and so fit well with it, but in some cases a library contains complex blocking methods without an asynchronous implementation. Calling these methods inside a Reactor stream would have bad results, since a blocked thread cannot serve any other work in the pipeline. Instead we need to turn those methods into async ones or find out if there is a workaround.

If you are short on time and it is not possible to contribute a patch to the tool used, or you cannot work out how to reverse engineer the blocking call and implement a non-blocking version, then it makes sense to utilise some threads.
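To make the problem a bit more concrete, here is a minimal sketch (the class and method names are made up for illustration). Reactor marks the threads of some of its schedulers, such as the parallel one that Mono.delay uses by default, as non-blocking, and Schedulers.isInNonBlockingThread() lets us check whether the current thread is one of them. A blocking library call made on such a thread would hold it up for everything else scheduled there.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of the original project.
public class NonBlockingThreadCheck {

    // Mono.delay emits on the parallel scheduler by default, so the map
    // function below runs on one of Reactor's non-blocking threads.
    public static boolean delayCallbackRunsOnNonBlockingThread() {
        return Mono.delay(Duration.ofMillis(10))
                .map(tick -> Schedulers.isInNonBlockingThread())
                .block(); // blocking here is fine: we are on the caller's thread
    }
}
```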

First let’s import the dependencies for our project

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2020.0.11</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-engine</artifactId>
        <version>5.8.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>

Let’s start with our blocking service

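package com.gkatzioura.blocking;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class BlockingService {
    // Blocks the calling thread until the whole response body has been read (readAllBytes needs Java 9+)
    public String get(String url) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setRequestMethod("GET");
        try (InputStream inputStream = connection.getInputStream()) {
            return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}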

We used the JDK’s HttpURLConnection since we know for sure that it is a blocking call. To execute it without blocking the reactive pipeline we need a Scheduler. For blocking calls we shall use the boundedElastic scheduler. A scheduler can also be created from an existing executor service.
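As a rough sketch of the executor service option, Schedulers.fromExecutorService wraps an ExecutorService we already own. The class name, pool size and thread name below are assumptions picked for illustration; the callable only reports the thread it runs on, standing in for a real blocking call.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of the original project.
public class ExecutorBackedScheduler {

    // Runs a callable on a dedicated pool and returns the name of the
    // thread that executed it.
    public static String runOnCustomPool() {
        // Pool size and thread name are arbitrary choices for this sketch
        ExecutorService executor = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "blocking-io");
            thread.setDaemon(true);
            return thread;
        });
        Scheduler scheduler = Schedulers.fromExecutorService(executor);
        try {
            return Mono.fromCallable(() -> Thread.currentThread().getName())
                    .subscribeOn(scheduler)
                    .block();
        } finally {
            // Release the scheduler and the pool once they are no longer needed
            scheduler.dispose();
            executor.shutdown();
        }
    }
}
```

In a real application the executor and scheduler would normally be created once and shared, not built per call as in this sketch.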

So let’s transform this method to a non-blocking one.

package com.gkatzioura.blocking;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingAsyncService {

    private final BlockingService blockingService;

    public BlockingAsyncService(BlockingService blockingService) {
        this.blockingService = blockingService;
    }

    // Public so that callers (and the test below) can use it
    public Mono<String> get(String url) {
        return Mono.fromCallable(() -> blockingService.get(url))
                .subscribeOn(Schedulers.boundedElastic());
    }

}

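What we can see is a Mono created from a Callable. Nothing runs until the Mono is subscribed to. Because of subscribeOn, the subscription, and with it the blocking call inside the Callable, is executed on a thread of the boundedElastic scheduler instead of the thread that subscribes.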
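The effect of subscribeOn can be observed with a small sketch like the following. The class and method names are hypothetical; the callable simply returns the name of the thread it runs on, with and without a scheduler.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of the original project.
public class SubscribeOnDemo {

    // Without subscribeOn the callable runs on the thread that subscribes,
    // which here is the thread calling block().
    public static String withoutSubscribeOn() {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
                .block();
    }

    // With subscribeOn the callable runs on a boundedElastic worker thread.
    public static String withSubscribeOn() {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.boundedElastic())
                .block();
    }
}
```

Calling block() here is only a convenience for the demo. In a reactive pipeline the Mono would be returned and composed instead.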

Let’s write a test

package com.gkatzioura.blocking;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class BlockingAsyncServiceTest {

    private BlockingAsyncService blockingAsyncService;

    @BeforeEach
    void setUp() {
        blockingAsyncService = new BlockingAsyncService(new BlockingService());
    }

    @Test
    void getReturnsHtmlPage() {
        StepVerifier.create(
                            Mono.just("https://www.google.com/")
                                .flatMap(blockingAsyncService::get)
                    )
                // expectNextMatches fails the test when the predicate returns false
                .expectNextMatches(s -> s.toLowerCase().startsWith("<!doctype"))
                .verifyComplete();
    }
}

That’s it! Obviously the best thing to do is to find a way to turn this blocking call into an async one, or to find a workaround using the async libraries out there. When that is not feasible we can fall back on using threads.
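For this particular HTTP example, one possible async alternative is the JDK’s own java.net.http.HttpClient (Java 11+), whose sendAsync method returns a CompletableFuture that Reactor can adapt with Mono.fromFuture. This is a sketch under those assumptions and the class name AsyncHttpService is made up; it is not part of the original project.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import reactor.core.publisher.Mono;

// Hypothetical async counterpart of the blocking service.
public class AsyncHttpService {

    private final HttpClient client = HttpClient.newHttpClient();

    public Mono<String> get(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        // defer, so the request is only sent when somebody subscribes
        Mono<HttpResponse<String>> response = Mono.defer(() ->
                Mono.fromFuture(client.sendAsync(request, HttpResponse.BodyHandlers.ofString())));
        return response.map(HttpResponse::body);
    }
}
```

No extra scheduler is needed here, since the client performs the I/O asynchronously and completes the future on its own threads.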

Published on Java Code Geeks with permission by Emmanouil Gkatziouras, partner at our JCG program. See the original article here: Executing Blocking calls on a Reactor based Application

Opinions expressed by Java Code Geeks contributors are their own.
