Ich möchte in diesem kleinen Artikel einen kleinen Codeschnippsel veröffentlichen, der einen kleine Lücke in Spring-Batch schließt.
Hintergrund
Für die Version 2.2.x von Spring-Batch wird in dem Paket spring-batch-integration ein AsyncItemProcessor und ein AsyncItemWriter angeboten. Beide funktionieren als Wrapper um die eigenen ItemProcessors
und ItemWriter
. Hierbei nutzt der AsyncItemProcessor
einen TaskExecutor
um die Arbeit auf mehrere Threads zu verteilen. Es wird pro Item ein FutureTask
erstellt, der die Ausführung im eigenen ItemProcessor „in die Zukunft auslagert“. Dieses Future
wird dann an den AsyncItemWriter übergeben, der nun auf das Ende der Ausführung wartet und dann an den eigenen ItemWriter delegiert. Mit dieser Methode kann also der Processor-Schritt im Spring-Batch parallelisiert werden.
Skipping
Wer in seinem Projekt bisher die LimitCheckingItemSkipPolicy nutzt, um Skips bis zu einem bestimmten Anzahl zuzulassen, der wird bei einer Umstellung auf eine asynchrone Verarbeitung des Processor-Schritts auf die Lücke stoßen. Der LimitCheckingItemSkipPolicy
kann man Exception-Klassen mitgeben, deren Auftreten im Batch dann übersprungen wird ( – bis zum Erreichen der definierten Anzahl). Hier hat man beispielsweise eine IllegalArgumentException
als skippable deklariert. Wenn man nun seine Verarbeitung mithilfe von AsyncItemProcessor und AsyncItemWriter parallelisiert, wird man feststellen, dass diese SkipPolicy nicht mehr funktioniert.
Problem
Wie schon oben erwähnt wird innerhalb der write-Methode des AsyncItemWriters das vom AsyncItemProcessor übergebene Future ausgeführt (future.get()
). Wenn innerhalb dieser Ausführung nun Exceptions auftreten, werden diese in eine java.util.concurrent.ExecutionException verpackt. Leider bietet Spring-Batch hier keine eigene Lösung in dem spring-batch-integration Paket an.
Lösung
Die LimitCheckingItemSkipPolicy
muss derart erweitert werden, dass sie die auftretenden ExecutionExceptions
anschaut und auf die darin enthaltenen Exceptions reagiert.
1package de.codecentric.batch.skip;
2
3import java.util.Map;
4import java.util.concurrent.ExecutionException;
5
6import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
7import org.springframework.classify.Classifier;
8
9public class AsyncLimitCheckingItemSkipPolicy extends LimitCheckingItemSkipPolicy {
10
11 public AsyncLimitCheckingItemSkipPolicy() {
12 super();
13 }
14
15 public AsyncLimitCheckingItemSkipPolicy(int skipLimit, Classifier<Throwable, Boolean> skippableExceptionClassifier) {
16 super(skipLimit, skippableExceptionClassifier);
17 }
18
19 public AsyncLimitCheckingItemSkipPolicy(int skipLimit, Map<Class<? extends Throwable>, Boolean> skippableExceptions) {
20 super(skipLimit, skippableExceptions);
21 }
22
23 @Override
24 public boolean shouldSkip(Throwable t, int skipCount) {
25 if (t instanceof ExecutionException) {
26 return super.shouldSkip(t.getCause(), skipCount);
27 }
28 return super.shouldSkip(t, skipCount);
29 }
30
31}
Ergebnis
Mithilfe dieser AsyncLimitCheckingItemSkipPolicy
funktioniert nun das Skipping auch wieder wie zuvor. Anhand dieses Musters kann man auch andere vorhandene SkipPolicies
so modifizieren, dass sie in der Lage sind auch bei Nutzung der asynchronen Verarbeitung wie gewohnt zu funktionieren.
Weitere Beiträge
von Thomas Bosch
Dein Job bei codecentric?
Jobs
Agile Developer und Consultant (w/d/m)
Alle Standorte
Weitere Artikel in diesem Themenbereich
Entdecke spannende weiterführende Themen und lass dich von der codecentric Welt inspirieren.
Gemeinsam bessere Projekte umsetzen.
Wir helfen deinem Unternehmen.
Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.
Hilf uns, noch besser zu werden.
Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.
Blog-Autor*in
Thomas Bosch
IT Consultant
Du hast noch Fragen zu diesem Thema? Dann sprich mich einfach an.
Du hast noch Fragen zu diesem Thema? Dann sprich mich einfach an.