Batch Processing Large Data Sets with Spring Boot and Spring Batch

Batch processing is an efficient way to handle large volumes of data: the data is collected, processed as a group, and then the batch results are produced. Batch processing applies to many use cases. A common one is transforming a large set of flat files, such as CSV or JSON files, into a structured format that is ready for further processing.

In this article, I am going to demonstrate batch processing using Spring Batch, one of the Spring projects. Spring Batch provides features for processing large volumes of data in batch jobs, including logging, transaction management, job restart (if a job did not complete), skipping of items, job processing statistics, and resource management.

Let us look at how Spring Batch works in a nutshell.

Spring Batch overview

A job is made up of one or more steps. A step is an object that encapsulates an independent, sequential phase of a job and holds all the information necessary to define and control the actual processing. The job delegates the actual work to its steps.

Spring Batch uses a chunk-oriented processing style: it reads items one at a time, groups them into chunks, and writes each chunk out within a transaction. Each item is read by an ItemReader and passed to an ItemProcessor; once a chunk is complete (that is, once the configured number of items has been read), the processed items are written out together by an ItemWriter. The JobRepository stores the step execution metadata as each chunk is committed, so the progress of the job is persisted during processing.
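To make the read-process-write cycle concrete, here is a minimal plain-Java sketch of a chunk-oriented loop. It does not use Spring Batch: the interfaces `SketchReader`, `SketchProcessor` and `SketchWriter` and the class `ChunkLoopSketch` are hypothetical stand-ins that only mimic the contracts of ItemReader (return null when the input is exhausted), ItemProcessor (return null to filter an item out) and ItemWriter (receive a whole chunk at once).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins for Spring Batch's ItemReader, ItemProcessor and ItemWriter.
interface SketchReader<T> {
    T read();                      // returns null when the input is exhausted
}

interface SketchProcessor<I, O> {
    O process(I item);             // returns null to filter the item out
}

interface SketchWriter<T> {
    void write(List<T> chunk);     // receives a whole chunk at once
}

class ChunkLoopSketch {

    // Reads up to chunkSize items one at a time, processes them, then writes the
    // surviving items as one chunk. Returns the number of chunks handed to the writer.
    static <I, O> int run(SketchReader<I> reader, SketchProcessor<I, O> processor,
                          SketchWriter<O> writer, int chunkSize) {
        int chunks = 0;
        while (true) {
            // 1. Read: collect up to chunkSize items.
            List<I> inputs = new ArrayList<>();
            I item;
            while (inputs.size() < chunkSize && (item = reader.read()) != null) {
                inputs.add(item);
            }
            if (inputs.isEmpty()) {
                return chunks;                 // nothing left to read
            }
            // 2. Process: transform each item; null means "filter out".
            List<O> outputs = new ArrayList<>();
            for (I input : inputs) {
                O output = processor.process(input);
                if (output != null) {
                    outputs.add(output);
                }
            }
            // 3. Write: hand the chunk to the writer (one transaction in Spring Batch).
            writer.write(outputs);
            chunks++;
        }
    }

    // Adapts a list to the reader contract: returns null once the list is exhausted.
    static <T> SketchReader<T> fromList(List<T> items) {
        Iterator<T> iterator = items.iterator();
        return () -> iterator.hasNext() ? iterator.next() : null;
    }
}
```

With a chunk size of 2 and five input items, the writer is called three times: twice with full chunks and once with the final partial chunk.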

Let’s get into coding.

Setting up Project

Create a sample Spring Boot application. Here is my sample project structure.

Project structure

In this article, I will use sample data that represents the voltage drop of a discharging capacitor. We will read this data from a CSV file and write it to H2, an in-memory database.

Add the required dependencies to pom.xml.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
</dependency>

The CSV file Volts.csv contains two fields: volt and time. Let us create a JPA entity called Voltage. Note that this entity is only for the example; it is not production-ready code.

package com.techshard.batch.dao.entity;

import javax.persistence.*;
import javax.validation.constraints.NotNull;
import java.math.BigDecimal;

@Entity
public class Voltage {

    @Id
    @Column (name = "ID", nullable = false)
    @GeneratedValue (strategy = GenerationType.IDENTITY)
    private long id;

    @NotNull
    @Column (name = "volt", precision = 10, scale = 4, nullable = false)
    private BigDecimal volt;

    @NotNull
    @Column (name = "time", nullable = false)
    private double time;

    public Voltage() {
    }

    public Voltage(final BigDecimal volt, final double time) {
        this.volt = volt;
        this.time = time;
    }

    public long getId(){
        return id;
    }

    public BigDecimal getVolt(){
        return volt;
    }

    public void setVolt(final BigDecimal volt){
        this.volt = volt;
    }

    public double getTime(){
        return time;
    }

    public void setTime(final double time){
        this.time = time;
    }
}

Batch configuration

Let’s create a batch configuration class:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
}

@EnableBatchProcessing enables Spring Batch features and provides a base configuration for setting up batch jobs in an @Configuration class.

We need to inject two components into the class above:

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

JobBuilderFactory creates job builders and sets their job repository. StepBuilderFactory creates step builders and initializes their job repository and transaction manager.

Configuring ItemReader

We will now define an ItemReader bean for our Voltage model, which will read the data from the CSV file.

    @Bean
    public FlatFileItemReader<Voltage> reader() {
        return new FlatFileItemReaderBuilder<Voltage>()
                .name("voltItemReader")
                .resource(new ClassPathResource("Volts.csv"))
                .delimited()
                .names(new String[]{"volt", "time"})
                .lineMapper(lineMapper())
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Voltage>() {{
                    setTargetType(Voltage.class);
                }})
                .build();
    }

Here, we use a FlatFileItemReaderBuilder to create a reader for the Voltage model. The builder options are:

name – Name of the ItemReader.

resource – Path of the resource file to be read.

delimited – Builds a delimited tokenizer (comma-delimited by default).

names – Names of the fields to be read.

lineMapper – Interface that maps lines from the file to domain objects.

fieldSetMapper – Interface that maps the data in a FieldSet to an object.

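Note that we have passed a custom lineMapper() above. When a lineMapper is set, FlatFileItemReaderBuilder uses it as-is, so the delimited(), names() and fieldSetMapper() settings are not applied; the delimiter and the field mapping actually come from lineMapper(). Let us define that bean.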

    @Bean
    public LineMapper<Voltage> lineMapper() {

        final DefaultLineMapper<Voltage> defaultLineMapper = new DefaultLineMapper<>();
        final DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setDelimiter(";");
        lineTokenizer.setStrict(false);
        lineTokenizer.setNames(new String[] {"volt","time"});

        final VoltageFieldSetMapper fieldSetMapper = new VoltageFieldSetMapper();
        defaultLineMapper.setLineTokenizer(lineTokenizer);
        defaultLineMapper.setFieldSetMapper(fieldSetMapper);

        return defaultLineMapper;
    }

In the custom lineMapper, we specify the delimiter used in the CSV file (a semicolon) and a FieldSetMapper that converts the string values read from the file into the Java types of our entity. The VoltageFieldSetMapper is defined as follows:

package com.techshard.batch.configuration;

import com.techshard.batch.dao.entity.Voltage;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.stereotype.Component;

@Component
public class VoltageFieldSetMapper implements FieldSetMapper<Voltage> {

    @Override
    public Voltage mapFieldSet(FieldSet fieldSet) {
        final Voltage voltage = new Voltage();

        voltage.setVolt(fieldSet.readBigDecimal("volt"));
        voltage.setTime(fieldSet.readDouble("time"));
        return voltage;

    }
}
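For intuition about what DelimitedLineTokenizer and VoltageFieldSetMapper do together, here is a plain-Java sketch (no Spring Batch classes) that splits one semicolon-delimited line and converts the two tokens into a BigDecimal and a double. The class `VoltLineParser` and the sample line `12.3456;0.5` are hypothetical; they are not taken from Volts.csv.

```java
import java.math.BigDecimal;

class VoltLineParser {

    // Plain holder for the two parsed fields (a stand-in for the Voltage entity).
    static final class ParsedVoltage {
        final BigDecimal volt;
        final double time;

        ParsedVoltage(BigDecimal volt, double time) {
            this.volt = volt;
            this.time = time;
        }
    }

    // Splits a line on ";" and converts the tokens, roughly like
    // DelimitedLineTokenizer followed by VoltageFieldSetMapper.
    static ParsedVoltage parse(String line) {
        String[] tokens = line.split(";", -1);          // -1 keeps trailing empty tokens
        if (tokens.length < 2) {
            // Unlike the lenient tokenizer above (setStrict(false)), reject short lines.
            throw new IllegalArgumentException("Expected volt;time but got: " + line);
        }
        BigDecimal volt = new BigDecimal(tokens[0].trim());  // cf. fieldSet.readBigDecimal("volt")
        double time = Double.parseDouble(tokens[1].trim());  // cf. fieldSet.readDouble("time")
        return new ParsedVoltage(volt, time);
    }
}
```

Keeping volt as a BigDecimal preserves the exact decimal digits from the file, which matches the precision/scale mapping of the volt column in the entity.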

Configuring ItemProcessor

We will define the processor in the batch configuration as follows:

    @Bean
    public VoltageProcessor processor() {
        return new VoltageProcessor();
    }

We have defined a custom processor, VoltageProcessor. Once an item is read, the processor is where data conversion, business logic, and similar processing are applied. In this example, it simply copies the volt and time values into a new Voltage object. A processor is optional in a step, so define one only if your application requires it.

package com.techshard.batch.configuration;

import com.techshard.batch.dao.entity.Voltage;

import org.springframework.batch.item.ItemProcessor;

import java.math.BigDecimal;

public class VoltageProcessor implements ItemProcessor<Voltage, Voltage>{

    @Override
    public Voltage process(final Voltage voltage) {
        final BigDecimal volt = voltage.getVolt();
        final double time = voltage.getTime();

        final Voltage processedVoltage = new Voltage();
        processedVoltage.setVolt(volt);
        processedVoltage.setTime(time);
        return processedVoltage;
    }
}
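As an illustration of the kind of business logic a processor could apply, here is a plain-Java sketch of a hypothetical rule: drop readings with a negative voltage and round the rest to four decimal places, matching scale = 4 of the volt column. It follows the ItemProcessor convention that returning null filters an item out so it never reaches the writer, but the class `VoltageRuleSketch` and the rule itself are assumptions made for this example.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

class VoltageRuleSketch {

    // Hypothetical rule. Returning null mirrors how an ItemProcessor signals
    // that an item should be filtered out instead of written.
    static BigDecimal process(BigDecimal volt) {
        if (volt == null || volt.signum() < 0) {
            return null;                                // filtered: never reaches the writer
        }
        return volt.setScale(4, RoundingMode.HALF_UP);  // matches scale = 4 of the volt column
    }
}
```

Inside VoltageProcessor.process(), the same check would translate into returning null for a rejected Voltage and a new Voltage with the rounded value otherwise.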

Configuring ItemWriter

Once the data is processed, it needs to be stored in the database. We will define a JdbcBatchItemWriter to insert the data into the database table. Spring Batch also provides a JPA-specific JpaItemWriter, which writes items through a JPA EntityManager.

    @Bean
    public JdbcBatchItemWriter<Voltage> writer(final DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Voltage>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO voltage (volt, time) VALUES (:volt, :time)")
                .dataSource(dataSource)
                .build();
    }
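The writer above uses named parameters (:volt, :time) that BeanPropertyItemSqlParameterSourceProvider resolves by reading the matching bean properties of each item, and JdbcBatchItemWriter then runs the whole chunk as one JDBC batch update. The plain-Java sketch below imitates only the parameter-binding idea with reflection: it rewrites each named parameter as a JDBC `?` placeholder and collects the values from the getters. It is a simplified illustration, not Spring's implementation; the class `NamedParamSketch` and the `Reading` bean are hypothetical.

```java
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class NamedParamSketch {

    // Simplified: a colon followed by an identifier, e.g. ":volt". Spring's own parser
    // also skips quoted literals and comments, which this sketch does not.
    private static final Pattern NAMED_PARAM = Pattern.compile(":([A-Za-z_][A-Za-z0-9_]*)");

    // Rewrites each :name as ? and appends bean.getName() to 'values', in order.
    static String bind(String sql, Object bean, List<Object> values) {
        Matcher matcher = NAMED_PARAM.matcher(sql);
        StringBuffer positional = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            String getter = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                Method method = bean.getClass().getMethod(getter);
                values.add(method.invoke(bean));
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("No readable property '" + name + "'", e);
            }
            matcher.appendReplacement(positional, "?");
        }
        matcher.appendTail(positional);
        return positional.toString();
    }

    // Hypothetical bean exposing the same properties as the Voltage entity.
    public static class Reading {
        private final BigDecimal volt;
        private final double time;

        public Reading(BigDecimal volt, double time) {
            this.volt = volt;
            this.time = time;
        }

        public BigDecimal getVolt() { return volt; }

        public double getTime() { return time; }
    }
}
```

This is why the property names in the SQL must match the getters of the Voltage entity: a parameter without a matching property cannot be resolved.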

Job and Step Configuration

We will now define a Step that wires together the reader, processor, and writer. It is built with the StepBuilderFactory injected earlier, and the resulting Step bean is in turn injected into our Job bean method.

    @Bean
    public Step step1(JdbcBatchItemWriter<Voltage> writer) {
        return stepBuilderFactory.get("step1")
                .<Voltage, Voltage> chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }

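Here, step1 is simply the name we give to the step. The chunk size is also specified in the step configuration: with chunk(10), items are read and processed one at a time, and every 10 items are written out together and committed in a single transaction.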
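The chunk size determines how many chunks the writer receives: the item count divided by the chunk size, rounded up. A tiny helper (hypothetical, for illustration only) makes the arithmetic explicit; for example, 1,005 input lines with chunk(10) give 100 full chunks plus one final chunk of 5 items, so 101 chunks in total.

```java
class ChunkMath {

    // Chunks handed to the writer for itemCount items read: itemCount / chunkSize,
    // rounded up. Filtering and skips are ignored in this simple model.
    static long chunkCount(long itemCount, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        return (itemCount + chunkSize - 1) / chunkSize;
    }

    // Size of the last chunk; equals chunkSize when itemCount is an exact multiple.
    static long lastChunkSize(long itemCount, int chunkSize) {
        long remainder = itemCount % chunkSize;
        return (itemCount > 0 && remainder == 0) ? chunkSize : remainder;
    }
}
```

A larger chunk size means fewer transactions but more work to redo if a chunk fails and is rolled back, so the value is usually tuned per job.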

Finally, a Job is defined as follows:

    @Bean
    public Job importVoltageJob(NotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importVoltageJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }

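The RunIdIncrementer adds an incremented run.id parameter to each launch, so the job can be run again as a new job instance. Note that we also pass a NotificationListener, which extends Spring Batch’s JobExecutionListenerSupport. A job execution listener can run logic, such as logging results, before or after a job executes. Here, we override only afterJob(), which logs the rows of the voltage table once the job has completed; JobExecutionListenerSupport also provides beforeJob() for logic that should run before the job executes.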

package com.techshard.batch.configuration;

import com.techshard.batch.dao.entity.Voltage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class NotificationListener extends JobExecutionListenerSupport{

    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationListener.class);

    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public NotificationListener(final JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void afterJob(final JobExecution jobExecution) {
        if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
            LOGGER.info("!!! JOB FINISHED! Time to verify the results");

            jdbcTemplate.query("SELECT volt, time FROM voltage",
                    (rs, row) -> new Voltage(
                            rs.getBigDecimal(1),
                            rs.getDouble(2))
            ).forEach(voltage -> LOGGER.info("Found <volt={}, time={}> in the database.",
                    voltage.getVolt(), voltage.getTime())); // Voltage has no toString()
        }
    }
}
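To show when the two callbacks fire, here is a plain-Java sketch of the lifecycle around a job run: beforeJob() is called first, then the steps run, and afterJob() is called with the final status whether the job succeeded or failed. That is why NotificationListener checks for BatchStatus.COMPLETED before querying the table. The `Listener` interface, `Status` enum and `MiniJobRunner` class are hypothetical stand-ins, not Spring Batch types.

```java
class MiniJobRunner {

    enum Status { COMPLETED, FAILED }

    // Hypothetical mirror of JobExecutionListener's two callbacks.
    interface Listener {
        default void beforeJob() { }
        default void afterJob(Status status) { }
    }

    // Runs the job body between the two callbacks; afterJob() runs on success and failure.
    static Status run(Runnable steps, Listener listener) {
        listener.beforeJob();
        Status status = Status.COMPLETED;
        try {
            steps.run();
        } catch (RuntimeException e) {
            status = Status.FAILED;
        } finally {
            listener.afterJob(status);
        }
        return status;
    }
}
```

Overriding only afterJob(), as NotificationListener does, corresponds to implementing just the second callback here and leaving beforeJob() as a no-op.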

Before we run the application, we configure the H2 in-memory data source and enable the H2 console in application.properties.

spring.datasource.url=jdbc:h2:mem:batchdb
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=password
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.h2.console.enabled=true

Additionally, I have configured an aspect using Spring AOP to measure the time taken by the batch execution.

package com.techshard.batch;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class TracePerformanceAspect {

    private final Logger logger = LoggerFactory.getLogger(TracePerformanceAspect.class);

    // Matches the execution of any method in com.techshard or its subpackages
    @Around("execution(* com.techshard..*.*(..))")
    public Object logTracePerformanceAspect(ProceedingJoinPoint joinPoint) throws Throwable {

        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();

        //Get intercepted method details
        String className = methodSignature.getDeclaringType().getSimpleName();
        String methodName = methodSignature.getName();

        long start = System.currentTimeMillis();

        Object result = joinPoint.proceed();
        long end = System.currentTimeMillis();

        //Log method execution time
        logger.info("Execution time of " + className + "." + methodName + " :: " + (end - start) + " ms");

        return result;
    }
}
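Spring AOP applies advice like this through runtime proxies (JDK dynamic proxies or CGLIB subclasses). The plain-Java sketch below shows the same around-advice idea with java.lang.reflect.Proxy: every call on an interface is intercepted, forwarded to the real object, and timed. It uses System.nanoTime(), which is intended for measuring elapsed time, instead of System.currentTimeMillis(). The `Calculator` interface and the `TimingProxy` class are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.List;

class TimingProxy {

    // Hypothetical interface of the object whose calls are timed.
    public interface Calculator {
        long sumUpTo(long n);
    }

    // Returns a JDK dynamic proxy that forwards each call to 'target' and records
    // its duration in 'log', in the spirit of the @Around advice above.
    static <T> T timed(T target, Class<T> type, List<String> log) {
        InvocationHandler handler = (proxy, method, args) -> {
            long start = System.nanoTime();           // monotonic clock for elapsed time
            try {
                return method.invoke(target, args);   // comparable to joinPoint.proceed()
            } catch (InvocationTargetException e) {
                throw e.getCause();                   // rethrow the target's own exception
            } finally {
                long elapsedMicros = (System.nanoTime() - start) / 1_000;
                log.add(type.getSimpleName() + "." + method.getName() + " :: " + elapsedMicros + " us");
            }
        };
        return type.cast(Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler));
    }
}
```

As with the aspect, only calls that go through the proxy are timed; a call the target object makes on itself bypasses the proxy and is not measured.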

Running the Application

Run the Spring Boot application. Once it has started, open the H2 console at http://localhost:8080/h2-console/. You will see a login screen like the one below; log in with the JDBC URL, username, and password from application.properties (jdbc:h2:mem:batchdb, sa, password).

H2 console login page

Once logged in, we can see the VOLTAGE table as well as the metadata tables created by Spring Batch, such as BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION and BATCH_STEP_EXECUTION. These tables hold the details of each job execution, such as the job name, status, and ID.

H2 Database

Conclusion

This article only scratched the surface of Spring Batch. The example used in this article is not production-ready code; you can define the job configuration according to your project requirements. I hope you enjoyed this article. Let me know if you have any comments or suggestions.

The complete code can be found on my GitHub repository.


3 thoughts on “Batch Processing Large Data Sets with Spring Boot and Spring Batch”

  1. Nice article. Just a comment: the utility class StopWatch from the spring framework library could have been used to measure time differences.

    • Hi Marcus, thanks for reading the article.

      I am aware of StopWatch from Spring. This object is said to conceal the use of System.currentTimeMillis() and it is not meant to be used in production since it is not thread safe. There is another StopWatch utility class from Apache Commons which uses java.lang.System.nanoTime() internally. java.lang.System.nanoTime() gives more accurate time.

      Anyway, long story short, I just wanted to use something quickly as it was not my main concern for the article 🙂

      • Hi

        I don’t see thread safety as a concern, as all the code is contained in the method body; but OK.

