The Kotlin language is gaining more and more attention and is being used in an increasing number of projects. One thing Kotlin can be used for is implementing domain-specific languages (DSLs).
The Wikipedia entry on DSL states:
A domain-specific language (DSL) is a computer language specialized to a particular application domain.
In this post I will implement a minimal DSL for accessing Apache Kafka that uses keywords like kafka, producer, and consumer. It also defines a nesting structure that models the relationship between these concepts.
The purpose of this blog post is to show how to create a custom DSL with Kotlin. The DSL created will only have minimal functionality. The messages that are written to the Kafka topics are just plain strings. The focus is on creating the DSL, not using Apache Kafka.
The complete code for this sample project is available on GitHub.
The target DSL
The following code pieces show the DSL in action. The first example shows a producer application that writes strings into Kafka. The second example is a consumer application that reads them from Kafka.
I first show the final code that uses the DSL before explaining how to get there:
The producer
fun main(args: Array<String>) {
val log: Logger = LoggerFactory.getLogger("de.codecentric.ProducerRunnerDSL")
kafka("localhost:9092") {
producer("kt-topic") {
(1..10).forEach {
val msg = "test message $it ${LocalDateTime.now()}"
log.info("sending $msg")
send(msg)
}
flush()
}
}
}
In this code I first use the kafka("localhost:9092") function to define the connection to the Kafka broker that is running on the local machine. Within this context I then create a producer for the topic with producer("kt-topic").
In the context of the producer there is a loop that produces 10 messages. These messages are sent to Kafka by calling the producer's send(msg) method. After the loop, the producer's flush() method is called to make sure that the messages are delivered to Kafka before the program terminates.
The consumer
fun main(args: Array<String>) {
val log: Logger = LoggerFactory.getLogger("de.codecentric.ConsumerRunnerDSL")
kafka("localhost:9092") {
consumer("kt-topic") {
Runtime.getRuntime().addShutdownHook(Thread(Runnable {
stop()
}))
consume {
log.info("got $it")
}
}
}
}
Like the producer, the consumer application first defines the connection to the Kafka broker. It then creates a consumer for the topic with consumer("kt-topic"). In this consumer, a shutdown hook is added which will call the consumer's stop() method to ensure that the underlying KafkaConsumer is shut down properly on program termination.
Then the consumer's consume method is called with a lambda function that processes each String retrieved from the Kafka topic by logging its value. This consume method keeps reading data from Kafka until an error occurs or the stop() method is called, for example on program shutdown.
Prerequisites
Apache Kafka
To get the programs running, you need a running Apache Kafka instance. I used the open-source Confluent Platform to test my implementation; any other Apache Kafka setup is fine as well, as long as you have the address of a Kafka broker.
Kotlin concepts
There are two Kotlin language features that are essential for writing a DSL like the one I show in this article:
lambda arguments to functions:
When a function func has another function as its last argument and this argument is passed in as a lambda, then the lambda can be written after the closing parenthesis of the call (documentation):
// func takes a String as its first argument and a function from String to String as its second.
// it calls the second argument with the first as parameter and prints the result
fun func(arg1: String, calledWithArg1: (String) -> String) {
println(calledWithArg1(arg1))
}
// this might be called like this (don't do it this way!):
func("foo", { s: String -> s + s })
// prints foofoo
// the Kotlin way: the lambda is moved out of the parentheses and uses the implicit 'it' parameter:
func("foo") {
it + it
}
// prints foofoo as well
extension functions:
Extension functions offer the possibility to add functionality to existing classes without implementing it in the class itself (documentation). The following example is a bit off-topic here, but it shows the mechanism:
class PaymentInfo(var amount: Double, var creditCard: String)
val p = PaymentInfo(42.0, "1234-5678-0123-1234")
// extension function to double the price, not defined within the class
fun PaymentInfo.twice() {
// here 'this' is the PaymentInfo object
amount *= 2
}
// call the extension function for an instance of class PaymentInfo
p.twice()
// now the amount is 84.0
// a function to modify a PaymentInfo with an extension function passed in as a lambda
fun modifyPayment(payment: PaymentInfo, modifier: PaymentInfo.() -> Unit) {
payment.modifier()
}
// mask the credit card
modifyPayment(p) {
// here 'this' is the PaymentInfo object
creditCard = "XXX"
}
// now the credit card value in the object is masked
We can use extension functions to add functionality to classes which cannot be modified (like for example String or List).
The other main purpose is that by using an extension function, a lambda can be passed to a function and then be executed in the context of an object – like in the example where the credit card number is masked. The modifyPayment() call in the example again shows the Kotlin way of moving the lambda out of the parentheses.
Step 1: internal classes used to access Apache Kafka
In a first step, we create some Kotlin classes to write to and read from Kafka. These classes use the official Kafka client library written in Java. The classes make up a small Kotlin layer around the Kafka client library.
Kafka configuration class
data class Kafka(val bootstrapServers: String)
This first very simple class just stores the configuration for the connection to the Kafka broker. I kept the configurable properties to the absolute minimum to keep the example small.
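If more settings were needed, this data class could simply grow additional properties. A hypothetical sketch – the clientId property is made up for illustration and not part of the sample project:
data class Kafka(
    val bootstrapServers: String,
    // hypothetical additional setting, for illustration only
    val clientId: String = "kotlin-kafka-dsl"
)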
The Producer class
class Producer(kafka: Kafka, private val topic: String) {
private val kafkaProducer: KafkaProducer<String, String>
init {
val config = Properties()
config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafka.bootstrapServers
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
kafkaProducer = KafkaProducer(config)
}
fun send(msg: String) {
kafkaProducer.send(ProducerRecord(topic, msg))
}
fun flush() = kafkaProducer.flush()
}
The Producer class is initialized with a Kafka configuration object and the topic name. In the init block, a KafkaProducer from the official client library is instantiated with the necessary values.
The send(msg: String) method creates a ProducerRecord and sends it to Kafka, and the flush() call is just passed on to the client library.
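A detail worth knowing: KafkaProducer.send() is asynchronous and returns a Future<RecordMetadata>, which is why the explicit flush() before program exit matters. If the delivery result were of interest, send could be extended with the callback overload of the client library – a sketch, with the logging being purely illustrative:
fun send(msg: String) {
    kafkaProducer.send(ProducerRecord(topic, msg)) { metadata, exception ->
        // the callback is invoked when the broker has acknowledged (or rejected) the record
        if (exception != null) println("send failed: $exception")
        else println("sent to ${metadata.topic()}@${metadata.offset()}")
    }
}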
The Consumer class
class Consumer(kafka: Kafka, topic: String) {
private val kafkaConsumer: KafkaConsumer<String, String>
@Volatile
var keepGoing = true
init {
val config = Properties()
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafka.bootstrapServers
config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
config[ConsumerConfig.GROUP_ID_CONFIG] = UUID.randomUUID().toString()
kafkaConsumer = KafkaConsumer<String, String>(config).apply {
subscribe(listOf(topic))
}
}
// starts the polling loop in a new thread; the loop runs until stop() is called
fun consume(handler: (value: String) -> Unit) = Thread(Runnable {
    keepGoing = true
    // use { } makes sure the KafkaConsumer is closed when the loop ends
    kafkaConsumer.use { kc ->
        while (keepGoing) {
            // poll waits up to 500 ms for new records
            kc.poll(500)?.forEach {
                handler(it?.value() ?: "???")
            }
        }
    }
}).start()
fun stop() {
keepGoing = false
}
}
This class as well is initialized with the Kafka configuration and the topic, and it creates a KafkaConsumer instance that subscribes to the requested topic. The class has a property named keepGoing which is set to true when its consume(handler: (value: String) -> Unit) method is called. This method starts a new thread that polls the Kafka topic and calls the handler for the retrieved values.
The stop() method sets the keepGoing flag to false, which stops the polling loop and ends the thread.
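A side note on the client API: the poll(long) overload used above is deprecated since Kafka clients 2.0 in favor of poll(Duration). When building against a newer client library, the loop body would look like this (otherwise unchanged):
// requires: import java.time.Duration
kc.poll(Duration.ofMillis(500))?.forEach {
    handler(it?.value() ?: "???")
}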
Applications using these classes
Here are two applications that use these classes to produce and consume data without using the custom DSL; this is just plain Kotlin code:
fun main(args: Array<String>) {
val log: Logger = LoggerFactory.getLogger("de.codecentric.ProducerRunner")
val kafka = Kafka("localhost:9092")
val topic = "kt-topic"
val producer = Producer(kafka, topic)
(1..10).forEach {
val msg = "test message $it ${LocalDateTime.now()}"
log.info("sending $msg")
producer.send(msg)
}
producer.flush()
}
fun main(args: Array<String>) {
val log: Logger = LoggerFactory.getLogger("de.codecentric.ConsumerRunner")
val kafka = Kafka("localhost:9092")
val topic = "kt-topic"
val consumer = Consumer(kafka, topic)
Runtime.getRuntime().addShutdownHook(Thread(Runnable {
consumer.stop()
}))
consumer.consume {
log.info("got $it")
}
}
Step 2: The code defining the DSL
In order to create the DSL using these classes the following code is required:
class KafkaDSL(bootstrapServers: String) {
private val kafka = Kafka(bootstrapServers)
fun producer(topic: String, doProduce: Producer.() -> Unit) =
Producer(kafka, topic).doProduce()
fun consumer(topic: String, doConsume: Consumer.() -> Unit) =
Consumer(kafka, topic).doConsume()
}
fun kafka(bootstrapServers: String, init: KafkaDSL.() -> Unit) =
KafkaDSL(bootstrapServers).init()
This is all that is necessary to create our DSL. I will explain this code step by step for both the producer and the consumer example.
The producer
kafka("localhost:9092") {
// this is the body of the KafkaDSL.init() function.
// In this block, 'this' is the KafkaDSL object.
}
This call to the global kafka function creates a KafkaDSL object, passing the bootstrap servers to the constructor. The KafkaDSL object uses this information to create a Kafka configuration object. The second argument to the kafka() function – init – is a Kotlin extension function for the KafkaDSL class which – in Kotlin style – is written after the closing parenthesis of kafka(). This function is called on the newly created KafkaDSL object.
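Written out without the trailing-lambda syntax, this call is equivalent to the following (for illustration only):
// the same call, desugared: the lambda is an extension function literal
// with KafkaDSL as its receiver
kafka("localhost:9092", {
    // 'this' is the KafkaDSL instance here
})
// which in turn does nothing more than:
// KafkaDSL("localhost:9092").init()
Continuing with the DSL version, the producer block is added next: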
kafka("localhost:9092") {
// this is the body of the KafkaDSL.init() function.
// In this block, 'this' is the KafkaDSL object.
producer("kt-topic") {
// this is the body of the Producer.doProduce() function.
// In this block, 'this' is the Producer object
}
}
The KafkaDSL.init() function, which has been passed to the KafkaDSL object, calls the method producer("kt-topic") {}. This is a call to the corresponding method of the KafkaDSL class, with the topic name as the first parameter and an extension function for the Producer class as the second. This call creates a Producer object with the necessary arguments and then calls the extension function on this Producer object.
kafka("localhost:9092") {
// this is the body of the KafkaDSL.init() function.
// In this block, 'this' is the KafkaDSL object.
producer("kt-topic") {
// this is the body of the Producer.doProduce() function.
// In this block, 'this' is the Producer object
(1..10).forEach {
send("test message $it ${LocalDateTime.now()}") // <- Producer.send()
}
flush() // <- Producer.flush()
}
}
In the body of the Producer.doProduce() extension function, the loop creating the messages is implemented, and the messages are sent with the send() and flush() methods of the Producer object.
The consumer
The code for the consumer is similar to the producer code, so I will just explain the differences. First a KafkaDSL object is created again. The difference lies in the implementation of the KafkaDSL.init() function:
kafka("localhost:9092") {
// this is the body of the KafkaDSL.init() function.
// In this block, 'this' is the KafkaDSL object.
consumer("kt-topic") {
// this is the body of the Consumer.doConsume() function.
// In this block, 'this' is the Consumer object
}
}
Here the consumer("kt-topic") method is called, which creates a Consumer object and then calls the Consumer.doConsume() extension function that is passed in as the second argument.
kafka("localhost:9092") {
// this is the body of the KafkaDSL.init() function.
// In this block, 'this' is the KafkaDSL object.
consumer("kt-topic") {
// this is the body of the Consumer.doConsume() function.
// In this block, 'this' is the Consumer object
Runtime.getRuntime().addShutdownHook(Thread(Runnable {
stop() // <- Consumer.stop()
}))
consume { // <- Consumer.consume(handler)
log.info("got $it")
}
}
}
The body of the Consumer.doConsume() function first installs the shutdown hook and then calls the Consumer's consume() method, passing in a lambda as handler to process the values read from Kafka.
And this is all that we need to create our small custom DSL.
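As an aside for larger DSLs: Kotlin offers the @DslMarker annotation to prevent an inner block from accidentally calling methods of an outer receiver (for example calling producer() from within a producer block). It is not needed in this minimal example, but a sketch of how it could be applied looks like this – KafkaDslMarker is a name invented for the sketch:
@DslMarker
annotation class KafkaDslMarker

// annotate all classes that act as receivers in the DSL:
@KafkaDslMarker
class KafkaDSL(bootstrapServers: String) { /* as above */ }

@KafkaDslMarker
class Producer(kafka: Kafka, private val topic: String) { /* as above */ }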
Conclusion
First we wrote some helper classes in Kotlin providing the desired functionality to write to and read from Apache Kafka. Then, with only one global function and a helper class with two methods, we implemented our DSL on top of that. This example shows the potential that lies in Kotlin language constructs like extension functions and lambda arguments to functions. Operator overloading and infix functions open up even more possibilities; I refrained from using them here to keep the example small.
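Just to hint at what that could look like, here is a hypothetical infix variant of send – the sends function is invented for this sketch and not part of the sample project:
// a hypothetical infix extension function for Producer
infix fun Producer.sends(msg: String) = send(msg)

// which would allow writing inside a producer block:
// producer("kt-topic") {
//     this sends "test message"
// }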
Coming from Java, these language features may seem strange at first, but it is definitely worth working through them.
Hope you enjoyed this excursion into the world of Kotlin.