Camel: Build a message based application
- Getting started with Apache Camel with Java
- Improving startup of routes with a CamelRunner
- Building message based application using Camel
Since I’ve prepared a camel-demo-1.0.0-SNAPSHOT-project.zip that includes all of these materials, I thought it would be easier to combine them and present them as a whole.
Getting started with Apache Camel with Java
Trying out Camel with a few lines of Groovy is one thing, but setting up a full-scale project in Java is another matter. Today, I will show you how to get started with Apache Camel in a Maven-based project. You may also use the provided camel-demo project as a template to jump-start your own Apache Camel project: you just need to rename the Java package and change the pom’s group and artifact IDs to match your needs.
Preparing a Maven based project with Camel dependencies
Unzip the camel-demo project source, and you will see the basic directory layout.
camel-demo
  +- bin
  +- config
  +- data
  +- src
  +- pom.xml
  +- README.txt
What makes this demo a Camel-based project is just the declarations in pom.xml. Let’s take a look at the file and its dependencies.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>deng.cameldemo</groupId>
  <artifactId>camel-demo</artifactId>
  <version>1.0.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <slf4j.version>1.6.6</slf4j.version>
    <camel.version>2.10.1</camel.version>
  </properties>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.3</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>project</descriptorRef>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <!-- Unit testing lib -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit-dep</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-library</artifactId>
      <version>1.2.1</version>
      <scope>test</scope>
    </dependency>

    <!-- Logging lib -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>${slf4j.version}</version>
      <scope>runtime</scope>
      <optional>true</optional>
    </dependency>

    <!-- Apache Commons lib -->
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.0.1</version>
    </dependency>

    <!-- Apache Camel -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-core</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-spring</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-groovy</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-jackson</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-mina</artifactId>
      <version>${camel.version}</version>
    </dependency>
  </dependencies>
</project>
This pom.xml declares a Java-based application that is packaged as a jar. It requires JDK 6 or higher. Besides the typical junit and hamcrest for unit testing, I also added slf4j for logging, along with Apache’s commons-lang and commons-io. I think these are basic dependencies that any Java-based application should have.
The maven-assembly-plugin I have declared is only for packaging this demo, and you may change or remove it to suit your own project’s needs.
For Camel dependencies, you need at minimum camel-core for building routes. You can then add any additional components you plan to use in your project. I have added the following for building a typical message-based application:
- camel-spring: we want to have the option of declaring Camel routes in xml files as configuration. See the camel-demo/config directory for samples.
- camel-jackson: we want to process messaging data in our application in JSON format.
- camel-mina: we want to send messaging data across the network through a TCP socket.
- camel-groovy: [optional] we want to be able to add dynamic scripting to a route, even inside the xml config. This is great for debugging and POCs.
Note that since we use multiple Camel component dependencies, I chose to set a Maven property, ${camel.version}, so that when we upgrade Camel, the version only needs to change in one place in the pom.xml file.
You should be able to cd into the project directory and run mvn compile to verify the project. It should compile without errors.
Improving startup of routes with a CamelRunner
With the project pom.xml file ready, you can start creating Camel routes to handle your own business logic. Before we get too excited, let’s try out a simple HelloRoute to see how it works and how we can run it. Here is the route definition code in src/main/java/deng/cameldemo/HelloRoute.java.
package deng.cameldemo;

import org.apache.camel.builder.RouteBuilder;

public class HelloRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // Fire a timer event every 3000 ms and send it to a logger named after this class.
        from("timer://helloTimer?period=3000").
            to("log:" + getClass().getName());
    }
}
Take a test ride with the Camel
To see the route above in action, we need to add it to a CamelContext and start the context. For a standalone Java program, we would write this setup code in a Main class. Camel actually comes with an org.apache.camel.main.MainSupport abstract class that you may extend to write your own Main. However, I think it would be even nicer if Camel provided a CamelRunner that can run like this.
$ java CamelRunner deng.cameldemo.HelloRoute
Such a CamelRunner would be very user-friendly and reusable, so that’s what I did. I wrote one like this:
package deng.cameldemo;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

/**
 * A main program to start Camel and run as a server using RouteBuilder class names or
 * Spring config files.
 *
 * <p>Usage:
 *
 * java deng.cameldemo.CamelRunner deng.cameldemo.HelloRoute
 *
 * or
 *
 * java -Dspring=true deng.cameldemo.CamelRunner /path/to/camel-spring.xml
 *
 * @author Zemian Deng
 */
public class CamelRunner {
    public static void main(String[] args) throws Exception {
        CamelRunner runner = new CamelRunner();
        runner.run(args);
    }

    private static Logger logger = LoggerFactory.getLogger(CamelRunner.class);

    public void run(String[] args) throws Exception {
        if (Boolean.parseBoolean(System.getProperty("spring", "false")))
            runWithSpringConfig(args);
        else
            runWithCamelRoutes(args);

        // Wait for user to hit CTRL+C to stop the service
        synchronized(this) {
            this.wait();
        }
    }

    private void runWithSpringConfig(String[] args) {
        // The constructor loads the xml files and refreshes (starts) the context.
        final ConfigurableApplicationContext springContext = new FileSystemXmlApplicationContext(args);

        // Register proper shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    springContext.close();
                    logger.info("Spring stopped.");
                } catch (Exception e) {
                    logger.error("Failed to stop Spring.", e);
                }
            }
        });

        logger.info("Spring started.");
    }

    private void runWithCamelRoutes(String[] args) throws Exception {
        final CamelContext camelContext = new DefaultCamelContext();

        // Register proper shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    camelContext.stop();
                    logger.info("Camel stopped for {}", camelContext);
                } catch (Exception e) {
                    logger.error("Failed to stop Camel.", e);
                }
            }
        });

        // Add each RouteBuilder class named in args
        for (String className : args) {
            Class<?> cls = Class.forName(className);
            if (RouteBuilder.class.isAssignableFrom(cls)) {
                Object obj = cls.newInstance();
                RouteBuilder routeBuilder = (RouteBuilder)obj;
                camelContext.addRoutes(routeBuilder);
            } else {
                throw new RuntimeException("Unable to add Camel RouteBuilder " + className);
            }
        }

        // Start camel
        camelContext.start();
        logger.info("Camel started for {}", camelContext);
    }
}
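The heart of runWithCamelRoutes is turning a class name from the command line into an instance, after checking that it has the expected type. Here is a minimal, Camel-free sketch of that pattern. The Task interface, the HelloTask class, and the PluginLoader name are hypothetical stand-ins for RouteBuilder, HelloRoute, and CamelRunner, chosen so the snippet compiles with only the JDK; it is an illustration of the idea, not code from the demo project.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Camel's RouteBuilder.
interface Task {
    String describe();
}

// Hypothetical stand-in for HelloRoute; it relies on the implicit no-arg constructor.
class HelloTask implements Task {
    public String describe() {
        return "hello";
    }
}

class PluginLoader {
    // Instantiate every named class, rejecting anything that is not a Task.
    static List<Task> load(String... classNames) {
        List<Task> tasks = new ArrayList<Task>();
        for (String className : classNames) {
            try {
                Class<?> cls = Class.forName(className);
                if (!Task.class.isAssignableFrom(cls)) {
                    throw new IllegalArgumentException(className + " is not a Task");
                }
                tasks.add((Task) cls.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Unable to load " + className, e);
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Fall back to the sample class when no names are given on the command line.
        String[] names = args.length > 0 ? args : new String[] { HelloTask.class.getName() };
        for (Task task : load(names)) {
            System.out.println(task.describe());
        }
    }
}
```

Checking isAssignableFrom before instantiating means a wrong class name fails with a clear message instead of a ClassCastException later on.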
To help you run the main class, I have provided a run-java wrapper script under the project’s bin directory, so that you may quickly test it without having to set up the classpath.
$ mvn package
$ bin/run-java deng.cameldemo.CamelRunner deng.cameldemo.HelloRoute
You will see that the program loads the HelloRoute into a DefaultCamelContext and starts it as a server. The HelloRoute itself generates a timer message every 3 seconds and sends it to a logger, which should print to your console. This will continue until you hit CTRL+C to end it.
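CamelRunner keeps the JVM alive by calling wait() on itself and relies on a shutdown hook to clean up when CTRL+C arrives. The sketch below shows the same keep-alive idea with a java.util.concurrent.CountDownLatch, which avoids the spurious wake-ups that the Object.wait() Javadoc warns about when wait() is called outside a loop. ShutdownLatch is a hypothetical helper name and is not part of the camel-demo project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the demo project.
class ShutdownLatch {
    private final CountDownLatch latch = new CountDownLatch(1);

    // Called from a shutdown hook (or a test) to release the waiting thread.
    void release() {
        latch.countDown();
    }

    // Blocks until release() is called or the timeout elapses; returns true if released.
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        final ShutdownLatch shutdown = new ShutdownLatch();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // A real runner would stop its CamelContext here before releasing.
                System.out.println("Stopping...");
                shutdown.release();
            }
        });
        System.out.println("Running; press CTRL+C to stop.");
        // Keep the main thread parked until the hook releases the latch.
        while (!shutdown.await(1, TimeUnit.SECONDS)) {
            // still running
        }
    }
}
```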
NOTE: You only have to invoke the mvn package command once, so that it packages up all the dependency jars for run-java to auto-detect. If you are not going to use maven-assembly-plugin during the package phase, then running the mvn dependency:copy-dependencies command explicitly will work fine as well.
Take a test ride with the Camel, Part 2: running Camel with Spring xml configuration
The HelloRoute example above simply provides a route definition formed from component URIs. It would be nice if we could configure the route in a declarative manner, so that we may change the route without recompiling a class file. This is very handy, especially if you are not familiar with each component’s options and want to explore and try things out. Well, that’s what camel-spring is for. Besides giving you the option to load routes from an xml config file, it also provides a very flexible way to register custom service and processor beans in the Spring IoC container.
If you are a keen reader, you will have noticed that the CamelRunner code above has an extra runWithSpringConfig part. So the CamelRunner can actually bootstrap any Spring xml file and start a context as a server. You may use it like this:
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/hellocamel-spring.xml
The config/hellocamel-spring.xml
is just an equivalent of our HelloRoute
code but in Spring xml form:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext id="helloCamel" xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="timer://jdkTimer?period=3000"/>
      <to uri="log://deng.cameldemo.HelloCamel"/>
    </route>
  </camelContext>

</beans>
This removes the need to compile or recompile HelloRoute to define the Camel route to run.
Building message based application using Camel
To present a more practical demo, I will show you how to set up Camel to process messages in a message-based application. In many IT shops, it’s common to have a server that takes message data as input and processes it. A practical use case is to take a JSON-formatted message, transform it into an object, and process it. To do this in Camel, you build a route that takes input messages from a TCP port and then processes them in a pipeline with whatever business logic you may have. You run the route as a server, and a client may use any means to submit messages to the TCP port. The client may even be another thin Camel client app. Let me show you how to get started.
Writing the server side code with Camel route
The server side needs a route that listens on a TCP port, which is provided by the camel-mina component. The first step is to write the route.
package deng.cameldemo;

import org.apache.camel.builder.RouteBuilder;

public class TcpMsgRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // The listening port can be overridden with -Dport=...
        String port = System.getProperty("port", "12345");
        from("mina:tcp://localhost:" + port + "?sync=false").
            to("log:" + getClass().getName());
    }
}
Then the next step is … done! No way, you mean that’s all there is to it for a server? Too good to be true? Well, let’s try it out:
$ bin/run-java deng.cameldemo.CamelRunner deng.cameldemo.TcpMsgRoute -Dport=12345
15:21:41 main INFO org.apache.camel.impl.DefaultCamelContext:1391 | Apache Camel 2.10.1 (CamelContext: camel-1) is starting
15:21:41 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled.
15:21:42 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters
15:21:42 main INFO org.apache.camel.component.mina.MinaConsumer:59 | Binding to server address: localhost/127.0.0.1:12345 using acceptor: org.apache.mina.transport.socket.nio.SocketAcceptor@2ffad8fe
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:2045 | Route: route1 started and consuming from: Endpoint[mina://tcp://localhost:12345?sync=true]
15:21:42 main INFO org.apache.camel.management.DefaultManagementLifecycleStrategy:859 | StatisticsLevel at All so enabling load performance statistics
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:1426 | Total 1 routes, of which 1 is started.
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:1427 | Apache Camel 2.10.1 (CamelContext: camel-1) started in 0.505 seconds
15:21:42 main INFO deng.cameldemo.CamelRunner:93 | Camel started for CamelContext(camel-1)
Voila! The server is up and waiting for your users to send messages through port 12345. Not too bad for a few lines of code.
Writing the client side code with Camel ProducerTemplate
Since our server exposes a TCP port and takes in any text message, you can create any client that’s capable of writing to a TCP socket. Here, I will show you how to use Camel to write a thin client.
package deng.cameldemo.client;

import java.io.FileReader;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TcpMsgSender {
    public static void main(String[] args) throws Exception {
        TcpMsgSender runner = new TcpMsgSender();
        runner.run(args);
    }

    private static Logger logger = LoggerFactory.getLogger(TcpMsgSender.class);

    public void run(String[] args) throws Exception {
        // Usage: TcpMsgSender [file] [host:port]
        String fileName = args.length > 0 ? args[0] : "data/msg.txt";
        String[] hostPort = (args.length > 1 ? args[1] : "localhost:12345").split(":");
        String host = hostPort[0];
        String port = hostPort.length > 1 ? hostPort[1] : "12345";
        logger.info("Sending tcp message {} to host={}, port={}", new Object[]{ fileName, host, port});

        // Read the whole file into memory, closing the reader afterwards.
        FileReader reader = new FileReader(fileName);
        String text;
        try {
            text = IOUtils.toString(reader);
        } finally {
            IOUtils.closeQuietly(reader);
        }
        logger.debug("File size={}", text.length());

        CamelContext camelContext = new DefaultCamelContext();
        ProducerTemplate producer = camelContext.createProducerTemplate();
        producer.sendBody("mina:tcp://" + host + ":" + port + "?sync=false", text);
        logger.info("Message sent.");
    }
}
This TcpMsgSender
can send any text file to your server endpoint. Try this out while your server is running:
$ bin/run-java deng.cameldemo.client.TcpMsgSender data/test-msg.json localhost:12345
15:22:35 main INFO deng.cameldemo.client.TcpMsgSender:24 | Sending tcp message data/test-msg.json to host=localhost, port=12345
15:22:35 main DEBUG deng.cameldemo.client.TcpMsgSender:27 | File size=47
15:22:35 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters
15:22:35 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled.
15:22:35 main INFO deng.cameldemo.client.TcpMsgSender:32 | Message sent.
You should be able to verify from your server console output that it received the message. The message I sent is in data/test-msg.json, which contains this simple text:
{ "firstName" : "Zemian", "lastName" : "Deng" }
Note that our server simply receives plain text and logs it. We will discuss how to process the message next.
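The TcpMsgSender above relies on Camel, but a client does not have to. Below is a rough sketch of a plain java.net.Socket sender; PlainTcpSender and its methods are hypothetical names, not part of the demo project. One caveat to check before using it: as far as I know, camel-mina TCP endpoints default to Java object serialization unless a text codec is selected (for example with the textline=true option), so this sketch assumes the server endpoint has been switched to a line-based text codec. Against the endpoint URI shown earlier it would likely not decode. To stay runnable without a server, main sends one line to a throwaway local socket and prints what arrived.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical plain-socket alternative to TcpMsgSender.
class PlainTcpSender {
    // Open a connection, write one newline-terminated UTF-8 line, and close.
    static void sendLine(String host, int port, String text) throws IOException {
        try (Socket socket = new Socket(host, port);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)) {
            out.write(text);
            out.write('\n');
            out.flush();
        }
    }

    // Loopback check: bind a throwaway server to a free port, send one line, return what arrived.
    static String loopback(String text) {
        InetAddress local = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, local)) {
            sendLine(local.getHostAddress(), server.getLocalPort(), text);
            try (Socket accepted = server.accept();
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(accepted.getInputStream(), StandardCharsets.UTF_8))) {
                return in.readLine();
            }
        } catch (IOException e) {
            throw new IllegalStateException("Loopback check failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(loopback("{ \"firstName\" : \"Zemian\", \"lastName\" : \"Deng\" }"));
    }
}
```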
Processing message data in JSON format with Camel and Spring xml config
You thought the server code above was easy? Guess again. You can actually replace the TcpMsgRoute with just a few simple lines of xml!
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext id="tcpMsgServer" xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="mina:tcp://localhost:12345?sync=false"/>
      <to uri="log://deng.cameldemo.TcpMsgServer"/>
    </route>
  </camelContext>

</beans>
Save it as config/tcpmsgserver-spring.xml
. Then re-run the server, and you should get the same result as above.
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/tcpmsgserver-spring.xml
Now let us improve the above xml to further process the JSON message data. We would like to transform the plain text into a Java object and then process it with a custom bean. To do that, we first need to add an unmarshal step to the route. This is where camel-jackson comes into play. In our demo, the unmarshalling step converts the JSON text into a java.util.Map and then passes it to a processor bean named myMsgProcessor. Let’s create a new xml file named config/tcpmsgserver-json-spring.xml as follows.
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext id="tcpMsgServer" xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="mina:tcp://localhost:12345?sync=false"/>
      <to uri="log://deng.cameldemo.TcpMsgServer"/>
      <unmarshal>
        <json library="Jackson"/>
      </unmarshal>
      <to uri="bean:myMsgProcessor?method=process"/>
    </route>
  </camelContext>

  <bean id="myMsgProcessor" class="deng.cameldemo.MyMsgProcessor">
  </bean>

</beans>
The myMsgProcessor is a Spring bean in which we provide custom logic to process the data. At this point we have a full Java object to manipulate. The processor can be any POJO that has the method named in the URI. Here is an example:
package deng.cameldemo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;

public class MyMsgProcessor {
    private static Logger logger = LoggerFactory.getLogger(MyMsgProcessor.class);

    // Called by the route with the Map produced by the JSON unmarshal step.
    public void process(Map<String, String> data) {
        logger.info("We should slice and dice the data: " + data);
    }
}
Try re-running the server with the new xml file above, and you should be able to re-invoke the same client command to test it out. Here is sample output from my server:
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/tcpmsgserver-json-spring.xml
17:05:25 main INFO org.springframework.context.support.FileSystemXmlApplicationContext:456 | Refreshing org.springframework.context.support.FileSystemXmlApplicationContext@4200309: startup date [Sat Sep 15 17:05:25 EDT 2012]; root of context hierarchy
17:05:25 main INFO org.springframework.beans.factory.xml.XmlBeanDefinitionReader:315 | Loading XML bean definitions from file [/Users/zemian/projects/sandbox/camel-demo/config/tcpmsgserver-json-spring.xml]
17:05:27 main INFO org.springframework.beans.factory.support.DefaultListableBeanFactory:557 | Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@27b75165: defining beans [template,consumerTemplate,tcpMsgServer:beanPostProcessor,tcpMsgServer,myMsgProcessor]; root of factory hierarchy
17:05:27 main INFO org.apache.camel.spring.SpringCamelContext:1391 | Apache Camel 2.10.1 (CamelContext: tcpMsgServer) is starting
17:05:27 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled.
17:05:27 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters
17:05:28 main INFO org.apache.camel.component.mina.MinaConsumer:59 | Binding to server address: localhost/127.0.0.1:12345 using acceptor: org.apache.mina.transport.socket.nio.SocketAcceptor@5a3cae4a
17:05:28 main INFO org.apache.camel.spring.SpringCamelContext:2045 | Route: route1 started and consuming from: Endpoint[mina://tcp://localhost:12345?sync=false]
17:05:28 main INFO org.apache.camel.management.DefaultManagementLifecycleStrategy:859 | StatisticsLevel at All so enabling load performance statistics
17:05:28 main INFO org.apache.camel.spring.SpringCamelContext:1426 | Total 1 routes, of which 1 is started.
17:05:28 main INFO org.apache.camel.spring.SpringCamelContext:1427 | Apache Camel 2.10.1 (CamelContext: tcpMsgServer) started in 0.695 seconds
17:05:28 main INFO deng.cameldemo.CamelRunner:61 | Spring started.
17:05:35 Camel (tcpMsgServer) thread #3 - MinaThreadPool INFO deng.cameldemo.TcpMsgServer:96 | Exchange[ExchangePattern:InOnly, BodyType:String, Body:{ "firstName" : "Zemian", "lastName" : "Deng" }]
17:05:35 Camel (tcpMsgServer) thread #3 - MinaThreadPool INFO deng.cameldemo.MyMsgProcessor:11 | We should slice and dice the data: {lastName=Deng, firstName=Zemian}
Notice that Camel converts the data format for you along the route! Our client only sends plain text in JSON format, but when the server receives it, it unmarshals it using the Jackson library and converts it into a Java Map object. It then passes the map object to our processor bean. Also, in this demo I chose to use a generic java.util.Map as the processor method argument (which is the output of the JSON unmarshal), but you can easily define your own business data type, such as MyCustomerData. This reveals the power of Camel: you don’t need to push the message through your flow yourself, and only have to worry about writing your ‘processor’ as a POJO. Camel will ‘glue’ the components together to form a route and carry the message data through the pipeline.
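To make the MyCustomerData idea concrete, here is a rough sketch of what such a type and a matching processor might look like. Only the name MyCustomerData comes from the text above; the fields are assumed from the sample JSON message, and MyCustomerProcessor is a hypothetical name. In a real project both classes would be public, each in its own file, and the unmarshal step would need to be told the target type. I believe the json element accepts an unmarshalTypeName attribute for that, but please check the documentation for your Camel version.

```java
// Hypothetical business type: the fields are assumed from the sample JSON (firstName, lastName).
class MyCustomerData {
    private String firstName;
    private String lastName;

    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }
}

// Hypothetical typed processor: a plain POJO with no Camel imports.
class MyCustomerProcessor {
    // Returns a display name in "lastName, firstName" form.
    public String process(MyCustomerData data) {
        return data.getLastName() + ", " + data.getFirstName();
    }

    public static void main(String[] args) {
        MyCustomerData data = new MyCustomerData();
        data.setFirstName("Zemian");
        data.setLastName("Deng");
        System.out.println(new MyCustomerProcessor().process(data)); // prints "Deng, Zemian"
    }
}
```

Because the processor has no dependency on Camel, it can be exercised from a plain main method or a unit test, as shown.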
By the same token, when you write your business logic in one or more processors, it’s a good idea to keep each POJO as small a unit as possible. Doing so maximizes the reusability of the processors. A bigger POJO with many pieces of business logic mixed in is also harder to test. So when developing these processor beans, I recommend thinking of them as LEGO pieces: small POJOs. Let Camel define the route and glue the LEGO pieces together. Once you get into this habit of thinking, you can use Camel more effectively to solve many of your domain problems.
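As an illustration of the LEGO idea, the sketch below splits a small task into two single-purpose POJOs. NameTrimmer, FullNameBuilder, and LegoDemo are hypothetical names invented for this example. In a route, each would be its own bean step; here they are chained by hand so the snippet runs without Camel.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-purpose processor: cleans up the incoming values.
class NameTrimmer {
    // Returns a copy of the map with surrounding whitespace removed from each value.
    public Map<String, String> process(Map<String, String> data) {
        Map<String, String> result = new HashMap<String, String>();
        for (Map.Entry<String, String> entry : data.entrySet()) {
            result.put(entry.getKey(), entry.getValue().trim());
        }
        return result;
    }
}

// Hypothetical single-purpose processor: builds a display name from the cleaned map.
class FullNameBuilder {
    public String process(Map<String, String> data) {
        return data.get("firstName") + " " + data.get("lastName");
    }
}

class LegoDemo {
    public static void main(String[] args) {
        Map<String, String> msg = new HashMap<String, String>();
        msg.put("firstName", " Zemian ");
        msg.put("lastName", "Deng");
        // Chain the two pieces by hand; a Camel route would do this gluing for us.
        String fullName = new FullNameBuilder().process(new NameTrimmer().process(msg));
        System.out.println(fullName); // prints "Zemian Deng"
    }
}
```

Each piece can be tested on its own and reused in other routes, which is harder to do once trimming, formatting, and other logic are mixed into one large processor.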
Well, that’s all for today folks. I hope you enjoyed the Camel ride.
Reference: Building message based application using Camel from our JCG partner Zemian Deng at the A Programmer’s Journal blog.