Beliebte Suchanfragen
|
//

Skipping in asynchroner Batchverarbeitung

28.3.2014 | 2 Minuten Lesezeit

Ich möchte in diesem kleinen Artikel einen kleinen Codeschnippsel veröffentlichen, der einen kleine Lücke in Spring-Batch schließt.

Hintergrund

Für die Version 2.2.x von Spring-Batch wird in dem Paket spring-batch-integration ein AsyncItemProcessor und ein AsyncItemWriter angeboten. Beide funktionieren als Wrapper um die eigenen ItemProcessors und ItemWriter. Hierbei nutzt der AsyncItemProcessor einen TaskExecutor um die Arbeit auf mehrere Threads zu verteilen. Es wird pro Item ein FutureTask erstellt, der die Ausführung im eigenen ItemProcessor „in die Zukunft auslagert“. Dieses Future wird dann an den AsyncItemWriter übergeben, der nun auf das Ende der Ausführung wartet und dann an den eigenen ItemWriter delegiert. Mit dieser Methode kann also der Processor-Schritt im Spring-Batch parallelisiert werden.

Skipping

Wer in seinem Projekt bisher die LimitCheckingItemSkipPolicy nutzt, um Skips bis zu einem bestimmten Anzahl zuzulassen, der wird bei einer Umstellung auf eine asynchrone Verarbeitung des Processor-Schritts auf die Lücke stoßen. Der LimitCheckingItemSkipPolicy kann man Exception-Klassen mitgeben, deren Auftreten im Batch dann übersprungen wird ( – bis zum Erreichen der definierten Anzahl). Hier hat man beispielsweise eine IllegalArgumentException als skippable deklariert. Wenn man nun seine Verarbeitung mithilfe von AsyncItemProcessor und AsyncItemWriter parallelisiert, wird man feststellen, dass diese SkipPolicy nicht mehr funktioniert.

Problem

Wie schon oben erwähnt wird innerhalb der write-Methode des AsyncItemWriters das vom AsyncItemProcessor übergebene Future ausgeführt (future.get()). Wenn innerhalb dieser Ausführung nun Exceptions auftreten, werden diese in eine java.util.concurrent.ExecutionException verpackt. Leider bietet Spring-Batch hier keine eigene Lösung in dem spring-batch-integration Paket an.

Lösung

Die LimitCheckingItemSkipPolicy muss derart erweitert werden, dass sie die auftretenden ExecutionExceptions anschaut und auf die darin enthaltenen Exceptions reagiert.

1package de.codecentric.batch.skip;
2 
3import java.util.Map;
4import java.util.concurrent.ExecutionException;
5 
6import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
7import org.springframework.classify.Classifier;
8 
9public class AsyncLimitCheckingItemSkipPolicy extends LimitCheckingItemSkipPolicy {
10 
11    public AsyncLimitCheckingItemSkipPolicy() {
12        super();
13    }
14 
15    public AsyncLimitCheckingItemSkipPolicy(int skipLimit, Classifier<Throwable, Boolean> skippableExceptionClassifier) {
16        super(skipLimit, skippableExceptionClassifier);
17    }
18 
19    public AsyncLimitCheckingItemSkipPolicy(int skipLimit, Map<Class<? extends Throwable>, Boolean> skippableExceptions) {
20        super(skipLimit, skippableExceptions);
21    }
22 
23    @Override
24    public boolean shouldSkip(Throwable t, int skipCount) {
25        if (t instanceof ExecutionException) {
26            return super.shouldSkip(t.getCause(), skipCount);
27        }
28        return super.shouldSkip(t, skipCount);
29    }
30 
31}
Ergebnis

Mithilfe dieser AsyncLimitCheckingItemSkipPolicy funktioniert nun das Skipping auch wieder wie zuvor. Anhand dieses Musters kann man auch andere vorhandene SkipPolicies so modifizieren, dass sie in der Lage sind auch bei Nutzung der asynchronen Verarbeitung wie gewohnt zu funktionieren.

|

Beitrag teilen

//

Weitere Artikel in diesem Themenbereich

Entdecke spannende weiterführende Themen und lass dich von der codecentric Welt inspirieren.

//

Gemeinsam bessere Projekte umsetzen.

Wir helfen deinem Unternehmen.

Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.

Hilf uns, noch besser zu werden.

Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.