About Tomasz Nurkiewicz

Java EE developer, Scala enthusiast. Enjoying data analysis and visualization. Strongly believes in the power of testing and automation.

Simplifying trading system with Akka

My colleagues are developing a trading system that processes a rather heavy stream of incoming transactions. Each transaction covers one Instrument (think bond or stock) and has some (for now) unimportant properties. They are stuck with Java (< 8), so let’s stick to it:

class Instrument implements Serializable, Comparable<Instrument> {
    private final String name;

    public Instrument(String name) {
        this.name = name;
    }

    //...Java boilerplate

}

public class Transaction {
    private final Instrument instrument;

    public Transaction(Instrument instrument) {
        this.instrument = instrument;
    }

    //...Java boilerplate

}

Instrument will later be used as a key in a HashMap, so for the future we pro-actively implement Comparable<Instrument> (since Java 8, HashMap falls back to comparing keys when many hash codes collide). This is our domain; now the requirements:

  1. Transactions come into the system and need to be processed (whatever that means), as soon as possible
  2. We are free to process them in any order
  3. …however transactions for the same instrument need to be processed sequentially in the exact same order as they came in.

The initial implementation was straightforward: put all incoming transactions into a single queue (e.g. ArrayBlockingQueue) with one consumer. This satisfies the last requirement, since a queue preserves strict FIFO ordering across all transactions. But such an architecture prevents concurrent processing of unrelated transactions for different instruments, throwing away a compelling throughput improvement. Not surprisingly this implementation, while undoubtedly simple, became a bottleneck.
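That first cut can be sketched in plain Java – one bounded queue and one consumer thread. The class and method names below are mine, purely for illustration; the sketch preserves global FIFO order but offers zero parallelism:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the initial design: every transaction goes through one bounded
// FIFO queue drained by a single consumer thread. Ordering is trivially
// preserved, but unrelated instruments cannot be processed concurrently.
class SingleConsumerPipeline {

    private final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
    private final List<String> processed = new ArrayList<String>();
    private volatile boolean running = true;

    private final Thread consumer = new Thread(new Runnable() {
        public void run() {
            try {
                while (running) {
                    // take() blocks until a transaction is available
                    process(queue.take());
                }
            } catch (InterruptedException e) {
                // interrupted during shutdown, exit quietly
            }
        }
    });

    public void start() {
        consumer.start();
    }

    public void accept(String tx) throws InterruptedException {
        queue.put(tx); // blocks when full, giving crude back-pressure
    }

    private void process(String tx) {
        processed.add(tx); // stand-in for the real transaction processing
    }

    public void shutdown() throws InterruptedException {
        running = false;
        consumer.interrupt();
        consumer.join();
    }

    // Safe to read after shutdown(): join() establishes happens-before
    public List<String> processedSoFar() {
        return new ArrayList<String>(processed);
    }
}
```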

The first idea was to somehow split incoming transactions by instrument and process instruments individually. We came up with the following data structure:

private final ConcurrentMap<Instrument, Queue<Transaction>> queues = 
    new ConcurrentHashMap<Instrument, Queue<Transaction>>();

public void accept(Transaction tx) {
    final Instrument instrument = tx.getInstrument();
    if (queues.get(instrument) == null) {
        queues.putIfAbsent(instrument, new LinkedBlockingQueue<Transaction>());
    }
    final Queue<Transaction> queue = queues.get(instrument);
    queue.add(tx);
}

Yuck! But the worst is yet to come. How do we make sure that at most one thread processes each queue at a time? Otherwise two threads could pick up items from one queue (one instrument) and process them in reversed order, which is not allowed. The simplest approach is a thread per queue – this won’t scale, as we expect tens of thousands of different instruments. So we can have, say, N threads and let each of them handle a subset of queues, e.g. instrument.hashCode() % N tells us which thread takes care of a given queue. But it’s still not perfect, for three reasons:

  1. One thread must “observe” many queues, most likely busy-waiting and iterating over all of them constantly. Alternatively a queue could somehow wake up its parent thread
  2. In the worst case all instruments have conflicting hash codes, targeting a single thread – which is effectively the same as our initial solution
  3. It’s just damn complex! Beautiful code is not complex!

Implementing this monstrosity is possible, but hard and error-prone. Moreover, there is another non-functional requirement: instruments come and go, and hundreds of thousands of them appear over time. After a while we should remove map entries representing instruments that were not seen lately; otherwise we’ll get a memory leak.
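For what it’s worth, the hashCode() % N routing could be sketched as below (all names are mine, and the sketch deliberately ignores the eviction problem). It shows the core guarantee – one queue per worker keeps same-key transactions ordered – as well as the flaw: unrelated keys can collide on one worker:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of "N worker threads, route by hashCode() % N". Transactions for
// the same key always land in the same queue, hence are processed in order;
// unrelated keys may still contend for the same worker.
class PartitionedWorkers {

    private final List<BlockingQueue<String>> queues = new ArrayList<BlockingQueue<String>>();
    private final List<Thread> workers = new ArrayList<Thread>();
    private final List<List<String>> processed = new ArrayList<List<String>>();

    public PartitionedWorkers(int n) {
        for (int i = 0; i < n; i++) {
            final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
            final List<String> log = new ArrayList<String>();
            queues.add(queue);
            processed.add(log);
            Thread worker = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            log.add(queue.take()); // stand-in for real processing
                        }
                    } catch (InterruptedException e) {
                        // shutdown requested
                    }
                }
            });
            workers.add(worker);
            worker.start();
        }
    }

    public void accept(String instrumentKey) throws InterruptedException {
        // Mask the sign bit instead of Math.abs(): abs(Integer.MIN_VALUE)
        // is still negative and would yield a negative index
        int idx = (instrumentKey.hashCode() & 0x7fffffff) % queues.size();
        queues.get(idx).put(instrumentKey);
    }

    public void shutdown() throws InterruptedException {
        for (Thread w : workers) {
            w.interrupt();
            w.join();
        }
    }

    public List<List<String>> processedPerWorker() {
        return processed;
    }
}
```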

If you can come up with a simpler solution, let me know. In the meantime let me tell you what I suggested to my colleagues. As you can guess, it was Akka – and it turned out to be embarrassingly simple. We need two kinds of actors: Dispatcher and Processor. Dispatcher has one instance and receives all incoming transactions. Its responsibility is to find or spawn a worker Processor actor for each Instrument and push the transaction to it:

public class Dispatcher extends UntypedActor {

    private final Map<Instrument, ActorRef> instrumentProcessors = 
        new HashMap<Instrument, ActorRef>();

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Transaction) {
            dispatch(((Transaction) message));
        } else {
            unhandled(message);
        }
    }

    private void dispatch(Transaction tx) {
        final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());
        processor.tell(tx, self());
    }

    private ActorRef findOrCreateProcessorFor(Instrument instrument) {
        final ActorRef maybeActor = instrumentProcessors.get(instrument);
        if (maybeActor != null) {
            return maybeActor;
        } else {
            final ActorRef actorRef = context().actorOf(
                Props.create(Processor.class), instrument.getName());
            instrumentProcessors.put(instrument, actorRef);
            return actorRef;
        }
    }
}

This is dead simple. Since our Dispatcher actor is effectively single-threaded, no synchronization is needed. We merely receive a Transaction, look up or create its Processor and pass the Transaction on. This is how the Processor implementation could look:

public class Processor extends UntypedActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Transaction) {
            process(((Transaction) message));
        } else {
            unhandled(message);
        }
    }

    private void process(Transaction tx) {
        log.info("Processing {}", tx);
    }
}

That’s it! Interestingly, our Akka implementation is almost identical to our first idea with a map of queues. After all, an actor is just a queue plus a (logical) thread processing items from that queue. The difference is that Akka manages a limited thread pool and shares it between potentially hundreds of thousands of actors. And because every instrument has its own dedicated (and “single-threaded”) actor, sequential processing of transactions per instrument is guaranteed.
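That shared pool is tunable through Akka’s dispatcher configuration. A typical application.conf fragment (the keys below match the default fork-join-executor of Akka 2.x; the values are made-up examples) could look roughly like:

```hocon
akka.actor.default-dispatcher {
  executor = "fork-join-executor"
  fork-join-executor {
    # threads = ceil(available cores * parallelism-factor),
    # clamped to [parallelism-min, parallelism-max]
    parallelism-min = 2
    parallelism-factor = 3.0
    parallelism-max = 64
  }
}
```

Whether the system hosts ten actors or a hundred thousand, the number of real threads stays within these bounds.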

One more thing. As stated earlier, there is an enormous number of instruments and we don’t want to keep actors around for instruments that weren’t seen for quite a while. Let’s say that if a Processor didn’t receive any transaction within an hour, it should be stopped and garbage collected. If we later receive a new transaction for such an instrument, we can always recreate the actor. This one is quite tricky – we must ensure that if a transaction arrives just as the processor decides to delete itself, we don’t lose that transaction. Rather than stopping itself, the Processor signals its parent that it was idle for too long, and the Dispatcher then sends a PoisonPill to it. Because both ProcessorIdle and Transaction messages are processed sequentially, there is no risk of a transaction being sent to a no-longer-existing actor.

Each actor manages its lifecycle independently by scheduling a timeout using setReceiveTimeout:

public class Processor extends UntypedActor {

    @Override
    public void preStart() throws Exception {
        context().setReceiveTimeout(Duration.create(1, TimeUnit.HOURS));
    }

    @Override
    public void onReceive(Object message) throws Exception {
        //...
        if (message instanceof ReceiveTimeout) {
            log.debug("Idle for too long, shutting down");
            context().parent().tell(ProcessorIdle.INSTANCE, self());
        } else {
            unhandled(message);
        }
    }

}

enum ProcessorIdle {
    INSTANCE
} 

Clearly, when a Processor does not receive any message for one hour, it gently signals that to its parent (Dispatcher). But the actor is still alive and can handle transactions that happen to arrive right after the hour elapsed. What the Dispatcher does is kill the given Processor and remove it from the map:

public class Dispatcher extends UntypedActor {

    private final BiMap<Instrument, ActorRef> instrumentProcessors = HashBiMap.create();

    public void onReceive(Object message) throws Exception {
        //...
        if (message == ProcessorIdle.INSTANCE) {
            removeIdleProcessor(sender());
            sender().tell(PoisonPill.getInstance(), self());
        } else {
            unhandled(message);
        }
    }

    private void removeIdleProcessor(ActorRef idleProcessor) {
        instrumentProcessors.inverse().remove(idleProcessor);
    }

    private void dispatch(Transaction tx) {
        final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());
        processor.tell(tx, self());
    }

    //...

}

There was a slight inconvenience. instrumentProcessors used to be a Map<Instrument, ActorRef>. This proved to be insufficient, since we suddenly have to remove an entry in this map by value. In other words we need to find a key (Instrument) that maps to a given ActorRef (Processor). There are different ways to handle it (e.g. the idle Processor could send along the Instrument it handles), but instead I used BiMap<K, V> from Guava. It works because both Instruments and ActorRefs are unique (actor-per-instrument). Having a BiMap I can simply inverse() the map (from BiMap<Instrument, ActorRef> to BiMap<ActorRef, Instrument>) and treat ActorRef as the key.
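If pulling in Guava just for this feels heavy, the remove-by-value trick can be emulated with two plain HashMaps kept in sync. A minimal sketch (my own stand-in, not Guava’s implementation – unlike the real BiMap it does not reject duplicate values):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for Guava's BiMap, sufficient for the dispatcher's needs:
// put() keeps a forward and a reverse map in sync, and removeByValue()
// mirrors instrumentProcessors.inverse().remove(actorRef). Not thread-safe,
// which is fine inside a single actor.
class SimpleBiMap<K, V> {

    private final Map<K, V> forward = new HashMap<K, V>();
    private final Map<V, K> reverse = new HashMap<V, K>();

    public void put(K key, V value) {
        forward.put(key, value);
        reverse.put(value, key);
    }

    public V get(K key) {
        return forward.get(key);
    }

    // Removes the entry whose value is given; returns the key it mapped
    // from, or null if the value was not present
    public K removeByValue(V value) {
        K key = reverse.remove(value);
        if (key != null) {
            forward.remove(key);
        }
        return key;
    }

    public int size() {
        return forward.size();
    }
}
```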

This Akka example is no more than a “hello, world”. But compared to the convoluted solution we would have had to write using concurrent queues, locks and thread pools, it’s perfect. My teammates were so excited that by the end of the day they decided to rewrite their whole application to Akka.

Reference: Simplifying trading system with Akka from our JCG partner Tomasz Nurkiewicz at the Java and neighbourhood blog.


3 Responses to "Simplifying trading system with Akka"

  1. nithin says:

    I am not familiar with Akka and hence can’t comment on that solution. Another simple solution would be to use a pool of threads, each having its own internal queue. Depending on the instrument hashcode, assign the Transaction to a particular thread in the pool. This way all transactions of a particular Instrument will be assigned to a particular thread. The number of threads will be independent of the number of instruments and this solution is more scalable.

    Cons
    1. Performance depends on the hashcode implementation.
    2. There is an additional overhead of managing the thread pool.

    • Zoltan Juhasz says:

      Your idea is basically the same as the initial one in the post, which was replaced by actors. So it has the same problem: unrelated events might be assigned to the same thread, and the later one will be blocked until the thread is free.

      The actor model is much, much more scalable, because the number of actors is practically unlimited and they can be processed on any thread. There is also no limit on the number of threads; we can choose the ideal number based on the number of processing cores.

  2. Zoltan Juhasz says:

    I have one question and one note to the post.

    The question is: what is the latency of the actor-based solution? We had exactly the same problem on our project. The initial solution after the single-threaded one was the same – x threads, assigned by orderId % 10. We looked for another solution because of the problem I mentioned: unrelated events are processed by the same thread even when other threads are free. The solution was a complex piece of logic which assigned an event to a free thread if no related event was under processing, and related events to the same thread if there was one.

    I came up with a similar actor-based solution, which wasn’t chosen because it is hard to configure and can increase latency. Unfortunately I didn’t have the chance to make a PoC, that’s why I’m curious about your latency experiences.

    The note is that the blocking queue can be very ineffective; I would switch to a non-blocking one.
    Anyway, do you know the LMAX architecture? It achieves very good performance on a single thread. So be aware that sometimes threading backfires and costs more than it returns.

Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy | Contact