Spring Integration Publisher

Consider a hypothetical requirement – You have a service class in your application and you want to capture some information around the calls to this service:
 
 
 
 
 
 
 
 
 

@Service
public class SampleBean {
 private static final Logger logger = LoggerFactory.getLogger(SampleBean.class);

 public Response call(Request request) {
  logger.info("SampleBean.call invoked");
  return new Response(true);
 }
}

AOP is a great fit for such a requirement, it allows the information around a method call(a pointcut) to be cleanly captured and some processing(an advice) to be done with this information:

public class AuditAspect {
 private static final Logger logger = LoggerFactory.getLogger(AuditAspect.class);
 @Pointcut("execution( * core.SampleBean.call(model.Request)) && args(r)")
 public void beanCalls(Request r){}

 @Around("beanCalls(r)")
 public Object auditCalls(ProceedingJoinPoint pjp, Request r) {
     logger.info("Capturing request: " + r);
  try{
   Object response = pjp.proceed();
   logger.info("Capturing response: " + response);
   return response;
  }catch(Throwable e) {
   throw new RuntimeException(e);
  }
 }
}

This appears to be good enough. Now what if I wanted to return the response back to the client immediately but continue to process the context of the method call – well we can place the logic of Advice in a separate thread using a ThreadPool. Let me add another layer of complexity now, what if we wanted to absolutely ensure that context is not lost – a good way to do this would be to keep the context of the method call outside the JVM, typically messaging providers like RabbitMQ and ActiveMQ will fit in very well.

Considering these additional requirements, a simpler solution especially with messaging scenarios coming into play will be to use Spring Integration. Let us start by defining a new Spring Integration application context:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
    xmlns:beans="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <annotation-config/>

    <channel id="tobeprocessedlater"/>

    <logging-channel-adapter channel="tobeprocessedlater" log-full-message="true"/>

</beans:beans>

It just has the definition of a channel, and an outbound adapter which reads from this channel and logs the full message. To capture the context of the call to the SampleBean a Publisher annotation can be added to the relevant methods of SampleBean which will direct “stuff” to the channel which is added to the annotation.

@Service
public class SampleBean {
 private static final Logger logger = LoggerFactory.getLogger(SampleBean.class);

 @Publisher(channel = "tobeprocessedlater")
 public Response call(@Header("request") Request request) {
  logger.info("SampleBean.call invoked");
  return new Response(true);
 }
}

what “stuff” is sent to this “tobeprocessedlater” channel is specified through additional annotations – by default the return value from the method is sent to the channel, additionally I have tagged the request also with the @Header annotation, this will make the request to be sent in as a header to the response message. Just for completeness, the integration context has a <annotation-config/> tag, this tag registers the relevant components that look for @Publisher annotation and weave in the additional action to be performed if it finds one.

If this code is executed now, the output will be along these lines:

core.SampleBean - SampleBean.call invoked
o.s.integration.handler.LoggingHandler - [Payload=Response{success=true}][Headers={request=RequestType1{attr='null'}, id=52997b10-dc8e-e0eb-a82a-88c1df68fca5, timestamp=1389268390212}]

Now, to layer in the first requirement, to handle the advice(in this case the logging) in a separate thread of execution:

This can be done with just a configuration change! – instead of publishing the message to a direct channel, publish it to a channel type that can buffer messages or use an executor to dispatch messages, I have opted to use an executor channel in this example:

<channel id="tobeprocessedlater">
        <dispatcher task-executor="taskExecutor"/>
    </channel>

Now to add the requirement to make the async message processing a little more reliable by publishing it to an external Messaging provider(and processing the messages later), let me demonstrate this by publishing the messages to RabbitMQ, the code change again is pure configuration and nothing in the code changes!:

<channel id="tobeprocessedlater"/>

    <int-amqp:outbound-channel-adapter amqp-template="amqpTemplate" channel="tobeprocessedlater"  />

The messaging sink could have been anything – a database, a file system, ActiveMQ, and the change that would have been required is pure configuration.
 

Reference: Spring Integration Publisher from our JCG partner Biju Kunjummen at the all and sundry blog.
Related Whitepaper:

Functional Programming in Java: Harnessing the Power of Java 8 Lambda Expressions

Get ready to program in a whole new way!

Functional Programming in Java will help you quickly get on top of the new, essential Java 8 language features and the functional style that will change and improve your code. This short, targeted book will help you make the paradigm shift from the old imperative way to a less error-prone, more elegant, and concise coding style that’s also a breeze to parallelize. You’ll explore the syntax and semantics of lambda expressions, method and constructor references, and functional interfaces. You’ll design and write applications better using the new standards in Java 8 and the JDK.

Get it Now!  

Leave a Reply


six + 8 =



Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.

Sign up for our Newsletter

15,153 insiders are already enjoying weekly updates and complimentary whitepapers! Join them now to gain exclusive access to the latest news in the Java world, as well as insights about Android, Scala, Groovy and other related technologies.

As an extra bonus, by joining you will get our brand new e-books, published by Java Code Geeks and their JCG partners for your reading pleasure! Enter your info and stay on top of things,

  • Fresh trends
  • Cases and examples
  • Research and insights
  • Two complimentary e-books