Building Camel-CXF REST Service in OSGi for Karaf – Multicasting and Aggregation

Please check out my other post on building plain CXF services (without Camel) in OSGi on Karaf.

This is a basic tutorial on how to

  1. create a CXF REST service
  2. multicast (and parallelize) the incoming request using Camel
  3. source data from two different services
  4. aggregate the response and
  5. finally return the consolidated result as JSON to the end user.

You can download the entire codebase from GitHub.

What this application does, in simple terms

The result expected from this service is a hardcoded response which looks like

FinalOutput

As you can see from the image, the top portion of the response comes from a service called NameEmailService and the second portion comes from a service called AgePhoneService. The two services are called concurrently, and their results are collected into a single consolidated entity, ConsolidatedSearchResult.

The project structure looks like this:

ProjectStructure2

There are two baby steps for Step 1.

Step 1.a – Create a CXF REST Service

As you might have guessed, there’s nothing complicated in this step. Just an interface and an implementation.

RESTClassDiagram2

Interface

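import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("rest")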
public interface RestService {

    @GET
    @Path("query/{queryString}")
    @Produces(MediaType.APPLICATION_JSON)
    public String sourceResultsFromTwoSources(@PathParam("queryString") String queryString);

}

Implementation

public class RestServiceImpl implements RestService {

    private static final Logger logger = LoggerFactory.getLogger(RestServiceImpl.class);

    private NameEmailService nameEmailService;
    private AgePhoneService agePhoneService;

    public RestServiceImpl(){
    }

    //Do nothing. Camel intercepts and routes the requests
    public String sourceResultsFromTwoSources(String queryString) {
        return null;
    }

    public NameEmailResult getNameEmailResult(String queryString){
        logger.info("Invoking getNameEmailResult from RestServiceImpl");
        return nameEmailService.getNameAndEmail(queryString);
    }

    public AgePhoneResult getAgePhoneResult(String queryString){
        logger.info("Invoking getAgePhoneResult from RestServiceImpl");
        return agePhoneService.getAgePhoneResult(queryString);
    }

    public NameEmailService getNameEmailService() {
        return nameEmailService;
    }

    public AgePhoneService getAgePhoneService() {
        return agePhoneService;
    }

    public void setNameEmailService(NameEmailService nameEmailService) {
        this.nameEmailService = nameEmailService;
    }

    public void setAgePhoneService(AgePhoneService agePhoneService) {
        this.agePhoneService = agePhoneService;
    }
}

Note that the implementation of sourceResultsFromTwoSources returns null. In fact, this method is never called when a REST request comes in. Camel intercepts all requests to the URL and routes them to other endpoints (in our case, the two methods getNameEmailResult() and getAgePhoneResult()).
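To make the URL mapping concrete, here is a small sketch of how the request URL is put together from the pieces in this project. The address /karafcxfcamel comes from the rsServer definition in rest-blueprint.xml, and rest/query/{queryString} comes from the annotations on the interface. The host, the port 8181 and the /cxf prefix are assumptions based on a default Karaf and CXF setup, and RestUrlSketch is a made-up helper name, so adjust these to your installation.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not part of the project: it only shows how the URL is composed.
class RestUrlSketch {

    static URI queryUri(String queryString) {
        String path = "/cxf"              // assumed default CXF servlet context on Karaf
                + "/karafcxfcamel"        // address of the rsServer endpoint
                + "/rest"                 // @Path on the RestService interface
                + "/query/" + queryString; // @Path on the method, with the path parameter filled in
        try {
            // Assumed host and port; the multi-argument constructor quotes illegal path characters.
            return new URI("http", null, "localhost", 8181, path, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot build URI for query: " + queryString, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(queryUri("arun"));
    }
}
```

A GET request against that URL is what Camel picks up; the string in the last path segment becomes the queryString argument.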

Step 1.b – Create the Service Implementation

Trivial implementations of the NameEmailService and the AgePhoneService are below:

ServiceClassDiagram2

NameEmailServiceImpl

public class NameEmailServiceImpl implements NameEmailService {

    public NameEmailResult getNameAndEmail(String queryString){

        return new NameEmailResult("Arun", "arun@arunma.com");

    }

}

AgePhoneServiceImpl

public class AgePhoneServiceImpl implements AgePhoneService {

    public AgePhoneResult getAgePhoneResult(String queryString){
        return new AgePhoneResult(32, "111-222-333");
    }
}
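The result classes themselves are not listed in this post. A minimal sketch of what they might look like is below. The constructor arguments and the two setters on ConsolidatedSearchResult match how the classes are used in this tutorial, but the field names and getters are my assumptions, so check the repository for the real ones.

```java
// Sketch only: field and getter names are assumptions.
// Declared package-private so that all three fit in one file; in a real project
// each would normally be a public class in its own file.
final class NameEmailResult {
    private final String name;
    private final String email;

    NameEmailResult(String name, String email) {
        this.name = name;
        this.email = email;
    }

    public String getName() { return name; }
    public String getEmail() { return email; }
}

final class AgePhoneResult {
    private final int age;
    private final String phone;

    AgePhoneResult(int age, String phone) {
        this.age = age;
        this.phone = phone;
    }

    public int getAge() { return age; }
    public String getPhone() { return phone; }
}

// Holds one result from each service; filled in piece by piece by the aggregator.
class ConsolidatedSearchResult {
    private NameEmailResult nameEmailResult;
    private AgePhoneResult agePhoneResult;

    public NameEmailResult getNameEmailResult() { return nameEmailResult; }
    public void setNameEmailResult(NameEmailResult nameEmailResult) { this.nameEmailResult = nameEmailResult; }

    public AgePhoneResult getAgePhoneResult() { return agePhoneResult; }
    public void setAgePhoneResult(AgePhoneResult agePhoneResult) { this.agePhoneResult = agePhoneResult; }
}
```

The public getters are there because a JSON library such as Jackson typically discovers the properties to serialize through them.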

Steps 2, 3, 4 & 5

Well, I lied when I said 2, 3, 4 and 5 were four steps. They are all done in a single step using Camel routing and its Enterprise Integration Pattern implementations.

RestToBeanRouter

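import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;

public class RestToBeanRouter extends RouteBuilder {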

    @Override
    public void configure() throws Exception {

        from ("cxfrs://bean://rsServer")
                .multicast()
                .parallelProcessing()
                .aggregationStrategy(new ResultAggregator())
                .beanRef("restServiceImpl", "getNameEmailResult")
                .beanRef("restServiceImpl", "getAgePhoneResult")
                .end()
                .marshal().json(JsonLibrary.Jackson)
                .to("log://camelLogger?level=DEBUG");
    }
}
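If the multicast pattern is new to you, it may help to see the same idea without Camel. The sketch below is a plain Java analogy using CompletableFuture. It is not how Camel implements multicast internally, and MulticastSketch with its two stand-in methods is made up for illustration. It sends the same query to two branches concurrently and combines the two answers once both are in.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogy only; names and return values are illustrative.
class MulticastSketch {

    // Stand-ins for the two service calls made by the route.
    static String nameEmail(String query) { return "Arun,arun@arunma.com"; }
    static String agePhone(String query)  { return "32,111-222-333"; }

    // Sends the same query to both branches concurrently, then combines the two results.
    static String multicast(String query) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> first =
                    CompletableFuture.supplyAsync(() -> nameEmail(query), pool);
            CompletableFuture<String> second =
                    CompletableFuture.supplyAsync(() -> agePhone(query), pool);
            // thenCombine plays the role that the aggregation strategy plays in the route.
            return first.thenCombine(second, (a, b) -> a + ";" + b).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(multicast("anything"));
    }
}
```

In the Camel route, all of this plumbing is replaced by the three calls multicast(), parallelProcessing() and aggregationStrategy().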

Our Routing explained

Simply put, our RouteBuilder does the following:

1) from ("cxfrs://bean://rsServer") intercepts all requests to the JAX-RS server endpoint defined in rest-blueprint.xml as

rest-blueprint.xml

<cxf:rsServer id="rsServer" address="/karafcxfcamel"
                  serviceClass="me.rerun.karafcxfcamel.rest.RestServiceImpl"
                  loggingFeatureEnabled="true" />

2) .multicast() forwards the original request untouched to both the getNameEmailResult and getAgePhoneResult methods in RestServiceImpl.

3) .parallelProcessing() makes the calls to these methods concurrently.

4) .aggregationStrategy(new ResultAggregator()) specifies how the results from the various multicast targets should be, well, aggregated.

Our aggregator looks like this:

ResultAggregator

public class ResultAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

        ConsolidatedSearchResult consolidatedSearchResult=null;

        if (oldExchange==null){
            consolidatedSearchResult=new ConsolidatedSearchResult();
        }
        else{
            consolidatedSearchResult=oldExchange.getIn().getBody(ConsolidatedSearchResult.class);
        }

        NameEmailResult nameEmailResult=newExchange.getIn().getBody(NameEmailResult.class);
        AgePhoneResult agePhoneResult=newExchange.getIn().getBody(AgePhoneResult.class);

        if (nameEmailResult!=null){
            consolidatedSearchResult.setNameEmailResult(nameEmailResult);
        }

        if (agePhoneResult!=null){
            consolidatedSearchResult.setAgePhoneResult(agePhoneResult);
        }

        newExchange.getIn().setBody(consolidatedSearchResult);

        return newExchange;
    }
}

Our Aggregator explained

The aggregate method in our ResultAggregator is a little crude but does the job.

  1. The aggregate method is called once for each multicast endpoint, as each one finishes.
  2. The first time, oldExchange is null. We take that as an opportunity to construct the final consolidated result entity that we want to return to the user.
  3. We check whether the newExchange that comes in is the result of a call to the NameEmailService or the AgePhoneService and populate the consolidated entity accordingly.
  4. Finally, we set the consolidated entity as the body of newExchange and return that exchange. The returned exchange does two jobs.
    1. It comes in as oldExchange on the next call to the aggregate method (a form of chaining: whatever the previous call returned is passed in as oldExchange for the next call).
    2. It is returned to the user if this is the last call to aggregate (all multicast endpoint calls are complete).
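The chaining described in the list above is easier to follow with the Camel types stripped away. Here is a rough plain Java sketch, where AggregationFoldSketch is a made-up name and a list of strings stands in for the consolidated entity. The first call sees null, and every later call receives whatever the previous call returned.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative analogy of the aggregate(oldExchange, newExchange) call sequence.
class AggregationFoldSketch {

    // 'previous' is null on the first call, just like oldExchange.
    static List<String> aggregate(List<String> previous, String incoming) {
        List<String> consolidated = (previous == null) ? new ArrayList<>() : previous;
        consolidated.add(incoming);
        return consolidated; // handed back in as 'previous' on the next call
    }

    // Feeds the replies through aggregate one at a time, in arrival order.
    static List<String> run(List<String> replies) {
        List<String> current = null;
        for (String reply : replies) {
            current = aggregate(current, reply);
        }
        return current; // the value left after the last call is what the caller receives
    }
}
```

With parallel processing the arrival order of the replies is not fixed, which is why our aggregator checks the type of each incoming body instead of relying on position.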

 

Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy