Finally, the last part of the blog series! Today we’ll have a quick look at scaling batch jobs via partitioning and multi-threaded steps.
This is the sixth post about the new Java-based configuration features in Spring Batch 2.2. Previous posts covered a comparison between the new Java DSL and XML, JobParameters, ExecutionContexts and StepScope, profiles and environments, job inheritance, and modular configurations. You can find the JavaConfig code examples on GitHub.
Partitioning
I won’t explain partitioning in detail here, just this: with partitioning you need to find a way to partition your data. Each partition of data gets its own StepExecution and will be executed in its own thread. The most important interface here is the Partitioner.
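To make the role of the Partitioner a bit more concrete, here is a minimal plain-Java sketch of the idea: one named partition per input file, each carrying the data its worker needs. It is not the Spring Batch interface itself. The class name FilePartitionSketch and the file names are hypothetical, a plain Map stands in for the ExecutionContext, and the "fileName" key and "partition" prefix are assumptions modelled on what MultiResourcePartitioner does by default, as far as I know.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only. The real interface is
// org.springframework.batch.core.partition.support.Partitioner.
class FilePartitionSketch {

    // Returns one named partition per input file. Each partition carries the
    // data a worker needs, here just the file it should read.
    static Map<String, Map<String, Object>> partition(List<String> files) {
        Map<String, Map<String, Object>> partitions = new LinkedHashMap<>();
        for (int i = 0; i < files.size(); i++) {
            Map<String, Object> context = new LinkedHashMap<>();
            context.put("fileName", files.get(i)); // assumed key name
            partitions.put("partition" + i, context);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Hypothetical file names, for illustration only.
        System.out.println(partition(Arrays.asList("partner-1.csv", "partner-2.csv")));
    }
}
```

Each entry of the returned map would become one StepExecution in a real partitioned step.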
Of course, when working with different threads, we’ll need a source of those threads, and that’ll be a TaskExecutor. Since that’s a very low-level component, we add it to the InfrastructureConfiguration interface:
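import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.core.task.TaskExecutor;

public interface InfrastructureConfiguration {

    @Bean
    public abstract DataSource dataSource();

    @Bean
    public abstract TaskExecutor taskExecutor();

}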
For test environments, an implementation can look like this:
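import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {

    @Bean
    public DataSource dataSource(){
        // In-memory HSQL database holding the Spring Batch meta data and the partner table
        EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
        return embeddedDatabaseBuilder.addScript("classpath:org/springframework/batch/core/schema-drop-hsqldb.sql")
                .addScript("classpath:org/springframework/batch/core/schema-hsqldb.sql")
                .addScript("classpath:schema-partner.sql")
                .setType(EmbeddedDatabaseType.HSQL)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // The core pool size defaults to 1 and the queue is unbounded by default,
        // so without this line the pool would never grow beyond one thread.
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setMaxPoolSize(4);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

}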
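A note on pool sizing, sketched with the plain JDK: ThreadPoolTaskExecutor delegates to java.util.concurrent.ThreadPoolExecutor, which starts threads beyond the core pool size only when its queue is full. With an unbounded queue (the default for ThreadPoolTaskExecutor, as far as I know), the core pool size is what actually determines how many threads work in parallel. The class PoolSizeSketch and its method are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizeSketch {

    // Submits `tasks` blocking tasks to a pool with an unbounded queue and
    // returns the largest number of threads the pool ever used.
    static int largestPoolSize(int corePoolSize, int maxPoolSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = pool.getLargestPoolSize();
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return largest;
    }

    public static void main(String[] args) {
        System.out.println("core=1, max=4: " + largestPoolSize(1, 4, 8));
        System.out.println("core=4, max=4: " + largestPoolSize(4, 4, 8));
    }
}
```

With eight queued tasks, the first configuration never uses more than one thread, while the second uses four.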
The job I used as an example in the previous posts read data from one file and wrote it to a database. Now we want to read data from more than one file, and we want a partition for each file.
Let’s take a look at the important parts of the job configuration:
@Bean
public Job flatfileToDbPartitioningJob(){
    return jobBuilders.get("flatfileToDbPartitioningJob")
            .listener(protocolListener())
            .start(partitionStep())
            .build();
}

// Master step: combines the worker step, the partitioner and the TaskExecutor
@Bean
public Step partitionStep(){
    return stepBuilders.get("partitionStep")
            .partitioner(flatfileToDbStep())
            .partitioner("flatfileToDbStep", partitioner())
            .taskExecutor(infrastructureConfiguration.taskExecutor())
            .build();
}

// Worker step: executed once per partition
@Bean
public Step flatfileToDbStep(){
    return stepBuilders.get("flatfileToDbStep")
            .<Partner,Partner>chunk(1)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .listener(logProcessListener())
            .build();
}

// Creates one partition per CSV file matching the pattern
@Bean
public Partitioner partitioner(){
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    Resource[] resources;
    try {
        resources = resourcePatternResolver.getResources("file:src/test/resources/*.csv");
    } catch (IOException e) {
        throw new RuntimeException("I/O problems when resolving the input file pattern.",e);
    }
    partitioner.setResources(resources);
    return partitioner;
}
We defined a Partitioner that looks for CSV files in a specific location and creates a partition for each file. We defined the step as in the other examples, and then we defined a special partitionStep that combines our standard step, the partitioner and the TaskExecutor. Finally, the job uses that partitionStep.
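Conceptually, the partition step fans the partitions out to the threads of the TaskExecutor and waits until all of them are finished. The following plain-Java sketch shows just that mechanic under simplifying assumptions: PartitionFanOutSketch, processFile and the file names are hypothetical, and the bookkeeping Spring Batch does on top (one StepExecution per partition, stored in the job repository) is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PartitionFanOutSketch {

    // Stand-in for the worker step: "processes" one file and reports which
    // thread did the work.
    static String processFile(String fileName) {
        return fileName + " handled by " + Thread.currentThread().getName();
    }

    // Runs one worker task per partition on a pool of `threads` threads and
    // waits for all of them, keeping the results in partition order.
    static List<String> runPartitions(List<String> files, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String file : files) {
                Callable<String> worker = () -> processFile(file);
                futures.add(pool.submit(worker));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // blocks until that partition is done
            }
            return results;
        } catch (Exception e) {
            throw new IllegalStateException("A partition failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String line : runPartitions(Arrays.asList("a.csv", "b.csv", "c.csv"), 4)) {
            System.out.println(line);
        }
    }
}
```

Which thread handles which file can differ from run to run. Only the assignment of one task per partition is fixed.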
Multi-threaded step
This is quite a simple way of scaling: it just adds some more threads to the processing of a step. Since reading from a file isn’t suitable for this kind of scaling, we need a new use case: reading from a queue and writing to a log file. We need some more infrastructure for it:
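import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.core.JmsTemplate;

public interface InfrastructureConfiguration {

    @Bean
    public abstract DataSource dataSource();

    @Bean
    public abstract TaskExecutor taskExecutor();

    @Bean
    public abstract ConnectionFactory connectionFactory();

    @Bean
    public abstract Queue queue();

    @Bean
    public abstract JmsTemplate jmsTemplate();

}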
We are using ActiveMQ in a test environment:
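import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.sql.DataSource;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {

    @Bean
    public DataSource dataSource(){
        EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
        return embeddedDatabaseBuilder.addScript("classpath:org/springframework/batch/core/schema-drop-hsqldb.sql")
                .addScript("classpath:org/springframework/batch/core/schema-hsqldb.sql")
                .addScript("classpath:schema-partner.sql")
                .setType(EmbeddedDatabaseType.HSQL)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // The core pool size defaults to 1 and the queue is unbounded by default,
        // so without this line the pool would never grow beyond one thread.
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setMaxPoolSize(4);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:61616");
    }

    @Bean
    public Queue queue() {
        return new ActiveMQQueue("queueName");
    }

    // Embedded ActiveMQ broker, started together with the application context
    @Bean
    public BrokerService broker() throws Exception{
        BrokerService broker = new BrokerService();
        // configure the broker
        broker.addConnector("tcp://localhost:61616");
        broker.start();
        return broker;
    }

    @Bean
    public JmsTemplate jmsTemplate(){
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
        jmsTemplate.setDefaultDestination(queue());
        // Give up after 500 ms without a message instead of blocking forever
        jmsTemplate.setReceiveTimeout(500);
        return jmsTemplate;
    }

}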
The job configuration is quite simple then:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.jms.JmsItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// LogItemProcessor, LogItemWriter and ProtocolListener come from the example project (imports omitted).

@Configuration
public class MultiThreadedStepJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilders;

    @Autowired
    private StepBuilderFactory stepBuilders;

    @Autowired
    private InfrastructureConfiguration infrastructureConfiguration;

    @Bean
    public Job multiThreadedStepJob(){
        return jobBuilders.get("multiThreadedStepJob")
                .listener(protocolListener())
                .start(step())
                .build();
    }

    @Bean
    public Step step(){
        return stepBuilders.get("step")
                .<String,String>chunk(1)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                // These two calls turn the step into a multi-threaded step
                .taskExecutor(infrastructureConfiguration.taskExecutor())
                .throttleLimit(4)
                .build();
    }

    @Bean
    public JmsItemReader<String> reader(){
        JmsItemReader<String> itemReader = new JmsItemReader<String>();
        itemReader.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
        return itemReader;
    }

    @Bean
    public ItemProcessor<String,String> processor(){
        return new LogItemProcessor<String>();
    }

    @Bean
    public ItemWriter<String> writer(){
        return new LogItemWriter<String>();
    }

    @Bean
    public ProtocolListener protocolListener(){
        return new ProtocolListener();
    }

}
The only difference from a job without any scaling is the calls to taskExecutor and throttleLimit in the step definition.
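What happens at runtime can be sketched in plain Java: several threads pull items from the same queue, process them and write them, and each thread stops once no item arrives within the timeout. That mirrors the receive timeout on the JmsTemplate, where a read that comes back empty ends the step. The sketch is an illustration under assumptions, not Spring Batch code: MultiThreadedStepSketch, drain and the message texts are hypothetical, and a BlockingQueue stands in for the JMS queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class MultiThreadedStepSketch {

    // Lets `threads` workers read from the shared queue until it stays empty
    // for `timeoutMillis`, and returns everything that was "written".
    static List<String> drain(BlockingQueue<String> queue, int threads, long timeoutMillis) {
        Queue<String> written = new ConcurrentLinkedQueue<>(); // thread-safe "log file"
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    String item;
                    // poll() returning null after the timeout plays the role of
                    // a reader returning null, which ends the processing.
                    while ((item = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS)) != null) {
                        written.add("processed " + item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(written);
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 20; i++) {
            queue.add("message-" + i);
        }
        System.out.println(drain(queue, 4, 100).size() + " items written");
    }
}
```

Two things carry over to the real step: reader and writer are shared between threads and therefore have to be thread-safe, and the order in which items are written is no longer guaranteed.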
Conclusion
Configuring scalability in Spring Batch jobs is easy with Java-based configuration. And again, you can see the advantage of having an interface for the infrastructure configuration: it makes switching between environments easy.
I hope this blog series was useful for you, and if you have any questions, don’t hesitate to comment on the blog posts!
Blog author
Tobias Flohre
Senior Software Developer