Coherence Event Processing by using Map Trigger Feature

This article shows how to process Coherence events by using Map Triggers. The earlier article, Distributed Data Management in Oracle Coherence, is suggested reading for the basic configuration and implementation of the Oracle Coherence API.
Map Triggers are one of the most important features of Oracle Coherence for building a highly customized cache management system. A MapTrigger is a functional agent that can validate, reject or modify mutating operations against an underlying map. Map triggers can also prevent invalid transactions, enforce security, provide event logging and auditing, and gather statistics on data modifications.

For example, suppose we have code that works with a NamedCache, and we want to change an entry's behavior or contents before the entry is inserted into the map. Enabling a map trigger makes this change possible without modifying the existing code.
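
The idea can be sketched in plain Java before looking at the Coherence classes. The following is only a simplified model: PutTrigger, TriggeredMap and upperCasingMap are hypothetical names invented for this illustration and are not part of the Coherence API. It shows a hook that runs before a put is committed and either rewrites the value or rejects it, while the calling code stays the same.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Plain-JDK model of a map trigger; none of these types belong to the Coherence API. */
final class TriggerSketch {

    /** Hypothetical hook: returns the value to store, or throws to reject the change. */
    interface PutTrigger<V> {
        V beforePut(Object key, V proposed);
    }

    /** Map wrapper that runs the trigger before every put is committed. */
    static final class TriggeredMap<V> {
        private final Map<Object, V> store = new HashMap<Object, V>();
        private final PutTrigger<V> trigger;

        TriggeredMap(PutTrigger<V> trigger) {
            this.trigger = trigger;
        }

        void put(Object key, V value) {
            // The trigger may replace the value or throw; only its result is stored.
            store.put(key, trigger.beforePut(key, value));
        }

        V get(Object key) {
            return store.get(key);
        }
    }

    /** Builds a map whose trigger rejects blank values and upper-cases the rest. */
    static TriggeredMap<String> upperCasingMap() {
        return new TriggeredMap<String>(new PutTrigger<String>() {
            public String beforePut(Object key, String proposed) {
                if (proposed == null || proposed.trim().length() == 0) {
                    throw new IllegalArgumentException("blank value for key " + key);
                }
                return proposed.toUpperCase(Locale.ENGLISH);
            }
        });
    }

    public static void main(String[] args) {
        TriggeredMap<String> names = upperCasingMap();
        names.put("user1", "James"); // the caller is unaware of the trigger
        System.out.println(names.get("user1")); // prints JAMES
    }
}
```

In Coherence the same role is played by a MapTrigger registered on a NamedCache, as the following steps show.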

There are two ways to add the Map Trigger feature to an application:

1) A MapTriggerListener can be used to register a MapTrigger with a NamedCache
2) The class-factory mechanism can be used in the coherence-cache-config.xml configuration file

In the following sample application, MapTrigger functionality is implemented the first way. A new cluster called OTV is created, and a User bean is distributed through the user-map NamedCache shared by the two members of the cluster.

Used Technologies :

JDK 1.6.0_35
Spring 3.1.2
Coherence 3.7.1
Maven 3.0.2
 
STEP 1 : CREATE MAVEN PROJECT

A Maven project is created. (It can be created by using Maven or an IDE plug-in.)


 
STEP 2 : COHERENCE PACKAGE

Coherence is downloaded via the Coherence package.

STEP 3 : LIBRARIES

First, the Spring dependencies are added to Maven's pom.xml.

	<!-- Spring 3.1.2 dependencies -->
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-core</artifactId>
		<version>${spring.version}</version>
	</dependency>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-context</artifactId>
		<version>${spring.version}</version>
	</dependency>

The Coherence library is installed into the local Maven repository manually, and its dependency is added to pom.xml as below. If Maven is not used to manage the project, the coherence.jar file can be added to the classpath instead.

	<!-- Coherence library(from local repository) -->
	<dependency>
		<groupId>com.tangosol</groupId>
		<artifactId>coherence</artifactId>
		<version>3.7.1</version>
	</dependency>

To create a runnable JAR, the following Maven plugin can be used.

	<plugin>
		<groupId>org.apache.maven.plugins</groupId>
		<artifactId>maven-shade-plugin</artifactId>
		<version>1.3.1</version>

		<executions>
			<execution>
				<phase>package</phase>
				<goals>
					<goal>shade</goal>
				</goals>
				<configuration>
					<transformers>
						<transformer
							implementation='org.apache.maven.plugins.shade.resource.ManifestResourceTransformer'>
							<mainClass>com.otv.exe.Application</mainClass>
						</transformer>
						<transformer
							implementation='org.apache.maven.plugins.shade.resource.AppendingTransformer'>
							<resource>META-INF/spring.handlers</resource>
						</transformer>
						<transformer
							implementation='org.apache.maven.plugins.shade.resource.AppendingTransformer'>
							<resource>META-INF/spring.schemas</resource>
						</transformer>
					</transformers>
				</configuration>
			</execution>
		</executions>
	</plugin>

 
STEP 4 : CREATE otv-coherence-cache-config.xml

The first Coherence configuration file is otv-coherence-cache-config.xml. It contains the caching-schemes (distributed or replicated) and caching-scheme-mapping configuration. The created cache configuration should be added to coherence-cache-config.xml.

<?xml version='1.0'?>

<cache-config xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
	xmlns='http://xmlns.oracle.com/coherence/coherence-cache-config'
	xsi:schemaLocation='http://xmlns.oracle.com/coherence/coherence-cache-config
   coherence-cache-config.xsd'>

	<caching-scheme-mapping>
		<cache-mapping>
			<cache-name>user-map</cache-name>
			<scheme-name>MapDistCache</scheme-name>
		</cache-mapping>
	</caching-scheme-mapping>

    <caching-schemes>
		<distributed-scheme>
			<scheme-name>MapDistCache</scheme-name>
			<service-name>MapDistCache</service-name>
			<backing-map-scheme>
				<local-scheme>
					<unit-calculator>BINARY</unit-calculator>
				</local-scheme>
			</backing-map-scheme>
			<autostart>true</autostart>
		</distributed-scheme>
	</caching-schemes>

</cache-config>

 
STEP 5 : CREATE tangosol-coherence-override.xml

The second Coherence configuration file is tangosol-coherence-override.xml. It contains the cluster, member-identity and configurable-cache-factory configuration.

tangosol-coherence-override.xml for the first member of the cluster:

<?xml version='1.0'?>

<coherence xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
   xmlns='http://xmlns.oracle.com/coherence/coherence-operational-config'
   xsi:schemaLocation='http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd'>

   <cluster-config>

      <member-identity>
         <cluster-name>OTV</cluster-name>
         <role-name>OTV1</role-name>
      </member-identity>

      <unicast-listener>
	      <well-known-addresses>
	        <socket-address id='1'>
	          <address>x.x.x.x</address>
	          <port>8089</port>
	        </socket-address>
	        <socket-address id='2'>
	          <address>x.x.x.x</address>
	          <port>8090</port>
	        </socket-address>
	      </well-known-addresses>

	      <machine-id>1001</machine-id>
     	  <address>x.x.x.x</address>
     	  <port>8089</port>
     	  <port-auto-adjust>true</port-auto-adjust>
      </unicast-listener>

   </cluster-config>

   <configurable-cache-factory-config>
      <init-params>
         <init-param>
            <param-type>java.lang.String</param-type>
            <param-value system-property='tangosol.coherence.cacheconfig'>
              otv-coherence-cache-config.xml</param-value>
         </init-param>
      </init-params>
   </configurable-cache-factory-config>

</coherence>

 
tangosol-coherence-override.xml for the second member of the cluster:

<?xml version='1.0'?>

<coherence xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
   xmlns='http://xmlns.oracle.com/coherence/coherence-operational-config'
   xsi:schemaLocation='http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd'>

   <cluster-config>

      <member-identity>
         <cluster-name>OTV</cluster-name>
         <role-name>OTV2</role-name>
      </member-identity>

      <unicast-listener>      

	      <well-known-addresses>
	        <socket-address id='1'>
	          <address>x.x.x.x</address>
	          <port>8090</port>
	        </socket-address>
	        <socket-address id='2'>
	          <address>x.x.x.x</address>
	          <port>8089</port>
	        </socket-address>
	      </well-known-addresses>

	      <machine-id>1002</machine-id>
     	  <address>x.x.x.x</address>
     	  <port>8090</port>
     	  <port-auto-adjust>true</port-auto-adjust>

      </unicast-listener>

   </cluster-config>

   <configurable-cache-factory-config>
      <init-params>
         <init-param>
            <param-type>java.lang.String</param-type>
            <param-value system-property='tangosol.coherence.cacheconfig'>
              otv-coherence-cache-config.xml</param-value>
         </init-param>
      </init-params>
   </configurable-cache-factory-config>

</coherence>

 
STEP 6 : CREATE applicationContext.xml

The Spring configuration file, applicationContext.xml, is created.

<beans xmlns='http://www.springframework.org/schema/beans'
	xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
	xsi:schemaLocation='http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-3.0.xsd'>

	<!-- Beans Declaration -->
	<bean id='userCacheService' class='com.otv.srv.UserCacheService'></bean>

    <bean id='userCacheUpdater' class='com.otv.exe.UserCacheUpdater'>
    	<property name='userCacheService' ref='userCacheService' />
    </bean>

</beans>

 
STEP 7 : CREATE User CLASS

A new User bean is created. This bean will be distributed between the two nodes in the OTV cluster. For serialization, the java.io.Serializable interface is implemented, but PortableObject can be implemented for better performance.

package com.otv.user;

import java.io.Serializable;

/**
 * User Bean
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 *
 */
public class User implements Serializable {

	private static final long serialVersionUID = -1963764656789800896L;

	private String id;
	private String name;
	private String surname;	

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getSurname() {
		return surname;
	}

	public void setSurname(String surname) {
		this.surname = surname;
	}

	@Override
	public String toString() {
		StringBuilder strBuff = new StringBuilder();
		strBuff.append("id : ").append(id);
		strBuff.append(", name : ").append(name);
		strBuff.append(", surname : ").append(surname);
		return strBuff.toString();
	}
}
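
The serialization requirement can be checked without a cluster. The sketch below is an illustration only: Person is a hypothetical stand-in for the User bean, and roundTrip is a helper invented here. It pushes a Serializable object through a byte array and back, which is roughly what has to succeed when an entry travels between members using plain Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Round-trips a Serializable bean through bytes; it does not use the Coherence API. */
final class SerializationSketch {

    /** Hypothetical stand-in for the article's User bean, reduced to two fields. */
    static final class Person implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final String surname;

        Person(String name, String surname) {
            this.name = name;
            this.surname = surname;
        }
    }

    /** Serializes the object to a byte array and reads it back as a new instance. */
    static Object roundTrip(Serializable original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        try {
            out.writeObject(original);
        } finally {
            out.close();
        }
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        try {
            return in.readObject();
        } finally {
            in.close();
        }
    }

    public static void main(String[] args) throws Exception {
        Person copy = (Person) roundTrip(new Person("James", "Joyce"));
        System.out.println(copy.name + " " + copy.surname); // prints James Joyce
    }
}
```

A bean that holds a non-serializable field would fail in roundTrip with a NotSerializableException, which makes this kind of check a cheap test to run before deploying to the cluster.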

 
STEP 8 : CREATE IUserCacheService INTERFACE

A new IUserCacheService interface is created for the service layer to expose cache functionality.

package com.otv.srv;

import com.tangosol.net.NamedCache;

/**
 * IUserCacheService Interface exposes User Cache operations
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 *
 */
public interface IUserCacheService {

	/**
     * Adds user entries to cache
     *
     * @param Object key
     * @param Object value
     *
     */
	void addToUserCache(Object key, Object value);

	/**
     * Deletes user entries from cache
     *
     * @param Object key
     *
     */
	void deleteFromUserCache(Object key);

	/**
     * Gets user cache
     *
     * @return NamedCache Coherence named cache
     */
	NamedCache getUserCache();

}

 
STEP 9 : CREATE UserCacheService IMPL CLASS

UserCacheService is created by implementing IUserCacheService.

package com.otv.srv;

import com.otv.listener.UserMapListener;
import com.otv.trigger.UserMapTrigger;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.MapTriggerListener;

/**
 * UserCacheService Class implements the IUserCacheService
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 *
 */
public class UserCacheService implements IUserCacheService {

	private NamedCache userCache = null;
	private static final String USER_MAP = "user-map";
	private static final long   LOCK_TIMEOUT = -1;

	public UserCacheService() {
		setUserCache(CacheFactory.getCache(USER_MAP));
		getUserCache().addMapListener(new UserMapListener());
		getUserCache().addMapListener(new MapTriggerListener(new UserMapTrigger()));
	}	

	/**
     * Adds user entries to cache
     *
     * @param Object key
     * @param Object value
     *
     */
	public void addToUserCache(Object key, Object value) {
		// key is locked
		getUserCache().lock(key, LOCK_TIMEOUT);
		try {
			// application logic
			getUserCache().put(key, value);
		} finally {
			// key is unlocked
			getUserCache().unlock(key);
		}
	}

	/**
     * Deletes user entries from cache
     *
     * @param Object key
     *
     */
	public void deleteFromUserCache(Object key) {
		// key is locked
		getUserCache().lock(key, LOCK_TIMEOUT);
		try {
			// application logic
			getUserCache().remove(key);
		} finally {
			// key is unlocked
			getUserCache().unlock(key);
		}
	}

	/**
     * Gets user cache
     *
     * @return NamedCache Coherence named cache
     */
	public NamedCache getUserCache() {
		return userCache;
	}

	public void setUserCache(NamedCache userCache) {
		this.userCache = userCache;
	}

}
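
The lock / try / finally pattern used in addToUserCache and deleteFromUserCache can be tried out with plain JDK locks. This is a sketch under assumptions, not the Coherence implementation: KeyLockSketch and its methods are hypothetical names, and a ReentrantLock per key stands in for the cluster-wide key lock. The point it illustrates is that the unlock in the finally block runs even when the update itself fails.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

/** Plain-JDK sketch of per-key lock / try / finally; it does not use the Coherence API. */
final class KeyLockSketch {

    private final ConcurrentMap<Object, ReentrantLock> locks =
            new ConcurrentHashMap<Object, ReentrantLock>();
    private final ConcurrentMap<Object, Object> store =
            new ConcurrentHashMap<Object, Object>();

    /** Returns the single lock associated with the key, creating it on first use. */
    private ReentrantLock lockFor(Object key) {
        ReentrantLock fresh = new ReentrantLock();
        ReentrantLock existing = locks.putIfAbsent(key, fresh);
        return existing != null ? existing : fresh;
    }

    void put(Object key, Object value) {
        ReentrantLock lock = lockFor(key);
        lock.lock(); // waits until the key is free, comparable to a timeout of -1
        try {
            store.put(key, value); // throws NullPointerException for a null value
        } finally {
            lock.unlock(); // released even when the update fails
        }
    }

    Object get(Object key) {
        return store.get(key);
    }

    /** Reports whether some thread currently holds the lock for the key. */
    boolean isLocked(Object key) {
        ReentrantLock lock = locks.get(key);
        return lock != null && lock.isLocked();
    }

    public static void main(String[] args) {
        KeyLockSketch cache = new KeyLockSketch();
        cache.put("user1", "James Joyce");
        // prints James Joyce, locked=false
        System.out.println(cache.get("user1") + ", locked=" + cache.isLocked("user1"));
    }
}
```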

 
STEP 10 : CREATE UserMapTrigger CLASS

A new UserMapTrigger class is created by implementing the com.tangosol.util.MapTrigger interface. This trigger runs its logic before the entry is inserted into the user-map.

package com.otv.trigger;

import org.apache.log4j.Logger;

import com.otv.user.User;
import com.tangosol.util.MapTrigger;

/**
 * UserMapTrigger executes required logic before the operation is committed
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 *
 */
public class UserMapTrigger implements MapTrigger {

	private static final long serialVersionUID = 5411263646665358790L;
	private static Logger logger = Logger.getLogger(UserMapTrigger.class);

	/**
     * Processes user cache entries
     *
     * @param MapTrigger.Entry entry
     *
     */
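	public void process(MapTrigger.Entry entry) {
		// A removal carries no new value, so there is nothing to rewrite.
		if (!entry.isPresent()) {
			return;
		}

		User user = (User) entry.getValue();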
		String id = user.getId();
		String name = user.getName();
		String updatedName = name.toUpperCase();

		String surname = user.getSurname();
		String updatedSurname = surname.toUpperCase();

		if (!updatedName.equals(name)) {
			user.setName(updatedName);
		}

		if (!updatedSurname.equals(surname)) {
			user.setSurname(updatedSurname);
		}

		user.setId(user.getName() + "_" + user.getSurname());

		entry.setValue(user);

		logger.debug("UserMapTrigger processes the entry before committing. "
				+ "oldId : " + id
				+ ", newId : " + ((User)entry.getValue()).getId()
				+ ", oldName : " + name
				+ ", newName : " + ((User)entry.getValue()).getName()
				+ ", oldSurname : " + surname
				+ ", newSurname : " + ((User)entry.getValue()).getSurname()
				);
	}

	public boolean equals(Object o) {
		return o != null && o.getClass() == this.getClass();
	}

	public int hashCode() {
		return getClass().getName().hashCode();
	}
}
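
The rewrite performed by the trigger can also be exercised on its own, outside a cluster. The following sketch is an assumption-laden stand-in rather than the trigger itself: Person replaces com.otv.user.User, normalize is a hypothetical helper, and the null checks and the fixed Locale are additions that the class above does not have.

```java
import java.util.Locale;

/** Plain-Java sketch of the value rewrite done by the trigger; it does not use the Coherence API. */
final class NormalizeSketch {

    /** Hypothetical stand-in for com.otv.user.User. */
    static final class Person {
        String id;
        String name;
        String surname;
    }

    /** Upper-cases name and surname and derives the id as NAME_SURNAME. */
    static void normalize(Person p) {
        if (p == null || p.name == null || p.surname == null) {
            return; // nothing to rewrite, for example an incomplete bean
        }
        p.name = p.name.toUpperCase(Locale.ENGLISH);
        p.surname = p.surname.toUpperCase(Locale.ENGLISH);
        p.id = p.name + "_" + p.surname;
    }

    public static void main(String[] args) {
        Person p = new Person();
        p.name = "James";
        p.surname = "Joyce";
        normalize(p);
        System.out.println(p.id); // prints JAMES_JOYCE
    }
}
```

Keeping the rewrite in a small method like this makes it possible to unit test the trigger logic without starting Coherence.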

 
STEP 11 : CREATE UserMapListener CLASS

A new UserMapListener class is created. This listener receives distributed user-map events.

package com.otv.listener;

import org.apache.log4j.Logger;

import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;

/**
 * UserMapListener Class listens to user cache events
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 *
 */
public class UserMapListener implements MapListener {

	private static Logger logger = Logger.getLogger(UserMapListener.class);

	public void entryDeleted(MapEvent me) {
		logger.debug("Deleted Key = " + me.getKey() + ", Value = " + me.getOldValue());
	}

	public void entryInserted(MapEvent me) {
		logger.debug("Inserted Key = " + me.getKey() + ", Value = " + me.getNewValue());
	}

	public void entryUpdated(MapEvent me) {
//		logger.debug("Updated Key = " + me.getKey() + ", New_Value = " + me.getNewValue() + ", Old Value = " + me.getOldValue());
	}
}

 
STEP 12 : CREATE UserCacheUpdater CLASS

The UserCacheUpdater class is created to add a new entry to the cache and monitor the cache content.

package com.otv.exe;

import java.util.Collection;

import org.apache.log4j.Logger;

import com.otv.srv.IUserCacheService;
import com.otv.user.User;

/**
 * UserCacheUpdater Class updates and prints user cache entries
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 *
 */
public class UserCacheUpdater implements Runnable {

	private static Logger logger = Logger.getLogger(UserCacheUpdater.class);

	private IUserCacheService userCacheService;

	/**
     * Runs the UserCacheUpdater Thread
     *
     */
	public void run() {		

		//A new User is created...
		User user = new User();

		//Only the name and surname properties are set; the id property will be set at trigger level.
		user.setName("James");
		user.setSurname("Joyce");

		//The entry is added to the cache...
		getUserCacheService().addToUserCache("user1", user);

//		The following code block shows the entry which will be inserted via the second member of the cluster,
//		so it should be enabled and the code block above should be commented out before the project is built for that member.

//		user.setName("Thomas");
//		user.setSurname("Moore");
//		getUserCacheService().addToUserCache("user2", user);

		//Cache Entries are being printed...
		printCacheEntries();
	}

	/**
     * Prints User Cache Entries
     *
     */
	@SuppressWarnings("unchecked")
	private void printCacheEntries() {
		Collection<User> userCollection = null;
		try {
			while(true) {
				userCollection = (Collection<User>)getUserCacheService().getUserCache().values();

				for(User user : userCollection) {
					logger.debug("Cache Content : "+user);
				}

				Thread.sleep(60000);
			}
		} catch (InterruptedException e) {
			logger.error("CacheUpdater is interrupted!", e);
		}
	}

	public IUserCacheService getUserCacheService() {
		return userCacheService;
	}

	public void setUserCacheService(IUserCacheService userCacheService) {
		this.userCacheService = userCacheService;
	}
}

 
STEP 13 : CREATE Application CLASS

The Application class is created to run the application.

package com.otv.exe;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Application class starts the application
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 *
 */
public class Application {

	/**
     * Starts the application
     *
     * @param  String[] args
     *
     */
	public static void main(String[] args) {
		ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");

		UserCacheUpdater cacheUpdater = (UserCacheUpdater) context.getBean("userCacheUpdater");
		new Thread(cacheUpdater).start();
	}
}

STEP 14 : BUILD PROJECT

After the OTV_Spring_Coherence_MapTrigger project is built, OTV_Spring_Coherence_MapTrigger-0.0.1-SNAPSHOT.jar will be created.
Important Note : The members of the cluster have different Coherence configurations, so the project should be built separately for each member.

STEP 15 : RUN PROJECT BY STARTING ON MEMBERS OF THE CLUSTER

After the created OTV_Spring_Coherence_MapTrigger-0.0.1-SNAPSHOT.jar file is run on the members of the cluster, the following output logs are shown on the first member's console:

--A new cluster is created and First Member joins the cluster and adds a new entry to the cache.
29.10.2012 18:26:44 DEBUG (UserMapTrigger.java:49) - UserMapTrigger processes the entry before committing. oldId : null, newId : JAMES_JOYCE, oldName : James, newName : JAMES, oldSurname : Joyce, newSurname : JOYCE
29.10.2012 18:26:44 DEBUG (UserMapListener.java:25) - Inserted Key = user1, Value = id : JAMES_JOYCE, name : JAMES, surname : JOYCE
29.10.2012 18:26:44 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE

.......

--Second Member joins the cluster and adds a new entry to the cache.
29.10.2012 18:27:33 DEBUG (UserMapTrigger.java:49) - UserMapTrigger processes the entry before committing. oldId : null, newId : THOMAS_MOORE, oldName : Thomas, newName : THOMAS, oldSurname : Moore, newSurname : MOORE
29.10.2012 18:27:34 DEBUG (UserMapListener.java:25) - Inserted Key = user2, Value = id : THOMAS_MOORE, name : THOMAS, surname : MOORE

.......

--After second member adds a new entry, cache content is shown as below :
29.10.2012 18:27:44 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE
29.10.2012 18:27:45 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE
29.10.2012 18:28:45 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE
29.10.2012 18:28:45 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE

Second member's console:

--After the second member joins the cluster and adds a new entry to the cache, the cache content is shown as below, and both members have the same entries:
29.10.2012 18:27:34 DEBUG (UserMapListener.java:25) - Inserted Key = user2, Value = id : THOMAS_MOORE, name : THOMAS, surname : MOORE
29.10.2012 18:27:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE
29.10.2012 18:27:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE
29.10.2012 18:28:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE
29.10.2012 18:28:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE

 
STEP 16 : DOWNLOAD

https://github.com/erenavsarogullari/OTV_Spring_Coherence_MapTrigger
 

Reference: Coherence Event Processing by using Map Trigger Feature from our JCG partner Eren Avsarogullari at the Online Technology Vision blog.

Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy | Contact
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.