Enterprise Java

Monitoring and Management

This article is part of our Academy Course titled Spring Integration for EAI.

In this course, you are introduced to Enterprise Application Integration patterns and how Spring Integration addresses them. Next, you delve into the fundamentals of Spring Integration, like channels, transformers and adapters. Check it out here!

1. Introduction

After having experimented with the main components provided by Spring Integration and seen how it integrates well with other systems like JMS queues or web services, this chapter finishes the course by showing different mechanisms of monitoring or gathering more information about what is going on within the messaging system.

Some of these mechanisms consist of managing or monitoring the application through MBeans, which are part of the JMX specification. We will also learn how to monitor messages to see which components were involved during the messaging flow and how to persist messages for components that have the capability to buffer messages.

Another mechanism discussed in this chapter is how we will implement the EIP idempotent receiver pattern using a metadata store.

Finally, the last mechanism described is the control bus. This will let us send messages that will invoke operations on components in the application context.

2. Publishing and receiving JMX notifications

The JMX specification defines a mechanism that allows MBeans to publish notifications that will be sent to other MBeans or to the management application. The Oracle documentation explains how to implement this mechanism.

Spring Integration supports this feature by providing channel adapters that are both able to publish and receive JMX notifications. We are going to see an example that uses both channel adapters:

  • A notification listening channel adapter
  • A notification publishing channel adapter

2.1. Publishing a JMX notification

In the first part of the example, the messaging system receives a String message (a message with a payload of type String) through its entry gateway. It then uses a service activator (notification handler) to build a javax.management.Notification and sends it to the notification publishing channel adapter, which will publish the JMX notification.

The flow of this first part is shown below:

Figure 1
Figure 1

The xml configuration equivalent to the previous graphic:

<context:component-scan base-package="xpadro.spring.integration.jmx.notification"/>
    
<context:mbean-export/>
<context:mbean-server/>

<!-- Sending Notifications -->
<int:gateway service-interface="xpadro.spring.integration.jmx.notification.JmxNotificationGateway" default-request-channel="entryChannel"/>

<int:channel id="entryChannel"/>

<int:service-activator input-channel="entryChannel" output-channel="sendNotificationChannel" 
    ref="notificationHandler" method="buildNotification"/>

<int:channel id="sendNotificationChannel"/>

<int-jmx:notification-publishing-channel-adapter channel="sendNotificationChannel" 
    object-name="xpadro.spring.integration.jmx.adapter:type=integrationMBean,name=integrationMbean"/>

The gateway is as simple as in previous examples. Remember that the @Gateway annotation is not necessary if you have just one method:

public interface JmxNotificationGateway {

    public void send(String type);
}

A Message will reach the service activator, which will build the message with the JMX notification:

@Component("notificationHandler")
public class NotificationHandler {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private static final String NOTIFICATION_TYPE_HEADER = "jmx_notificationType";
    
    public void receive(Message<Notification> msg) {
        logger.info("Notification received: {}", msg.getPayload().getType());
    }
    
    public Message<Notification> buildNotification(Message<String> msg) {
        Notification notification = new Notification(msg.getPayload(), this, 0);
        
        return MessageBuilder.withPayload(notification)
                .copyHeadersIfAbsent(msg.getHeaders()).setHeader(NOTIFICATION_TYPE_HEADER, "myJmxNotification").build();
    }
}

Notice that we have set a new header. This is necessary to provide the notification type or the JMX adapter will throw an IllegalArgumentException with the message “No notification type header is available, and no default has been provided”.

Finally, we just need to return the message in order to be sent to the publishing adapter. The rest is handled by Spring Integration.

2.2. Receiving a JMX notification

The second part of the flow consists in a notification listening channel adapter that will receive our previously published notification.

Figure 2
Figure 2

The xml configuration:

<!-- Receiving Notifications -->
<int-jmx:notification-listening-channel-adapter channel="receiveNotificationChannel" 
    object-name="xpadro.spring.integration.jmx.adapter:type=integrationMBean,name=integrationMbean"/>

<int:channel id="receiveNotificationChannel"/>

<int:service-activator input-channel="receiveNotificationChannel" 
    ref="notificationHandler" method="receive"/>

We will just receive the notification and log it:

public void receive(Message<Notification> msg) {
    logger.info("Notification received: {}", msg.getPayload().getType());
}

The application that runs the example:

public class NotificationApp {
    public static void main(String[] args) throws InterruptedException {
        AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/jmx/config/int-notification-config.xml");
        
        JmxNotificationGateway gateway = context.getBean(JmxNotificationGateway.class);
        gateway.send("gatewayNotification");
        Thread.sleep(1000);
        context.close();
    }
}

3. Polling managed attributes from an MBean

Imagine we have an MBean that is monitoring some feature. With the attribute polling channel adapter, your application will be able to poll the MBean and receive the updated data.

I have implemented an MBean that generates a random number every time is asked. Not the most vital feature but will serve us to see an example:

@Component("pollingMbean")
@ManagedResource
public class JmxPollingMBean {
    
    @ManagedAttribute
    public int getNumber() {
        Random rnd = new Random();
        int randomNum = rnd.nextInt(100);
        return randomNum;
    }
}

The flow couldn’t be simpler; we need an attribute polling channel adapter specifying the type and name of our MBean. The adapter will poll the MBean and place the result in the result channel. Each result polled will be shown on console through the stream stdout channel adapter:

<context:component-scan base-package="xpadro.spring.integration.jmx.polling"/>

<context:mbean-export/>
<context:mbean-server/>

<!-- Polling -->
<int-jmx:attribute-polling-channel-adapter channel="resultChannel"
        object-name="xpadro.spring.integration.jmx.polling:type=JmxPollingMBean,name=pollingMbean"
        attribute-name="Number">
    <int:poller max-messages-per-poll="1" fixed-delay="1000"/>
</int-jmx:attribute-polling-channel-adapter>

<int:channel id="resultChannel"/>

<int-stream:stdout-channel-adapter channel="resultChannel" append-newline="true"/>

The application that runs the example:

public class PollingApp {
    public static void main(String[] args) throws InterruptedException {
        AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/jmx/config/int-polling-config.xml");
        context.registerShutdownHook();
        Thread.sleep(5000);
        context.close();
    }
}

And the console output:

2014-04-16 16:23:43,867|AbstractEndpoint|started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
82
72
20
47
21
2014-04-16 16:23:48,878|AbstractApplicationContext|Closing org.springframework.context.support.ClassPathXmlApplicationContext@7283922

4. Invoking MBean operations

The next mechanism allows us to invoke an operation of an MBean. We are going to implement another bean that contains a single operation, our old friend hello world:

@Component("operationMbean")
@ManagedResource
public class JmxOperationMBean {
    
    @ManagedOperation
    public String hello(String name) {
        return "Hello " + name;
    }
}

Now, we can use a channel adapter if the operation does not return a result, or a gateway if so. With the following xml configuration, we export the MBean and use the gateway to invoke the operation and wait for the result:

<context:component-scan base-package="xpadro.spring.integration.jmx.operation"/>
    
<context:mbean-export/>
<context:mbean-server/>

<int:gateway service-interface="xpadro.spring.integration.jmx.operation.JmxOperationGateway" default-request-channel="entryChannel"/>

<int-jmx:operation-invoking-outbound-gateway request-channel="entryChannel" reply-channel="replyChannel"
    object-name="xpadro.spring.integration.jmx.operation:type=JmxOperationMBean,name=operationMbean" 
    operation-name="hello"/>

<int:channel id="replyChannel"/>

<int-stream:stdout-channel-adapter channel="replyChannel" append-newline="true"/>

In order to work, we have to specify the type and name of the MBean, and the operation we want to invoke. The result will be sent to the stream channel adapter in order to be shown on the console.

The application that runs the example:

public class OperationApp {
    public static void main(String[] args) throws InterruptedException {
        AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/jmx/config/int-operation-config.xml");
        
        JmxOperationGateway gateway = context.getBean(JmxOperationGateway.class);
        gateway.hello("World");
        Thread.sleep(1000);
        context.close();
    }
}

5. Exporting components as MBeans

This component is used to export message channels, message handlers and message endpoints as MBeans so you can monitor them.

You need to put the following configuration into your application:

<int-jmx:mbean-export id="integrationMBeanExporter"
    default-domain="xpadro.integration.exporter" server="mbeanServer"/>

<bean id="mbeanServer" class="org.springframework.jmx.support.MBeanServerFactoryBean">
    <property name="locateExistingServerIfPossible" value="true"/>
</bean>

And set the following VM arguments as explained in the Spring documentation:

-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=6969
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=false

The application that runs the example sends three messages:

public class ExporterApp {
    public static void main(String[] args) throws InterruptedException {
        AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/jmx/config/int-exporter-config.xml");
        context.registerShutdownHook();
        
        JmxExporterGateway gateway = context.getBean(JmxExporterGateway.class);
        gateway.sendMessage("message 1");
        Thread.sleep(500);
        gateway.sendMessage("message 2");
        Thread.sleep(500);
        gateway.sendMessage("message 3");
    }
}

Once the application is running, you can see information about the components. The following screenshot is made on the JConsole:

Figure 3
Figure 3

You can notice that the sendCount attribute of the entry channel has the value 3, because we have sent three messages in our example.

6. Trailing a message path

In a messaging system, components are loosely coupled. This means that the sending component does not need to know who will receive the message. And the other way round, the receiver is just interested in the message received, not who sent it. This benefit can be not so good when we need to debug the application.

The message history consists in attaching to the message, the list of all components the message passed through.

The following application will test this feature by sending a message through several components:

Figure 4
Figure 4

The key element of the configuration is not visible in the previous graphic: the message-history element:

<context:component-scan base-package="xpadro.spring.integration.msg.history"/>

<int:message-history/>

<int:gateway id="historyGateway" service-interface="xpadro.spring.integration.msg.history.HistoryGateway" 
    default-request-channel="entryChannel"/>

<int:channel id="entryChannel"/>

<int:transformer id="msgTransformer" input-channel="entryChannel" 
    expression="payload + 'transformed'" output-channel="transformedChannel"/>

<int:channel id="transformedChannel"/>

<int:service-activator input-channel="transformedChannel" ref="historyActivator"/>

With this configuration set, the service activator at the end of the messaging flow will be able to retrieve the list of visited components by looking at the header of the message:

@Component("historyActivator")
public class HistoryActivator {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public void handle(Message<String> msg) {
        MessageHistory msgHistory = msg.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class);
        if (msgHistory != null) {
            logger.info("Components visited: {}", msgHistory.toString());
        }
    }
}

The application running this example:

public class MsgHistoryApp {
    public static void main(String[] args) throws InterruptedException {
        AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/msg/history/config/int-msg-history-config.xml");
        
        HistoryGateway gateway = context.getBean(HistoryGateway.class);
        gateway.send("myTest");
        Thread.sleep(1000);
        context.close();
    }
}

The result will be shown on the console:

2014-04-16 17:34:52,551|HistoryActivator|Components visited: historyGateway,entryChannel,msgTransformer,transformedChannel

7. Persisting buffered messages

Some of the components in Spring Integration can buffer messages. For example, a queue channel will buffer messages until consumers retrieve them from it. Another example is the aggregator endpoint; as seen in the second tutorial, this endpoint will gather messages until the group is complete basing its decision on the release strategy.

These integration patterns imply that if a failure occurs, buffered messages can be lost. To prevent this, we can persist these messages, for example storing them into a database. By default, Spring Integration stores these messages in memory. We are going to change this using a message store.

For our example, we will store these messages into a MongoDB database. In order to do that, we just need the following configuration:

<bean id="mongoDbFactory" class="org.springframework.data.mongodb.core.SimpleMongoDbFactory">
    <constructor-arg>
        <bean class="com.mongodb.Mongo"/>
    </constructor-arg>
    <constructor-arg value="jcgdb"/>
</bean>

<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.ConfigurableMongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

Now, we are going to create an application to test this feature. I have implemented a flow that receives through a gateway, a message with a String payload. This message is sent by the gateway to a queue channel that will buffer the messages until the service activator msgStoreActivator retrieves it from the queue. The service activator will poll messages every five seconds:

<context:component-scan base-package="xpadro.spring.integration.msg.store"/>

<import resource="mongodb-config.xml"/>

<int:gateway id="storeGateway" service-interface="xpadro.spring.integration.msg.store.MsgStoreGateway" 
    default-request-channel="entryChannel"/>

<int:channel id="entryChannel">
    <int:queue message-store="myMessageStore"/>
</int:channel>

<int:service-activator input-channel="entryChannel" ref="msgStoreActivator">
    <int:poller fixed-rate="5000"/>
</int:service-activator>

Maybe you have noticed the myMessageStore bean. In order to see how the persisting messages mechanism works, I have extended the ConfigurableMongoDBMessageStore class to put logs in it and debug the result. If you want to try this, you can delete the MongoDB messageStore bean in mongodb-config.xml since we are no longer using it.


 
I have overwritten two methods:

@Component("myMessageStore")
public class MyMessageStore extends ConfigurableMongoDbMessageStore {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private static final String STORE_COLLECTION_NAME = "messageStoreCollection";

    @Autowired
    public MyMessageStore(MongoDbFactory mongoDbFactory) {
        super(mongoDbFactory, STORE_COLLECTION_NAME);
        logger.info("Creating message store '{}'", STORE_COLLECTION_NAME);
    }
    
    @Override
    public MessageGroup addMessageToGroup(Object groupId, Message<?> message) {
        logger.info("Adding message '{}' to group '{}'", message.getPayload(), groupId);
        return super.addMessageToGroup(groupId, message);
    }

    @Override
    public Message<?> pollMessageFromGroup(Object groupId) {
        Message<?> msg = super.pollMessageFromGroup(groupId);
        if (msg != null) {
            logger.info("polling message '{}' from group '{}'", msg.getPayload(), groupId);
        }
        else {
            logger.info("Polling null message from group {}", groupId);
        }
        
        return msg;
    }
}

This mechanism works as follows:

  1. When a message reaches the queue channel, which have our message store configured, it will invoke the ‘addMessageToGroup’ method. This method will insert a document with the payload to the MongoDB collection specified in the constructor. This is done by using a MongoTemplate.
  2. When the consumer polls the message, the pollMessageFromGroup will be invoked, retrieving the document from the collection.

Let’s see how it works by debugging the code. We will stop just before polling the message to see how it is stored in the database:

Figure 5
Figure 5

At this moment, we can take a look at the database:

Figure 6
Figure 6

Once resumed, the message is polled from the collection:

Figure 7
Figure 7

The application that runs the example:

public class MsgStoreApp {
    public static void main(String[] args) throws InterruptedException {
        AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/msg/store/config/int-msg-store-config.xml");
        
        MsgStoreGateway gateway = context.getBean(MsgStoreGateway.class);
        
        gateway.send("myMessage");
        Thread.sleep(30000);
        context.close();
    }
}

8. Implementing idempotent components

If our application needs to avoid duplicate messages, Spring Integration provides this mechanism by implementing the idempotent receiver pattern. The responsible of detecting duplicate messages is the metadata store component. This component consists in storing key-value pairs. The framework provides two implementations of the interface MetadataStore:

OK, let’s start with the configuration file:

<context:component-scan base-package="xpadro.spring.integration.msg.metadata"/>

<bean id="metadataStore"
class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>

<int:gateway id="metadataGateway" service-interface="xpadro.spring.integration.msg.metadata.MetadataGateway"
    default-request-channel="entryChannel"/>

<int:channel id="entryChannel"/>

<int:filter input-channel="entryChannel" output-channel="processChannel"
    discard-channel="discardChannel" expression="@metadataStore.get(headers.messageId) == null"/>


<!-- Process message -->            
<int:publish-subscribe-channel id="processChannel"/>

<int:outbound-channel-adapter channel="processChannel" expression="@metadataStore.put(headers.messageId, '')"/>

<int:service-activator input-channel="processChannel" ref="metadataActivator" method="process"/>


<!-- Duplicated message - discard it -->
<int:channel id="discardChannel"/>

<int:service-activator input-channel="discardChannel" ref="metadataActivator" method="discard"/>

We have defined a “metadataStore” in order to use our properties metadata store instead of the default in-memory implementation.

The flow is explained here:

  1. A message is sent to the gateway.
  2. The filter will send the message to the process channel since it is the first time it is sent.
  3. There are two subscribers to the process channel: the service activator that processes the message and an outbound channel adapter. The channel adapter will send the value of the message header messagId to the metadata store.
  4. The metadata store stores the value in the properties file.
  5. Next time the same message is sent; the filter will find the value and discard the message.

The metadata store creates a properties file in the file system. If you are using Windows, you will see a metadata-store.properties file in the ‘C:\Users\username\AppData\Local\Temp\spring-integration’ folder

The example uses a service activator to log if the message has been processed:

@Component("metadataActivator")
public class MetadataActivator {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public void process(Message<String> msg) {
        logger.info("Message processed: {}", msg.getPayload());
    }
    
    public void discard(Message<String> msg) {
        logger.info("Message discarded: {}", msg.getPayload());
    }
}

The application will run the example:

public class MetadataApp {
    private static final String MESSAGE_STORE_HEADER = "messageId";
    
    public static void main(String[] args) throws InterruptedException {
        AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/msg/metadata/config/int-msg-metadata-config.xml");
        
        MetadataGateway gateway = context.getBean(MetadataGateway.class);
        
        Map<String,String> headers = new HashMap<>();
        headers.put(MESSAGE_STORE_HEADER, "msg1");
        Message<String> msg1 = MessageBuilder.withPayload("msg1").copyHeaders(headers).build();
        
        headers = new HashMap<>();
        headers.put(MESSAGE_STORE_HEADER, "msg2");
        Message<String> msg2 = MessageBuilder.withPayload("msg2").copyHeaders(headers).build();
        
        gateway.sendMessage(msg1);
        Thread.sleep(500);
        gateway.sendMessage(msg1);
        Thread.sleep(500);
        gateway.sendMessage(msg2);
        
        Thread.sleep(3000);
        context.close();
    }
}

The first invocation will result in the following output on the console:

2014-04-17 13:00:08,223|MetadataActivator|Message processed: msg1
2014-04-17 13:00:08,726|MetadataActivator|Message discarded: msg1
2014-04-17 13:00:09,229|MetadataActivator|Message processed: msg2

Now remember that the PropertiesPersistingMetadataStore stores the data in a properties file. This means that the data will survive ApplicationContext restarts. So, if we don’t delete the properties file and we run the example again, the result will be different:

2014-04-17 13:02:27,117|MetadataActivator|Message discarded: msg1
2014-04-17 13:02:27,620|MetadataActivator|Message discarded: msg1
2014-04-17 13:02:28,123|MetadataActivator|Message discarded: msg2

9. Sending operation invocation requests

The last mechanism discussed on this tutorial is the control bus. The control bus will let you manage the system the same way it is done by the application. The message will be executed as a Spring Expression Language. To be executable from the control bus, the method needs to use the @ManagedAttribute or @ManagedOperation annotation.

This section’s example uses a control bus to invoke a method on a bean:

<context:component-scan base-package="xpadro.spring.integration.control.bus"/>

<int:channel id="entryChannel"/>

<int:control-bus input-channel="entryChannel" output-channel="resultChannel"/>

<int:channel id="resultChannel"/>

<int:service-activator input-channel="resultChannel" ref="controlbusActivator"/>

The operation that will be invoked is as follows:

@Component("controlbusBean")
public class ControlBusBean {

    @ManagedOperation
    public String greet(String name) {
        return "Hello " + name;
    }
}

The application that runs the example sends a message with the expression to be executed:

public class ControlBusApp {
    public static void main(String[] args) throws InterruptedException {
        AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/control/bus/config/int-control-bus-config.xml");
        MessageChannel channel = context.getBean("entryChannel", MessageChannel.class);
        
        Message<String> msg = MessageBuilder.withPayload("@controlbusBean.greet('World!')").build();
        channel.send(msg);
        
        Thread.sleep(3000);
        context.close();
    }
}

The result is shown on the console:

2014-04-17 13:21:42,910|ControlBusActivator|Message received: Hello World!

Xavier Padro

Xavier is a software developer working in a consulting firm based in Barcelona. He is specialized in web application development with experience in both frontend and backend. He is interested in everything related to Java and the Spring framework.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button