
Author Archives: Tomasz Nurkiewicz

Java EE developer, Scala enthusiast. Enjoying data analysis and visualization. Strongly believes in the power of testing and automation.

Monitoring and measuring reactive application with Dropwizard Metrics


In the previous article we created simple indexing code that hammers ElasticSearch with thousands of concurrent requests. The only way to monitor the performance of our system was an old-school logging statement: .window(Duration.ofSeconds(1)) .flatMap(Flux::count) .subscribe(winSize -> log.debug("Got {} responses in last second", winSize)); It’s fine, but on a production system, we’d rather have some centralized monitoring and charting solution for gathering ...


Spring, Reactor and ElasticSearch: benchmarking with fake test data


In the previous article we created a simple adapter from ElasticSearch’s API to Reactor’s Mono, that looks like this: import reactor.core.publisher.Mono; private Mono indexDoc(Doc doc) { //... } Now we would like to run this method at a controlled concurrency level, millions of times. Basically, we want to benchmark our indexing code and see how it behaves under load. Fake data with jFairy First, we ...


Spring, Reactor and ElasticSearch: from callbacks to reactive streams


Spring 5 (and Boot 2, when it arrives in a couple of weeks) is a revolution. Not the “annotations over XML” or “Java classes over annotations” type of revolution. It’s truly a revolutionary framework that enables writing a brand new class of applications. Over the recent years, I became a little bit intimidated by this framework. “Spring Cloud being framework that ...


Idiomatic concurrency: flatMap() vs. parallel() – RxJava FAQ

Simple, effective and safe concurrency was one of the design principles of RxJava. Yet, ironically, it’s probably one of the most misunderstood aspects of this library. Let’s take a simple example: imagine we have a bunch of UUIDs and for each one of them we must perform a set of tasks. The first problem is to perform an I/O-intensive operation per ...


Detecting and testing stalled streams – RxJava FAQ

Imagine you have a stream that publishes events with unpredictable frequency. Sometimes you can expect dozens of messages per second, but occasionally no events can be seen for several seconds. This can be an issue if your stream is transmitted over a WebSocket, SSE or any other network protocol. A silent period that lasts too long (a stall) can be interpreted as network ...


Fixed-rate vs. fixed-delay – RxJava FAQ

Plain Java has offered, since version 5, a handy scheduler class that allows running tasks at a fixed rate or with a fixed delay: import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10); Basically it supports two types of operations: scheduler.scheduleAtFixedRate(() -> doStuff(), 2, 1, SECONDS); scheduler.scheduleWithFixedDelay(() -> doStuff(), 2, 1, SECONDS); scheduleAtFixedRate() will make sure doStuff() is invoked precisely every second ...
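
The difference between the two scheduling modes can be illustrated with a small timeline model. This is only a sketch, not code from the article: the class ScheduleTimeline and its methods are hypothetical names, all times are in milliseconds, and the model assumes a task of constant duration and no scheduling jitter. It follows the documented contract of ScheduledExecutorService: fixed-rate runs are anchored to the initial delay plus multiples of the period (and are pushed back, never overlapped, when a run takes longer than the period), while fixed-delay runs start a given delay after the previous run finishes.

```java
import java.util.ArrayList;
import java.util.List;

public class ScheduleTimeline {

    /** Idealized start times (ms) for scheduleAtFixedRate(): anchored to initialDelay + k * period. */
    static List<Long> fixedRateStarts(long initialDelay, long period, long taskDuration, int runs) {
        List<Long> starts = new ArrayList<>();
        long previousEnd = Long.MIN_VALUE;
        for (int k = 0; k < runs; k++) {
            long nominal = initialDelay + k * period;
            // A run never overlaps the previous one; if that one overran, this run starts late.
            long start = Math.max(nominal, previousEnd);
            starts.add(start);
            previousEnd = start + taskDuration;
        }
        return starts;
    }

    /** Idealized start times (ms) for scheduleWithFixedDelay(): delay counted from the end of the previous run. */
    static List<Long> fixedDelayStarts(long initialDelay, long delay, long taskDuration, int runs) {
        List<Long> starts = new ArrayList<>();
        long start = initialDelay;
        for (int k = 0; k < runs; k++) {
            starts.add(start);
            start = start + taskDuration + delay;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Same parameters as the excerpt (2 s initial delay, 1 s period/delay), with a 300 ms task.
        System.out.println(fixedRateStarts(2000, 1000, 300, 3));  // [2000, 3000, 4000]
        System.out.println(fixedDelayStarts(2000, 1000, 300, 3)); // [2000, 3300, 4600]
    }
}
```

With a 300 ms task, the fixed-rate timeline stays on whole seconds, whereas the fixed-delay timeline drifts by the task duration on every run.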


Streaming large JSON file with Jackson – RxJava FAQ

In the previous article, we learned how to parse excessively large XML files and turn them into RxJava streams. This time let’s look at a large JSON file. We will base our examples on a tiny colors.json file containing almost 150 records in the following format: { "aliceblue": [240, 248, 255, 1], "antiquewhite": [250, 235, 215, 1], "aqua": [0, 255, 255, 1], "aquamarine": [127, 255, 212, 1], ...


Loading files with backpressure – RxJava FAQ

Processing a file as a stream turns out to be tremendously effective and convenient. Many people seem to forget that since Java 8 (3+ years!) we can very easily turn any file into a stream of lines: String filePath = "foobar.txt"; try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) { reader.lines() .filter(line -> !line.startsWith("#")) .map(String::toLowerCase) .flatMap(line -> Stream.of(line.split(" "))) .forEach(System.out::println); } reader.lines() returns ...


Generating backpressure-aware streams with Flowable.generate() – RxJava FAQ

RxJava is missing a factory to create an infinite stream of natural numbers. Such a stream is useful e.g. when you want to assign unique sequence numbers to a possibly infinite stream of events by zipping both of them: Flowable<Long> naturalNumbers = //??? Flowable<Event> someInfiniteEventStream = //... Flowable<Pair<Long, Event>> sequenced = Flowable.zip( naturalNumbers, someInfiniteEventStream, Pair::of ); Implementing naturalNumbers is surprisingly complex. In RxJava ...
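
To make the idea of sequencing events concrete, here is a rough plain-Java analogue. It deliberately does not use RxJava or Flowable.generate(): it swaps in java.util.stream.Stream.iterate() as a lazy, pull-based source of natural numbers, which sidesteps the backpressure question the article is about. The class SequencedEvents, its methods, and the choice to start counting at 0 are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public class SequencedEvents {

    /** Lazy, infinite stream 0, 1, 2, ... (starting at 0 is an assumption). */
    static Stream<Long> naturalNumbers() {
        return Stream.iterate(0L, n -> n + 1);
    }

    /** Pairs every event with the next natural number, in encounter order. */
    static <T> List<Map.Entry<Long, T>> sequence(List<T> events) {
        // Numbers are pulled one at a time, so the infinite stream is never materialized.
        Iterator<Long> numbers = naturalNumbers().iterator();
        List<Map.Entry<Long, T>> sequenced = new ArrayList<>();
        for (T event : events) {
            sequenced.add(Map.entry(numbers.next(), event));
        }
        return sequenced;
    }

    public static void main(String[] args) {
        System.out.println(sequence(List.of("login", "click", "logout")));
        // [0=login, 1=click, 2=logout]
    }
}
```

Because the consumer pulls each number only when an event arrives, the producer can never outrun it; a reactive stream has to get the same guarantee through backpressure.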
