Processing large data sets is a complex task that software developers regularly have to deal with, and it calls for tools built to handle data in bulk. Although quite a few such tools are available, Spring Batch stands out from the competition. It is a lightweight and robust framework that can be easily integrated with the Grails framework. This blog post explains how ‘chunk oriented processing’ in Spring Batch can be combined with the ‘Spring Integration’ module.
Spring Batch Processing
Spring Batch processing can be accomplished in two different ways.
- Chunk Oriented Processing
- TaskletStep Oriented Processing
This blog also explores how to stop a running Spring Batch job gracefully while it is processing a large data set, and how to restart a failed/stopped batch job. Spring Integration provides a simple model for implementing complex enterprise integration solutions and facilitates asynchronous, message-driven behavior within a Spring-based application.
Chunk Oriented Processing
Chunk oriented processing refers to reading data one item at a time and collecting the items into ‘chunks’ that are written out within a transaction boundary. In this process, an item is read from an ‘ItemReader’, handed over to an ‘ItemProcessor’ and aggregated. When the number of items read matches the commit interval, the entire chunk is written out via the ‘ItemWriter’ and the transaction is committed.
Chunk oriented processing is applied when at least one data item has to be read and written. If a step does not need to read or write data items, ‘TaskletStep Oriented Processing’ is used instead. The ‘Chunk Oriented Processing’ model has three important interfaces, ‘ItemReader’, ‘ItemProcessor’ and ‘ItemWriter’, which are part of the org.springframework.batch.item package.
- ItemReader – The ‘ItemReader’ interface is used for providing data; it reads the data to be processed.
- ItemProcessor – The ‘ItemProcessor’ interface is used for item transformation; it takes an input object from the ‘ItemReader’ and transforms it into an output object.
- ItemWriter – The ‘ItemWriter’ interface is used for generic output operations. It writes the data transformed by the ‘ItemProcessor’ to its destination, e.g. a database, memory, or an output stream.
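The read/process/write cycle described above can be sketched in a few lines of plain Java. This is a simplified illustration and not how Spring Batch is implemented internally: the class name ChunkLoopSketch and the method runChunks are hypothetical, a java.util.Iterator stands in for the ‘ItemReader’, a Function for the ‘ItemProcessor’, and the returned list of chunks for whatever the ‘ItemWriter’ would have written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Illustrative only: a hand-rolled chunk loop, not Spring Batch's own code.
class ChunkLoopSketch {

    /** Reads items one at a time, transforms each, and "writes" a chunk per commit interval. */
    static <I, O> List<List<O>> runChunks(Iterator<I> reader,
                                          Function<I, O> processor,
                                          int commitInterval) {
        List<List<O>> committed = new ArrayList<>(); // stands in for the writer's target
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next()));
            if (chunk.size() == commitInterval) {
                committed.add(chunk);               // write the whole chunk, then commit
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            committed.add(chunk);                   // final, partially filled chunk
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a.com", "b.com", "c.com", "d.com", "e.com");
        System.out.println(runChunks(input.iterator(), String::toUpperCase, 2));
    }
}
```

With five items and a commit interval of 2, the sketch produces three chunks, the last one holding the single leftover item.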
To make the Spring Batch integration concrete, the rest of this post walks through a sample application, along with its code, that writes its output to a Microsoft Excel spreadsheet. The steps and code below were written for Grails 2.4. With minimal changes, the same configuration/steps should work in any other Spring-based framework.
Step 1 – Resource.xml
The resource.xml defines all Spring Batch beans, such as the transaction manager and the job repository, needed to establish a job context. It also defines the message channel, message handler and message endpoint beans for the message queue. Finally, the job explorer and job registry beans are used to stop a running job and to restart any failed/stopped jobs.
[xml]
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-4.1.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
           http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
           http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.2.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

    <!-- **************************************************
         * Spring Batch stuff
         ************************************************** -->
    <bean id="batchTxManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource_rimdb" />
    </bean>

    <bean id="batchJobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="dataSource" ref="dataSource_rimdb" />
        <property name="transactionManager" ref="batchTxManager" />
        <property name="isolationLevelForCreate" value="ISOLATION_READ_COMMITTED" />
        <property name="validateTransactionState" value="false" />
    </bean>

    <!-- **************************************************
         * Spring Integration stuff
         ************************************************** -->
    <bean id="msgStore" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
        <property name="dataSource" ref="dataSource_rimdb"/>
        <property name="channelMessageStoreQueryProvider" ref="msgStoreQueryProvider" />
    </bean>

    <!-- See also
         http://docs.spring.io/spring-integration/reference/htmlsingle/#channel-configuration-queuechannel
         and
         http://docs.spring.io/spring-integration/docs/latest-ga/reference/html/system-management-chapter.html#message-store -->
    <int:channel id="msgStoreChannel">
        <int:queue message-store="msgStore"/>
    </int:channel>

    <bean id="msgTxManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource_rimdb" />
    </bean>

    <bean id="msgHandler" class="com.test.batch.report.ReportJobDispatcher" />

    <!-- We tie together the filter and polling consumer in a chain so we need not define additional channels. -->
    <int:chain id="msgEndpointChain" input-channel="msgStoreChannel">
        <!-- Messages that are discarded because the thread pool is saturated will be queued again
             by setting the discard-channel to our input message queue -->
        <int:filter id="msgFilter" discard-channel="msgStoreChannel">
            <bean class="com.test.batch.integration.MsgFilter" />
        </int:filter>

        <!-- See also
             http://docs.spring.io/spring-integration/docs/latest-ga/reference/html/messaging-endpoints-chapter.html#endpoint
             Adjust as needed.
             max-messages-per-poll can also be increased, each message is still handled within its own transaction
             See also
             http://docs.spring.io/spring-integration/docs/latest-ga/reference/html/transactions.html#transaction-poller -->
        <int:poller max-messages-per-poll="1" fixed-rate="1000">
            <int:transactional transaction-manager="msgTxManager" propagation="REQUIRED"
                               isolation="DEFAULT" timeout="10000" read-only="false" />
        </int:poller>

        <int:outbound-channel-adapter id="msgOutboundChannelAdapter" ref="msgHandler" method="handleMessage" />
    </int:chain>

    <bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
        <property name="dataSource" ref="dataSource_rimdb" />
    </bean>

    <bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
</beans>
[/xml]
Step 2 – BatchReportConfig
BatchReportConfig is a simple class used to obtain any bean from the Grails context created in resource.xml. It also makes Grails beans accessible from outside the Grails context, i.e. from classes other than the Grails controller/service classes.
It also serves as a convenience class for accessing batch-related configuration. The rest of the application does not need to be aware of the Spring configuration, and access to config.groovy is centralized in one place.
[xml]
import com.test.ApplicationContextHolder
import org.codehaus.groovy.grails.web.mapping.LinkGenerator
import org.springframework.batch.core.configuration.JobRegistry
import org.springframework.batch.core.explore.JobExplorer
import org.springframework.batch.core.launch.JobOperator
import org.springframework.batch.core.repository.JobRepository
import org.springframework.messaging.MessageChannel
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import org.springframework.transaction.PlatformTransactionManager

class BatchReportConfig {

    // Beans are looked up lazily on first access and then cached
    private static JobRepository jobRepository
    private static PlatformTransactionManager txManager
    private static MessageChannel msgChannel
    private static ThreadPoolTaskExecutor taskExecutor
    private static LinkGenerator linkGenerator
    private static ConfigObject configObj
    private static JobExplorer jobExplorer
    private static JobRegistry jobRegistry

    private BatchReportConfig() {}

    // ****************************************************
    // Convenience methods for Spring Beans
    // ****************************************************
    public static synchronized MessageChannel getMessageChannel() {
        if (!msgChannel)
            msgChannel = ApplicationContextHolder.getBean("msgStoreChannel")
        return msgChannel
    }

    public static synchronized PlatformTransactionManager getTxManager() {
        if (!txManager)
            txManager = ApplicationContextHolder.getBean("batchTxManager")
        return txManager
    }

    public static synchronized JobRepository getJobRepository() {
        if (!jobRepository) {
            jobRepository = ApplicationContextHolder.getBean("batchJobRepository")
        }
        return jobRepository
    }

    public static synchronized JobExplorer getJobExplorer() {
        if (!jobExplorer)
            jobExplorer = ApplicationContextHolder.getBean("jobExplorer")
        return jobExplorer
    }

    public static synchronized JobRegistry getJobRegistry() {
        if (!jobRegistry)
            jobRegistry = ApplicationContextHolder.getBean("jobRegistry")
        return jobRegistry
    }

    public static synchronized LinkGenerator getLinkGenerator() {
        if (!linkGenerator)
            linkGenerator = ApplicationContextHolder.getBean("grailsLinkGenerator")
        return linkGenerator
    }
}
[/xml]
Step 3 – ReportJobFactory
ReportJobFactory is an abstract class whose abstract methods are overridden/implemented to return the appropriate ‘ItemReader’, ‘ItemProcessor’ and ‘ItemWriter’. The concrete method ‘buildJob’ builds a report job from the job parameters and those three components, as specified in the ReportJob class below. The abstract class thus helps in creating a report job based on the type of the submitted job request, and it must be subclassed for each new type of tool report.
[xml]
import java.util.List
import java.util.Map

import com.test.batch.report.ReportJob
import com.test.batch.report.ReportJobConfig
import com.test.batch.integration.InputSource
import org.springframework.batch.core.JobParameters
import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemWriter

abstract class ReportJobFactory {

    private JobParameters jobParams
    private ReportJobConfig jobConfig
    private List inputList

    public ReportJobFactory(JobParameters jobParams, ReportJobConfig jobConfig, List inputList) {
        assert jobParams && jobConfig
        this.jobParams = jobParams
        this.jobConfig = jobConfig
        this.inputList = inputList
    }

    // To be implemented once per tool report
    protected abstract ItemReader newListItemReader(JobParameters jobParams, List inputList)
    protected abstract ItemReader newFileItemReader(JobParameters jobParams)
    protected abstract ItemProcessor newItemProcessor(JobParameters jobParams)
    protected abstract ItemWriter newItemWriter(JobParameters jobParams)

    public ReportJob buildJob() {
        // Pick the reader that matches the input source of the request
        ItemReader itemReader
        if (InputSource.LIST.toString() == jobParams.getString("inputSource"))
            itemReader = newListItemReader(jobParams, inputList)
        else
            itemReader = newFileItemReader(jobParams)

        ItemProcessor itemProcessor = newItemProcessor(jobParams)
        ItemWriter itemWriter = newItemWriter(jobParams)
        return new ReportJob(jobConfig, jobParams, itemReader, itemProcessor, itemWriter)
    }
}
[/xml]
Step 4 – ReportJob
ReportJob is a wrapper class that builds the batch job/step configuration for a specific report tool. It generalizes the necessary functionality, thereby making it easy to add new tool reports, i.e. developers need not have exhaustive knowledge of Spring Batch.
The wrapper class builds two steps:
- The first is the ‘report step’, which executes the ‘ItemReader’, ‘ItemProcessor’ and ‘ItemWriter’ with a chunk size (i.e. commit interval) that is used to split the data into chunks/pieces. The chunk size/commit interval also determines how quickly a running job can be stopped.
- The second is an optional email notification step, which runs after the ‘report step’ completes and sends a ‘success or failure’ message to the given email address.
[xml]
import com.test.batch.integration.ReportDelivery
import org.springframework.batch.core.Job
import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.Step
import org.springframework.batch.core.job.SimpleJob
import org.springframework.batch.core.job.builder.JobBuilder
import org.springframework.batch.core.job.builder.SimpleJobBuilder
import org.springframework.batch.core.launch.support.RunIdIncrementer
import org.springframework.batch.core.repository.JobRepository
import org.springframework.batch.core.step.builder.SimpleStepBuilder
import org.springframework.batch.core.step.builder.StepBuilder
import org.springframework.batch.core.step.builder.TaskletStepBuilder
import org.springframework.batch.core.step.tasklet.Tasklet
import org.springframework.batch.core.step.tasklet.TaskletStep
import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemWriter

class ReportJob {

    private static final String REPORT_STEP_NAME = "report"
    private static final String NOTIFICATION_STEP_NAME = "notification"
    private static final String CLEANUP_STEP_NAME = "cleanup"

    private ReportJobConfig config
    private ItemReader itemReader
    private ItemProcessor itemProcessor
    private ItemWriter itemWriter
    private SimpleJob job
    private JobParameters jobParams

    public ReportJob(ReportJobConfig config, JobParameters jobParameters, ItemReader itemReader,
                     ItemProcessor itemProcessor, ItemWriter itemWriter) {
        assert config && jobParameters && itemReader && itemProcessor && itemWriter
        this.config = config
        this.jobParams = jobParameters
        this.itemReader = itemReader
        this.itemProcessor = itemProcessor
        this.itemWriter = itemWriter
        job = buildJob()
        job.registerJobExecutionListener(new ReportJobExecutionListener())
    }

    public Job getJob() {
        return job
    }

    public JobParameters getJobParameters() {
        return jobParams
    }

    public JobRepository getJobRepository() {
        return config.jobRepository
    }

    public String getToolName() {
        return jobParams.getString("toolName")
    }

    // Chunk oriented step: reader -> processor -> writer, committed every "chunkSize" items
    protected Step newReportStep() {
        SimpleStepBuilder stepBuilder = new StepBuilder(REPORT_STEP_NAME)
            ./*<String, Map>*/ chunk(jobParams.getLong("chunkSize").intValue()) // <input type, output type>
            .reader(itemReader)
            .writer(itemWriter)
            .processor(itemProcessor)
            .repository(config.jobRepository)
            .transactionManager(config.txManager)
            .startLimit(3) // give up after 3 tries
        TaskletStep step = stepBuilder.build()
        return step
    }

    // Tasklet step: sends the notification mail once the report step is done
    protected Step newNotificationStep() {
        Tasklet tasklet = new MailNotificationTasklet(jobParams.getString("userEmail"),
            jobParams.getString("ccEmail"),
            jobParams.getString("toolDescr"),
            jobParams.getString("jobRef"),
            config.fileDir,
            Enum.valueOf(ReportDelivery, jobParams.getString("reportDelivery")))
        TaskletStepBuilder stepBuilder = new StepBuilder(NOTIFICATION_STEP_NAME)
            .tasklet(tasklet)
            .repository(config.jobRepository)
            .transactionManager(config.txManager)
        TaskletStep step = stepBuilder.build()
        return step
    }

    protected SimpleJob buildJob() {
        SimpleJobBuilder jobBuilder = new JobBuilder(jobParams.getString("toolName"))
            .start(newReportStep())
            .next(newNotificationStep())
            .repository(config.jobRepository)
            .incrementer(new RunIdIncrementer()) // do we need this?
        return jobBuilder.build()
    }
}
[/xml]
Step 5 – ReportJobDispatcher
ReportJobDispatcher handles a single job request retrieved from the message queue. It is called from the polling outbound channel adapter within a transaction context. It creates a new job instance from the given job request and launches it immediately. If the launch fails (e.g. because the task executor is saturated and all executor threads are busy), an exception is raised so that the transaction is rolled back and the message remains in the queue.
[xml]
import com.test.batch.report.factory.ReportJobFactory
import com.test.batch.report.factory.ReportJobFactoryFactory
import com.test.batch.integration.ReportJobRequest
import org.springframework.batch.core.JobParameters
import groovy.json.JsonSlurper
import org.springframework.messaging.Message
import org.springframework.messaging.MessageHandler
import org.springframework.messaging.MessagingException

class ReportJobDispatcher implements MessageHandler {

    // The payload is the JSON string produced by ReportJobRequest.toJson()
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        assert message
        ReportJobRequest jobRequest = new ReportJobRequest(message.getPayload())
        dispatch(jobRequest)
    }

    protected void dispatch(ReportJobRequest jobRequest) {
        assert jobRequest
        ReportJobConfig jobConfig = BatchReportConfig.newDefaultJobConfig()
        JobParameters jobParams = ReportJobParamsBuilder.build(jobRequest)
        ReportJobFactory reportJobFactory = ReportJobFactoryFactory.newReportJobFactory(jobParams, jobConfig, jobRequest.inputList)
        ReportJob reportJob = reportJobFactory.buildJob()

        if (jobRequest.jobExecutionId) {
            // A request that carries an execution id targets an existing execution
            if (jobRequest.jobRquestType == "Restart") {
                ReportJobLauncher.restart(reportJob, jobRequest.jobExecutionId)
            } else if (jobRequest.jobRquestType == "Stop") {
                ReportJobLauncher.stop(reportJob, jobRequest.jobExecutionId)
            } else {
                // any other request type is ignored
            }
        } else {
            // No execution id: this is a new job
            ReportJobLauncher.submit(reportJob)
        }
    }
}
[/xml]
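The "message stays in the queue when the handler throws" behaviour can be illustrated without any Spring classes. The following plain-Java sketch is only an analogy under simplifying assumptions: an in-memory Deque replaces the JDBC-backed message store, putting the message back replaces the transaction rollback, and the names RequeueOnFailureSketch and pollOnce are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Illustrative only: mimics "roll back and keep the message" with an in-memory deque.
class RequeueOnFailureSketch {

    /** Polls one message; returns true if it was handled, false if nothing was consumed. */
    static boolean pollOnce(Deque<String> queue, Consumer<String> handler) {
        String message = queue.pollFirst();
        if (message == null) {
            return false;                 // queue is empty, nothing to do
        }
        try {
            handler.accept(message);
            return true;                  // "commit": the message is gone for good
        } catch (RuntimeException e) {
            queue.addFirst(message);      // "rollback": the message is retained
            return false;
        }
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("job-1");
        boolean[] saturated = {true};
        Consumer<String> dispatcher = msg -> {
            if (saturated[0]) {
                throw new IllegalStateException("no free threads");
            }
        };
        System.out.println(pollOnce(queue, dispatcher) + ", queued: " + queue.size());
        saturated[0] = false;             // a thread became available
        System.out.println(pollOnce(queue, dispatcher) + ", queued: " + queue.size());
    }
}
```

The first poll fails and leaves the request queued; the second poll, once capacity is available, consumes it.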
Step 6 – ReportJobFactoryFactory
This is a factory class used for creating a report job factory based on the tool name.
[xml]
import com.test.batch.domainreports.gadatafeed.GaDataFeedReportJobFactory;
import com.test.batch.domainreports.livesitereport.LiveSiteReportReportJobFactory;
import com.test.batch.report.ReportJobConfig
import org.springframework.batch.core.JobParameters

class ReportJobFactoryFactory {

    private ReportJobFactoryFactory() {}

    public static ReportJobFactory newReportJobFactory(JobParameters jobParams, ReportJobConfig jobConfig, List inputList) {
        assert jobParams && jobConfig
        ReportJobFactory factory
        String toolName = jobParams.getString("toolName")
        // One factory per tool; add a case for every new tool report
        switch (toolName) {
            case "abc":
                factory = new AbcReportJobFactory(jobParams, jobConfig, inputList)
                break
            case "xyz":
                factory = new XyzReportJobFactory(jobParams, jobConfig, inputList)
                break
            default:
                throw new Exception("Report factory for tool [${toolName}] not found")
        }
        assert factory
        return factory
    }
}
[/xml]
Each job factory instantiated in the above class extends ‘ReportJobFactory’ and overrides all of its abstract methods to return the appropriate ‘ItemReader’, ‘ItemProcessor’ and ‘ItemWriter’.
[xml]
import java.util.List
import java.util.Map

import com.test.batch.report.PassthroughFlatFileItemReader
import com.test.batch.report.ReportJobConfig
import com.test.batch.report.factory.ReportJobFactory
import org.springframework.batch.core.JobParameters
import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemWriter
import org.springframework.batch.item.support.IteratorItemReader

class AbcReportJobFactory extends ReportJobFactory {

    public AbcReportJobFactory(JobParameters jobParams, ReportJobConfig jobConfig, List inputList) {
        super(jobParams, jobConfig, inputList)
    }

    // Input submitted as a list
    @Override
    protected ItemReader newListItemReader(JobParameters jobParams, List inputList) {
        return new IteratorItemReader(inputList)
    }

    // Input submitted as a text file
    @Override
    protected ItemReader newFileItemReader(JobParameters jobParams) {
        PassthroughFlatFileItemReader<String> itemReader = new PassthroughFlatFileItemReader<String>()
        itemReader.setSaveState(false)
        return itemReader
    }

    @Override
    protected ItemProcessor newItemProcessor(JobParameters jobParams) {
        return new AbcItemProcessor()
    }

    @Override
    protected ItemWriter newItemWriter(JobParameters jobParams) {
        AbcLineItemWriter itemWriter = new AbcLineItemWriter()
        return itemWriter
    }
}
[/xml]
ItemReader
The ‘ItemReader’ that is returned here is either a generic ‘IteratorItemReader’ or a flat file item reader (‘PassthroughFlatFileItemReader’ in the code above), chosen according to the input data.
- If the input data is a list, then it returns an ‘IteratorItemReader’.
- If it is a text file, then it returns the flat file item reader.
ItemProcessor
The processor implements the generic ‘ItemProcessor’ interface with a string as input and a data map as output. The work is done in the ‘process’ method, which is called for each and every entry returned from the ‘ItemReader’.
[xml]
import java.util.Map

import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.StepExecution
import org.springframework.batch.core.annotation.BeforeStep
import org.springframework.batch.item.ItemProcessor

class AbcItemProcessor implements ItemProcessor<String, Map> {

    private boolean showIpAddr, showIp6Addr

    // Called once before the step starts, e.g. to read job parameters
    @BeforeStep
    void beforeStep(StepExecution stepExecution) {
    }

    // Called once per item returned by the reader
    @Override
    public Map process(String domain) throws Exception {
        return [:]
    }
}
[/xml]
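The processor above is a stub that returns an empty map. As a rough idea of what a ‘process’ method might do, the following plain-Java sketch turns a domain name into a small data map. Everything in it is an assumption made for illustration: the class name DomainProcessorSketch and the column names domain and tld are invented and are not part of the sample application. Returning null mirrors the Spring Batch convention that a null result from an ‘ItemProcessor’ filters the item out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a stand-alone transformation, not tied to the Spring Batch interfaces.
class DomainProcessorSketch {

    /** Turns one input line into a map of report columns; returns null to skip the item. */
    static Map<String, String> process(String domain) {
        if (domain == null || domain.trim().isEmpty()) {
            return null;                               // blank lines are skipped
        }
        String name = domain.trim().toLowerCase();
        int dot = name.lastIndexOf('.');
        Map<String, String> row = new LinkedHashMap<>(); // keeps column order stable
        row.put("domain", name);
        row.put("tld", dot >= 0 ? name.substring(dot + 1) : "");
        return row;
    }

    public static void main(String[] args) {
        System.out.println(process(" Example.COM "));
    }
}
```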
ItemWriter
The writer implements the generic ‘ItemWriter’ interface for data maps. It is called once per chunk, after all items of that chunk have been processed by the ‘ItemProcessor’, and writes the chunk to the output stream.
[xml]
import com.test.batch.report.BatchReportFileUtils;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.util.HashMap;
import java.util.List;

import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext
import org.springframework.batch.item.ItemWriter;

class AbcLineItemWriter implements ItemWriter<HashMap> {

    // Called once before the step starts, e.g. to open the output
    @BeforeStep
    void beforeStep(StepExecution stepExecution) {
    }

    // Called once after the step ends, e.g. to close the output
    @AfterStep
    void afterStep(StepExecution stepExecution) {
    }

    // Called once per chunk with all processed items of that chunk
    @Override
    public void write(List<? extends HashMap> lstNameServerMap) throws Exception {
    }
}
[/xml]
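The writer above is also a stub. A minimal plain-Java sketch of what a ‘write’ method could do with one chunk of data maps is shown below. It is an assumption for illustration only: it emits comma-separated lines rather than an Excel sheet, performs no CSV escaping, and the class name CsvChunkWriterSketch is hypothetical.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: writes one chunk of rows as comma-separated lines.
class CsvChunkWriterSketch {

    /** Writes every row of the chunk as one line and flushes once per chunk. */
    static void write(List<? extends Map<String, String>> chunk, Writer out) {
        try {
            for (Map<String, String> row : chunk) {
                out.write(String.join(",", row.values())); // no escaping in this sketch
                out.write("\n");
            }
            out.flush();                                   // one flush per chunk
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<Map<String, String>> chunk = new ArrayList<>();
        Map<String, String> row = new LinkedHashMap<>();
        row.put("domain", "a.com");
        row.put("tld", "com");
        chunk.add(row);
        StringWriter out = new StringWriter();
        write(chunk, out);
        System.out.print(out);
    }
}
```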
ReportJobLauncher
The ReportJobLauncher takes a report job and launches/restarts/stops it using the Spring Batch job launcher with the appropriate thread pool. Using one thread pool per tool keeps the implementation simple.
Submitting/Running a Job
The ‘submit’ method is synchronized so that checking the executor pool properties and launching the job happen as one step (although there should not be more than one caller at a time). The method is called within the transaction context of the polling message handler that takes a job message from the queue. If it throws an exception, the transaction is rolled back and the message is retained in the queue for subsequent processing.
What we need for execution is a non-queuing thread pool with a maximum number of threads, i.e. direct hand-off, which can be achieved using a synchronous queue. We do not want the thread pool itself to queue jobs, because its queue is not persistent; all persistent queuing is taken care of by the message queue.
Note: If the maximum pool size is too small, the persistent message queue will build up over time. For further details, please refer to the thread pool bean configuration under the Resource.xml section.
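The direct hand-off behaviour can be tried out with the JDK alone. The sketch below builds a java.util.concurrent.ThreadPoolExecutor on top of a SynchronousQueue and checks that a task is rejected, rather than queued, once all threads are busy. The class and method names are hypothetical, and the pool parameters (zero core threads, 60 seconds keep-alive) are assumptions, not the values used by the sample application.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative only: a non-queuing pool that rejects work when saturated.
class DirectHandOffPoolSketch {

    /** A pool with no internal queue: a task either gets a thread or is rejected. */
    static ThreadPoolExecutor newPool(int maxThreads) {
        return new ThreadPoolExecutor(
                0, maxThreads, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),        // direct hand-off, capacity zero
                new ThreadPoolExecutor.AbortPolicy());   // throw when no thread is free
    }

    /** Occupies every thread, then reports whether one more task is rejected. */
    static boolean rejectsWhenSaturated(int maxThreads) {
        ThreadPoolExecutor pool = newPool(maxThreads);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < maxThreads; i++) {
                pool.execute(() -> {
                    try {
                        release.await();                 // keep the thread busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            try {
                pool.execute(() -> { });
                return false;                            // task was accepted
            } catch (RejectedExecutionException e) {
                return true;                             // no thread, no queue slot
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected when saturated: " + rejectsWhenSaturated(2));
    }
}
```

In the sample application a rejection like this is what keeps the job request in the persistent message queue until a thread becomes free.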
[xml]
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.batch.core.ExitStatus
import org.springframework.batch.core.JobExecution
import org.springframework.batch.core.launch.support.SimpleJobLauncher
import org.springframework.core.task.TaskRejectedException
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

class ReportJobLauncher {

    private static final Logger log = LoggerFactory.getLogger(ReportJobLauncher.class)

    public static synchronized void submit(ReportJob reportJob) {
        assert reportJob
        ThreadPoolTaskExecutor taskExecutor = ReportJobThreadPoolManager.getInstance().getExecutor(reportJob.getToolName())

        // Refuse the job up front if the pool cannot take it; the exception keeps the message queued
        if (taskExecutor.getThreadPoolExecutor().isShutdown())
            throw new TaskRejectedException("Task executor is shut down")
        if (taskExecutor.getThreadPoolExecutor().getActiveCount() >= BatchReportConfig.getToolMaxThreads(reportJob.getToolName()))
            throw new TaskRejectedException("No threads for batch execution of tool [${reportJob.getToolName()}] available")

        SimpleJobLauncher launcher = new SimpleJobLauncher()
        launcher.setJobRepository(reportJob.getJobRepository())
        launcher.setTaskExecutor(taskExecutor)
        launcher.afterPropertiesSet()

        JobExecution jobExecution = launcher.run(reportJob.getJob(), reportJob.getJobParameters())
        // Compare exit codes only; the exit description may differ from ExitStatus.FAILED
        if (jobExecution.getExitStatus().getExitCode() == ExitStatus.FAILED.getExitCode())
            // We should never reach this because we check for available threads above
            throw new Exception(jobExecution.getExitStatus().getExitDescription())
    }
}
[/xml]
Restarting a Job
Here is the code snippet to restart a failed/stopped job using the job execution id.
[xml]
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.batch.core.BatchStatus
import org.springframework.batch.core.JobExecution
import org.springframework.batch.core.configuration.JobRegistry
import org.springframework.batch.core.configuration.support.MapJobRegistry
import org.springframework.batch.core.configuration.support.ReferenceJobFactory
import org.springframework.batch.core.explore.JobExplorer
import org.springframework.batch.core.launch.support.SimpleJobLauncher
import org.springframework.batch.core.launch.support.SimpleJobOperator
import org.springframework.batch.core.repository.JobRepository
import org.springframework.core.task.TaskRejectedException
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

class ReportJobLauncher {

    private static final Logger log = LoggerFactory.getLogger(ReportJobLauncher.class)

    public static synchronized void restart(ReportJob reportJob, Long jobExecutionId) {
        assert reportJob
        try {
            ThreadPoolTaskExecutor taskExecutor = ReportJobThreadPoolManager.getInstance().getExecutor(reportJob.getToolName())

            // A restart needs a free thread, just like a new submission
            if (taskExecutor.getThreadPoolExecutor().isShutdown())
                throw new TaskRejectedException("Task executor is shut down")
            if (taskExecutor.getThreadPoolExecutor().getActiveCount() >= BatchReportConfig.getToolMaxThreads(reportJob.getToolName())) {
                throw new TaskRejectedException("No threads for batch execution of tool [${reportJob.getToolName()}] available")
            }

            JobExplorer jobExplorer = BatchReportConfig.getJobExplorer()
            JobRegistry jobRegistry = new MapJobRegistry()
            JobRepository jobRepository = BatchReportConfig.getJobRepository()

            JobExecution jobExecution = jobExplorer.getJobExecution(jobExecutionId)
            if (jobExecution) {
                // The operator looks the job up by name, so it has to be registered first
                jobRegistry.register(new ReferenceJobFactory(reportJob.getJob()))
                if (!(jobExecution.getStatus().equals(BatchStatus.COMPLETED))) {
                    SimpleJobOperator simpleJobOperator = new SimpleJobOperator()
                    simpleJobOperator.setJobExplorer(jobExplorer)
                    simpleJobOperator.setJobRegistry(jobRegistry)
                    simpleJobOperator.setJobRepository(jobRepository)

                    SimpleJobLauncher launcher = new SimpleJobLauncher()
                    launcher.setJobRepository(jobRepository)
                    launcher.setTaskExecutor(taskExecutor)
                    launcher.afterPropertiesSet()

                    simpleJobOperator.setJobLauncher(launcher)
                    simpleJobOperator.restart(jobExecution.getId())
                }
            }
        } catch (Exception e) {
            throw new Exception(e)
        }
    }
}
[/xml]
Stopping a Job
A stop request takes effect only once the chunk that is currently being processed has been committed. To stop a job quickly, therefore, keep the chunk size/commit interval small. The chunk size/commit interval is defined in the ‘ReportJob’ class, where it is used to build the report step.
Note: When writing data to a CSV file, an Excel file or another file-based target, a small chunk size/commit interval is a good choice. If we are writing to a data source (database), however, it hurts performance: the smaller the commit interval, the more database interaction is needed.
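Why the chunk size matters for stopping can be seen in a small simulation. The plain-Java sketch below assumes, as a simplification, that the stop flag is checked only between chunks; it then counts how many items are still processed after a stop request arrives. The class name, the method name and the numbers in main are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: the stop flag is honoured at chunk boundaries, never mid-chunk.
class StopAtChunkBoundarySketch {

    /**
     * Processes up to totalItems in chunks of chunkSize. A stop request "arrives"
     * while item number stopAtItem (1-based) is being handled. Returns how many
     * items were processed by the time the loop actually stops.
     */
    static int itemsProcessedBeforeStop(int totalItems, int chunkSize, int stopAtItem) {
        AtomicBoolean stopRequested = new AtomicBoolean(false);
        int processed = 0;
        while (processed < totalItems && !stopRequested.get()) { // checked once per chunk
            int end = Math.min(processed + chunkSize, totalItems);
            for (; processed < end; processed++) {
                if (processed + 1 == stopAtItem) {
                    stopRequested.set(true);                     // simulated external stop
                }
            }
            // the chunk would be written and committed here
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(itemsProcessedBeforeStop(1000, 10, 25));
        System.out.println(itemsProcessedBeforeStop(1000, 500, 25));
    }
}
```

With a chunk size of 10 the simulated job stops after 30 items; with a chunk size of 500 it runs on to item 500 before the stop is noticed.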
[xml]
import java.util.Map
import java.util.concurrent.BlockingQueue
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

import com.test.ApplicationContextHolder
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.batch.core.BatchStatus
import org.springframework.batch.core.ExitStatus
import org.springframework.batch.core.JobExecution
import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.configuration.JobRegistry
import org.springframework.batch.core.configuration.support.MapJobRegistry
import org.springframework.batch.core.configuration.support.ReferenceJobFactory
import org.springframework.batch.core.explore.JobExplorer
import org.springframework.batch.core.launch.support.SimpleJobLauncher
import org.springframework.batch.core.launch.support.SimpleJobOperator
import org.springframework.batch.core.repository.JobRepository
import org.springframework.core.task.TaskExecutor
import org.springframework.core.task.TaskRejectedException
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

class ReportJobLauncher {

    private static final Logger log = LoggerFactory.getLogger(ReportJobLauncher.class)

    public static synchronized void stop(ReportJob reportJob, Long jobExecutionId) {
        assert reportJob
        try {
            // Look up the thread pool that runs jobs for this tool
            ThreadPoolTaskExecutor taskExecutor =
                ReportJobThreadPoolManager.getInstance().getExecutor(reportJob.getToolName())
            if (taskExecutor.getThreadPoolExecutor().isShutdown())
                throw new TaskRejectedException("Task executor is shut down")
            if (taskExecutor.getThreadPoolExecutor().getActiveCount() >=
                    BatchReportConfig.getToolMaxThreads(reportJob.getToolName())) {
                throw new TaskRejectedException(
                    "No threads for batch execution of tool [${reportJob.getToolName()}] available")
            }

            JobExplorer jobExplorer = BatchReportConfig.getJobExplorer()
            JobRegistry jobRegistry = new MapJobRegistry()
            JobRepository jobRepository = BatchReportConfig.getJobRepository()
            JobExecution jobExecution = jobExplorer.getJobExecution(jobExecutionId)

            if (jobExecution) {
                // Register the job so that the operator can resolve it by name
                jobRegistry.register(new ReferenceJobFactory(reportJob.getJob()))

                // Only executions that have not completed are stopped
                if (!(jobExecution.getStatus().equals(BatchStatus.COMPLETED))) {
                    jobExecution.stop()

                    // Assemble a job operator by hand and send the stop signal
                    SimpleJobOperator simpleJobOperator = new SimpleJobOperator()
                    simpleJobOperator.setJobExplorer(jobExplorer)
                    simpleJobOperator.setJobRegistry(jobRegistry)
                    simpleJobOperator.setJobRepository(jobRepository)

                    SimpleJobLauncher launcher = new SimpleJobLauncher()
                    launcher.setJobRepository(jobRepository)
                    launcher.setTaskExecutor(taskExecutor)
                    launcher.afterPropertiesSet()

                    simpleJobOperator.setJobLauncher(launcher)
                    simpleJobOperator.stop(jobExecution.getId())
                }
            }
        } catch (Exception e) {
            throw new Exception(e)
        }
    }
}
[/xml]
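A stop request in Spring Batch is cooperative: sending the signal does not interrupt the running thread, and the job stops once control returns to the framework, typically between chunks. The following is a simplified plain-Java model of that behavior and of resuming from the last written chunk. It is not Spring Batch code; `StoppableChunkLoop`, `requestStop`, `clearStop` and `run` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of a cooperative stop. All names are hypothetical,
// none of them are Spring Batch APIs.
class StoppableChunkLoop {
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final List<List<Integer>> writtenChunks = new ArrayList<>();

    void requestStop() { stopRequested.set(true); }

    void clearStop() { stopRequested.set(false); }

    List<List<Integer>> writtenChunks() { return writtenChunks; }

    // Writes items chunk by chunk and returns the index of the next unprocessed
    // item, so that a later call can resume from there. The stop flag is checked
    // only between chunks, so a chunk that has started is always completed.
    int run(List<Integer> items, int startIndex, int chunkSize, Runnable afterChunk) {
        int index = startIndex;
        while (index < items.size() && !stopRequested.get()) {
            int end = Math.min(index + chunkSize, items.size());
            writtenChunks.add(new ArrayList<>(items.subList(index, end)));
            index = end;
            afterChunk.run(); // hook used here to simulate an external stop request
        }
        return index;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            items.add(i);
        }

        StoppableChunkLoop loop = new StoppableChunkLoop();
        // Request a stop right after the second chunk has been written.
        int next = loop.run(items, 0, 3, () -> {
            if (loop.writtenChunks().size() == 2) {
                loop.requestStop();
            }
        });
        System.out.println("stopped at index " + next + ", chunks: " + loop.writtenChunks());

        // "Restart": clear the flag and resume from the returned index.
        loop.clearStop();
        next = loop.run(items, next, 3, () -> { });
        System.out.println("finished at index " + next + ", chunks: " + loop.writtenChunks());
    }
}
```

In this model the first run stops at index 6 after two chunks, and the second run writes the remaining two chunks without repeating any item.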
Create a utility class to submit, restart or stop a job. The class below places a job request on a message channel. The message queue then picks up the request and submits it to the report job dispatcher, which starts, restarts or dispatches the job.
[xml]
import java.util.List
import java.util.Map

import com.test.batch.report.BatchReportConfig
import com.test.batch.report.BatchReportFileUtils
import com.test.security.SecurityUtils
import org.apache.commons.io.FilenameUtils
import org.codehaus.groovy.grails.web.mapping.LinkGenerator
import org.springframework.batch.core.JobExecution
import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.explore.JobExplorer
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.support.GenericMessage
import org.springframework.web.multipart.MultipartFile

class IntegrationUtils {

    // Utility class: not meant to be instantiated
    private IntegrationUtils() {}

    private static ReportJobRequest postReportJobRequest(String toolName, String email,
            Date submissionDate, List items, String jobRef = null, Map params = null,
            Long jobExecutionId = null, String jobRquestType = null) {
        assert toolName && email && submissionDate

        // Populate the request from the caller's arguments and the batch configuration
        ReportJobRequest jobRequest = new ReportJobRequest()
        jobRequest.submissionTime = submissionDate.getTime()
        jobRequest.toolName = toolName
        jobRequest.toolDescr = BatchReportConfig.getToolDescr(toolName)
        jobRequest.userEmail = email
        jobRequest.jobRef = jobRef
        jobRequest.toolParams = params
        // A non-empty item list is used as the input; otherwise the input comes from a file
        jobRequest.inputSource = items ? InputSource.LIST : InputSource.FILE
        jobRequest.inputList = items
        jobRequest.reportDelivery = ReportDelivery.LINK
        jobRequest.chunkSize = BatchReportConfig.getConfigObj().batch.report.chunkSize
        jobRequest.jobExecutionId = jobExecutionId
        jobRequest.jobRquestType = jobRquestType
        jobRequest.validate()

        // Send the request as a JSON message on the configured channel
        MessageChannel msgChannel = BatchReportConfig.getMessageChannel()
        msgChannel.send(new GenericMessage<String>(jobRequest.toJson()))
        return jobRequest
    }
}
[/xml]
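The hand-off described above, where a request is placed on a channel and then picked up and routed by type, can be modeled without Spring Integration. The sketch below uses a `java.util.concurrent.BlockingQueue` as a stand-in for the message channel. The class `JobRequestDispatcherSketch`, the `RequestType` values and the returned strings are assumptions made for illustration; the actual `ReportJobRequest` and report job dispatcher classes are not shown in this post.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java stand-in for a message channel plus dispatcher.
// All names are hypothetical and chosen for illustration only.
class JobRequestDispatcherSketch {

    enum RequestType { START, RESTART, STOP }

    static final class JobRequest {
        final String toolName;
        final RequestType type;
        final Long jobExecutionId; // null for a fresh start

        JobRequest(String toolName, RequestType type, Long jobExecutionId) {
            this.toolName = toolName;
            this.type = type;
            this.jobExecutionId = jobExecutionId;
        }
    }

    private final BlockingQueue<JobRequest> channel = new LinkedBlockingQueue<>();

    // Producer side: validate the request and place it on the queue.
    void post(JobRequest request) {
        if (request.type != RequestType.START && request.jobExecutionId == null) {
            throw new IllegalArgumentException(request.type + " needs a job execution id");
        }
        channel.add(request);
    }

    // Consumer side: take the oldest request, if any, and route it by type.
    // Returns a description of the action, or null when the queue is empty.
    String dispatchNext() {
        JobRequest request = channel.poll();
        if (request == null) {
            return null;
        }
        switch (request.type) {
            case START:
                return "start " + request.toolName;
            case RESTART:
                return "restart execution " + request.jobExecutionId;
            case STOP:
                return "stop execution " + request.jobExecutionId;
            default:
                throw new IllegalStateException("Unknown request type: " + request.type);
        }
    }

    public static void main(String[] args) {
        JobRequestDispatcherSketch dispatcher = new JobRequestDispatcherSketch();
        dispatcher.post(new JobRequest("toolA", RequestType.START, null));
        dispatcher.post(new JobRequest("toolA", RequestType.STOP, 42L));

        String action;
        while ((action = dispatcher.dispatchNext()) != null) {
            System.out.println(action);
        }
    }
}
```

Keeping the producer and the dispatcher decoupled through a queue means the caller returns as soon as the request is posted, while the long-running batch work happens on the consumer side.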
Summing Up
This blog post set out to explain how ‘chunk oriented processing’ can be integrated with the ‘Spring Integration’ module, and how a batch job can be stopped and restarted. Please feel free to share your comments and questions in the comments section.
Author
Sathish Jannarapu is working as a Senior Technical Associate at Evoke Technologies. He is currently focused on Java/J2EE based technologies including Spring, Hibernate and Web Services. In his spare time, Sathish likes listening to music and reading articles on emerging technologies.