Beliebte Suchanfragen
//

Streaming Wikipedia mit Apache Kafka

15.8.2022 | 10 Minuten Lesezeit

Apache Kafka ist in aller Munde und entwickelt sich im Kontext von verteilten Systemen zum De-facto-Standard als Plattform für Event Streaming. Im Rahmen unserer OffProject Time (Weiterbildungszeit) haben wir uns die Plattform auch näher angeschaut und eine kleine Beispielanwendung geschrieben, die verschiedene Features von Kafka nutzt.

TL;DR

  • Apache Kafka stellt einige APIs bereit, um mit den Komponenten von Kafka arbeiten zu können
  • Kafka Connectors sind eine Komponente, um Daten aus Fremdsystemen nach Kafka zu übernehmen (Source Connector) oder Daten aus Kafka bereitzustellen (Sink Connector)
  • Mit Kotlin und Gradle konnten wir einen Connector zum Empfangen von Server-Sent Events entwickeln
  • Die empfangenen Events wurden mit Kafka Streams und Kafka Consumern weiterverarbeitet

Alle Codebeispiele sind auf GitHub verfügbar.

Kafka Connect mit Kotlin und Gradle

Um sinnvoll mit Kafka arbeiten zu können, brauchen wir einen Eventstrom, der regelmäßig Daten erzeugt, auf denen Auswertungen möglich sind. Wir haben uns im Rahmen dieser Anwendung dazu entschieden, die RecentChanges Stream API von Wikimedia  dafür zu benutzen. Über diese API werden via Server-Sent Events unter anderem alle Änderungen an Wikipedia-Artikeln zur Verfügung gestellt. Unser erster Schritt war, einen Kafka Producer (auf Basis von Spring-Boot, Gradle und Kotlin) zu entwickeln, der die gesendeten Events mit einem Webflux Client empfängt und anschließend in ein Kafka Topic schreibt (siehe kafka-playground-producer ).

Auf Events auf diesem Topic lauscht eine weitere Spring-Boot-Applikation, die mit Kafka Streams Auswertungen auf den empfangenen Events macht (zum Beispiel, wie viele Änderungen auf den jeweiligen länderspezifischen Wikipedia-Seiten gemacht wurden). Die Auswertungen werden in eigene Topics als neue Events geschrieben.

Eine dritte Spring-Boot-Applikation empfängt als Consumer diese Events und schreibt die empfangenen Events als Log-Ausgabe.

Ein besonderes Merkmal von Kafka ist die Komponente Kafka Connect. Über sogenannte Connectors können diverse Systeme (z. B. Datenbanken, Messaging-Systeme) an Kafka angebunden werden. Nachdem die Anbindung der Wikimedia API über einen Producer gut geklappt hat, möchten wir diese Anbindung als Connector realisieren. Auch dafür werden Kotlin und Gradle eingesetzt. Wie Kafka Connectors genau funktionieren und wie die Realisierung unseres Connectors gelaufen ist, erfahrt ihr in diesem Beitrag.

Kafka Connect

Kafka Connect ist eine Apache-Kafka-Komponente, die es ermöglicht, externe Systeme einfach und schnell an das Kafka-Cluster anzubinden. Hierbei wird zwischen Kafka und dem externen System ein sogenannter Connector eingesetzt. Es wird zwischen einem Source und einem Sink Connector unterschieden. Während ein Source Connector kontinuierlich Daten aus einem externen Quellsystem liest und als Event auf einem Kafka Topic legt, konsumiert ein Sink Connector die Events auf einem Topic und beliefert damit ein externes System.

Ein Connector besteht aus mehreren Komponenten:

Tasks kümmern sich um die eigentliche Logik: den Datenimport (Source Task) bzw. -export (Sink Task). Der Connector selbst verarbeitet keine Daten, sondern lädt die Konfiguration und verteilt die Arbeit der Tasks auf Workers [1] .

Mehr Details zu Kafka Connect und wie man einen fertigen Connector konfiguriert und installiert hat Akhlaq Malik in seinem Blogpost “ETL with Kafka” beschrieben.

Server-Sent Events vs. WebSockets

Server-Sent Events gibt es bereits seit 2006, dennoch findet man in vielen Applikationen eher WebSockets zur Umsetzung von Nachrichten, die vom Server an den Client gesendet werden. Über das Kommunikationsprotokoll WebSocket können Nachrichten in beide Richtungen – also vom Client zum Server und vom Server zum Client – gesendet werden. Dafür wird eine TCP-Verbindung zwischen Client und Server genutzt. Klassische Anwendungsfälle für WebSockets sind zum Beispiel Chats oder Multiplayer-Spiele. Besonders aufpassen muss man bei der Verwendung von WebSockets im Enterprise-Umfeld: Einige Firewalls haben Probleme mit WebSockets, sodass das Protokoll nicht verwendet werden kann.

Bei der Kommunikation von Client und Server via WebSockets findet zunächst ein Handshake über HTTP statt. Anschließend können sowohl Client als auch Server über das WebSocket-Protokoll miteinander kommunizieren, d. h. beide können Nachrichten an den anderen übermitteln. Die Verbindung kann von beiden Seiten geschlossen werden [2] .

Im Gegensatz zur bidirektionalen Kommunikation über WebSockets läuft die Kommunikation bei Server-Sent Events nur in eine Richtung, nämlich vom Server zum Client. Bei Server-Sent Events melden sich Clients beim Server an und der Server schickt dann neue Nachrichten via HTTP an den Client (ähnlich zu einem Publish-Subscribe-Mechanismus). Dadurch, dass die Kommunikation nicht über ein eigenes Protokoll, sondern über HTTP läuft, sind Server-Sent Events kein Problem für Firewalls im Enterprise-Umfeld. Anwendungsfälle für Server-Sent Events sind zum Beispiel Feeds.

Bei Server-Sent Events meldet sich der Client beim Server an. Der Server publiziert Events an den Client, der diese verarbeiten kann. Der Client kann in diesem Szenario keine Nachrichten an den Server senden. Die Verbindung muss explizit geschlossen werden – bei einem Verbindungsabbruch findet standardmäßig automatisch ein neuer Verbindungsversuch statt. Wenn der Server mit Statuscode 204 No Content antwortet, ist das das Signal für den Client, dass er sich nicht neu verbinden soll [3] .

Für unsere Beispiel-Implementierung benutzen wir eine öffentliche API von Wikimedia, die Server-Sent Events zur Verfügung stellt. Die API (inkl. OpenAPI Spezifikation) ist erreichbar unter [4] .

Entwicklung des Server-Sent Event Connectors

Abhängigkeiten

Für unseren Connector benötigen wir lediglich die connect-api von Kafka und einen HTTP Client, der Server-Sent Events unterstützt. Für Letzteren fiel die Entscheidung auf okhttp-eventsource , da diese Bibliothek ziemlich leichtgewichtig ist. Außerdem bringt sie bereits den Typ MessageEvent mit, sodass man die empfangenen Events nicht selbst aus einem String parsen muss (was z. B. bei Verwendung des HttpClients aus java.net.* notwendig wäre).

1implementation("org.apache.kafka:connect-api:3.2.1")
2implementation("com.launchdarkly:okhttp-eventsource:2.7.0")

Zur Umsetzung eines Kafka Source Connectors sind im einfachsten Fall nur wenige Schritte notwendig: die Definition der Konfigurationsparameter, die Implementierung des SourceConnector Interface sowie die Implementierung des SourceTask Interface.

Konfigurationsparameter

Für den Connector, der die Events von Wikimedia abholen und in ein Kafka Topic als Event schreiben soll, benötigen wir zwei Konfigurationsparameter: die URL der bereitgestellten API sowie das Topic, in das publiziert werden soll.

1class ServerSentEventSourceConnectorConfig(originalProps: Map<*, *>?) : AbstractConfig(CONFIG_DEF, originalProps) {
2   companion object {
3       const val TOPIC_PARAM_CONFIG = "topic"
4       private const val TOPIC_PARAM_DOC = "This is the topic where the events will be sent to"
5       const val SSE_URI_PARAM_CONFIG = "sse.uri"
6       private const val SSE_URI_PARAM_DOC = "The URI where the application will fetch events"
7
8       val CONFIG_DEF: ConfigDef = ConfigDef()
9           .define(TOPIC_PARAM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_PARAM_DOC)
10           .define(SSE_URI_PARAM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSE_URI_PARAM_DOC)
11   }
12}

Die Konfigurationsklasse erbt von AbstractConfig – einer abstrakten Klasse aus der connect-api-Bibliothek. Wir definieren in dieser Klasse unsere zwei Parameter topic und sse.uri vom Typ String und dokumentieren diese.

SourceConnector Interface

Die Konfigurationsklasse wird auch in der Implementierung des Connector Interface wiederverwendet. Eine relativ simple Implementierung des Interface kann so aussehen:

1class ServerSentEventSourceConnector : SourceConnector() {
2   private val log = KotlinLogging.logger {}
3   private lateinit var originalProps: Map<String, String>
4
5   override fun version() = PropertiesUtil.connectorVersion
6
7   override fun config() = ServerSentEventSourceConnectorConfig.CONFIG_DEF
8
9   override fun taskClass(): Class<out Task?> = ReceiveServerSentEventTask::class.java
10
11   override fun start(originalProps: Map<String, String>) {
12       this.originalProps = originalProps
13       log.info("Starting connector")
14   }
15
16   override fun taskConfigs(maxTasks: Int) = listOf<Map<String, String>>(HashMap(originalProps))
17
18   override fun stop() {
19       log.info("Stopping connector")
20   }
21}

Beim Start und Stop des Connectors wird jeweils eine Log-Ausgabe geschrieben. Durch die Methode taskClass wird die Implementierung des Tasks festgelegt. Im Task ist dann die tatsächliche „Arbeit“ des Connectors umgesetzt. Dazu später mehr.

In unserer Beispielanwendung haben wir im SourceConnector noch die Methode validate überschrieben, um sicherzustellen, dass unsere beiden Konfigurationsparameter auch gesetzt wurden:

1override fun validate(connectorConfigs: Map<String, String>): Config {
2   val config = super.validate(connectorConfigs)
3   val configValues = config.configValues()
4   var missingTopicDefinition = true
5   var missingSseUriDefinition = true
6
7   for (configValue in configValues) {
8       when(configValue.name()) {
9           TOPIC_PARAM_CONFIG -> if (configValue.value() != null) missingTopicDefinition = false
10           SSE_URI_PARAM_CONFIG -> if (configValue.value() != null) missingSseUriDefinition = false
11       }
12
13       if (!missingTopicDefinition && !missingSseUriDefinition) {
14           break
15       }
16   }
17   if (missingTopicDefinition || missingSseUriDefinition) {
18       throw ConnectException(
19           String.format(
20               "Properties '%s' and '%s' must be set in the configuration.",
21               TOPIC_PARAM_CONFIG,
22               SSE_URI_PARAM_CONFIG
23           )
24       )
25   }
26   return config
27}

Beide Konfigurationsparameter sind für die Ausführung des Tasks Pflicht – daher wird eine Exception geworfen, sobald einer der beiden Parameter nicht gesetzt ist.

SourceTask Interface und Server-Sent Events Client

Die eigentliche Arbeit des Connectors wird in einer SourceTask-Implementierung umgesetzt. Bevor wir uns diese Implementierung ansehen, werfen wir einen Blick auf die Implementierung des Clients zum Empfang der Server-Sent Events von Wikimedia. Dieser basiert auf der Bibliothek okhttp-eventsource und implementiert das EventHandler Interface aus dieser Bibliothek:

1class ServerSentEventClient(url: String) : EventHandler {
2   private val log = KotlinLogging.logger {}
3
4   private var eventSource: EventSource
5
6   private val queue: BlockingQueue<MessageEvent> = LinkedBlockingDeque()
7
8   init {
9       eventSource = try {
10           EventSource.Builder(this, URI(url))
11               .build()
12       } catch (ex: URISyntaxException) {
13           throw IllegalArgumentException("Bad URI: $url")
14       }
15   }
16
17   fun start() {
18       log.info("Start to receive events")
19       eventSource.start()
20   }
21
22   fun stop() {
23       log.info("Stop to receive events")
24       eventSource.close()
25   }
26
27   fun receiveEvents(): List<MessageEvent> {
28       val records: MutableList<MessageEvent> = ArrayList()
29       queue.drainTo(records)
30       return records
31   }
32
33   override fun onOpen() {
34       log.info("Event handler now open")
35   }
36
37   override fun onClosed() {
38       log.info("Event handler now closed")
39   }
40
41   override fun onMessage(eventName: String, messageEvent: MessageEvent) {
42       log.debug { "Received event with name $eventName" }
43       queue.offer(messageEvent)
44   }
45
46   override fun onComment(comment: String) {
47       log.info { "Received comment on event handler: $comment"}
48   }
49
50   override fun onError(throwable: Throwable) {
51       log.error("An exception occurred", throwable)
52   }
53}

Dieser Client ist nicht nur für den Empfang von Server-Sent Events der Wikimedia API geeignet, sondern kann mit jeder API arbeiten, die Server-Sent Events anbietet. Im Konstruktor wird deshalb die URL übergeben, die zum Empfang der Events aufgerufen werden soll. Bei der Initialisierung der Klasse wird die EventSource [5]  konfiguriert. Über die Methoden start und stop wird die konfigurierte EventSource geöffnet bzw. geschlossen. Jedes empfangene Event wird in einer internen Queue gespeichert, bis die empfangen Events über receiveEvents() abgeholt werden.

Dieser Client wird in der Implementierung des Tasks benutzt:

1class ReceiveServerSentEventTask : SourceTask() {
2   private val log = KotlinLogging.logger {}
3
4   private lateinit var config: ServerSentEventSourceConnectorConfig
5   private lateinit var sseClient: ServerSentEventClient
6
7   override fun version() = PropertiesUtil.connectorVersion
8
9   override fun start(properties: Map<String, String>) {
10       log.info("Starting Sample Source Task")
11       config = ServerSentEventSourceConnectorConfig(properties)
12       sseClient = ServerSentEventClient(config.getString(SSE_URI_PARAM_CONFIG))
13       sseClient.start()
14   }
15
16   override fun poll() = sseClient.receiveEvents()
17           .filter { event -> event.data.startsWith("{") || event.data.startsWith("[") } // send only events where data is valid json (starts with { or [)
18           .map { event ->
19                   SourceRecord(
20                       emptyMap<String, Any>(),
21                       emptyMap<String, Any>(),
22                       config.getString(TOPIC_PARAM_CONFIG),
23                       Schema.STRING_SCHEMA,
24                       UUID.randomUUID().toString(),
25                       Schema.STRING_SCHEMA,
26                       event.data
27                   )
28           }
29           .toList()
30
31   override fun stop() {
32       log.info("Stopping Task")
33       sseClient.stop()
34   }
35}

Zu Beginn des Tasks wird der Client initialisiert und das Empfangen der Server-Sent Events gestartet. Solange, bis der Task gestoppt wird (und damit auch das Empfangen der Server-Sent Events gestoppt wird), wird vom Connector immer wieder die Methode poll aufgerufen. Hier holen wir uns die empfangen Events vom Client ab und wandeln die Events in SourceRecords um. Diese Liste an SourceRecords wird vom Connector in das spezifizierte Topic publiziert. Als Key für das Event wird eine UUID generiert.

Und damit ist unser leichtgewichtiger Kafka Connector für Server-Sent Events bereits fertig implementiert.

Build & Deploy Kafka Connector

Es gibt zwei Möglichkeiten, den Kafka Connector im Connect-Container zu installieren. Entweder werden alle JARs und Third-Party-Abhängigkeiten, die benötigt werden, in ein gemeinsames Verzeichnis kopiert; oder es wird ein einzelnes Uber-JAR erzeugt, das alle notwendigen Klassen des Connectors und seine Third-Party-Abhängigkeiten enthält [6] . Wir haben uns für Letzteres entschieden und nutzen das Gradle-Plug-in shadowJar zur Erzeugung des Uber-JARs [7] .

Weitere Hinweise zur Installation und zum Start der Anwendung stehen im README des GitHub-Repository.

Schlussgedanken

  • Die Implementierung eines Kafka Connectors erfordert überschaubaren Aufwand. Für Entwickler*innen ist es trotzdem nicht ganz trivial: Immerhin ist die Grundvoraussetzung für die Implementierung eine laufende Kafka-Connect-Instanz, auf der der eigene Connector immer wieder aufgespielt werden muss. Debugging ist dann über Remote-JVM-Debugging möglich. Den eigenen Connector einfach über die IDE starten und ohne Kafka-Connect-Instanz auszuprobieren ist leider nicht möglich.
  • Innerhalb des Kafka-Connect-Containers wird Java 11 genutzt (Stand August 2022). Wir haben unseren Connector zuerst für die aktuelle LTS-Version (17) kompiliert, dann wurde aber unsere Connector-Klasse nicht gefunden (ClassNotFound-Fehler). Mit JVM-Ziel 11 hat sich der Fehler dann erledigt.
  • Die Wikimedia API eignet sich gut, um mit Event-Streaming und Kafka herumzuspielen. Es kommen kontinuierlich Events und diese Events enthalten ordentliche Daten, mit denen man gut spielen und auf denen man Auswertungen machen kann. Außerdem ist die API kostenlos nutzbar und gut dokumentiert (OpenAPI). Dank der API mussten wir uns nicht damit beschäftigen, selbst Testdaten zu generieren. 🙂

Referenzen

[1] Confluent: https://docs.confluent.io/platform/current/connect/devguide.html
[2] WebSockets Living Standard (2022): https://websockets.spec.whatwg.org/ (Link aufgerufen am 11. August 2022)
[3] HTML Living Standard (2022): https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events (Link aufgerufen am 11. August 2022)
[4] Wikimedia EventStreams API (v.0.7.3): https://stream.wikimedia.org/?doc#/ (Link aufgerufen am 11. August 2022)
[5] HTML Living Standard (2022): https://html.spec.whatwg.org/multipage/server-sent-events.html#eventsource (Link aufgerufen am 11. August 2022)
[6] Confluent: https://docs.confluent.io/home/connect/self-managed/userguide.html#installing-plugins (Link aufgerufen am 15. August 2022)
[7] Matt Schroeder (2018): https://objectpartners.com/2018/07/12/building-a-kafka-connector-with-gradle/ (Link aufgerufen am 15. August 2022)

Beitrag teilen

//

Weitere Artikel in diesem Themenbereich

Entdecke spannende weiterführende Themen und lass dich von der codecentric Welt inspirieren.

//

Gemeinsam bessere Projekte umsetzen.

Wir helfen deinem Unternehmen.

Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.

Hilf uns, noch besser zu werden.

Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.