Home » Java » Enterprise Java » Speed Up Services With Reactive API in Java EE 8

About Ondrej Mihalyi

Ondrej Mihalyi
Ondrej is a lecturer and consultant inventing and evangelizing new approaches with already proven Java tooling. As a Scrum Master and expert in Java EE ecosystem, he helps companies to build and educate their developer teams, improve their development processes and be flexible and successful in meeting client requirements.

Speed Up Services With Reactive API in Java EE 8

Services can often be optimized with asynchronous processing even without changing their behavior towards the outside world.

The reason why some services aren’t efficient is that they need to wait for other services to provide a result to continue further.

Let’s look how to call external REST services without waiting for them and also do multiple parallel calls independently and combine their results later with a reactive pipeline in Java EE 8.

If our service calls multiple microservices and waits for each call to finish and return results before doing another call, it’s a good candidate to refactor using reactive API. In order to make the service more efficient, it could do all the calls to external services in parallel if they don’t depend on each other. This would decrease the time spent waiting and thus speed up the microservice.

In order to call REST services in parallel, we’ll use the new reactive client API in JAX-RS. We’ll combine it with the RxJava library to combine their results when available. This combination will allow us to write clean and efficient code. And with an additional benefit that the current thread can be released for further processing while wating for results from remote calls.

We’ll build a pipeline which processes the results as they arrive and finally merges them into a single response. The first part of the pipeline will call each remote service. Instead of waiting for the results, we’ll specify what to do with each received result and continue with calling other services. Using the rx() method on the JAX-RS client request builder allows us to call a version of the get() method, which immediately returns instead of waiting for the result. In order to process results when they arrive, we can chain method handlers onto a CompletionStage returned from the rx version of the get() method:

CompletionStage stage = temperatureServiceTarget
  .request()
  .rx()
  .get(Temperature.class)
  .thenApply(temperature -> new Forecast(temperature));

The above code will call a temperature service and then register a lambda expression to process the resulting temperature when it arrives. This maps the temperature to a forecast object, which can be accessed with the stage variable later.

However, we want to use another variant of the get() method together with an RxJava Flowable Invoker from the Jersey project to get a Flowable from RxJava instead of a CompletionStage. The Flowable interface makes it easier to combine multiple asynchronous results with much simpler code than CompletionStage and also more efficiently.

With the following code, we will call an external service and return a Flowable:

Flowable flowable = temperatureServiceTarget
  .register(RxFlowableInvokerProvider.class)
  .request()
  .rx(RxFlowableInvoker.class)
  .get(Temperature.class)
  .map(temperature -> new Forecast(temperature);

We register additional RxFlowableInvokerProvider, which allows to request RxFlowableInvoker later. This invoker then gives us the Flowable return type from RxJava. These classes are not in the JAX-RS API and we must add them with the Jersey RxJava2 library:

<dependency>
  <groupId>org.glassfish.jersey.ext.rx</groupId>
  <artifactId>jersey-rx-client-rxjava2</artifactId>
  <version>2.26</version>
</dependency>

On the first sight it seems we made the code more complicated while doing the same thing. But a Flowable instance allows us to combine multiple calls easily:

Flowable.concat(flowable1, flowable2)
  .doOnNext(forecast -> {
    forecasts.add(forecast);
  })
  .doOnComplete(() -> {
    asyncResponse.resume(Response.ok(forecasts).build());
  })
  .doOnError(asyncResponse::resume)
  .subscribe();
}

For each forecast received from any flowable, we add it to a list of forecasts. Finally, we send the list of forecasts as a response or send an error response. The final call to subscribe() is necessary to register the listeners, otherwise they would be ignored.

You may have also noticed the asyncResponse variable used to send the final response or signal an error. This is a JAX-RS asynchronous response instance, which is used to complete a REST response at later time, when the data is available, without blocking the initial processing thread. Using the asynchronous response helps us save thread resources while waiting for results from external REST services. In order to turn on asynchronous processing in our REST endpoint, we will inject javax.ws.rs.container.AsyncResponse as the REST method argument, together with the @Suspended annotation. We will also change the return type to void because we’ll be building the response using the AsyncResponse instance:

@GET
@Produces(MediaType.APPLICATION_JSON)
public void getForecasts(@Suspended AsyncResponse asyncResponse) {
  ...here come some asynchronous calls to REST services...
  asyncResponse.resume(...)
}

Final code example

The following code will:

  • turn on asynchronous processing of REST requests in the getForecasts method
  • set 5 minute timeout on the asynchronous response
  • execute the temperature service twice, for London and Beijing, without waiting for results
  • combine the results into a sequence of forecasts
  • add every forecast in the sequence into a list
  • send the complete list when all results processed
  • send an error result in case of an exception
  • register the handlers with the subscribe  method
private Flowable getTemperature(String location) {
  return temperatureTarget
    .register(RxFlowableInvokerProvider.class)
    .resolveTemplate("city", location)
    .request()
    .rx(RxFlowableInvoker.class)
    .get(Temperature.class)
    .map(temperature -> new Forecast(location, temperature));
}
 
@GET
@Produces(MediaType.APPLICATION_JSON)
public void getForecasts(@Suspended AsyncResponse asyncResponse) {
  List forecasts = new ArrayList<>();
  asyncResponse.setTimeout(5, TimeUnit.MINUTES);
  Flowable.concat(getTemperature("London"), getTemperature("Beijing"))
    .doOnNext(forecast -> {
      forecasts.add(forecast);
    })
  .doOnComplete(() -> {
    asyncResponse.resume(Response.ok(forecasts).build());
  })
  .doOnError(asyncResponse::resume)
  .subscribe();
}
Published on Java Code Geeks with permission by Ondrej Mihalyi, partner at our JCG program. See the original article here: Speed Up Services With Reactive API in Java EE 8

Opinions expressed by Java Code Geeks contributors are their own.

(0 rating, 0 votes)
You need to be a registered member to rate this.
Start the discussion Views Tweet it!
Do you want to know how to develop your skillset to become a Java Rockstar?
Subscribe to our newsletter to start Rocking right now!
To get you started we give you our best selling eBooks for FREE!
1. JPA Mini Book
2. JVM Troubleshooting Guide
3. JUnit Tutorial for Unit Testing
4. Java Annotations Tutorial
5. Java Interview Questions
6. Spring Interview Questions
7. Android UI Design
and many more ....
I agree to the Terms and Privacy Policy

Leave a Reply

avatar

This site uses Akismet to reduce spam. Learn how your comment data is processed.

  Subscribe  
Notify of