In meinen bisherigen Projekten läuft die Datenverarbeitung über Mule üblicherweise folgendermaßen ab: Erhält Mule z. B. einen HTTP-Request von einem Client, wird dieser an ein Drittsystem weitergeleitet. Das Drittsystem verarbeitet die Daten und sendet direkt eine Response zurück an Mule, von wo aus sie wiederum zurück an den Client geleitet wird.
In meinem letzten Projekt stellte sich mir jedoch eine abweichende Anforderung durch ein Drittsystem:
Wie bisher erhält Mule einen HTTP-Request und leitet diesen an ein Drittsystem weiter. Diesmal wird mit der Response aber lediglich der Empfang der Daten quittiert. Die verarbeiteten Daten werden erst zu einem späteren Zeitpunkt über einen neuen HTTP-Request seitens des Drittsystems an Mule übersendet. Dadurch ergibt sich eine vermeintlich kleine Änderung mit großer Auswirkung auf die Implementierung.
Welche Probleme bringt das mit sich?
Grundsätzlich gibt es bei dieser Anforderung zwei Problemstellungen, welche sich aus der zugrundeliegenden Architektur des Drittsystems ergeben.
- Der initiale HTTP-Request des Client wartet an dieser Stelle auf eine Antwort, welche durch das asynchrone Verhalten der Drittanwendung im Optimalfall mit einer geringen Verzögerung – im schlechtesten Fall gar nicht – ankommt.
- Die verarbeiteten Daten, welche per HTTP-Request durch das Drittsystem an Mule übergeben werden, müssen dem initialen Request zugeordnet werden.
Mit dem Request-Reply Scope
stellt Mule für diesen und ähnlich gelagerte Anwendungsfälle das passende Werkzeug zur Verfügung.
Der Request-Reply Scope
Wie der Name schon sagt, besteht der Request-Reply Scope
aus einem Request- und einem Reply-Kanal.
Der Request-Kanal wird dabei genutzt, um mit einem Outbound-Endpoint
eine Anfrage z. B. an einen asynchronen Flow oder eine asynchrone Drittanwendung zu leiten.
Im Reply-Kanal gelangt dann die Antwort durch einen Inbound-Endpoint
wieder zurück an den Flow, welcher den Request-Reply Scope
beinhaltet und kann dort weiterverarbeitet werden.
Request und Reply können dabei als voneinander losgelöst betrachtet werden. Um dies zu erreichen, wird mit dem Aufruf des Requests gleichzeitig die sogenannte MULE_CORRELATION_ID
als Inbound-Property
mitgeliefert. Mit dieser ist es möglich, den auf eine Antwort wartenden Reply-Kanal später eindeutig zu identifizieren und anzusteuern.
Zusätzlich bietet der Scope die Möglichkeit, einen Timeout in Millisekunden zu konfigurieren, welcher den Prozess abbricht und eine Fehlerverarbeitung anhand einer org.mule.api.routing.ResponseTimeoutException
ermöglicht, wenn auf dem Reply-Kanal binnen des definierten Zeitraumes keine Nachricht eingeht.
Um die Funktionsweise des Request-Reply Scope
zu veranschaulichen, werde ich im Folgenden ein Beispielszenario skizzieren, welches sich an dem ursprünglich beschriebenen Fall orientiert und als Grundlage für die Implementierung dienen soll.
Das Szenario – „Was macht eigentlich der…?“
Stellen wir uns also vor, dass wir an unserem Arbeitsplatz sitzen und einen Anruf aus der Personalabteilung bekommen. Diese hat den Überblick verloren, welcher Mitarbeiter eigentlich welcher Beschäftigung im Unternehmen nachgeht und benötigt dafür eine Lösung.
Wir wissen sofort, dass wir mit Mule in kurzer Zeit einen Service bereitstellen können, welcher nach Übergabe einer Mitarbeiter-ID die Position des Mitarbeiters im Unternehmen bereitstellt. Unglücklicherweise müssen wir aber auch feststellen, dass für diese Information ein Drittsystem aufgerufen werden muss, welches etwas in die Jahre gekommen ist. Die initiale Anfrage von Mule wird mit „ACK“ bzw. „NACK“ quittiert, je nachdem, ob die Anfrage akzeptiert wurde oder nicht. Die eigentlichen Businessdaten – wer hätte es gedacht – werden zeitverzögert und über einen neuen HTTP-Request bereitstellt.
Mit diesem Bild im Kopf kann mit der Implementierung begonnen werden.
Auf geht’s – die Implementierung
Die Flows erstellen wir dabei in der Reihenfolge, in welcher sie abgearbeitet werden.
Der Request-Reply Flow
Der erste Flow, in welchem wir die Mitarbeiter-ID entgegennehmen, startet mit einem HTTP-Inbound-Listener (localhost:8081)
.
1<http:listener config-ref="HTTP_Listener_Configuration" path="/{id}" 2 doc:name="HTTP" allowedMethods="GET"/>
Diesen Parameter extrahieren wir aus der URI
und setzen ihn in den Payload
.
1<set-payload value="#[message.inboundProperties.'http.uri.params'.id]" 2 doc:name="Set Payload"/>
Nun folgt der Request-Reply Scope
. Diesen konfigurieren wir so, dass wir eine ResponseTimeoutException
erhalten, wenn wir nach Ablauf von zehn Sekunden noch keine Antwort im Reply-Kanal erhalten haben.
Als Outbound-
bzw. Inbound-Endpoint
nutzen wir jeweils einen one-way VM-Transport .
Den Queue Path
setzen wir auf „request“ bzw. „reply“.
Der Request Flow
Der Request Flow ist in unserem Fall dafür verantwortlich, die Anfrage an das Drittsystem weiterzuleiten.
Dieser Flow bekommt als Inbound-Endpoint
einen VM-Transport
, welcher als Queue Path
den Wert „request“ bekommt, so dass die Daten aus dem
Request-Kanal an diese Stelle weitergeleitet werden.
1<vm:inbound-endpoint exchange-pattern="one-way" path="request" doc:name="VM"/>
An der folgenden Stelle ist es dann wichtig, sich die MULE_CORRELATION_ID
zu merken, da wir diese später noch benötigen werden, um die asynchrone Antwort an den richtigen Reply-Kanal zu leiten. Für diesen Zweck nutzen wir einen in-memory ObjectStore
, in welchem wir als Key
die übergebene „ID“ nutzen und als Value
die MULE_CORRELATION_ID
.
1<objectstore:store config-ref="ObjectStore__Connector" key="#[payload]" 2 value-ref="#[message.inboundProperties.MULE_CORRELATION_ID]" 3 doc:name="persist correlation id ObjectStore"/>
Nun können wir die Daten per HTTP-Request an das Drittsystem senden. (In diesem Beispiel ist dies ein weiterer Flow, welcher das Drittsystem simuliert. Dazu gleich mehr.)
1<http:request config-ref="HTTP_Request_Configuration" path="/{id]" 2 method="GET" doc:name="HTTP"/> 3<byte-array-to-string-transformer doc:name="Byte Array to String"/>
Zu diesem Zeitpunkt wissen wir ja bereits, ob die Nachricht erfolgreich („ACK“) oder nicht erfolgreich („NACK“) von dem Drittsystem entgegengenommen werden konnte.
Wenn die Nachricht erfolgreich übermittelt wurde, können wir anhand eines Expression-Filters den Prozess an dieser Stelle beenden, da wir auf die „richtige“ Antwort mit den verarbeiteten Daten warten. Sollte die Antwort „NACK“ gewesen sein, können wir die Nachricht durch den Filter lassen, damit diese Info direkt an den Reply-Kanal gesendet und der Aufrufer über dieses Problem informiert wird.
1<expression-filter expression="#[payload.contains("NACK")]" 2 doc:name="Expression"/>
Das Drittsystem
In diesem Artikel soll es ja eigentlich darum gehen, Probleme zu lösen und nicht Neue zu erschaffen. Da uns aber für diesen Beispielfall natürlich kein System zur Verfügung steht, welchem man sich bedienen kann, muss an dieser Stelle ein weiterer Mule Flow dieses simulieren.
Um es so einfach wie möglich zu halten, erhält dieser Flow einen HTTP-Inbound-Listener (localhost:8082)
, welcher den Request entgegennimmt. Die Response des Flows lautet der Einfachheit halber in diesem Fall immer „ACK“. Dazwischen nutzen wir einen Async-Scope
, da wir das „ACK“ bzw. „NACK“ sofort – die eigentliche Antwort aber mit etwas Verzögerung versenden wollen. Diese fünf Verzögerung erreichen wir innerhalb des Async-Scope
durch ein Thread.sleep(5000) im Payload
. Weiterhin setzen wir dort die eigentliche Nachricht im XML-Format (im Beispiel immer die übergebene „ID“ und statisch den Job des Mitarbeiters).
1<data> 2 <id>#[payload]</id> 3 <job>Developer</job> 4</data>
Der Reply Flow
Der Reply Flow nimmt den Request aus dem Drittsystem per HTTP-Inbound-Listener (localhost:8083)
entgegen und ist dafür zuständig, dass die Nachricht an den auf die Antwort wartenden VM-Transport
im Reply-Kanal weitergeleitet wird. Genau jetzt benötigen wir die Daten aus dem ObjectStore
, welche wir zu Beginn im Request Flow persistiert haben.
Aus dem Drittsystem erhalten wir die Daten im XML-Format, welche zum einen die ID und zum anderen den ausgeübten Job beinhalten. Anhand der ID können wir die zugehörige MULE_CORRELATION_ID
nun wieder aus dem ObjectStore auslesen und diese als Outbound-Property
setzen.
1<byte-array-to-object-transformer doc:name="Byte Array to Object"/> 2<objectstore:retrieve config-ref="ObjectStore__Connector" 3 key="#[xpath3('//id')]" targetProperty="MULE_CORRELATION_ID" 4 targetScope="OUTBOUND" doc:name="read correlation id ObjectStore"/>
Die Daten werden dann wie gewünscht per VM-Transport
an den wartenden VM-Transport
im Reply-Kanal des Request-Reply Scope
gesendet.
1<vm:outbound-endpoint exchange-pattern="one-way" path="reply" doc:name="VM"/>
Testlauf
So weit, so gut – jetzt ist unsere Anwendung soweit, dass wir sie das erste Mal im Anypoint Studio starten können.
Wenn die Anwendung hochgefahren ist, können wir eine Anfrage, z. B. per Postman an unsere Anwendung schicken.
Wenn wir nun Informationen zum Arbeitsplatz des Mitarbeiters mit der ID „42“ haben möchten, schicken wir folgenden Request:
… und bekommen nach ziemlich genau 5 Sekunden folgende Antwort:
Fazit
Durch den Request-Reply Scope
konnte das dargelegte Problem einer asynchronen Antwort innerhalb eines synchronen Flows auf verständliche und technisch saubere Art und Weise gelöst werden, ohne auf Hacks oder andere unschöne Selfmade-Lösungen zurückgreifen zu müssen.
Ich freue mich, wenn Ihnen der Artikel weitergeholfen hat und stehe gerne für Fragen, Diskussionen und Anregungen zur Verfügung.
Tipp: Am 20.04. veranstaltet die codecentric AG gemeinsam mit ihrem Partner MuleSoft das MuleSoft Forum in Solingen. Unter dem Motto „Connect anything, change everything“ widmet sich der Tag ganz dem API-Management und der Anypoint Platform. Zur Veranschaulichung stellen wir Ihnen aktuelle Integrationsprojekte vor. Informationen, Agenda und Anmeldung gibt es hier: https://info.codecentric.de/mulesoft-forum-2018
Weiterführende Infos
Weitere Beiträge
von Pasquale Brunelli
Dein Job bei codecentric?
Jobs
Agile Developer und Consultant (w/d/m)
Alle Standorte
Weitere Artikel in diesem Themenbereich
Entdecke spannende weiterführende Themen und lass dich von der codecentric Welt inspirieren.
Gemeinsam bessere Projekte umsetzen.
Wir helfen deinem Unternehmen.
Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.
Hilf uns, noch besser zu werden.
Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.
Blog-Autor*in
Pasquale Brunelli
Integration Specialist
Du hast noch Fragen zu diesem Thema? Dann sprich mich einfach an.
Du hast noch Fragen zu diesem Thema? Dann sprich mich einfach an.