Enterprise Java

A reactive and performant Spray + Akka solution to “Playing with concurrency and performance in Java and Node.js”

In my previous post I examined a fictitious trading engine and compared a Java based blocking solution to a Node.js based non-blocking solution. At the end of the post I wrote that:

I suspect that following the recent success of Node.js, more and more asynchronous Java libraries will start to appear.

Well, such libraries already exist, for example: Akka, Spray, and this async Mysql driver.
 
 
I set myself the challenge of creating a non-blocking Java based solution using exactly those libraries, so that I could compare its performance to that of the Node.js solution created for the last article. The first thing you might have noticed is that these are all Scala based libraries, but I wrote this solution in Java even though it is a little less syntactically elegant. In the last article I introduced a solution based upon Akka whereby the trading engine was wrapped in an actor. Here, I have dropped Tomcat as the HTTP server and replaced it with Spray, which neatly integrates the HTTP server straight into Akka. In theory this should make no difference to performance, because Spray uses NIO just as Tomcat 8 does, out of the box. But what attracted me to this solution was that overall, the number of threads is greatly reduced, as Spray, Akka and the async Mysql library all share the same execution context. Running on my Windows development machine, Tomcat has over 30 threads, compared to just over 10 for the solution built here, or compared to Websphere or JBoss where there are hundreds of threads.

The execution context is basically a pool of threads which runs tasks that are given to it. Since all the libraries used in the solution presented here are non-blocking, the number of threads can be kept low and close to the theoretical optimum, so that as little context switching as possible takes place, making the process run efficiently.
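To make that concrete, here is a minimal sketch (not from the GitHub repo) of how the size of that shared pool could be tuned, using Akka's standard configuration keys via the Typesafe Config library. The parallelism values below are assumptions for illustration, not the settings used for the tests in this article:

import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class TunedSystem {
    public static void main(String[] args) {
        //illustrative values only: cap the fork-join pool behind Akka's default
        //dispatcher, which Spray and the async Mysql driver also run on
        Config tuning = ConfigFactory.parseString(
                "akka.actor.default-dispatcher.fork-join-executor {\n" +
                "  parallelism-min = 2\n" +
                "  parallelism-factor = 1.0\n" + //roughly one thread per core
                "  parallelism-max = 12\n" +
                "}");
        ActorSystem system = ActorSystem.create("system", tuning.withFallback(ConfigFactory.load()));
        System.out.println("started " + system.name());
    }
}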

The code written for this article is on GitHub. The first part of the program is the main method which starts up Spray and Akka:

public static final ActorSystem system = ActorSystem.create("system");

public static void main(String[] args) {
    ...
    //the actor which will receive all HTTP requests as messages
    ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor");

    //configuration for the server: bind to port 3000, default options/settings, no SSL
    InetSocketAddress endpoint = new InetSocketAddress(3000);
    int backlog = 100;
    List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList();
    Option<ServerSettings> settings = scala.Option.empty();
    ServerSSLEngineProvider sslEngineProvider = null;
    Bind bind = new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider);

    //tell Akka IO's HTTP module to bind and deliver requests to our actor
    IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender());

    //output statistics every 5 seconds, on Akka's dispatcher rather than a new Timer thread
    system.scheduler().schedule(new FiniteDuration(5, TimeUnit.SECONDS), new FiniteDuration(5, TimeUnit.SECONDS), ()->{
        System.out.println(new Date() + " - numSales=" + numSales.get());
    }, system.dispatcher());
}

The first line creates an actor system which is public so that I can access it from elsewhere, since it is used to access the single execution context which I want to use throughout the program. (In code where maintainability is an issue I would write something so that this object could be injected into the relevant parts of the program.) The main method then uses the system to instantiate the HttpActor, which is used to handle all HTTP requests for purchase and sales orders. The next few statements just set up configuration data for the server. The Bind message and the IO.apply(...).tell(...) call are where we take the configuration and our actor and tell Akka's IO and HTTP modules to send all HTTP requests as messages to that actor. Finally, the scheduler call is where I effectively set up a timer task which fires every 5 seconds to output some statistics. The important part here is to notice that I am not using Java's Timer to schedule the task, since that just adds more unnecessary threads to my process. Instead I use the same execution context as Akka, so as few threads as possible are created.
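Incidentally, schedule(...) returns an akka.actor.Cancellable, so the statistics task could also be stopped cleanly, still without any extra threads. A small sketch, not something the original program needed:

Cancellable stats = system.scheduler().schedule(
        new FiniteDuration(5, TimeUnit.SECONDS),
        new FiniteDuration(5, TimeUnit.SECONDS),
        () -> System.out.println(new Date() + " - numSales=" + numSales.get()),
        system.dispatcher());

//later, e.g. just before shutting the actor system down:
stats.cancel();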

Next is the actor for handling the HTTP requests:

private static class HttpActor extends AbstractActor {

    private static final HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1();

    public HttpActor() {
        final Router router = partitionAndCreateRouter();
        
        receive(ReceiveBuilder
            .match(HttpRequest.class, r -> {
                int id = Constants.ID.getAndIncrement();
                String path = String.valueOf(r.uri().path());
                if("/sell".equals(path)){
                    String productId = r.uri().query().get("productId").get();
                    ...
                    SalesOrder so = new SalesOrder(price, productId, quantity, id);
                    so.setSeller(new Seller(who));
                    router.route(so, self());
                    replyOK(id);
                }else if("/buy".equals(path)){
                    ...
                }else{
                    handleUnexpected(r);
                }
            }).match(Tcp.Connected.class, r ->{
                sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self()); //tell that connection will be handled here!
            }).build());
    }

The HTTP_1_1 constant shows an example of how integrating Scala in a Java program can be ugly, but also how you can sometimes hide those ugly parts away by adding your own abstractions. The HTTP actor which responds to HTTP requests has 3 jobs. The first job, in the constructor, is to create a router which I shall describe below, and which it can use to delegate work to. The second job is to handle all new connections, in the Tcp.Connected match at the bottom, which tells Spray that this actor will also handle the actual requests and not only the connections. The third job is shown in the HttpRequest match, where the actor takes an HTTP request and delegates (routes) some work to another actor in the system.

This actor knows the HTTP model, but the HTTP abstraction doesn't leak into the next layer of the system. Instead, the actor passes domain objects (or value objects or case classes or similar) on to the actors which encapsulate the trading engines. The construction of such a domain object can be seen where the SalesOrder is created, using data extracted from the HTTP request, e.g. the productId pulled out of the query string, or out of say a JSON object in the request body. Spray contains useful directives which can help you extract the data from the request, and abstract a little away from HTTP, if that is what you want. Which domain object to construct depends on the REST-like interface which I have built and is handled by the checks on the request path ("/sell", "/buy" and so on). Had I used Scala, I could have written more elegant code using pattern matching on the HttpRequest object. The domain object is passed on to the trading engine by getting the router from the constructor to route it to a suitable actor. Last, but not least, the replyOK(id) call is where the sales order request is acknowledged in an HTTP response which passes a JSON object back to the consumer, together with the unique ID assigned to the order, so that its status can be queried later (it gets persisted into the sales objects).
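The replyOK(...) helper isn't shown in the listing. The following is a rough reconstruction of what such a helper might look like – my own sketch, not the repo code: the mangled Scala constant names follow the same pattern as HTTP$div1$u002E1() above, and the HttpResponse constructor arguments are assumptions:

private void replyOK(int id) {
    //assumption: acknowledge the order with a small JSON body containing its ID
    String json = "{\"id\":" + id + "}";
    scala.collection.immutable.List<HttpHeader> noHeaders =
            akka.japi.Util.immutableSeq(new HttpHeader[0]).toList();
    HttpResponse response = new HttpResponse(
            StatusCodes.OK(),       //200
            HttpEntity.apply(json), //response body
            noHeaders,              //no extra headers
            HTTP_1_1);              //the constant from the top of the class
    sender().tell(response, self()); //Spray replies to the sender of the HttpRequest
}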

The next snippet shows how we partition the market and create a number of actors to handle requests in parallel.

private Router partitionAndCreateRouter() {
    Map<String, ActorRef> kids = new HashMap<>();
    java.util.List<Routee> routees = new ArrayList<Routee>();
    int chunk = Constants.PRODUCT_IDS.length / NUM_KIDS;
    for (int i = 0, j = Constants.PRODUCT_IDS.length; i < j; i += chunk) {
        String[] temparray = Arrays.copyOfRange(Constants.PRODUCT_IDS, i, Math.min(i + chunk, j)); //clamp the last chunk to the array length
        LOGGER.info("created engine for products " + Arrays.toString(temparray)); //log the contents, not the array reference
        ActorRef actor = getContext().actorOf(Props.create(EngineActor.class));
        getContext().watch(actor);
        routees.add(new ActorRefRoutee(actor));

        for (int k = 0; k < temparray.length; k++) {
            LOGGER.debug("mapping productId '" + temparray[k] + "' to engine " + i);
            kids.put(temparray[k], actor);
        }
        LOGGER.info("---started trading");
        actor.tell(EngineActor.RUN, ActorRef.noSender());
    }
    Router router = new Router(new PartitioningRoutingLogic(kids), routees);
    return router;
}

This code is similar to what we did in the last article. In order to scale out and use more than one core concurrently, the market is partitioned by product ID, and each trading engine runs concurrently for a different market partition. In the solution presented here, an EngineActor is created per partition and wrapped in an ActorRefRoutee. A map of actors keyed by product ID is also filled, inside the inner loop. The router is built at the end of the method using the routees and that map, and it is this which the HttpActor uses in the previous snippet when delegating work. Note also the tell(EngineActor.RUN, ...) call, which starts the trading engine contained in the EngineActor, so that it is up and running, ready to trade purchase and sales orders, when they are passed to these actors.
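As a hedged outline, such an engine actor might look roughly like this – my sketch based on the previous article's description; the TradingEngine field and its method names are assumptions, not the exact repo code:

private static class EngineActor extends AbstractActor {

    public static final Object RUN = new Object(); //signal telling the engine to start trading

    private final TradingEngine engine = new TradingEngine(); //assumed domain class from the last article

    public EngineActor() {
        receive(ReceiveBuilder
            .match(SalesOrder.class, so -> engine.addSalesOrder(so))       //assumed method names
            .match(PurchaseOrder.class, po -> engine.addPurchaseOrder(po))
            .matchEquals(RUN, r -> engine.startTrading()) //runs trading cycles, persisting sales in between
            .build());
    }
}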

The real EngineActor class isn't shown in full, since apart from the outline above it is almost identical to the actors used in the last article, and it simply encapsulates a trading engine which handles all products from a particular market partition. The end of partitionAndCreateRouter() above uses a custom RoutingLogic to build the router, which is shown below:

public static class PartitioningRoutingLogic implements RoutingLogic {

    private Map<String, ActorRef> kids;

    public PartitioningRoutingLogic(Map<String, ActorRef> kids) {
        this.kids = kids;
    }

    @Override
    public Routee select(Object message, IndexedSeq<Routee> routees) {

        //find which product ID is relevant here
        String productId = null;
        if(message instanceof PurchaseOrder){
            productId = ((PurchaseOrder) message).getProductId();
        }else if(message instanceof SalesOrder){
            productId = ((SalesOrder) message).getProductId();
        }
        ActorRef actorHandlingProduct = kids.get(productId);

        //now go find the routee for the relevant actor
        for(Routee r : JavaConversions.asJavaIterable(routees)){
            ActorRef a = ((ActorRefRoutee) r).ref(); //cast is OK, since in this program all routees are by definition ActorRefRoutees
            if(a.equals(actorHandlingProduct)){
                return r;
            }
        }
        
        return akka.routing.NoRoutee$.MODULE$; //none found, return NoRoutee
    }
}

The select(...) method is called by the router when it receives an object which it must route to the correct actor. Using the map created in the previous listing, and the product ID obtained from the message, it is easy to find the actor which contains the trading engine responsible for the relevant market partition. By returning the routee which wraps that actor, Akka will pass the order object on to the correct EngineActor, which then puts the data into the model when that message is handled, at a time when the trading engine is between trading cycles and the actor next checks its inbox.
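As a quick illustration (hypothetical, not a test from the repo; actorA and actorB stand for two engine actors created as in partitionAndCreateRouter() above, and salesOrderFor(...) is a stand-in factory producing a SalesOrder with the given product ID):

//hypothetical usage of the routing logic
Map<String, ActorRef> kids = new HashMap<>();
kids.put("productA", actorA);
kids.put("productB", actorB);
PartitioningRoutingLogic logic = new PartitioningRoutingLogic(kids);

scala.collection.immutable.IndexedSeq<Routee> routees = akka.japi.Util.immutableIndexedSeq(
        Arrays.asList((Routee) new ActorRefRoutee(actorA), new ActorRefRoutee(actorB)));

//an order for productB is routed to the routee wrapping actorB
Routee selected = logic.select(salesOrderFor("productB"), routees);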

OK, so that is the front end dealt with. The second major change required to the solution from the previous article was the design of the method which persists sales after trading takes place. In the Java based solution I was synchronously iterating over each sale, sending an insert statement to the database and only processing the next sale once the database had replied. With the solution presented here, I chose to process the sales in parallel, by firing off an insert request to the database and immediately moving on to the next sale and doing the same. The responses were handled asynchronously within the execution context, using a callback which I provided. I wrote the program to wait for the last insert to be acknowledged before trading continued with newly created purchase and sales orders which had arrived since the last trading session had started. This is shown in the following listing:

private void persistSales(List<Sale> sales, final PersistenceComplete f) {
    if (!sales.isEmpty()) {
        LOGGER.info("preparing to persist sales");

        final AtomicInteger count = new AtomicInteger(sales.size());
        sales.forEach(sale -> {
            List<Object> values = Arrays.asList(sale.getBuyer().getName(), 
                                        sale.getSeller().getName(),
                                        sale.getProductId(),
                                        sale.getPrice(),
                                        sale.getQuantity(),
                                        sale.getPurchaseOrder().getId(),
                                        sale.getSalesOrder().getId());
            
            Future<QueryResult> sendQuery = POOL.sendPreparedStatement(SQL, JavaConversions.asScalaBuffer(values));
            sendQuery.onComplete(new JFunction1<Try<QueryResult>, Void>() {
                @Override
                public Void apply(Try<QueryResult> t) {
                    if(t.isSuccess()){
                        QueryResult qr = t.get();
                        //the query result doesn't contain auto-generated IDs! library seems immature...
                        //sale.setId(???);
                    }
                    
                    if(count.decrementAndGet() == 0){
                        if(t.isSuccess()){
                            f.apply(null);
                        }else{
                            f.apply(t.failed().get());
                        }
                        
                    }
                    return null; //needed because the function's return type is Void
                }
            }, Main.system.dispatcher());
        });
    }else{
        f.apply(null); //nothing to do, so continue immediately
    }
}

The persistSales(...) method is called by the trading engine after each trading cycle, and is passed a list of sales made during that trading cycle, and a callback function to be called once all the persistence is complete. If nothing was sold, then the else branch calls the callback immediately. Otherwise, a counter (the AtomicInteger named count) is created, initialised with the number of sales to be persisted. Each sale is then persisted asynchronously inside the forEach. Note how sendPreparedStatement(...) returns a Future and how we use another callback, registered with onComplete, to handle completion of the future – there is no blocking done here, waiting for the future to complete! The above mentioned counter is decremented once a sale is persisted, and once all sales are persisted, the callback passed into the persistSales(...) method is called. Note that the class JFunction1 used with onComplete is a shim allowing easier integration of Scala – the code is on GitHub at the link given above. The commented-out sale.setId(...) shows a little problem I had with the async Mysql library that I used: it is still in beta, and doesn't seem to have a way to get hold of the generated (autoincrement) primary key of the sale. Note also the last argument to onComplete, where I pass in the execution context which Akka is using, so that completion of the insert statement is processed on one of the existing threads, rather than on some new thread – again, keeping the total number of threads as low as possible.
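For reference, such a shim can be tiny. The following sketch shows the idea (the actual version is in the repo linked above): by extending Scala's AbstractFunction1, only apply(...) is left to implement, instead of the many methods defined on the scala.Function1 trait.

import scala.runtime.AbstractFunction1;

//sketch of the shim: subclasses, e.g. the anonymous class in the listing
//above, need only provide apply(...)
public abstract class JFunction1<T, R> extends AbstractFunction1<T, R> {
}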

This listing also shows an interesting problem, namely that the thread which calls the database to insert the data is not necessarily the same thread which might need to close the connection [1]. Normal Java EE and Spring code often relies on thread-local storage (see ThreadLocal). If you called through to a bean from the function handling the completion of the future, resources which are injected into it may not work, because the container cannot work out what the context is. Scala solves this problem using implicit parameters, which are passed into methods under the hood.
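A tiny, self-contained demonstration of that problem (illustrative only, not from the trading engine):

import java.util.concurrent.CompletableFuture;

public class ThreadLocalDemo {
    private static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    public static void main(String[] args) throws Exception {
        CURRENT_USER.set("alice"); //set on the main thread, as a container might do

        CompletableFuture.runAsync(() -> {
            //this runs on a pool thread, so the value set above is not visible,
            //just like in a callback handling the completion of a future
            System.out.println("in callback: " + CURRENT_USER.get()); //prints null
        }).get();

        System.out.println("on main thread: " + CURRENT_USER.get()); //prints alice
    }
}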

The listing above uses the PersistenceComplete callback interface, which is shown at the bottom of the next listing. The next listing also shows how the connection pool is created. Yet again, the execution context which Akka uses is passed over to the async Mysql library, as the last argument to the ConnectionPool constructor. The PoolConfiguration also shows a non-default setting, where I allow a queue size of up to a thousand. During load testing I was getting a lot of errors indicating that the pool was saturated, and increasing this value solved the problem.

private static final String SQL = "INSERT INTO SALES (BUYER_NAME, SELLER_NAME, PRODUCT_ID, PRICE, QUANTITY, PO_ID, SO_ID) VALUES (?, ?, ?, ?, ?, ?, ?)";

private static final ConnectionPool<MySQLConnection> POOL;
static {
    Duration connectTimeout = Duration.apply(5.0, TimeUnit.SECONDS);
    Duration testTimeout = Duration.apply(5.0, TimeUnit.SECONDS);
    Configuration configuration = new Configuration("root", Main.DB_HOST, 3306, Option.apply("password"), Option.apply("TRADER"), io.netty.util.CharsetUtil.UTF_8, 16777216, PooledByteBufAllocator.DEFAULT, connectTimeout, testTimeout);
    
    MySQLConnectionFactory factory = new MySQLConnectionFactory(configuration);
    POOL = new ConnectionPool<MySQLConnection>(factory, new PoolConfiguration(1000, 4, 1000, 4000), Main.system.dispatcher());
}


private interface PersistenceComplete {
    void apply(Throwable failure);
}

The callback passed into persistSales(...) is shown in the next listing. The following code is hardly different from the original shown in the last article, except that it is now asynchronous in style. It is called once all sales are persisted, and only then does the callback send a message (via its event listener) to the actor, with the final listener.onEvent(...) call below. That message will normally be at the back of the inbox, after a load of new purchase and sales orders. Each of those messages will be processed, leading to the trading engine model being updated with the new orders, before trading is recommenced.

persistSales(sales, t -> {
    if(t != null){
        LOGGER.error("failed to persist sales: " + sales, t);
    }else{
        LOGGER.info("persisting completed, notifying involved parties...");
        sales.stream().forEach(sale -> {
            if (sale.getBuyer().listener != null)
                sale.getBuyer().listener.onEvent(EventType.PURCHASE, sale);
            if (sale.getSeller().listener != null)
                sale.getSeller().listener.onEvent(EventType.SALE, sale);
        });
        ...
    }
    listener.onEvent(EventType.STOPPED, null);
});

The final code listing is the modification to the Node.js solution which was made so that it too would persist sales in parallel, rather than one after the other, as was the case in the last article.

function persistSales(sales, callback){
    if(sales.length === 0 || process.env.skipPersistence) {
        callback(); //nothing to do, so continue immediately
    }else{
        resources.dbConnection(function(err, connection) {
            if(err) callback(err); else {
                logger.info('preparing to persist ' + sales.length + ' sales');
                var count = sales.length;
                _.each(sales, function(sale){ //save them in parallel
                    connection.query(
                            'INSERT INTO SALES (BUYER_NAME, SELLER_NAME, PRODUCT_ID, PRICE, QUANTITY, PO_ID, SO_ID) values (?, ?, ?, ?, ?, ?, ?)',
                            [sale.buyer.name, sale.seller.name, sale.productId, sale.price, sale.quantity, sale.po.id, sale.so.id],
                            function(err, rows, fields) {
                                if(err) callback(err); else {
                                    sale.id = rows.insertId;
                                    count--;
                                    if(count == 0){
                                        logger.info('persisted all sales');
                                        connection.release();
                                        callback();
                                    }
                                }
                            }
                    );
                });
            }
        });
    }
}

The call to resources.dbConnection(...) fetches a connection from the pool, and the same connection is reused for all sales, “in parallel”; it is only released, i.e. returned to the pool, in the innermost callback, once the last sale is persisted.

So, yet again, it’s time to compare the solutions via some load tests. This time I chose to see what maximum rate of sales I could achieve with each of the following three solutions:

  • Case 1 – The solution presented here, namely Spray + Akka + the async Mysql driver,
  • Case 2 – The modified Node.js solution using persistence in parallel,
  • Case 3 – The original Tomcat non-blocking connector, but with synchronous persistence.

The cases were run using the hardware from the last article, with the trading engines running on the fast hardware and the database on the slow hardware, because that was the best setup to show how blocking I/O causes performance problems. For each case, there were three variables which I could adjust while tuning. These were:

  • Number of trading engines (either as actors or as child processes),
  • Time waited by client between calls to the server,
  • Number of concurrent clients.

The last two basically tuned the number of requests per second, since the connections were not kept open awaiting the trading results (see previous article). The results were as follows, with the best performance shown in bold.

Case 1 – Spray + Akka + async Mysql driver
| # trading engines | client wait time between calls | concurrent clients | sales per minute | approx. CPU on trading hardware |
|---|---|---|---|---|
| 8 | 100ms | 60 | 42,810 | 25-35% |
| 8 | 80ms | 70 | 62,392 | 25-35% |
| 8 | 60ms | 80 | 75,600 | 30-40% |
| 8 | 40ms | 90 | 59,217 | 30-50% |
| 10 | 60ms | 80 | too many DB connection problems | |
| 5 | 60ms | 60 | 67,398 | 25-35% |
| 6 | 60ms | 80 | **79,536** | 25-35% |

Case 2 – Node.js with persistence in parallel
| # trading engines | client wait time between calls | concurrent clients | sales per minute | approx. CPU on trading hardware |
|---|---|---|---|---|
| 8 | 200ms | 30 | 6,684 | 40-50% |
| 8 | 100ms | 60 | started to lag behind | |
| 8 | 100ms | 40 | 17,058 | 25-35% |
| 8 | 100ms | 50 | started to lag behind | |
| 12 | 100ms | 50 | 20,808 | 45-60% |
| 16 | 100ms | 60 | 24,960 | 45-65% |
| 20 | 100ms | 80 | 32,718 | 45-70% |
| 25 | 60ms | 80 | **51,234** | 75-85% |
| 30 | 50ms | 80 | 22,026 | 75-85% |
| 25 | 10ms | 70 | 17,604 | 75-90% |

Case 3 – Tomcat 8 NIO, with synchronous blocking persistence
| # trading engines | client wait time between calls | concurrent clients | sales per minute | approx. CPU on trading hardware |
|---|---|---|---|---|
| 4 | 200ms | 30 | 9,586 | 5% |
| 4 | 150ms | 30 | **10,221** | 5% |
| 8 | 200ms | 30 | 9,510 | 5% |

The results show that bolting an NIO connector onto Tomcat and thinking that you are non-blocking and performant is dangerous: that solution underperformed by a factor of nearly 8 compared to the Akka solution. The results also show that by using non-blocking libraries and writing a non-blocking solution in Java, it is possible to create a solution that is very performant in comparison to Node.js. Not only was the Java solution capable of some 50% more throughput, it used less than half the CPU doing so.

Very important: please note that this is a result particular to the algorithms used here and my architecture, design and implementation. It is also dependent on using “non-standard” Java libraries, and indeed, the Mysql library I used was missing functionality, for example reading generated primary keys out of the result of an insert. Please do your own experiments for your use cases before drawing conclusions on relative performance of Java vs. Scala vs. Node.js!

A noteworthy point when comparing the variation of the number of trading engines: in Node.js it directly controlled the number of child processes, analogous to the number of threads; in the Akka solution it had no effect whatsoever on the number of threads in the system – that number stayed constant! In Akka solutions, varying the number of actors mainly affects the number of messages queued in their inboxes.

Further information pertaining to the use of Akka and Spray can be found in this good video. Please also take the time to quickly read up about the reactive manifesto. The Akka solution presented here is reactive because it is responsive (it had the highest throughput of all three cases), resilient (Akka provides easy ways to deal with failure, although none were necessary here), elastic (it scales up because Akka manages the thread pool size in the execution context, and it scales out because Akka provides transparent location of actors), and it is message driven (due to using the actor model).

[1] The Mysql library used here doesn’t require that the connection be closed and returned to the pool, as e.g. Apache database pool does. Doing so in fact causes problems! Leaving it open causes no problems, as proven by the load tests which I ran.

Ant Kutschera

Ant is a freelance Java architect and developer. He has been writing a blog and white papers since 2004 and writes about anything he finds interesting, related to Java or software. Most recently he has been working on enterprise systems involving Eclipse RCP, Google GWT, Hibernate, Spring and J2ME. He believes very strongly in being involved in all parts of the software life cycle.
Comments

Vyacheslav (8 years ago):

Do I understand right that you use only one Node.js process on multi-core hardware?

Ant Kutschera (8 years ago):

Hi Vyacheslav,

No, the main node process forks child processes, as can be seen here: https://gist.github.com/maxant/839736d7bd7cb7214d0f (search for “.fork(” – line 38 of trading_engine_parent3.js). It creates one child process per processor, i.e. the optimum (note that in the code it is hard-coded to 2, and should be changed according to your hardware).

Cheers,
Ant
