Core Java

Simplifying trading system with Akka

My colleagues are developing a trading system that processes quite heavy stream of incoming transactions. Each transaction covers one Instrument (think bond or stock) and has some (now) unimportant properties. They are stuck with Java (< 8), so let’s stick to it:
 
 
 
 
 
 
 

class Instrument implements Serializable, Comparable<Instrument> {
    private final String name;

    public Instrument(String name) {
        this.name = name;
    }

    //...Java boilerplate

}

public class Transaction {
    private final Instrument instrument;

    public Transaction(Instrument instrument) {
        this.instrument = instrument;
    }

    //...Java boilerplate

}

Instrument will later be used as a key in HashMap, so for the future we pro-actively implement Comparable<Instrument>. This is our domain, now the requirements:

  1. Transactions come into the system and need to be processed (whatever that means), as soon as possible
  2. We are free to process them in any order
  3. …however transactions for the same instrument need to be processed sequentially in the exact same order as they came in.

Initial implementation was straightforward – put all incoming transactions into a queue (e.g. ArrayBlockingQueue) with a single consumer. This satisfies last requirement, since queue preserves strict FIFO ordering across all transactions. But such an architecture prevents concurrent processing of unrelated transactions for different instruments, thus wasting compelling throughput improvement. Not surprisingly this implementation, while undoubtedly simple, became a bottleneck.

The first idea was to somehow split incoming transactions by instrument and process instruments individually. We came up with the following data structure:

priavate final ConcurrentMap<Instrument, Queue<Transaction>> queues = 
    new ConcurrentHashMap<Instrument, Queue<Transaction>>();

public void accept(Transaction tx) {
    final Instrument instrument = tx.getInstrument();
    if (queues.get(instrument) == null) {
        queues.putIfAbsent(instrument, new LinkedBlockingQueue<Transaction>());
    }
    final Queue<Transaction> queue = queues.get(instrument);
    queue.add(tx);
}

Yuck! But worst is yet to come. How do you make sure at most one thread processes each queue at a time? After all, otherwise two threads could pick up items from one queue (one instrument) and process them in reversed order, which is not allowed. The simplest case is to have a Thread per queue – this won’t scale, as we expect tens of thousands of different instruments. So we can have say N threads and let each of them handle a subset of queues, e.g. instrument.hashCode() % N tells us which thread takes care of given queue. But it’s still not perfect for three reasons:

  1. One thread must “observe” many queues, most likely busy-waiting, iterating over them all the time. Alternatively queue might wake up its parent thread somehow
  2. In worst case scenario all instruments will have conflicting hash codes, targeting only one thread – which is effectively the same as our initial solution
  3. It’s just damn complex! Beautiful code is not complex!

Implementing this monstrosity is possible, but hard and error-prone. Moreover there is another non-functional requirement: instruments come and go and there are hundreds of thousands of them over time. After a while we should remove entries in our map representing instruments that were not seen lately. Otherwise we’ll get a memory leak.

If you can come up with some simpler solution, let me know. In the meantime let me tell you what I suggested my colleagues. As you can guess, it was Akka – and it turned out to be embarrassingly simple. We need two kinds of actors: Dispatcher and Processor. Dispatcher has one instance and receive all incoming transactions. Its responsibility is to find or spawn worker Processor actor for each Instrument and push transaction to it:

public class Dispatcher extends UntypedActor {

    private final Map<Instrument, ActorRef> instrumentProcessors = 
        new HashMap<Instrument, ActorRef>();

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Transaction) {
            dispatch(((Transaction) message));
        } else {
            unhandled(message);
        }
    }

    private void dispatch(Transaction tx) {
        final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());
        processor.tell(tx, self());
    }

    private ActorRef findOrCreateProcessorFor(Instrument instrument) {
        final ActorRef maybeActor = instrumentProcessors.get(instrument);
        if (maybeActor != null) {
            return maybeActor;
        } else {
            final ActorRef actorRef = context().actorOf(
                Props.create(Processor.class), instrument.getName());
            instrumentProcessors.put(instrument, actorRef);
            return actorRef;
        }
    }
}

This is dead simple. Since our Dispatcher actor is effectively single-threaded, no synchronization is needed. We barely receive Transaction, lookup or create Processor and pass Transaction further. This is how Processor implementation could look like:

public class Processor extends UntypedActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Transaction) {
            process(((Transaction) message));
        } else {
            unhandled(message);
        }
    }

    private void process(Transaction tx) {
        log.info("Processing {}", tx);
    }
}

That’s it! Interestingly our Akka implementation is almost identical to our first idea with map of queues. After all an actor is just a queue and a (logical) thread processing items in that queue. The difference is: Akka manages limited thread pool and shares it between maybe hundreds of thousands of actors. And because every instrument has its own dedicated (and “single-threaded”) actor, sequential processing of transactions per instrument is guaranteed.

One more thing. As stated earlier, there is an enormous amount of instruments and we don’t want to keep actors for instruments that weren’t seen for quite a while. Let’s say that if a Processor didn’t receive any transaction within an hour, it should be stopped and garbage collected. If later we receive new transaction for such instrument, we can always recreate it. This one is quite tricky – we must ensure that if transaction arrives when processor decided to delete itself, we can’t loose that transaction. Rather than stopping itself, Processor signals its parent it was idle for too long. Dispatcher will then send PoisonPill to it. Because both ProcessorIdle and Transaction messages are processed sequentially, there is no risk of transaction being sent to no longer existing actor.

Each actor manages its lifecycle independently by scheduling timeout using setReceiveTimeout:

public class Processor extends UntypedActor {

    @Override
    public void preStart() throws Exception {
        context().setReceiveTimeout(Duration.create(1, TimeUnit.HOURS));
    }

    @Override
    public void onReceive(Object message) throws Exception {
        //...
        if (message instanceof ReceiveTimeout) {
            log.debug("Idle for two long, shutting down");
            context().parent().tell(ProcessorIdle.INSTANCE, self());
        } else {
            unhandled(message);
        }
    }

}

enum ProcessorIdle {
    INSTANCE
} 

Clearly, when Processor did not receive any message for a period of one hour, it gently signals that to its parent (Dispatcher). But the actor is still alive and can handle transactions if they happen precisely after an hour. What Dispatcher does is it kills given Processor and removes it from a map:

public class Dispatcher extends UntypedActor {

    private final BiMap<Instrument, ActorRef> instrumentProcessors = HashBiMap.create();

    public void onReceive(Object message) throws Exception {
        //...
        if (message == ProcessorIdle.INSTANCE) {
            removeIdleProcessor(sender());
            sender().tell(PoisonPill.getInstance(), self());
        } else {
            unhandled(message);
        }
    }

    private void removeIdleProcessor(ActorRef idleProcessor) {
        instrumentProcessors.inverse().remove(idleProcessor);
    }

    private void dispatch(Transaction tx) {
        final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());
        processor.tell(tx, self());
    }

    //...

}

There was a slight inconvenience. instrumentProcessors used to be a Map<Instrument, ActorRef>. This proved to be insufficient, since we suddenly have to remove an entry in this map by value. In other words we need to find a key (Instrument) that maps to a given ActorRef (Processor). There are different ways to handle it (e.g. idle Processor could send an Instrumnt it handles), but instead I used BiMap<K, V> from Guava. It works because both Instruments and ActorRefs pointed are unique (actor-per-instrument). Having BiMap I could simply inverse() the map (from BiMap<Instrument, ActorRef> to BiMap<ActorRef, Instrument> and treat ActorRef as key.

This Akka example is not more than “hello, world“. But compared to convoluted solution we would have to write using concurrent queues, locks and thread pools, it’s perfect. My team mates were so excited that by the end of the day they decided to rewrite their whole application to Akka.

Reference: Simplifying trading system with Akka from our JCG partner Tomasz Nurkiewicz at the Java and neighbourhood blog.

Tomasz Nurkiewicz

Java EE developer, Scala enthusiast. Enjoying data analysis and visualization. Strongly believes in the power of testing and automation.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

3 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
nithin
nithin
9 years ago

I am not familiar with AKKA and hence can’t comment on that solution. Another simple solution would be to use a a pool of threads and each having it’s own internal queue. Depending on instrument hashcode assign the Transaction to a particular thread in the pool. This way all transactions of a particular Instrument will be assigned to a particular thread. Number of threads will be independent on the number of instruments and this solution is more scalable.

Cons
1. Performance depends on the hashcode implementation.
2. There is an additional overhead of managing the thread pool.

Zoltan Juhasz
Zoltan Juhasz
9 years ago
Reply to  nithin

Your idea is basically the same as the basic one in the post, which was replaced by actors. So it has the same problem, that unrelated events might be assigned to the same thread, and the later one will be blocked, until the thread is free.

Actor model is much-much more scalable, because the number of actors can be practically unlimited, and they can be processed on any thread. There is also no limitation for the number of threads, we can choose the ideal number based on the number of processing cores.

Zoltan Juhasz
Zoltan Juhasz
9 years ago

I have one question and one note to the post. The question is, what is the latency of the actor based solution? We had exactly the same problem on our project. The initial solution after the single threaded one was the same, x thread, assigned by orderId%10. We looked for another solution because the problem I mentioned, that unrelated events are processed by the same thread, even there are other free threads. The solution was a complex logic, which assigned the event to a free thread if there was no related event under processing, and the related events was assigned… Read more »

Back to top button