Small scale stream processing kata. Part 2: RxJava 1.x/2.x

In part 1 (thread pools) we designed and implemented a relatively simple system for processing events in real time. Make sure you read the previous part, as it contains some classes that we’ll reuse. Just in case, here are the requirements:

A system delivers around one thousand events per second. Each Event has at least two attributes:

  • clientId – we expect up to a few events per second for one client
  • UUID – globally unique

Consuming one event takes about 10 milliseconds. Design a consumer of such stream that:

  1. allows processing events in real time
  2. processes events related to one client sequentially and in order, i.e. you cannot parallelize events for the same clientId
  3. drops an event if its UUID already appeared within the last 10 seconds. Assume duplicates will not appear after 10 seconds
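
A quick back-of-the-envelope check of these numbers: at about 1,000 events per second and about 10 milliseconds per event, every second brings roughly 10 seconds of work, so a single-threaded consumer cannot keep up and at least 10 events must be in flight at once. The sketch below only does that arithmetic; CapacityEstimate and minConcurrency are hypothetical names, not part of the kata's code.

```java
import java.time.Duration;

final class CapacityEstimate {

    // Minimum number of events that must be processed concurrently to keep up:
    // arrival rate (events per second) times the time to consume one event, rounded up.
    static long minConcurrency(long eventsPerSecond, Duration consumeTime) {
        long nanosOfWorkPerSecond = Math.multiplyExact(eventsPerSecond, consumeTime.toNanos());
        long nanosPerSecond = Duration.ofSeconds(1).toNanos();
        return (nanosOfWorkPerSecond + nanosPerSecond - 1) / nanosPerSecond;
    }

    public static void main(String[] args) {
        // The numbers from the requirements above.
        System.out.println(minConcurrency(1_000, Duration.ofMillis(10)));
    }
}
```

Since events of one client must stay sequential, this parallelism has to come from processing different clients at the same time.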

What we came up with so far was a combination of thread pools and a shared cache. This time we will implement the solution using RxJava. First of all, I never revealed how EventStream is implemented, only giving the API:

interface EventStream {
 
    void consume(EventConsumer consumer);
 
}

In fact for manual testing I built a simple RxJava stream that behaves like the system from the requirements:

@Slf4j
class EventStream {
 
    void consume(EventConsumer consumer) {
        observe()
            .subscribe(
                consumer::consume,
                e -> log.error("Error emitting event", e)
        );
    }
 
    Observable<Event> observe() {
        return Observable
                .interval(1, TimeUnit.MILLISECONDS)
                .delay(x -> Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS))
                .map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID()))
                .flatMap(this::occasionallyDuplicate, 100)
                .observeOn(Schedulers.io());
    }
 
    private Observable<Event> occasionallyDuplicate(Event x) {
        final Observable<Event> event = Observable.just(x);
        if (Math.random() >= 0.01) {
            return event;
        }
        final Observable<Event> duplicated =
                event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);
        return event.concatWith(duplicated);
    }
 
}

Understanding how this simulator works is not essential, but quite interesting. First we generate a steady stream of Long values (0, 1, 2…) every millisecond (a thousand events per second) using the interval() operator. Then we delay each event by a random amount of time between 0 and 1_000 microseconds with the delay() operator. This way events appear at less predictable moments in time, which is a bit more realistic. Finally we map (using, ahem, the map() operator) each Long value to a random Event with a clientId somewhere between 1_000 and 1_100 (inclusive-exclusive).

The last bit is interesting. We would like to simulate occasional duplicates. In order to do so we map every event (using flatMap()) to itself (in 99% of the cases). However, in 1% of the cases we return this event twice, where the second occurrence happens between 10 milliseconds and 5 seconds later. In practice the duplicated instance of the event will appear after hundreds of other events, which makes the stream behave quite realistically.

There are two ways to interact with the EventStream – callback-based via consume() and stream-based via observe(). We can take advantage of Observable<Event> to quickly build a processing pipeline very similar in functionality to part 1 but much simpler.

Missing backpressure

The first naive approach to take advantage of RxJava falls short very quickly:

EventStream es = new EventStream();
EventConsumer clientProjection = new ClientProjection(
        new ProjectionMetrics(
                new MetricRegistry()));
 
es.observe()
        .subscribe(
                clientProjection::consume,
                e -> log.error("Fatal error", e)
        );

(ClientProjection, ProjectionMetrics et al. come from part 1.) We get MissingBackpressureException almost instantaneously, and that was expected. Remember how our first solution was lagging, handling events with more and more latency? RxJava tries to avoid that, as well as avoiding overflow of queues. MissingBackpressureException is thrown because the consumer (ClientProjection) is incapable of handling events in real time. This is fail-fast behavior. The quickest solution is to move consumption to a separate thread pool, just like before, but using RxJava’s facilities:

EventStream es = new EventStream();
EventConsumer clientProjection = new FailOnConcurrentModification(
        new ClientProjection(
                new ProjectionMetrics(
                        new MetricRegistry())));
 
es.observe()
        .flatMap(e -> clientProjection.consume(e, Schedulers.io()))
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -> log.info("Processed {} events/s", c),
                e -> log.error("Fatal error", e)
        );

The EventConsumer interface has a helper method that can consume events asynchronously on a supplied Scheduler:

@FunctionalInterface
interface EventConsumer {
    Event consume(Event event);
 
    default Observable<Event> consume(Event event, Scheduler scheduler) {
        return Observable
                .fromCallable(() -> this.consume(event))
                .subscribeOn(scheduler);
    }
 
}

By consuming events using flatMap() on a separate Schedulers.io() scheduler, each consumption is invoked asynchronously. This time events are processed in near real time, but there is a bigger problem. I decorated ClientProjection with FailOnConcurrentModification for a reason. Events are consumed independently of each other, so it may happen that two events for the same clientId are processed concurrently. Not good. Luckily, in RxJava solving this problem is much easier than with plain threads:

es.observe()
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume))
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -> log.info("Processed {} events/s", c),
                e -> log.error("Fatal error", e)
        );

A little bit has changed. First of all we group events by clientId. This splits a single Observable stream into a stream of streams. Each substream, named byClient, represents all events related to the same clientId. Now if we map over this substream we can be sure that events related to the same clientId are never processed concurrently. The outer stream is lazy, so we must subscribe to it. Rather than subscribing to every event separately, we collect events every second and count them. This way we receive a single event of type Integer every second, representing the number of events consumed per second.
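
Conceptually, groupBy() partitions the stream by key while keeping the relative order of events within each key. For a finite list the same partitioning can be sketched with plain JDK collectors. This only illustrates the ordering guarantee, not RxJava's implementation or its threading; GroupByClientSketch and Ev are hypothetical stand-ins for the real classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class GroupByClientSketch {

    // Hypothetical, simplified stand-in for Event (the real one carries a UUID).
    static final class Ev {
        final int clientId;
        final String uuid;

        Ev(int clientId, String uuid) {
            this.clientId = clientId;
            this.uuid = uuid;
        }
    }

    // Splits events into one list of UUIDs per clientId.
    // Within each list the original arrival order is preserved.
    static Map<Integer, List<String>> byClient(List<Ev> events) {
        return events.stream()
                .collect(Collectors.groupingBy(
                        ev -> ev.clientId,
                        LinkedHashMap::new,
                        Collectors.mapping(ev -> ev.uuid, Collectors.toList())));
    }
}
```

Each resulting list plays the role of one byClient substream: walking it from start to end handles that client's events sequentially and in order, while different lists could be handled independently.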

Impure, non-idiomatic, error-prone, unsafe solution of deduplication using global state

Now we must drop duplicate UUIDs. The simplest, yet very foolish, way of discarding duplicates is by taking advantage of global state. We can simply filter out duplicates by looking them up in a cache available outside of the filter() operator:

final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder()
        .expireAfterWrite(10, TimeUnit.SECONDS)
        .build();
 
es.observe()
        .filter(e -> seenUuids.getIfPresent(e.getUuid()) == null)
        .doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid()))
        .subscribe(
                clientProjection::consume,
                e -> log.error("Fatal error", e)
        );

If you want to monitor the usage of this mechanism, simply add a metric:

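Meter duplicates = metricRegistry.meter("duplicates");
 
es.observe()
        .filter(e -> {
            if (seenUuids.getIfPresent(e.getUuid()) != null) {
                // Seen within the last 10 seconds: count it and drop it
                duplicates.mark();
                return false;
            } else {
                return true;
            }
        })
        // The rest of the pipeline is the same as in the previous snippet
        .doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid()))
        .subscribe(
                clientProjection::consume,
                e -> log.error("Fatal error", e)
        );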

Accessing global, especially mutable, state from inside operators is very dangerous and undermines the sole purpose of RxJava – simplifying concurrency. Admittedly we use a thread-safe Cache from Guava, but in many cases it’s easy to miss places where shared global mutable state is accessed from multiple threads. If you find yourself mutating some variable outside of the operator chain, be very careful.
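
For readers without Guava at hand, the expiring cache used above can be approximated with the JDK alone. The following is a minimal sketch, not Guava's implementation: RecentUuids and firstOccurrence are hypothetical names, and the clock is injected (an assumption made here purely so the behavior can be exercised without waiting 10 seconds).

```java
import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.LongSupplier;

final class RecentUuids {

    private final long windowNanos;
    private final LongSupplier nanoClock;
    // Insertion-ordered, so the oldest entries are always at the head.
    private final Map<UUID, Long> firstSeenAt = new LinkedHashMap<>();

    RecentUuids(Duration window, LongSupplier nanoClock) {
        this.windowNanos = window.toNanos();
        this.nanoClock = nanoClock;
    }

    // Returns true if the UUID was not seen within the window, and remembers it.
    synchronized boolean firstOccurrence(UUID uuid) {
        long now = nanoClock.getAsLong();
        evictExpired(now);
        return firstSeenAt.putIfAbsent(uuid, now) == null;
    }

    synchronized int size() {
        return firstSeenAt.size();
    }

    // Drops entries from the head until one is younger than the window.
    private void evictExpired(long now) {
        Iterator<Map.Entry<UUID, Long>> it = firstSeenAt.entrySet().iterator();
        while (it.hasNext()) {
            if (now - it.next().getValue() < windowNanos) {
                break;
            }
            it.remove();
        }
    }
}
```

In real use the clock would be System::nanoTime, e.g. new RecentUuids(Duration.ofSeconds(10), System::nanoTime). Note that in this sketch a repeated UUID does not restart its 10-second window, and memory is bounded by the number of distinct UUIDs seen within one window.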

Custom distinct() operator in RxJava 1.x

RxJava 1.x has a distinct() operator that presumably does the job:

es.observe()
        .distinct(Event::getUuid)
        .groupBy(Event::getClientId)

Unfortunately distinct() stores all keys (UUIDs) internally in an ever-growing HashSet. But we only care about duplicates in the last 10 seconds! By copy-pasting the implementation of DistinctOperator I created a DistinctEvent operator that takes advantage of Guava’s cache to store only the last 10 seconds’ worth of UUIDs. I intentionally hard-coded Event in this operator rather than making it more generic, to keep the code easier to understand:

class DistinctEvent implements Observable.Operator<Event, Event> {
    private final Duration duration;
     
    DistinctEvent(Duration duration) {
        this.duration = duration;
    }
 
    @Override
    public Subscriber<? super Event> call(Subscriber<? super Event> child) {
        return new Subscriber<Event>(child) {
            final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder()
                    .expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS)
                    .<UUID, Boolean>build().asMap();
             
            @Override
            public void onNext(Event event) {
                if (keyMemory.put(event.getUuid(), true) == null) {
                    child.onNext(event);
                } else {
                    request(1);
                }
            }
             
            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }
             
            @Override
            public void onCompleted() {
                child.onCompleted();
            }
             
        };
    }
}

The usage is fairly simple and the whole implementation (plus the custom operator) is as short as:

es.observe()
        .lift(new DistinctEvent(Duration.ofSeconds(10)))
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -> log.info("Processed {} events/s", c),
                e -> log.error("Fatal error", e)
        );

Actually it can be even shorter if you skip logging every second:

es.observe()
        .lift(new DistinctEvent(Duration.ofSeconds(10)))
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .subscribe(
                e -> {},
                e -> log.error("Fatal error", e)
        );

This solution is much shorter than the previous one based on thread pools and decorators. The only awkward part is the custom operator that avoids a memory leak when storing too many historic UUIDs. Luckily, RxJava 2 comes to the rescue!

RxJava 2.x and more powerful built-in distinct()

I was actually this close to submitting a PR to RxJava with a more powerful implementation of the distinct() operator. But first I checked the 2.x branch and there it was: distinct() that allows providing a custom Collection as opposed to the hard-coded HashSet. Believe it or not, dependency inversion is not only about the Spring framework or Java EE. When a library allows you to provide a custom implementation of its internal data structure, this is also DI. First I create a helper method that builds a Set<UUID> backed by a Map<UUID, Boolean> backed by a Cache<UUID, Boolean>. We sure like delegation!

private Set<UUID> recentUuids() {
    return Collections.newSetFromMap(
            CacheBuilder.newBuilder()
                    .expireAfterWrite(10, TimeUnit.SECONDS)
                    .<UUID, Boolean>build()
                    .asMap()
    );
}

Having this method we can implement the whole task using this expression:

es.observe()
        .distinct(Event::getUuid, this::recentUuids)
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .subscribe(
                e -> {},
                e -> log.error("Fatal error", e)
        );

The elegance, the simplicity, the clarity! It reads almost like the problem statement:

  • observe a stream of events
  • take only distinct UUIDs into account
  • group events by client
  • for each client consume them (sequentially)
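
The "take only distinct UUIDs" step depends, as far as I can tell, on the supplied collection reporting through the return value of add() whether a key is new. The Map-to-Set adapter involved can be tried out with the JDK alone. In this sketch a ConcurrentHashMap stands in for the Guava cache's asMap() view, so nothing ever expires on its own, and SetFromMapSketch is a hypothetical name.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class SetFromMapSketch {

    // Wraps a map (which must be empty at this point) as a Set;
    // the map's keys become the set's elements.
    static Set<UUID> setBackedBy(Map<UUID, Boolean> emptyMap) {
        return Collections.newSetFromMap(emptyMap);
    }

    public static void main(String[] args) {
        Map<UUID, Boolean> backing = new ConcurrentHashMap<>();
        Set<UUID> seen = setBackedBy(backing);
        UUID id = UUID.randomUUID();
        System.out.println(seen.add(id));            // first occurrence
        System.out.println(seen.add(id));            // duplicate
        System.out.println(backing.containsKey(id)); // the set writes through to the map
    }
}
```

Because the set is only a view, an entry that disappears from the backing map (which is what cache expiry amounts to) is forgotten by the set as well, so the same UUID would be accepted again afterwards.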

I hope you enjoyed all these solutions and that you find them useful in your daily work.

Tomasz Nurkiewicz

Java EE developer, Scala enthusiast. Enjoying data analysis and visualization. Strongly believes in the power of testing and automation.