Chunk Oriented Processing in Spring Batch

Processing large data sets is one of the most important problems in the software world. Spring Batch is a lightweight and robust batch framework for processing such data sets.

Spring Batch offers two processing styles: ‘TaskletStep Oriented’ and ‘Chunk Oriented’. This article explains the Chunk Oriented Processing model. The article TaskletStep Oriented Processing in Spring Batch is recommended reading on how to develop TaskletStep Oriented Processing.

Chunk Oriented Processing was introduced in Spring Batch v2.0. It refers to reading the data one item at a time and creating ‘chunks’ that are written out within a transaction boundary. One item is read from an ItemReader, handed to an ItemProcessor, and aggregated. Once the number of items read equals the commit interval, the entire chunk is written out via the ItemWriter, and then the transaction is committed.
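
The read, process, aggregate and write cycle can be sketched in plain Java. This is only an illustration of the idea and not the Spring Batch implementation: the Reader, Processor and Writer interfaces and the upperCaseInChunks helper are hypothetical stand-ins defined here, and a "commit" is simulated by collecting each written chunk in a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

// Plain-Java illustration of the chunk loop; no Spring Batch classes are used.
class ChunkLoopSketch {

    // Hypothetical stand-ins for the reader, processor and writer roles.
    interface Reader<T> { T read(); }                 // returns null when the input is exhausted
    interface Processor<I, O> { O process(I item); }
    interface Writer<T> { void write(List<T> chunk); }

    // Runs the read/process/write loop and returns how many chunks were written.
    static <I, O> int run(Reader<I> reader, Processor<I, O> processor,
                          Writer<O> writer, int commitInterval) {
        int chunksWritten = 0;
        boolean exhausted = false;
        while (!exhausted) {
            // Read up to commitInterval items, one at a time.
            List<I> inputs = new ArrayList<I>();
            while (inputs.size() < commitInterval) {
                I item = reader.read();
                if (item == null) {
                    exhausted = true;
                    break;
                }
                inputs.add(item);
            }
            if (inputs.isEmpty()) {
                break; // nothing was read, so there is nothing to write
            }
            // Process each item and aggregate the results into one chunk.
            List<O> outputs = new ArrayList<O>();
            for (I item : inputs) {
                outputs.add(processor.process(item));
            }
            // Write the whole chunk at once; a real step would commit the transaction here.
            writer.write(outputs);
            chunksWritten++;
        }
        return chunksWritten;
    }

    // Upper-cases the given items and returns them grouped by written chunk.
    static List<List<String>> upperCaseInChunks(List<String> items, int commitInterval) {
        final Iterator<String> source = items.iterator();
        final List<List<String>> written = new ArrayList<List<String>>();
        run(new Reader<String>() {
                public String read() { return source.hasNext() ? source.next() : null; }
            },
            new Processor<String, String>() {
                public String process(String item) { return item.toUpperCase(Locale.ENGLISH); }
            },
            new Writer<String>() {
                public void write(List<String> chunk) { written.add(chunk); }
            },
            commitInterval);
        return written;
    }

    public static void main(String[] args) {
        // Three items with a commit interval of 2 give one full chunk and one partial chunk.
        System.out.println(upperCaseInChunks(Arrays.asList("a", "b", "c"), 2)); // [[A, B], [C]]
    }
}
```

Note that the writer receives a whole list per call, which is why the commit interval also bounds how much work is lost if a chunk fails.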

Basically, this model should be used when a step needs both to read and to write data items. If a step only needs to read or only needs to write, TaskletStep Oriented processing can be used instead.

The Chunk Oriented Processing model exposes three important interfaces, ItemReader, ItemProcessor and ItemWriter, in the org.springframework.batch.item package.

  • ItemReader : This interface provides the data. It reads the items that will be processed.
  • ItemProcessor : This interface is used for item transformation. It processes an input object and transforms it into an output object.
  • ItemWriter : This interface is used for generic output operations. It writes the items transformed by the ItemProcessor, for example to a database, memory or an output stream. In this sample application, we will write to a database.

Let us take a look at how to develop with the Chunk Oriented Processing model.

Used Technologies :

  • JDK 1.7.0_09
  • Spring 3.1.3
  • Spring Batch 2.1.9
  • Hibernate 4.1.8
  • Tomcat JDBC 7.0.27
  • MySQL 5.5.8
  • MySQL Connector 5.1.17
  • Maven 3.0.4

STEP 1 : CREATE MAVEN PROJECT

A Maven project is created as shown below. (It can be created with Maven or an IDE plug-in.)

STEP 2 : CREATE USER TABLE

A new USER table is created by executing the script below:

CREATE TABLE ONLINETECHVISION.USER (
   id int(11) NOT NULL AUTO_INCREMENT,
   name varchar(45) NOT NULL,
   surname varchar(45) NOT NULL,
   PRIMARY KEY (`id`)
);

STEP 3 : LIBRARIES

First, the dependencies are added to Maven’s pom.xml.

  <properties>
      <spring.version>3.1.3.RELEASE</spring.version>
      <spring-batch.version>2.1.9.RELEASE</spring-batch.version>
  </properties>

  <dependencies>

     <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-core</artifactId>
         <version>${spring.version}</version>
     </dependency>

     <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-context</artifactId>
         <version>${spring.version}</version>
     </dependency>    

	 <dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-tx</artifactId>
		<version>${spring.version}</version>
	 </dependency>

	 <dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-orm</artifactId>
		<version>${spring.version}</version>
	 </dependency>

	 <dependency>
		<groupId>org.springframework.batch</groupId>
		<artifactId>spring-batch-core</artifactId>
		<version>${spring-batch.version}</version>
	 </dependency>

	 <!-- Hibernate dependencies -->
     <dependency>
        <groupId>org.hibernate</groupId>
        <artifactId>hibernate-core</artifactId>
        <version>4.1.8.Final</version>
     </dependency>

	  <!-- Tomcat JDBC connection pool -->
	 <dependency>
		<groupId>org.apache.tomcat</groupId>
		<artifactId>tomcat-jdbc</artifactId>
		<version>7.0.27</version>
	 </dependency>

	 <!-- MySQL Java Connector library -->
	 <dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.17</version>
	 </dependency>

	 <!-- Log4j library -->
	 <dependency>
		<groupId>log4j</groupId>
		<artifactId>log4j</artifactId>
		<version>1.2.16</version>
	 </dependency>

  </dependencies>

The maven-compiler-plugin is used to compile the project with JDK 1.7:

	<plugin>
		<groupId>org.apache.maven.plugins</groupId>
		<artifactId>maven-compiler-plugin</artifactId>
		<version>3.0</version>
		<configuration>
		  <source>1.7</source>
		  <target>1.7</target>
		</configuration>
	</plugin>

The following Maven plugin can be used to create a runnable jar:

	<plugin>
		<groupId>org.apache.maven.plugins</groupId>
		<artifactId>maven-shade-plugin</artifactId>
		<version>2.0</version>

		<executions>
			<execution>
				<phase>package</phase>
				<goals>
					<goal>shade</goal>
				</goals>
				<configuration>
					<transformers>
						<transformer
							implementation='org.apache.maven.plugins.shade.resource.ManifestResourceTransformer'>
							<mainClass>com.onlinetechvision.exe.Application</mainClass>
						</transformer>
						<transformer
							implementation='org.apache.maven.plugins.shade.resource.AppendingTransformer'>
							<resource>META-INF/spring.handlers</resource>
						</transformer>
						<transformer
							implementation='org.apache.maven.plugins.shade.resource.AppendingTransformer'>
							<resource>META-INF/spring.schemas</resource>
						</transformer>
					</transformers>
				</configuration>
			</execution>
		</executions>
	</plugin>

STEP 4 : CREATE User ENTITY

User Entity is created. This entity will be stored after processing.

package com.onlinetechvision.user;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

/**
 * User Entity
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
@Entity
@Table(name="USER")
public class User {

    private int id;
    private String name;
    private String surname;

    @Id
    @GeneratedValue(strategy=GenerationType.AUTO)
    @Column(name="ID", unique = true, nullable = false)
    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    @Column(name="NAME", unique = true, nullable = false)
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Column(name="SURNAME", unique = true, nullable = false)
    public String getSurname() {
        return surname;
    }

    public void setSurname(String surname) {
        this.surname = surname;
    }   

    @Override
    public String toString() {
        StringBuilder strBuff = new StringBuilder();
        strBuff.append("id : ").append(getId());
        strBuff.append(", name : ").append(getName());
        strBuff.append(", surname : ").append(getSurname());
        return strBuff.toString();
    }
}

STEP 5 : CREATE IUserDAO INTERFACE

IUserDAO Interface is created to expose data access functionality.

package com.onlinetechvision.user.dao;

import java.util.List;

import com.onlinetechvision.user.User;

/**
 * User DAO Interface
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public interface IUserDAO {

    /**
     * Adds User
     *
     * @param  User user
     */
    void addUser(User user);

    /**
     * Gets User List
     *
     */
    List<User> getUsers();
}

STEP 6 : CREATE UserDAO IMPL

UserDAO Class is created by implementing IUserDAO Interface.

package com.onlinetechvision.user.dao;

import java.util.List;

import org.hibernate.SessionFactory;

import com.onlinetechvision.user.User;

/**
 * User DAO
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class UserDAO implements IUserDAO {

    private SessionFactory sessionFactory;

    /**
     * Gets Hibernate Session Factory
     *
     * @return SessionFactory - Hibernate Session Factory
     */
    public SessionFactory getSessionFactory() {
        return sessionFactory;
    }

    /**
     * Sets Hibernate Session Factory
     *
     * @param SessionFactory - Hibernate Session Factory
     */
    public void setSessionFactory(SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    /**
     * Adds User
     *
     * @param  User user
     */
    @Override
    public void addUser(User user) {
        getSessionFactory().getCurrentSession().save(user);
    }

    /**
     * Gets User List
     *
     * @return List - User list
     */
    @SuppressWarnings({ "unchecked" })
    @Override
    public List<User> getUsers() {
        List<User> list = getSessionFactory().getCurrentSession().createQuery("from User").list();
        return list;
    }

}

STEP 7 : CREATE IUserService INTERFACE

IUserService Interface is created for service layer.

package com.onlinetechvision.user.service;

import java.util.List;

import com.onlinetechvision.user.User;

/**
 *
 * User Service Interface
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public interface IUserService {

    /**
     * Adds User
     *
     * @param  User user
     */
    void addUser(User user);

    /**
     * Gets User List
     *
     * @return List - User list
     */
    List<User> getUsers();
}

STEP 8 : CREATE UserService IMPL

UserService Class is created by implementing IUserService Interface.

package com.onlinetechvision.user.service;

import java.util.List;

import org.springframework.transaction.annotation.Transactional;

import com.onlinetechvision.user.User;
import com.onlinetechvision.user.dao.IUserDAO;

/**
 *
 * User Service
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
@Transactional(readOnly = true)
public class UserService implements IUserService {

    IUserDAO userDAO;

    /**
     * Adds User
     *
     * @param  User user
     */
    @Transactional(readOnly = false)
    @Override
    public void addUser(User user) {
        getUserDAO().addUser(user);
    }

    /**
     * Gets User List
     *
     */
    @Override
    public List<User> getUsers() {
        return getUserDAO().getUsers();
    }

    public IUserDAO getUserDAO() {
        return userDAO;
    }

    public void setUserDAO(IUserDAO userDAO) {
        this.userDAO = userDAO;
    }
}

STEP 9 : CREATE TestReader IMPL

TestReader Class is created by implementing the ItemReader Interface. This class is called in order to read items. When the read method returns null, the reading operation is completed. The following steps explain in detail how firstJob is executed.

The commit-interval value of firstStep in firstJob is 2, and the following steps are executed :

1) firstTestReader is called to read the first item (firstname_0, firstsurname_0)
2) firstTestReader is called again to read the second item (firstname_1, firstsurname_1)
3) testProcessor is called to process the first item (FIRSTNAME_0, FIRSTSURNAME_0)
4) testProcessor is called to process the second item (FIRSTNAME_1, FIRSTSURNAME_1)
5) testWriter is called with the chunk and writes the first item (FIRSTNAME_0, FIRSTSURNAME_0) to the database
6) testWriter writes the second item (FIRSTNAME_1, FIRSTSURNAME_1) to the database
7) the first and second items are committed and the transaction is closed.
8) firstTestReader is called to read the third item (firstname_2, firstsurname_2)
9) the maxIndex value of firstTestReader is 2, so the next call to the read method returns null and the item reading operation is completed.
10) testProcessor is called to process the third item (FIRSTNAME_2, FIRSTSURNAME_2)
11) testWriter is called to write the third item (FIRSTNAME_2, FIRSTSURNAME_2) to the database
12) the third item is committed and the transaction is closed.

firstStep is completed with COMPLETED status and secondStep is started. secondJob and thirdJob are executed in the same way.
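
The walkthrough above ends with two commits: one for a full chunk of two items and one for the remaining single item. A small sketch of that arithmetic follows; the ChunkSizes class and its chunkSizes method are hypothetical names used only for this illustration and are not part of Spring Batch.

```java
import java.util.ArrayList;
import java.util.List;

// Computes how many items land in each committed chunk.
class ChunkSizes {

    // Returns the size of every chunk for itemCount items and the given commit interval.
    static List<Integer> chunkSizes(int itemCount, int commitInterval) {
        if (commitInterval <= 0) {
            throw new IllegalArgumentException("commitInterval must be positive");
        }
        List<Integer> sizes = new ArrayList<Integer>();
        for (int remaining = itemCount; remaining > 0; remaining -= commitInterval) {
            // The last chunk may be smaller than the commit interval.
            sizes.add(Math.min(remaining, commitInterval));
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(3, 2)); // [2, 1]  three items, as in firstStep
        System.out.println(chunkSizes(4, 2)); // [2, 2]  four items, no partial chunk
    }
}
```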

package com.onlinetechvision.item;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import com.onlinetechvision.user.User;

/**
 * TestReader Class is created to read items which will be processed
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class TestReader implements ItemReader<User> {
	private int index;
	private int maxIndex;
	private String namePrefix;
	private String surnamePrefix;

	/**
     * Reads items one by one
     *
     * @return User
     *
     * @throws Exception
     * @throws UnexpectedInputException
     * @throws ParseException
     * @throws NonTransientResourceException
     *
     */
	@Override
	public User read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
		User user = new User();
		user.setName(getNamePrefix() + '_' + index);
		user.setSurname(getSurnamePrefix() + '_' + index);

		if(index > getMaxIndex()) {
			return null;
		}

		incrementIndex();

		return user;
	}

	/**
     * Increments index which defines read-count
     *
     * @return int
     *
     */
	private int incrementIndex() {
		return index++;
	}

	public int getMaxIndex() {
		return maxIndex;
	}

	public void setMaxIndex(int maxIndex) {
		this.maxIndex = maxIndex;
	}

	public String getNamePrefix() {
		return namePrefix;
	}

	public void setNamePrefix(String namePrefix) {
		this.namePrefix = namePrefix;
	}

	public String getSurnamePrefix() {
		return surnamePrefix;
	}

	public void setSurnamePrefix(String surnamePrefix) {
		this.surnamePrefix = surnamePrefix;
	}

}
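
Because the read method checks index > maxIndex, a reader configured with maxIndex 2 returns three items (indexes 0, 1 and 2) before it returns null. The following stripped-down stand-in makes that boundary visible. CountingReader and readAll are hypothetical names, and the sketch emits plain strings instead of User objects so that it runs without Spring or Hibernate.

```java
import java.util.ArrayList;
import java.util.List;

class CountingReaderSketch {

    // Simplified stand-in for TestReader: emits "prefix_index" strings instead of User objects.
    static class CountingReader {
        private final String prefix;
        private final int maxIndex;
        private int index;

        CountingReader(String prefix, int maxIndex) {
            this.prefix = prefix;
            this.maxIndex = maxIndex;
        }

        // Same boundary check as TestReader.read(): null once index exceeds maxIndex.
        String read() {
            if (index > maxIndex) {
                return null;
            }
            return prefix + "_" + index++;
        }
    }

    // Drains the reader until it signals the end of input with null.
    static List<String> readAll(CountingReader reader) {
        List<String> items = new ArrayList<String>();
        for (String item = reader.read(); item != null; item = reader.read()) {
            items.add(item);
        }
        return items;
    }

    public static void main(String[] args) {
        // Prints [firstname_0, firstname_1, firstname_2]
        System.out.println(readAll(new CountingReader("firstname", 2)));
    }
}
```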

STEP 10 : CREATE FailedCaseTestReader IMPL

FailedCaseTestReader Class is created in order to simulate a failed job status. In this sample application, when thirdJob reaches fifthStep, failedCaseTestReader is called and throws an exception, so the status of the step will be FAILED.

package com.onlinetechvision.item;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import com.onlinetechvision.user.User;

/**
 * FailedCaseTestReader Class is created in order to simulate the failed job status.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class FailedCaseTestReader implements ItemReader<User> {
	private int index;
	private int maxIndex;
	private String namePrefix;
	private String surnamePrefix;

	/**
     * Reads items one by one
     *
     * @return User
     *
     * @throws Exception
     * @throws UnexpectedInputException
     * @throws ParseException
     * @throws NonTransientResourceException
     *
     */
	@Override
	public User read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
		User user = new User();
		user.setName(getNamePrefix() + '_' + index);
		user.setSurname(getSurnamePrefix() + '_' + index);

		if(index >= getMaxIndex()) {
			throw new Exception("Unexpected Error!");
		}

		incrementIndex();

		return user;
	}

	/**
     * Increments index which defines read-count
     *
     * @return int
     *
     */
	private int incrementIndex() {
		return index++;
	}

	public int getMaxIndex() {
		return maxIndex;
	}

	public void setMaxIndex(int maxIndex) {
		this.maxIndex = maxIndex;
	}

	public String getNamePrefix() {
		return namePrefix;
	}

	public void setNamePrefix(String namePrefix) {
		this.namePrefix = namePrefix;
	}

	public String getSurnamePrefix() {
		return surnamePrefix;
	}

	public void setSurnamePrefix(String surnamePrefix) {
		this.surnamePrefix = surnamePrefix;
	}

}

STEP 11 : CREATE TestProcessor IMPL

TestProcessor Class is created by implementing ItemProcessor Interface. This class is called to process items. User item is received from TestReader, processed and returned to TestWriter.

package com.onlinetechvision.item;

import java.util.Locale;

import org.springframework.batch.item.ItemProcessor;

import com.onlinetechvision.user.User;

/**
 * TestProcessor Class is created to process items.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class TestProcessor implements ItemProcessor<User, User>  {

	/**
     * Processes items one by one
     *
     * @param User user
     * @return User
     * @throws Exception
     *
     */
	@Override
	public User process(User user) throws Exception {
		user.setName(user.getName().toUpperCase(Locale.ENGLISH));
		user.setSurname(user.getSurname().toUpperCase(Locale.ENGLISH));
		return user;
	}

}
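
The article does not say why Locale.ENGLISH is passed to toUpperCase. One plausible reason is that upper-casing depends on the locale: under a Turkish locale, for example, a lowercase i becomes a dotted capital İ (U+0130), so the output would no longer match FIRSTNAME_0. The sketch below only demonstrates that JDK behaviour; LocaleUpperCaseSketch is a hypothetical class name.

```java
import java.util.Locale;

class LocaleUpperCaseSketch {

    // Upper-cases text with an explicit locale, as TestProcessor does.
    static String upper(String text, Locale locale) {
        return text.toUpperCase(locale);
    }

    public static void main(String[] args) {
        Locale turkish = Locale.forLanguageTag("tr");
        String english = upper("firstname", Locale.ENGLISH);
        String tr = upper("firstname", turkish);

        System.out.println(english.equals("FIRSTNAME"));     // true
        System.out.println(tr.equals("FIRSTNAME"));          // false
        System.out.println(tr.equals("F\u0130RSTNAME"));     // true: dotted capital I
    }
}
```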

STEP 12 : CREATE TestWriter IMPL

TestWriter Class is created by implementing ItemWriter Interface. This class is called to write items to DB, memory etc…

package com.onlinetechvision.item;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

import com.onlinetechvision.user.User;
import com.onlinetechvision.user.service.IUserService;

/**
 * TestWriter Class is created to write items to DB, memory etc...
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class TestWriter implements ItemWriter<User> {

	private IUserService userService;

	/**
     * Writes items via list
     *
     * @throws Exception
     *
     */
	@Override
	public void write(List<? extends User> userList) throws Exception {
		for(User user : userList) {
			getUserService().addUser(user);
		}
		System.out.println("User List : " + getUserService().getUsers());
	}

	public IUserService getUserService() {
		return userService;
	}

	public void setUserService(IUserService userService) {
		this.userService = userService;
	}

}

STEP 13 : CREATE FailedStepTasklet CLASS

FailedStepTasklet is created by implementing the Tasklet Interface. It illustrates the business logic of a failed step.

package com.onlinetechvision.tasklet;

import org.apache.log4j.Logger;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

/**
 * FailedStepTasklet Class illustrates a failed job.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class FailedStepTasklet implements Tasklet {

	private static final Logger logger = Logger.getLogger(FailedStepTasklet.class);

    private String taskResult;

    /**
     * Executes FailedStepTasklet
     *
     * @param StepContribution stepContribution
     * @param ChunkContext chunkContext
     * @return RepeatStatus
     * @throws Exception
     *
     */
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        logger.debug("Task Result : " + getTaskResult());
        throw new Exception("Error occurred!");
	}

	public String getTaskResult() {
		return taskResult;
	}

	public void setTaskResult(String taskResult) {
		this.taskResult = taskResult;
	} 

}

STEP 14 : CREATE BatchProcessStarter CLASS

BatchProcessStarter Class is created to launch the jobs. Also, it logs their execution results.

package com.onlinetechvision.spring.batch;

import org.apache.log4j.Logger;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;

/**
 * BatchProcessStarter Class launches the jobs and logs their execution results.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class BatchProcessStarter {

	private static final Logger logger = Logger.getLogger(BatchProcessStarter.class);

	private Job firstJob;
	private Job secondJob;
	private Job thirdJob;
	private JobLauncher jobLauncher;
	private JobRepository jobRepository;

	/**
     * Starts the jobs and logs their execution results.
     *
     */
	public void start() {
		JobExecution jobExecution = null;
		JobParametersBuilder builder = new JobParametersBuilder();

		try {
			getJobLauncher().run(getFirstJob(), builder.toJobParameters());
			jobExecution = getJobRepository().getLastJobExecution(getFirstJob().getName(), builder.toJobParameters());
			logger.debug(jobExecution.toString());			

			getJobLauncher().run(getSecondJob(), builder.toJobParameters());
			jobExecution = getJobRepository().getLastJobExecution(getSecondJob().getName(), builder.toJobParameters());
			logger.debug(jobExecution.toString());

			getJobLauncher().run(getThirdJob(), builder.toJobParameters());
			jobExecution = getJobRepository().getLastJobExecution(getThirdJob().getName(), builder.toJobParameters());
			logger.debug(jobExecution.toString());

		} catch (JobExecutionAlreadyRunningException
					| JobRestartException
					| JobInstanceAlreadyCompleteException
					| JobParametersInvalidException e) {
			logger.error(e);
		}

	}	

	public Job getFirstJob() {
		return firstJob;
	}

	public void setFirstJob(Job firstJob) {
		this.firstJob = firstJob;
	}

	public Job getSecondJob() {
		return secondJob;
	}

	public void setSecondJob(Job secondJob) {
		this.secondJob = secondJob;
	}	

	public Job getThirdJob() {
		return thirdJob;
	}

	public void setThirdJob(Job thirdJob) {
		this.thirdJob = thirdJob;
	}

	public JobLauncher getJobLauncher() {
		return jobLauncher;
	}

	public void setJobLauncher(JobLauncher jobLauncher) {
		this.jobLauncher = jobLauncher;
	}

	public JobRepository getJobRepository() {
		return jobRepository;
	}

	public void setJobRepository(JobRepository jobRepository) {
		this.jobRepository = jobRepository;
	}	

}

STEP 15 : CREATE jdbc.properties

jdbc.properties is created. It defines the data source information and is read via dataContext.xml.

jdbc.db.driverClassName=com.mysql.jdbc.Driver
jdbc.db.url=jdbc:mysql://localhost:3306/onlinetechvision
jdbc.db.username=root
jdbc.db.password=root
jdbc.db.initialSize=10
jdbc.db.minIdle=3
jdbc.db.maxIdle=10
jdbc.db.maxActive=10
jdbc.db.testWhileIdle=true
jdbc.db.testOnBorrow=true
jdbc.db.testOnReturn=true
jdbc.db.initSQL=SELECT 1 FROM DUAL
jdbc.db.validationQuery=SELECT 1 FROM DUAL
jdbc.db.timeBetweenEvictionRunsMillis=30000
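
dataContext.xml refers to these keys through placeholders such as ${jdbc.db.url}. The sketch below illustrates the substitution idea with java.util.Properties and a regular expression. It is not how Spring resolves placeholders internally; PlaceholderSketch and resolve are hypothetical names, and only two of the keys above are loaded.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces every ${key} in text with the matching property value.
    static String resolve(String text, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer result = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = props.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException("No value for placeholder: " + key);
            }
            // quoteReplacement keeps '$' and '\' in the value from being interpreted.
            matcher.appendReplacement(result, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(result);
        return result.toString();
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(
                "jdbc.db.url=jdbc:mysql://localhost:3306/onlinetechvision\n"
              + "jdbc.db.maxActive=10\n"));

        System.out.println(resolve("${jdbc.db.url}", props));
        System.out.println(resolve("maxActive=${jdbc.db.maxActive}", props)); // maxActive=10
    }
}
```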

STEP 16 : CREATE dataContext.xml

Spring Configuration file, dataContext.xml, is created. It covers dataSource, sessionFactory and transactionManager definitions.

<?xml version='1.0' encoding='UTF-8'?>
<beans xmlns='http://www.springframework.org/schema/beans'
	   xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
	   xmlns:context='http://www.springframework.org/schema/context'
	   xmlns:p='http://www.springframework.org/schema/p'
	   xmlns:batch='http://www.springframework.org/schema/batch'
	   xmlns:tx='http://www.springframework.org/schema/tx'
	   xsi:schemaLocation='http://www.springframework.org/schema/beans
	                       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
	                       http://www.springframework.org/schema/context
	                       http://www.springframework.org/schema/context/spring-context-3.0.xsd
	                       http://www.springframework.org/schema/batch
	                       http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
	                       http://www.springframework.org/schema/tx
	                       http://www.springframework.org/schema/tx/spring-tx-3.0.xsd'>

    <context:property-placeholder location='classpath:jdbc.properties'/>

    <!-- Enable the configuration of transactional behavior based on annotations -->
    <tx:annotation-driven transaction-manager='transactionManager'/>

    <!-- Data Source Declaration -->
	<bean id='dataSource' class='org.apache.tomcat.jdbc.pool.DataSource' destroy-method='close'
			    p:driverClassName='${jdbc.db.driverClassName}'
			    p:url='${jdbc.db.url}'
			    p:username='${jdbc.db.username}'
			    p:password='${jdbc.db.password}'
			    p:initialSize='${jdbc.db.initialSize}'
			    p:minIdle='${jdbc.db.minIdle}'
			    p:maxIdle='${jdbc.db.maxIdle}'
			    p:maxActive='${jdbc.db.maxActive}'
			    p:testWhileIdle='${jdbc.db.testWhileIdle}'
			    p:testOnBorrow='${jdbc.db.testOnBorrow}'
			    p:testOnReturn='${jdbc.db.testOnReturn}'
			    p:initSQL='${jdbc.db.initSQL}'
			    p:validationQuery='${jdbc.db.validationQuery}'
			    p:timeBetweenEvictionRunsMillis='${jdbc.db.timeBetweenEvictionRunsMillis}'/>	

    <!-- Session Factory Declaration -->
	<bean id='sessionFactory' class='org.springframework.orm.hibernate4.LocalSessionFactoryBean'>
		<property name='dataSource' ref='dataSource' />
		<property name='annotatedClasses'>
			<list>
				<value>com.onlinetechvision.user.User</value>
			</list>
		</property>
		<property name='hibernateProperties'>
			<props>
				<prop key='hibernate.dialect'>org.hibernate.dialect.MySQLDialect</prop>
				<prop key='hibernate.show_sql'>true</prop>
			</props>
		</property>
	</bean>

	<!-- Transaction Manager Declaration -->
    <bean id='transactionManager' class='org.springframework.orm.hibernate4.HibernateTransactionManager'>
       <property name='sessionFactory' ref='sessionFactory'/>
    </bean>

</beans>

STEP 17 : CREATE jobContext.xml

Spring Configuration file, jobContext.xml, is created. It covers jobRepository, jobLauncher, item reader, item processor, item writer, tasklet and job definitions.

<?xml version='1.0' encoding='UTF-8'?>
<beans xmlns='http://www.springframework.org/schema/beans'
	xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
	xmlns:batch='http://www.springframework.org/schema/batch'
	xsi:schemaLocation='http://www.springframework.org/schema/beans
	                    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
	                    http://www.springframework.org/schema/batch
	                    http://www.springframework.org/schema/batch/spring-batch-2.1.xsd'>

    <import resource='dataContext.xml'/>

    <!-- jobRepository Declaration -->
    <bean id='jobRepository' class='org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean'>
		<property name='transactionManager' ref='transactionManager' />
    </bean>

    <!-- jobLauncher Declaration -->
    <bean id='jobLauncher' class='org.springframework.batch.core.launch.support.SimpleJobLauncher' >
        <property name='jobRepository' ref='jobRepository'/>
    </bean>

    <!-- Reader Bean Declarations -->
    <bean id='firstTestReader' class='com.onlinetechvision.item.TestReader'>
    	<property name='maxIndex' value='2'/>
	    <property name='namePrefix' value='firstname'/>
	    <property name='surnamePrefix' value='firstsurname'/>
    </bean>

    <bean id='secondTestReader' class='com.onlinetechvision.item.TestReader'>
        <property name='maxIndex' value='2'/>
	    <property name='namePrefix' value='secondname'/>
	    <property name='surnamePrefix' value='secondsurname'/>
    </bean>

    <bean id='thirdTestReader' class='com.onlinetechvision.item.TestReader'>
        <property name='maxIndex' value='3'/>
	    <property name='namePrefix' value='thirdname'/>
	    <property name='surnamePrefix' value='thirdsurname'/>
    </bean>

    <bean id='fourthTestReader' class='com.onlinetechvision.item.TestReader'>
        <property name='maxIndex' value='3'/>
	    <property name='namePrefix' value='fourthname'/>
	    <property name='surnamePrefix' value='fourthsurname'/>
    </bean>

    <bean id='fifthTestReader' class='com.onlinetechvision.item.TestReader'>
        <property name='maxIndex' value='3'/>
	    <property name='namePrefix' value='fifthname'/>
	    <property name='surnamePrefix' value='fifthsurname'/>
    </bean>

    <bean id='failedCaseTestReader' class='com.onlinetechvision.item.FailedCaseTestReader'>
        <property name='maxIndex' value='1'/>
	    <property name='namePrefix' value='failedcasename'/>
	    <property name='surnamePrefix' value='failedcasesurname'/>
    </bean>

    <!-- Processor Bean Declaration -->
    <bean id='testProcessor' class='com.onlinetechvision.item.TestProcessor' />

    <!-- Writer Bean Declaration -->
    <bean id='testWriter' class='com.onlinetechvision.item.TestWriter' >
    	<property name='userService' ref='userService'/>
    </bean>

    <!-- Failed Step Tasklet Declaration -->
    <bean id='failedStepTasklet' class='com.onlinetechvision.tasklet.FailedStepTasklet'>
        <property name='taskResult'  value='Error occurred!' />
    </bean> 

    <!-- Batch Job Declarations -->
    <batch:job id='firstJob'>
		<batch:step id='firstStep' next='secondStep'>
			<batch:tasklet>
				<batch:chunk reader='firstTestReader' processor='testProcessor' writer='testWriter' commit-interval='2'/>
			</batch:tasklet>
		</batch:step>
		<batch:step id='secondStep'>
			<batch:tasklet>
				<batch:chunk reader='secondTestReader' processor='testProcessor' writer='testWriter' commit-interval='2'/>
			</batch:tasklet>
		</batch:step>
	</batch:job>

	<batch:job id='secondJob'>
		<batch:step id='thirdStep'>
			<batch:tasklet>
				<batch:chunk reader='thirdTestReader' processor='testProcessor' writer='testWriter' commit-interval='2'/>
			</batch:tasklet>
			<batch:next on='*' to='fourthStep' />
	        <batch:next on='FAILED' to='firstFailedStep' />
	    </batch:step>
	    <batch:step id='fourthStep'>
			<batch:tasklet>
				<batch:chunk reader='fourthTestReader' processor='testProcessor' writer='testWriter' commit-interval='2'/>
			</batch:tasklet>
		</batch:step>
		<batch:step id='firstFailedStep'>
            <batch:tasklet ref='failedStepTasklet' />
        </batch:step>
	</batch:job>

	<batch:job id='thirdJob'>
		<batch:step id='fifthStep'>
			<batch:tasklet>
				<batch:chunk reader='failedCaseTestReader' processor='testProcessor' writer='testWriter' commit-interval='2'/>
			</batch:tasklet>
			<batch:next on='*' to='sixthStep' />
	        <batch:next on='FAILED' to='secondFailedStep' />
	    </batch:step>
	    <batch:step id='sixthStep'>
			<batch:tasklet>
				<batch:chunk reader='fifthTestReader' processor='testProcessor' writer='testWriter' commit-interval='2'/>
			</batch:tasklet>
		</batch:step>
		<batch:step id='secondFailedStep'>
            <batch:tasklet ref='failedStepTasklet' />
        </batch:step>
	</batch:job>

</beans>
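
In secondJob and thirdJob, each step declares two batch:next transitions: on='*' and on='FAILED'. Spring Batch matches the exit code of the step against these patterns, where * stands for zero or more characters and ? for exactly one, and it prefers the more specific pattern, so FAILED is routed to the failed step even though * also matches it. The sketch below approximates that selection in plain Java. TransitionSketch and its methods are hypothetical, and "fewer wildcards wins" is a simplification of the framework's ordering.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

class TransitionSketch {

    // True if exitCode matches the pattern ('*' = any run of characters, '?' = one character).
    static boolean matches(String pattern, String exitCode) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.matches(regex.toString(), exitCode);
    }

    // Counts wildcard characters; in this sketch fewer wildcards means "more specific".
    static int wildcardCount(String pattern) {
        int count = 0;
        for (char c : pattern.toCharArray()) {
            if (c == '*' || c == '?') {
                count++;
            }
        }
        return count;
    }

    // Returns the target of the most specific matching transition, or null if none matches.
    static String nextStep(Map<String, String> transitions, String exitCode) {
        String best = null;
        for (String pattern : transitions.keySet()) {
            if (!matches(pattern, exitCode)) {
                continue;
            }
            if (best == null || wildcardCount(pattern) < wildcardCount(best)) {
                best = pattern;
            }
        }
        return best == null ? null : transitions.get(best);
    }

    public static void main(String[] args) {
        // Same transitions as thirdStep in secondJob, in the same declaration order.
        Map<String, String> transitions = new LinkedHashMap<String, String>();
        transitions.put("*", "fourthStep");
        transitions.put("FAILED", "firstFailedStep");

        System.out.println(nextStep(transitions, "COMPLETED")); // fourthStep
        System.out.println(nextStep(transitions, "FAILED"));    // firstFailedStep
    }
}
```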

STEP 18 : CREATE applicationContext.xml

Spring Configuration file, applicationContext.xml, is created. It covers bean definitions.

<?xml version='1.0' encoding='UTF-8'?>
<beans xmlns='http://www.springframework.org/schema/beans'
	xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
	xmlns:batch='http://www.springframework.org/schema/batch'
	xsi:schemaLocation='http://www.springframework.org/schema/beans
	                    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
	                    http://www.springframework.org/schema/batch
	                    http://www.springframework.org/schema/batch/spring-batch-2.1.xsd'>

	<import resource='jobContext.xml'/>

    <!-- User DAO Declaration -->
	<bean id='userDAO' class='com.onlinetechvision.user.dao.UserDAO'>
		<property name='sessionFactory' ref='sessionFactory' />
	</bean>

	<!-- User Service Declaration -->
	<bean id='userService' class='com.onlinetechvision.user.service.UserService'>
		<property name='userDAO' ref='userDAO' />
	</bean>	

	<!-- BatchProcessStarter Declaration -->
	<bean id='batchProcessStarter' class='com.onlinetechvision.spring.batch.BatchProcessStarter'>
		<property name='jobLauncher' ref='jobLauncher'/>
		<property name='jobRepository' ref='jobRepository'/>
		<property name='firstJob' ref='firstJob'/>
		<property name='secondJob' ref='secondJob'/>
		<property name='thirdJob' ref='thirdJob'/>
    </bean> 

</beans>

STEP 19 : CREATE Application CLASS

The Application class is created to run the application.

package com.onlinetechvision.exe;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.onlinetechvision.spring.batch.BatchProcessStarter;

/**
 * Application Class starts the application.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class Application {

	/**
	 * Starts the application.
	 *
	 * @param args command-line arguments (not used)
	 */
	public static void main(String[] args) {
		// Load the Spring context and look up the bean that starts the batch jobs
		ApplicationContext appContext = new ClassPathXmlApplicationContext("applicationContext.xml");
		BatchProcessStarter batchProcessStarter = (BatchProcessStarter) appContext.getBean("batchProcessStarter");
		batchProcessStarter.start();
	}

}

STEP 20 : BUILD PROJECT

After the OTV_SpringBatch_Chunk_Oriented_Processing project is built, OTV_SpringBatch_Chunk_Oriented_Processing-0.0.1-SNAPSHOT.jar is created.

STEP 21 : RUN PROJECT

When the generated OTV_SpringBatch_Chunk_Oriented_Processing-0.0.1-SNAPSHOT.jar file is run, the following database records and console output logs are produced :

Database screenshot :

First Job’s console output :

16.12.2012 19:30:41  INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=firstJob]] launched with the following parameters: [{}]

16.12.2012 19:30:41 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=0, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:41 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{}], Job=[firstJob]]

User List : [id : 181, name : FIRSTNAME_0, surname : FIRSTSURNAME_0, id : 182, name : FIRSTNAME_1, surname : FIRSTSURNAME_1, id : 183, name : FIRSTNAME_2, surname : FIRSTSURNAME_2, id : 184, name : SECONDNAME_0, surname : SECONDSURNAME_0, id : 185, name : SECONDNAME_1, surname : SECONDSURNAME_1, id : 186, name : SECONDNAME_2, surname : SECONDSURNAME_2]

16.12.2012 19:30:42 DEBUG (BatchProcessStarter.java:43) - JobExecution: id=0, version=2, startTime=Sun Dec 16 19:30:41 GMT 2012, endTime=Sun Dec 16 19:30:42 GMT 2012, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{}], Job=[firstJob]]

Second Job’s console output :

16.12.2012 19:30:42  INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=secondJob]] launched with the following parameters: [{}]

16.12.2012 19:30:42 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=1, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=1, version=0, JobParameters=[{}], Job=[secondJob]]

User List : [id : 181, name : FIRSTNAME_0, surname : FIRSTSURNAME_0, id : 182, name : FIRSTNAME_1, surname : FIRSTSURNAME_1, id : 183, name : FIRSTNAME_2, surname : FIRSTSURNAME_2, id : 184, name : SECONDNAME_0, surname : SECONDSURNAME_0, id : 185, name : SECONDNAME_1, surname : SECONDSURNAME_1, id : 186, name : SECONDNAME_2, surname : SECONDSURNAME_2, id : 187, name : THIRDNAME_0, surname : THIRDSURNAME_0, id : 188, name : THIRDNAME_1, surname : THIRDSURNAME_1, id : 189, name : THIRDNAME_2, surname : THIRDSURNAME_2, id : 190, name : THIRDNAME_3, surname : THIRDSURNAME_3, id : 191, name : FOURTHNAME_0, surname : FOURTHSURNAME_0, id : 192, name : FOURTHNAME_1, surname : FOURTHSURNAME_1, id : 193, name : FOURTHNAME_2, surname : FOURTHSURNAME_2, id : 194, name : FOURTHNAME_3, surname : FOURTHSURNAME_3]

16.12.2012 19:30:42 DEBUG (BatchProcessStarter.java:47) - JobExecution: id=1, version=2, startTime=Sun Dec 16 19:30:42 GMT 2012, endTime=Sun Dec 16 19:30:42 GMT 2012, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, JobParameters=[{}], Job=[secondJob]]

Third Job’s console output :

16.12.2012 19:30:42  INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=thirdJob]] launched with the following parameters: [{}]

16.12.2012 19:30:42 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=2, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=2, version=0, JobParameters=[{}], Job=[thirdJob]]

16.12.2012 19:30:42 DEBUG (TransactionTemplate.java:159) - Initiating transaction rollback on application exception
org.springframework.batch.repeat.RepeatException: Exception in batch process; nested exception is java.lang.Exception: Unexpected Error!
...

16.12.2012 19:30:43 DEBUG (BatchProcessStarter.java:51) - JobExecution: id=2, version=2, startTime=Sun Dec 16 19:30:42 GMT 2012, endTime=Sun Dec 16 19:30:43 GMT 2012, lastUpdated=Sun Dec 16 19:30:43 GMT 2012, status=FAILED, exitStatus=exitCode=FAILED;exitDescription=, job=[JobInstance: id=2, version=0, JobParameters=[{}], Job=[thirdJob]]
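The third job's log shows a transaction rollback followed by status=FAILED. As a rough illustration of what that means for chunk oriented processing, the sketch below simulates a commit interval in plain Java: items are collected into a chunk, a full chunk is "committed", and an exception discards only the chunk that is currently open. ChunkLoopSketch, Result, run and the failingItem parameter are hypothetical names, and the in-memory list merely stands in for the database; this is a simplified model under those assumptions, not Spring Batch's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkLoopSketch {

    /** Outcome of one simulated step: final status plus the items that were committed. */
    static class Result {
        final String status;
        final List<String> committed;

        Result(String status, List<String> committed) {
            this.status = status;
            this.committed = committed;
        }
    }

    /**
     * Reads items one at a time and commits them in chunks of commitInterval.
     * Reading failingItem (may be null) throws, which discards the open chunk.
     */
    static Result run(List<String> items, int commitInterval, String failingItem) {
        List<String> committed = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        try {
            for (String item : items) {
                if (item.equals(failingItem)) {
                    throw new IllegalStateException("Unexpected Error!");
                }
                chunk.add(item);
                if (chunk.size() == commitInterval) {
                    committed.addAll(chunk); // write the chunk and commit
                    chunk.clear();
                }
            }
            committed.addAll(chunk); // last, possibly partial, chunk
            return new Result("COMPLETED", committed);
        } catch (IllegalStateException e) {
            // Rollback: the open chunk is dropped, earlier commits are kept
            return new Result("FAILED", committed);
        }
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("u1", "u2", "u3", "u4", "u5");

        Result ok = run(items, 2, null);
        System.out.println(ok.status + " " + ok.committed);   // prints "COMPLETED [u1, u2, u3, u4, u5]"

        Result bad = run(items, 2, "u4");
        System.out.println(bad.status + " " + bad.committed); // prints "FAILED [u1, u2]"
    }
}
```

In the failing run, u3 had already been read but its chunk was never completed, so only the first chunk survives. This matches the general behaviour of a commit interval: work is lost only back to the last committed chunk.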

STEP 22 : DOWNLOAD
https://github.com/erenavsarogullari/OTV_SpringBatch_Chunk_Oriented_Processing

Resources:

Chunk Oriented Processing in Spring Batch
 

Reference: Chunk Oriented Processing in Spring Batch from our JCG partner Eren Avsarogullari at the Online Technology Vision blog.
