Beliebte Suchanfragen
//

CQRS and ES with Akka

24.8.2016 | 10 minutes of reading time

Somewhere back in December 2015 Heiko Seeberger visited us for a Scala training. At the end of the last day he showed us Akka Persistence . Knowing the concepts of CQRS (Command-Query Responsibility Segregation) and to some extent ES (Event Sourcing), this fired my interests and I decided to build a demo app to try out these concepts.
In this blog post I’ll go through the code of my demo app , explain what I tried to accomplish and the reasoning behind certain decisions. The way I did things might not be the right way and most certainly is not the only way. I’m looking forward to any feedback and improvements!

Short CQRS and ES intro

In this section I will give a quick introduction to the concepts I tried to achieve with my demo app. If you like to know more, I can recommend these resources:

CQRS is all about separating the writing of data (changes) from the reading (querying) of data.

© A CQRS Journey – MicrosoftCQRS has several advantages:

  • Storing and reading of data can be optimized separately
  • Architecture is split up into small components, easy to reason about
  • Good fit for event-based system / task-based UI

Of course there are also the disadvantages:

  • Complexity of the system as a whole increases significantly
  • Architecture often leads to eventual consistency, with its own related issues

The fundamental concept of Event Sourcing is to store all changes to data / state in a sequential, logbook-like manner. By replaying all events it should be possible to reconstruct the state of the system.
Event Sourcing combines well with CQRS as it can be applied for the write-side of the system and can be highly optimized to make storing changes very fast.

The things I wanted to accomplish with my demo app were:

  • Store changes / updates in a logbook-like manner
  • Query a relational database on the read side
  • ‘Physically’ separate the write- and read side and use events to keep them in sync
  • Make sure the system is reliable and consistent to a reasonable extent

I started out with Akka Persistence and Cassandra for the write side based on Heiko’s demo. Camel and RabbitMQ are used to propagate changes from the write side to the SQL database at the read side. On the read side I used Slick in combination with MariaDB to store and query the data. Many thanks to my colleagues for providing some input there 😉

Because the demo app evolved in many small steps I will only show the end result and not all steps in between.

The demo app

First some background for the application. In its current state the application only supports creating new users and querying for a list of users. A user consists of their first name, last name and an email address. The email address must be unique.
The app is reachable via a REST api which, in an early form, is described here: Swagger for Akka HTTP . Everything runs locally with the services (RabbitMQ, Cassandra and MariaDB) running in Docker containers. Remember that the source code is available on Github .

Implementing the read side

Let’s start off with the read side, because it’s the simplest.

The read side will receive updates via RabbitMQ. It should be sufficient to receive the current state for a user, only to persist and then query the data. After all, the write side has already handled the command resulting in the current state. We therefore only receive the User object as an event. As can be seen in the code below, the message also contains a MESSAGE_ID which is persisted together with the user and can be used to prevent handling duplicate messages.
Since updates are not possible yet we check whether the user already exists at the receiving side. If not, we can add this user and confirm to Camel / RabbitMQ that the message was received correctly (the “origSender ! Ack“).

Another noteworthy thing is the “val origSender = sender()” statement. Since Slick returns futures as response for all methods, we need to capture the value for the sending actor to be able to send a response later. Futures are executed within their own ExecutionContext. This is a different context from the one in which the Actor’s Receive method is executed. And since certain Akka values such as the sender are local to their context, they won’t be available later on when the code inside the future is being executed.

1class EventReceiver(userRepository: UserRepository) extends Consumer with ActorSettings with ActorLogging {
2  override def endpointUri: String = settings.rabbitMQ.uri
3  override def autoAck = false
4 
5  override def receive: Receive = {
6    case msg: CamelMessage =>
7      val origSender = sender()
8      val body: Xor[Error, User] = decode[User](msg.bodyAs[String])
9 
10      body.fold({ error =>
11        origSender ! Failure(error)
12      }, { user =>
13        val messageId: Long = msg.headers.get(RabbitMQConstants.MESSAGE_ID) match {
14          case Some(id: Long) => id
15          case Some(id: String) => id.toLong
16          case _ => -1
17        }
18        log.info("Event Received with id {} and for user: {}", messageId, user.email)
19 
20        userRepository.getUserByEmail(user.email).foreach {
21          case Some(_) => log.debug("User with email {} already exists")
22          case None =>
23            userRepository.createUser(UserEntity(messageSeqNr = messageId, userInfo = user)).onComplete {
24              case scala.util.Success(_) => origSender ! Ack // Send ACK when storing User succeeded
25              case scala.util.Failure(t) => log.error(t, "Failed to persist user with email: {}", user.email)
26            }
27        }
28      })
29    case _ => log.warning("Unexpected event received")
30  }
31}

The UserRepository being used contains some methods for persisting and querying the UserEntity persistence object. Note that methods return Futures, making all database actions asynchronous. The nice thing about Slick is that you can interact with the database as if it were a collection.

1class UserRepository(val databaseService: DatabaseService)(implicit executionContext: ExecutionContext)
2    extends UserEntityTable {
3 
4  def getUsers(): Future[Seq[UserEntity]] = db.run(users.result)
5 
6  def getUserByEmail(email: String): Future[Option[UserEntity]] =
7    db.run(users.filter(_.email === email).result.headOption)
8 
9  def createUser(user: UserEntity): Future[Long] = db.run((users returning users.map(_.id)) += user)
10}

The UserEntity is a simple case class extending our existing User object, adding some fields such as the message id and insert / update timestamps. At a later stage these timestamps can help us maintain consistency or help in debugging when records were added or updated.
The id, createdAt and updatedAt fields are of type Option because they are generated by the database.

1final case class UserEntity(
2  id: Option[Long] = None,
3  createdAt: Option[Timestamp] = None,
4  updatedAt: Option[Timestamp] = None,
5  messageSeqNr: Long,
6  userInfo: User
7)

Finally there is the UserEntityTable, an object required by Slick for mapping between database tables and the entity object. Note here the special syntax of “.?” for the optional fields mentioned for the entity above.

1trait UserEntityTable {
2 
3  class Users(tag: Tag) extends Table[UserEntity](tag, "CFE_USERS") {
4    def id = column[Long]("ID", O.PrimaryKey, O.AutoInc)
5    def createdAt =
6      column[Timestamp]("CREATED_AT", SqlType("timestamp not null default CURRENT_TIMESTAMP"))
7    def updatedAt =
8      column[Timestamp](
9        "UPDATED_AT",
10        SqlType("timestamp not null default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP")
11      )
12    def messageSeqNr = column[Long]("MSG_SEQ_NR")
13    def email = column[String]("EMAIL")
14    def firstName = column[String]("LAST_NAME")
15    def lastName = column[String]("FIRST_NAME")
16 
17    def * =
18      (id.?, createdAt.?, updatedAt.?, messageSeqNr, (email, firstName, lastName)).shaped <> ({
19        case (id, createdAt, updatedAt, messageSeqNr, userInfo) =>
20          UserEntity(id, createdAt, updatedAt, messageSeqNr, User.tupled.apply(userInfo))
21      }, { ue: UserEntity =>
22        def f1(u: User) = User.unapply(u).get
23        Some((ue.id, ue.createdAt, ue.updatedAt, ue.messageSeqNr, f1(ue.userInfo)))
24      })
25 
26    def idx_user = index("idx_user", email, unique = true)
27  }
28  protected val users = TableQuery[Users]
29}

Implementing the write side

The write side consists of three actors: UserAggregate, UserRepository and EventSender. The UserAggregate will receive a command – in our case only to add new users – and decide how to handle it. The command to add the user is persisted only when the user does not yet exist and an event is sent to inform the read side.

The UserAggregate mixes in the AtLeastOnceDelivery trait. This makes it a special type of PersistentActor which will expect sent messages to be confirmed. In the updateState function, the deliver function is being called to send an event to a specific Actor. A delivery id is being generated to later confirm that the sending was successful. By persisting all messages that are sent and received, the system is made reliable and can pick up where it left off even in the event of a crash. See Akka – AtLeastOnceDelivery for details about this trait and how it works.

Besides having to capture certain values when working with Futures within Actors, the persist function needs even more attention. This function must also be executed within the Actor’s ExecutionContext, which makes it hard to combine with the ask pattern. For that purpose the GetUsersForwardResponse case class was created.
When we receive a response from the UserRepository Actor, this result is piped to ourself. That way it is received as a separate message which is executed within the Actor’s own ExecutionContext. From there we can safely persist any objects.

1object UserAggregate {
2  sealed trait Evt
3  final case class MsgAddUser(u: User) extends Evt
4  final case class MsgConfirmed(deliveryId: Long) extends Evt
5  final case class GetUsersForwardResponse(senderActor: ActorRef, existingUsers: Set[User], newUser: User)
6}
7 
8class UserAggregate extends PersistentActor with AtLeastOnceDelivery with ActorLogging {
9  override def receiveCommand: Receive = {
10    case AddUserCmd(newUser) =>
11      val origSender = sender()
12      val usersFuture = userRepository ? GetUsers
13      pipe(usersFuture.mapTo[Set[User]].map(GetUsersForwardResponse(origSender, _, newUser))) to self
14 
15    case GetUsersForwardResponse(origSender, users, newUser) =>
16      if (users.exists(_.email == newUser.email)) {
17        origSender ! UserExistsResp(newUser)
18      } else {
19        persist(MsgAddUser(newUser)) { persistedMsg =>
20          updateState(persistedMsg)
21          origSender ! UserAddedResp(newUser)
22        }
23      }
24 
25    case ConfirmAddUser(deliveryId) =>
26      persist(MsgConfirmed(deliveryId))(updateState)
27    case Confirm(deliveryId) =>
28      persist(MsgConfirmed(deliveryId))(updateState)
29  }
30 
31  override def receiveRecover: Receive = {
32    case evt: Evt => updateState(evt)
33  }
34 
35  def updateState(evt: Evt): Unit = evt match {
36    case MsgAddUser(u) =>
37      deliver(eventSender.path)(deliveryId => Msg(deliveryId, u))
38      deliver(userRepository.path)(deliveryId => AddUser(deliveryId, u))
39    case MsgConfirmed(deliveryId) =>
40      confirmDelivery(deliveryId)
41  }
42}

The UserRepository is really simple, in the sense that it receives an AddUser command and just adds the user to its set. All necessary checks should have been performed by the UserAggregate already.

1class UserRepository extends PersistentActor with ActorLogging {
2  private var users = Set.empty[User]
3 
4  override def receiveCommand: Receive = {
5    case GetUsers =>
6      sender() ! users
7    case AddUser(id, user) =>
8      log.info(s"Adding $id new user with email; ${user.email}")
9      persist(user) { persistedUser =>
10        receiveRecover(persistedUser)
11        sender() ! ConfirmAddUser(id)
12      }
13  }
14 
15  override def receiveRecover: Receive = {
16    case user: User => users += user
17  }
18}

Finally the EventSender actually consists of two Actors, because we are using Camel. The CamelSender is needed to set the endpoint uri and override some defaults. In the EventSender we have some logic to convert the User object to JSON and keep track of unconfirmed messages. As the UserAggregate expects a confirmation message and the Camel confirmation message is sent separately we use a map to keep track of our senders that need to receive a confirmation.

1class EventSender extends Actor with ActorLogging {
2  private var unconfirmed = immutable.SortedMap.empty[Long, ActorPath]
3 
4  override def receive: Receive = {
5    case Msg(deliveryId, user) =>
6      log.info("Sending msg for user: {}", user.email)
7      unconfirmed = unconfirmed.updated(deliveryId, sender().path)
8      val headersMap = Map(RabbitMQConstants.MESSAGE_ID -> deliveryId, RabbitMQConstants.CORRELATIONID -> deliveryId)
9      camelSender ! CamelMessage(user.asJson.noSpaces, headersMap)
10 
11    case CamelMessage(_, headers) =>
12      val deliveryId: Long = headers.getOrElse(RabbitMQConstants.MESSAGE_ID, -1L).asInstanceOf[Long]
13      log.info("Event successfully delivered for id {}, sending confirmation", deliveryId)
14      unconfirmed
15        .get(deliveryId)
16        .foreach(
17          senderActor => {
18            unconfirmed -= deliveryId
19            context.actorSelection(senderActor) ! Confirm(deliveryId)
20          }
21        )
22 
23    case Status.Failure(ex) =>
24      log.error("Event delivery failed. Reason: {}", ex.toString)
25  }
26}
27 
28class CamelSender extends Actor with Producer with ActorSettings {
29  override def endpointUri: String = settings.rabbitMQ.uri
30 
31  override def headersToCopy: Set[String] =
32    super.headersToCopy + RabbitMQConstants.CORRELATIONID + RabbitMQConstants.MESSAGE_ID
33}

Disclaimer

The examples I showed you here are loosely based on Heiko Seeberger’s reactive-flows project regarding the Akka HTTP setup. A demo project can be easily generated with SBT-Fresh sbt-fresh .

Hope you liked this blog and I would love to hear your comments!

share post

//

More articles in this subject area

Discover exciting further topics and let the codecentric world inspire you.

//

Gemeinsam bessere Projekte umsetzen.

Wir helfen deinem Unternehmen.

Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.

Hilf uns, noch besser zu werden.

Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.