Lately I have been very interested in event-driven architectures because I believe they are the best approach for microservices, allowing for much more loosely coupled services than point-to-point communication. There are two main approaches for event-driven communication:
- Feed: Each application exposes a (synchronous) endpoint from which anybody may pull domain events in a feed fashion.
- Broker: There is a dedicated broker responsible for distributing the events, like Kafka.
Each approach has its upsides and downsides. With a broker you have more infrastructure to handle, but you also get a central place where your events are stored, whereas feeds are not accessible when the producing application is down. Scaling is also easier with a broker – what happens if you suddenly need to double your consuming applications because of load? Who subscribes to the feed? If both subscribe, events are processed twice. With a broker like Kafka you can easily create consumer groups, and each event is processed by only one application in the group. So we preferred the broker way, and we decided to use Kafka.
So far so good – but we were impatient. We wanted to learn about event-driven architectures, not spend weeks fighting with Kafka. And then Spring Cloud Stream came to the rescue.
Yes, we spent a little time setting up our own little playground with docker-compose, including Kafka and Zookeeper of course, but also Spring Cloud Config, Spring Boot Admin and an integrated Continuous Delivery setup with Jenkins, Nexus and Sonar. You can find it here: https://github.com/codecentric/event-driven-microservices-platform . Then we thought the tough part was still to come – connecting to and using Kafka. We stumbled upon Spring Cloud Stream – and using Kafka was a matter of minutes.
Dependencies
You need to add one dependency to your pom:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
As parent I use the spring-cloud-starter-parent in the most current version (at the time of writing Brixton.RC1). It handles all the version management for me.
<parent>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-parent</artifactId>
    <version>Brixton.RC1</version>
</parent>
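If your build already has a different parent, a common Maven alternative (not part of the original setup, just a sketch) is to import the Spring Cloud release train BOM in dependencyManagement instead:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Brixton.RC1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>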
When using Actuator, Spring Cloud Stream automatically adds a HealthIndicator for the Kafka binder, and a new actuator endpoint /channels listing all the channels used in the application.
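Assuming the Actuator starter is on the classpath and the application runs on the default port 8080 (both are assumptions, adjust to your setup), a quick look at both endpoints is just two requests away:

$ curl http://localhost:8080/health
$ curl http://localhost:8080/channels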
Producing events
In our sample application we produce one event every 10 seconds with a Poller.
@SpringBootApplication
@EnableBinding(Source.class)
public class EdmpSampleStreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(EdmpSampleStreamApplication.class, args);
    }

    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageSource<TimeInfo> timerMessageSource() {
        return () -> MessageBuilder.withPayload(new TimeInfo(new Date().getTime() + "", "Label")).build();
    }

    public static class TimeInfo {

        private String time;
        private String label;

        public TimeInfo(String time, String label) {
            super();
            this.time = time;
            this.label = label;
        }

        public String getTime() {
            return time;
        }

        public String getLabel() {
            return label;
        }

    }

}
When using @EnableBinding(Source.class), Spring Cloud Stream automatically creates a message channel with the name output, which is used by the @InboundChannelAdapter. You may also autowire this message channel and write messages to it manually – see the sketch after the configuration below. Our application.properties looks like this:
spring.cloud.stream.bindings.output.destination=timerTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.zkNodes=kafka
spring.cloud.stream.kafka.binder.brokers=kafka
It basically says that we want to bind the output message channel to the Kafka topic timerTopic, and that we want to serialize the payload into JSON. Then we need to tell Spring Cloud Stream the host names where Kafka and Zookeeper are running – the default is localhost, but we are running them in one Docker container named kafka.
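And here is the manual alternative mentioned above – a minimal sketch of a component that writes to the same output channel directly. The class ManualTimeInfoSender and its send method are made up for this example; only Source, its output() channel and the TimeInfo class from above are given:

@Component
public class ManualTimeInfoSender {

    // The Source binding gives access to the "output" message channel.
    private final MessageChannel output;

    @Autowired
    public ManualTimeInfoSender(Source source) {
        this.output = source.output();
    }

    // Builds a TimeInfo payload and sends it to the channel bound to timerTopic.
    public void send(String label) {
        TimeInfo timeInfo = new TimeInfo(String.valueOf(new Date().getTime()), label);
        output.send(MessageBuilder.withPayload(timeInfo).build());
    }
}

Since the binding of output stays the same, messages sent this way end up on timerTopic just like the ones produced by the poller.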
Consuming events
Our sample application for consuming events looks like this:
@SpringBootApplication
@EnableBinding(Sink.class)
public class EdmpSampleStreamSinkApplication {

    private static Logger logger = LoggerFactory.getLogger(EdmpSampleStreamSinkApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(EdmpSampleStreamSinkApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void loggerSink(SinkTimeInfo sinkTimeInfo) {
        logger.info("Received: " + sinkTimeInfo.toString());
    }

    public static class SinkTimeInfo {

        private String time;
        private String label;

        public String getTime() {
            return time;
        }

        public void setTime(String time) {
            this.time = time;
        }

        public void setLabel(String label) {
            this.label = label;
        }

        public String getLabel() {
            return label;
        }

        @Override
        public String toString() {
            return "SinkTimeInfo [time=" + time + ", label=" + label + "]";
        }

    }

}
When using @EnableBinding(Sink.class), Spring Cloud Stream automatically creates a message channel with the name input, which is used by the @StreamListener above. Our application.properties looks like this:
spring.cloud.stream.bindings.input.destination=timerTopic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=timerGroup
spring.cloud.stream.kafka.bindings.input.consumer.resetOffsets=true
spring.cloud.stream.kafka.binder.zkNodes=kafka
spring.cloud.stream.kafka.binder.brokers=kafka
We see the binding of input to timerTopic, and then the content-type we expect. Note that we don't share the class with the producing application – we just deserialize the content into a class of our own.
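For illustration, a message on timerTopic looks roughly like this (the field values are just examples):

{"time":"1461411123456","label":"Label"}

As long as the field names match, SinkTimeInfo is populated without the consumer knowing anything about the producer's TimeInfo class.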
Then we specify the consumer group this application belongs to – so if another instance of this application is deployed, events are distributed among all instances.
For development purposes we set resetOffsets of the channel input to true, which means that on a new deployment all events are processed again because the Kafka offset is reset. It could also be a strategy to do that on every startup – keeping all the state just in memory – and in Kafka. Then, of course, consumer groups don't make sense, and processing the events should not create other events – consuming the events is only used to build up an internal state.
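A minimal sketch of what such an in-memory state could look like – the class TimeInfoStore and its map are invented for this example; only @StreamListener, Sink and SinkTimeInfo are taken from the code above:

@Component
public class TimeInfoStore {

    // Internal state rebuilt from the topic on every startup (resetOffsets=true replays
    // all events); it lives only in memory and is never written anywhere else.
    private final Map<String, String> latestTimeByLabel = new ConcurrentHashMap<>();

    @StreamListener(Sink.INPUT)
    public void on(SinkTimeInfo sinkTimeInfo) {
        latestTimeByLabel.put(sinkTimeInfo.getLabel(), sinkTimeInfo.getTime());
    }

    public String latestTimeFor(String label) {
        return latestTimeByLabel.get(label);
    }
}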
Conclusion
What can I say? Spring Cloud Stream was really easy to use, and I will certainly use it again in the future. If you want to try it out for yourself with a real Kafka, I can point you again to https://github.com/codecentric/event-driven-microservices-platform .
Install Docker Toolbox, then do this:
$ docker-machine create -d virtualbox --virtualbox-memory "6000" --virtualbox-disk-size "40000" default
$ eval "$(docker-machine env default)"
$ git clone git@github.com:codecentric/event-driven-microservices-platform.git
$ cd event-driven-microservices-platform
$ docker-compose up
Now get a coffee, have a chat with your colleagues, or surf around the internet while Docker downloads everything. Then go to http://${docker-machine ip default}:18080/ and you should see something like this:
Then go to Spring Boot Admin at http://${docker-machine ip default}:10001/ and you should see something like this:
And if you take a look at the logs of edmp-sample-stream-sink, you'll see the events coming in.