About Xavier Padro

Xavier is a software developer working in a consulting firm based in Barcelona. He specializes in web application development, with experience in both frontend and backend work. He is interested in everything related to Java and the Spring framework.

Spring Integration – Using RMI Channel Adapters

1. Introduction

This article explains how to send and receive messages over RMI using Spring Integration RMI channel adapters. It is composed of the following sections:

  • Implementing the service: The first section focuses on creating and exposing a service.
  • Implementing the client: Shows how to invoke the service using the MessagingTemplate class.
  • Abstracting SI logic: Finally, I’ve added another section explaining how to implement the same client abstracting all Spring Integration code, leaving the client focused on its business logic.

You can get the source code on GitHub.

2. Implementing the service

This first part is pretty simple. The service is defined through annotations, so it will be autodetected by component scanning. It has a repository injected, which gets the data from an embedded database, as shown later in this section:

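import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// Registered under the bean name referenced by the service activator in server-config.xml
@Service("defaultEmployeeService")
public class EmployeeServiceImpl implements EmployeeService {
    @Autowired
    private EmployeeRepository employeeRepository;

    // Delegates the lookup to the repository
    @Override
    public Employee retrieveEmployee(int id) {
        return employeeRepository.getEmployee(id);
    }
}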

The repository is as follows:

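import java.sql.ResultSet;
import java.sql.SQLException;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;

@Repository
public class EmployeeRepositoryImpl implements EmployeeRepository {
    private static final String SEARCH = "select * from employees where id = ?";
    private static final String COLUMN_ID = "id";
    private static final String COLUMN_NAME = "name";

    private final JdbcTemplate template;
    private final RowMapper<Employee> rowMapper = new EmployeeRowMapper();

    // The data source is the embedded database defined in server-config.xml
    @Autowired
    public EmployeeRepositoryImpl(DataSource dataSource) {
        this.template = new JdbcTemplate(dataSource);
    }

    @Override
    public Employee getEmployee(int id) {
        return template.queryForObject(SEARCH, rowMapper, id);
    }

    // Maps one row of the employees table to an Employee object
    private static class EmployeeRowMapper implements RowMapper<Employee> {
        @Override
        public Employee mapRow(ResultSet rs, int i) throws SQLException {
            Employee employee = new Employee();
            employee.setId(rs.getInt(COLUMN_ID));
            employee.setName(rs.getString(COLUMN_NAME));

            return employee;
        }
    }
}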
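The Employee class and the EmployeeService interface used above are not listed in this article. The following is a minimal sketch of what they could look like, assuming only the two properties that the row mapper sets. One detail matters for this example: since the Employee travels over RMI as the reply payload, it has to implement Serializable. SerializationCheck is a hypothetical helper I added just to illustrate that point; it is not part of the project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Assumed shape of the domain class: one field per column read by EmployeeRowMapper.
class Employee implements Serializable {
    private static final long serialVersionUID = 1L;

    private int id;
    private String name;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

// Assumed service contract, inferred from EmployeeServiceImpl.
interface EmployeeService {
    Employee retrieveEmployee(int id);
}

// Hypothetical helper: serializes and deserializes an Employee in memory,
// which is roughly what happens to a payload that is sent over RMI.
final class SerializationCheck {
    static Employee roundTrip(Employee original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Employee) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Employee could not be serialized", e);
        }
    }
}
```

If Employee did not implement Serializable, the round trip would fail with a NotSerializableException wrapped in the IllegalStateException, and a real RMI call would fail in a similar way.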

The following configuration exposes the service over RMI:

server-config.xml

<context:component-scan base-package="xpadro.spring.integration"/>

<int-rmi:inbound-gateway request-channel="requestEmployee"/>

<int:channel id="requestEmployee"/>

<int:service-activator method="retrieveEmployee" input-channel="requestEmployee" ref="defaultEmployeeService"/>

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource" />
</bean>

<!-- in-memory database -->
<jdbc:embedded-database id="dataSource">
    <jdbc:script location="classpath:db/schemas/schema.sql" />
    <jdbc:script location="classpath:db/schemas/data.sql" />
</jdbc:embedded-database>

Let’s focus on the lines with the ‘int’ namespace:

The gateway’s function is to separate the plumbing of the messaging system from the rest of the application, so that it stays hidden from the business logic. A gateway handles bidirectional (request and reply) communication, and there are two kinds:

  • Inbound gateway: Brings a message into the application and waits for a response.
  • Outbound gateway: Invokes an external system and sends the response back to the application.

In this example, we are using an RMI inbound gateway. It will receive a message over RMI and send it to the requestEmployee channel, which is also defined here.

Finally, the service activator allows you to connect a Spring bean to a message channel. Here, it is connected to the requestEmployee channel. When a message arrives at the channel, the service activator will invoke the retrieveEmployee method. Take into account that the ‘method’ attribute is not necessary if the bean has only one public method or has a method annotated with @ServiceActivator.

The response will then be sent to the reply channel. Since we didn’t define this channel, a temporary reply channel is created for the request and the response is sent there.
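To make the request and reply flow described above more concrete, here is a simplified sketch written with plain JDK classes. It is not how Spring Integration implements it: RequestMessage, SketchServiceActivator and TemporaryReplyChannelSketch are hypothetical names, and the temporary reply channel is modelled as a one-slot queue. The idea it shows is that the request carries its own reply channel, the activator invokes the target method with the payload, and the result is sent back through that channel.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Simplified stand-in for a message: a payload plus the channel where the reply is expected.
final class RequestMessage {
    final Object payload;
    final BlockingQueue<Object> replyChannel;

    RequestMessage(Object payload, BlockingQueue<Object> replyChannel) {
        this.payload = payload;
        this.replyChannel = replyChannel;
    }
}

// Simplified stand-in for a service activator: it invokes the target with the
// payload and sends the (non-null) return value to the request's reply channel.
final class SketchServiceActivator {
    private final Function<Object, Object> target;

    SketchServiceActivator(Function<Object, Object> target) {
        this.target = target;
    }

    void handle(RequestMessage request) {
        Object result = target.apply(request.payload);
        request.replyChannel.add(result);
    }
}

final class TemporaryReplyChannelSketch {
    // Creates a reply channel that lives only for this request, sends the
    // request and waits up to one second for the reply.
    static Object sendAndReceive(SketchServiceActivator activator, Object payload) {
        BlockingQueue<Object> temporaryReplyChannel = new ArrayBlockingQueue<>(1);
        activator.handle(new RequestMessage(payload, temporaryReplyChannel));
        try {
            Object reply = temporaryReplyChannel.poll(1, TimeUnit.SECONDS);
            if (reply == null) {
                throw new IllegalStateException("No reply received");
            }
            return reply;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the reply", e);
        }
    }
}
```

For instance, with an activator built as new SketchServiceActivator(id -> "employee-" + id), calling TemporaryReplyChannelSketch.sendAndReceive(activator, 2) gives back the value produced by the function. In the real application the target would be the retrieveEmployee method of defaultEmployeeService.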

3. Implementing the client

The client we are going to implement will invoke the service in order to retrieve an employee. To do this, it will use the MessagingTemplate class:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:xpadro/spring/integration/test/config/client-config.xml"})
public class TestRmiClient {
    @Autowired
    MessageChannel localChannel;

    @Autowired
    MessagingTemplate template;

    @Test
    public void retrieveExistingEmployee() {
        Employee employee = (Employee) template.convertSendAndReceive(localChannel, 2);

        Assert.assertNotNull(employee);
        Assert.assertEquals(2, employee.getId());
        Assert.assertEquals("Bruce Springsteen", employee.getName());
    }
}

The client uses the MessagingTemplate to convert the Integer object to a Message and send it to the local channel. As shown below in client-config.xml, there’s an outbound gateway connected to the local channel. This outbound gateway will send the request message over RMI.

<int-rmi:outbound-gateway request-channel="localChannel" remote-channel="requestEmployee" host="localhost"/>

<int:channel id="localChannel"/>

<bean class="org.springframework.integration.core.MessagingTemplate" />
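In this configuration the outbound gateway is the only subscriber of localChannel, so a message sent to the channel is handed straight to it. The sketch below illustrates that relationship with plain JDK classes. SketchDirectChannel is a hypothetical, heavily simplified stand-in and not the real channel implementation; it only shows that sending to a channel without subscribers fails, while sending to a channel with a subscriber invokes the handler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified channel that delivers each message to a subscriber
// in the sender's thread.
final class SketchDirectChannel {
    private final String name;
    private final List<Consumer<Object>> subscribers = new CopyOnWriteArrayList<>();

    SketchDirectChannel(String name) {
        this.name = name;
    }

    // In the article's configuration, the subscriber would be the RMI outbound gateway.
    void subscribe(Consumer<Object> handler) {
        subscribers.add(handler);
    }

    void send(Object payload) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException(
                    "Dispatcher has no subscribers for channel '" + name + "'");
        }
        // This sketch always delivers to the first subscriber.
        subscribers.get(0).accept(payload);
    }
}
```

This is why the outbound gateway and the channel have to be defined in the same application context as the MessagingTemplate: if the gateway is missing, there is nobody to take the message from localChannel.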

4. Abstracting SI logic

In the previous section, you may have noticed that the client class which accesses the service has Spring Integration specific logic mixed with its business code:

  • It uses the MessagingTemplate, which is an SI class.
  • It knows about the local channel, which is specific to the messaging system.

In this section, I will implement this same example abstracting the messaging logic, so the client will only care about its business logic.

First, let’s take a look at the new client:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:xpadro/spring/integration/test/config/client-gateway-config.xml"})
public class TestRmiGatewayClient {
    @Autowired
    private EmployeeService service;

    @Test
    public void retrieveExistingEmployee() {
        Employee employee = service.retrieveEmployee(2);

        Assert.assertNotNull(employee);
        Assert.assertEquals(2, employee.getId());
        Assert.assertEquals("Bruce Springsteen", employee.getName());
    }
}

We can now see that the client just implements its business logic, without using either message channels or the messaging template. It simply calls the service interface. All the messaging definitions are in the configuration file:

client-gateway-config.xml

<int-rmi:outbound-gateway request-channel="localChannel" remote-channel="requestEmployee" host="localhost"/>

<int:channel id="localChannel"/>

<int:gateway default-request-channel="localChannel"
    service-interface="xpadro.spring.integration.service.EmployeeService"/>

What we did here is add a gateway that will intercept calls to the service interface EmployeeService. Spring Integration will use the GatewayProxyFactoryBean class to create a proxy around the service interface. This proxy will use a messaging template to send the invocation to the request channel and wait for the response.
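The proxy idea can be sketched with a JDK dynamic proxy. This is only an illustration of the technique under some assumptions, not the code of GatewayProxyFactoryBean: EmployeeLookup and SketchGatewayProxy are hypothetical names, the request channel is modelled as a simple function that takes the payload and returns the reply, and only single-argument methods are supported.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.Function;

// Hypothetical service interface, standing in for EmployeeService.
interface EmployeeLookup {
    String retrieveEmployeeName(int id);
}

final class SketchGatewayProxy {
    // Creates an implementation of the interface whose methods send their
    // single argument to the request channel and return the reply.
    static <T> T create(Class<T> serviceInterface, Function<Object, Object> requestChannel) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (args == null || args.length != 1) {
                throw new IllegalArgumentException(
                        "This sketch only supports single-argument methods: " + method.getName());
            }
            // The method argument becomes the message payload.
            return requestChannel.apply(args[0]);
        };
        Object proxy = Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[] { serviceInterface },
                handler);
        return serviceInterface.cast(proxy);
    }
}
```

The caller only sees the interface. For example, SketchGatewayProxy.create(EmployeeLookup.class, payload -> "employee-" + payload) returns an EmployeeLookup whose retrieveEmployeeName method forwards the id to the function, in the same way that the test client above only sees EmployeeService.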

5. Conclusion

We have seen how to use Spring Integration to access a service over RMI. We have also seen that we can either send messages explicitly using the MessagingTemplate or do it transparently with GatewayProxyFactoryBean.
 

2 Responses to "Spring Integration – Using RMI Channel Adapters"

  1. logan says:

    It threw an exception when I ran TestRmiClient. Could you please help me fix it?

    org.springframework.integration.MessageDeliveryException: Dispatcher has no subscribers.
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:104)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    at org.springframework.integration.core.MessagingTemplate.doSendAndReceive(MessagingTemplate.java:318)
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:239)
    at org.springframework.integration.core.MessagingTemplate.convertSendAndReceive(MessagingTemplate.java:254)
    at xpadro.spring.integration.client.TestRmiClient.retrieveExistingEmployee(TestRmiClient.java:27)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:74)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:83)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:72)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:231)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:88)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:71)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:174)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)

  2. Hi Logan,

    I just cloned my GitHub repository and ran the test, and everything works fine. Did you modify anything? The error message shows that the template is not finding its subscriber, which is the outbound gateway.

    Did you modify the configuration?

Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.