Kontinuierliche Datenströme in verteilten Systemen ohne Zeitverzögerung zu verarbeiten, birgt einige Herausforderungen. Wir zeigen euch, wie Stream Processing mit Kafka Streams und Spring Boot gelingen kann.
Alles im Fluss: Betrachtet man Daten als fortlaufenden Strom von Informationen, kann man einiges an Geschwindigkeit rausholen.
TL;DR
Wenig Zeit? Dann gibt’s hier die ultrakurze Zusammenfassung:
- Stream Processing eignet sich gut, um große Datenmengen asynchron und mit minimaler Verzögerung zu verarbeiten.
- Moderne Streaming-Frameworks erlauben euch, eure Anwendungsarchitektur komplett nach Event Streams auszurichten und eure Datenhaltung „auf links zu drehen“. Der Event-Stream wird zur „Source of Truth“.
- Kafka bietet mit Kafka Streams ein API, um Streams zu verarbeiten und komplexe Operationen darauf abzubilden. Mittels KStreams und KTables könnt ihr auch komplexere Use Cases abbilden, die einen Zustand halten müssen. Dieser State wird durch Kafka verwaltet, so dass ihr euch nicht selbst um die Datenhaltung kümmern müsst.
- Spring Boot bietet eine Abstraktion für Streams an, mit der sich Stream Processing Workloads implementieren lassen.
Den gesamten Code zu unserem Beispielprojekt findet ihr auf GitHub .
Große Datenmengen schnell verarbeiten – ein Dauerthema
In unserem alltäglichen Projektumfeld haben wir es oft mit Use Cases zu tun, bei denen wir einen kontinuierlichen Strom von Ereignissen durch mehrere beteiligte Systeme möglichst verzögerungsfrei verarbeiten müssen. Dazu zwei Beispiele:
- Stellen wir uns einen klassischen Webshop vor: Kundinnen und Kunden bestellen rund um die Uhr Ware. Die Informationen über die eingegangene Bestellung sind für diverse Teilsysteme von Interesse: Unter anderem benötigt das Lager Infos über die zu versendenden Positionen, wir müssen eine Rechnung schreiben und vielleicht nun selbst Ware nachbestellen.
- Ein anderes Szenario: Ein Autohersteller analysiert zur Verbesserung der Haltbarkeit seiner Fahrzeuge deren Telemetriedaten. Hierzu senden die Bauteile tausender Autos im Sekundentakt Sensordaten, die anschließend auf Anomalien untersucht werden müssen.
Je größer in beiden Beispielen die Datenmengen werden, umso schwieriger wird es für uns, unser System adäquat skalieren zu lassen und die Daten in möglichst kurzer Zeit zu verarbeiten. Dies beschreibt ein generelles Problem: Das Datenaufkommen, mit dem wir im Alltag konfrontiert sind, steigt kontinuierlich, während unsere Kundinnen und Kunden von uns erwarten, dass die Daten so schnell wie möglich verarbeitet werden und nutzbar sind. Moderne Stream Processing Frameworks sollen genau diese Aspekte adressieren.
In diesem Blogpost möchten wir anhand eines konkreten Use Cases demonstrieren, wie sich eine Stream-Processing-Architektur mit Spring Boot und Apache Kafka Streams umsetzen lässt. Auf die Konzeption des Gesamtsystems wollen wir dabei ebenso eingehen wie auf die alltägliche Probleme, die wir bei der Implementierung berücksichtigen sollten.
Streams und Events kurz angerissen
Wir können Stream Processing als Alternative zum Batch Processing einordnen. Anstatt alle eingehenden Daten erst einmal „auf Halde“ zu legen und zu einem späteren Zeitpunkt en bloc zu verarbeiten, ist die Idee beim Stream Processing, eingehende Daten als kontinuierlichen Strom zu betrachten: Die Verarbeitung der Daten erfolgt fortlaufend. Je nach API und Programmiermodell können wir über eine entsprechende domänenspezifische Sprache Operationen auf dem Strom der Daten definieren. Sender und Empfänger benötigen hierbei keinerlei Wissen übereinander. Die am Stream teilnehmenden Systeme sind in der Regel über ein entsprechendes Messaging Backbone voneinander entkoppelt.
Neben den Vorteilen zur zeitkritischen Verarbeitung von Daten ist das Konzept gut dazu geeignet, um Abhängigkeiten zwischen Services in verteilten Systemen zu reduzieren. Durch die Indirektion über ein zentrales Messaging Backbone können Services nun in ein asynchrones Kommunikationsmodell wechseln, indem diese nicht mehr über Commands, sondern über Events kommunizieren. Während Commands direkte, synchrone Aufrufe zwischen Services sind, die eine Aktion auslösen oder einen Zustandswechsel zur Folge haben, wird bei Events lediglich die Information übermittelt, dass ein Ereignis erfolgt ist. Beim Event Processing entscheidet der Empfänger selbst, wann und wie diese Information verarbeitet wird. Dieses Vorgehen ist hilfreich, um eine losere Kopplung zwischen den Komponenten eines Gesamtsystems zu erreichen [1] .
Der Einsatz von Kafka als Stream-Processing-Plattform ermöglicht es uns, unser Gesamtsystem nach Events auszurichten, wie der nächste Abschnitt zeigt.
Kafka als Stream-Processing-Plattform
Was Kafka von klassischen Message Brokern wie RabbitMQ oder Amazon SQS unterscheidet, ist die dauerhafte Speicherung von Eventströmen und die Bereitstellung eines API zur Verarbeitung dieser Events als Streams. Dies ermöglicht es uns, die Architektur eines verteilten Systems umzukrempeln und diese Events zur „Source of Truth“ zu machen: Wenn sich der Zustand unseres Gesamtsystems anhand der Folge aller Events herstellen lässt und diese Events dauerhaft gespeichert sind, kann dieser Zustand jederzeit durch die Abarbeitung des Eventlogs (wieder)hergestellt werden. Das Konzept eines global verfügbaren, unveränderlichen Eventlogs wurde von Martin Kleppmann als „turning the database inside out“ bezeichnet [2] . Gemeint ist damit, dass wir die Konzepte, die wir traditionell innerhalb einer relationalen Datenbank als Black Box gekapselt bereit stellen (ein Transaktionslog, eine Abfrageengine, Indizes und Caches), durch Kafka und Kafka Streams auf die Komponenten eines Systems verteilen können.
Um eine Streamingarchitektur anhand dieser Theorie aufzubauen, bedienen wir uns zwei verschiedener Komponenten aus dem Kafka-Ökosystem:
- Kafka Cluster: Stellt den Storage der Events zur Verfügung. Fungiert als das unveränderliche und permanent gespeicherte Transaktionslog.
- Kafka Streams: Stellt das API zum Stream Processing (Streams API) zur Verfügung. Abstrahiert die Komponenten zum Erzeugen und Konsumieren der Nachrichten und stellt das Programmiermodell bereit, um die Events zu verarbeiten sowie Caches und Abfragen darauf abbilden zu können [3]
Neben der Ausrichtung nach Events und der Bereitstellung von APIs zur Verarbeitung dieser bringt Kafka noch einige Mechanismen mit, um mit großen Datenmengen zu skalieren. Der wichtigste Mechanismus ist das Partitioning: Hierbei werden Nachrichten auf verschiedene Partitionen verteilt, damit diese möglichst effizient parallel gelesen und geschrieben werden können. Eine gute Übersicht über die zentralen Konzepte und Vokabeln im Kafka-Kosmos bietet [4] .
Anhand eines konkreten Use Cases wollen wir euch nun vorstellen, wie wir eine verteilte Streaming-Architektur mit Spring Boot und Kafka implementieren können.
Ein beispielhafter Use Case
Stellen wir uns vor, dass uns unser Kunde – eine Weltraumagentur – mit der Entwicklung eines Systems zur Auswertung von Telemetriedaten der diversen Raumsonden im All beauftragt hat. Die Rahmenbedingungen und Anforderungen gestalten sich wie folgt:
- Wir haben eine nicht näher spezifizierte Menge von Sonden, die uns einen fortwährenden Strom von Telemetriemesswerten übermitteln. Diese Sonden gehören entweder zur US-Weltraumagentur (NASA) oder zur Europäischen Weltraumagentur (ESA).
- Alle Raumsonden senden ihre Messdaten im imperialen System.
- Unser Kunde ist lediglich an den aggregierten Messdaten je Sonde interessiert:
- Was ist die Gesamtdistanz, die eine gegebene Sonde bisher zurückgelegt hat?
- Welche Höchstgeschwindigkeit hat die Sonde bislang erreicht?
- Da die Messdaten der Sonden von NASA und ESA von unterschiedlichen Teams weiterverarbeitet werden, sollen diese getrennt voneinander konsumiert werden können.
- Daten der ESA-Sonden sollen vom imperialen ins metrische System konvertiert werden.
Die Zielarchitektur
In unserem Beispiel haben wir es mit einem kontinuierlichen Strom von Messwerten zu tun, den wir als unsere Events betrachten. Da wir auf diese eine Reihe von Transformationen und Aggregationen vornehmen müssen, eignet sich der Use Case gut zur Verarbeitung als Stream. Die Notwendigkeit, Messdaten zu aggregieren, legt zudem nahe, dass ein Teil unserer Anwendung sich einen Zustand merken können muss, um die je Sonde summierten Werte zu halten.
Zur Umsetzung des Use Cases teilen wir die Anwendung in 3 Teilkomponenten auf. Zentraler Hub für die Kommunikation der Komponenten untereinander bildet ein Kafka Cluster:
Diese Beispielarchitektur werden wir aufbauen, um unseren Use Case abzubilden. Dabei verwenden wir Kafka als zentralen Kommunikations-Hub zwischen unseren Services.
Die Aufgabenverteilung zwischen des Services gestalten wir wie folgt:
- kafka-samples-producer: Überführt die empfangenen Messdaten in ein maschinenlesbares Format und legt diese auf einem Kafka-Topic ab. Da wir im Moment keine echten Raumsonden griffbereit haben, lassen wir diesen Service zufällige Messdaten generieren.
- kafka-samples-streams: Führt die Berechnung der aggregierten Messdaten und die Unterteilung nach Messdaten für NASA oder ESA durch. Da in die Berechnung auch die zuvor berechneten Werte einfließen, muss die Anwendung einen lokalen Zustand halten. Diesen bilden wir mithilfe des Streams API in Form zweier KTables ab (wir trennen hier bereits nach Weltraumagentur auf). Transparent für die Anwendung werden die KTables durch einen sogenannten State Store materialisiert, welcher den Verlauf des Zustands in Kafka Topics speichert.
- kafka-samples-consumer: Stellt einen beispielhaften Client Service einer Weltraumagentur dar, der für die Weiterverarbeitung der aggregierten Messdaten verantwortlich ist. In unserem Fall liest dieser beide Ausgangstopics aus, führt im Fall der ESA eine Konvertierung ins metrische System durch und loggt diese Werte auf stdout.
Implementierung der Services
Alle Services haben wir mit Spring Boot und Kotlin implementiert und nutzen für die Konfiguration und Implementierung die Spring-Abstraktion für Streams . In den nachfolgenden Abschnitten gehen wir auf die konkrete Umsetzung der einzelnen Services ein.
Generierung der Telemetriedaten (kafka-samples-producer)
Zum Schreiben der (fiktiven) Sondenmessdaten verwenden wir das Kafka Producer API, das in Spring über den Spring Cloud Stream Binder für Kafka bereit gestellt wird. Den Service konfigurieren wir über die (application.yml ) wie folgt:
spring:
application:
name: kafka-telemetry-data-producer
cloud:
stream:
kafka:
binder:
brokers: "localhost:29092"
bindings:
telemetry-data-out-0:
producer:
configuration:
key.serializer: org.springframework.kafka.support.serializer.ToStringSerializer
value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
# Otherwise de.codecentric.samples.kafkasamplesproducer.event.TelemetryData will be added as a header info
# which can't be deserialized by consumers (unless they have kafka.properties.spring.json.use.type.headers: false themselves)
spring.json.add.type.headers: false
bindings:
telemetry-data-out-0:
producer:
# use kafka internal encoding
useNativeEncoding: true
destination: space-probe-telemetry-data
Die Konfiguration setzt sich aus einem Kafka-spezifischen (oberer bindings
-Konfigurationsblock) und einem Technologie-agnostischen (unterer bindings
-Konfigurationsblock) Teil zusammen, die über das Binding aneinander gebunden werden.
Im Beispiel erstellen wir das Binding telemetry-data-out-0
. Folgende Konvention liegt dieser Deklaration zugrunde:
<Funktionsname>-<in|out>-<n>
Durch in
oder out
wird definiert, ob es sich um einen Input (einen eingehenden Strom an Daten) oder Output (einen ausgehenden Strom von Daten) Binding handelt. Über die von 0 aufsteigende Zahl am Ende kann eine Funktion an mehrere Bindings gehängt werden – und somit mit einer Funktion aus mehreren Topics gelesen – oder in mehrere Topics geschrieben werden.
Im Kafka-spezifischen Teil verhindern wir, dass Spring jeder Message einen Type Header hinzufügt. Dies würde sonst dazu führen, dass ein Konsument der Nachricht – sollte er dies nicht aktiv verhindern – die im Header angegebene Klasse nicht kennt und dadurch die Nachricht nicht deserialisieren kann.
Der Technologie-agnostische Teil wird in dieser Form für alle Spring-Cloud-Streams-unterstützten Implementierungen wie RabbitMQ, AWS SQS, etc. verwendet. Hier ist lediglich das Ausgabeziel (destination
) anzugeben – in unserem Fall mappt das auf den Namen des Kafka Topics, das wir beschreiben wollen.
Nachdem der Service konfiguriert ist, definieren wir eine Spring-Komponente zum Schreiben der Messdaten (TelemetryDataStreamBridge.kt ):
package de.codecentric.samples.kafkasamplesproducer
import de.codecentric.samples.kafkasamplesproducer.event.TelemetryData
import mu.KotlinLogging
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
@Component
class TelemetryDataStreamBridge(@Autowired val streamBridge: StreamBridge) {
private val logger = KotlinLogging.logger {}
fun send(telemetryData: TelemetryData) {
val kafkaMessage = MessageBuilder
.withPayload(telemetryData)
// Make sure all messages for a given probe go to the same partition to ensure proper ordering
.setHeader(KafkaHeaders.MESSAGE_KEY, telemetryData.probeId)
.build()
logger.info { "Publishing space probe telemetry data: Payload: '${kafkaMessage.payload}'" }
streamBridge.send("telemetry-data-out-0", kafkaMessage)
}
}
Als Eingangsschnittstelle in die Streaming-Welt bietet Spring Cloud Streaming zwei verschiedene Möglichkeiten:
- Die imperative
StreamBridge
- Den reaktiven
EmitterProcessor
Für diesen Use Case nutzen wir an dieser Stelle die StreamBridge
. Diese können wir uns von Spring injecten lassen und damit die generierten Sonden-Daten auf unser Topic schreiben. Als Message Key nutzen wir die ID der jeweiligen Sonde, damit Daten einer Sonde immer auf derselben Partition landen. Der Funktion send()
übergeben wir das in der Konfiguration erstellte Binding.
Verarbeitung der Telemetriedaten (kafka-samples-streams)
Der Großteil des Use Cases wird in diesem Teil unserer Anwendung verarbeitet. Wir verwenden das Kafka Streams API, um die generierten Sondendaten zu konsumieren, die erforderlichen Berechnungen durchzuführen und die aggregierten Messdaten anschließend auf die beiden Ziel-Topics zu schreiben. Zugriff auf das Streams API erhalten wir in Spring Boot über den Spring Cloud Stream Binder für Kafka Streams .
Wir starten analog zum Producer API mit dem Erstellen unserer Bindings und konfigurieren unseren Service über die Datei application.yml.
spring:
kafka.properties.spring.json.use.type.headers: false
application:
name: kafka-telemetry-data-aggregator
cloud:
function:
definition: aggregateTelemetryData
stream:
bindings:
aggregateTelemetryData-in-0:
destination: space-probe-telemetry-data
aggregateTelemetryData-out-0:
destination: space-probe-aggregate-telemetry-data-nasa
aggregateTelemetryData-out-1:
destination: space-probe-aggregate-telemetry-data-esa
kafka:
binder:
brokers: "localhost:29092"
streams:
bindings:
aggregateTelemetryData-in-0.consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: com.example.kafkasamplesstreams.serdes.TelemetryDataPointSerde
deserializationExceptionHandler: logAndContinue
aggregateTelemetryData-out-0.producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: com.example.kafkasamplesstreams.serdes.AggregateTelemetryDataSerde
aggregateTelemetryData-out-1.producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: com.example.kafkasamplesstreams.serdes.AggregateTelemetryDataSerde
management:
endpoints:
web:
exposure:
include: "*"
Zur Umsetzung des Features nutzen wir den functional style, der mit Spring Cloud Stream 3.0.0 eingeführt wurde. Dazu legen wir in der Funktionsdefinition mit aggregateTelemetryData
den Namen unserer Bean fest, welche die Function implementiert. Diese wird die eigentliche Fachlogik enthalten.
Da wir aus einem Topic lesen und in zwei Topics schreiben, benötigen wir hier drei Bindings:
- Ein
IN
Binding zum Konsumieren unserer Messdaten - Ein
OUT
Binding zum Schreiben unserer aggregierten Messdaten für die NASA - Ein
OUT
Binding zum Schreiben unserer aggregierten Messdaten für die ESA
Die im oberen Teil der Konfiguration deklarierte Function können wir als Abbildung unseres IN
Bindings auf unsere OUT
Bindings betrachten. Damit diese mit den Bindings assoziiert werden kann, müssen wir uns an die im vorigen Abschnitt beschriebene Spring-Konvention halten.
Nachdem die Binding-Konfiguration abgeschlossen ist, können wir uns der Implementierung unserer Fachlogik widmen. Hierzu erstellen wir eine Funktion, die mit dem Namen des Functional Bindings aus unserer Konfiguration übereinstimmt. Diese Funktion bildet unsere Kafka Streams Topologie und die Berechnungslogik ab (KafkaStreamsHandler.kt ):
package com.example.kafkasamplesstreams
import com.example.kafkasamplesstreams.events.AggregatedTelemetryData
import com.example.kafkasamplesstreams.events.SpaceAgency
import com.example.kafkasamplesstreams.events.TelemetryDataPoint
import com.example.kafkasamplesstreams.serdes.AggregateTelemetryDataSerde
import mu.KotlinLogging
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Predicate
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class KafkaStreamsHandler {
private val logger = KotlinLogging.logger {}
@Bean
fun aggregateTelemetryData(): java.util.function.Function<
KStream<String, TelemetryDataPoint>,
Array<KStream<String, AggregatedTelemetryData>>> {
return java.util.function.Function<
KStream<String, TelemetryDataPoint>,
Array<KStream<String, AggregatedTelemetryData>>> { telemetryRecords ->
telemetryRecords.branch(
// Split up the processing pipeline into 2 streams, depending on the space agency of the probe
Predicate { _, v -> v.spaceAgency == SpaceAgency.NASA },
Predicate { _, v -> v.spaceAgency == SpaceAgency.ESA }
).map { telemetryRecordsPerAgency ->
// Apply aggregation logic on each stream separately
telemetryRecordsPerAgency
.groupByKey()
.aggregate(
// KTable initializer
{ AggregatedTelemetryData(maxSpeedMph = 0.0, traveledDistanceFeet = 0.0) },
// Calculation function for telemetry data aggregation
{ probeId, lastTelemetryReading, aggregatedTelemetryData ->
updateTotals(
probeId,
lastTelemetryReading,
aggregatedTelemetryData
)
},
// Configure Serdes for State Store topic
Materialized.with(Serdes.StringSerde(), AggregateTelemetryDataSerde())
)
.toStream()
}.toTypedArray()
}
}
/**
* Performs calculation of per-probe aggregate measurement data.
* The currently calculated totals are held in a Kafka State Store
* backing the KTable created with aggregate() and the most recently
* created aggregate telemetry data record is passed on downstream.
*/
fun updateTotals(
probeId: String,
lastTelemetryReading: TelemetryDataPoint,
currentAggregatedValue: AggregatedTelemetryData
): AggregatedTelemetryData {
val totalDistanceTraveled =
lastTelemetryReading.traveledDistanceFeet + currentAggregatedValue.traveledDistanceFeet
val maxSpeed = if (lastTelemetryReading.currentSpeedMph > currentAggregatedValue.maxSpeedMph)
lastTelemetryReading.currentSpeedMph else currentAggregatedValue.maxSpeedMph
val aggregatedTelemetryData = AggregatedTelemetryData(
traveledDistanceFeet = totalDistanceTraveled,
maxSpeedMph = maxSpeed
)
logger.info {
"Calculated new aggregated telemetry data for probe $probeId. New max speed: ${aggregatedTelemetryData.maxSpeedMph} and " +
"traveled distance ${aggregatedTelemetryData.traveledDistanceFeet}"
}
return aggregatedTelemetryData
}
}
Zur Berechnung der aggregierten Telemetriedaten pro Sonde müssen wir in der Implementierung der Function drei Probleme lösen:
Implementierung der Berechnung
Das Kafka Streams API bietet eine Reihe vordefinierter Operationen, die wir auf dem eingehenden Stream anwenden können, um die Berechnung dem Use Case entsprechend durchzuführen. Da wir die aggregierten Sondendaten nach Weltraumagenturen trennen müssen, nutzen wir zunächst die branch()
Operation), die uns als Resultat ein Array von KStream
zurückgibt. Wir erhalten zwei Streams, die nun bereits nach Weltraumagentur getrennt sind. Nun kann die Berechnung der Sondendaten erfolgen. Da die Berechnung für beide Agenturen identisch ist, nutzen wir die map()
-Operation von Kotlin, um die nachfolgenden Schritte für beide Streams nur einmalig definieren zu müssen. Um die Sondendaten nach ihrer Probe ID zu gruppieren und schlussendlich zu aggregieren, nutzen wir wieder native Operationen des Streams API. Die Operation aggregate()
benötigt drei Parameter:
- Einen Initializer, der den Anfangswert bestimmt, falls noch keine aggregierten Daten vorhanden sind
- Eine Aggregator-Funktion, die unsere Berechnungslogik zum Aggregieren der Messwerte bestimmt
- Die zu verwendenden Serializer/Deserializer zum Speichern der aggregierten Werte
Die aggregate()
Operation gibt uns als Resultat eine KTable
zurück, die je Sonden-ID den jeweils aktuellsten berechneten Gesamtwert enthält. Da unsere Kunden an den jeweils aktuellsten Daten interessiert sind, wandeln wir die KTable
wieder in einen KStream
um – jede Änderung der KTable
generiert damit ein Event, das den zuletzt errechneten Gesamtwert enthält.
Speicherung der aggregierten Messdaten
Die aggregate()
-Funktion, die wir in unserem Beispiel zur Berechnung der Gesamtwerte verwenden, ist eine sogenannte stateful operation – also eine zustandsbehaftete Operation, die einen lokalen Zustand benötigt, um alle zuvor berechneten Werte zur Berechnung der aktuell gültigen Gesamtwerts berücksichtigen zu können. Das Kafka Streams API übernimmt die Verwaltung des Zustands für uns, indem die Operation eine KTable
materialisiert. Eine KTable
können wir uns als Changelog für Key/Value-Paare vorstellen, das es uns ermöglicht, einen lokalen State zu persistieren (und bei Bedarf wiederherstellen) zu können. KTables
werden im Kafka-Cluster durch sogenannte State Stores materialisiert. Der State Store nutzt ein durch Kafka verwaltetes Topic, um Daten im Cluster persistieren zu können. Das erspart uns die Verwaltung weiterer Infrastrukturkomponenten wie beispielsweise einer Datenbank zur Haltung des Zustands.
Belieferung der unterschiedlichen Weltraumagenturen
Um die NASA und ESA mit den jeweils für sie relevanten, aggregierten Messdaten zu beliefern, haben wir vor der Berechnung die branch()
-Operation verwendet. Infolgedessen besitzt unsere Funktion einen Rückgabewert von Array>
, deren Indizes mit den Weltraumagenturen korrelieren. Das bedeutet für unseren Fall, dass sich in Array[0]
die Daten der NASA befinden und in Array[1]
die Daten der ESA. Diese Aufteilung passt wiederum zu unserer Binding-Config.
Die entstandenen KStreams sind unser Aggregationsergebnis und werden auf die beiden Ausgangstopics geschrieben.
Konsumieren der Telemetriedaten (kafka-samples-consumer)
Zum Lesen der aggregierten Sondenmessdaten verwenden wir das Kafka Consumer API, das wie beim Producer über den Spring Cloud Stream Binder für Kafka bereitgestellt wird. Wir konfigurieren den Service hierzu wie folgt (application.yml ):
spring:
application:
name: kafka-telemetry-data-consumer
# Ignore type headers in kafka message
kafka.properties.spring.json.use.type.headers: false
cloud:
stream:
kafka:
binder:
brokers: "localhost:29092"
bindings:
# this has to match the consumer bean name with suffix in-0 (for consumer)
processNasaTelemetryData-in-0:
consumer:
configuration:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: de.codecentric.samples.kafkasamplesconsumer.serdes.TelemetryDataDeserializer
processEsaTelemetryData-in-0:
consumer:
configuration:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: de.codecentric.samples.kafkasamplesconsumer.serdes.TelemetryDataDeserializer
bindings:
processNasaTelemetryData-in-0:
group: ${spring.application.name}
destination: space-probe-aggregate-telemetry-data-nasa
processEsaTelemetryData-in-0:
group: ${spring.application.name}
destination: space-probe-aggregate-telemetry-data-esa
function:
# We define this explicitly since we have several consumer functions
definition: processNasaTelemetryData;processEsaTelemetryData
Weiter geht es nach bekanntem Muster: Wir implementieren Beans, die das Binding umsetzen, und beginnen dabei mit der NASA.
Wir erstellen eine Funktion, die Consumer
implementiert (siehe KafkaConsumerConfiguration.kt ). Dies bedeutet, dass wir einen Input, aber keinen Output auf ein weiteres Topic haben und der Stream in dieser Funktion endet.
Der Consumer für die ESA folgt dem gleichen Muster des Consumers der NASA und hat lediglich den Unterschied, dass die übertragenen Daten im imperialen System in das metrische System umgerechnet werden. Diese Funktionalität haben wir in der init
Funktion unserer Klasse MetricTelemetryData.kt gekapselt.
Mit der Umsetzung der Consumer ist unsere Stream-Processing-Pipeline abgeschlossen und alle Anforderungen umgesetzt.
Die fertige Lösung in Action
Wenn wir unsere Services nun starten, sollten wir nach einigen Sekunden die ersten aggregierten Sondendaten im Log des Consumer Service sehen. Zusätzlich können wir uns mit einem Blick auf AKHQ einen Überblick über die Topics uns Messages in Kafka verschaffen:
Wir erkennen die eingehenden und ausgehenden Topics, auf die unsere Services zugreifen ebenso wie die State Stores, die unser Aggregator Service im Hintergrund für uns in Form von Kafka Topics materialisiert hat.
Lessons learned
Falls ihr nun darüber nachdenkt, das Ganze in euren Projekten einzusetzen, haben wir euch einige Fragen und Argumentationshilfen vorbereitet, damit ihr die für euch passende Entscheidung treffen könnt.
Wann solltet ihr über den Einsatz von Stream Processing nachdenken?
Stream Processing kann überall dort sinnvoll sein, wo ihr mit der Verarbeitung großer Datenmengen konfrontiert seid und Zeitverzögerungen minimiert werden sollen. Die Entscheidung für oder gegen Stream Processing solltet ihr nicht an einer einzelnen Komponente festmachen – die Lösung sollte zur Gesamtarchitektur des Systems und zur Problemstellung passen. Wenn euer Use Case die folgenden Attribute besitzt, könnte Stream Processing eine Lösung sein:
- Ihr seid mit einem dauerhaften Strom von Daten konfrontiert. Beispiel: IoT-Devices senden euch fortlaufend Sensordaten.
- Euer Workload ist kontinuierlich und hat nicht den Charakter einer Datenlieferung. Beispiel: Ihr erhaltet einmal täglich einen Datenexport aus einem Altsystem und müsst beispielsweise das Ende einer Lieferung definitiv bestimmen können. Hier ist Batch Processing in der Regel sinnvoller.
- Die zu verarbeitenden Daten sind zeitkritisch und sofort zu verarbeiten. Hier müsst ihr euch im Fall größerer Datenmengen oder komplexer Berechnungen immer auch Gedanken über die Skalierbarkeit eurer Services machen, um die Verarbeitungsdauer niedrig zu halten. Hierfür sind Streams passend, weil ihr durch Partitionierung und asynchrones Event Processing recht problemlos horizontal skalieren könnt.
Solltet ihr eure Stream Processing Workloads mit Kafka umsetzen?
Dazu geben wir ein vorsichtiges „Ja“ ab. Kafkas Datenhaltung, gepaart mit dem Stream Processing API, ist ein mächtiges Tool und wird durch verschiedene Anbieter as a Service angeboten. Das flacht die Lernkurve ab und minimiert den Wartungsaufwand. In Event-getriebenen Anwendungsfällen fühlt sich das gut und richtig an. Leider haben wir schon mehrfach erlebt, dass Kafka in Anwendungsarchitekturen als reiner Message Bus, für Batch-Workloads oder in Situationen eingesetzt wird, wo synchrone Kommunikation zwischen Services sinnvoller gewesen wäre. Die Vorteile von Kafka und Event-getriebenen Architekturen bleiben somit ungenutzt oder schlimmer: Wir tun uns mit der Problemlösung schwerer als nötig.
Wenn Kafka in eurer Architektur bereits vorhanden ist und eure Problemstellung zur Technologie passt, würden wir euch empfehlen, euch mit den Möglichkeiten des Streams API zu befassen – damit lassen sich Datenpipelines oft ohne weitere Infrastruktur-Komponenten aufbauen und ihr könnt auf Komponenten wie relationale Datenbanken oder In-Memory Datastores verzichten. Confluent bietet sehr gute Dokumente für den Einstieg an.
In Fällen, wo ihr Kafka nicht as a Service nutzen könnt, kann der Aufwand, ein Kafka-Cluster aufzubauen und selbst zu betreiben, den Nutzen überwiegen. In diesen Fällen ist es daher möglicherweise sinnvoller, einen klassischen Message Broker und eine relationale Datenbank einzusetzen.
Solltet ihr Stream Processing Workloads mit Kafka Streams und Spring Boot umsetzen?
Klare Antwort: Kommt drauf an. Wenn ihr in euren Projekten Spring Boot bereits flächendeckend einsetzt, lässt sich durch die Spring Streams-Abstraktion bei der Inbetriebnahme neuer Services einiges an Zeit sparen, da Konfiguration und Implementierung stets einem sehr ähnlichen Schema folgen und wir einen Teil der Komplexität bei der Implementierung verbergen können. Ganz perfekt ist der Spring-Weg jedoch nicht. Hier die Themen, die uns Schmerzen bereitet haben:
- Konventionen & Doku: Die Konfiguration mit der Spring-Abstraktion besteht aus einigen nicht immer sauber dokumentierten und bisweilen intransparenten Konventionen, die Nerven und Zeit kosten können. Als wir diesen Artikel verfasst haben, waren Teile der Spring-Dokumentation veraltet (das von uns verwendete funktionale Programmierparadigma findet in der aktuellen Version der Dokumentation beispielsweise noch keine Erwähnung)
- Fehlerbehandlung: Bei der Verwendung des Stream Binders für Kafka Streams wie in unserer Klasse KafkaStreamsHandler.kt existiert aktuell keine komfortable Lösung, Exceptions, die außerhalb der Deserialisierung auftreten, mit Bordmitteln zu behandeln (was mit Fehlern bei der Deserialisierung passieren soll, definieren wir in application.yml ). Die derzeit einzige Lösung hierfür ist, zur Fehlerbehandlung am Streams API vorbei zu implementieren oder sicherzustellen, dass etwaige Fehler bei der Deserialiserung abgefangen werden. Einen beispielhaften Ansatz dafür liefert TelemetryAggregationTransformer.kt . Durch das Umgehen der Streams API können wir Fehler auf Message-Ebene behandeln, indem wir beispielsweise
try/catch
Logik implementieren. Da wir in diesem Beispiel eine Abstraktionsebene abgestiegen sind, verlieren wir damit leider auch das automatische State-Management überKTables
– State Stores müssen wir bei Bedarf selbst verwalten. Aktuell müsst ihr in diesem Fall also leider abwägen, was euch wichtiger ist. - Aktualität: Die Spring Dependencies hängen immer einige Kafka-Releases hinterher, so dass nicht alle Features immer sofort nutzbar sind (siehe vorheriger Punkt).
Als Alternative zur Spring-Abstraktion existieren diverse frei nutzbare Bibliotheken, um die Konzepte aus Kafka Streams in diverse Tech-Stacks zu integrieren. Confluent bietet gut dokumentierte Step-By-Step Rezepte für zahlreiche unterstützte Umgebungen und Programmiersprachen an, um die Einstiegshürden unabhängig von eurer Umgebung niedrig zu halten. Insofern könnt ihr hier frei entscheiden. Wenn ihr euch mit Spring Boot wohlfühlt: super! Wenn nicht: auch ok!
Ein paar Worte zum Schluss
In diesem Blogpost haben wir euch demonstriert, wie ihr die Konzepte von Stream Processing anhand eines konkreten Use Cases mit Spring Boot und Kafka Streams umsetzen könnt. Wir hoffen, dass ihr mit Stream Processing nun ein weiteres Tool in eurem Werkzeugkasten besitzt und eurem nächsten Projekt nun ganz gelassen entgegen treten könnt.
Den kompletten Code zu unserem Beispielprojekt findet ihr auf GitHub .
Bonusmaterial
Um den Rahmen unseres Blogposts nicht zu sprengen, haben wir uns auf einen recht simplen Use Case beschränkt. Mit KTables
lassen sich aber auch deutlich anspruchsvollere Szenarien umsetzen. Ein weiteres, etwas komplexeres Beispiel (wie führen wir mehrere eingehende Streams zusammen?) findet ihr in unserem GitHub Repo auf einem separaten Branch . Die Zusammenführung der eingehenden Streams bilden wir in der Klasse KafkaStreamsHandler.kt mithilfe der join()
-Operation ab.
Referenzen
[1] Ben Stopford (2018): Designing Event Driven Systems , S. 29 ff.
[2] Martin Kleppmann (2015): Turning the Database inside-out with Apache Samza
[3] Apache Software Foundation (2017): Kafka Streams Core Concepts
[4] Apache Software Foundation (2017): Kafka Main Concepts and Terminology
Weitere Beiträge
von Maik Fleuter & Lukas Maier
Dein Job bei codecentric?
Jobs
Agile Developer und Consultant (w/d/m)
Alle Standorte
Weitere Artikel in diesem Themenbereich
Entdecke spannende weiterführende Themen und lass dich von der codecentric Welt inspirieren.
Gemeinsam bessere Projekte umsetzen.
Wir helfen deinem Unternehmen.
Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.
Hilf uns, noch besser zu werden.
Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.
Blog-Autor*innen
Maik Fleuter
Senior IT Consultant
Du hast noch Fragen zu diesem Thema? Dann sprich mich einfach an.
Lukas Maier
IT Consultant
Du hast noch Fragen zu diesem Thema? Dann sprich mich einfach an.
Du hast noch Fragen zu diesem Thema? Dann sprich mich einfach an.