Enterprise Java

Spring Integration – Using RMI Channel Adapters

1.Introduction

This article explains how to send and receive messages over RMI using Spring Integration RMI channel adapters. It is composed of the following sections:

  • Implementing the service: The first section focuses on creating and exposing a service.
  • Implementing the client: Shows how to invoke the service using the MessagingTemplate class.
  • Abstracting SI logic: Finally, I’ve added another section explaining how to implement the same client abstracting all Spring Integration code, leaving the client focused on its business logic.

You can get the source code at github.

2.Implementing the service

This first part is pretty simple. The service is defined through annotations, so it will be autodetected by component scanning. It has a repository injected, which gets the data from an embedded database, as it will be shown in this same section:

@Service("defaultEmployeeService")
public class EmployeeServiceImpl implements EmployeeService {
    @Autowired
    private EmployeeRepository employeeRepository;

    @Override
    public Employee retrieveEmployee(int id) {
        return employeeRepository.getEmployee(id);
    }
}

The repository is as follows:

@Repository
public class EmployeeRepositoryImpl implements EmployeeRepository {
    private JdbcTemplate template;
    private RowMapper<Employee> rowMapper = new EmployeeRowMapper();
    private static final String SEARCH = "select * from employees where id = ?";
    private static final String COLUMN_ID = "id";
    private static final String COLUMN_NAME = "name";

    @Autowired
    public EmployeeRepositoryImpl(DataSource dataSource) {
        this.template = new JdbcTemplate(dataSource);
    }

    public Employee getEmployee(int id) {
        return template.queryForObject(SEARCH, rowMapper, id);
    }

    private class EmployeeRowMapper implements RowMapper<Employee> {
        public Employee mapRow(ResultSet rs, int i) throws SQLException {
            Employee employee = new Employee();
            employee.setId(rs.getInt(COLUMN_ID));
            employee.setName(rs.getString(COLUMN_NAME));

            return employee;
        }
    }
}

The following configuration exposes the service over RMI:

server-config.xml

<context:component-scan base-package="xpadro.spring.integration"/>

<int-rmi:inbound-gateway request-channel="requestEmployee"/>

<int:channel id="requestEmployee"/>

<int:service-activator method="retrieveEmployee" input-channel="requestEmployee" ref="defaultEmployeeService"/>

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource" />
</bean>

<!-- in-memory database -->
<jdbc:embedded-database id="dataSource">
    <jdbc:script location="classpath:db/schemas/schema.sql" />
    <jdbc:script location="classpath:db/schemas/data.sql" />
</jdbc:embedded-database>

Let’s focus on the lines with the ‘int’ namespace:

The gateway’s function is to separate the plumbing of the messaging system from the rest of the application. In this way, it gets hidden from the business logic. A gateway is bidirectional, so you have:

  • Inbound gateway: Brings a message into the application and waits for a response.
  • Outbound gateway: Invokes an external system and sends the response back to the application.

In this example, we are using a RMI inbound gateway. It will receive a message over RMI and send it to the requestEmployee channel, which is also defined here.

Finally, the service activator allows you to connect a spring bean to a message channel. Here, it is connected to the requestEmployee channel. The message will arrive to the channel and the service activator will invoke the retrieveEmployee method. Take into account that the ‘method’ attribute is not necessary if the bean has only one public method or has a method annotated with @ServiceActivator.

The response will then be sent to the reply channel. Since we didn’t define this channel, it will create a temporary reply channel.

temporaryChannel

3.Implementing the client

The client we are going to implement will invoke the service in order to retrieve an employee. To do this, it will use the MessagingTemplate class:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:xpadro/spring/integration/test/config/client-config.xml"})
public class TestRmiClient {
    @Autowired
    MessageChannel localChannel;

    @Autowired
    MessagingTemplate template;

    @Test
    public void retrieveExistingEmployee() {
        Employee employee = (Employee) template.convertSendAndReceive(localChannel, 2);

        Assert.assertNotNull(employee);
        Assert.assertEquals(2, employee.getId());
        Assert.assertEquals("Bruce Springsteen", employee.getName());
    }
}

The client uses the messagingTemplate to convert the Integer object to a Message and send it to the local channel. As shown below, there’s an outbound gateway connected to the local channel. This outbound gateway will send the request message over RMI.

<int-rmi:outbound-gateway request-channel="localChannel" remote-channel="requestEmployee" host="localhost"/>

<int:channel id="localChannel"/>

<bean class="org.springframework.integration.core.MessagingTemplate" />

4.Abstracting SI logic

In the previous section, you may have noticed that the client class which accesses the service has Spring Integration specific logic mixed with its business code:

  • It uses the MessagingTemplate, which is a SI class.
  • It knows about the local channel, which is specific to the messaging system

In this section, I will implement this same example abstracting the messaging logic, so the client will only care about its business logic.

First, let’s take a look at the new client:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:xpadro/spring/integration/test/config/client-gateway-config.xml"})
public class TestRmiGatewayClient {
    @Autowired
    private EmployeeService service;

    @Test
    public void retrieveExistingEmployee() {
        Employee employee = service.retrieveEmployee(2);

        Assert.assertNotNull(employee);
        Assert.assertEquals(2, employee.getId());
        Assert.assertEquals("Bruce Springsteen", employee.getName());
    }
}

We can see now that the client just implement its business logic, without using neither message channels nor messaging template. It will just call the service interface. All the messaging definitions are in the configuration file.

<int-rmi:outbound-gateway request-channel="localChannel" remote-channel="requestEmployee" host="localhost"/>

<int:channel id="localChannel"/>

<int:gateway default-request-channel="localChannel" 
    service-interface="xpadro.spring.integration.service.EmployeeService"/>

client-gateway-config.xml

What we did here is add a gateway that will intercept calls to the service interface EmployeeService. Spring Integration will use the GatewayProxyFactoryBean class to create a proxy around the service interface. This proxy will use a messaging template to send the invocation to the request channel and wait for the response.

gatewayProxyFactoryBean

5.Conclusion

We have seen how to use Spring Integration to access a service over RMI.  We have also seen that we can not only explicitly send messages using the MessagingTemplate but also do it transparently with GatewayProxyFactoryBean.
 

Xavier Padro

Xavier is a software developer working in a consulting firm based in Barcelona. He is specialized in web application development with experience in both frontend and backend. He is interested in everything related to Java and the Spring framework.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

2 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
logan
logan
10 years ago

It thrown an exception when I ran TestRmiClient. Could you please help to fix it?

org.springframework.integration.MessageDeliveryException: Dispatcher has no subscribers.
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:104)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
at org.springframework.integration.core.MessagingTemplate.doSendAndReceive(MessagingTemplate.java:318)
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:239)
at org.springframework.integration.core.MessagingTemplate.convertSendAndReceive(MessagingTemplate.java:254)
at xpadro.spring.integration.client.TestRmiClient.retrieveExistingEmployee(TestRmiClient.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:74)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:83)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:72)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:231)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:88)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:71)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:174)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)

Xavier Padro
10 years ago

Hi Logan,

Just cloned my github repository and run the test. Everything goes fine. Did you modify anything? The error message shows that the template is not finding its subscriber, which is the outbound gateway:

Did you modify the configuration?

Back to top button