With this article I want to publish a small code snippet that fills a gap in Spring-Batch code.
Background
With version 2.2.x of Spring-Batch, Spring offers an AsyncItemProcessor and an AsyncItemWriter as part of the library spring-batch-integration. Both of them run as wrappers around own single threaded ItemProcessors
and ItemWriters
. The AsyncItemProcessor
uses a TaskExecutor
to distribute his work to separate threads. This is done by creating a FutureTask
per Item “to move the processing to the future”. This Future
is given to the AsyncItemWriter that itself waits for the end of processing. If the Future has completed his work the processed Item is delegated to the own ItemWriter. With this method it is easy to parallalize the processing step of a Spring-Batch.
Skipping
If you are using the LimitCheckingItemSkipPolicy to handle Exceptions you would see the already mentioned gap when you migrate to asynchronous processing. When you create a LimitCheckingItemSkipPolicy
you have to pass Exception classes that will be skipped at runtime ( – until reaching the given limit). For instance you can pass an IllegalArgumentException
. When you parallelize your processing with AsyncItemProcessor and AsyncItemWriter you will note that the SkipPolicy is not working anymore.
Problem
As mentioned before the AsyncItemWriters write method will perform the Future (future.get()
) that was passed by the AsyncItemProcessor. If there raises an Exception inside this method execution this Exception is wrapped into a java.util.concurrent.ExecutionException. Unfortunately Spring-Batch doesn’t offer a build-in solution inside spring-batch-integration.
Solution
You have to extend LimitCheckingItemSkipPolicy so that it react on the Exceptions included in upcoming ExecutionExceptions.
1package de.codecentric.batch.skip;
2
3import java.util.Map;
4import java.util.concurrent.ExecutionException;
5
6import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
7import org.springframework.classify.Classifier;
8
9public class AsyncLimitCheckingItemSkipPolicy extends LimitCheckingItemSkipPolicy {
10
11 public AsyncLimitCheckingItemSkipPolicy() {
12 super();
13 }
14
15 public AsyncLimitCheckingItemSkipPolicy(int skipLimit, Classifier<Throwable, Boolean> skippableExceptionClassifier) {
16 super(skipLimit, skippableExceptionClassifier);
17 }
18
19 public AsyncLimitCheckingItemSkipPolicy(int skipLimit, Map<Class<? extends Throwable>, Boolean> skippableExceptions) {
20 super(skipLimit, skippableExceptions);
21 }
22
23 @Override
24 public boolean shouldSkip(Throwable t, int skipCount) {
25 if (t instanceof ExecutionException) {
26 return super.shouldSkip(t.getCause(), skipCount);
27 }
28 return super.shouldSkip(t, skipCount);
29 }
30
31}
Conclusion
With this custom AsyncLimitCheckingItemSkipPolicy
the skipping is now working as before. You can use this pattern also to extend other SkipPolicies
so that they behave as desired, also after migrating to asynchronous processing.
More articles
fromThomas Bosch
Your job at codecentric?
Jobs
Agile Developer und Consultant (w/d/m)
Alle Standorte
More articles in this subject area
Discover exciting further topics and let the codecentric world inspire you.
Gemeinsam bessere Projekte umsetzen.
Wir helfen deinem Unternehmen.
Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.
Hilf uns, noch besser zu werden.
Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.
Blog author
Thomas Bosch
IT Consultant
Do you still have questions? Just send me a message.
Do you still have questions? Just send me a message.