Tag Archives: RxJava

Idiomatic concurrency: flatMap() vs. parallel() – RxJava FAQ

Simple, effective and safe concurrency was one of the design principles of RxJava. Yet, ironically, it’s probably one of the most misunderstood aspects of this library. Let’s take a simple example: imagine we have a bunch of UUIDs and for each one of them we must perform a set of tasks. The first problem is to perform an I/O-intensive operation per ...

Detecting and testing stalled streams – RxJava FAQ

Imagine you have a stream that publishes events at an unpredictable frequency. Sometimes you can expect dozens of messages per second, but occasionally no events arrive for several seconds. This can be an issue if your stream is transmitted over WebSocket, SSE or any other network protocol. A silent period that lasts too long (a stall) can be interpreted as network ...

Fixed-rate vs. fixed-delay – RxJava FAQ

If you are using plain Java, since version 5 we have had a handy scheduler class that allows running tasks at a fixed rate or with a fixed delay: import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10); Basically it supports two types of operations: scheduler.scheduleAtFixedRate(() -> doStuff(), 2, 1, SECONDS); scheduler.scheduleWithFixedDelay(() -> doStuff(), 2, 1, SECONDS); scheduleAtFixedRate() will make sure doStuff() is invoked precisely every second ...
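The difference between the two scheduling modes is easiest to see when the task itself takes a noticeable amount of time. The following is a minimal sketch using only the JDK, not code from the article: the class name, the `countRuns()` helper, the 100 ms task duration and the 200 ms period are all assumptions chosen for illustration, and `doStuff()` here is a hypothetical stand-in that merely sleeps.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedRateVsFixedDelay {

    // Hypothetical stand-in for doStuff(): counts the call, then works for about 100 ms.
    static void doStuff(AtomicInteger counter) {
        counter.incrementAndGet();
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the scheduler can shut down cleanly.
            Thread.currentThread().interrupt();
        }
    }

    // Schedules doStuff() for roughly 1.5 seconds and returns how many times it started.
    static int countRuns(boolean fixedRate) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger counter = new AtomicInteger();
        if (fixedRate) {
            // Period is measured between the START times of consecutive runs.
            scheduler.scheduleAtFixedRate(() -> doStuff(counter), 0, 200, TimeUnit.MILLISECONDS);
        } else {
            // Delay is measured from the END of one run to the START of the next.
            scheduler.scheduleWithFixedDelay(() -> doStuff(counter), 0, 200, TimeUnit.MILLISECONDS);
        }
        TimeUnit.MILLISECONDS.sleep(1500);
        scheduler.shutdownNow();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fixed rate runs:  " + countRuns(true));
        System.out.println("fixed delay runs: " + countRuns(false));
    }
}
```

Because the fixed-delay variant waits for the task to finish before the delay starts counting, it should start fewer runs in the same time window than the fixed-rate variant. Exact counts depend on machine load.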

Streaming large JSON file with Jackson – RxJava FAQ

In the previous article, we learned how to parse excessively large XML files and turn them into RxJava streams. This time let’s look at a large JSON file. We will base our examples on a tiny colors.json containing almost 150 records in the following format: { "aliceblue": [240, 248, 255, 1], "antiquewhite": [250, 235, 215, 1], "aqua": [0, 255, 255, 1], "aquamarine": [127, 255, 212, 1], ...

Loading files with backpressure – RxJava FAQ

Processing a file as a stream turns out to be tremendously effective and convenient. Many people seem to forget that since Java 8 (3+ years!) we can very easily turn any file into a stream of lines: String filePath = "foobar.txt"; try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) { reader.lines() .filter(line -> !line.startsWith("#")) .map(String::toLowerCase) .flatMap(line -> Stream.of(line.split(" "))) .forEach(System.out::println); } reader.lines() returns ...
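For readers who want to try the snippet from this excerpt, here is a self-contained sketch of the same pipeline. The class name, the `words()` helper and the sample file contents are assumptions made for illustration. It also swaps `FileReader` for `Files.newBufferedReader()` and collects the words into a list instead of printing each one, so the result is easy to inspect.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FileLinesStream {

    // Reads the file lazily line by line, skips comment lines,
    // lower-cases the rest and splits each line into words.
    static List<String> words(Path file) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(file)) {
            return reader.lines()
                    .filter(line -> !line.startsWith("#"))
                    .map(String::toLowerCase)
                    .flatMap(line -> Stream.of(line.split(" ")))
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical sample input written to a temporary file.
        Path file = Files.createTempFile("foobar", ".txt");
        Files.write(file, Arrays.asList("# comment", "Hello World", "RxJava FAQ"));
        System.out.println(words(file)); // prints [hello, world, rxjava, faq]
        Files.delete(file);
    }
}
```

The stream is consumed inside the try-with-resources block on purpose: `reader.lines()` is lazy, so the reader must stay open until the terminal operation has run.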

flatMap() and the order of events – RxJava FAQ

As we already discovered, flatMap() does not preserve the order of the original stream. Let’s illustrate this using the GeoNames API example from the previous article: public interface GeoNames { Flowable<Long> populationOf(String city); } By requesting the population of multiple cities using flatMap() we have no guarantee that they will arrive in order: Flowable<String> cities = Flowable.just("Warsaw", "Paris", "London", "Madrid"); cities .flatMap(geoNames::populationOf) .subscribe(response -> log.info("Population: {}", response)); The output is ...

flatMap() vs. concatMap() vs. concatMapEager() – RxJava FAQ

There are three seemingly similar operators in RxJava 2.x: flatMap(), concatMap() and concatMapEager(). All of them accept the same argument – a function from the original stream’s individual item to a (sub-)stream of arbitrary type. In other words, if you have a Flowable<T> you provide a function from T to Flowable<R> for an arbitrary type R. After applying any of these operators you end up with Flowable<R>. So how are they different? Sample project First ...

Eager subscription – RxJava FAQ

While teaching and mentoring RxJava, as well as after authoring a book, I noticed some areas are especially problematic. I decided to publish a bunch of short tips that address the most common pitfalls. This is the first part. Observables and Flowables are lazy by nature. This means no matter how heavy or long-running the logic you place inside your Flowable, it will get evaluated ...

Small scale stream processing kata. Part 2: RxJava 1.x/2.x

In part 1: thread pools we designed and implemented a relatively simple system for processing events in real time. Make sure you read the previous part as it contains some classes that we’ll reuse. Just in case, here are the requirements: A system delivers around one thousand events per second. Each Event has at least two attributes: clientId – we expect up ...
