About Bilgin Ibryam

Bilgin Ibryam is a senior software engineer based in London interested in service-oriented architecture, enterprise application integration and application development. He is also open source enthusiast, Apache Open for Business and Apache Camel committer.

Transactional caching for Camel with Infinispan

Some time ago I created a Redis connector for Camel. Redis is awesome key-value store (and a lot more) but then I needed a cache running in the same JVM as Camel and noticed Infinispan which has just switched to ASL v2. There are already other connectors in Camel for caching on the JVM, like Hazelcast and EHCache, but if you are already using Camel as part of other Red Hat products or want to see how LIRS eviction overperforms LRU, Infinispan is worth trying.

Briefly, Infinispan is transactional in-memory key-value store and data grid. When used in embedded mode, Infinispan resides in the same JVM as Camel and allows Camel consumer to receive cache change notifications:
 

<route>
  <from uri="infinispan://localhost?cacheContainer=#cacheContainer&caseName=orders&eventTypes=CACHE_ENTRY_CREATED"/>
  <filter>
    <simple>${out.header.CamelInfinispanIsPre} == true</simple>
    <to uri="log:com.mycompany.order?showHeaders=true"/>
  </filter>
</route>

In the example above, when a cache entry is created, Infinispan will fire two events – one before and one after the cache entry has been created. It is also possible to receive the events synchronously, meaning in the same thread that processes the cache action, or asynchronously in a separate thread without blocking the cache action.

Using Infinispan as a local cache is simple, it exposes a ConcurrentMap interface and has the usual expiration, eviction, passivation, persistent store, querying, etc features. What makes Infinispan also a data grid is the ability of the nodes to discover other nodes and replicate or distribute data among themselves. Replication allows sharing data across a cluster whereas distribution uses consistent hashing algorithm to achieve better scalability.

In client-server mode, Infinispan is running as standalone application, and Camel producer can send messages using Infinispan’s Hot Rod client. Hot Rod is a binary, language neutral, intelligent protocol, allowing interaction with Infinisnap servers in topology and hash-distribution-aware fashion.

The Infinispan producer in Camel currently offers GET, PUT, REMOVE and CLEAR operations. Here is an example of the producer putting data to orders cache:

<route>
  <from uri="direct:orderCache"/>
  <setHeader headerName="CamelInfinispanKey">
    <simple>${in.header.orderId}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanValue">
    <simple>${in.header.orderTotal}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanOperation">
    <simple>CamelInfinispanOperationPut</simple>
  </setHeader>
  <to uri="infinispan://localhost?caseName=orders"/>
</route>

Let’s create a more interesting example. Infinispan is also JTA compliant and can participate in transactions. We will create a REST API for registering persons, which will first persist the person in a relational database using Camel sql component, and then will put the firstName into Infinispan cache in the same transaction. We will do that using a transacted Camel route, so if an error occurs during routing, at any stage, Camel will make sure the transaction(for the cache and the database) is rolled back, so that the database and the cache are always in a consistent state.

<route>
  <from uri="restlet:/persons?restletMethod=POST"/>
  <transacted/>

  <!-- PERSIST TO DB -->
  <to uri="sql:insert into person(firstName, lastName) values(:#firstName,:#lastName)?dataSource=#dataSource"/>

  <!-- DAMN EXCEPTION THROWER-->
  <filter>
    <simple>${in.header.lastName} == "damn"</simple>
    <throwException ref="damn"/>
  </filter>

  <!-- PUT TO CACHE -->
  <to uri="sql:select id from person WHERE id = (select max(id) from person)?dataSource=#dataSource"/>
  <setHeader headerName="personId">
    <simple>${body[0][ID]}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanKey">
    <simple>${headerAs(personId, String)}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanValue">
    <simple>${in.header.firstName}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanOperation">
    <simple>CamelInfinispanOperationPut</simple>
  </setHeader>
  <to uri="infinispan://localhost?cacheContainer=#cacheContainer&caseName=orders"/>
</route>

As you can see there is no magic or extra configuration in the route, it is a standard route. We have small bit of code that throws an exception when the person lastName is damn to simulate errors in the middle of the route.

The application runs in a standalone mode with atomikos JTA transaction manager. First we create a JtaTransactionManager to be used by the transacted route:

<bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"/>
<bean id="userTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"/>
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> 
  <constructor-arg ref="userTransaction"/>
  <constructor-arg ref="userTransactionManager"/>
</bean>

Then wrap our DataSource with it:

public AtomikosDataSourceBean atomikosDataSourceBean() throws Exception {
    EmbeddedXADataSource ds = new EmbeddedXADataSource();
    ds.setCreateDatabase("create");
    ds.setDatabaseName("target/testdb");
    ds.setUser("");
    ds.setPassword("");
 
    AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
    xaDataSource.setXaDataSource(ds);
    xaDataSource.setUniqueResourceName("xaDerby");
    return xaDataSource;
}

and using a TransactionManagerLookup tell Infinispan to participate in the same transaction:

public BasicCacheContainer basicCacheContainer() throws Throwable {
    GlobalConfiguration glob = new GlobalConfigurationBuilder().nonClusteredDefault().build();

    Configuration loc = new ConfigurationBuilder()
        .transaction().transactionMode(TransactionMode.TRANSACTIONAL)
        .transactionManagerLookup(new TransactionManagerLookup() {
            @Override
            public TransactionManager getTransactionManager() throws Exception {
            return jtaTransactionManager.getTransactionManager();
            }
        }).build();

    return new DefaultCacheManager(glob, loc, true);
}

After all this boilerplate code, we have our datasource, cache and Camel route participating in the same transaction. To see the full REST example with two phase commit and rollback, get the source code from github and play with it.

BTW Camel-infinispan component is still not part of Camel trunk, to run the example you will need that too.
 

Reference: Transactional caching for Camel with Infinispan from our JCG partner Bilgin Ibryam at the OFBIZian blog.

Do you want to know how to develop your skillset to become a Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you two of our best selling eBooks for FREE!

JPA Mini Book

Learn how to leverage the power of JPA in order to create robust and flexible Java applications. With this Mini Book, you will get introduced to JPA and smoothly transition to more advanced concepts.

JVM Troubleshooting Guide

The Java virtual machine is really the foundation of any Java EE platform. Learn how to master it with this advanced guide!

Given email address is already subscribed, thank you!
Oops. Something went wrong. Please try again later.
Please provide a valid email address.
Thank you, your sign-up request was successful! Please check your e-mail inbox.
Please complete the CAPTCHA.
Please fill in the required fields.

Leave a Reply


+ 3 = eleven



Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.
Do you want to know how to develop your skillset and become a ...
Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you two of our best selling eBooks for FREE!

Get ready to Rock!
You can download the complementary eBooks using the links below:
Close