Enterprise Java

Apache Camel Cheatsheet

 
 
 
 
 
 
 
 
 
 
 

Polling an empty directory (and send an empty message with null body) :

from('file://temp?sendEmptyMessageWhenIdle=true')

Stop a route :

.process(new Processor() {
 public void process(Exchange exchange) throws Exception {
  getContext().stopRoute('ROUTE_ID');
 }
})

Access a property of the object in the body :

admitting the object has a method named ‘getMydata()’ :

new ValueBuilder(simple('${body.mydata}')).isEqualTo(...)

Define an aggregator :

.aggregate(simple('${header.id}.substring(0,15)'), 
       genericAggregationStrategy)
.completionPredicate(header(Exchange.BATCH_COMPLETE)
      .isEqualTo(Boolean.TRUE))
  • '${header.id}.substring(0,15)' : flag to differenciate messages (here, the returned string is common to all messages, we aggregate them all)
  • Exchange.BATCH_COMPLETE : predicate indicating the end of polling (all files parsed for example)
  • genericAggregationStrategy : above, an example of an aggregator grouping all messages’ contents in a list :
public class GenericAggregationStrategy implements AggregationStrategy {
  @SuppressWarnings('unchecked')
  public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    if (oldExchange == null) {
      ArrayList<Object> list = new ArrayList<Object>();
      list.add(newExchange.getIn().getBody());
      newExchange.getIn().setBody(list);
      return newExchange;
    } else {
      Object oldIn = oldExchange.getIn().getBody();
      ArrayList<Object> list = null;
      if(oldIn instanceof ArrayList) {
        list = (ArrayList<Object>) oldIn;
      } else {
        list = new ArrayList<Object>();
        list.add(oldIn);
      }
      list.add(newExchange.getIn().getBody());
      newExchange.getIn().setBody(list);
      return newExchange;
    }
  }
}

Manually trigger an aggregation’s completion (whatever it is) :

Send a message with the header Exchange.AGGREGATION_COMPLETE_ALL_GROUPS = true
It is possible to do from('bean:...'), knowing that the bean will be polled permanently (like with ‘file’) and re-instanciated each time.Modify the message’s body on a route, using :

.transform(myExpression)

with myExpression :

public class MyExpression implements Expression {
  public <T> T evaluate(Exchange exchange, Class<T> type) {
    MyBean newData = ...;
    return exchange.getContext().getTypeConverter()
      .convertTo(type, newData);
  }
}

Using JaxB :

  • on a route :
    .[un]marshal().jaxb('my.business_classes.package')
  • with a configurable DataFormat :
    .[un]marshal(jaxbDataFormat)

    with :

// indicate to Jaxb to not write XML prolog :
JaxbDataFormat jaxbDataFormat = 
   new JaxbDataFormat('my.business_classes.package');
jaxb.setFragment(true);

General concepts for threads management :

  • a from(...) = a thread
  • except for from('direct:...') wich creates a ‘named route’ with a unique identifier only callable by another route (in the same thread than the caller).
  • The component .resequence().batch() creates a new thread to rethrow the messages.

Define a shutdown strategy :

getContext().setShutdownStrategy(new MyShutdownStrategy(getContext()));

With :

public class MyShutdownStrategy extends DefaultShutdownStrategy {
   protected CamelContext camelContext;
   private long timeout = 1;
   private TimeUnit timeUnit = TimeUnit.SECONDS;
   public SpiralShutdownStrategy(CamelContext camelContext) {
      this.camelContext = camelContext;
   }

   @Override
   public long getTimeout() {
      return this.timeout;
   }

   @Override
   public TimeUnit getTimeUnit() {
      return this.timeUnit;
   }

   @Override
   public CamelContext getCamelContext() {
      return this.camelContext;
   }

   /**
   * To ensure shutdown
   *
   */
   @Override
   public void suspend(CamelContext context, 
        List<RouteStartupOrder> routes) throws Exception {
      doShutdown(context, routes, getTimeout(), 
          getTimeUnit(), false, false, false);
   }

   /**
   * To ensure shutdown
   *
   */
   @Override
   public void shutdown(CamelContext context, 
        List<RouteStartupOrder> routes, long timeout, 
        TimeUnit timeUnit) throws Exception {
      doShutdown(context, routes, this.timeout, 
           this.timeUnit, false, false, false);
   }

   /**
   * To ensure shutdown
   *
   */
   @Override
   public boolean shutdown(CamelContext context, RouteStartupOrder route, 
      long timeout, TimeUnit timeUnit, boolean abortAfterTimeout)
         throws Exception {
      super.shutdown(context, route, this.timeout, 
          this.timeUnit, false);
      return true;
   }
}

Stop a batch :

.process(new Processor() {
   public void process(Exchange exchange) throws Exception {
      context.stop();
   }
});

Calling a method of a bean from a route:

  1. method’s return is always affected to message’s body. For example :
    • public void myMethod(Exchange e) :
      Will not modify the body
    • public boolean myMethod(Exchange e) :
      the boolean (or whatever primitive type) will be set in the body
    • public Object myMethod(Exchange e) :
      the Object will be placed in the body (even if null)
    • public Message myMethod(Exchange e) :
      the Message will be placed in the body (better avoid this)
    • public List<Object> myMethod(Exchange e) :
      the list will be set in the body : useful to use with .split(), each object will be sent in a new message
    • public List<Message> myMethod(Exchange e) :
      the list will be set in the body : a .split() will create a new message for each element (better avoid, see upper)
  2. configurable method’s parameters :
    • public void myMethod(Exchange e) :
      the complete Exchange will be passed
    • public void myMethod(Object o) :
      Camel will try to convert the body in the required parameter’s class
    • public void myMethod(@Body File o, @Header('myHeader') String myParamHeader) :
      Camel will inject each parameter as specified

Exceptions management on routes :

  • in a global way (to be declared before all routes) :
    onException(MyException.class, RuntimeCamelException.class).to(...)...
  • to truly handle Exception and not bubble it in routes (and logs) :
    onException(...).handled(true).to(...)...
  • to continue process in a route after an Exception :
    onException(...).continued(true).to(...)...
  • An exception is ‘handled’ or ‘continued’
  • local way (in a route) :
    from(...)
    .onException(...).to('manage_error').log('FAIL !!').end()
    .to('continue_route')...

For writing file, only the header Exchange.FILE_NAME is necessary.

Reorder messages with component .resequence :

  • uses an expression to compute the new order of messages, from a unique Comparable ‘key’ (number, String or custom Comparator)
  • two ways :
    • .batch() : batch mode. Waits the reception of ALL the messages befor reorder them. ATTENTION : a new thread is created to rethrow messages.
    • .stream() : streaming mode. Uses a gap detection between the messages’ keys to re-send them. It is possible to configure a maximal capacity and a timeout.

Split the body with a token :

.split(body().tokenize('TOKEN'))

Knowing that the TOKEN will be deleted from content. For example, if receiving a message containing : ‘data1TOKENdata2TOKENdata3’, messages created will be : ‘data1’, ‘data2, ‘data3’. So avoid this when treating XML data, prefer ‘tokenizeXML()’.

Dynamic access to body’s data :

Sending mails :

from('direct:mail')
 .setHeader('To', constant(mailTo))
 .setHeader('From', constant(mailFrom)) 
 .setHeader('Subject', constant(mailSubject)) 
 .to('smtp://${user}@${server}:${port}?password=${password}');

With attachment :

.beanRef(MAIL_ATTACHER, 'attachLog');
//with
public class MailAttacher {
   public void attachLog(Exchange exc) throws Exception {
      File toAttach = ...;   
      exc.getIn().addAttachment(toAttach.getName(), 
               new DataHandler(new FileDataSource(toAttach)));
      // if needed
      exc.setProperty(Exchange.CHARSET_NAME, 'UTF-8');
   }
}

Useful Exchange’s properties :

  • Exchange.AGGREGATED_* : aggregations management
  • Exchange.BATCH_* : treated messages management
  • Exchange.FILE_* : File messages management
  • Exchange.HTTP_* : web requests management
  • Exchange.LOOP_* : loops management
  • Exchange.REDELIVERY_* : exceptions management
  • Exchange.SPLIT_* : splitted contents management

Loop a route :

from('direct:...')
.loop(countExpression)
.to('direct:insideLoop')
.end()

Where ‘countExpression’ is an Expression used to dynamically compute the loop count (evaluated entering the loop). It is preferable to move the loop’s code in a new route if the process is complex.

Headers management :

Message’s headers are defined at its creation. When using a ‘.split()’, all subsequent messages will have the same headers from the original message (so be careful when managing files). In an aggregation, custom headers will have to be managed manually to be preserved in the rest of the route.

Intercept a message

and execute a route parallely (to be declared before routes) :

interceptSendToEndpoint('ENDPOINT_TO_INTERSEPT').to(...)...

 

Reference: Apache Camel Cheatsheet from our JCG partner Paul-Emmanuel at the Developpef blog.

Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button