Late-Binding parameters set at JobExecutionContext level in your Spring Batch ItemReader

Batch jobs are especially useful when bulk-oriented, non-interactive, data- or computation-intensive processing is required. Though an age-old form of data processing, batch processing and batch jobs still have their place in modern microservices and the cloud-native realm. In the Spring ecosystem specifically, the way to develop batch jobs is the Spring Batch framework.

What is the Spring Batch Framework?


Building upon the POJO-based development approach of the Spring framework, Spring Batch provides a lightweight, comprehensive framework for writing robust batch jobs.

Spring Batch provides reusable functions that are essential in processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management. It also provides more advanced technical services and features that will enable extremely high-volume and high-performance batch jobs through optimization and partitioning techniques. Simple as well as complex, high-volume batch jobs can leverage the framework in a highly scalable manner to process significant volumes of information.

A primer on Spring Batch


[Image: overview diagram of the Spring Batch reference architecture, from the Spring Batch documentation]

The above image from the Spring Batch documentation gives us an overview of the Spring Batch landscape and its terminology. Simply put, it follows the well-known batch architecture: a JobLauncher launches the Batch Job, while the JobRepository stores metadata about the Job and its executions. The Batch Job itself is composed of one or more Steps. Each Step usually follows the same approach – it reads data using an ItemReader, processes it with some business logic defined inside an ItemProcessor, and writes it out using a configured ItemWriter. This sequence can be visualized as below, as per the Spring documentation:

[Image: diagram of a Step reading with an ItemReader, processing with an ItemProcessor, and writing with an ItemWriter, from the Spring Batch documentation]

That was a brief primer on Spring Batch, but for the purpose of this article I won’t be walking through how to actually write a Batch Job; I assume that you already know the basics of Spring Batch. So then, what is the scope of this article?

The Use-Case


Usually, the ItemReader in a Step of a Job reads everything from the configured source and passes it on to the ItemProcessor. However, there might be a situation where the ItemReader itself has to filter the data it is reading. There are two ways to go about it:

  1. Read everything and have the ItemProcessor keep only the required data and discard everything else (a minimal sketch of this is shown after the list).
  2. Filter the data at the source itself so that the ItemProcessor is only concerned with implementing the Job logic.
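
For the first approach, the filtering would live in the ItemProcessor, which in Spring Batch filters an item simply by returning null for it. Here is a minimal sketch, assuming the Document has a field named "parameter" holding the value to match (both the field name and the class name are made up for illustration):

import org.bson.Document;
import org.springframework.batch.item.ItemProcessor;

public class FilteringProcessor implements ItemProcessor<Document, Document> {

    private final String expectedValue;

    public FilteringProcessor(String expectedValue) {
        this.expectedValue = expectedValue;
    }

    @Override
    public Document process(Document item) {
        // Returning null tells Spring Batch to drop the item from the chunk,
        // so everything is still read from MongoDB and only discarded here.
        if (!expectedValue.equals(item.getString("parameter"))) {
            return null;
        }
        return item;
    }
}

The use-case below takes the second approach instead, so the rest of the article focuses on getting the parameter down to the ItemReader.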

In a previous internship, I came across a similar situation: a Flask microservice called a REST endpoint that triggered the Batch Job, and the request body carried a parameter value. That value had to be used to filter data when reading from the MongoDB database. What follows is an account of how I developed the solution to this use-case.

JobParameters


Upon reading the documentation, I found that I had to use JobParameters to pass any parameter to my Batch Job. An instance of JobParameters has to be passed as an argument to the JobLauncher.run() method. This solved the first part of my problem, i.e., how to get a parameter to the JobInstance of my Job.

@RestController
public class JobLauncherController {
    …
    @RequestMapping(value="/run", method=RequestMethod.POST)
    public BaseResponseDTO runAggregatorJob(@RequestBody String parameter) throws Exception {
        HashMap<String, JobParameter> params = new HashMap<>();
        params.put("parameter", new JobParameter(parameter));
        JobParameters jobParams = new JobParameters(params);
        this.jobLauncher.run(this.aggregatorJobConfig.importAggregatorJob(), jobParams);
        return new BaseResponseDTO("Success", null);
    }
}

Breaking down the AggregatorJobConfig


The AggregatorJobConfig class is used to set up the different aspects of the batch job, for example, the Steps involved. It is usually best practice to have such a Config class with an import method that sets up the required Steps and returns a Job instance that can then be handed to the JobLauncher.run() method. This is how the class is written:

@Configuration
@EnableBatchProcessing
@Primary
public class AggregatorJobConfig extends DefaultBatchConfigurer {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    private String parameter = "";

    @Bean
    public AggregatorJobProcessor aggregatorJobProcessor() {
        return new AggregatorJobProcessor();
    }

    @Bean
    @StepScope
    public AggregatorJobReader aggregatorJobReader(
            @Value("#{jobParameters['parameter']}") final String parameter) {
        return new AggregatorJobReader(parameter);
    }

    @Bean
    public AggregatorJobWriter aggregatorJobWriter() {
        return new AggregatorJobWriter();
    }

    @Bean
    public Job importAggregatorJob() {
        return jobBuilderFactory.get("aggregator_job")
                .incrementer(new RunIdIncrementer())
                .flow(importAggregatorStep()).end().build();
    }

    @Bean
    public Step importAggregatorStep() {
        return stepBuilderFactory.get("step1").<Document, Document> chunk(10)
                .reader(aggregatorJobReader(this.parameter))
                .processor(aggregatorJobProcessor())
                .writer(aggregatorJobWriter())
                .allowStartIfComplete(true)
                .build();
    }

}

Starting with importAggregatorJob(): it simply uses the JobBuilderFactory from Spring Batch to build a new Job, and its flow() is defined as whatever Step is returned from the importAggregatorStep() method. A closer look at importAggregatorStep() shows that it configures the Reader, Processor, and Writer parts of the single Step of the Job. The reader is injected from the aggregatorJobReader() method, and the parameter class variable is passed as an argument that flows down to it. This argument only exists to satisfy the compiler: the real value is not available when the Step is being configured, and Spring injects the late-bound JobParameter only when a JobInstance is actually created and run.

A note on chunk()


<Document, Document>chunk(10) configures a widely used batch processing pattern called chunk processing. It tells Spring Batch that the Step will read and process items one at a time, collect them into chunks of 10, and write each chunk out as a unit, committing a transaction per chunk. Document here simply means the BSON Document from the org.bson dependency.
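
To make the pattern more concrete, here is a small, framework-free sketch of what a chunk-oriented loop does conceptually; the item type, the "processing" logic, and the class name are placeholders for illustration, not Spring Batch APIs:

import java.util.ArrayList;
import java.util.List;

public class ChunkLoopSketch {
    public static void main(String[] args) {
        int chunkSize = 10;
        // stand-in for whatever the ItemReader would return, one item at a time
        List<String> source = List.of("a", "b", "c");
        List<String> chunk = new ArrayList<>();

        for (String item : source) {
            String processed = item.toUpperCase(); // stand-in for the ItemProcessor
            chunk.add(processed);
            if (chunk.size() == chunkSize) {
                // stand-in for the ItemWriter; Spring Batch commits a transaction per chunk here
                System.out.println("Writing chunk: " + chunk);
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) {
            System.out.println("Writing final (partial) chunk: " + chunk);
        }
    }
}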

A note on @StepScope


If we look again at the aggregatorJobReader() method definition from the AggregatorJobConfig:

@Bean
@StepScope
public AggregatorJobReader aggregatorJobReader(@Value("#{jobParameters['parameter']}") final String parameter) {
        return new AggregatorJobReader(parameter);
}

The method is annotated with @StepScope. This is required to make the Bean unique to each Step execution rather than a singleton. By default, Beans in Spring have singleton scope, i.e., a single copy of them is maintained in the application context and shared with anyone who needs it. The JobParameters, however, may carry a different parameter value each time the Job is executed. By annotating the method with @StepScope, we force the Reader to be instantiated afresh for every Step execution, with whatever JobParameter value is passed in for that particular execution. This proves especially helpful for parameter late binding, where a parameter is resolved at runtime from the StepExecutionContext or the JobExecutionContext rather than at configuration time.
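
The same late-binding mechanism also works for values placed in the execution contexts themselves, not just in JobParameters. Below is a sketch of what the corresponding @Value expressions would look like in the config class; the keys "parameter" and "minScore" are made up for illustration, and which source you read from depends on who puts the value there (for example a listener or an earlier Step):

@Bean
@StepScope
public AggregatorJobReader aggregatorJobReader(
        // resolved from the JobParameters passed to JobLauncher.run()
        @Value("#{jobParameters['parameter']}") String fromJobParameters,
        // resolved from a key stored in the JobExecutionContext
        @Value("#{jobExecutionContext['parameter']}") String fromJobExecutionContext,
        // resolved from a key stored in the current StepExecutionContext
        @Value("#{stepExecutionContext['minScore']}") Integer fromStepExecutionContext) {
    // build the reader from whichever source the Job actually uses
    return new AggregatorJobReader(fromJobParameters);
}

Spring resolves these SpEL expressions only when the Step actually starts executing, which is exactly the late binding that the title of this article refers to.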

Using @StepScope also helps with scalability: if the same Reader were used in parallel Steps while maintaining some internal state, @StepScope ensures that the Reader in one thread does not trample on the state maintained by the Reader in another thread, because each Step execution gets its own instance.

The AggregatorJobReader and how it uses the parameter


public class AggregatorJobReader implements ItemReader<Document>, ItemStream {
    private MongoItemReader<Document> reader;
    private final static Logger logger = LogManager.getLogger(AggregatorJobReader.class);
    private String parameter;

    public AggregatorJobReader(String parameter) {
        this.parameter = parameter;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        reader = new MongoItemReader<>();
        MongoTemplate template = new MongoTemplate(
                MongoInstance.INSTANCE.getClient(),
                PropertyFileReader.getValue("mongoDatabaseName"));

        reader.setTemplate(template);
        reader.setTargetType(Document.class);

        // note: if the parameter value is a string, it would normally need quotes inside this JSON query
        reader.setQuery("{'parameter' : " + this.parameter + "}");

        reader.setCollection("…");
        reader.setPageSize(10);

        // MongoItemReader pages through results, so it needs a sort; sorting by _id is an assumption here
        Map<String, Sort.Direction> sorts = new HashMap<>();
        sorts.put("_id", Sort.Direction.ASC);
        reader.setSort(sorts);

        // a name is required so the delegate can keep its restart state in the ExecutionContext
        reader.setName("aggregatorJobReader");
        reader.open(executionContext);

        logger.info("IN AggregatorJobReader, MongoItemReader initialized successfully");
    }

    @Override
    public Document read() throws Exception {
        // delegate to the MongoItemReader configured in open()
        return reader.read();
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        reader.update(executionContext);
    }

    @Override
    public void close() throws ItemStreamException {
        reader.close();
    }
}

We simply use a constructor to initialize the parameter class member from the Config class, and then use it to filter the data being read by calling reader.setQuery() on the MongoItemReader inside the open() method.
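
As an aside, rather than concatenating a raw query string, the same filter can be expressed with Spring Data's Query API. Recent versions of Spring Batch's MongoItemReader also accept a Query object through an overloaded setQuery(); whether that overload is available depends on your Spring Batch version, so treat this as a sketch. The field name 'parameter' is the one used in the example above:

import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;

// inside open(), as an alternative to the string-based query:
Query query = new Query(Criteria.where("parameter").is(this.parameter));
reader.setQuery(query);

This avoids manual string concatenation and the quoting pitfalls that come with it.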
