Spring 3 HornetQ 2.1 Integration Tutorial

Utilize the new ultra-high-performance messaging system from JBoss through the Spring framework.

HornetQ is an open source project to build a multi-protocol, embeddable, very high performance, clustered, asynchronous messaging system. It is written in Java and runs on any platform with a Java 5 or later runtime. HornetQ's class-beating high-performance journal provides persistent messaging performance at rates normally seen for non-persistent messaging. Non-persistent messaging performance is also extremely high. Among other “sexy” features, HornetQ offers server replication and automatic client failover to eliminate lost or duplicated messages in case of server failure. It can be configured for clustered usage, where geographically distributed clusters of HornetQ servers know how to load balance messages, and it provides a comprehensive management API to manage and monitor all HornetQ servers.

In this tutorial we will show you how to utilize HornetQ through the Spring framework. To make things more interesting, we are going to continue from where we left off in our previous article about Spring GWT Hibernate JPA Infinispan integration. We are going to use our GWTSpringInfinispan project and empower it with messaging functionality! Of course, you can also follow this article to integrate your own Spring based project with HornetQ.

We will use HornetQ version 2.1.0.Final, which you can download from here. We will also need the jboss-logging-spi library. JBoss Logging SPI version 2.1.1.GA will be used, which you can download from the JBoss Maven repository here.

In order to properly integrate Spring and HornetQ at runtime, we must provide all necessary libraries to the Web application. Copy the files listed below under /war/WEB-INF/lib (copy the corresponding files if you are using different versions).

From the HornetQ distribution

  • /lib/hornetq-bootstrap.jar
  • /lib/hornetq-core.jar
  • /lib/hornetq-jms.jar
  • /lib/hornetq-logging.jar
  • /lib/jnpserver.jar
  • /lib/netty.jar

The JBoss Logging SPI library

  • jboss-logging-spi-2.1.1.GA.jar
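A quick way to confirm that the libraries above actually reached the classpath is to try loading a few of the classes that the Spring configuration later in this tutorial refers to. The following is only a minimal sketch: the class “ClasspathCheck” and its method are hypothetical helper names, and the sketch checks that classes are present, not that their versions are compatible with each other.

```java
import java.util.ArrayList;
import java.util.List;

public final class ClasspathCheck {

    // Returns the class names that cannot be loaded with the given class loader.
    public static List<String> missingClasses(ClassLoader loader, String... classNames) {
        List<String> missing = new ArrayList<String>();
        for (String className : classNames) {
            try {
                // initialize=false: only check presence, do not run static initializers
                Class.forName(className, false, loader);
            } catch (ClassNotFoundException e) {
                missing.add(className);
            } catch (LinkageError e) {
                // e.g. NoClassDefFoundError when a dependency of the class is absent
                missing.add(className);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Class names taken from the Spring bean definitions used in this tutorial
        List<String> missing = missingClasses(
                Thread.currentThread().getContextClassLoader(),
                "org.hornetq.core.server.impl.HornetQServerImpl",
                "org.hornetq.jms.server.impl.JMSServerManagerImpl",
                "org.jnp.server.NamingBeanImpl");
        System.out.println(missing.isEmpty() ? "All classes found" : "Missing: " + missing);
    }
}
```

Run inside the Web application (for example from a throwaway servlet or test), this reports a missing jar early instead of surfacing as a bean creation failure at startup.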

Finally, for HornetQ to work properly at runtime, several configuration files must be available on the Web application classpath. As mentioned in the introduction, we can create clusters of HornetQ servers for load balancing and highly available messaging, or we can use HornetQ in a non-clustered environment. Each case requires a different configuration. The HornetQ distribution contains all flavors of configuration files under the /config directory. We will use the jboss-as-5 clustered configuration so that the full capabilities of the messaging platform are available. Copy the following files from the /config/jboss-as-5/clustered directory to your application /resources package :

  • hornetq-configuration.xml – This is the main HornetQ configuration file
  • hornetq-jms.xml – The server side JMS service configuration file

Unless you are deploying inside the JBoss application server, edit the hornetq-configuration.xml file and replace every occurrence of “${jboss.server.data.dir}” with “${data.dir:../data}”. The original placeholder is only defined when running inside JBoss.
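The “${data.dir:../data}” form reads as “use the data.dir system property if it is set, otherwise fall back to ../data”. The sketch below illustrates that lookup-with-default behaviour in plain Java. The class “PlaceholderDemo” is a hypothetical name used for illustration only, the “/journal” suffix is a made-up example path, and the code is not HornetQ's own parsing logic.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class PlaceholderDemo {

    // Matches ${name} or ${name:default}
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^}:]+)(?::([^}]*))?\\}");

    // Replaces each placeholder with the property value, or with its default if the property is unset.
    public static String resolve(String text, Properties props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String fallback = m.group(2); // null when no ":default" part is present
            String value = props.getProperty(m.group(1), fallback);
            // A placeholder with neither a value nor a default is left untouched
            String replacement = (value != null) ? value : m.group();
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Start the JVM with -Ddata.dir=/some/path to override the default
        System.out.println(resolve("${data.dir:../data}/journal", System.getProperties()));
    }
}
```

This also shows why the original “${jboss.server.data.dir}” is a problem outside JBoss: it has no default part, so nothing sensible can be substituted when the property is not defined.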

Copy the following file from /config/stand-alone/clustered directory to your application /resources package :

  • hornetq-users.xml – The user credentials file for HornetQ security manager

Before we continue to the actual integration and client implementation examples, let us pinpoint a few useful details about HornetQ server architecture and the configuration files mentioned above.

The HornetQ server does not speak JMS and in fact does not know anything about JMS. It is a protocol-agnostic messaging server designed to be used with multiple different protocols. HornetQ clients, potentially on different physical machines, interact with the HornetQ server. HornetQ currently provides two APIs for messaging at the client side :

  • Core client API. This is a simple intuitive Java API that allows the full set of messaging functionality without some of the complexities of JMS
  • JMS client API. The standard JMS API is available at the client side

JMS semantics are implemented by a thin JMS facade layer on the client side. When a user uses the JMS API on the client side, all JMS interactions are translated into operations on the HornetQ core client API before being transferred over the wire using the HornetQ wire format. The server always just deals with core API interactions.

The standard stand-alone messaging server configuration comprises a core messaging server, a JMS service and a JNDI service.

The role of the JMS Service is to deploy and bind to JNDI any JMS Queue, Topic and ConnectionFactory instances from any server side hornetq-jms.xml configuration files. It also provides a simple management API for creating and destroying Queues, Topics and ConnectionFactory instances which can be accessed via JMX or the connection. It is a separate service to the HornetQ core server, since the core server is JMS agnostic. If you don’t want to deploy any JMS Queue, Topic or ConnectionFactory instances via server side XML configuration and don’t require a JMS management API on the server side then you can disable this service.

A JNDI server is also included, since JNDI is a common requirement when using JMS to look up Queues, Topics and ConnectionFactory instances. If you do not require JNDI then this service can also be disabled. HornetQ allows you to programmatically create JMS and core objects directly on the client side as opposed to looking them up from JNDI, so a JNDI server is not always a requirement.

HornetQ ships with a basic security manager implementation which obtains user credentials from the hornetq-users.xml file. This file contains user, password and role information.

We are going to use the HornetQ JMS Service and execute JMS client code in the same JVM as the naming server, so we must create a “jndi.properties” file and place it under our application /resources package along with the rest of the HornetQ configuration files described above. The contents of the “jndi.properties” file should be as follows :

java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
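For reference, the same setting can be passed to JNDI in code instead of through “jndi.properties”. The sketch below is an illustration under assumptions rather than part of the tutorial project: “JndiEnvironmentDemo” is a hypothetical class name, and the lookup method only works when jnpserver.jar is on the classpath and the naming server has been started.

```java
import java.util.Hashtable;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public final class JndiEnvironmentDemo {

    // Same setting as the jndi.properties entry, expressed in code.
    // Context.INITIAL_CONTEXT_FACTORY is the constant for "java.naming.factory.initial".
    public static Hashtable<String, Object> jnpEnvironment() {
        Hashtable<String, Object> env = new Hashtable<String, Object>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        return env;
    }

    // Looks up a name using the explicit environment.
    // Assumption: jnpserver.jar is available and the naming server is running in this JVM.
    public static Object lookup(String jndiName) throws NamingException {
        Context context = new InitialContext(jnpEnvironment());
        try {
            return context.lookup(jndiName);
        } finally {
            context.close();
        }
    }
}
```

Passing the environment explicitly makes the choice of naming implementation visible at the call site, whereas the properties file applies it to every “new InitialContext()” created without arguments.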

Before continuing, we have to take care of the dependencies for our Eclipse project. The following jar should be included in the Java build path of the project :

  • jms.jar

Let's now integrate Spring with HornetQ. Locate your applicationContext.xml file in the /war/WEB-INF folder and add the following beans :

<bean name="namingServerImpl" class="org.jnp.server.NamingBeanImpl" init-method="start" destroy-method="stop" />

<bean name="namingServer" class="org.jnp.server.Main" init-method="start" destroy-method="stop">
 <property name="namingInfo" ref="namingServerImpl" />
 <property name="port" value="1099" />
 <property name="bindAddress" value="localhost" />
 <property name="rmiPort" value="1098" />
 <property name="rmiBindAddress" value="localhost" />
</bean>

<bean name="mbeanServer" class="java.lang.management.ManagementFactory" factory-method="getPlatformMBeanServer" />

<bean name="fileConfiguration" class="org.hornetq.core.config.impl.FileConfiguration" init-method="start" destroy-method="stop" />

<bean name="hornetQSecurityManagerImpl" class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl" />

<!-- The core server -->
<bean name="hornetQServerImpl" class="org.hornetq.core.server.impl.HornetQServerImpl">
 <constructor-arg ref="fileConfiguration" />
 <constructor-arg ref="mbeanServer" />
 <constructor-arg ref="hornetQSecurityManagerImpl" />
</bean>

<!-- The JMS server -->
<bean name="jmsServerManagerImpl" class="org.hornetq.jms.server.impl.JMSServerManagerImpl" init-method="start" destroy-method="stop" depends-on="namingServer">
 <constructor-arg ref="hornetQServerImpl" />
</bean>
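The “mbeanServer” bean above does nothing more than call a static factory method of the JDK, and the resulting MBeanServer is what the HornetQ core server receives as its second constructor argument. The sketch below shows the equivalent Java call and how registered management beans could be listed. “MBeanServerDemo” is a hypothetical class name, and the JMX domain “org.hornetq” is an assumption about the default domain; check the JMX settings in your hornetq-configuration.xml if nothing is listed.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public final class MBeanServerDemo {

    // Java equivalent of the Spring bean with factory-method="getPlatformMBeanServer"
    public static MBeanServer platformServer() {
        return ManagementFactory.getPlatformMBeanServer();
    }

    // Lists the names of all MBeans registered under the given JMX domain.
    public static Set<ObjectName> namesInDomain(String domain) throws MalformedObjectNameException {
        return platformServer().queryNames(new ObjectName(domain + ":*"), null);
    }

    public static void main(String[] args) throws Exception {
        // Assumption: HornetQ registers its MBeans under the "org.hornetq" domain
        for (ObjectName name : namesInDomain("org.hornetq")) {
            System.out.println(name);
        }
    }
}
```

Because the platform MBeanServer is a JVM-wide singleton, tools such as JConsole attached to the same JVM see the same beans.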

If you intend to configure Spring and HornetQ in a standalone environment, the aforementioned configuration should be enough. In our case, where we are deploying a Web application on Apache Tomcat, minor modifications should be made.

Apache Tomcat provides a JNDI service that all deployed Web applications use to configure environment attributes and resources. The naming context available at runtime is read-only, because environment and resource management is done through deployment descriptor files such as web.xml and context.xml. In addition, upon startup, Apache Tomcat initializes its JNDI environment using system properties. As a result, “in VM” clients that use the JNDI InitialContext class (without providing constructor environment parameters) to perform naming operations always retrieve the Apache Tomcat implementation of the JNDI Context interface.

In order for the HornetQ JNDI server to coexist with the Apache Tomcat Naming Service, and for the HornetQ JMS Service to be able to bind Queues, Topics and ConnectionFactory instances to JNDI, we must perform the following actions :

  • Disable the Apache Tomcat Naming Service for our Web application
  • Configure the HornetQ JNDI server not to use an existing JNDI service if available, but always create a new one

To disable the Apache Tomcat Naming Service for our Web application we must perform the following actions :

  • Create a META-INF folder under /war folder of our project
  • Create a context.xml file inside the META-INF folder containing the following context directive :
<Context override="true" useNaming="false" />

To configure the HornetQ JNDI server not to use an existing JNDI service if available, we must add the following property to the “namingServerImpl” Spring bean :

<property name="useGlobalService" value="false" />

In order to use the HornetQ messaging service through Spring we can either create a connection factory or look one up from JNDI. A connection factory and “JmsTemplate” example is provided below :

<bean name="connectionFactory" class="org.hornetq.jms.client.HornetQConnectionFactory" >
 <constructor-arg>
  <bean class="org.hornetq.api.core.TransportConfiguration">
   <constructor-arg value="org.hornetq.integration.transports.netty.NettyConnectorFactory" />
   <constructor-arg>
    <map key-type="java.lang.String" value-type="java.lang.Object">
     <entry key="port" value="5445"></entry>
    </map>
   </constructor-arg>
  </bean>
 </constructor-arg>
</bean>

<bean name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
 <property name="connectionFactory" ref="connectionFactory"></property>
</bean>

A JNDI lookup for a connection factory example is shown below :

<bean id="inVMConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean" depends-on="jmsServerManagerImpl">
 <property name="jndiName">
  <value>java:/ConnectionFactory</value>
 </property>
</bean>

We will use the JNDI lookup method to obtain a connection factory, so add the above configuration to your applicationContext.xml file.

That's all the configuration we have to do. Let's continue by implementing a hypothetical business case using our newly integrated messaging service. Our Web application exposes functionality to add, update and retrieve “employee” data. Let's assume that we want to be notified every time an addition or alteration of “employee” data is performed. For the sake of simplicity, the notification will be a log entry on the Apache Tomcat console. We are going to implement a JMS producer that sends a message to a “Notifications” queue every time a user performs an update to “employee” data. Additionally, a JMS consumer must be implemented to process the “Notifications” queue messages and log them to the console.

To create the “Notifications” queue and bind it to JNDI under the name “/queue/Notifications”, add the following to your hornetq-jms.xml file :

<queue name="Notifications">
 <entry name="/queue/Notifications"/>
</queue>

To be able to use the newly created “Notifications” queue through Spring beans, add the following JNDI lookup directive to your applicationContext.xml file :

<bean id="notificationsQueue" class="org.springframework.jndi.JndiObjectFactoryBean" depends-on="jmsServerManagerImpl">
 <property name="jndiName">
  <value>/queue/Notifications</value>
 </property>
</bean>

Since both JMS producer and consumer are server side components, they must be placed under /server subpackage of our application. We choose to create them under the /server/utils subpackage because they are utility classes in nature. Example JMS producer and consumer classes are provided below :

package com.javacodegeeks.gwtspring.server.utils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("notificationsProducer")
public class NotificationsProducer {

 @Autowired
 Queue notificationsQueue;

 @Autowired
 ConnectionFactory inVMConnectionFactory;

 private Connection notificationsQueueConnection;
 private Session notificationsQueueSession;
 private MessageProducer notificationsQueueProducer;


 @PostConstruct
 public void init() throws Exception {
  notificationsQueueConnection = inVMConnectionFactory.createConnection();
  notificationsQueueSession = notificationsQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  notificationsQueueProducer = notificationsQueueSession.createProducer(notificationsQueue);
  notificationsQueueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 }

 @PreDestroy
 public void destroy() throws Exception {
  if(notificationsQueueConnection != null)
   notificationsQueueConnection.close();
 }

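 // A JMS Session and its producers are not designed for concurrent use, so calls
 // coming from multiple request threads are serialized on this singleton bean.
 public synchronized void sendNotification(final String message) throws Exception {

  TextMessage textMessage = notificationsQueueSession.createTextMessage(message);
  notificationsQueueProducer.send(textMessage);

 }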

}

And the consumer,

package com.javacodegeeks.gwtspring.server.utils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("notificationsConsumer")
public class NotificationsConsumer implements MessageListener {

 @Autowired
 Queue notificationsQueue;

 @Autowired
 ConnectionFactory inVMConnectionFactory;

 private Connection notificationsQueueConnection;

 @PostConstruct
 public void init() throws Exception {
  notificationsQueueConnection = inVMConnectionFactory.createConnection();
  Session notificationsQueueSession = notificationsQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  MessageConsumer notificationsQueueConsumer = notificationsQueueSession.createConsumer(notificationsQueue);
  notificationsQueueConsumer.setMessageListener(this);
  notificationsQueueConnection.start();
 }

 @PreDestroy
 public void destroy() throws Exception {
  if(notificationsQueueConnection != null)
   notificationsQueueConnection.close();
 }

 @Override
 public void onMessage(Message message) {
  if (message instanceof TextMessage) {
   try {
    String text = ((TextMessage) message).getText();
    System.out.println("The Notification Message is : \n" + text);
   } catch (JMSException ex) {
    throw new RuntimeException(ex);
   }
  } else {
   throw new IllegalArgumentException("Message must be of type TextMessage");
  }
 }

}

To conclude our example business case, we have to modify the “employeeService” Spring bean so that it uses the “notificationsProducer” utility bean to send a notification message every time the user requests to save or update “employee” data. We use the “@Autowired” annotation to wire “notificationsProducer” into “employeeService” and invoke its “sendNotification” operation every time the “saveOrUpdateEmployee” operation of “employeeService” is requested. The complete code is shown below :

package com.javacodegeeks.gwtspring.server.services;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import com.javacodegeeks.gwtspring.server.dao.EmployeeDAO;
import com.javacodegeeks.gwtspring.server.utils.NotificationsProducer;
import com.javacodegeeks.gwtspring.shared.dto.EmployeeDTO;
import com.javacodegeeks.gwtspring.shared.services.EmployeeService;

@Service("employeeService")
public class EmployeeServiceImpl implements EmployeeService {

 @Autowired
 private EmployeeDAO employeeDAO;

 @Autowired
 NotificationsProducer notificationsProducer;

 @PostConstruct
 public void init() throws Exception {
 }

 @PreDestroy
 public void destroy() {
 }

 @Transactional(propagation=Propagation.SUPPORTS, rollbackFor=Exception.class)
 public EmployeeDTO findEmployee(long employeeId) {

  return employeeDAO.findById(employeeId);

 }

 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void saveEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {

  EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);

  if(employeeDTO == null) {
   employeeDTO = new EmployeeDTO(employeeId, name,surname, jobDescription);
   employeeDAO.persist(employeeDTO);
  }

 }

 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void updateEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {

  EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);

  if(employeeDTO != null) {
   employeeDTO.setEmployeeName(name);
   employeeDTO.setEmployeeSurname(surname);
   employeeDTO.setJob(jobDescription);
  }

 }

 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void deleteEmployee(long employeeId) throws Exception {

  EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);

  if(employeeDTO != null)
   employeeDAO.remove(employeeDTO);

 }

 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void saveOrUpdateEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {

  EmployeeDTO employeeDTO = new EmployeeDTO(employeeId, name,surname, jobDescription);

  employeeDAO.merge(employeeDTO);

  notificationsProducer.sendNotification("Save Or Update Employee with values : \nID : " + employeeId + "\nName : " + name + "\nSurname : " + surname + "\nJob description : " + jobDescription);

 }

}

That's it! To deploy the web application just copy the /war folder into the Apache Tomcat “webapps” folder. You can change the name of the war folder to whatever you like; preferably rename it after the project name, e.g. GWTSpringInfinispanHornetQ.

Prior to launching the application do not forget to create the database schema, here “javacodegeeks”.

To launch the application point your browser to the following address

http://localhost:8080/GWTSpringInfinispanHornetQ/

If all went well you should see your main web page. Two text boxes should be displayed, each followed by a button. In the first text box you can save or update an employee in the database. Provide as input the id, the name, the surname, and a job description, separated by space characters. When you click the “SaveOrUpdate” button, the provided information is stored in the database. For existing “employee” entries (same id) an update is performed. In both cases a notification should be logged. The log format should be as follows :

The Notification Message is :
Save Or Update Employee with values :
ID : xxx
Name : xxx
Surname : xxx
Job description : xxx

Here “xxx” stands for the “employee” information you provided. The notification appears in the Apache Tomcat log (catalina.out). The second text box is used to retrieve existing “employee” entries. Provide an “employee” id and click on the “Retrieve” button. If the “employee” exists you should see the “employee” id, name, surname and job description.
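The logged text is simply the string built in “saveOrUpdateEmployee” prefixed by the line printed in the consumer's “onMessage” method. The sketch below reproduces the two steps without any messaging involved, which can help when checking the expected output. “NotificationTextDemo” is a hypothetical class name and the employee values in “main” are made-up sample data.

```java
public final class NotificationTextDemo {

    // Same concatenation as in EmployeeServiceImpl.saveOrUpdateEmployee
    public static String notificationText(long employeeId, String name,
                                          String surname, String jobDescription) {
        return "Save Or Update Employee with values : \nID : " + employeeId
                + "\nName : " + name
                + "\nSurname : " + surname
                + "\nJob description : " + jobDescription;
    }

    // Same prefix as in NotificationsConsumer.onMessage
    public static String consoleOutput(String messageText) {
        return "The Notification Message is : \n" + messageText;
    }

    public static void main(String[] args) {
        // Hypothetical sample values
        System.out.println(consoleOutput(notificationText(1L, "John", "Doe", "Developer")));
    }
}
```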

You can download the project from here (required 3rd party libraries as described at the beginning and previous articles are not included)

Have Fun!

Justin

Byron Kiourtzoglou

Byron is a master software engineer working in the IT and Telecom domains. He is an applications developer in a wide variety of applications/services. He is currently acting as the team leader and technical architect for a proprietary service creation and integration platform for both the IT and Telecom industries in addition to a in-house big data real-time analytics solution. He is always fascinated by SOA, middleware services and mobile development. Byron is co-founder and Executive Editor at Java Code Geeks.

4 Comments
Idan Fridman
11 years ago

Hi, I tried your example on standalone application and tried to ignore the tomcat parts of yours. I get this exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name ‘namingServerImpl’ defined in class path resource [applicationContext.xml]: Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: org.jnp.interfaces.NamingContext.getLocal()Lorg/jnp/interfaces/Naming;
 at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1455)
 at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:519)
 at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:456)
 at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:294)
 at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:225)
 at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:291)
 at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:193)
 at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:585)
 at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:913)
 at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:464)
 at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
 at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)
 at com.finbird.fixgw.daemon.FeedDaemon.start(FeedDaemon.java:78)
 at com.finbird.fixgw.daemon.FeedDaemon.main(FeedDaemon.java:97)
Caused by: java.lang.NoSuchMethodError: org.jnp.interfaces.NamingContext.getLocal()Lorg/jnp/interfaces/Naming;
 at org.jnp.server.NamingBeanImpl.start(NamingBeanImpl.java:136)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
 at java.lang.reflect.Method.invoke(Unknown Source)
 at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeCustomInitMethod(AbstractAutowireCapableBeanFactory.java:1581)
 at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1522)
 at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1452)
 … 13 more

any idea…

Idan Fridman
11 years ago

Hi,
Any success with that? could you refer me to some complete example? thanks.

Alex
11 years ago

Hi Justin, thanks for the interesting example! I’m no JMS or HornetQ expert, but your NotificationsConsumer looks like it can only consume a single message at a time (please correct me if I’m wrong). Maybe this was intentional given the scope of the post (which would be fair enough). How would you propose to extend this example to consume messages in parallel? Thanks again for the post and any suggestions! Alex

Lokesh Gupta
11 years ago

I tried my hands on this integration, and was quite successful.. [Plz do not forget to add additional jar files in classpath]

http://howtodoinjava.com/2013/03/22/hornetq-stand-alone-server-example-using-maven/
http://howtodoinjava.com/2013/03/22/basic-jms-messaging-example-using-hornetq-stand-alone-server/