About Bilgin Ibryam

Bilgin Ibryam is a senior software engineer based in London, interested in service-oriented architecture, enterprise application integration and application development. He is also an open source enthusiast and a committer on Apache OFBiz (Open For Business) and Apache Camel.

Transactional caching for Camel with Infinispan

Some time ago I created a Redis connector for Camel. Redis is an awesome key-value store (and a lot more), but then I needed a cache running in the same JVM as Camel and noticed Infinispan, which has just switched to ASL v2. There are already other connectors in Camel for caching on the JVM, like Hazelcast and EHCache, but if you are already using Camel as part of other Red Hat products, or want to see how LIRS eviction outperforms LRU, Infinispan is worth trying.

Briefly, Infinispan is a transactional in-memory key-value store and data grid. When used in embedded mode, Infinispan resides in the same JVM as Camel and allows a Camel consumer to receive cache change notifications:
 

<route>
  <from uri="infinispan://localhost?cacheContainer=#cacheContainer&amp;cacheName=orders&amp;eventTypes=CACHE_ENTRY_CREATED"/>
  <filter>
    <simple>${out.header.CamelInfinispanIsPre} == true</simple>
    <to uri="log:com.mycompany.order?showHeaders=true"/>
  </filter>
</route>

In the example above, when a cache entry is created, Infinispan will fire two events – one before and one after the cache entry has been created. It is also possible to receive the events synchronously, meaning in the same thread that processes the cache action, or asynchronously in a separate thread without blocking the cache action.
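
The pre/post and synchronous/asynchronous behaviour described above can be sketched in plain Java. This is only an illustration of the idea, not Infinispan's listener API: the class NotifyingCache, its CreatedListener interface and the single-threaded notifier are all hypothetical names invented for this sketch.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; this is NOT Infinispan's listener API.
class NotifyingCache {
    /** Called twice per created entry: once with pre=true, once with pre=false. */
    interface CreatedListener {
        void onCreated(String key, boolean pre);
    }

    private final Map<String, String> store = new ConcurrentHashMap<>();
    private final List<CreatedListener> syncListeners = new CopyOnWriteArrayList<>();
    private final List<CreatedListener> asyncListeners = new CopyOnWriteArrayList<>();
    private final ExecutorService notifier = Executors.newSingleThreadExecutor();

    void addListener(CreatedListener listener, boolean sync) {
        (sync ? syncListeners : asyncListeners).add(listener);
    }

    /** Stores the value; fires a pre and a post event only when the key is new. */
    synchronized void put(String key, String value) {
        boolean creating = !store.containsKey(key);
        if (creating) fire(key, true);   // "before" event
        store.put(key, value);
        if (creating) fire(key, false);  // "after" event
    }

    private void fire(String key, boolean pre) {
        // Synchronous listeners run in the thread performing the cache action.
        for (CreatedListener l : syncListeners) l.onCreated(key, pre);
        // Asynchronous listeners run on a separate thread and do not block put().
        for (CreatedListener l : asyncListeners) notifier.execute(() -> l.onCreated(key, pre));
    }

    /** Waits briefly for queued asynchronous notifications, then stops the notifier. */
    void shutdown() {
        notifier.shutdown();
        try {
            notifier.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A route filtering on the "is pre" flag, like the one above, corresponds to a listener that ignores calls where pre is false.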

Using Infinispan as a local cache is simple: it exposes a ConcurrentMap interface and has the usual features such as expiration, eviction, passivation, persistent stores and querying. What also makes Infinispan a data grid is the ability of its nodes to discover other nodes and replicate or distribute data among themselves. Replication shares the data across the whole cluster, whereas distribution uses a consistent hashing algorithm to spread entries over the nodes and achieve better scalability.
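
To make the consistent hashing idea more concrete, here is a minimal sketch of the generic technique in plain Java. It is not Infinispan's implementation; the class ConsistentHashRing, the use of MD5 and the virtual node count are assumptions chosen only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Generic consistent-hash ring; a hypothetical sketch, not Infinispan's algorithm.
class ConsistentHashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    ConsistentHashRing(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    /** Places several points on the ring for the node to even out the distribution. */
    void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.putIfAbsent(hash(node + "#" + i), node);
        }
    }

    /** Removes only the points that belong to the given node. */
    void removeNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(node + "#" + i), node);
        }
    }

    /** The owner is the first node point at or after the key's hash, wrapping around. */
    String ownerOf(String key) {
        if (ring.isEmpty()) throw new IllegalStateException("no nodes in the ring");
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Uses the first 8 bytes of an MD5 digest as a 64-bit position on the ring. */
    private static long hash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xff);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The property that matters for scalability is that when a node leaves, only the keys it owned move to other nodes; every other key keeps its owner.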

In client-server mode, Infinispan runs as a standalone application, and a Camel producer can send messages using Infinispan’s Hot Rod client. Hot Rod is a binary, language-neutral, intelligent protocol that allows interaction with Infinispan servers in a topology-aware and hash-distribution-aware fashion.

The Infinispan producer in Camel currently offers GET, PUT, REMOVE and CLEAR operations. Here is an example of the producer putting data into the orders cache:

<route>
  <from uri="direct:orderCache"/>
  <setHeader headerName="CamelInfinispanKey">
    <simple>${in.header.orderId}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanValue">
    <simple>${in.header.orderTotal}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanOperation">
    <simple>CamelInfinispanOperationPut</simple>
  </setHeader>
  <to uri="infinispan://localhost?cacheName=orders"/>
</route>
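
The route above drives the producer purely through message headers. A rough sketch of that dispatch, written against a plain ConcurrentMap instead of the real component, might look like the following. The class HeaderDrivenCacheProducer is hypothetical; the Put operation value and the key, value and operation header names come from the route above, while the Get, Remove and Clear values and the result header name are assumptions made by analogy.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of header-driven dispatch; not the camel-infinispan source.
class HeaderDrivenCacheProducer {
    static final String KEY = "CamelInfinispanKey";
    static final String VALUE = "CamelInfinispanValue";
    static final String OPERATION = "CamelInfinispanOperation";
    // Assumed header name for returning the outcome of an operation.
    static final String RESULT = "CamelInfinispanOperationResult";

    static final String PUT = "CamelInfinispanOperationPut";
    // The three values below are assumed by analogy with the Put value.
    static final String GET = "CamelInfinispanOperationGet";
    static final String REMOVE = "CamelInfinispanOperationRemove";
    static final String CLEAR = "CamelInfinispanOperationClear";

    private final ConcurrentMap<Object, Object> cache;

    HeaderDrivenCacheProducer(ConcurrentMap<Object, Object> cache) {
        this.cache = cache;
    }

    /** Reads the operation header and applies it to the cache. */
    void process(Map<String, Object> headers) {
        Object operation = headers.get(OPERATION);
        if (!(operation instanceof String)) {
            throw new IllegalArgumentException("missing header " + OPERATION);
        }
        Object key = headers.get(KEY);
        switch ((String) operation) {
            case PUT:
                headers.put(RESULT, cache.put(key, headers.get(VALUE)));
                break;
            case GET:
                headers.put(RESULT, cache.get(key));
                break;
            case REMOVE:
                headers.put(RESULT, cache.remove(key));
                break;
            case CLEAR:
                cache.clear();
                break;
            default:
                throw new IllegalArgumentException("unknown operation " + operation);
        }
    }
}
```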

Let’s create a more interesting example. Infinispan is also JTA compliant and can participate in transactions. We will create a REST API for registering persons, which first persists the person in a relational database using the Camel sql component and then puts the firstName into an Infinispan cache in the same transaction. We will do that using a transacted Camel route, so if an error occurs at any stage of routing, Camel will make sure the transaction (for both the cache and the database) is rolled back, and the database and the cache always stay in a consistent state.

<route>
  <from uri="restlet:/persons?restletMethod=POST"/>
  <transacted/>

  <!-- PERSIST TO DB -->
  <to uri="sql:insert into person(firstName, lastName) values(:#firstName,:#lastName)?dataSource=#dataSource"/>

  <!-- DAMN EXCEPTION THROWER-->
  <filter>
    <simple>${in.header.lastName} == "damn"</simple>
    <throwException ref="damn"/>
  </filter>

  <!-- PUT TO CACHE -->
  <to uri="sql:select id from person WHERE id = (select max(id) from person)?dataSource=#dataSource"/>
  <setHeader headerName="personId">
    <simple>${body[0][ID]}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanKey">
    <simple>${headerAs(personId, String)}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanValue">
    <simple>${in.header.firstName}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanOperation">
    <simple>CamelInfinispanOperationPut</simple>
  </setHeader>
  <to uri="infinispan://localhost?cacheContainer=#cacheContainer&amp;cacheName=orders"/>
</route>

As you can see, there is no magic or extra configuration in the route; it is a standard route. A small bit of code throws an exception when the person's lastName is damn, to simulate errors in the middle of the route.
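
The guarantee the transacted route gives can be illustrated with a toy model in plain Java. This is a deliberately simplified sketch, not JTA or XA: the class TransactedRouteSketch and its BufferedStore are hypothetical, writes are simply buffered until commit, and the prepare phase of a real two-phase commit is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of "both resources commit or neither does"; NOT a JTA implementation.
class TransactedRouteSketch {
    /** A resource that buffers writes until the surrounding unit of work commits. */
    static class BufferedStore {
        final Map<String, String> committed = new HashMap<>();
        private final Map<String, String> pending = new HashMap<>();

        void write(String key, String value) { pending.put(key, value); }
        void commit() { committed.putAll(pending); pending.clear(); }
        void rollback() { pending.clear(); }
    }

    final BufferedStore database = new BufferedStore();
    final BufferedStore cache = new BufferedStore();
    private int nextId = 1;

    /** Mirrors the route: insert into the database, maybe fail, then put into the cache. */
    boolean register(String firstName, String lastName) {
        try {
            String id = String.valueOf(nextId);
            database.write(id, firstName + " " + lastName);
            if ("damn".equals(lastName)) {
                throw new IllegalStateException("simulated failure in the middle of the route");
            }
            cache.write(id, firstName);
            // Both resources are committed together at the end of the unit of work.
            database.commit();
            cache.commit();
            nextId++;
            return true;
        } catch (RuntimeException e) {
            // Any failure discards the pending work in both resources.
            database.rollback();
            cache.rollback();
            return false;
        }
    }
}
```

Registering a person whose lastName is damn returns false and leaves both stores untouched, which is the behaviour the real route delegates to the transaction manager.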

The application runs in standalone mode with the Atomikos JTA transaction manager. First we create a JtaTransactionManager to be used by the transacted route:

<bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"/>
<bean id="userTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"/>
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> 
  <constructor-arg ref="userTransaction"/>
  <constructor-arg ref="userTransactionManager"/>
</bean>

Then we wrap our DataSource in an AtomikosDataSourceBean:

public AtomikosDataSourceBean atomikosDataSourceBean() throws Exception {
    // XA-capable embedded database, created on first use under target/testdb
    EmbeddedXADataSource ds = new EmbeddedXADataSource();
    ds.setCreateDatabase("create");
    ds.setDatabaseName("target/testdb");
    ds.setUser("");
    ds.setPassword("");

    // Wrap the XA data source so that Atomikos enlists it in JTA transactions
    AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
    xaDataSource.setXaDataSource(ds);
    xaDataSource.setUniqueResourceName("xaDerby");
    return xaDataSource;
}

Finally, using a TransactionManagerLookup, we tell Infinispan to participate in the same transaction:

public BasicCacheContainer basicCacheContainer() throws Throwable {
    GlobalConfiguration glob = new GlobalConfigurationBuilder().nonClusteredDefault().build();

    Configuration loc = new ConfigurationBuilder()
        .transaction().transactionMode(TransactionMode.TRANSACTIONAL)
        .transactionManagerLookup(new TransactionManagerLookup() {
            @Override
            public TransactionManager getTransactionManager() throws Exception {
                // Hand Infinispan the same transaction manager the route uses
                return jtaTransactionManager.getTransactionManager();
            }
        }).build();

    return new DefaultCacheManager(glob, loc, true);
}

After all this boilerplate code, we have our datasource, cache and Camel route participating in the same transaction. To see the full REST example with two-phase commit and rollback, get the source code from GitHub and play with it.

BTW, the camel-infinispan component is still not part of Camel trunk, so you will need it as well to run the example.
 

Reference: Transactional caching for Camel with Infinispan from our JCG partner Bilgin Ibryam at the OFBIZian blog.