Enterprise Java

Writing an Event-Sourced CQRS Read Model

Discussions about event sourcing and CQRS seem to usually focus on the overall system architecture or various flavors of domain-driven design in CQRS context. However, the read models are often neglected, even though there are some interesting considerations on this side as well. In this post we’re going to present a sample implementation of populating a view model by consuming event stream. es_cqrs_projection_funnel_300_2

Overview

The idea of a read model is really simple. You take the event log, apply (replay) all the events on an initially empty data model using appropriate functions, and you get the populated model. The code could look like:

List<Event> events = getEvents();
Model model = Model.empty();
for (Event event : events) {
    apply(model, event);
}

We can make this even shorter with functional programming:

Model m = reduce(getEvents(),
                 Model.empty(),
                 (m, e) -> apply(m, e));

That is the essence. Note that it is just the abstract outline and realistic implementation is likely to differ, including buffering, batching (or streaming), persistence etc.

Applying Events

The actual Java code to apply the events may look similar to the below:

EventProcessingResult processEvents() {
    if (getState().isRunning()) {
        int batchSize = getEventsPerIteration();
        List<Event> events = eventStore.getEventsForAllStreams(getLastEventId(),
                                                               batchSize);
        if (events.isEmpty()) {
            return NO_EVENTS_TO_PROCESS;
        } else {
            return processEvents(events);
        }
    } else {
        return NOT_RUNNING;
    }
}

EventProcessingResult processEvents(List<Event> events) {
    try {
        for (Event event : events) {
            dispatchEvent(event);
        }
        return SUCCESS;
    } catch (RuntimeException e) {
        return FAILURE;
    }
}

All in all it’s really simple and straightforward. It is possible to enhance it with hooks before and after processing individual events and the entire batch. Such hooks could be used to:

  • implement transactions,
  • plug in monitoring,
  • implement error handling,
  • calculate the batch size depending on speed,
  • perform arbitrary operations, e.g. setting something up or recalculating once per batch.

The last interesting piece is the dispatchEvent method. Aside from walking the type hierarchy, error handling and making it all optional, it boils down to:

void dispatchEvent(Event e) {
    Method handler = projector.getClass().findMethod("on", e.getClass());
    handler.invoke(projector, e);
}

In other words, for each event type (like OrderCreated), we look for a public method called on that takes a single argument of matching type, on a projector object.

All of the above is part of an engine, a piece of infrastructure backing many view models. All that is necessary to implement a projection is actually provide the projector, with handlers for interesting event types. All other events will simply be ignored.

It could look like this:

public class OrderProjector {
    @Inject
    private OrderDao orders;

    public void on(OrderCreated e) {
        orders.save(new Order(e.getOrderNumber()));
    }

    public void on(OrderApproved e) {
        Order o = orders.find(e.getOrderNumber());
        o.setApproved(true);
    }
}

Projection Thread

Let’s discuss multi-threading for a moment. Shared mutable state immediately brings numerous problems and should be avoided as much as possible. One of the ways to deal with it is not having concurrency in the first place, e.g. by limiting writes to a single thread. In most cases a single-threaded writer combined with ACID transactions is more than enough to keep up with the write load. (The read/query load can be heavy and use many threads – all of the details here are only about the writes.)

The thread is responsible for applying the events to the read model, all the way from querying the event store to updating the view model database. Normally it just loads batches of events from the store and applies them. It continues as long as there are more events to process, and goes to sleep after it’s caught up. It wakes up after a certain amount of time or when notified about new events by the event store.

We also have some control over this thread’s life cycle. For example, we have a way to programmatically pause and resume each projection’s thread, even exposed in an admin GUI.

projection_admin

Push or Pull?

With a database-backed event store, it’s very easy to query repeatedly for new events. This is the pull model. Unfortunately, it also means that you may end up polling too often and generating needless load, or polling too infrequently and thus possibly taking longer to propagate changes to the view model.

That’s why in addition to polling the event store it’s a good idea to introduce notifications that wake up the read models as soon as new events are saved. This effectively becomes a push model with minimal delays and load. We found JGroups to be a very good tool for the job – it supports multiple protocols and is very easy to set up, involving much less hassle than a full-blown message queue.

The notifications may or may not contain actual events.

In the latter (and simpler) design, they only spread the information that a new event has been saved, along with its sequential ID (so that all projections can estimate how much behind they are). When awakened, the executor can continue along its normal path, starting with querying the event store.

Why? Because handling events coming from a single source is easier, but more importantly because a DB-backed event store trivially guarantees ordering and has no issues with lost or duplicate messages. Querying the database is very fast, given that we’re reading a single table sequentially by primary key, and most of the time the data is in RAM cache anyway. The bottleneck is in the projection thread updating its read model database.

However, there are no obstacles to putting event data in the notifications (except for maybe size or network traffic considerations). It would likely decrease the load on the event store and save some round trips to database. The projector would need to maintain a buffer and fall back to querying the event store when needed. Or the system could use a more reliable message queue.

Restarting Projections

Aside from pause/resume, the above screenshot shows one more action: restart. Innocuous as it looks, it’s a really nice and powerful feature.

Since the view model is completely derived from the event log, at any time it can be thrown away and recreated from the beginning (or from some initial state/old enough snapshot). Data is safe in the event log, the ultimate source of truth.

It’s useful when anything about the view changes: a field or a table is added, a bug is fixed, something is calculated differently. When it happens, it’s often easier (or required) to just start from the beginning, rather than for example implement massive SQL migration script.

It’s even possible to go as far as fully automating it, so that when the system starts up and it detects the DB schema does not match the corresponding Java model, it can automatically recreate the schema and reprocess the event log. It’s like running with Hibernate create-drop policy, except for that it doesn’t lose data.

Performance

The solution may appear quite limited with regards to performance.

One point that could raise an eyebrow is the single-threaded writer. In reality a single thread is usually fast enough to easily keep up with the load. Concurrency is not only more difficult to implement and maintain, but it also introduces contention. Reads (queries) can be heavily multi-threaded and easy to scale out.

We also gain a lot by having multiple read models, for example separating analytics from administration and “transactional” data. Each model is single-threaded (for writing), but the multiple models consume events in parallel. Finally, the solution could be modified to use sharding or some kind of fork-join processing.

Another interesting point is restarting projections from scratch.

A good solution is something like kappa architecture:

  • Keep the outdated projection up and running and answering all the queries.
  • Start a new projection, e.g. to another database. Just let it process the events, don’t point any traffic to it.
  • When the new projection catches up, redirect traffic and shut down the old one.

On a very small instance, especially for development, it may even be possible to do a restart online, on the same instance. It depends on answers to the following questions: How long does it take to reprocess all events? Is it acceptable for this projection to be stale for 30 minutes? Can we deploy at night or weekend, when nobody is using the system anyway? Do we have to replay all the history?

Another factor to consider here is persistence. If it’s too much of a bottleneck and cannot be further optimized, consider using in-memory view models.

Summing Up

In essence, that’s all it takes to implement a read model consuming an event store. It gains much simplicity thanks to a linear event store and processing everything in a single thread. So much that in the end it’s really just a loop, implementing the reduction shown in the beginning.

In future posts I am going to dig deeper into practical concerns of implementing projections.

Reference: Writing an Event-Sourced CQRS Read Model from our JCG partner Konrad Garus at the Squirrel’s blog.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Back to top button