Apache Camel Cheatsheet

 
 
 
 
 
 
 
 
 
 
 

Polling an empty directory (and send an empty message with null body) :

from('file://temp?sendEmptyMessageWhenIdle=true')

Stop a route :

.process(new Processor() {
 public void process(Exchange exchange) throws Exception {
  getContext().stopRoute('ROUTE_ID');
 }
})

Access a property of the object in the body :

admitting the object has a method named ‘getMydata()’ :

new ValueBuilder(simple('${body.mydata}')).isEqualTo(...)

Define an aggregator :

.aggregate(simple('${header.id}.substring(0,15)'), 
       genericAggregationStrategy)
.completionPredicate(header(Exchange.BATCH_COMPLETE)
      .isEqualTo(Boolean.TRUE))
  • '${header.id}.substring(0,15)' : flag to differenciate messages (here, the returned string is common to all messages, we aggregate them all)
  • Exchange.BATCH_COMPLETE : predicate indicating the end of polling (all files parsed for example)
  • genericAggregationStrategy : above, an example of an aggregator grouping all messages’ contents in a list :
public class GenericAggregationStrategy implements AggregationStrategy {
  @SuppressWarnings('unchecked')
  public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    if (oldExchange == null) {
      ArrayList<Object> list = new ArrayList<Object>();
      list.add(newExchange.getIn().getBody());
      newExchange.getIn().setBody(list);
      return newExchange;
    } else {
      Object oldIn = oldExchange.getIn().getBody();
      ArrayList<Object> list = null;
      if(oldIn instanceof ArrayList) {
        list = (ArrayList<Object>) oldIn;
      } else {
        list = new ArrayList<Object>();
        list.add(oldIn);
      }
      list.add(newExchange.getIn().getBody());
      newExchange.getIn().setBody(list);
      return newExchange;
    }
  }
}

Manually trigger an aggregation’s completion (whatever it is) :

Send a message with the header Exchange.AGGREGATION_COMPLETE_ALL_GROUPS = true
It is possible to do from('bean:...'), knowing that the bean will be polled permanently (like with ‘file’) and re-instanciated each time.Modify the message’s body on a route, using :

.transform(myExpression)

with myExpression :

public class MyExpression implements Expression {
  public <T> T evaluate(Exchange exchange, Class<T> type) {
    MyBean newData = ...;
    return exchange.getContext().getTypeConverter()
      .convertTo(type, newData);
  }
}

Using JaxB :

  • on a route :
    .[un]marshal().jaxb('my.business_classes.package')
  • with a configurable DataFormat :
    .[un]marshal(jaxbDataFormat)

    with :

// indicate to Jaxb to not write XML prolog :
JaxbDataFormat jaxbDataFormat = 
   new JaxbDataFormat('my.business_classes.package');
jaxb.setFragment(true);

General concepts for threads management :

  • a from(...) = a thread
  • except for from('direct:...') wich creates a ‘named route’ with a unique identifier only callable by another route (in the same thread than the caller).
  • The component .resequence().batch() creates a new thread to rethrow the messages.

Define a shutdown strategy :

getContext().setShutdownStrategy(new MyShutdownStrategy(getContext()));

With :

public class MyShutdownStrategy extends DefaultShutdownStrategy {
   protected CamelContext camelContext;
   private long timeout = 1;
   private TimeUnit timeUnit = TimeUnit.SECONDS;
   public SpiralShutdownStrategy(CamelContext camelContext) {
      this.camelContext = camelContext;
   }

   @Override
   public long getTimeout() {
      return this.timeout;
   }

   @Override
   public TimeUnit getTimeUnit() {
      return this.timeUnit;
   }

   @Override
   public CamelContext getCamelContext() {
      return this.camelContext;
   }

   /**
   * To ensure shutdown
   *
   */
   @Override
   public void suspend(CamelContext context, 
        List<RouteStartupOrder> routes) throws Exception {
      doShutdown(context, routes, getTimeout(), 
          getTimeUnit(), false, false, false);
   }

   /**
   * To ensure shutdown
   *
   */
   @Override
   public void shutdown(CamelContext context, 
        List<RouteStartupOrder> routes, long timeout, 
        TimeUnit timeUnit) throws Exception {
      doShutdown(context, routes, this.timeout, 
           this.timeUnit, false, false, false);
   }

   /**
   * To ensure shutdown
   *
   */
   @Override
   public boolean shutdown(CamelContext context, RouteStartupOrder route, 
      long timeout, TimeUnit timeUnit, boolean abortAfterTimeout)
         throws Exception {
      super.shutdown(context, route, this.timeout, 
          this.timeUnit, false);
      return true;
   }
}

Stop a batch :

.process(new Processor() {
   public void process(Exchange exchange) throws Exception {
      context.stop();
   }
});

Calling a method of a bean from a route:

  1. method’s return is always affected to message’s body. For example :
    • public void myMethod(Exchange e) :
      Will not modify the body
    • public boolean myMethod(Exchange e) :
      the boolean (or whatever primitive type) will be set in the body
    • public Object myMethod(Exchange e) :
      the Object will be placed in the body (even if null)
    • public Message myMethod(Exchange e) :
      the Message will be placed in the body (better avoid this)
    • public List<Object> myMethod(Exchange e) :
      the list will be set in the body : useful to use with .split(), each object will be sent in a new message
    • public List<Message> myMethod(Exchange e) :
      the list will be set in the body : a .split() will create a new message for each element (better avoid, see upper)
  2. configurable method’s parameters :
    • public void myMethod(Exchange e) :
      the complete Exchange will be passed
    • public void myMethod(Object o) :
      Camel will try to convert the body in the required parameter’s class
    • public void myMethod(@Body File o, @Header('myHeader') String myParamHeader) :
      Camel will inject each parameter as specified

Exceptions management on routes :

  • in a global way (to be declared before all routes) :
    onException(MyException.class, RuntimeCamelException.class).to(...)...
  • to truly handle Exception and not bubble it in routes (and logs) :
    onException(...).handled(true).to(...)...
  • to continue process in a route after an Exception :
    onException(...).continued(true).to(...)...
  • An exception is ‘handled’ or ‘continued’
  • local way (in a route) :
    from(...)
    .onException(...).to('manage_error').log('FAIL !!').end()
    .to('continue_route')...

For writing file, only the header Exchange.FILE_NAME is necessary.

Reorder messages with component .resequence :

  • uses an expression to compute the new order of messages, from a unique Comparable ‘key’ (number, String or custom Comparator)
  • two ways :
    • .batch() : batch mode. Waits the reception of ALL the messages befor reorder them. ATTENTION : a new thread is created to rethrow messages.
    • .stream() : streaming mode. Uses a gap detection between the messages’ keys to re-send them. It is possible to configure a maximal capacity and a timeout.

Split the body with a token :

.split(body().tokenize('TOKEN'))

Knowing that the TOKEN will be deleted from content. For example, if receiving a message containing : ‘data1TOKENdata2TOKENdata3′, messages created will be : ‘data1′, ‘data2, ‘data3′. So avoid this when treating XML data, prefer ‘tokenizeXML()’.

Dynamic access to body’s data :

Sending mails :

from('direct:mail')
 .setHeader('To', constant(mailTo))
 .setHeader('From', constant(mailFrom)) 
 .setHeader('Subject', constant(mailSubject)) 
 .to('smtp://${user}@${server}:${port}?password=${password}');

With attachment :

.beanRef(MAIL_ATTACHER, 'attachLog');
//with
public class MailAttacher {
   public void attachLog(Exchange exc) throws Exception {
      File toAttach = ...;   
      exc.getIn().addAttachment(toAttach.getName(), 
               new DataHandler(new FileDataSource(toAttach)));
      // if needed
      exc.setProperty(Exchange.CHARSET_NAME, 'UTF-8');
   }
}

Useful Exchange’s properties :

  • Exchange.AGGREGATED_* : aggregations management
  • Exchange.BATCH_* : treated messages management
  • Exchange.FILE_* : File messages management
  • Exchange.HTTP_* : web requests management
  • Exchange.LOOP_* : loops management
  • Exchange.REDELIVERY_* : exceptions management
  • Exchange.SPLIT_* : splitted contents management

Loop a route :

from('direct:...')
.loop(countExpression)
.to('direct:insideLoop')
.end()

Where ‘countExpression’ is an Expression used to dynamically compute the loop count (evaluated entering the loop). It is preferable to move the loop’s code in a new route if the process is complex.

Headers management :

Message’s headers are defined at its creation. When using a ‘.split()’, all subsequent messages will have the same headers from the original message (so be careful when managing files). In an aggregation, custom headers will have to be managed manually to be preserved in the rest of the route.

Intercept a message

and execute a route parallely (to be declared before routes) :

interceptSendToEndpoint('ENDPOINT_TO_INTERSEPT').to(...)...

 

Reference: Apache Camel Cheatsheet from our JCG partner Paul-Emmanuel at the Developpef blog.

Related Whitepaper:

Functional Programming in Java: Harnessing the Power of Java 8 Lambda Expressions

Get ready to program in a whole new way!

Functional Programming in Java will help you quickly get on top of the new, essential Java 8 language features and the functional style that will change and improve your code. This short, targeted book will help you make the paradigm shift from the old imperative way to a less error-prone, more elegant, and concise coding style that’s also a breeze to parallelize. You’ll explore the syntax and semantics of lambda expressions, method and constructor references, and functional interfaces. You’ll design and write applications better using the new standards in Java 8 and the JDK.

Get it Now!  

Leave a Reply


− 3 = four



Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.

Sign up for our Newsletter

15,153 insiders are already enjoying weekly updates and complimentary whitepapers! Join them now to gain exclusive access to the latest news in the Java world, as well as insights about Android, Scala, Groovy and other related technologies.

As an extra bonus, by joining you will get our brand new e-books, published by Java Code Geeks and their JCG partners for your reading pleasure! Enter your info and stay on top of things,

  • Fresh trends
  • Cases and examples
  • Research and insights
  • Two complimentary e-books