
Enterprise Messaging

This article is part of our Academy Course titled Spring Integration for EAI.

In this course, you are introduced to Enterprise Application Integration patterns and how Spring Integration addresses them. Next, you delve into the fundamentals of Spring Integration, like channels, transformers and adapters.

1. Introduction

This tutorial explains how to integrate an application with JMS messaging using Spring Integration. I will first show you how to install ActiveMQ, which will be our broker throughout the tutorial. The following sections show examples of sending and receiving JMS messages with the Spring Integration JMS channel adapters. After these examples, we will see some ways of customizing these invocations by configuring message conversion and destination resolution.

The last part of this tutorial shows briefly how to use Spring Integration with the AMQP protocol. It will go through the installation of RabbitMQ and end up with a basic example of messaging.

This tutorial is composed of the following sections:

  1. Introduction
  2. Preparing the environment
  3. JMS Adapters: Reception
  4. JMS Adapters: Sending
  5. Using Gateways
  6. Message conversion
  7. JMS backed message channels
  8. Dynamic destination resolution
  9. AMQP integration

2. Preparing the environment

If you want to send messages through JMS, you will first need a broker. The examples in this tutorial run against ActiveMQ, an open source messaging broker. In this section I will help you install the server and implement a simple Spring application that checks everything is set up correctly. The explanation is based on a Windows system. If you already have a server installed, just skip this section.

The first step is to download the ActiveMQ server from Apache.org. Once downloaded, just extract it into a folder of your choice.

To start the server you just need to execute the file activemq which is located in the apache-activemq-5.9.0\bin folder.

Figure 1

Ok, the server is running. Now we just need to implement the application. We are going to create a producer, a consumer, a Spring configuration file and a test.

The producer

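You can use any Java class instead of my TicketOrder object, as long as it is Serializable: the default converter used by JmsTemplate.convertAndSend handles String, byte[], Map and Serializable payloads.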

public class JmsProducer {
    @Autowired
    @Qualifier("jmsTemplate")
    private JmsTemplate jmsTemplate;
    
    public void convertAndSendMessage(TicketOrder order) {
        jmsTemplate.convertAndSend(order);
    }
    
    public void convertAndSendMessage(String destination, TicketOrder order) {
        jmsTemplate.convertAndSend(destination, order);
    }
}
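
The TicketOrder class itself is not listed in this tutorial. The following is a minimal sketch of what it could look like, inferred only from how the examples use it (a three-argument constructor plus getFilmId, getQuantity and getOrderDate). The field layout and the SerializationCheck helper are assumptions added for illustration; the helper simply confirms that an instance survives Java serialization, which is what sending it as a JMS ObjectMessage relies on.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Date;

// Sketch of the payload class (assumed shape); package-private so it compiles as a single file.
class TicketOrder implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int filmId;
    private final int quantity;
    private final Date orderDate;

    public TicketOrder(int filmId, int quantity, Date orderDate) {
        this.filmId = filmId;
        this.quantity = quantity;
        this.orderDate = new Date(orderDate.getTime()); // defensive copy, Date is mutable
    }

    public int getFilmId() { return filmId; }

    public int getQuantity() { return quantity; }

    public Date getOrderDate() { return new Date(orderDate.getTime()); }
}

// Hypothetical helper: writes a value to a byte array and reads it back.
final class SerializationCheck {
    private SerializationCheck() {
    }

    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Value could not be serialized", e);
        }
    }
}
```

Calling SerializationCheck.roundTrip(new TicketOrder(1, 5, new Date())) in a unit test is a cheap way to catch a non-serializable field before the broker does.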

The consumer

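public class SyncConsumer {
    @Autowired
    private JmsTemplate jmsTemplate;
    
    public TicketOrder receive() {
        return receive("test.sync.queue");
    }
    
    // Overload used by later tests that read from a different queue
    public TicketOrder receive(String destinationName) {
        return (TicketOrder) jmsTemplate.receiveAndConvert(destinationName);
    }
}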

Spring configuration file

<bean id="consumer" class="xpadro.spring.integration.consumer.SyncConsumer"/>
<bean id="producer" class="xpadro.spring.integration.producer.JmsProducer"/>

<!-- Infrastructure -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616" />
</bean>

<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="connectionFactory"/>
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachingConnectionFactory"/>
    <property name="defaultDestination" ref="syncTestQueue"/>
</bean>

<!-- Destinations -->
<bean id="syncTestQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test.sync.queue"/>
</bean>

The test

@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestJmsConfig {
    @Autowired
    private JmsProducer producer;
    
    @Autowired
    private SyncConsumer consumer;
    
    @Test
    public void testReceiving() throws InterruptedException, RemoteException {
        TicketOrder order = new TicketOrder(1, 5, new Date());
        //Sends the message to the jmsTemplate's default destination
        producer.convertAndSendMessage(order);
        
        Thread.sleep(2000);
        
        TicketOrder receivedOrder = consumer.receive();
        assertNotNull(receivedOrder);
        assertEquals(1, receivedOrder.getFilmId());
        assertEquals(5, receivedOrder.getQuantity());
    }
}

If the test passes, everything is set up correctly. We can now move to the next section.

3. JMS Adapters: Reception

Spring Integration provides several adapters and gateways to receive messages from a JMS queue or topic. These adapters are briefly discussed below:

  • Inbound Channel Adapter: It internally uses a JmsTemplate to actively receive messages from a JMS queue or topic.
  • Message Driven Channel Adapter: It internally uses a Spring MessageListener container to passively receive messages.

3.1. Inbound Channel Adapters: Active reception

This section explains how to use the first adapter described in the previous section.

The JMS inbound channel adapter actively polls a queue to retrieve messages from it. Since it uses a poller, you will have to define it in the Spring configuration file. Once the adapter has retrieved a message, it will be sent into the messaging system through the specified message channel. We can then process the message using endpoints like transformers and filters, or we can send it to a service activator.

This example retrieves a ticket order message from a JMS queue and sends it to a service activator, which will process it and confirm the order. The order is confirmed by sending it to some kind of repository which has a simple List with all the registered orders.

We are using the same producer as in section 2, “Preparing the environment”:

<bean id="producer" class="xpadro.spring.integration.producer.JmsProducer"/>

<!-- Infrastructure -->
<!-- Connection factory and jmsTemplate configuration -->
<!-- as seen in the second section -->

<!-- Destinations -->
<bean id="toIntQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="int.sync.queue"/>
</bean>

The test will use the producer to send a message to the “toIntQueue”. Now we are going to set up the Spring Integration configuration:

integration-jms.xml

<context:component-scan base-package="xpadro.spring.integration"/>

<int-jms:inbound-channel-adapter id="jmsAdapter" destination="toIntQueue" channel="jmsChannel"/>

<int:channel id="jmsChannel"/>

<int:service-activator method="processOrder" input-channel="jmsChannel" ref="ticketProcessor"/>

<int:poller id="poller" default="true" fixed-delay="1000"/>

The JMS inbound channel adapter will use the defined poller to retrieve messages from the “toIntQueue”. You have to configure a poller for the adapter or it will throw a runtime exception. In this case, we have defined a default poller. This means that any endpoint that needs a poller will use this one. If you don’t configure a default poller, you will need to define a specific poller for each endpoint that retrieves messages actively.
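
To picture what a fixed-delay poller does, here is a plain JDK sketch. It is not how Spring Integration implements the adapter: the PollingConsumerSketch class and its in-memory BlockingQueue are hypothetical stand-ins for the adapter and the JMS queue. A scheduled task wakes up, checks the source for a message, hands it to a handler if one is there, and then waits for the configured delay before checking again.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration only: not part of Spring Integration.
final class PollingConsumerSketch<T> implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    private PollingConsumerSketch() {
    }

    // Starts polling 'source' and passes every message found to 'handler'.
    static <T> PollingConsumerSketch<T> start(BlockingQueue<T> source,
                                              Consumer<T> handler,
                                              long fixedDelayMillis) {
        PollingConsumerSketch<T> consumer = new PollingConsumerSketch<>();
        consumer.scheduler.scheduleWithFixedDelay(() -> {
            try {
                T message = source.poll(); // returns null when the queue is empty
                if (message != null) {
                    handler.accept(message);
                }
            } catch (RuntimeException e) {
                // An exception escaping the task would cancel all future polls
                System.err.println("Handler failed: " + e);
            }
        }, 0, fixedDelayMillis, TimeUnit.MILLISECONDS);
        return consumer;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The delay is measured from the end of one poll to the start of the next, which is the difference between a fixed delay and a fixed rate.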

The consumer

The service activator is just a bean (auto detected by component scan):

@Component("ticketProcessor")
public class TicketProcessor {
    private static final Logger logger = LoggerFactory.getLogger(TicketProcessor.class);
    private static final String ERROR_INVALID_ID = "Order ID is invalid";
    
    @Autowired
    private OrderRepository repository;

    public void processOrder(TicketOrder order) {
        logger.info("Processing order {}", order.getFilmId());
        
        if (isInvalidOrder(order)) {
            logger.info("Error while processing order [{}]", ERROR_INVALID_ID);
            throw new InvalidOrderException(ERROR_INVALID_ID);
        }
        
        float amount = 5.95f * order.getQuantity();
        
        TicketConfirmation confirmation = new TicketConfirmation("123", order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount);
        repository.confirmOrder(confirmation);
    }
    
    private boolean isInvalidOrder(TicketOrder order) {
        return order.getFilmId() == -1;
    }
}

In the previous code snippet, the processOrder method receives a TicketOrder object and directly processes it. However, you could instead define Message<?> or Message<TicketOrder> in order to receive the message. In this way, you will have both access to the payload of the message and to its headers.

Notice also that the method returns void. We don’t need to return anything since the messaging flow ends here. If needed, you could also define an output channel on the service activator and return the confirmation. We would then subscribe an endpoint or a gateway to this channel in order to send the confirmation to another JMS queue, send it to a web service or store it in a database, for example.

Finally, let’s take a look at the test to see how it is all executed:

@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml",
        "/xpadro/spring/integration/test/int-jms-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsConfig {
    @Autowired
    private JmsProducer producer;
    
    @Autowired
    private OrderRepository repository;
    
    @Test
    public void testSendToIntegration() throws InterruptedException, RemoteException {
        TicketOrder order = new TicketOrder(1, 5, new Date());
        //Sends the message explicitly to the "int.sync.queue" destination
        producer.convertAndSendMessage("int.sync.queue", order);
        
        Thread.sleep(4000);
        
        assertEquals(1, repository.getConfirmations().size());
        assertNotNull(repository.getConfirmations().get(0));
        TicketConfirmation conf = repository.getConfirmations().get(0);
        assertEquals("123", conf.getId());
    }
}

I have added a Thread.sleep of four seconds to give the adapter time to poll the queue and process the message. We could instead have used a while loop that checks whether the message has been received until a timeout is reached.
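
The loop-with-timeout alternative can be sketched with the JDK alone. The Await class and its until method are hypothetical names introduced here, not part of Spring or JUnit. The helper re-checks a condition every few milliseconds and gives up once the timeout has elapsed, so a passing test finishes as soon as the message arrives instead of always waiting the full four seconds.

```java
import java.util.function.BooleanSupplier;

// Hypothetical test helper: polls a condition until it holds or a timeout elapses.
final class Await {
    private Await() {
    }

    // Returns true as soon as the condition holds, false if the timeout expires first.
    static boolean until(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

In the test above, the sleep could then be replaced by something like assertTrue(Await.until(() -> repository.getConfirmations().size() == 1, 4000, 100)).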

3.2. Inbound Channel Adapters: Passive reception

This second part of the JMS reception section uses a message driven channel adapter. As soon as a message is sent to the queue, it will be delivered to the adapter, without the need for pollers. The adapter places it on the message channel, which delivers the message to its subscribers.

The example is very similar to the one seen in the previous section. The only thing I have changed is the Spring Integration configuration:

<context:component-scan base-package="xpadro.spring.integration"/>

<int-jms:message-driven-channel-adapter id="jmsAdapter" destination="toIntQueue" channel="jmsChannel" />

<int:channel id="jmsChannel"/>

<int:service-activator method="processOrder" input-channel="jmsChannel" ref="ticketProcessor"/>

I deleted the poller and replaced the JMS inbound channel adapter with a message driven channel adapter. That’s it; the adapter will passively receive messages and deliver them to the jmsChannel.

Take into account that the message driven channel adapter needs at least one of the following combinations:

  • A message listener container.
  • A connection factory and a destination.

In our example, we have used the second option. The destination is specified in the adapter configuration and the connection factory is defined in the jms-config file, which is also imported by the test.

4. JMS Adapters: Sending

In the previous section we saw how to receive messages sent to a JMS queue by an external system. This section covers outbound channel adapters, which let you send JMS messages outside of our system.

In contrast to inbound adapters, there is only one type of outbound adapter. This adapter uses a JmsTemplate internally to send the message, and in order to configure this adapter you will need to specify at least one of the following:

  • A JmsTemplate.
  • A connection factory and a destination.

As in the inbound example, we are using the second option to send a message to a JMS queue.

For this example, we are going to add a new queue to the JMS configuration (jms-config.xml). This is the queue our Spring Integration application will send the message to:

<bean id="toJmsQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="to.jms.queue"/>
</bean>

Now we set up the Spring Integration configuration with the JMS outbound adapter:

<context:component-scan base-package="xpadro.spring.integration"/>

<int:gateway default-request-channel="requestChannel" 
    service-interface="xpadro.spring.integration.service.TicketService"/>
    
<int:channel id="requestChannel"/>

<int-jms:outbound-channel-adapter id="jmsAdapter" channel="requestChannel" destination="toJmsQueue"/>

We are using a gateway as an entry to our messaging system. The test will use this interface to send a new TicketOrder object. The gateway will receive the message and place it into the requestChannel channel. Since it is a direct channel, it will be sent to the JMS outbound channel adapter.

The adapter receives a Spring Integration Message. It then can send the message in two ways:

  • Convert the message into a JMS message. This is done by setting the adapter’s attribute “extract-payload” to true, which is the default value. This is the option we have used in the example.
  • Send the message as it is, a Spring Integration Message. You can accomplish this by setting the “extract-payload” attribute to false.

This decision depends on what type of system is expecting your message. If the other application is a Spring Integration application, you can use the second approach. Otherwise, use the default. In our example, there’s a simple Spring JMS application on the other side. Thus, we have to choose the first option.

Continuing with our example, we now have a look at the test, which uses the gateway interface to send a message and a custom consumer to receive it. In this test, the consumer will play the role of a JMS application which uses a jmsTemplate to retrieve it from the JMS queue:

@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml",
        "/xpadro/spring/integration/test/int-jms-out-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsOutboundConfig {
    @Autowired
    private SyncConsumer consumer;
    
    @Autowired
    private TicketService service;
    
    @Test
    public void testSendToJms() throws InterruptedException, RemoteException {
        TicketOrder order = new TicketOrder(1, 5, new Date());
        service.sendOrder(order);
        
        TicketOrder receivedOrder = consumer.receive("to.jms.queue");
        assertNotNull(receivedOrder);
        assertEquals(1, receivedOrder.getFilmId());
        assertEquals(5, receivedOrder.getQuantity());
    }
}

5. Using Gateways

In addition to channel adapters, Spring Integration provides inbound and outbound gateways. As you may recall from previous tutorials, gateways provide bidirectional communication with external systems, meaning send and receive or receive and reply operations. In other words, they support request/reply operations.

In this section, we are going to see an example using a JMS outbound gateway. The gateway will send a JMS message to a queue and wait for a reply. If no reply is sent back, the gateway will throw a MessageTimeoutException.

Spring Integration configuration

<context:component-scan base-package="xpadro.spring.integration"/>

<int:gateway id="inGateway" default-request-channel="requestChannel" 
    service-interface="xpadro.spring.integration.service.TicketService"/>
    
<int:channel id="requestChannel"/>

<int-jms:outbound-gateway id="outGateway" request-destination="toAsyncJmsQueue" 
    request-channel="requestChannel" reply-channel="jmsReplyChannel"/>

<int:channel id="jmsReplyChannel"/>

<int:service-activator method="registerOrderConfirmation" input-channel="jmsReplyChannel" ref="ticketProcessor"/>

The flow is as follows:

  1. A TicketOrder wrapped into a Spring Integration Message will enter the messaging system through the “inGateway” gateway.
  2. The gateway will place the message into the “requestChannel” channel.
  3. The channel sends the message to its subscribed endpoint, the JMS outbound gateway.
  4. The JMS outbound gateway extracts the payload of the message and wraps it into a JMS message.
  5. The gateway sends the message and waits for a reply.
  6. When the reply comes, in the form of a TicketConfirmation wrapped into a JMS message, the gateway will get the payload and wrap it into a Spring Integration message.
  7. The message is sent to the “jmsReplyChannel” channel, where a service activator (TicketProcessor) will process it and register it in our OrderRepository.

The order processor is quite simple. It receives the TicketConfirmation and adds it to the ticket repository:

@Component("ticketProcessor")
public class TicketProcessor {
    @Autowired
    private OrderRepository repository;

    public void registerOrderConfirmation(TicketConfirmation confirmation) {
        repository.confirmOrder(confirmation);
    }
}

The test

@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsOutGatewayConfig {
    @Autowired
    private OrderRepository repository;
    
    @Autowired
    private TicketService service;
    
    @Test
    public void testSendToJms() throws InterruptedException, RemoteException {
        TicketOrder order = new TicketOrder(1, 5, new Date());
        service.sendOrder(order);
        
        Thread.sleep(4000);
        
        assertEquals(1, repository.getConfirmations().size());
        assertNotNull(repository.getConfirmations().get(0));
        TicketConfirmation conf = repository.getConfirmations().get(0);
        assertEquals("321", conf.getId());

    }
}

The external system

To fully understand the example, I will show you what happens when the message is delivered to the JMS queue.

Listening to the queue where the message has been sent by Spring Integration, there’s a listener asyncConsumer:

<bean id="toAsyncJmsQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="to.async.jms.queue"/>
</bean>

<!-- Listeners -->
<jms:listener-container connection-factory="connectionFactory">
    <jms:listener destination="to.async.jms.queue" ref="asyncConsumer"/>
</jms:listener-container>

The listener receives the message, creates a new message with the ticket confirmation and replies. Notice that we have to set the correlation ID of the reply message to the same value as the request message. This allows the client to know which message we are responding to. Also, we send the reply to the reply destination (JMSReplyTo) carried by the request message.

@Component("asyncConsumer")
public class AsyncConsumer implements MessageListener {
    @Autowired
    private JmsTemplate template;
    
    @Override
    public void onMessage(Message order) {
        final Message msgOrder = order;
        TicketOrder orderObject;
        try {
            orderObject = (TicketOrder) ((ObjectMessage) order).getObject();
        } catch (JMSException e) {
            throw JmsUtils.convertJmsAccessException(e);
        }
        float amount = 5.95f * orderObject.getQuantity();
        TicketConfirmation confirmation = new TicketConfirmation("321", orderObject.getFilmId(), orderObject.getOrderDate(), orderObject.getQuantity(), amount);
        
        try {
            template.convertAndSend(msgOrder.getJMSReplyTo(), confirmation, new MessagePostProcessor() {
                public Message postProcessMessage(Message message) throws JMSException {
                    message.setJMSCorrelationID(msgOrder.getJMSCorrelationID());
                    
                    return message;
                }
            });
        } catch (JMSException e) {
            // Only the checked JMSException needs translating; Spring's unchecked JmsException propagates as-is
            throw JmsUtils.convertJmsAccessException(e);
        }
    }
}
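
Why the correlation ID matters is easier to see from the requesting side. The following plain JDK sketch is a hypothetical ReplyCorrelator, not the code the JMS outbound gateway uses. It shows the general idea: the requester remembers each outstanding request under its correlation ID, and when a reply arrives, the ID on the reply selects which waiting caller receives it.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of request/reply correlation on the client side.
final class ReplyCorrelator<R> {
    private final Map<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    // Called before sending a request: the returned future completes when the matching reply arrives.
    CompletableFuture<R> register(String correlationId) {
        CompletableFuture<R> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        return reply;
    }

    // Called for every incoming reply; returns false when no request is waiting for that ID.
    boolean onReply(String correlationId, R payload) {
        CompletableFuture<R> reply = pending.remove(correlationId);
        if (reply == null) {
            return false;
        }
        reply.complete(payload);
        return true;
    }
}
```

If the listener forgot to copy the correlation ID onto its reply, onReply would find no pending entry and the original caller would keep waiting until it timed out.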


 

6. Message conversion

Both message channel adapters and gateways use a message converter to convert incoming messages to Java types, or the opposite way. A converter must implement the MessageConverter interface:

public interface MessageConverter {

    <P> Message<P> toMessage(Object object);

    <P> Object fromMessage(Message<P> message);

}

Spring Integration comes with two implementations of the MessageConverter interface:

MapMessageConverter

Its fromMessage method creates a new HashMap with two keys:

  • payload: The value is the payload of the message (message.getPayload).
  • headers: The value is another HashMap with all the headers from the original message.

The toMessage method expects a Map instance with the same structure (payload and headers keys) and constructs a Spring Integration message.

SimpleMessageConverter

This is the default converter used by the adapters and gateways. You can see from the source code that it converts from/to an Object:

public Message<?> toMessage(Object object) throws Exception {
    if (object == null) {
        return null;
    }
    if (object instanceof Message<?>) {
        return (Message<?>) object;
    }
    return MessageBuilder.withPayload(object).build();
}

public Object fromMessage(Message<?> message) throws Exception {
    return (message != null) ? message.getPayload() : null;
}

Anyway, if you need your own implementation, you can specify your custom converter at the channel adapter or gateway configuration. For example, using a gateway:

<int-jms:outbound-gateway id="outGateway" request-destination="toAsyncJmsQueue" 
    request-channel="requestChannel" reply-channel="jmsReplyChannel" 
    message-converter="myConverter"/>

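Just remember that your converter should implement MessageConverter. Note that the message-converter attribute of the JMS adapters and gateways is typed against the Spring JMS interface org.springframework.jms.support.converter.MessageConverter, which converts to and from javax.jms.Message, so that is the interface a converter wired into a JMS endpoint has to implement:

@Component("myConverter")
public class MyConverter implements MessageConverter {
    // toMessage(...) and fromMessage(...) implementations go here
}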

7. JMS backed message channels

Channel adapters and gateways are used to communicate with external systems. JMS backed message channels are used to send and receive JMS messages between consumers and producers that are located within the same application. Although we can still use channel adapters in this situation, it is much simpler to use a JMS channel. The difference from an integration message channel is that the JMS channel will use the JMS broker to send the message. This means that the message will not be just stored in an in-memory channel. Instead, it will be sent to the JMS provider, making it possible to also use transactions. If you use transactions, it will work as follows:

  • The producer sending the message to the JMS backed channel will not write it if the transaction is rolled back.
  • The consumer subscribed to the JMS backed channel will not remove the message from it if the transaction is rolled back.

For this feature, Spring Integration provides both kinds of channel: point to point and publish/subscribe. They are configured as follows:

point to point direct channel

<int-jms:channel id="jmsChannel" queue="myQueue"/>

publish/subscribe channel

<int-jms:publish-subscribe-channel id="jmsChannel" topic="myTopic"/>

In the following example, we can see a simple application with two endpoints communicating with each other using a JMS backed channel.

Configuration

Messages sent to the messaging system (TicketOrder objects) arrive at a service activator, the ticket processor. This processor then sends the order (sendJms) to the JMS backed message channel. Subscribed to this channel is the same processor, which will receive the message (receiveJms), process it by creating a TicketConfirmation and register it in the ticket repository:

<context:component-scan base-package="xpadro.spring.integration"/>

<int:gateway default-request-channel="requestChannel" 
    service-interface="xpadro.spring.integration.service.TicketService"/>
    
<int:channel id="requestChannel"/>

<int:service-activator method="sendJms" input-channel="requestChannel" output-channel="jmsChannel" ref="ticketJmsProcessor"/>

<int-jms:channel id="jmsChannel" queue="syncTestQueue"/>

<int:service-activator method="receiveJms" input-channel="jmsChannel" ref="ticketJmsProcessor"/>

The processor

The processor implements both methods, sendJms and receiveJms:

@Component("ticketJmsProcessor")
public class TicketJmsProcessor {
    private static final Logger logger = LoggerFactory.getLogger(TicketJmsProcessor.class);
    
    @Autowired
    private OrderRepository repository;

    public TicketOrder sendJms(TicketOrder order) {
        logger.info("Sending order {}", order.getFilmId());
        return order;
    }
    
    public void receiveJms(TicketOrder order) {
        logger.info("Processing order {}", order.getFilmId());
        
        float amount = 5.95f * order.getQuantity();
        
        TicketConfirmation confirmation = new TicketConfirmation("123", order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount);
        repository.confirmOrder(confirmation);
    }
}

The test

@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml",
        "/xpadro/spring/integration/test/int-jms-jms-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsToJmsConfig {
    @Autowired
    private OrderRepository repository;
    
    @Autowired
    private TicketService service;
    
    @Test
    public void testSendToJms() throws InterruptedException, RemoteException {
        TicketOrder order = new TicketOrder(1, 5, new Date());
        service.sendOrder(order);
        
        Thread.sleep(4000);
        
        assertEquals(1, repository.getConfirmations().size());
        assertNotNull(repository.getConfirmations().get(0));
        TicketConfirmation conf = repository.getConfirmations().get(0);
        assertEquals("123", conf.getId());
    }
}

JMS backed channels offer different possibilities like configuring the queue name instead of the queue reference or using a destination resolver:

<int-jms:channel id="jmsChannel" queue-name="myQueue"
    destination-resolver="myDestinationResolver"/>

8. Dynamic destination resolution

A destination resolver is a class which allows us to resolve destination names into JMS destinations. Any destination resolver must implement the following interface:

public interface DestinationResolver {
    Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)
            throws JMSException;
}

Destination resolvers can be specified on JMS channel adapters, JMS gateways and JMS backed channels. If you don’t configure a destination resolver explicitly, Spring will use a default implementation, which is DynamicDestinationResolver. This resolver is described below, along with the other implementations provided by Spring:

  • DynamicDestinationResolver: Resolves destination names as dynamic destinations by using the standard JMS Session.createTopic and Session.createQueue methods.
  • BeanFactoryDestinationResolver: Looks in the Spring context for a bean whose name matches the provided destination name, expecting it to be of type javax.jms.Destination. If it can’t find it, it will throw a DestinationResolutionException.
  • JndiDestinationResolver: It will assume that the destination name is a JNDI location.

If we don’t want to use the default dynamic resolver, we can implement a custom resolver and configure it in the desired endpoint. For example, the following JMS backed channel uses a different implementation:

<int-jms:channel id="jmsChannel" queue-name="myQueue"
    destination-resolver="myDestinationResolver"/>

9. AMQP integration

9.1. Installation

To install and start the RabbitMQ server you just need to follow the steps described below. If you already have the server installed, just skip this section.

  1. The first step is to install Erlang, required for the RabbitMQ server. Go to the following URL, download your system version and install it:
  2. The next step is to download and install RabbitMQ. Download the 3.2.4 release if you want to use the same version as used in this tutorial:
  3. Now, open the command prompt. If you are a Windows user, you can go directly by clicking on the start menu and selecting RabbitMQ Command Prompt in the RabbitMQ folder.
  4. Activate the management plugin:
     > rabbitmq-plugins enable rabbitmq_management
  5. Start the server:
     > rabbitmq-server.bat

Ok, now we will test that RabbitMQ is correctly installed. Go to http://localhost:15672 and log in using ‘guest’ as both username and password. If you are using a version prior to 3.0, then the port will be 55672.

If you see the web UI, then everything is set.

9.2. Demo application

In order to use AMQP with Spring Integration, we will need to add the following dependencies to our pom.xml file:

Spring AMQP (for RabbitMQ)

<dependency>
   <groupId>org.springframework.amqp</groupId>
   <artifactId>spring-rabbit</artifactId>
   <version>1.3.1.RELEASE</version>
</dependency>

Spring Integration AMQP endpoints

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-amqp</artifactId>
    <version>3.0.2.RELEASE</version>
</dependency>

Now we are going to create a new configuration file, amqp-config.xml, that will contain the RabbitMQ configuration (like the jms-config file we used for JMS earlier in this tutorial).

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <rabbit:connection-factory id="connectionFactory" />
    
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    
    <rabbit:admin connection-factory="connectionFactory" />
    
    <rabbit:queue name="rabbit.queue" />
    
    <rabbit:direct-exchange name="rabbit.exchange">
        <rabbit:bindings>
            <rabbit:binding queue="rabbit.queue" key="rabbit.key.binding" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>

The next file is the Spring Integration file which contains channels and channel adapters:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd">
    
    <context:component-scan base-package="xpadro.spring.integration.amqp"/>
    
    <int:gateway default-request-channel="requestChannel" 
        service-interface="xpadro.spring.integration.amqp.service.AMQPService"/>
        
    <int:channel id="requestChannel"/>
    
    <int-amqp:outbound-channel-adapter
        channel="requestChannel" amqp-template="amqpTemplate" exchange-name="rabbit.exchange"
        routing-key="rabbit.key.binding"/>
    
    <int-amqp:inbound-channel-adapter channel="responseChannel"
        queue-names="rabbit.queue" connection-factory="connectionFactory" />
    
    <int:channel id="responseChannel"/>
    
    <int:service-activator ref="amqpProcessor" method="process" input-channel="responseChannel"/>
    
</beans>

The flow is as follows:

  1. The test application sends a message, which will be a simple String, to the gateway.
  2. From the gateway, it will reach the outbound channel adapter through the “requestChannel” channel.
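  3. The outbound channel adapter sends the message to the “rabbit.exchange” exchange with the routing key “rabbit.key.binding”, and the exchange routes it to the bound “rabbit.queue” queue.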
  4. Subscribed to this “rabbit.queue” queue we have configured an inbound channel adapter. It will receive messages sent to the queue.
  5. The message is sent to the service activator through the “responseChannel” channel.
  6. The service activator simply prints the message.
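
The routing step in this flow depends on how a direct exchange works: a message reaches a queue only when its routing key equals the key that queue was bound with. The plain JDK sketch below models just that rule. DirectExchangeSketch is a hypothetical in-memory stand-in for illustration, not RabbitMQ or Spring AMQP code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a direct exchange and its bound queues.
final class DirectExchangeSketch {
    private final Map<String, List<String>> queuesByBindingKey = new HashMap<>();
    private final Map<String, List<String>> messagesByQueue = new HashMap<>();

    // Binds a queue to this exchange under the given key.
    void bind(String queueName, String bindingKey) {
        queuesByBindingKey.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
        messagesByQueue.computeIfAbsent(queueName, k -> new ArrayList<>());
    }

    // Delivers the message to every queue whose binding key equals the routing key.
    // Returns the number of queues that received it (0 means the message was not routed).
    int publish(String routingKey, String message) {
        List<String> targets =
                queuesByBindingKey.getOrDefault(routingKey, Collections.<String>emptyList());
        for (String queueName : targets) {
            messagesByQueue.get(queueName).add(message);
        }
        return targets.size();
    }

    // Read-only view of the messages currently held by a queue.
    List<String> messagesIn(String queueName) {
        return Collections.unmodifiableList(
                messagesByQueue.getOrDefault(queueName, Collections.<String>emptyList()));
    }
}
```

With the binding from amqp-config.xml, publishing with the key rabbit.key.binding lands in rabbit.queue, while any other routing key matches no binding and the message never reaches the inbound channel adapter.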

The gateway that serves as an entry point to the messaging system contains a single method:

public interface AMQPService {
    @Gateway
    public void sendMessage(String message);
}

The service activator amqpProcessor is very simple; it receives a message and prints its payload:

@Component("amqpProcessor")
public class AmqpProcessor {

    public void process(Message<String> msg) {
        System.out.println("Message received: "+msg.getPayload());
    }
}

To finish with the example, here is the application that initiates the flow by invoking the service wrapped by the gateway:

@ContextConfiguration(locations = {"/xpadro/spring/integration/test/amqp-config.xml",
        "/xpadro/spring/integration/test/int-amqp-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationAMQPConfig {
    
    @Autowired
    private AMQPService service;
    
    @Test
    public void testSendToJms() throws InterruptedException, RemoteException {
        String msg = "hello";
        
        service.sendMessage(msg);
        
        Thread.sleep(2000);
    }
}

Xavier Padro

Xavier is a software developer working in a consulting firm based in Barcelona. He is specialized in web application development with experience in both frontend and backend. He is interested in everything related to Java and the Spring framework.

1 Comment
Monika Arora
5 years ago

Hi

Just wanted to add a bit here This TicketOrder class must be Serializable because JMSTemplate:convertAndSend expects string, Map or Serializable Object

Thanks
Monika
